1use crate::mmr::sync::MmrSync;
2use crate::sync_from_dsn::PieceGetter;
3use crate::sync_from_dsn::segment_header_downloader::SegmentHeaderDownloader;
4use crate::utils::wait_for_block_import;
5use sc_client_api::{AuxStore, BlockchainEvents, ProofProvider};
6use sc_consensus::import_queue::ImportQueueService;
7use sc_consensus::{
8 BlockImport, BlockImportParams, ForkChoiceStrategy, ImportedState, IncomingBlock, StateAction,
9 StorageChanges,
10};
11use sc_consensus_subspace::archiver::{SegmentHeadersStore, decode_block};
12use sc_network::service::traits::NetworkService;
13use sc_network::{NetworkBlock, NetworkRequest, PeerId};
14use sc_network_sync::SyncingService;
15use sc_network_sync::service::network::NetworkServiceHandle;
16use sc_subspace_sync_common::snap_sync_engine::SnapSyncingEngine;
17use sp_api::ProvideRuntimeApi;
18use sp_blockchain::HeaderBackend;
19use sp_consensus::BlockOrigin;
20use sp_consensus_subspace::SubspaceApi;
21use sp_core::H256;
22use sp_core::offchain::OffchainStorage;
23use sp_mmr_primitives::MmrApi;
24use sp_objects::ObjectsApi;
25use sp_runtime::traits::{Block as BlockT, Header, NumberFor};
26use std::collections::{HashSet, VecDeque};
27use std::sync::atomic::{AtomicBool, Ordering};
28use std::sync::{Arc, Mutex};
29use std::time::Duration;
30use subspace_archiving::reconstructor::Reconstructor;
31use subspace_core_primitives::segments::SegmentIndex;
32use subspace_core_primitives::{BlockNumber, PublicKey};
33use subspace_data_retrieval::segment_downloading::download_segment_pieces;
34use subspace_erasure_coding::ErasureCoding;
35use subspace_networking::Node;
36use tokio::sync::broadcast::Receiver;
37use tokio::task;
38use tokio::time::sleep;
39use tracing::{debug, error, trace, warn};
40
41#[derive(thiserror::Error, Debug)]
43pub enum Error {
44 #[error("Snap Sync requires user action: {0}")]
47 SnapSyncImpossible(String),
48
49 #[error(transparent)]
51 Sub(#[from] sc_service::Error),
52
53 #[error(transparent)]
55 Client(#[from] sp_blockchain::Error),
56
57 #[error("Snap sync error: {0}")]
59 Other(String),
60}
61
62impl From<String> for Error {
63 fn from(error: String) -> Self {
64 Error::Other(error)
65 }
66}
67
68#[allow(clippy::too_many_arguments)]
71pub(crate) async fn snap_sync<Block, AS, Client, PG, OS>(
72 segment_headers_store: SegmentHeadersStore<AS>,
73 node: Node,
74 fork_id: Option<String>,
75 client: Arc<Client>,
76 mut import_queue_service: Box<dyn ImportQueueService<Block>>,
77 pause_sync: Arc<AtomicBool>,
78 piece_getter: PG,
79 sync_service: Arc<SyncingService<Block>>,
80 network_service_handle: NetworkServiceHandle,
81 erasure_coding: ErasureCoding,
82 target_block_receiver: Option<Receiver<BlockNumber>>,
83 offchain_storage: Option<OS>,
84 network_service: Arc<dyn NetworkService>,
85) -> Result<(), Error>
86where
87 Block: BlockT,
88 AS: AuxStore,
89 Client: HeaderBackend<Block>
90 + ProvideRuntimeApi<Block>
91 + ProofProvider<Block>
92 + BlockImport<Block>
93 + BlockchainEvents<Block>
94 + Send
95 + Sync
96 + 'static,
97 Client::Api:
98 SubspaceApi<Block, PublicKey> + ObjectsApi<Block> + MmrApi<Block, H256, NumberFor<Block>>,
99 PG: PieceGetter,
100 OS: OffchainStorage,
101{
102 let info = client.info();
103 if info.best_hash == info.genesis_hash {
107 pause_sync.store(true, Ordering::Release);
108
109 let target_block = if let Some(mut target_block_receiver) = target_block_receiver {
110 match target_block_receiver.recv().await {
111 Ok(target_block) => Some(target_block),
112 Err(err) => {
113 error!(?err, "Snap sync failed: can't obtain target block.");
114 return Err(Error::Other(
115 "Snap sync failed: can't obtain target block.".into(),
116 ));
117 }
118 }
119 } else {
120 None
121 };
122
123 debug!("Snap sync target block: {:?}", target_block);
124
125 sync(
126 &segment_headers_store,
127 &node,
128 &piece_getter,
129 fork_id.as_deref(),
130 &client,
131 import_queue_service.as_mut(),
132 sync_service.clone(),
133 &network_service_handle,
134 target_block,
135 &erasure_coding,
136 offchain_storage,
137 network_service,
138 )
139 .await?;
140
141 {
144 let info = client.info();
145 sync_service.new_best_block_imported(info.best_hash, info.best_number);
146 }
147 pause_sync.store(false, Ordering::Release);
148 } else {
149 debug!("Snap sync can only work with genesis state, skipping");
150 }
151
152 Ok(())
153}
154
155pub(crate) async fn get_blocks_from_target_segment<AS, PG>(
158 segment_headers_store: &SegmentHeadersStore<AS>,
159 node: &Node,
160 piece_getter: &PG,
161 target_block: Option<BlockNumber>,
162 erasure_coding: &ErasureCoding,
163) -> Result<Option<(SegmentIndex, VecDeque<(BlockNumber, Vec<u8>)>)>, Error>
164where
165 AS: AuxStore,
166 PG: PieceGetter,
167{
168 sync_segment_headers(segment_headers_store, node)
169 .await
170 .map_err(|error| format!("Failed to sync segment headers: {error}"))?;
171
172 let target_segment_index = {
173 let last_segment_index = segment_headers_store
174 .max_segment_index()
175 .expect("Successfully synced above; qed");
176
177 if let Some(target_block) = target_block {
178 let mut segment_header = segment_headers_store
179 .get_segment_header(last_segment_index)
180 .ok_or(format!(
181 "Can't get segment header from the store: {last_segment_index}"
182 ))?;
183
184 let mut target_block_exceeded_last_archived_block = false;
185 if target_block > segment_header.last_archived_block().number {
186 warn!(
187 %last_segment_index,
188 %target_block,
189
190 "Specified target block is greater than the last archived block. \
191 Choosing the last archived block (#{}) as target block...
192 ",
193 segment_header.last_archived_block().number
194 );
195 target_block_exceeded_last_archived_block = true;
196 }
197
198 if !target_block_exceeded_last_archived_block {
199 let mut current_segment_index = last_segment_index;
200
201 loop {
202 if current_segment_index <= SegmentIndex::ONE {
203 break;
204 }
205
206 if target_block > segment_header.last_archived_block().number {
207 current_segment_index += SegmentIndex::ONE;
208 break;
209 }
210
211 current_segment_index -= SegmentIndex::ONE;
212
213 segment_header = segment_headers_store
214 .get_segment_header(current_segment_index)
215 .ok_or(format!(
216 "Can't get segment header from the store: {last_segment_index}"
217 ))?;
218 }
219
220 current_segment_index
221 } else {
222 last_segment_index
223 }
224 } else {
225 last_segment_index
226 }
227 };
228
229 if target_segment_index <= SegmentIndex::ONE {
231 return Err(Error::SnapSyncImpossible(
233 "Snap sync is impossible - not enough archived history".into(),
234 ));
235 }
236
237 let mut segments_to_reconstruct = VecDeque::from([target_segment_index]);
240 {
241 let mut last_segment_first_block_number = None;
242
243 loop {
244 let oldest_segment_index = *segments_to_reconstruct.front().expect("Not empty; qed");
245 let segment_index = oldest_segment_index
246 .checked_sub(SegmentIndex::ONE)
247 .ok_or_else(|| {
248 format!(
249 "Attempted to get segment index before {oldest_segment_index} during \
250 snap sync"
251 )
252 })?;
253 let segment_header = segment_headers_store
254 .get_segment_header(segment_index)
255 .ok_or_else(|| {
256 format!("Failed to get segment index {segment_index} during snap sync")
257 })?;
258 let last_archived_block = segment_header.last_archived_block();
259
260 if last_archived_block.partial_archived().is_none() {
263 break;
264 }
265
266 match last_segment_first_block_number {
267 Some(block_number) => {
268 if block_number == last_archived_block.number {
269 segments_to_reconstruct.push_front(segment_index);
273 } else {
274 break;
276 }
277 }
278 None => {
279 last_segment_first_block_number.replace(last_archived_block.number);
280 segments_to_reconstruct.push_front(segment_index);
283 }
284 }
285 }
286 }
287
288 let mut blocks = VecDeque::new();
290 {
291 let reconstructor = Arc::new(Mutex::new(Reconstructor::new(erasure_coding.clone())));
292
293 for segment_index in segments_to_reconstruct {
294 let segment_pieces = download_segment_pieces(segment_index, piece_getter)
295 .await
296 .map_err(|error| format!("Failed to download segment pieces: {error}"))?;
297 let segment_contents_fut = task::spawn_blocking({
299 let reconstructor = reconstructor.clone();
300
301 move || {
302 reconstructor
303 .lock()
304 .expect("Panic if previous thread panicked when holding the mutex")
305 .add_segment(segment_pieces.as_ref())
306 }
307 });
308
309 blocks = VecDeque::from(
310 segment_contents_fut
311 .await
312 .expect("Panic if blocking task panicked")
313 .map_err(|error| error.to_string())?
314 .blocks,
315 );
316
317 trace!( %segment_index, "Segment reconstructed successfully");
318 }
319 }
320
321 Ok(Some((target_segment_index, blocks)))
322}
323
324#[allow(clippy::too_many_arguments)]
325async fn sync<PG, AS, Block, Client, IQS, OS, NR>(
328 segment_headers_store: &SegmentHeadersStore<AS>,
329 node: &Node,
330 piece_getter: &PG,
331 fork_id: Option<&str>,
332 client: &Arc<Client>,
333 import_queue_service: &mut IQS,
334 sync_service: Arc<SyncingService<Block>>,
335 network_service_handle: &NetworkServiceHandle,
336 target_block: Option<BlockNumber>,
337 erasure_coding: &ErasureCoding,
338 offchain_storage: Option<OS>,
339 network_request: NR,
340) -> Result<(), Error>
341where
342 PG: PieceGetter,
343 AS: AuxStore,
344 Block: BlockT,
345 Client: HeaderBackend<Block>
346 + ProvideRuntimeApi<Block>
347 + ProofProvider<Block>
348 + BlockImport<Block>
349 + BlockchainEvents<Block>
350 + Send
351 + Sync
352 + 'static,
353 Client::Api:
354 SubspaceApi<Block, PublicKey> + ObjectsApi<Block> + MmrApi<Block, H256, NumberFor<Block>>,
355 IQS: ImportQueueService<Block> + ?Sized,
356 OS: OffchainStorage,
357 NR: NetworkRequest + Sync + Send,
358{
359 debug!("Starting snap sync...");
360
361 let Some((target_segment_index, mut blocks)) = get_blocks_from_target_segment(
362 segment_headers_store,
363 node,
364 piece_getter,
365 target_block,
366 erasure_coding,
367 )
368 .await?
369 else {
370 return Ok(());
372 };
373
374 debug!(
375 "Segments data received. Target segment index: {:?}",
376 target_segment_index
377 );
378
379 let mut blocks_to_import = Vec::with_capacity(blocks.len().saturating_sub(1));
380 let last_block_number;
381
382 {
384 let (first_block_number, first_block_bytes) = blocks
385 .pop_front()
386 .expect("List of blocks is not empty according to logic above; qed");
387
388 last_block_number = blocks
390 .back()
391 .map_or(first_block_number, |(block_number, _block_bytes)| {
392 *block_number
393 });
394
395 debug!(
396 %target_segment_index,
397 %first_block_number,
398 %last_block_number,
399 "Blocks from target segment downloaded"
400 );
401
402 let signed_block = decode_block::<Block>(&first_block_bytes)
403 .map_err(|error| format!("Failed to decode archived block: {error}"))?;
404 drop(first_block_bytes);
405 let (header, extrinsics) = signed_block.block.deconstruct();
406
407 let state = download_state(
409 &header,
410 client,
411 fork_id,
412 &sync_service,
413 network_service_handle,
414 )
415 .await
416 .map_err(|error| {
417 format!("Failed to download state for the first block of target segment: {error}")
418 })?;
419
420 debug!("Downloaded state of the first block of the target segment");
421
422 let mut block = BlockImportParams::new(BlockOrigin::NetworkInitialSync, header);
424 block.body.replace(extrinsics);
425 block.justifications = signed_block.justifications;
426 block.state_action = StateAction::ApplyChanges(StorageChanges::Import(state));
427 block.finalized = true;
428 block.create_gap = false;
429 block.fork_choice = Some(ForkChoiceStrategy::Custom(true));
430 client
431 .import_block(block)
432 .await
433 .map_err(|error| format!("Failed to import first block of target segment: {error}"))?;
434 }
435
436 let maybe_mmr_sync = if let Some(offchain_storage) = offchain_storage {
439 let mut mmr_sync = MmrSync::new(client.clone(), offchain_storage);
440 mmr_sync
443 .sync(
444 fork_id.map(|v| v.into()),
445 network_request,
446 sync_service.clone(),
447 last_block_number,
448 )
449 .await?;
450 Some(mmr_sync)
451 } else {
452 None
453 };
454
455 debug!(
456 blocks_count = %blocks.len(),
457 "Queuing importing remaining blocks from target segment"
458 );
459
460 for (_block_number, block_bytes) in blocks {
461 let signed_block = decode_block::<Block>(&block_bytes)
462 .map_err(|error| format!("Failed to decode archived block: {error}"))?;
463 let (header, extrinsics) = signed_block.block.deconstruct();
464
465 blocks_to_import.push(IncomingBlock {
466 hash: header.hash(),
467 header: Some(header),
468 body: Some(extrinsics),
469 indexed_body: None,
470 justifications: signed_block.justifications,
471 origin: None,
472 allow_missing_state: false,
473 import_existing: false,
474 skip_execution: false,
475 state: None,
476 });
477 }
478
479 if !blocks_to_import.is_empty() {
480 import_queue_service.import_blocks(BlockOrigin::NetworkInitialSync, blocks_to_import);
481 }
482
483 wait_for_block_import(client.as_ref(), last_block_number.into()).await;
486
487 if let Some(mmr_sync) = maybe_mmr_sync {
489 mmr_sync.verify_mmr_data()?;
490 }
491
492 debug!( info = ?client.info(), "Snap sync finished successfully");
493
494 Ok(())
495}
496
497async fn sync_segment_headers<AS>(
498 segment_headers_store: &SegmentHeadersStore<AS>,
499 node: &Node,
500) -> Result<(), Error>
501where
502 AS: AuxStore,
503{
504 let last_segment_header = segment_headers_store.last_segment_header().ok_or_else(|| {
505 Error::Other(
506 "Archiver needs to be initialized before syncing from DSN to populate the very first \
507 segment"
508 .to_string(),
509 )
510 })?;
511 let new_segment_headers = SegmentHeaderDownloader::new(node)
512 .get_segment_headers(&last_segment_header)
513 .await
514 .map_err(|error| error.to_string())?;
515
516 debug!("Found {} new segment headers", new_segment_headers.len());
517
518 if !new_segment_headers.is_empty() {
519 segment_headers_store.add_segment_headers(&new_segment_headers)?;
520 }
521
522 Ok(())
523}
524
525async fn download_state<Block, Client>(
527 header: &Block::Header,
528 client: &Arc<Client>,
529 fork_id: Option<&str>,
530 sync_service: &SyncingService<Block>,
531 network_service_handle: &NetworkServiceHandle,
532) -> Result<ImportedState<Block>, Error>
533where
534 Block: BlockT,
535 Client: HeaderBackend<Block> + ProofProvider<Block> + Send + Sync + 'static,
536{
537 let block_number = *header.number();
538
539 const STATE_SYNC_RETRIES: u32 = 10;
540 const LOOP_PAUSE: Duration = Duration::from_secs(10);
541
542 for attempt in 1..=STATE_SYNC_RETRIES {
543 debug!( %attempt, "Starting state sync...");
544
545 debug!("Gathering peers for state sync.");
546 let mut tried_peers = HashSet::<PeerId>::new();
547
548 let current_peer_id = loop {
550 let connected_full_peers = sync_service
551 .peers_info()
552 .await
553 .expect("Network service must be available.")
554 .iter()
555 .filter_map(|(peer_id, info)| {
556 (info.roles.is_full() && info.best_number > block_number).then_some(*peer_id)
557 })
558 .collect::<Vec<_>>();
559
560 debug!(?tried_peers, "Sync peers: {}", connected_full_peers.len());
561
562 let active_peers_set = HashSet::from_iter(connected_full_peers.into_iter());
563
564 if let Some(peer_id) = active_peers_set.difference(&tried_peers).next().cloned() {
565 break peer_id;
566 }
567
568 sleep(LOOP_PAUSE).await;
569 };
570
571 tried_peers.insert(current_peer_id);
572
573 let sync_engine = SnapSyncingEngine::<Block>::new(
574 client.clone(),
575 fork_id,
576 header.clone(),
577 false,
578 (current_peer_id, block_number),
579 network_service_handle,
580 )
581 .map_err(Error::Client)?;
582
583 let last_block_from_sync_result = sync_engine.download_state().await;
584
585 match last_block_from_sync_result {
586 Ok(block_to_import) => {
587 debug!("Sync worker handle result: {:?}", block_to_import);
588
589 return block_to_import.state.ok_or_else(|| {
590 Error::Other("Imported state was missing in synced block".into())
591 });
592 }
593 Err(error) => {
594 error!( %error, "State sync error");
595 continue;
596 }
597 }
598 }
599
600 Err(Error::Other("All snap sync retries failed".into()))
601}