sc_domains/domain_block_er/
receipt_receiver.rs1#![warn(missing_docs)]
5
6use crate::domain_block_er::execution_receipt_protocol::{
7 generate_protocol_name, DomainBlockERRequest, DomainBlockERResponse, LOG_TARGET,
8};
9use domain_runtime_primitives::Balance;
10use futures::channel::oneshot;
11use parity_scale_codec::{Decode, Encode};
12use sc_network::{IfDisconnected, NetworkRequest, PeerId, RequestFailure};
13use sc_network_sync::SyncingService;
14use sp_blockchain::HeaderBackend;
15use sp_core::{Hasher, KeccakHasher};
16use sp_domains::{DomainId, ExecutionReceiptFor};
17use sp_runtime::traits::{Block as BlockT, Header};
18use std::collections::BTreeMap;
19use std::marker::PhantomData;
20use std::sync::Arc;
21use std::time::Duration;
22use tokio::time::sleep;
23use tracing::{debug, error, trace};
24
25const REQUEST_PAUSE: Duration = Duration::from_secs(15);
26const ATTEMPTS_NUMBER: u32 = 20;
27const PEERS_THRESHOLD: usize = 20;
28
29#[derive(Debug, thiserror::Error)]
31pub enum DomainBlockERResponseError {
32 #[error("Last confirmed domain block info request failed: {0}")]
34 RequestFailed(#[from] RequestFailure),
35
36 #[error("Last confirmed domain block info request canceled")]
38 RequestCanceled,
39
40 #[error("Last confirmed domain block info request failed: invalid protocol")]
42 InvalidProtocol,
43
44 #[error("Failed to decode response: {0}")]
46 DecodeFailed(parity_scale_codec::Error),
47}
48
49pub struct DomainBlockERReceiver<Block, CBlock, CClient, NR>
51where
52 Block: BlockT,
53 CBlock: BlockT,
54 NR: NetworkRequest,
55 CClient: HeaderBackend<CBlock>,
56{
57 domain_id: DomainId,
58 fork_id: Option<String>,
59 consensus_client: Arc<CClient>,
60 network_service: NR,
61 sync_service: Arc<SyncingService<Block>>,
62 _marker: PhantomData<CBlock>,
63}
64
65impl<Block, CBlock, CClient, NR> DomainBlockERReceiver<Block, CBlock, CClient, NR>
66where
67 CBlock: BlockT,
68 Block: BlockT,
69 NR: NetworkRequest,
70 CClient: HeaderBackend<CBlock>,
71{
72 pub fn new(
74 domain_id: DomainId,
75 fork_id: Option<String>,
76 client: Arc<CClient>,
77 network_service: NR,
78 sync_service: Arc<SyncingService<Block>>,
79 ) -> Self {
80 Self {
81 domain_id,
82 fork_id,
83 consensus_client: client,
84 network_service,
85 sync_service,
86 _marker: PhantomData,
87 }
88 }
89
90 pub async fn get_last_confirmed_domain_block_receipt(
92 &self,
93 ) -> Option<ExecutionReceiptFor<Block::Header, CBlock, Balance>> {
94 let info = self.consensus_client.info();
95 let protocol_name = generate_protocol_name(info.genesis_hash, self.fork_id.as_deref());
96 let mut peers_not_synced = 0;
98 let mut peer_request_errors = 0;
99 let mut peer_responses = 0;
100
101 debug!(
102 target: LOG_TARGET,
103 domain_id=%self.domain_id,
104 %protocol_name,
105 "Started obtaining last confirmed domain block ER..."
106 );
107
108 let mut receipts = BTreeMap::new();
109 let mut receipts_hashes = BTreeMap::new();
110 let mut peers_hashes = BTreeMap::new();
111
112 for attempt in 0..ATTEMPTS_NUMBER {
113 let peers_info = match self.sync_service.peers_info().await {
114 Ok(peers_info) => peers_info,
115 Err(error) => {
116 error!(target: LOG_TARGET, "Peers info request returned an error: {error}",);
117 sleep(REQUEST_PAUSE).await;
118
119 continue;
120 }
121 };
122
123 'peers: for (peer_id, peer_info) in peers_info.iter() {
125 debug!(
126 target: LOG_TARGET,
127 %peers_not_synced,
128 %peer_request_errors,
129 %peer_responses,
130 "Domain block ER request. peer = {peer_id}, info = {:?}",
131 peer_info
132 );
133
134 if peers_hashes.contains_key(peer_id) {
135 trace!(target: LOG_TARGET, %attempt, %peer_id, "Peer receipt has been already collected.");
136 continue 'peers;
137 }
138
139 if !peer_info.is_synced {
140 peers_not_synced += 1;
141 trace!(
142 target: LOG_TARGET,
143 %attempt,
144 %peer_id,
145 %peers_not_synced,
146 %peer_request_errors,
147 %peer_responses,
148 "Domain data request skipped (not synced).",
149 );
150 continue 'peers;
151 }
152
153 let request = DomainBlockERRequest::LastConfirmedER(self.domain_id);
154 let response = send_request::<NR, CBlock, Block::Header>(
155 protocol_name.clone(),
156 *peer_id,
157 request,
158 &self.network_service,
159 )
160 .await;
161
162 match response {
163 Ok(response) => {
164 let DomainBlockERResponse::LastConfirmedER(receipt) = response;
165 peer_responses += 1;
166 trace!(
167 target: LOG_TARGET,
168 %attempt,
169 %peers_not_synced,
170 %peer_request_errors,
171 %peer_responses,
172 "Response from a peer {peer_id}: {receipt:?}",
173 );
174
175 let receipt_hash = KeccakHasher::hash(&receipt.encode());
176 peers_hashes.insert(*peer_id, receipt_hash);
177 receipts.insert(receipt_hash, receipt);
178 receipts_hashes
179 .entry(receipt_hash)
180 .and_modify(|count: &mut u32| *count += 1)
181 .or_insert(1u32);
182 }
183 Err(error) => {
184 peer_request_errors += 1;
185 debug!(
186 target: LOG_TARGET,
187 %attempt,
188 %peers_not_synced,
189 %peer_request_errors,
190 %peer_responses,
191 "Domain block ER request failed. peer = {peer_id}: {error}",
192 );
193 continue 'peers;
194 }
195 }
196 }
197
198 if peers_hashes.len() >= PEERS_THRESHOLD {
199 break;
200 }
201
202 debug!(
203 target: LOG_TARGET,
204 domain_id=%self.domain_id,
205 %peers_not_synced,
206 %peer_request_errors,
207 %peer_responses,
208 "No synced peers to handle the domain confirmed block info request. Pausing..."
209 );
210 sleep(REQUEST_PAUSE).await;
211 }
212
213 if peers_hashes.len() < PEERS_THRESHOLD {
214 debug!(
215 target: LOG_TARGET,
216 peers=%peers_hashes.len(),
217 %PEERS_THRESHOLD,
218 %peers_not_synced,
219 %peer_request_errors,
220 %peer_responses,
221 "Couldn't pass peer threshold for receipts, trying snap sync anyway.",
222 );
223 }
224
225 if let Some((max_voted_receipt_hash, _max_receipt_votes)) = receipts_hashes
227 .into_iter()
228 .max_by_key(|(_receipt_hash, receipt_votes)| *receipt_votes)
229 {
230 return receipts.remove(&max_voted_receipt_hash);
233 }
234
235 error!(
236 target: LOG_TARGET,
237 %peers_not_synced,
238 %peer_request_errors,
239 %peer_responses,
240 "Couldn't find last confirmed domain block execution receipt: no receipts.",
241 );
242 None
243 }
244}
245
246async fn send_request<NR: NetworkRequest, Block: BlockT, DomainHeader: Header>(
247 protocol_name: String,
248 peer_id: PeerId,
249 request: DomainBlockERRequest,
250 network_service: &NR,
251) -> Result<DomainBlockERResponse<Block, DomainHeader>, DomainBlockERResponseError> {
252 let (tx, rx) = oneshot::channel();
253 debug!(target: LOG_TARGET, "Sending request: {request:?} (peer={peer_id})");
254
255 let encoded_request = request.encode();
256 network_service.start_request(
257 peer_id,
258 protocol_name.clone().into(),
259 encoded_request,
260 None,
261 tx,
262 IfDisconnected::ImmediateError,
263 );
264
265 let result = rx
266 .await
267 .map_err(|_| DomainBlockERResponseError::RequestCanceled)?;
268
269 match result {
270 Ok((data, response_protocol_name)) => {
271 if response_protocol_name != protocol_name.into() {
272 return Err(DomainBlockERResponseError::InvalidProtocol);
273 }
274
275 let response = DomainBlockERResponse::decode(&mut data.as_slice())
276 .map_err(DomainBlockERResponseError::DecodeFailed)?;
277
278 Ok(response)
279 }
280 Err(error) => Err(error.into()),
281 }
282}