subspace_service/sync_from_dsn/snap_sync.rs

1use crate::mmr::sync::MmrSync;
2use crate::sync_from_dsn::segment_header_downloader::SegmentHeaderDownloader;
3use crate::sync_from_dsn::{PieceGetter, LOG_TARGET};
4use crate::utils::wait_for_block_import;
5use sc_client_api::{AuxStore, BlockchainEvents, ProofProvider};
6use sc_consensus::import_queue::ImportQueueService;
7use sc_consensus::{
8    BlockImport, BlockImportParams, ForkChoiceStrategy, ImportedState, IncomingBlock, StateAction,
9    StorageChanges,
10};
11use sc_consensus_subspace::archiver::{decode_block, SegmentHeadersStore};
12use sc_network::service::traits::NetworkService;
13use sc_network::{NetworkBlock, NetworkRequest, PeerId};
14use sc_network_sync::service::network::NetworkServiceHandle;
15use sc_network_sync::SyncingService;
16use sc_subspace_sync_common::snap_sync_engine::SnapSyncingEngine;
17use sp_api::ProvideRuntimeApi;
18use sp_blockchain::HeaderBackend;
19use sp_consensus::BlockOrigin;
20use sp_consensus_subspace::SubspaceApi;
21use sp_core::offchain::OffchainStorage;
22use sp_core::H256;
23use sp_mmr_primitives::MmrApi;
24use sp_objects::ObjectsApi;
25use sp_runtime::traits::{Block as BlockT, Header, NumberFor};
26use std::collections::{HashSet, VecDeque};
27use std::sync::atomic::{AtomicBool, Ordering};
28use std::sync::{Arc, Mutex};
29use std::time::Duration;
30use subspace_archiving::reconstructor::Reconstructor;
31use subspace_core_primitives::segments::SegmentIndex;
32use subspace_core_primitives::{BlockNumber, PublicKey};
33use subspace_data_retrieval::segment_downloading::download_segment_pieces;
34use subspace_erasure_coding::ErasureCoding;
35use subspace_networking::Node;
36use tokio::sync::broadcast::Receiver;
37use tokio::task;
38use tokio::time::sleep;
39use tracing::{debug, error, trace, warn};
40
/// Error type for snap sync.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// A fatal snap sync error which requires user intervention.
    /// Most snap sync errors are non-fatal, because we can just continue with regular sync.
    #[error("Snap Sync requires user action: {0}")]
    SnapSyncImpossible(String),

    /// Substrate service error.
    #[error(transparent)]
    Sub(#[from] sc_service::Error),

    /// Substrate blockchain client error.
    #[error(transparent)]
    Client(#[from] sp_blockchain::Error),

    /// Other. Catch-all for stringly-typed errors produced during sync
    /// (constructed via the `From<String>` impl).
    #[error("Snap sync error: {0}")]
    Other(String),
}
61
62impl From<String> for Error {
63    fn from(error: String) -> Self {
64        Error::Other(error)
65    }
66}
67
/// Run a snap sync, return an error if snap sync is impossible and user intervention is required.
/// Otherwise, just log the error and return `Ok(())` so that regular sync continues.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn snap_sync<Block, AS, Client, PG, OS>(
    segment_headers_store: SegmentHeadersStore<AS>,
    node: Node,
    fork_id: Option<String>,
    client: Arc<Client>,
    mut import_queue_service: Box<dyn ImportQueueService<Block>>,
    pause_sync: Arc<AtomicBool>,
    piece_getter: PG,
    sync_service: Arc<SyncingService<Block>>,
    network_service_handle: NetworkServiceHandle,
    erasure_coding: ErasureCoding,
    target_block_receiver: Option<Receiver<BlockNumber>>,
    offchain_storage: Option<OS>,
    network_service: Arc<dyn NetworkService>,
) -> Result<(), Error>
where
    Block: BlockT,
    AS: AuxStore,
    Client: HeaderBackend<Block>
        + ProvideRuntimeApi<Block>
        + ProofProvider<Block>
        + BlockImport<Block>
        + BlockchainEvents<Block>
        + Send
        + Sync
        + 'static,
    Client::Api:
        SubspaceApi<Block, PublicKey> + ObjectsApi<Block> + MmrApi<Block, H256, NumberFor<Block>>,
    PG: PieceGetter,
    OS: OffchainStorage,
{
    let info = client.info();
    // Only attempt snap sync with genesis state
    // TODO: Support snap sync from any state once
    //  https://github.com/paritytech/polkadot-sdk/issues/5366 is resolved
    if info.best_hash == info.genesis_hash {
        // Pause the regular Substrate sync while snap sync is in progress so the two don't
        // compete for the import queue.
        pause_sync.store(true, Ordering::Release);

        // If a target-block channel was supplied, wait for the externally-decided target block
        // number before starting; a closed/failed channel is treated as a hard error.
        let target_block = if let Some(mut target_block_receiver) = target_block_receiver {
            match target_block_receiver.recv().await {
                Ok(target_block) => Some(target_block),
                Err(err) => {
                    error!(target: LOG_TARGET, ?err, "Snap sync failed: can't obtain target block.");
                    return Err(Error::Other(
                        "Snap sync failed: can't obtain target block.".into(),
                    ));
                }
            }
        } else {
            None
        };

        debug!(target: LOG_TARGET, "Snap sync target block: {:?}", target_block);

        sync(
            &segment_headers_store,
            &node,
            &piece_getter,
            fork_id.as_deref(),
            &client,
            import_queue_service.as_mut(),
            sync_service.clone(),
            &network_service_handle,
            target_block,
            &erasure_coding,
            offchain_storage,
            network_service,
        )
        .await?;

        // This will notify Substrate's sync mechanism and allow regular Substrate sync to continue
        // gracefully
        {
            // Re-read client info: the best block has advanced during `sync` above.
            let info = client.info();
            sync_service.new_best_block_imported(info.best_hash, info.best_number);
        }
        pause_sync.store(false, Ordering::Release);
    } else {
        debug!(target: LOG_TARGET, "Snap sync can only work with genesis state, skipping");
    }

    Ok(())
}
154
155// Get blocks from the last segment or from the segment containing the target block.
156// Returns encoded blocks collection and used segment index.
157pub(crate) async fn get_blocks_from_target_segment<AS, PG>(
158    segment_headers_store: &SegmentHeadersStore<AS>,
159    node: &Node,
160    piece_getter: &PG,
161    target_block: Option<BlockNumber>,
162    erasure_coding: &ErasureCoding,
163) -> Result<Option<(SegmentIndex, VecDeque<(BlockNumber, Vec<u8>)>)>, Error>
164where
165    AS: AuxStore,
166    PG: PieceGetter,
167{
168    sync_segment_headers(segment_headers_store, node)
169        .await
170        .map_err(|error| format!("Failed to sync segment headers: {}", error))?;
171
172    let target_segment_index = {
173        let last_segment_index = segment_headers_store
174            .max_segment_index()
175            .expect("Successfully synced above; qed");
176
177        if let Some(target_block) = target_block {
178            let mut segment_header = segment_headers_store
179                .get_segment_header(last_segment_index)
180                .ok_or(format!(
181                    "Can't get segment header from the store: {last_segment_index}"
182                ))?;
183
184            let mut target_block_exceeded_last_archived_block = false;
185            if target_block > segment_header.last_archived_block().number {
186                warn!(
187                    target: LOG_TARGET,
188                   %last_segment_index,
189                   %target_block,
190
191                    "Specified target block is greater than the last archived block. \
192                     Choosing the last archived block (#{}) as target block...
193                    ",
194                    segment_header.last_archived_block().number
195                );
196                target_block_exceeded_last_archived_block = true;
197            }
198
199            if !target_block_exceeded_last_archived_block {
200                let mut current_segment_index = last_segment_index;
201
202                loop {
203                    if current_segment_index <= SegmentIndex::ONE {
204                        break;
205                    }
206
207                    if target_block > segment_header.last_archived_block().number {
208                        current_segment_index += SegmentIndex::ONE;
209                        break;
210                    }
211
212                    current_segment_index -= SegmentIndex::ONE;
213
214                    segment_header = segment_headers_store
215                        .get_segment_header(current_segment_index)
216                        .ok_or(format!(
217                            "Can't get segment header from the store: {last_segment_index}"
218                        ))?;
219                }
220
221                current_segment_index
222            } else {
223                last_segment_index
224            }
225        } else {
226            last_segment_index
227        }
228    };
229
230    // We don't have the genesis state when we choose to snap sync.
231    if target_segment_index <= SegmentIndex::ONE {
232        // The caller logs this error
233        return Err(Error::SnapSyncImpossible(
234            "Snap sync is impossible - not enough archived history".into(),
235        ));
236    }
237
238    // Identify all segment headers that would need to be reconstructed in order to get first
239    // block of last segment header
240    let mut segments_to_reconstruct = VecDeque::from([target_segment_index]);
241    {
242        let mut last_segment_first_block_number = None;
243
244        loop {
245            let oldest_segment_index = *segments_to_reconstruct.front().expect("Not empty; qed");
246            let segment_index = oldest_segment_index
247                .checked_sub(SegmentIndex::ONE)
248                .ok_or_else(|| {
249                    format!(
250                        "Attempted to get segment index before {oldest_segment_index} during \
251                            snap sync"
252                    )
253                })?;
254            let segment_header = segment_headers_store
255                .get_segment_header(segment_index)
256                .ok_or_else(|| {
257                    format!("Failed to get segment index {segment_index} during snap sync")
258                })?;
259            let last_archived_block = segment_header.last_archived_block();
260
261            // If older segment header ends with fully archived block then no additional
262            // information is necessary
263            if last_archived_block.partial_archived().is_none() {
264                break;
265            }
266
267            match last_segment_first_block_number {
268                Some(block_number) => {
269                    if block_number == last_archived_block.number {
270                        // If older segment ends with the same block number as the first block
271                        // in the last segment then add it to the list of segments that need to
272                        // be reconstructed
273                        segments_to_reconstruct.push_front(segment_index);
274                    } else {
275                        // Otherwise we're done here
276                        break;
277                    }
278                }
279                None => {
280                    last_segment_first_block_number.replace(last_archived_block.number);
281                    // This segment will definitely be needed to reconstruct first block of the
282                    // last segment
283                    segments_to_reconstruct.push_front(segment_index);
284                }
285            }
286        }
287    }
288
289    // Reconstruct blocks of the last segment
290    let mut blocks = VecDeque::new();
291    {
292        let reconstructor = Arc::new(Mutex::new(Reconstructor::new(erasure_coding.clone())));
293
294        for segment_index in segments_to_reconstruct {
295            let segment_pieces = download_segment_pieces(segment_index, piece_getter)
296                .await
297                .map_err(|error| format!("Failed to download segment pieces: {error}"))?;
298            // CPU-intensive piece and segment reconstruction code can block the async executor.
299            let segment_contents_fut = task::spawn_blocking({
300                let reconstructor = reconstructor.clone();
301
302                move || {
303                    reconstructor
304                        .lock()
305                        .expect("Panic if previous thread panicked when holding the mutex")
306                        .add_segment(segment_pieces.as_ref())
307                }
308            });
309
310            blocks = VecDeque::from(
311                segment_contents_fut
312                    .await
313                    .expect("Panic if blocking task panicked")
314                    .map_err(|error| error.to_string())?
315                    .blocks,
316            );
317
318            trace!(target: LOG_TARGET, %segment_index, "Segment reconstructed successfully");
319        }
320    }
321
322    Ok(Some((target_segment_index, blocks)))
323}
324
#[allow(clippy::too_many_arguments)]
/// Synchronize the blockchain to the target_block (approximate value based on the containing
/// segment) or to the last archived block.
///
/// Returns `Ok(())` both on success and when snap sync is skipped (no target segment data).
async fn sync<PG, AS, Block, Client, IQS, OS, NR>(
    segment_headers_store: &SegmentHeadersStore<AS>,
    node: &Node,
    piece_getter: &PG,
    fork_id: Option<&str>,
    client: &Arc<Client>,
    import_queue_service: &mut IQS,
    sync_service: Arc<SyncingService<Block>>,
    network_service_handle: &NetworkServiceHandle,
    target_block: Option<BlockNumber>,
    erasure_coding: &ErasureCoding,
    offchain_storage: Option<OS>,
    network_request: NR,
) -> Result<(), Error>
where
    PG: PieceGetter,
    AS: AuxStore,
    Block: BlockT,
    Client: HeaderBackend<Block>
        + ProvideRuntimeApi<Block>
        + ProofProvider<Block>
        + BlockImport<Block>
        + BlockchainEvents<Block>
        + Send
        + Sync
        + 'static,
    Client::Api:
        SubspaceApi<Block, PublicKey> + ObjectsApi<Block> + MmrApi<Block, H256, NumberFor<Block>>,
    IQS: ImportQueueService<Block> + ?Sized,
    OS: OffchainStorage,
    NR: NetworkRequest + Sync + Send,
{
    debug!(target: LOG_TARGET, "Starting snap sync...");

    // Download and reconstruct the encoded blocks of the segment containing the target block
    // (or the last segment when no target was given).
    let Some((target_segment_index, mut blocks)) = get_blocks_from_target_segment(
        segment_headers_store,
        node,
        piece_getter,
        target_block,
        erasure_coding,
    )
    .await?
    else {
        // Snap-sync skipped
        return Ok(());
    };

    debug!(
        target: LOG_TARGET,
        "Segments data received. Target segment index: {:?}",
        target_segment_index
    );

    let mut blocks_to_import = Vec::with_capacity(blocks.len().saturating_sub(1));
    let last_block_number;

    // First block is special because we need to download state for it
    {
        let (first_block_number, first_block_bytes) = blocks
            .pop_front()
            .expect("List of blocks is not empty according to logic above; qed");

        // Sometimes first block is the only block
        last_block_number = blocks
            .back()
            .map_or(first_block_number, |(block_number, _block_bytes)| {
                *block_number
            });

        debug!(
            target: LOG_TARGET,
            %target_segment_index,
            %first_block_number,
            %last_block_number,
            "Blocks from target segment downloaded"
        );

        let signed_block = decode_block::<Block>(&first_block_bytes)
            .map_err(|error| format!("Failed to decode archived block: {error}"))?;
        // Free the encoded bytes early; only the decoded block is needed from here on.
        drop(first_block_bytes);
        let (header, extrinsics) = signed_block.block.deconstruct();

        // Download state for the first block, so it can be imported even without doing execution
        let state = download_state(
            &header,
            client,
            fork_id,
            &sync_service,
            network_service_handle,
        )
        .await
        .map_err(|error| {
            format!("Failed to download state for the first block of target segment: {error}")
        })?;

        debug!(target: LOG_TARGET, "Downloaded state of the first block of the target segment");

        // Import first block as finalized
        let mut block = BlockImportParams::new(BlockOrigin::NetworkInitialSync, header);
        block.body.replace(extrinsics);
        block.justifications = signed_block.justifications;
        block.state_action = StateAction::ApplyChanges(StorageChanges::Import(state));
        block.finalized = true;
        block.create_gap = false;
        block.fork_choice = Some(ForkChoiceStrategy::Custom(true));
        client
            .import_block(block)
            .await
            .map_err(|error| format!("Failed to import first block of target segment: {error}"))?;
    }

    // download and commit MMR data before importing next set of blocks
    // since they are imported with block verification, and we need MMR data during the verification
    let maybe_mmr_sync = if let Some(offchain_storage) = offchain_storage {
        let mut mmr_sync = MmrSync::new(client.clone(), offchain_storage);
        // We sync MMR up to the last block number. All other MMR-data will be synced after
        // resuming either DSN-sync or Substrate-sync.
        mmr_sync
            .sync(
                fork_id.map(|v| v.into()),
                network_request,
                sync_service.clone(),
                last_block_number,
            )
            .await?;
        Some(mmr_sync)
    } else {
        None
    };

    debug!(
        target: LOG_TARGET,
        blocks_count = %blocks.len(),
        "Queuing importing remaining blocks from target segment"
    );

    // Decode the remaining blocks and queue them through the regular import queue.
    for (_block_number, block_bytes) in blocks {
        let signed_block = decode_block::<Block>(&block_bytes)
            .map_err(|error| format!("Failed to decode archived block: {error}"))?;
        let (header, extrinsics) = signed_block.block.deconstruct();

        blocks_to_import.push(IncomingBlock {
            hash: header.hash(),
            header: Some(header),
            body: Some(extrinsics),
            indexed_body: None,
            justifications: signed_block.justifications,
            origin: None,
            allow_missing_state: false,
            import_existing: false,
            skip_execution: false,
            state: None,
        });
    }

    if !blocks_to_import.is_empty() {
        import_queue_service.import_blocks(BlockOrigin::NetworkInitialSync, blocks_to_import);
    }

    // Wait for blocks to be imported
    // TODO: Replace this hack with actual watching of block import
    wait_for_block_import(client.as_ref(), last_block_number.into()).await;

    // verify the MMR sync before finishing up the block import
    if let Some(mmr_sync) = maybe_mmr_sync {
        mmr_sync.verify_mmr_data()?;
    }

    debug!(target: LOG_TARGET, info = ?client.info(), "Snap sync finished successfully");

    Ok(())
}
500
501async fn sync_segment_headers<AS>(
502    segment_headers_store: &SegmentHeadersStore<AS>,
503    node: &Node,
504) -> Result<(), Error>
505where
506    AS: AuxStore,
507{
508    let last_segment_header = segment_headers_store.last_segment_header().ok_or_else(|| {
509        Error::Other(
510            "Archiver needs to be initialized before syncing from DSN to populate the very first \
511            segment"
512                .to_string(),
513        )
514    })?;
515    let new_segment_headers = SegmentHeaderDownloader::new(node)
516        .get_segment_headers(&last_segment_header)
517        .await
518        .map_err(|error| error.to_string())?;
519
520    debug!(target: LOG_TARGET, "Found {} new segment headers", new_segment_headers.len());
521
522    if !new_segment_headers.is_empty() {
523        segment_headers_store.add_segment_headers(&new_segment_headers)?;
524    }
525
526    Ok(())
527}
528
/// Download and return state for specified block
///
/// Retries up to `STATE_SYNC_RETRIES` times; each attempt waits for a suitable peer (a full
/// node whose best block is above the target) and runs the snap-sync engine against it.
async fn download_state<Block, Client>(
    header: &Block::Header,
    client: &Arc<Client>,
    fork_id: Option<&str>,
    sync_service: &SyncingService<Block>,
    network_service_handle: &NetworkServiceHandle,
) -> Result<ImportedState<Block>, Error>
where
    Block: BlockT,
    Client: HeaderBackend<Block> + ProofProvider<Block> + Send + Sync + 'static,
{
    let block_number = *header.number();

    const STATE_SYNC_RETRIES: u32 = 10;
    const LOOP_PAUSE: Duration = Duration::from_secs(10);

    for attempt in 1..=STATE_SYNC_RETRIES {
        debug!(target: LOG_TARGET, %attempt, "Starting state sync...");

        debug!(target: LOG_TARGET, "Gathering peers for state sync.");
        // NOTE(review): `tried_peers` is re-created on every attempt, so a peer that already
        // failed state sync can be picked again on a later attempt — confirm whether this set
        // should be hoisted out of the retry loop instead.
        let mut tried_peers = HashSet::<PeerId>::new();

        // TODO: add loop timeout
        // Wait (indefinitely, pausing LOOP_PAUSE between polls) until an untried suitable peer
        // is connected.
        let current_peer_id = loop {
            let connected_full_peers = sync_service
                .peers_info()
                .await
                .expect("Network service must be available.")
                .iter()
                .filter_map(|(peer_id, info)| {
                    (info.roles.is_full() && info.best_number > block_number).then_some(*peer_id)
                })
                .collect::<Vec<_>>();

            debug!(target: LOG_TARGET, ?tried_peers, "Sync peers: {}", connected_full_peers.len());

            let active_peers_set = HashSet::from_iter(connected_full_peers.into_iter());

            // Pick any connected candidate we have not tried during this attempt.
            if let Some(peer_id) = active_peers_set.difference(&tried_peers).next().cloned() {
                break peer_id;
            }

            sleep(LOOP_PAUSE).await;
        };

        tried_peers.insert(current_peer_id);

        let sync_engine = SnapSyncingEngine::<Block>::new(
            client.clone(),
            fork_id,
            header.clone(),
            false,
            (current_peer_id, block_number),
            network_service_handle,
        )
        .map_err(Error::Client)?;

        let last_block_from_sync_result = sync_engine.download_state().await;

        match last_block_from_sync_result {
            Ok(block_to_import) => {
                debug!(target: LOG_TARGET, "Sync worker handle result: {:?}", block_to_import);

                // Success: the engine is expected to have attached the downloaded state.
                return block_to_import.state.ok_or_else(|| {
                    Error::Other("Imported state was missing in synced block".into())
                });
            }
            Err(error) => {
                // Non-fatal: log and retry (possibly with a different peer).
                error!(target: LOG_TARGET, %error, "State sync error");
                continue;
            }
        }
    }

    Err(Error::Other("All snap sync retries failed".into()))
}
605}