use crate::mmr::sync::MmrSync;
use crate::sync_from_dsn::segment_header_downloader::SegmentHeaderDownloader;
use crate::sync_from_dsn::{PieceGetter, LOG_TARGET};
use crate::utils::wait_for_block_import;
use sc_client_api::{AuxStore, BlockchainEvents, ProofProvider};
use sc_consensus::import_queue::ImportQueueService;
use sc_consensus::{
    BlockImport, BlockImportParams, ForkChoiceStrategy, ImportedState, IncomingBlock, StateAction,
    StorageChanges,
};
use sc_consensus_subspace::archiver::{decode_block, SegmentHeadersStore};
use sc_network::service::traits::NetworkService;
use sc_network::{NetworkBlock, NetworkRequest, PeerId};
use sc_network_sync::service::network::NetworkServiceHandle;
use sc_network_sync::SyncingService;
use sc_subspace_sync_common::snap_sync_engine::SnapSyncingEngine;
use sp_api::ProvideRuntimeApi;
use sp_blockchain::HeaderBackend;
use sp_consensus::BlockOrigin;
use sp_consensus_subspace::SubspaceApi;
use sp_core::offchain::OffchainStorage;
use sp_core::H256;
use sp_mmr_primitives::MmrApi;
use sp_objects::ObjectsApi;
use sp_runtime::traits::{Block as BlockT, Header, NumberFor};
use std::collections::{HashSet, VecDeque};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use subspace_archiving::reconstructor::Reconstructor;
use subspace_core_primitives::segments::SegmentIndex;
use subspace_core_primitives::{BlockNumber, PublicKey};
use subspace_data_retrieval::segment_downloading::download_segment_pieces;
use subspace_erasure_coding::ErasureCoding;
use subspace_networking::Node;
use tokio::sync::broadcast::Receiver;
use tokio::task;
use tokio::time::sleep;
use tracing::{debug, error, trace, warn};

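/// Errors that can occur during snap sync.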
#[derive(thiserror::Error, Debug)]
pub enum Error {
    #[error("Snap Sync requires user action: {0}")]
    SnapSyncImpossible(String),

    #[error(transparent)]
    Sub(#[from] sc_service::Error),

    #[error(transparent)]
    Client(#[from] sp_blockchain::Error),

    #[error("Snap sync error: {0}")]
    Other(String),
}

impl From<String> for Error {
    fn from(error: String) -> Self {
        Error::Other(error)
    }
}

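/// Entry point for snap sync. Waits for a target block when a receiver is provided, pauses
/// regular Substrate sync for the duration of the snap sync, and resumes it afterwards. Snap sync
/// is only attempted while the client is still at the genesis block.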
#[allow(clippy::too_many_arguments)]
pub(crate) async fn snap_sync<Block, AS, Client, PG, OS>(
    segment_headers_store: SegmentHeadersStore<AS>,
    node: Node,
    fork_id: Option<String>,
    client: Arc<Client>,
    mut import_queue_service: Box<dyn ImportQueueService<Block>>,
    pause_sync: Arc<AtomicBool>,
    piece_getter: PG,
    sync_service: Arc<SyncingService<Block>>,
    network_service_handle: NetworkServiceHandle,
    erasure_coding: ErasureCoding,
    target_block_receiver: Option<Receiver<BlockNumber>>,
    offchain_storage: Option<OS>,
    network_service: Arc<dyn NetworkService>,
) -> Result<(), Error>
where
    Block: BlockT,
    AS: AuxStore,
    Client: HeaderBackend<Block>
        + ProvideRuntimeApi<Block>
        + ProofProvider<Block>
        + BlockImport<Block>
        + BlockchainEvents<Block>
        + Send
        + Sync
        + 'static,
    Client::Api:
        SubspaceApi<Block, PublicKey> + ObjectsApi<Block> + MmrApi<Block, H256, NumberFor<Block>>,
    PG: PieceGetter,
    OS: OffchainStorage,
{
    let info = client.info();
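    // Snap sync is only attempted when the node is still at the genesis block.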
    if info.best_hash == info.genesis_hash {
        pause_sync.store(true, Ordering::Release);

        let target_block = if let Some(mut target_block_receiver) = target_block_receiver {
            match target_block_receiver.recv().await {
                Ok(target_block) => Some(target_block),
                Err(err) => {
                    error!(target: LOG_TARGET, ?err, "Snap sync failed: can't obtain target block.");
                    return Err(Error::Other(
                        "Snap sync failed: can't obtain target block.".into(),
                    ));
                }
            }
        } else {
            None
        };

        debug!(target: LOG_TARGET, "Snap sync target block: {:?}", target_block);

        sync(
            &segment_headers_store,
            &node,
            &piece_getter,
            fork_id.as_deref(),
            &client,
            import_queue_service.as_mut(),
            sync_service.clone(),
            &network_service_handle,
            target_block,
            &erasure_coding,
            offchain_storage,
            network_service,
        )
        .await?;

        {
            let info = client.info();
            sync_service.new_best_block_imported(info.best_hash, info.best_number);
        }
        pause_sync.store(false, Ordering::Release);
    } else {
        debug!(target: LOG_TARGET, "Snap sync can only work with genesis state, skipping");
    }

    Ok(())
}

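/// Syncs segment headers, selects the target segment (the newest archived segment, or the oldest
/// segment whose last archived block reaches the requested target block) and reconstructs its
/// blocks from pieces downloaded from the DSN. Returns the target segment index together with the
/// reconstructed blocks.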
pub(crate) async fn get_blocks_from_target_segment<AS, PG>(
    segment_headers_store: &SegmentHeadersStore<AS>,
    node: &Node,
    piece_getter: &PG,
    target_block: Option<BlockNumber>,
    erasure_coding: &ErasureCoding,
) -> Result<Option<(SegmentIndex, VecDeque<(BlockNumber, Vec<u8>)>)>, Error>
where
    AS: AuxStore,
    PG: PieceGetter,
{
    sync_segment_headers(segment_headers_store, node)
        .await
        .map_err(|error| format!("Failed to sync segment headers: {}", error))?;

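    // Pick the segment to sync to: the newest archived segment, or the oldest segment whose last
    // archived block reaches the requested target block.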
    let target_segment_index = {
        let last_segment_index = segment_headers_store
            .max_segment_index()
            .expect("Successfully synced above; qed");

        if let Some(target_block) = target_block {
            let mut segment_header = segment_headers_store
                .get_segment_header(last_segment_index)
                .ok_or(format!(
                    "Can't get segment header from the store: {last_segment_index}"
                ))?;

            let mut target_block_exceeded_last_archived_block = false;
            if target_block > segment_header.last_archived_block().number {
                warn!(
                    target: LOG_TARGET,
                    %last_segment_index,
                    %target_block,
                    "Specified target block is greater than the last archived block. \
                    Choosing the last archived block (#{}) as target block...",
                    segment_header.last_archived_block().number
                );
                target_block_exceeded_last_archived_block = true;
            }

            if !target_block_exceeded_last_archived_block {
                let mut current_segment_index = last_segment_index;

                loop {
                    if current_segment_index <= SegmentIndex::ONE {
                        break;
                    }

                    if target_block > segment_header.last_archived_block().number {
                        current_segment_index += SegmentIndex::ONE;
                        break;
                    }

                    current_segment_index -= SegmentIndex::ONE;

                    segment_header = segment_headers_store
                        .get_segment_header(current_segment_index)
                        .ok_or(format!(
                            "Can't get segment header from the store: {current_segment_index}"
                        ))?;
                }

                current_segment_index
            } else {
                last_segment_index
            }
        } else {
            last_segment_index
        }
    };

    if target_segment_index <= SegmentIndex::ONE {
        return Err(Error::SnapSyncImpossible(
            "Snap sync is impossible - not enough archived history".into(),
        ));
    }

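    // Collect the preceding segments whose last archived block spills over into the target
    // segment, so the first block of the target segment can be fully reconstructed.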
    let mut segments_to_reconstruct = VecDeque::from([target_segment_index]);
    {
        let mut last_segment_first_block_number = None;

        loop {
            let oldest_segment_index = *segments_to_reconstruct.front().expect("Not empty; qed");
            let segment_index = oldest_segment_index
                .checked_sub(SegmentIndex::ONE)
                .ok_or_else(|| {
                    format!(
                        "Attempted to get segment index before {oldest_segment_index} during \
                        snap sync"
                    )
                })?;
            let segment_header = segment_headers_store
                .get_segment_header(segment_index)
                .ok_or_else(|| {
                    format!("Failed to get segment index {segment_index} during snap sync")
                })?;
            let last_archived_block = segment_header.last_archived_block();

            if last_archived_block.partial_archived().is_none() {
                break;
            }

            match last_segment_first_block_number {
                Some(block_number) => {
                    if block_number == last_archived_block.number {
                        segments_to_reconstruct.push_front(segment_index);
                    } else {
                        break;
                    }
                }
                None => {
                    last_segment_first_block_number.replace(last_archived_block.number);
                    segments_to_reconstruct.push_front(segment_index);
                }
            }
        }
    }

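    // Download the pieces of each collected segment and reconstruct its blocks; only the blocks
    // of the final (target) segment are kept.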
    let mut blocks = VecDeque::new();
    {
        let reconstructor = Arc::new(Mutex::new(Reconstructor::new(erasure_coding.clone())));

        for segment_index in segments_to_reconstruct {
            let segment_pieces = download_segment_pieces(segment_index, piece_getter)
                .await
                .map_err(|error| format!("Failed to download segment pieces: {error}"))?;
            let segment_contents_fut = task::spawn_blocking({
                let reconstructor = reconstructor.clone();

                move || {
                    reconstructor
                        .lock()
                        .expect("Panic if previous thread panicked when holding the mutex")
                        .add_segment(segment_pieces.as_ref())
                }
            });

            blocks = VecDeque::from(
                segment_contents_fut
                    .await
                    .expect("Panic if blocking task panicked")
                    .map_err(|error| error.to_string())?
                    .blocks,
            );

            trace!(target: LOG_TARGET, %segment_index, "Segment reconstructed successfully");
        }
    }

    Ok(Some((target_segment_index, blocks)))
}

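/// Runs the actual snap sync: reconstructs the blocks of the target segment, downloads the state
/// of its first block and imports that block, optionally syncs MMR data, then queues the
/// remaining blocks for import and waits until they are imported.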
#[allow(clippy::too_many_arguments)]
async fn sync<PG, AS, Block, Client, IQS, OS, NR>(
    segment_headers_store: &SegmentHeadersStore<AS>,
    node: &Node,
    piece_getter: &PG,
    fork_id: Option<&str>,
    client: &Arc<Client>,
    import_queue_service: &mut IQS,
    sync_service: Arc<SyncingService<Block>>,
    network_service_handle: &NetworkServiceHandle,
    target_block: Option<BlockNumber>,
    erasure_coding: &ErasureCoding,
    offchain_storage: Option<OS>,
    network_request: NR,
) -> Result<(), Error>
where
    PG: PieceGetter,
    AS: AuxStore,
    Block: BlockT,
    Client: HeaderBackend<Block>
        + ProvideRuntimeApi<Block>
        + ProofProvider<Block>
        + BlockImport<Block>
        + BlockchainEvents<Block>
        + Send
        + Sync
        + 'static,
    Client::Api:
        SubspaceApi<Block, PublicKey> + ObjectsApi<Block> + MmrApi<Block, H256, NumberFor<Block>>,
    IQS: ImportQueueService<Block> + ?Sized,
    OS: OffchainStorage,
    NR: NetworkRequest + Sync + Send,
{
    debug!(target: LOG_TARGET, "Starting snap sync...");

    let Some((target_segment_index, mut blocks)) = get_blocks_from_target_segment(
        segment_headers_store,
        node,
        piece_getter,
        target_block,
        erasure_coding,
    )
    .await?
    else {
        return Ok(());
    };

    debug!(
        target: LOG_TARGET,
        "Segments data received. Target segment index: {:?}",
        target_segment_index
    );

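    // Import the first block of the target segment together with its downloaded state; the
    // remaining blocks are queued through the import queue further below.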
    let mut blocks_to_import = Vec::with_capacity(blocks.len().saturating_sub(1));
    let last_block_number;

    {
        let (first_block_number, first_block_bytes) = blocks
            .pop_front()
            .expect("List of blocks is not empty according to logic above; qed");

        last_block_number = blocks
            .back()
            .map_or(first_block_number, |(block_number, _block_bytes)| {
                *block_number
            });

        debug!(
            target: LOG_TARGET,
            %target_segment_index,
            %first_block_number,
            %last_block_number,
            "Blocks from target segment downloaded"
        );

        let signed_block = decode_block::<Block>(&first_block_bytes)
            .map_err(|error| format!("Failed to decode archived block: {error}"))?;
        drop(first_block_bytes);
        let (header, extrinsics) = signed_block.block.deconstruct();

        let state = download_state(
            &header,
            client,
            fork_id,
            &sync_service,
            network_service_handle,
        )
        .await
        .map_err(|error| {
            format!("Failed to download state for the first block of target segment: {error}")
        })?;

        debug!(target: LOG_TARGET, "Downloaded state of the first block of the target segment");

        let mut block = BlockImportParams::new(BlockOrigin::NetworkInitialSync, header);
        block.body.replace(extrinsics);
        block.justifications = signed_block.justifications;
        block.state_action = StateAction::ApplyChanges(StorageChanges::Import(state));
        block.finalized = true;
        block.create_gap = false;
        block.fork_choice = Some(ForkChoiceStrategy::Custom(true));
        client
            .import_block(block)
            .await
            .map_err(|error| format!("Failed to import first block of target segment: {error}"))?;
    }

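    // Sync MMR data when offchain storage is available; it is verified after the queued blocks
    // have been imported below.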
    let maybe_mmr_sync = if let Some(offchain_storage) = offchain_storage {
        let mut mmr_sync = MmrSync::new(client.clone(), offchain_storage);
        mmr_sync
            .sync(
                fork_id.map(|v| v.into()),
                network_request,
                sync_service.clone(),
                last_block_number,
            )
            .await?;
        Some(mmr_sync)
    } else {
        None
    };

    debug!(
        target: LOG_TARGET,
        blocks_count = %blocks.len(),
        "Queuing importing remaining blocks from target segment"
    );

    for (_block_number, block_bytes) in blocks {
        let signed_block = decode_block::<Block>(&block_bytes)
            .map_err(|error| format!("Failed to decode archived block: {error}"))?;
        let (header, extrinsics) = signed_block.block.deconstruct();

        blocks_to_import.push(IncomingBlock {
            hash: header.hash(),
            header: Some(header),
            body: Some(extrinsics),
            indexed_body: None,
            justifications: signed_block.justifications,
            origin: None,
            allow_missing_state: false,
            import_existing: false,
            skip_execution: false,
            state: None,
        });
    }

    if !blocks_to_import.is_empty() {
        import_queue_service.import_blocks(BlockOrigin::NetworkInitialSync, blocks_to_import);
    }

    // Wait for the queued blocks to be imported before verifying the synced MMR data.
    wait_for_block_import(client.as_ref(), last_block_number.into()).await;

    if let Some(mmr_sync) = maybe_mmr_sync {
        mmr_sync.verify_mmr_data()?;
    }

    debug!(target: LOG_TARGET, info = ?client.info(), "Snap sync finished successfully");

    Ok(())
}

/// Downloads segment headers that are newer than the last header already present in the segment
/// headers store and adds them to the store.
async fn sync_segment_headers<AS>(
    segment_headers_store: &SegmentHeadersStore<AS>,
    node: &Node,
) -> Result<(), Error>
where
    AS: AuxStore,
{
    let last_segment_header = segment_headers_store.last_segment_header().ok_or_else(|| {
        Error::Other(
            "Archiver needs to be initialized before syncing from DSN to populate the very first \
            segment"
                .to_string(),
        )
    })?;
    let new_segment_headers = SegmentHeaderDownloader::new(node)
        .get_segment_headers(&last_segment_header)
        .await
        .map_err(|error| error.to_string())?;

    debug!(target: LOG_TARGET, "Found {} new segment headers", new_segment_headers.len());

    if !new_segment_headers.is_empty() {
        segment_headers_store.add_segment_headers(&new_segment_headers)?;
    }

    Ok(())
}

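/// Downloads the state of the given block from a connected full peer, retrying up to
/// `STATE_SYNC_RETRIES` times.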
async fn download_state<Block, Client>(
    header: &Block::Header,
    client: &Arc<Client>,
    fork_id: Option<&str>,
    sync_service: &SyncingService<Block>,
    network_service_handle: &NetworkServiceHandle,
) -> Result<ImportedState<Block>, Error>
where
    Block: BlockT,
    Client: HeaderBackend<Block> + ProofProvider<Block> + Send + Sync + 'static,
{
    let block_number = *header.number();

    const STATE_SYNC_RETRIES: u32 = 10;
    const LOOP_PAUSE: Duration = Duration::from_secs(10);

    for attempt in 1..=STATE_SYNC_RETRIES {
        debug!(target: LOG_TARGET, %attempt, "Starting state sync...");

        debug!(target: LOG_TARGET, "Gathering peers for state sync.");
        let mut tried_peers = HashSet::<PeerId>::new();

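        // Wait until a connected full peer whose best block is ahead of the target block is
        // available and has not been tried yet.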
        let current_peer_id = loop {
            let connected_full_peers = sync_service
                .peers_info()
                .await
                .expect("Network service must be available.")
                .iter()
                .filter_map(|(peer_id, info)| {
                    (info.roles.is_full() && info.best_number > block_number).then_some(*peer_id)
                })
                .collect::<Vec<_>>();

            debug!(target: LOG_TARGET, ?tried_peers, "Sync peers: {}", connected_full_peers.len());

            let active_peers_set = HashSet::from_iter(connected_full_peers.into_iter());

            if let Some(peer_id) = active_peers_set.difference(&tried_peers).next().cloned() {
                break peer_id;
            }

            sleep(LOOP_PAUSE).await;
        };

        tried_peers.insert(current_peer_id);

        let sync_engine = SnapSyncingEngine::<Block>::new(
            client.clone(),
            fork_id,
            header.clone(),
            false,
            (current_peer_id, block_number),
            network_service_handle,
        )
        .map_err(Error::Client)?;

        let last_block_from_sync_result = sync_engine.download_state().await;

        match last_block_from_sync_result {
            Ok(block_to_import) => {
                debug!(target: LOG_TARGET, "Sync worker handle result: {:?}", block_to_import);

                return block_to_import.state.ok_or_else(|| {
                    Error::Other("Imported state was missing in synced block".into())
                });
            }
            Err(error) => {
                error!(target: LOG_TARGET, %error, "State sync error");
                continue;
            }
        }
    }

    Err(Error::Other("All snap sync retries failed".into()))
}