sc_domains/domain_block_er/
receipt_receiver.rs

1//! This module provides features for domains integration: snap sync synchronization primitives,
2//! custom protocols for last confirmed block execution receipts, etc...
3
4#![warn(missing_docs)]
5
6use crate::domain_block_er::execution_receipt_protocol::{
7    DomainBlockERRequest, DomainBlockERResponse, generate_protocol_name,
8};
9use domain_runtime_primitives::Balance;
10use futures::channel::oneshot;
11use parity_scale_codec::{Decode, Encode};
12use sc_network::{IfDisconnected, NetworkRequest, PeerId, RequestFailure};
13use sc_network_sync::SyncingService;
14use sp_blockchain::HeaderBackend;
15use sp_core::{Hasher, KeccakHasher};
16use sp_domains::{DomainId, ExecutionReceiptFor};
17use sp_runtime::traits::{Block as BlockT, Header};
18use std::collections::BTreeMap;
19use std::marker::PhantomData;
20use std::sync::Arc;
21use std::time::Duration;
22use tokio::time::sleep;
23use tracing::{debug, error, trace};
24
25const REQUEST_PAUSE: Duration = Duration::from_secs(15);
26const ATTEMPTS_NUMBER: u32 = 20;
27const PEERS_THRESHOLD: usize = 20;
28
29/// Last confirmed domain block info error.
30#[derive(Debug, thiserror::Error)]
31pub enum DomainBlockERResponseError {
32    /// Last confirmed domain block info request failed.
33    #[error("Last confirmed domain block info request failed: {0}")]
34    RequestFailed(#[from] RequestFailure),
35
36    /// Last confirmed domain block info request canceled.
37    #[error("Last confirmed domain block info request canceled")]
38    RequestCanceled,
39
40    /// "Last confirmed domain block info request failed: invalid protocol.
41    #[error("Last confirmed domain block info request failed: invalid protocol")]
42    InvalidProtocol,
43
44    /// Failed to decode response.
45    #[error("Failed to decode response: {0}")]
46    DecodeFailed(parity_scale_codec::Error),
47}
48
49/// Provides execution receipts for domain block.
50pub struct DomainBlockERReceiver<Block, CBlock, CClient, NR>
51where
52    Block: BlockT,
53    CBlock: BlockT,
54    NR: NetworkRequest,
55    CClient: HeaderBackend<CBlock>,
56{
57    domain_id: DomainId,
58    fork_id: Option<String>,
59    consensus_client: Arc<CClient>,
60    network_service: NR,
61    sync_service: Arc<SyncingService<Block>>,
62    _marker: PhantomData<CBlock>,
63}
64
65impl<Block, CBlock, CClient, NR> DomainBlockERReceiver<Block, CBlock, CClient, NR>
66where
67    CBlock: BlockT,
68    Block: BlockT,
69    NR: NetworkRequest,
70    CClient: HeaderBackend<CBlock>,
71{
72    /// Constructor.
73    pub fn new(
74        domain_id: DomainId,
75        fork_id: Option<String>,
76        client: Arc<CClient>,
77        network_service: NR,
78        sync_service: Arc<SyncingService<Block>>,
79    ) -> Self {
80        Self {
81            domain_id,
82            fork_id,
83            consensus_client: client,
84            network_service,
85            sync_service,
86            _marker: PhantomData,
87        }
88    }
89
90    /// Returns execution receipts for the last confirmed domain block.
91    pub async fn get_last_confirmed_domain_block_receipt(
92        &self,
93    ) -> Option<ExecutionReceiptFor<Block::Header, CBlock, Balance>> {
94        let info = self.consensus_client.info();
95        let protocol_name = generate_protocol_name(info.genesis_hash, self.fork_id.as_deref());
96        // Used to debug failures
97        let mut peers_not_synced = 0;
98        let mut peer_request_errors = 0;
99        let mut peer_responses = 0;
100
101        debug!(
102            domain_id=%self.domain_id,
103            %protocol_name,
104            "Started obtaining last confirmed domain block ER..."
105        );
106
107        let mut receipts = BTreeMap::new();
108        let mut receipts_hashes = BTreeMap::new();
109        let mut peers_hashes = BTreeMap::new();
110
111        for attempt in 0..ATTEMPTS_NUMBER {
112            let peers_info = match self.sync_service.peers_info().await {
113                Ok(peers_info) => peers_info,
114                Err(error) => {
115                    error!("Peers info request returned an error: {error}",);
116                    sleep(REQUEST_PAUSE).await;
117
118                    continue;
119                }
120            };
121
122            //  Enumerate peers until we find a suitable source for domain info
123            'peers: for (peer_id, peer_info) in peers_info.iter() {
124                debug!(
125                    %peers_not_synced,
126                    %peer_request_errors,
127                    %peer_responses,
128                    "Domain block ER request. peer = {peer_id}, info = {:?}",
129                    peer_info
130                );
131
132                if peers_hashes.contains_key(peer_id) {
133                    trace!( %attempt, %peer_id, "Peer receipt has been already collected.");
134                    continue 'peers;
135                }
136
137                if !peer_info.is_synced {
138                    peers_not_synced += 1;
139                    trace!(
140                        %attempt,
141                        %peer_id,
142                        %peers_not_synced,
143                        %peer_request_errors,
144                        %peer_responses,
145                        "Domain data request skipped (not synced).",
146                    );
147                    continue 'peers;
148                }
149
150                let request = DomainBlockERRequest::LastConfirmedER(self.domain_id);
151                let response = send_request::<NR, CBlock, Block::Header>(
152                    protocol_name.clone(),
153                    *peer_id,
154                    request,
155                    &self.network_service,
156                )
157                .await;
158
159                match response {
160                    Ok(response) => {
161                        let DomainBlockERResponse::LastConfirmedER(receipt) = response;
162                        peer_responses += 1;
163                        trace!(
164                            %attempt,
165                            %peers_not_synced,
166                            %peer_request_errors,
167                            %peer_responses,
168                            "Response from a peer {peer_id}: {receipt:?}",
169                        );
170
171                        let receipt_hash = KeccakHasher::hash(&receipt.encode());
172                        peers_hashes.insert(*peer_id, receipt_hash);
173                        receipts.insert(receipt_hash, receipt);
174                        receipts_hashes
175                            .entry(receipt_hash)
176                            .and_modify(|count: &mut u32| *count += 1)
177                            .or_insert(1u32);
178                    }
179                    Err(error) => {
180                        peer_request_errors += 1;
181                        debug!(
182                            %attempt,
183                            %peers_not_synced,
184                            %peer_request_errors,
185                            %peer_responses,
186                            "Domain block ER request failed. peer = {peer_id}: {error}",
187                        );
188                        continue 'peers;
189                    }
190                }
191            }
192
193            if peers_hashes.len() >= PEERS_THRESHOLD {
194                break;
195            }
196
197            debug!(
198                domain_id=%self.domain_id,
199                %peers_not_synced,
200                %peer_request_errors,
201                %peer_responses,
202                "No synced peers to handle the domain confirmed block info request. Pausing..."
203            );
204            sleep(REQUEST_PAUSE).await;
205        }
206
207        if peers_hashes.len() < PEERS_THRESHOLD {
208            debug!(
209                peers=%peers_hashes.len(),
210                %PEERS_THRESHOLD,
211                %peers_not_synced,
212                %peer_request_errors,
213                %peer_responses,
214                "Couldn't pass peer threshold for receipts, trying snap sync anyway.",
215            );
216        }
217
218        // Find the receipt with the maximum votes
219        if let Some((max_voted_receipt_hash, _max_receipt_votes)) = receipts_hashes
220            .into_iter()
221            .max_by_key(|(_receipt_hash, receipt_votes)| *receipt_votes)
222        {
223            // We're about to drop receipts, so removing the receipt saves a clone.
224            // This is always Some, because every receipt has a hash and a vote.
225            return receipts.remove(&max_voted_receipt_hash);
226        }
227
228        error!(
229            %peers_not_synced,
230            %peer_request_errors,
231            %peer_responses,
232            "Couldn't find last confirmed domain block execution receipt: no receipts.",
233        );
234        None
235    }
236}
237
238async fn send_request<NR: NetworkRequest, Block: BlockT, DomainHeader: Header>(
239    protocol_name: String,
240    peer_id: PeerId,
241    request: DomainBlockERRequest,
242    network_service: &NR,
243) -> Result<DomainBlockERResponse<Block, DomainHeader>, DomainBlockERResponseError> {
244    let (tx, rx) = oneshot::channel();
245    debug!("Sending request: {request:?}  (peer={peer_id})");
246
247    let encoded_request = request.encode();
248    network_service.start_request(
249        peer_id,
250        protocol_name.clone().into(),
251        encoded_request,
252        None,
253        tx,
254        IfDisconnected::ImmediateError,
255    );
256
257    let result = rx
258        .await
259        .map_err(|_| DomainBlockERResponseError::RequestCanceled)?;
260
261    match result {
262        Ok((data, response_protocol_name)) => {
263            if response_protocol_name != protocol_name.into() {
264                return Err(DomainBlockERResponseError::InvalidProtocol);
265            }
266
267            let response = DomainBlockERResponse::decode(&mut data.as_slice())
268                .map_err(DomainBlockERResponseError::DecodeFailed)?;
269
270            Ok(response)
271        }
272        Err(error) => Err(error.into()),
273    }
274}