subspace_service/sync_from_dsn/snap_sync.rs

use crate::mmr::sync::MmrSync;
use crate::sync_from_dsn::PieceGetter;
use crate::sync_from_dsn::segment_header_downloader::SegmentHeaderDownloader;
use crate::utils::wait_for_block_import;
use sc_client_api::{AuxStore, BlockchainEvents, ProofProvider};
use sc_consensus::import_queue::ImportQueueService;
use sc_consensus::{
    BlockImport, BlockImportParams, ForkChoiceStrategy, ImportedState, IncomingBlock, StateAction,
    StorageChanges,
};
use sc_consensus_subspace::archiver::{SegmentHeadersStore, decode_block};
use sc_network::service::traits::NetworkService;
use sc_network::{NetworkBlock, NetworkRequest, PeerId};
use sc_network_sync::SyncingService;
use sc_network_sync::service::network::NetworkServiceHandle;
use sc_subspace_sync_common::snap_sync_engine::SnapSyncingEngine;
use sp_api::ProvideRuntimeApi;
use sp_blockchain::HeaderBackend;
use sp_consensus::BlockOrigin;
use sp_consensus_subspace::SubspaceApi;
use sp_core::H256;
use sp_core::offchain::OffchainStorage;
use sp_mmr_primitives::MmrApi;
use sp_objects::ObjectsApi;
use sp_runtime::traits::{Block as BlockT, Header, NumberFor};
use std::collections::{HashSet, VecDeque};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use subspace_archiving::reconstructor::Reconstructor;
use subspace_core_primitives::segments::SegmentIndex;
use subspace_core_primitives::{BlockNumber, PublicKey};
use subspace_data_retrieval::segment_downloading::download_segment_pieces;
use subspace_erasure_coding::ErasureCoding;
use subspace_networking::Node;
use tokio::sync::broadcast::Receiver;
use tokio::task;
use tokio::time::sleep;
use tracing::{debug, error, trace, warn};

/// Error type for snap sync.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// A fatal snap sync error which requires user intervention.
    /// Most snap sync errors are non-fatal, because we can just continue with regular sync.
    #[error("Snap Sync requires user action: {0}")]
    SnapSyncImpossible(String),

    /// Substrate service error.
    #[error(transparent)]
    Sub(#[from] sc_service::Error),

    /// Substrate blockchain client error.
    #[error(transparent)]
    Client(#[from] sp_blockchain::Error),

    /// Other.
    #[error("Snap sync error: {0}")]
    Other(String),
}

impl From<String> for Error {
    fn from(error: String) -> Self {
        Error::Other(error)
    }
}

/// Run a snap sync, return an error if snap sync is impossible and user intervention is required.
/// Otherwise, just log the error and return `Ok(())` so that regular sync continues.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn snap_sync<Block, AS, Client, PG, OS>(
    segment_headers_store: SegmentHeadersStore<AS>,
    node: Node,
    fork_id: Option<String>,
    client: Arc<Client>,
    mut import_queue_service: Box<dyn ImportQueueService<Block>>,
    pause_sync: Arc<AtomicBool>,
    piece_getter: PG,
    sync_service: Arc<SyncingService<Block>>,
    network_service_handle: NetworkServiceHandle,
    erasure_coding: ErasureCoding,
    target_block_receiver: Option<Receiver<BlockNumber>>,
    offchain_storage: Option<OS>,
    network_service: Arc<dyn NetworkService>,
) -> Result<(), Error>
where
    Block: BlockT,
    AS: AuxStore,
    Client: HeaderBackend<Block>
        + ProvideRuntimeApi<Block>
        + ProofProvider<Block>
        + BlockImport<Block>
        + BlockchainEvents<Block>
        + Send
        + Sync
        + 'static,
    Client::Api:
        SubspaceApi<Block, PublicKey> + ObjectsApi<Block> + MmrApi<Block, H256, NumberFor<Block>>,
    PG: PieceGetter,
    OS: OffchainStorage,
{
    let info = client.info();
    // Only attempt snap sync with genesis state
    // TODO: Support snap sync from any state once
    //  https://github.com/paritytech/polkadot-sdk/issues/5366 is resolved
    if info.best_hash == info.genesis_hash {
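        // Pause the regular Substrate sync while snap sync drives block import; it is resumed
        // below once the snap-synced blocks have been imported.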
        pause_sync.store(true, Ordering::Release);

        let target_block = if let Some(mut target_block_receiver) = target_block_receiver {
            match target_block_receiver.recv().await {
                Ok(target_block) => Some(target_block),
                Err(err) => {
                    error!(?err, "Snap sync failed: can't obtain target block.");
                    return Err(Error::Other(
                        "Snap sync failed: can't obtain target block.".into(),
                    ));
                }
            }
        } else {
            None
        };

        debug!("Snap sync target block: {:?}", target_block);

        sync(
            &segment_headers_store,
            &node,
            &piece_getter,
            fork_id.as_deref(),
            &client,
            import_queue_service.as_mut(),
            sync_service.clone(),
            &network_service_handle,
            target_block,
            &erasure_coding,
            offchain_storage,
            network_service,
        )
        .await?;

        // This will notify Substrate's sync mechanism and allow regular Substrate sync to continue
        // gracefully
        {
            let info = client.info();
            sync_service.new_best_block_imported(info.best_hash, info.best_number);
        }
        pause_sync.store(false, Ordering::Release);
    } else {
        debug!("Snap sync can only work with genesis state, skipping");
    }

    Ok(())
}

// Get blocks from the last segment or from the segment containing the target block.
// Returns the collection of encoded blocks and the segment index that was used.
pub(crate) async fn get_blocks_from_target_segment<AS, PG>(
    segment_headers_store: &SegmentHeadersStore<AS>,
    node: &Node,
    piece_getter: &PG,
    target_block: Option<BlockNumber>,
    erasure_coding: &ErasureCoding,
) -> Result<Option<(SegmentIndex, VecDeque<(BlockNumber, Vec<u8>)>)>, Error>
where
    AS: AuxStore,
    PG: PieceGetter,
{
    sync_segment_headers(segment_headers_store, node)
        .await
        .map_err(|error| format!("Failed to sync segment headers: {error}"))?;

    let target_segment_index = {
        let last_segment_index = segment_headers_store
            .max_segment_index()
            .expect("Successfully synced above; qed");

        if let Some(target_block) = target_block {
            let mut segment_header = segment_headers_store
                .get_segment_header(last_segment_index)
                .ok_or(format!(
                    "Can't get segment header from the store: {last_segment_index}"
                ))?;

            let mut target_block_exceeded_last_archived_block = false;
            if target_block > segment_header.last_archived_block().number {
                warn!(
                    %last_segment_index,
                    %target_block,
                    "Specified target block is greater than the last archived block. \
                     Choosing the last archived block (#{}) as target block...",
                    segment_header.last_archived_block().number
                );
                target_block_exceeded_last_archived_block = true;
            }

            if !target_block_exceeded_last_archived_block {
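                // Walk back from the last segment to find the earliest segment whose last
                // archived block is at or past the target block (bounded below by segment one);
                // that segment becomes the target segment.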
                let mut current_segment_index = last_segment_index;

                loop {
                    if current_segment_index <= SegmentIndex::ONE {
                        break;
                    }

                    if target_block > segment_header.last_archived_block().number {
                        current_segment_index += SegmentIndex::ONE;
                        break;
                    }

                    current_segment_index -= SegmentIndex::ONE;

                    segment_header = segment_headers_store
                        .get_segment_header(current_segment_index)
                        .ok_or(format!(
                            "Can't get segment header from the store: {current_segment_index}"
                        ))?;
                }

                current_segment_index
            } else {
                last_segment_index
            }
        } else {
            last_segment_index
        }
    };

    // We don't have the genesis state when we choose to snap sync.
    if target_segment_index <= SegmentIndex::ONE {
        // The caller logs this error
        return Err(Error::SnapSyncImpossible(
            "Snap sync is impossible - not enough archived history".into(),
        ));
    }

    // Identify all segment headers that would need to be reconstructed in order to get the first
    // block of the target segment
    let mut segments_to_reconstruct = VecDeque::from([target_segment_index]);
    {
        let mut last_segment_first_block_number = None;

        loop {
            let oldest_segment_index = *segments_to_reconstruct.front().expect("Not empty; qed");
            let segment_index = oldest_segment_index
                .checked_sub(SegmentIndex::ONE)
                .ok_or_else(|| {
                    format!(
                        "Attempted to get segment index before {oldest_segment_index} during \
                            snap sync"
                    )
                })?;
            let segment_header = segment_headers_store
                .get_segment_header(segment_index)
                .ok_or_else(|| {
                    format!("Failed to get segment index {segment_index} during snap sync")
                })?;
            let last_archived_block = segment_header.last_archived_block();

            // If older segment header ends with fully archived block then no additional
            // information is necessary
            if last_archived_block.partial_archived().is_none() {
                break;
            }

            match last_segment_first_block_number {
                Some(block_number) => {
                    if block_number == last_archived_block.number {
                        // If older segment ends with the same block number as the first block
                        // in the last segment then add it to the list of segments that need to
                        // be reconstructed
                        segments_to_reconstruct.push_front(segment_index);
                    } else {
                        // Otherwise we're done here
                        break;
                    }
                }
                None => {
                    last_segment_first_block_number.replace(last_archived_block.number);
                    // This segment will definitely be needed to reconstruct first block of the
                    // last segment
                    segments_to_reconstruct.push_front(segment_index);
                }
            }
        }
    }

    // Reconstruct blocks of the target segment
    let mut blocks = VecDeque::new();
    {
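        // The reconstructor is stateful: earlier segments are fed in first so that the block
        // spanning into the target segment can be completed. It is shared behind a mutex because
        // each segment is processed on a separate blocking task.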
        let reconstructor = Arc::new(Mutex::new(Reconstructor::new(erasure_coding.clone())));

        for segment_index in segments_to_reconstruct {
            let segment_pieces = download_segment_pieces(segment_index, piece_getter)
                .await
                .map_err(|error| format!("Failed to download segment pieces: {error}"))?;
            // CPU-intensive piece and segment reconstruction code can block the async executor.
            let segment_contents_fut = task::spawn_blocking({
                let reconstructor = reconstructor.clone();

                move || {
                    reconstructor
                        .lock()
                        .expect("Panic if previous thread panicked when holding the mutex")
                        .add_segment(segment_pieces.as_ref())
                }
            });

            blocks = VecDeque::from(
                segment_contents_fut
                    .await
                    .expect("Panic if blocking task panicked")
                    .map_err(|error| error.to_string())?
                    .blocks,
            );

            trace!(%segment_index, "Segment reconstructed successfully");
        }
    }

    Ok(Some((target_segment_index, blocks)))
}

/// Synchronize the blockchain up to the target block (an approximate value based on its
/// containing segment) or to the last archived block. Returns `Ok(())` without importing any
/// blocks when snap sync is skipped.
#[allow(clippy::too_many_arguments)]
async fn sync<PG, AS, Block, Client, IQS, OS, NR>(
    segment_headers_store: &SegmentHeadersStore<AS>,
    node: &Node,
    piece_getter: &PG,
    fork_id: Option<&str>,
    client: &Arc<Client>,
    import_queue_service: &mut IQS,
    sync_service: Arc<SyncingService<Block>>,
    network_service_handle: &NetworkServiceHandle,
    target_block: Option<BlockNumber>,
    erasure_coding: &ErasureCoding,
    offchain_storage: Option<OS>,
    network_request: NR,
) -> Result<(), Error>
where
    PG: PieceGetter,
    AS: AuxStore,
    Block: BlockT,
    Client: HeaderBackend<Block>
        + ProvideRuntimeApi<Block>
        + ProofProvider<Block>
        + BlockImport<Block>
        + BlockchainEvents<Block>
        + Send
        + Sync
        + 'static,
    Client::Api:
        SubspaceApi<Block, PublicKey> + ObjectsApi<Block> + MmrApi<Block, H256, NumberFor<Block>>,
    IQS: ImportQueueService<Block> + ?Sized,
    OS: OffchainStorage,
    NR: NetworkRequest + Sync + Send,
{
    debug!("Starting snap sync...");

    let Some((target_segment_index, mut blocks)) = get_blocks_from_target_segment(
        segment_headers_store,
        node,
        piece_getter,
        target_block,
        erasure_coding,
    )
    .await?
    else {
        // Snap-sync skipped
        return Ok(());
    };

    debug!(
        "Segments data received. Target segment index: {:?}",
        target_segment_index
    );

    let mut blocks_to_import = Vec::with_capacity(blocks.len().saturating_sub(1));
    let last_block_number;

    // First block is special because we need to download state for it
    {
        let (first_block_number, first_block_bytes) = blocks
            .pop_front()
            .expect("List of blocks is not empty according to logic above; qed");

        // Sometimes first block is the only block
        last_block_number = blocks
            .back()
            .map_or(first_block_number, |(block_number, _block_bytes)| {
                *block_number
            });

        debug!(
            %target_segment_index,
            %first_block_number,
            %last_block_number,
            "Blocks from target segment downloaded"
        );

        let signed_block = decode_block::<Block>(&first_block_bytes)
            .map_err(|error| format!("Failed to decode archived block: {error}"))?;
        drop(first_block_bytes);
        let (header, extrinsics) = signed_block.block.deconstruct();

        // Download state for the first block, so it can be imported even without doing execution
        let state = download_state(
            &header,
            client,
            fork_id,
            &sync_service,
            network_service_handle,
        )
        .await
        .map_err(|error| {
            format!("Failed to download state for the first block of target segment: {error}")
        })?;

        debug!("Downloaded state of the first block of the target segment");

        // Import first block as finalized
        let mut block = BlockImportParams::new(BlockOrigin::NetworkInitialSync, header);
        block.body.replace(extrinsics);
        block.justifications = signed_block.justifications;
        block.state_action = StateAction::ApplyChanges(StorageChanges::Import(state));
        block.finalized = true;
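        // Do not record a block gap for the skipped history below this block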
        block.create_gap = false;
        block.fork_choice = Some(ForkChoiceStrategy::Custom(true));
        client
            .import_block(block)
            .await
            .map_err(|error| format!("Failed to import first block of target segment: {error}"))?;
    }

    // Download and commit MMR data before importing the next set of blocks, since they are
    // imported with block verification and we need MMR data during that verification.
    let maybe_mmr_sync = if let Some(offchain_storage) = offchain_storage {
        let mut mmr_sync = MmrSync::new(client.clone(), offchain_storage);
        // We sync the MMR up to the last block number. All remaining MMR data will be synced
        // after resuming either DSN sync or Substrate sync.
        mmr_sync
            .sync(
                fork_id.map(|v| v.into()),
                network_request,
                sync_service.clone(),
                last_block_number,
            )
            .await?;
        Some(mmr_sync)
    } else {
        None
    };

    debug!(
        blocks_count = %blocks.len(),
        "Queuing import of remaining blocks from the target segment"
    );

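    // The remaining blocks carry no state and go through the import queue, so they are verified
    // and executed as usual.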
    for (_block_number, block_bytes) in blocks {
        let signed_block = decode_block::<Block>(&block_bytes)
            .map_err(|error| format!("Failed to decode archived block: {error}"))?;
        let (header, extrinsics) = signed_block.block.deconstruct();

        blocks_to_import.push(IncomingBlock {
            hash: header.hash(),
            header: Some(header),
            body: Some(extrinsics),
            indexed_body: None,
            justifications: signed_block.justifications,
            origin: None,
            allow_missing_state: false,
            import_existing: false,
            skip_execution: false,
            state: None,
        });
    }

    if !blocks_to_import.is_empty() {
        import_queue_service.import_blocks(BlockOrigin::NetworkInitialSync, blocks_to_import);
    }

    // Wait for blocks to be imported
    // TODO: Replace this hack with actual watching of block import
    wait_for_block_import(client.as_ref(), last_block_number.into()).await;

    // Verify the MMR sync before finishing up the block import
    if let Some(mmr_sync) = maybe_mmr_sync {
        mmr_sync.verify_mmr_data()?;
    }

    debug!(info = ?client.info(), "Snap sync finished successfully");

    Ok(())
}

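/// Download any new segment headers from DSN peers and add them to the segment headers store.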
async fn sync_segment_headers<AS>(
    segment_headers_store: &SegmentHeadersStore<AS>,
    node: &Node,
) -> Result<(), Error>
where
    AS: AuxStore,
{
    let last_segment_header = segment_headers_store.last_segment_header().ok_or_else(|| {
        Error::Other(
            "Archiver needs to be initialized before syncing from DSN to populate the very first \
            segment"
                .to_string(),
        )
    })?;
    let new_segment_headers = SegmentHeaderDownloader::new(node)
        .get_segment_headers(&last_segment_header)
        .await
        .map_err(|error| error.to_string())?;

    debug!("Found {} new segment headers", new_segment_headers.len());

    if !new_segment_headers.is_empty() {
        segment_headers_store.add_segment_headers(&new_segment_headers)?;
    }

    Ok(())
}

/// Download and return the state for the specified block
async fn download_state<Block, Client>(
    header: &Block::Header,
    client: &Arc<Client>,
    fork_id: Option<&str>,
    sync_service: &SyncingService<Block>,
    network_service_handle: &NetworkServiceHandle,
) -> Result<ImportedState<Block>, Error>
where
    Block: BlockT,
    Client: HeaderBackend<Block> + ProofProvider<Block> + Send + Sync + 'static,
{
    let block_number = *header.number();

    const STATE_SYNC_RETRIES: u32 = 10;
    const LOOP_PAUSE: Duration = Duration::from_secs(10);

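    // Each attempt waits for a connected full peer that is ahead of the target block and has not
    // been tried yet, then runs the state sync engine against it.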
    for attempt in 1..=STATE_SYNC_RETRIES {
        debug!(%attempt, "Starting state sync...");

        debug!("Gathering peers for state sync.");
        let mut tried_peers = HashSet::<PeerId>::new();

        // TODO: add loop timeout
        let current_peer_id = loop {
            let connected_full_peers = sync_service
                .peers_info()
                .await
                .expect("Network service must be available.")
                .iter()
                .filter_map(|(peer_id, info)| {
                    (info.roles.is_full() && info.best_number > block_number).then_some(*peer_id)
                })
                .collect::<Vec<_>>();

            debug!(?tried_peers, "Sync peers: {}", connected_full_peers.len());

            let active_peers_set = HashSet::from_iter(connected_full_peers);

            if let Some(peer_id) = active_peers_set.difference(&tried_peers).next().cloned() {
                break peer_id;
            }

            sleep(LOOP_PAUSE).await;
        };

        tried_peers.insert(current_peer_id);

        let sync_engine = SnapSyncingEngine::<Block>::new(
            client.clone(),
            fork_id,
            header.clone(),
            false,
            (current_peer_id, block_number),
            network_service_handle,
        )
        .map_err(Error::Client)?;

        let last_block_from_sync_result = sync_engine.download_state().await;

        match last_block_from_sync_result {
            Ok(block_to_import) => {
                debug!("Sync worker handle result: {:?}", block_to_import);

                return block_to_import.state.ok_or_else(|| {
                    Error::Other("Imported state was missing in synced block".into())
                });
            }
            Err(error) => {
                error!(%error, "State sync error");
                continue;
            }
        }
    }

    Err(Error::Other("All snap sync retries failed".into()))
}