sc_domains/domain_block_er/
receipt_receiver.rs

1//! This module provides features for domains integration: snap sync synchronization primitives,
2//! custom protocols for last confirmed block execution receipts, etc...
3
4#![warn(missing_docs)]
5
6use crate::domain_block_er::execution_receipt_protocol::{
7    DomainBlockERRequest, DomainBlockERResponse, generate_protocol_name,
8};
9use domain_runtime_primitives::Balance;
10use futures::channel::oneshot;
11use parity_scale_codec::{Decode, Encode};
12use sc_network::{IfDisconnected, NetworkRequest, PeerId, RequestFailure};
13use sc_network_sync::SyncingService;
14use sp_blockchain::HeaderBackend;
15use sp_core::{Hasher, KeccakHasher};
16use sp_domains::DomainId;
17use sp_domains::execution_receipt::ExecutionReceiptFor;
18use sp_runtime::traits::{Block as BlockT, Header};
19use std::collections::BTreeMap;
20use std::marker::PhantomData;
21use std::sync::Arc;
22use std::time::Duration;
23use tokio::time::sleep;
24use tracing::{debug, error, trace};
25
26const REQUEST_PAUSE: Duration = Duration::from_secs(15);
27const ATTEMPTS_NUMBER: u32 = 20;
28const PEERS_THRESHOLD: usize = 20;
29
30/// Last confirmed domain block info error.
31#[derive(Debug, thiserror::Error)]
32pub enum DomainBlockERResponseError {
33    /// Last confirmed domain block info request failed.
34    #[error("Last confirmed domain block info request failed: {0}")]
35    RequestFailed(#[from] RequestFailure),
36
37    /// Last confirmed domain block info request canceled.
38    #[error("Last confirmed domain block info request canceled")]
39    RequestCanceled,
40
41    /// "Last confirmed domain block info request failed: invalid protocol.
42    #[error("Last confirmed domain block info request failed: invalid protocol")]
43    InvalidProtocol,
44
45    /// Failed to decode response.
46    #[error("Failed to decode response: {0}")]
47    DecodeFailed(parity_scale_codec::Error),
48}
49
50/// Provides execution receipts for domain block.
51pub struct DomainBlockERReceiver<Block, CBlock, CClient, NR>
52where
53    Block: BlockT,
54    CBlock: BlockT,
55    NR: NetworkRequest,
56    CClient: HeaderBackend<CBlock>,
57{
58    domain_id: DomainId,
59    fork_id: Option<String>,
60    consensus_client: Arc<CClient>,
61    network_service: NR,
62    sync_service: Arc<SyncingService<Block>>,
63    _marker: PhantomData<CBlock>,
64}
65
66impl<Block, CBlock, CClient, NR> DomainBlockERReceiver<Block, CBlock, CClient, NR>
67where
68    CBlock: BlockT,
69    Block: BlockT,
70    NR: NetworkRequest,
71    CClient: HeaderBackend<CBlock>,
72{
73    /// Constructor.
74    pub fn new(
75        domain_id: DomainId,
76        fork_id: Option<String>,
77        client: Arc<CClient>,
78        network_service: NR,
79        sync_service: Arc<SyncingService<Block>>,
80    ) -> Self {
81        Self {
82            domain_id,
83            fork_id,
84            consensus_client: client,
85            network_service,
86            sync_service,
87            _marker: PhantomData,
88        }
89    }
90
91    /// Returns execution receipts for the last confirmed domain block.
92    pub async fn get_last_confirmed_domain_block_receipt(
93        &self,
94    ) -> Option<ExecutionReceiptFor<Block::Header, CBlock, Balance>> {
95        let info = self.consensus_client.info();
96        let protocol_name = generate_protocol_name(info.genesis_hash, self.fork_id.as_deref());
97        // Used to debug failures
98        let mut peers_not_synced = 0;
99        let mut peer_request_errors = 0;
100        let mut peer_responses = 0;
101
102        debug!(
103            domain_id=%self.domain_id,
104            %protocol_name,
105            "Started obtaining last confirmed domain block ER..."
106        );
107
108        let mut receipts = BTreeMap::new();
109        let mut receipts_hashes = BTreeMap::new();
110        let mut peers_hashes = BTreeMap::new();
111
112        for attempt in 0..ATTEMPTS_NUMBER {
113            let peers_info = match self.sync_service.peers_info().await {
114                Ok(peers_info) => peers_info,
115                Err(error) => {
116                    error!("Peers info request returned an error: {error}",);
117                    sleep(REQUEST_PAUSE).await;
118
119                    continue;
120                }
121            };
122
123            //  Enumerate peers until we find a suitable source for domain info
124            'peers: for (peer_id, peer_info) in peers_info.iter() {
125                debug!(
126                    %peers_not_synced,
127                    %peer_request_errors,
128                    %peer_responses,
129                    "Domain block ER request. peer = {peer_id}, info = {:?}",
130                    peer_info
131                );
132
133                if peers_hashes.contains_key(peer_id) {
134                    trace!( %attempt, %peer_id, "Peer receipt has been already collected.");
135                    continue 'peers;
136                }
137
138                if !peer_info.is_synced {
139                    peers_not_synced += 1;
140                    trace!(
141                        %attempt,
142                        %peer_id,
143                        %peers_not_synced,
144                        %peer_request_errors,
145                        %peer_responses,
146                        "Domain data request skipped (not synced).",
147                    );
148                    continue 'peers;
149                }
150
151                let request = DomainBlockERRequest::LastConfirmedER(self.domain_id);
152                let response = send_request::<NR, CBlock, Block::Header>(
153                    protocol_name.clone(),
154                    *peer_id,
155                    request,
156                    &self.network_service,
157                )
158                .await;
159
160                match response {
161                    Ok(response) => {
162                        let DomainBlockERResponse::LastConfirmedER(receipt) = response;
163                        peer_responses += 1;
164                        trace!(
165                            %attempt,
166                            %peers_not_synced,
167                            %peer_request_errors,
168                            %peer_responses,
169                            "Response from a peer {peer_id}: {receipt:?}",
170                        );
171
172                        let receipt_hash = KeccakHasher::hash(&receipt.encode());
173                        peers_hashes.insert(*peer_id, receipt_hash);
174                        receipts.insert(receipt_hash, receipt);
175                        receipts_hashes
176                            .entry(receipt_hash)
177                            .and_modify(|count: &mut u32| *count += 1)
178                            .or_insert(1u32);
179                    }
180                    Err(error) => {
181                        peer_request_errors += 1;
182                        debug!(
183                            %attempt,
184                            %peers_not_synced,
185                            %peer_request_errors,
186                            %peer_responses,
187                            "Domain block ER request failed. peer = {peer_id}: {error}",
188                        );
189                        continue 'peers;
190                    }
191                }
192            }
193
194            if peers_hashes.len() >= PEERS_THRESHOLD {
195                break;
196            }
197
198            debug!(
199                domain_id=%self.domain_id,
200                %peers_not_synced,
201                %peer_request_errors,
202                %peer_responses,
203                "No synced peers to handle the domain confirmed block info request. Pausing..."
204            );
205            sleep(REQUEST_PAUSE).await;
206        }
207
208        if peers_hashes.len() < PEERS_THRESHOLD {
209            debug!(
210                peers=%peers_hashes.len(),
211                %PEERS_THRESHOLD,
212                %peers_not_synced,
213                %peer_request_errors,
214                %peer_responses,
215                "Couldn't pass peer threshold for receipts, trying snap sync anyway.",
216            );
217        }
218
219        // Find the receipt with the maximum votes
220        if let Some((max_voted_receipt_hash, _max_receipt_votes)) = receipts_hashes
221            .into_iter()
222            .max_by_key(|(_receipt_hash, receipt_votes)| *receipt_votes)
223        {
224            // We're about to drop receipts, so removing the receipt saves a clone.
225            // This is always Some, because every receipt has a hash and a vote.
226            return receipts.remove(&max_voted_receipt_hash);
227        }
228
229        error!(
230            %peers_not_synced,
231            %peer_request_errors,
232            %peer_responses,
233            "Couldn't find last confirmed domain block execution receipt: no receipts.",
234        );
235        None
236    }
237}
238
239async fn send_request<NR: NetworkRequest, Block: BlockT, DomainHeader: Header>(
240    protocol_name: String,
241    peer_id: PeerId,
242    request: DomainBlockERRequest,
243    network_service: &NR,
244) -> Result<DomainBlockERResponse<Block, DomainHeader>, DomainBlockERResponseError> {
245    let (tx, rx) = oneshot::channel();
246    debug!("Sending request: {request:?}  (peer={peer_id})");
247
248    let encoded_request = request.encode();
249    network_service.start_request(
250        peer_id,
251        protocol_name.clone().into(),
252        encoded_request,
253        None,
254        tx,
255        IfDisconnected::ImmediateError,
256    );
257
258    let result = rx
259        .await
260        .map_err(|_| DomainBlockERResponseError::RequestCanceled)?;
261
262    match result {
263        Ok((data, response_protocol_name)) => {
264            if response_protocol_name != protocol_name.into() {
265                return Err(DomainBlockERResponseError::InvalidProtocol);
266            }
267
268            let response = DomainBlockERResponse::decode(&mut data.as_slice())
269                .map_err(DomainBlockERResponseError::DecodeFailed)?;
270
271            Ok(response)
272        }
273        Err(error) => Err(error.into()),
274    }
275}