sc_domains/domain_block_er/
receipt_receiver.rs1#![warn(missing_docs)]
5
6use crate::domain_block_er::execution_receipt_protocol::{
7 DomainBlockERRequest, DomainBlockERResponse, generate_protocol_name,
8};
9use domain_runtime_primitives::Balance;
10use futures::channel::oneshot;
11use parity_scale_codec::{Decode, Encode};
12use sc_network::{IfDisconnected, NetworkRequest, PeerId, RequestFailure};
13use sc_network_sync::SyncingService;
14use sp_blockchain::HeaderBackend;
15use sp_core::{Hasher, KeccakHasher};
16use sp_domains::{DomainId, ExecutionReceiptFor};
17use sp_runtime::traits::{Block as BlockT, Header};
18use std::collections::BTreeMap;
19use std::marker::PhantomData;
20use std::sync::Arc;
21use std::time::Duration;
22use tokio::time::sleep;
23use tracing::{debug, error, trace};
24
25const REQUEST_PAUSE: Duration = Duration::from_secs(15);
26const ATTEMPTS_NUMBER: u32 = 20;
27const PEERS_THRESHOLD: usize = 20;
28
29#[derive(Debug, thiserror::Error)]
31pub enum DomainBlockERResponseError {
32 #[error("Last confirmed domain block info request failed: {0}")]
34 RequestFailed(#[from] RequestFailure),
35
36 #[error("Last confirmed domain block info request canceled")]
38 RequestCanceled,
39
40 #[error("Last confirmed domain block info request failed: invalid protocol")]
42 InvalidProtocol,
43
44 #[error("Failed to decode response: {0}")]
46 DecodeFailed(parity_scale_codec::Error),
47}
48
49pub struct DomainBlockERReceiver<Block, CBlock, CClient, NR>
51where
52 Block: BlockT,
53 CBlock: BlockT,
54 NR: NetworkRequest,
55 CClient: HeaderBackend<CBlock>,
56{
57 domain_id: DomainId,
58 fork_id: Option<String>,
59 consensus_client: Arc<CClient>,
60 network_service: NR,
61 sync_service: Arc<SyncingService<Block>>,
62 _marker: PhantomData<CBlock>,
63}
64
65impl<Block, CBlock, CClient, NR> DomainBlockERReceiver<Block, CBlock, CClient, NR>
66where
67 CBlock: BlockT,
68 Block: BlockT,
69 NR: NetworkRequest,
70 CClient: HeaderBackend<CBlock>,
71{
72 pub fn new(
74 domain_id: DomainId,
75 fork_id: Option<String>,
76 client: Arc<CClient>,
77 network_service: NR,
78 sync_service: Arc<SyncingService<Block>>,
79 ) -> Self {
80 Self {
81 domain_id,
82 fork_id,
83 consensus_client: client,
84 network_service,
85 sync_service,
86 _marker: PhantomData,
87 }
88 }
89
90 pub async fn get_last_confirmed_domain_block_receipt(
92 &self,
93 ) -> Option<ExecutionReceiptFor<Block::Header, CBlock, Balance>> {
94 let info = self.consensus_client.info();
95 let protocol_name = generate_protocol_name(info.genesis_hash, self.fork_id.as_deref());
96 let mut peers_not_synced = 0;
98 let mut peer_request_errors = 0;
99 let mut peer_responses = 0;
100
101 debug!(
102 domain_id=%self.domain_id,
103 %protocol_name,
104 "Started obtaining last confirmed domain block ER..."
105 );
106
107 let mut receipts = BTreeMap::new();
108 let mut receipts_hashes = BTreeMap::new();
109 let mut peers_hashes = BTreeMap::new();
110
111 for attempt in 0..ATTEMPTS_NUMBER {
112 let peers_info = match self.sync_service.peers_info().await {
113 Ok(peers_info) => peers_info,
114 Err(error) => {
115 error!("Peers info request returned an error: {error}",);
116 sleep(REQUEST_PAUSE).await;
117
118 continue;
119 }
120 };
121
122 'peers: for (peer_id, peer_info) in peers_info.iter() {
124 debug!(
125 %peers_not_synced,
126 %peer_request_errors,
127 %peer_responses,
128 "Domain block ER request. peer = {peer_id}, info = {:?}",
129 peer_info
130 );
131
132 if peers_hashes.contains_key(peer_id) {
133 trace!( %attempt, %peer_id, "Peer receipt has been already collected.");
134 continue 'peers;
135 }
136
137 if !peer_info.is_synced {
138 peers_not_synced += 1;
139 trace!(
140 %attempt,
141 %peer_id,
142 %peers_not_synced,
143 %peer_request_errors,
144 %peer_responses,
145 "Domain data request skipped (not synced).",
146 );
147 continue 'peers;
148 }
149
150 let request = DomainBlockERRequest::LastConfirmedER(self.domain_id);
151 let response = send_request::<NR, CBlock, Block::Header>(
152 protocol_name.clone(),
153 *peer_id,
154 request,
155 &self.network_service,
156 )
157 .await;
158
159 match response {
160 Ok(response) => {
161 let DomainBlockERResponse::LastConfirmedER(receipt) = response;
162 peer_responses += 1;
163 trace!(
164 %attempt,
165 %peers_not_synced,
166 %peer_request_errors,
167 %peer_responses,
168 "Response from a peer {peer_id}: {receipt:?}",
169 );
170
171 let receipt_hash = KeccakHasher::hash(&receipt.encode());
172 peers_hashes.insert(*peer_id, receipt_hash);
173 receipts.insert(receipt_hash, receipt);
174 receipts_hashes
175 .entry(receipt_hash)
176 .and_modify(|count: &mut u32| *count += 1)
177 .or_insert(1u32);
178 }
179 Err(error) => {
180 peer_request_errors += 1;
181 debug!(
182 %attempt,
183 %peers_not_synced,
184 %peer_request_errors,
185 %peer_responses,
186 "Domain block ER request failed. peer = {peer_id}: {error}",
187 );
188 continue 'peers;
189 }
190 }
191 }
192
193 if peers_hashes.len() >= PEERS_THRESHOLD {
194 break;
195 }
196
197 debug!(
198 domain_id=%self.domain_id,
199 %peers_not_synced,
200 %peer_request_errors,
201 %peer_responses,
202 "No synced peers to handle the domain confirmed block info request. Pausing..."
203 );
204 sleep(REQUEST_PAUSE).await;
205 }
206
207 if peers_hashes.len() < PEERS_THRESHOLD {
208 debug!(
209 peers=%peers_hashes.len(),
210 %PEERS_THRESHOLD,
211 %peers_not_synced,
212 %peer_request_errors,
213 %peer_responses,
214 "Couldn't pass peer threshold for receipts, trying snap sync anyway.",
215 );
216 }
217
218 if let Some((max_voted_receipt_hash, _max_receipt_votes)) = receipts_hashes
220 .into_iter()
221 .max_by_key(|(_receipt_hash, receipt_votes)| *receipt_votes)
222 {
223 return receipts.remove(&max_voted_receipt_hash);
226 }
227
228 error!(
229 %peers_not_synced,
230 %peer_request_errors,
231 %peer_responses,
232 "Couldn't find last confirmed domain block execution receipt: no receipts.",
233 );
234 None
235 }
236}
237
238async fn send_request<NR: NetworkRequest, Block: BlockT, DomainHeader: Header>(
239 protocol_name: String,
240 peer_id: PeerId,
241 request: DomainBlockERRequest,
242 network_service: &NR,
243) -> Result<DomainBlockERResponse<Block, DomainHeader>, DomainBlockERResponseError> {
244 let (tx, rx) = oneshot::channel();
245 debug!("Sending request: {request:?} (peer={peer_id})");
246
247 let encoded_request = request.encode();
248 network_service.start_request(
249 peer_id,
250 protocol_name.clone().into(),
251 encoded_request,
252 None,
253 tx,
254 IfDisconnected::ImmediateError,
255 );
256
257 let result = rx
258 .await
259 .map_err(|_| DomainBlockERResponseError::RequestCanceled)?;
260
261 match result {
262 Ok((data, response_protocol_name)) => {
263 if response_protocol_name != protocol_name.into() {
264 return Err(DomainBlockERResponseError::InvalidProtocol);
265 }
266
267 let response = DomainBlockERResponse::decode(&mut data.as_slice())
268 .map_err(DomainBlockERResponseError::DecodeFailed)?;
269
270 Ok(response)
271 }
272 Err(error) => Err(error.into()),
273 }
274}