sc_domains/domain_block_er/
receipt_receiver.rs1#![warn(missing_docs)]
5
6use crate::domain_block_er::execution_receipt_protocol::{
7 DomainBlockERRequest, DomainBlockERResponse, generate_protocol_name,
8};
9use domain_runtime_primitives::Balance;
10use futures::channel::oneshot;
11use parity_scale_codec::{Decode, Encode};
12use sc_network::{IfDisconnected, NetworkRequest, PeerId, RequestFailure};
13use sc_network_sync::SyncingService;
14use sp_blockchain::HeaderBackend;
15use sp_core::{Hasher, KeccakHasher};
16use sp_domains::DomainId;
17use sp_domains::execution_receipt::ExecutionReceiptFor;
18use sp_runtime::traits::{Block as BlockT, Header};
19use std::collections::BTreeMap;
20use std::marker::PhantomData;
21use std::sync::Arc;
22use std::time::Duration;
23use tokio::time::sleep;
24use tracing::{debug, error, trace};
25
26const REQUEST_PAUSE: Duration = Duration::from_secs(15);
27const ATTEMPTS_NUMBER: u32 = 20;
28const PEERS_THRESHOLD: usize = 20;
29
30#[derive(Debug, thiserror::Error)]
32pub enum DomainBlockERResponseError {
33 #[error("Last confirmed domain block info request failed: {0}")]
35 RequestFailed(#[from] RequestFailure),
36
37 #[error("Last confirmed domain block info request canceled")]
39 RequestCanceled,
40
41 #[error("Last confirmed domain block info request failed: invalid protocol")]
43 InvalidProtocol,
44
45 #[error("Failed to decode response: {0}")]
47 DecodeFailed(parity_scale_codec::Error),
48}
49
50pub struct DomainBlockERReceiver<Block, CBlock, CClient, NR>
52where
53 Block: BlockT,
54 CBlock: BlockT,
55 NR: NetworkRequest,
56 CClient: HeaderBackend<CBlock>,
57{
58 domain_id: DomainId,
59 fork_id: Option<String>,
60 consensus_client: Arc<CClient>,
61 network_service: NR,
62 sync_service: Arc<SyncingService<Block>>,
63 _marker: PhantomData<CBlock>,
64}
65
66impl<Block, CBlock, CClient, NR> DomainBlockERReceiver<Block, CBlock, CClient, NR>
67where
68 CBlock: BlockT,
69 Block: BlockT,
70 NR: NetworkRequest,
71 CClient: HeaderBackend<CBlock>,
72{
73 pub fn new(
75 domain_id: DomainId,
76 fork_id: Option<String>,
77 client: Arc<CClient>,
78 network_service: NR,
79 sync_service: Arc<SyncingService<Block>>,
80 ) -> Self {
81 Self {
82 domain_id,
83 fork_id,
84 consensus_client: client,
85 network_service,
86 sync_service,
87 _marker: PhantomData,
88 }
89 }
90
91 pub async fn get_last_confirmed_domain_block_receipt(
93 &self,
94 ) -> Option<ExecutionReceiptFor<Block::Header, CBlock, Balance>> {
95 let info = self.consensus_client.info();
96 let protocol_name = generate_protocol_name(info.genesis_hash, self.fork_id.as_deref());
97 let mut peers_not_synced = 0;
99 let mut peer_request_errors = 0;
100 let mut peer_responses = 0;
101
102 debug!(
103 domain_id=%self.domain_id,
104 %protocol_name,
105 "Started obtaining last confirmed domain block ER..."
106 );
107
108 let mut receipts = BTreeMap::new();
109 let mut receipts_hashes = BTreeMap::new();
110 let mut peers_hashes = BTreeMap::new();
111
112 for attempt in 0..ATTEMPTS_NUMBER {
113 let peers_info = match self.sync_service.peers_info().await {
114 Ok(peers_info) => peers_info,
115 Err(error) => {
116 error!("Peers info request returned an error: {error}",);
117 sleep(REQUEST_PAUSE).await;
118
119 continue;
120 }
121 };
122
123 'peers: for (peer_id, peer_info) in peers_info.iter() {
125 debug!(
126 %peers_not_synced,
127 %peer_request_errors,
128 %peer_responses,
129 "Domain block ER request. peer = {peer_id}, info = {:?}",
130 peer_info
131 );
132
133 if peers_hashes.contains_key(peer_id) {
134 trace!( %attempt, %peer_id, "Peer receipt has been already collected.");
135 continue 'peers;
136 }
137
138 if !peer_info.is_synced {
139 peers_not_synced += 1;
140 trace!(
141 %attempt,
142 %peer_id,
143 %peers_not_synced,
144 %peer_request_errors,
145 %peer_responses,
146 "Domain data request skipped (not synced).",
147 );
148 continue 'peers;
149 }
150
151 let request = DomainBlockERRequest::LastConfirmedER(self.domain_id);
152 let response = send_request::<NR, CBlock, Block::Header>(
153 protocol_name.clone(),
154 *peer_id,
155 request,
156 &self.network_service,
157 )
158 .await;
159
160 match response {
161 Ok(response) => {
162 let DomainBlockERResponse::LastConfirmedER(receipt) = response;
163 peer_responses += 1;
164 trace!(
165 %attempt,
166 %peers_not_synced,
167 %peer_request_errors,
168 %peer_responses,
169 "Response from a peer {peer_id}: {receipt:?}",
170 );
171
172 let receipt_hash = KeccakHasher::hash(&receipt.encode());
173 peers_hashes.insert(*peer_id, receipt_hash);
174 receipts.insert(receipt_hash, receipt);
175 receipts_hashes
176 .entry(receipt_hash)
177 .and_modify(|count: &mut u32| *count += 1)
178 .or_insert(1u32);
179 }
180 Err(error) => {
181 peer_request_errors += 1;
182 debug!(
183 %attempt,
184 %peers_not_synced,
185 %peer_request_errors,
186 %peer_responses,
187 "Domain block ER request failed. peer = {peer_id}: {error}",
188 );
189 continue 'peers;
190 }
191 }
192 }
193
194 if peers_hashes.len() >= PEERS_THRESHOLD {
195 break;
196 }
197
198 debug!(
199 domain_id=%self.domain_id,
200 %peers_not_synced,
201 %peer_request_errors,
202 %peer_responses,
203 "No synced peers to handle the domain confirmed block info request. Pausing..."
204 );
205 sleep(REQUEST_PAUSE).await;
206 }
207
208 if peers_hashes.len() < PEERS_THRESHOLD {
209 debug!(
210 peers=%peers_hashes.len(),
211 %PEERS_THRESHOLD,
212 %peers_not_synced,
213 %peer_request_errors,
214 %peer_responses,
215 "Couldn't pass peer threshold for receipts, trying snap sync anyway.",
216 );
217 }
218
219 if let Some((max_voted_receipt_hash, _max_receipt_votes)) = receipts_hashes
221 .into_iter()
222 .max_by_key(|(_receipt_hash, receipt_votes)| *receipt_votes)
223 {
224 return receipts.remove(&max_voted_receipt_hash);
227 }
228
229 error!(
230 %peers_not_synced,
231 %peer_request_errors,
232 %peer_responses,
233 "Couldn't find last confirmed domain block execution receipt: no receipts.",
234 );
235 None
236 }
237}
238
239async fn send_request<NR: NetworkRequest, Block: BlockT, DomainHeader: Header>(
240 protocol_name: String,
241 peer_id: PeerId,
242 request: DomainBlockERRequest,
243 network_service: &NR,
244) -> Result<DomainBlockERResponse<Block, DomainHeader>, DomainBlockERResponseError> {
245 let (tx, rx) = oneshot::channel();
246 debug!("Sending request: {request:?} (peer={peer_id})");
247
248 let encoded_request = request.encode();
249 network_service.start_request(
250 peer_id,
251 protocol_name.clone().into(),
252 encoded_request,
253 None,
254 tx,
255 IfDisconnected::ImmediateError,
256 );
257
258 let result = rx
259 .await
260 .map_err(|_| DomainBlockERResponseError::RequestCanceled)?;
261
262 match result {
263 Ok((data, response_protocol_name)) => {
264 if response_protocol_name != protocol_name.into() {
265 return Err(DomainBlockERResponseError::InvalidProtocol);
266 }
267
268 let response = DomainBlockERResponse::decode(&mut data.as_slice())
269 .map_err(DomainBlockERResponseError::DecodeFailed)?;
270
271 Ok(response)
272 }
273 Err(error) => Err(error.into()),
274 }
275}