domain_client_operator/
snap_sync.rs

1use domain_runtime_primitives::{Balance, BlockNumber};
2use futures::channel::mpsc;
3use futures::{SinkExt, Stream, StreamExt};
4use sc_client_api::{AuxStore, BlockchainEvents, ProofProvider};
5use sc_consensus::{
6    BlockImport, BlockImportParams, ForkChoiceStrategy, ImportedState, StateAction, StorageChanges,
7};
8use sc_network::PeerId;
9use sc_network_common::sync::message::{
10    BlockAttributes, BlockData, BlockRequest, Direction, FromBlock,
11};
12use sc_network_sync::SyncingService;
13use sc_network_sync::block_relay_protocol::BlockDownloader;
14use sc_network_sync::service::network::NetworkServiceHandle;
15use sc_subspace_sync_common::snap_sync_engine::SnapSyncingEngine;
16use sp_blockchain::HeaderBackend;
17use sp_consensus::BlockOrigin;
18use sp_domains::ExecutionReceiptFor;
19use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Header, NumberFor};
20use std::collections::HashSet;
21use std::sync::Arc;
22use std::sync::atomic::{AtomicBool, Ordering};
23use std::time::{Duration, SystemTime, UNIX_EPOCH};
24use tokio::sync::broadcast;
25use tokio::time::sleep;
26use tracing::{Instrument, debug, error, trace};
27
28/// Notification with number of the block that is about to be imported and acknowledgement sender
29/// that pauses block production until the previous block is acknowledged.
30#[derive(Debug, Clone)]
31pub struct BlockImportingAcknowledgement<Block>
32where
33    Block: BlockT,
34{
35    /// Block number
36    pub block_number: NumberFor<Block>,
37    /// Sender for pausing the block import when operator is not fast enough to process
38    /// the consensus block.
39    pub acknowledgement_sender: mpsc::Sender<()>,
40}
41
42/// Provides parameters for domain snap sync synchronization with the consensus chain snap sync.
43pub struct ConsensusChainSyncParams<Block, DomainHeader>
44where
45    Block: BlockT,
46    DomainHeader: Header,
47{
48    /// Synchronizes consensus snap sync stages.
49    pub snap_sync_orchestrator: Arc<SnapSyncOrchestrator>,
50    /// Confirmed last Domain block ER
51    pub last_domain_block_er: ExecutionReceiptFor<DomainHeader, Block, Balance>,
52    /// Consensus chain block importing stream
53    pub block_importing_notification_stream:
54        Box<dyn Stream<Item = BlockImportingAcknowledgement<Block>> + Sync + Send + Unpin>,
55}
56
/// Synchronizes consensus and domain chain snap sync.
///
/// Holds the sending half of a broadcast channel that carries the target block number for the
/// consensus chain snap sync, and a shared flag recording that domain snap sync has finished.
pub struct SnapSyncOrchestrator {
    /// Broadcast sender used to publish the consensus chain snap sync target block number.
    consensus_snap_sync_target_block_tx: broadcast::Sender<BlockNumber>,
    /// Shared flag; starts as `false` and is set to `true` once domain snap sync has finished.
    domain_snap_sync_finished: Arc<AtomicBool>,
}
62
63impl Default for SnapSyncOrchestrator {
64    fn default() -> Self {
65        Self::new()
66    }
67}
68
69impl SnapSyncOrchestrator {
70    /// Constructor
71    pub fn new() -> Self {
72        let (tx, _) = broadcast::channel(1);
73        Self {
74            consensus_snap_sync_target_block_tx: tx,
75            domain_snap_sync_finished: Arc::new(AtomicBool::new(false)),
76        }
77    }
78
79    /// Unblocks (allows) consensus chain snap sync with the given target block.
80    pub fn unblock_consensus_snap_sync(&self, target_block_number: BlockNumber) {
81        debug!(%target_block_number, "Allowed starting consensus chain snap sync.");
82
83        let target_block_send_result = self
84            .consensus_snap_sync_target_block_tx
85            .send(target_block_number);
86
87        debug!(
88            ?target_block_send_result,
89            "Target block sending result: {target_block_number}"
90        );
91    }
92
93    /// Returns shared variable signaling domain snap sync finished.
94    pub fn domain_snap_sync_finished(&self) -> Arc<AtomicBool> {
95        self.domain_snap_sync_finished.clone()
96    }
97
98    /// Subscribes to a channel to receive target block numbers for consensus chain snap sync.
99    pub fn consensus_snap_sync_target_block_receiver(&self) -> broadcast::Receiver<BlockNumber> {
100        self.consensus_snap_sync_target_block_tx.subscribe()
101    }
102
103    /// Signal that domain snap sync finished.
104    pub fn mark_domain_snap_sync_finished(&self) {
105        debug!("Signal that domain snap sync finished.");
106        self.domain_snap_sync_finished
107            .store(true, Ordering::Release);
108    }
109}
110
111pub struct SyncParams<DomainClient, Block, CBlock>
112where
113    Block: BlockT,
114    CBlock: BlockT,
115{
116    pub domain_client: Arc<DomainClient>,
117    pub sync_service: Arc<SyncingService<Block>>,
118    pub domain_fork_id: Option<String>,
119    pub domain_network_service_handle: NetworkServiceHandle,
120    pub domain_block_downloader: Arc<dyn BlockDownloader<Block>>,
121    pub consensus_chain_sync_params: ConsensusChainSyncParams<CBlock, Block::Header>,
122    pub challenge_period: NumberFor<CBlock>,
123}
124
125async fn get_last_confirmed_block<Block: BlockT>(
126    block_downloader: Arc<dyn BlockDownloader<Block>>,
127    sync_service: &SyncingService<Block>,
128    block_number: BlockNumber,
129) -> Result<BlockData<Block>, sp_blockchain::Error> {
130    const LAST_CONFIRMED_BLOCK_RETRIES: u32 = 5;
131    const LOOP_PAUSE: Duration = Duration::from_secs(20);
132    const MAX_GET_PEERS_ATTEMPT_NUMBER: usize = 30;
133
134    for attempt in 1..=LAST_CONFIRMED_BLOCK_RETRIES {
135        debug!(%attempt, %block_number, "Starting last confirmed block request...");
136
137        debug!(%block_number, "Gathering peers for last confirmed block request.");
138        let mut tried_peers = HashSet::<PeerId>::new();
139
140        let current_peer_id = match get_currently_connected_peer(
141            sync_service,
142            &mut tried_peers,
143            LOOP_PAUSE,
144            MAX_GET_PEERS_ATTEMPT_NUMBER,
145        )
146        .instrument(tracing::info_span!("last confirmed block"))
147        .await
148        {
149            Ok(peer_id) => peer_id,
150            Err(err) => {
151                debug!(?err, "Getting peers for the last confirmed block failed");
152                continue;
153            }
154        };
155        tried_peers.insert(current_peer_id);
156
157        let id = {
158            let now = SystemTime::now();
159            let duration_since_epoch = now
160                .duration_since(UNIX_EPOCH)
161                .expect("Time usually goes forward");
162
163            duration_since_epoch.as_nanos() as u64
164        };
165
166        let block_request = BlockRequest::<Block> {
167            id,
168            direction: Direction::Ascending,
169            from: FromBlock::Number(block_number.into()),
170            max: Some(1),
171            fields: BlockAttributes::HEADER
172                | BlockAttributes::JUSTIFICATION
173                | BlockAttributes::BODY
174                | BlockAttributes::RECEIPT
175                | BlockAttributes::MESSAGE_QUEUE
176                | BlockAttributes::INDEXED_BODY,
177        };
178        let block_response_result = block_downloader
179            .download_blocks(current_peer_id, block_request.clone())
180            .await;
181
182        match block_response_result {
183            Ok(block_response_inner_result) => {
184                trace!(
185                    %block_number,
186                    "Sync worker handle result: {:?}",
187                    block_response_inner_result.as_ref().map(|(block_data, protocol_name)| (hex::encode(block_data), protocol_name))
188                );
189
190                match block_response_inner_result {
191                    Ok(data) => {
192                        match block_downloader.block_response_into_blocks(&block_request, data.0) {
193                            Ok(mut blocks) => {
194                                trace!(%block_number, "Domain block parsing result: {:?}", blocks);
195
196                                if let Some(blocks) = blocks.pop() {
197                                    return Ok(blocks);
198                                } else {
199                                    trace!( %current_peer_id, "Got empty state blocks",);
200                                    continue;
201                                }
202                            }
203                            Err(error) => {
204                                error!(%block_number, ?error, "Domain block parsing error");
205                                continue;
206                            }
207                        }
208                    }
209                    Err(error) => {
210                        error!(%block_number, ?error, "Domain block sync error (inner)");
211                        continue;
212                    }
213                }
214            }
215            Err(error) => {
216                error!(%block_number, ?error, "Domain block sync error");
217                continue;
218            }
219        }
220    }
221
222    Err(sp_blockchain::Error::Application(
223        format!("Failed to get block {block_number}").into(),
224    ))
225}
226
227fn convert_block_number<Block: BlockT>(block_number: NumberFor<Block>) -> u32 {
228    let block_number: u32 = match block_number.try_into() {
229        Ok(block_number) => block_number,
230        Err(_) => {
231            panic!("Can't convert block number.")
232        }
233    };
234
235    block_number
236}
237
238pub(crate) async fn snap_sync<Block, Client, CBlock>(
239    sync_params: SyncParams<Client, Block, CBlock>,
240) -> Result<(), sp_blockchain::Error>
241where
242    Block: BlockT,
243    Client: HeaderBackend<Block>
244        + BlockImport<Block>
245        + AuxStore
246        + ProofProvider<Block>
247        + BlockchainEvents<Block>
248        + Send
249        + Sync
250        + 'static,
251    for<'a> &'a Client: BlockImport<Block>,
252    CBlock: BlockT,
253{
254    let last_confirmed_block_receipt = sync_params.consensus_chain_sync_params.last_domain_block_er;
255
256    // TODO: Handle the special case when we just added the domain
257    if last_confirmed_block_receipt.domain_block_number == 0u32.into() {
258        return Err(sp_blockchain::Error::Application(
259            "Can't snap sync from genesis.".into(),
260        ));
261    }
262
263    let consensus_block_hash = last_confirmed_block_receipt.consensus_block_hash;
264
265    let mut block_importing_notification_stream = sync_params
266        .consensus_chain_sync_params
267        .block_importing_notification_stream;
268
269    let mut consensus_target_block_acknowledgement_sender = None;
270    while let Some(mut block_notification) = block_importing_notification_stream.next().await {
271        if block_notification.block_number <= last_confirmed_block_receipt.consensus_block_number {
272            if block_notification
273                .acknowledgement_sender
274                .send(())
275                .await
276                .is_err()
277            {
278                return Err(sp_blockchain::Error::Application(
279                    format!(
280                        "Can't acknowledge block import #{}",
281                        block_notification.block_number
282                    )
283                    .into(),
284                ));
285            };
286        } else {
287            consensus_target_block_acknowledgement_sender
288                .replace(block_notification.acknowledgement_sender);
289            break;
290        }
291    }
292
293    let domain_block_number =
294        convert_block_number::<Block>(last_confirmed_block_receipt.domain_block_number);
295
296    let domain_block_hash = last_confirmed_block_receipt.domain_block_hash;
297    let domain_block = get_last_confirmed_block(
298        sync_params.domain_block_downloader,
299        &sync_params.sync_service,
300        domain_block_number,
301    )
302    .await?;
303
304    let Some(domain_block_header) = domain_block.header else {
305        return Err(sp_blockchain::Error::MissingHeader(
306            "Can't obtain domain block header for snap sync".to_string(),
307        ));
308    };
309
310    let state_result = download_state(
311        &domain_block_header,
312        &sync_params.domain_client,
313        sync_params.domain_fork_id,
314        &sync_params.domain_network_service_handle,
315        &sync_params.sync_service,
316    )
317    .await;
318
319    trace!("State downloaded: {:?}", state_result);
320
321    {
322        let client = sync_params.domain_client.clone();
323        // Import first block as finalized
324        let mut block =
325            BlockImportParams::new(BlockOrigin::NetworkInitialSync, domain_block_header);
326        block.body = domain_block.body;
327        block.justifications = domain_block.justifications;
328        block.state_action = StateAction::ApplyChanges(StorageChanges::Import(state_result?));
329        block.finalized = true;
330        block.fork_choice = Some(ForkChoiceStrategy::Custom(true));
331        client.as_ref().import_block(block).await.map_err(|error| {
332            sp_blockchain::Error::Backend(format!("Failed to import state block: {error}"))
333        })?;
334    }
335
336    trace!(
337        "Domain client info after waiting: {:?}",
338        sync_params.domain_client.info()
339    );
340
341    // Verify domain state block creation.
342    if let Ok(Some(created_domain_block_hash)) =
343        sync_params.domain_client.hash(domain_block_number.into())
344    {
345        if created_domain_block_hash == domain_block_hash {
346            trace!(
347                ?created_domain_block_hash,
348                ?domain_block_hash,
349                "Created hash matches after the domain block import with state",
350            );
351        } else {
352            debug!(
353                ?created_domain_block_hash,
354                ?domain_block_hash,
355                "Created hash doesn't match after the domain block import with state",
356            );
357
358            return Err(sp_blockchain::Error::Backend(
359                "Created hash doesn't match after the domain block import with state".to_string(),
360            ));
361        }
362    } else {
363        return Err(sp_blockchain::Error::Backend(
364            "Can't obtain domain block hash after state importing for snap sync".to_string(),
365        ));
366    }
367
368    crate::aux_schema::track_domain_hash_and_consensus_hash::<_, Block, CBlock>(
369        &sync_params.domain_client,
370        domain_block_hash,
371        consensus_block_hash,
372        // skip cleaning up finalized hash so that operator can pick after snap sync
373        // and continue where snap sync left off
374        false,
375    )?;
376
377    crate::aux_schema::write_execution_receipt::<_, Block, CBlock>(
378        sync_params.domain_client.as_ref(),
379        None,
380        &last_confirmed_block_receipt,
381        sync_params.challenge_period,
382    )?;
383
384    sync_params
385        .consensus_chain_sync_params
386        .snap_sync_orchestrator
387        .mark_domain_snap_sync_finished();
388
389    debug!(info = ?sync_params.domain_client.info(), "Client info after successful domain snap sync.");
390
391    // Unblock consensus block importing
392    drop(consensus_target_block_acknowledgement_sender);
393    drop(block_importing_notification_stream);
394
395    Ok(())
396}
397
398/// Download and return state for specified block
399async fn download_state<Block, Client>(
400    header: &Block::Header,
401    client: &Arc<Client>,
402    fork_id: Option<String>,
403    network_service_handle: &NetworkServiceHandle,
404    sync_service: &SyncingService<Block>,
405) -> Result<ImportedState<Block>, sp_blockchain::Error>
406where
407    Block: BlockT,
408    Client: HeaderBackend<Block> + ProofProvider<Block> + Send + Sync + 'static,
409{
410    let block_number = *header.number();
411
412    const STATE_SYNC_RETRIES: u32 = 20;
413    const LOOP_PAUSE: Duration = Duration::from_secs(20);
414    const MAX_GET_PEERS_ATTEMPT_NUMBER: usize = 30;
415
416    for attempt in 1..=STATE_SYNC_RETRIES {
417        debug!(%block_number, %attempt, "Starting state sync...");
418
419        debug!(%block_number, "Gathering peers for state sync.");
420        let mut tried_peers = HashSet::<PeerId>::new();
421
422        let current_peer_id = match get_currently_connected_peer(
423            sync_service,
424            &mut tried_peers,
425            LOOP_PAUSE,
426            MAX_GET_PEERS_ATTEMPT_NUMBER,
427        )
428        .instrument(tracing::info_span!("download state"))
429        .await
430        {
431            Ok(peer_id) => peer_id,
432            Err(err) => {
433                debug!(?err, "Getting peers for state downloading failed");
434                continue;
435            }
436        };
437        tried_peers.insert(current_peer_id);
438
439        let sync_engine = SnapSyncingEngine::<Block>::new(
440            client.clone(),
441            fork_id.as_deref(),
442            header.clone(),
443            false,
444            (current_peer_id, block_number),
445            network_service_handle,
446        )?;
447
448        let last_block_from_sync_result = sync_engine.download_state().await;
449
450        match last_block_from_sync_result {
451            Ok(block_to_import) => {
452                debug!(%block_number, "Sync worker handle result: {:?}", block_to_import);
453
454                return block_to_import.state.ok_or_else(|| {
455                    sp_blockchain::Error::Backend(
456                        "Imported state was missing in synced block".into(),
457                    )
458                });
459            }
460            Err(error) => {
461                error!(%block_number, %error, "State sync error");
462                continue;
463            }
464        }
465    }
466
467    Err(sp_blockchain::Error::Backend(
468        "All snap sync retries failed".into(),
469    ))
470}
471
472async fn get_currently_connected_peer<Block>(
473    sync_service: &SyncingService<Block>,
474    tried_peers: &mut HashSet<PeerId>,
475    loop_pause: Duration,
476    max_attempts: usize,
477) -> Result<PeerId, sp_blockchain::Error>
478where
479    Block: BlockT,
480{
481    for current_attempt in 0..max_attempts {
482        let all_connected_peers = sync_service
483            .peers_info()
484            .await
485            .expect("Network service must be available.");
486
487        debug!(
488            %current_attempt,
489            ?all_connected_peers,
490            "Connected peers"
491        );
492
493        let connected_full_peers = all_connected_peers
494            .iter()
495            .filter_map(|(peer_id, info)| (info.roles.is_full()).then_some(*peer_id))
496            .collect::<Vec<_>>();
497
498        debug!(
499            %current_attempt,
500            ?tried_peers,
501            "Sync peers: {:?}", connected_full_peers
502        );
503
504        let active_peers_set = HashSet::from_iter(connected_full_peers.into_iter());
505
506        if let Some(peer_id) = active_peers_set.difference(tried_peers).next().cloned() {
507            tried_peers.insert(peer_id);
508            return Ok(peer_id);
509        }
510
511        sleep(loop_pause).await;
512    }
513
514    Err(sp_blockchain::Error::Backend(
515        "All connected peer retries failed".into(),
516    ))
517}