domain_client_operator/
snap_sync.rs

1use domain_runtime_primitives::{Balance, BlockNumber};
2use futures::channel::mpsc;
3use futures::{SinkExt, Stream, StreamExt};
4use sc_client_api::{AuxStore, BlockchainEvents, ProofProvider};
5use sc_consensus::{
6    BlockImport, BlockImportParams, ForkChoiceStrategy, ImportedState, StateAction, StorageChanges,
7};
8use sc_network::PeerId;
9use sc_network_common::sync::message::{
10    BlockAttributes, BlockData, BlockRequest, Direction, FromBlock,
11};
12use sc_network_sync::block_relay_protocol::BlockDownloader;
13use sc_network_sync::service::network::NetworkServiceHandle;
14use sc_network_sync::SyncingService;
15use sc_subspace_sync_common::snap_sync_engine::SnapSyncingEngine;
16use sp_blockchain::HeaderBackend;
17use sp_consensus::BlockOrigin;
18use sp_domains::ExecutionReceiptFor;
19use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Header, NumberFor};
20use std::collections::HashSet;
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::sync::Arc;
23use std::time::{Duration, SystemTime, UNIX_EPOCH};
24use tokio::sync::broadcast;
25use tokio::time::sleep;
26use tracing::{debug, error, trace, Instrument};
27
/// Tracing target for domain snap sync log messages.
pub(crate) const LOG_TARGET: &'static str = "domain_snap_sync";
29
30/// Notification with number of the block that is about to be imported and acknowledgement sender
31/// that pauses block production until the previous block is acknowledged.
32#[derive(Debug, Clone)]
33pub struct BlockImportingAcknowledgement<Block>
34where
35    Block: BlockT,
36{
37    /// Block number
38    pub block_number: NumberFor<Block>,
39    /// Sender for pausing the block import when operator is not fast enough to process
40    /// the consensus block.
41    pub acknowledgement_sender: mpsc::Sender<()>,
42}
43
44/// Provides parameters for domain snap sync synchronization with the consensus chain snap sync.
45pub struct ConsensusChainSyncParams<Block, DomainHeader>
46where
47    Block: BlockT,
48    DomainHeader: Header,
49{
50    /// Synchronizes consensus snap sync stages.
51    pub snap_sync_orchestrator: Arc<SnapSyncOrchestrator>,
52    /// Confirmed last Domain block ER
53    pub last_domain_block_er: ExecutionReceiptFor<DomainHeader, Block, Balance>,
54    /// Consensus chain block importing stream
55    pub block_importing_notification_stream:
56        Box<dyn Stream<Item = BlockImportingAcknowledgement<Block>> + Sync + Send + Unpin>,
57}
58
59/// Synchronizes consensus and domain chain snap sync.
60pub struct SnapSyncOrchestrator {
61    consensus_snap_sync_target_block_tx: broadcast::Sender<BlockNumber>,
62    domain_snap_sync_finished: Arc<AtomicBool>,
63}
64
65impl Default for SnapSyncOrchestrator {
66    fn default() -> Self {
67        Self::new()
68    }
69}
70
71impl SnapSyncOrchestrator {
72    /// Constructor
73    pub fn new() -> Self {
74        let (tx, _) = broadcast::channel(1);
75        Self {
76            consensus_snap_sync_target_block_tx: tx,
77            domain_snap_sync_finished: Arc::new(AtomicBool::new(false)),
78        }
79    }
80
81    /// Unblocks (allows) consensus chain snap sync with the given target block.
82    pub fn unblock_consensus_snap_sync(&self, target_block_number: BlockNumber) {
83        debug!(target: LOG_TARGET, %target_block_number, "Allowed starting consensus chain snap sync.");
84
85        let target_block_send_result = self
86            .consensus_snap_sync_target_block_tx
87            .send(target_block_number);
88
89        debug!(
90            target: LOG_TARGET,
91            ?target_block_send_result,
92            "Target block sending result: {target_block_number}"
93        );
94    }
95
96    /// Returns shared variable signaling domain snap sync finished.
97    pub fn domain_snap_sync_finished(&self) -> Arc<AtomicBool> {
98        self.domain_snap_sync_finished.clone()
99    }
100
101    /// Subscribes to a channel to receive target block numbers for consensus chain snap sync.
102    pub fn consensus_snap_sync_target_block_receiver(&self) -> broadcast::Receiver<BlockNumber> {
103        self.consensus_snap_sync_target_block_tx.subscribe()
104    }
105
106    /// Signal that domain snap sync finished.
107    pub fn mark_domain_snap_sync_finished(&self) {
108        debug!(target: LOG_TARGET, "Signal that domain snap sync finished.");
109        self.domain_snap_sync_finished
110            .store(true, Ordering::Release);
111    }
112}
113
114pub struct SyncParams<DomainClient, Block, CBlock>
115where
116    Block: BlockT,
117    CBlock: BlockT,
118{
119    pub domain_client: Arc<DomainClient>,
120    pub sync_service: Arc<SyncingService<Block>>,
121    pub domain_fork_id: Option<String>,
122    pub domain_network_service_handle: NetworkServiceHandle,
123    pub domain_block_downloader: Arc<dyn BlockDownloader<Block>>,
124    pub consensus_chain_sync_params: ConsensusChainSyncParams<CBlock, Block::Header>,
125    pub challenge_period: NumberFor<CBlock>,
126}
127
128async fn get_last_confirmed_block<Block: BlockT>(
129    block_downloader: Arc<dyn BlockDownloader<Block>>,
130    sync_service: &SyncingService<Block>,
131    block_number: BlockNumber,
132) -> Result<BlockData<Block>, sp_blockchain::Error> {
133    const LAST_CONFIRMED_BLOCK_RETRIES: u32 = 5;
134    const LOOP_PAUSE: Duration = Duration::from_secs(20);
135    const MAX_GET_PEERS_ATTEMPT_NUMBER: usize = 30;
136
137    for attempt in 1..=LAST_CONFIRMED_BLOCK_RETRIES {
138        debug!(target: LOG_TARGET, %attempt, %block_number, "Starting last confirmed block request...");
139
140        debug!(target: LOG_TARGET, %block_number, "Gathering peers for last confirmed block request.");
141        let mut tried_peers = HashSet::<PeerId>::new();
142
143        let current_peer_id = match get_currently_connected_peer(
144            sync_service,
145            &mut tried_peers,
146            LOOP_PAUSE,
147            MAX_GET_PEERS_ATTEMPT_NUMBER,
148        )
149        .instrument(tracing::info_span!("last confirmed block"))
150        .await
151        {
152            Ok(peer_id) => peer_id,
153            Err(err) => {
154                debug!(target: LOG_TARGET, ?err, "Getting peers for the last confirmed block failed");
155                continue;
156            }
157        };
158        tried_peers.insert(current_peer_id);
159
160        let id = {
161            let now = SystemTime::now();
162            let duration_since_epoch = now
163                .duration_since(UNIX_EPOCH)
164                .expect("Time usually goes forward");
165
166            duration_since_epoch.as_nanos() as u64
167        };
168
169        let block_request = BlockRequest::<Block> {
170            id,
171            direction: Direction::Ascending,
172            from: FromBlock::Number(block_number.into()),
173            max: Some(1),
174            fields: BlockAttributes::HEADER
175                | BlockAttributes::JUSTIFICATION
176                | BlockAttributes::BODY
177                | BlockAttributes::RECEIPT
178                | BlockAttributes::MESSAGE_QUEUE
179                | BlockAttributes::INDEXED_BODY,
180        };
181        let block_response_result = block_downloader
182            .download_blocks(current_peer_id, block_request.clone())
183            .await;
184
185        match block_response_result {
186            Ok(block_response_inner_result) => {
187                trace!(
188                    target: LOG_TARGET,
189                    %block_number,
190                    "Sync worker handle result: {:?}",
191                    block_response_inner_result.as_ref().map(|(block_data, protocol_name)| (hex::encode(block_data), protocol_name))
192                );
193
194                match block_response_inner_result {
195                    Ok(data) => {
196                        match block_downloader.block_response_into_blocks(&block_request, data.0) {
197                            Ok(mut blocks) => {
198                                trace!(target: LOG_TARGET, %block_number, "Domain block parsing result: {:?}", blocks);
199
200                                if let Some(blocks) = blocks.pop() {
201                                    return Ok(blocks);
202                                } else {
203                                    trace!(target: LOG_TARGET, %current_peer_id, "Got empty state blocks",);
204                                    continue;
205                                }
206                            }
207                            Err(error) => {
208                                error!(target: LOG_TARGET, %block_number, ?error, "Domain block parsing error");
209                                continue;
210                            }
211                        }
212                    }
213                    Err(error) => {
214                        error!(target: LOG_TARGET, %block_number, ?error, "Domain block sync error (inner)");
215                        continue;
216                    }
217                }
218            }
219            Err(error) => {
220                error!(target: LOG_TARGET, %block_number, ?error, "Domain block sync error");
221                continue;
222            }
223        }
224    }
225
226    Err(sp_blockchain::Error::Application(
227        format!("Failed to get block {}", block_number).into(),
228    ))
229}
230
231fn convert_block_number<Block: BlockT>(block_number: NumberFor<Block>) -> u32 {
232    let block_number: u32 = match block_number.try_into() {
233        Ok(block_number) => block_number,
234        Err(_) => {
235            panic!("Can't convert block number.")
236        }
237    };
238
239    block_number
240}
241
242pub(crate) async fn snap_sync<Block, Client, CBlock>(
243    sync_params: SyncParams<Client, Block, CBlock>,
244) -> Result<(), sp_blockchain::Error>
245where
246    Block: BlockT,
247    Client: HeaderBackend<Block>
248        + BlockImport<Block>
249        + AuxStore
250        + ProofProvider<Block>
251        + BlockchainEvents<Block>
252        + Send
253        + Sync
254        + 'static,
255    for<'a> &'a Client: BlockImport<Block>,
256    CBlock: BlockT,
257{
258    let last_confirmed_block_receipt = sync_params.consensus_chain_sync_params.last_domain_block_er;
259
260    // TODO: Handle the special case when we just added the domain
261    if last_confirmed_block_receipt.domain_block_number == 0u32.into() {
262        return Err(sp_blockchain::Error::Application(
263            "Can't snap sync from genesis.".into(),
264        ));
265    }
266
267    let consensus_block_hash = last_confirmed_block_receipt.consensus_block_hash;
268
269    let mut block_importing_notification_stream = sync_params
270        .consensus_chain_sync_params
271        .block_importing_notification_stream;
272
273    let mut consensus_target_block_acknowledgement_sender = None;
274    while let Some(mut block_notification) = block_importing_notification_stream.next().await {
275        if block_notification.block_number <= last_confirmed_block_receipt.consensus_block_number {
276            if block_notification
277                .acknowledgement_sender
278                .send(())
279                .await
280                .is_err()
281            {
282                return Err(sp_blockchain::Error::Application(
283                    format!(
284                        "Can't acknowledge block import #{}",
285                        block_notification.block_number
286                    )
287                    .into(),
288                ));
289            };
290        } else {
291            consensus_target_block_acknowledgement_sender
292                .replace(block_notification.acknowledgement_sender);
293            break;
294        }
295    }
296
297    let domain_block_number =
298        convert_block_number::<Block>(last_confirmed_block_receipt.domain_block_number);
299
300    let domain_block_hash = last_confirmed_block_receipt.domain_block_hash;
301    let domain_block = get_last_confirmed_block(
302        sync_params.domain_block_downloader,
303        &sync_params.sync_service,
304        domain_block_number,
305    )
306    .await?;
307
308    let Some(domain_block_header) = domain_block.header else {
309        return Err(sp_blockchain::Error::MissingHeader(
310            "Can't obtain domain block header for snap sync".to_string(),
311        ));
312    };
313
314    let state_result = download_state(
315        &domain_block_header,
316        &sync_params.domain_client,
317        sync_params.domain_fork_id,
318        &sync_params.domain_network_service_handle,
319        &sync_params.sync_service,
320    )
321    .await;
322
323    trace!(target: LOG_TARGET, "State downloaded: {:?}", state_result);
324
325    {
326        let client = sync_params.domain_client.clone();
327        // Import first block as finalized
328        let mut block =
329            BlockImportParams::new(BlockOrigin::NetworkInitialSync, domain_block_header);
330        block.body = domain_block.body;
331        block.justifications = domain_block.justifications;
332        block.state_action = StateAction::ApplyChanges(StorageChanges::Import(state_result?));
333        block.finalized = true;
334        block.fork_choice = Some(ForkChoiceStrategy::Custom(true));
335        client.as_ref().import_block(block).await.map_err(|error| {
336            sp_blockchain::Error::Backend(format!("Failed to import state block: {error}"))
337        })?;
338    }
339
340    trace!(
341        target: LOG_TARGET,
342        "Domain client info after waiting: {:?}",
343        sync_params.domain_client.info()
344    );
345
346    // Verify domain state block creation.
347    if let Ok(Some(created_domain_block_hash)) =
348        sync_params.domain_client.hash(domain_block_number.into())
349    {
350        if created_domain_block_hash == domain_block_hash {
351            trace!(
352                target: LOG_TARGET,
353                ?created_domain_block_hash,
354                ?domain_block_hash,
355                "Created hash matches after the domain block import with state",
356            );
357        } else {
358            debug!(
359                target: LOG_TARGET,
360                ?created_domain_block_hash,
361                ?domain_block_hash,
362                "Created hash doesn't match after the domain block import with state",
363            );
364
365            return Err(sp_blockchain::Error::Backend(
366                "Created hash doesn't match after the domain block import with state".to_string(),
367            ));
368        }
369    } else {
370        return Err(sp_blockchain::Error::Backend(
371            "Can't obtain domain block hash after state importing for snap sync".to_string(),
372        ));
373    }
374
375    crate::aux_schema::track_domain_hash_and_consensus_hash::<_, Block, CBlock>(
376        &sync_params.domain_client,
377        domain_block_hash,
378        consensus_block_hash,
379        // skip cleaning up finalized hash so that operator can pick after snap sync
380        // and continue where snap sync left off
381        false,
382    )?;
383
384    crate::aux_schema::write_execution_receipt::<_, Block, CBlock>(
385        sync_params.domain_client.as_ref(),
386        None,
387        &last_confirmed_block_receipt,
388        sync_params.challenge_period,
389    )?;
390
391    sync_params
392        .consensus_chain_sync_params
393        .snap_sync_orchestrator
394        .mark_domain_snap_sync_finished();
395
396    debug!(target: LOG_TARGET, info = ?sync_params.domain_client.info(), "Client info after successful domain snap sync.");
397
398    // Unblock consensus block importing
399    drop(consensus_target_block_acknowledgement_sender);
400    drop(block_importing_notification_stream);
401
402    Ok(())
403}
404
405/// Download and return state for specified block
406async fn download_state<Block, Client>(
407    header: &Block::Header,
408    client: &Arc<Client>,
409    fork_id: Option<String>,
410    network_service_handle: &NetworkServiceHandle,
411    sync_service: &SyncingService<Block>,
412) -> Result<ImportedState<Block>, sp_blockchain::Error>
413where
414    Block: BlockT,
415    Client: HeaderBackend<Block> + ProofProvider<Block> + Send + Sync + 'static,
416{
417    let block_number = *header.number();
418
419    const STATE_SYNC_RETRIES: u32 = 5;
420    const LOOP_PAUSE: Duration = Duration::from_secs(20);
421    const MAX_GET_PEERS_ATTEMPT_NUMBER: usize = 30;
422
423    for attempt in 1..=STATE_SYNC_RETRIES {
424        debug!(target: LOG_TARGET, %block_number, %attempt, "Starting state sync...");
425
426        debug!(target: LOG_TARGET, %block_number, "Gathering peers for state sync.");
427        let mut tried_peers = HashSet::<PeerId>::new();
428
429        let current_peer_id = match get_currently_connected_peer(
430            sync_service,
431            &mut tried_peers,
432            LOOP_PAUSE,
433            MAX_GET_PEERS_ATTEMPT_NUMBER,
434        )
435        .instrument(tracing::info_span!("download state"))
436        .await
437        {
438            Ok(peer_id) => peer_id,
439            Err(err) => {
440                debug!(?err, "Getting peers for state downloading failed");
441                continue;
442            }
443        };
444        tried_peers.insert(current_peer_id);
445
446        let sync_engine = SnapSyncingEngine::<Block>::new(
447            client.clone(),
448            fork_id.as_deref(),
449            header.clone(),
450            false,
451            (current_peer_id, block_number),
452            network_service_handle,
453        )?;
454
455        let last_block_from_sync_result = sync_engine.download_state().await;
456
457        match last_block_from_sync_result {
458            Ok(block_to_import) => {
459                debug!(target: LOG_TARGET, %block_number, "Sync worker handle result: {:?}", block_to_import);
460
461                return block_to_import.state.ok_or_else(|| {
462                    sp_blockchain::Error::Backend(
463                        "Imported state was missing in synced block".into(),
464                    )
465                });
466            }
467            Err(error) => {
468                error!(target: LOG_TARGET, %block_number, %error, "State sync error");
469                continue;
470            }
471        }
472    }
473
474    Err(sp_blockchain::Error::Backend(
475        "All snap sync retries failed".into(),
476    ))
477}
478
479async fn get_currently_connected_peer<Block>(
480    sync_service: &SyncingService<Block>,
481    tried_peers: &mut HashSet<PeerId>,
482    loop_pause: Duration,
483    max_attempts: usize,
484) -> Result<PeerId, sp_blockchain::Error>
485where
486    Block: BlockT,
487{
488    for current_attempt in 0..max_attempts {
489        let all_connected_peers = sync_service
490            .peers_info()
491            .await
492            .expect("Network service must be available.");
493
494        debug!(
495            target: LOG_TARGET,
496            %current_attempt,
497            ?all_connected_peers,
498            "Connected peers"
499        );
500
501        let connected_full_peers = all_connected_peers
502            .iter()
503            .filter_map(|(peer_id, info)| (info.roles.is_full()).then_some(*peer_id))
504            .collect::<Vec<_>>();
505
506        debug!(
507            target: LOG_TARGET,
508            %current_attempt,
509            ?tried_peers,
510            "Sync peers: {:?}", connected_full_peers
511        );
512
513        let active_peers_set = HashSet::from_iter(connected_full_peers.into_iter());
514
515        if let Some(peer_id) = active_peers_set.difference(tried_peers).next().cloned() {
516            tried_peers.insert(peer_id);
517            return Ok(peer_id);
518        }
519
520        sleep(loop_pause).await;
521    }
522
523    Err(sp_blockchain::Error::Backend(
524        "All connected peer retries failed".into(),
525    ))
526}