sc_domains/domain_block_er/
receipt_receiver.rs

1//! This module provides features for domains integration: snap sync synchronization primitives,
2//! custom protocols for last confirmed block execution receipts, etc...
3
4#![warn(missing_docs)]
5
6use crate::domain_block_er::execution_receipt_protocol::{
7    generate_protocol_name, DomainBlockERRequest, DomainBlockERResponse, LOG_TARGET,
8};
9use domain_runtime_primitives::Balance;
10use futures::channel::oneshot;
11use parity_scale_codec::{Decode, Encode};
12use sc_network::{IfDisconnected, NetworkRequest, PeerId, RequestFailure};
13use sc_network_sync::SyncingService;
14use sp_blockchain::HeaderBackend;
15use sp_core::{Hasher, KeccakHasher};
16use sp_domains::{DomainId, ExecutionReceiptFor};
17use sp_runtime::traits::{Block as BlockT, Header};
18use std::collections::BTreeMap;
19use std::marker::PhantomData;
20use std::sync::Arc;
21use std::time::Duration;
22use tokio::time::sleep;
23use tracing::{debug, error, trace};
24
/// Pause between two consecutive attempts to collect receipts from peers.
const REQUEST_PAUSE: Duration = Duration::from_secs(15);
/// Upper bound on the number of collection attempts before giving up on the threshold.
const ATTEMPTS_NUMBER: u32 = 20;
/// Number of distinct peer responses after which receipt collection stops early.
const PEERS_THRESHOLD: usize = 20;
28
29/// Last confirmed domain block info error.
30#[derive(Debug, thiserror::Error)]
31pub enum DomainBlockERResponseError {
32    /// Last confirmed domain block info request failed.
33    #[error("Last confirmed domain block info request failed: {0}")]
34    RequestFailed(#[from] RequestFailure),
35
36    /// Last confirmed domain block info request canceled.
37    #[error("Last confirmed domain block info request canceled")]
38    RequestCanceled,
39
40    /// "Last confirmed domain block info request failed: invalid protocol.
41    #[error("Last confirmed domain block info request failed: invalid protocol")]
42    InvalidProtocol,
43
44    /// Failed to decode response.
45    #[error("Failed to decode response: {0}")]
46    DecodeFailed(parity_scale_codec::Error),
47}
48
49/// Provides execution receipts for domain block.
50pub struct DomainBlockERReceiver<Block, CBlock, CClient, NR>
51where
52    Block: BlockT,
53    CBlock: BlockT,
54    NR: NetworkRequest,
55    CClient: HeaderBackend<CBlock>,
56{
57    domain_id: DomainId,
58    fork_id: Option<String>,
59    consensus_client: Arc<CClient>,
60    network_service: NR,
61    sync_service: Arc<SyncingService<Block>>,
62    _marker: PhantomData<CBlock>,
63}
64
65impl<Block, CBlock, CClient, NR> DomainBlockERReceiver<Block, CBlock, CClient, NR>
66where
67    CBlock: BlockT,
68    Block: BlockT,
69    NR: NetworkRequest,
70    CClient: HeaderBackend<CBlock>,
71{
72    /// Constructor.
73    pub fn new(
74        domain_id: DomainId,
75        fork_id: Option<String>,
76        client: Arc<CClient>,
77        network_service: NR,
78        sync_service: Arc<SyncingService<Block>>,
79    ) -> Self {
80        Self {
81            domain_id,
82            fork_id,
83            consensus_client: client,
84            network_service,
85            sync_service,
86            _marker: PhantomData,
87        }
88    }
89
90    /// Returns execution receipts for the last confirmed domain block.
91    pub async fn get_last_confirmed_domain_block_receipt(
92        &self,
93    ) -> Option<ExecutionReceiptFor<Block::Header, CBlock, Balance>> {
94        let info = self.consensus_client.info();
95        let protocol_name = generate_protocol_name(info.genesis_hash, self.fork_id.as_deref());
96        // Used to debug failures
97        let mut peers_not_synced = 0;
98        let mut peer_request_errors = 0;
99        let mut peer_responses = 0;
100
101        debug!(
102            target: LOG_TARGET,
103            domain_id=%self.domain_id,
104            %protocol_name,
105            "Started obtaining last confirmed domain block ER..."
106        );
107
108        let mut receipts = BTreeMap::new();
109        let mut receipts_hashes = BTreeMap::new();
110        let mut peers_hashes = BTreeMap::new();
111
112        for attempt in 0..ATTEMPTS_NUMBER {
113            let peers_info = match self.sync_service.peers_info().await {
114                Ok(peers_info) => peers_info,
115                Err(error) => {
116                    error!(target: LOG_TARGET, "Peers info request returned an error: {error}",);
117                    sleep(REQUEST_PAUSE).await;
118
119                    continue;
120                }
121            };
122
123            //  Enumerate peers until we find a suitable source for domain info
124            'peers: for (peer_id, peer_info) in peers_info.iter() {
125                debug!(
126                    target: LOG_TARGET,
127                    %peers_not_synced,
128                    %peer_request_errors,
129                    %peer_responses,
130                    "Domain block ER request. peer = {peer_id}, info = {:?}",
131                    peer_info
132                );
133
134                if peers_hashes.contains_key(peer_id) {
135                    trace!(target: LOG_TARGET, %attempt, %peer_id, "Peer receipt has been already collected.");
136                    continue 'peers;
137                }
138
139                if !peer_info.is_synced {
140                    peers_not_synced += 1;
141                    trace!(
142                        target: LOG_TARGET,
143                        %attempt,
144                        %peer_id,
145                        %peers_not_synced,
146                        %peer_request_errors,
147                        %peer_responses,
148                        "Domain data request skipped (not synced).",
149                    );
150                    continue 'peers;
151                }
152
153                let request = DomainBlockERRequest::LastConfirmedER(self.domain_id);
154                let response = send_request::<NR, CBlock, Block::Header>(
155                    protocol_name.clone(),
156                    *peer_id,
157                    request,
158                    &self.network_service,
159                )
160                .await;
161
162                match response {
163                    Ok(response) => {
164                        let DomainBlockERResponse::LastConfirmedER(receipt) = response;
165                        peer_responses += 1;
166                        trace!(
167                            target: LOG_TARGET,
168                            %attempt,
169                            %peers_not_synced,
170                            %peer_request_errors,
171                            %peer_responses,
172                            "Response from a peer {peer_id}: {receipt:?}",
173                        );
174
175                        let receipt_hash = KeccakHasher::hash(&receipt.encode());
176                        peers_hashes.insert(*peer_id, receipt_hash);
177                        receipts.insert(receipt_hash, receipt);
178                        receipts_hashes
179                            .entry(receipt_hash)
180                            .and_modify(|count: &mut u32| *count += 1)
181                            .or_insert(1u32);
182                    }
183                    Err(error) => {
184                        peer_request_errors += 1;
185                        debug!(
186                            target: LOG_TARGET,
187                            %attempt,
188                            %peers_not_synced,
189                            %peer_request_errors,
190                            %peer_responses,
191                            "Domain block ER request failed. peer = {peer_id}: {error}",
192                        );
193                        continue 'peers;
194                    }
195                }
196            }
197
198            if peers_hashes.len() >= PEERS_THRESHOLD {
199                break;
200            }
201
202            debug!(
203                target: LOG_TARGET,
204                domain_id=%self.domain_id,
205                %peers_not_synced,
206                %peer_request_errors,
207                %peer_responses,
208                "No synced peers to handle the domain confirmed block info request. Pausing..."
209            );
210            sleep(REQUEST_PAUSE).await;
211        }
212
213        if peers_hashes.len() < PEERS_THRESHOLD {
214            debug!(
215                target: LOG_TARGET,
216                peers=%peers_hashes.len(),
217                %PEERS_THRESHOLD,
218                %peers_not_synced,
219                %peer_request_errors,
220                %peer_responses,
221                "Couldn't pass peer threshold for receipts, trying snap sync anyway.",
222            );
223        }
224
225        // Find the receipt with the maximum votes
226        if let Some((max_voted_receipt_hash, _max_receipt_votes)) = receipts_hashes
227            .into_iter()
228            .max_by_key(|(_receipt_hash, receipt_votes)| *receipt_votes)
229        {
230            // We're about to drop receipts, so removing the receipt saves a clone.
231            // This is always Some, because every receipt has a hash and a vote.
232            return receipts.remove(&max_voted_receipt_hash);
233        }
234
235        error!(
236            target: LOG_TARGET,
237            %peers_not_synced,
238            %peer_request_errors,
239            %peer_responses,
240            "Couldn't find last confirmed domain block execution receipt: no receipts.",
241        );
242        None
243    }
244}
245
246async fn send_request<NR: NetworkRequest, Block: BlockT, DomainHeader: Header>(
247    protocol_name: String,
248    peer_id: PeerId,
249    request: DomainBlockERRequest,
250    network_service: &NR,
251) -> Result<DomainBlockERResponse<Block, DomainHeader>, DomainBlockERResponseError> {
252    let (tx, rx) = oneshot::channel();
253    debug!(target: LOG_TARGET, "Sending request: {request:?}  (peer={peer_id})");
254
255    let encoded_request = request.encode();
256    network_service.start_request(
257        peer_id,
258        protocol_name.clone().into(),
259        encoded_request,
260        None,
261        tx,
262        IfDisconnected::ImmediateError,
263    );
264
265    let result = rx
266        .await
267        .map_err(|_| DomainBlockERResponseError::RequestCanceled)?;
268
269    match result {
270        Ok((data, response_protocol_name)) => {
271            if response_protocol_name != protocol_name.into() {
272                return Err(DomainBlockERResponseError::InvalidProtocol);
273            }
274
275            let response = DomainBlockERResponse::decode(&mut data.as_slice())
276                .map_err(DomainBlockERResponseError::DecodeFailed)?;
277
278            Ok(response)
279        }
280        Err(error) => Err(error.into()),
281    }
282}