subspace_service/sync_from_dsn/
import_blocks.rs1use crate::sync_from_dsn::PieceGetter;
2use crate::sync_from_dsn::segment_header_downloader::SegmentHeaderDownloader;
3use sc_client_api::{AuxStore, BlockBackend, HeaderBackend};
4use sc_consensus::IncomingBlock;
5use sc_consensus::import_queue::ImportQueueService;
6use sc_consensus_subspace::archiver::{SegmentHeadersStore, decode_block, encode_block};
7use sc_service::Error;
8use sc_tracing::tracing::{debug, info, trace};
9use sp_consensus::BlockOrigin;
10use sp_runtime::Saturating;
11use sp_runtime::generic::SignedBlock;
12use sp_runtime::traits::{Block as BlockT, Header, NumberFor, One};
13use std::sync::{Arc, Mutex};
14use std::time::Duration;
15use subspace_archiving::reconstructor::Reconstructor;
16use subspace_core_primitives::BlockNumber;
17use subspace_core_primitives::segments::SegmentIndex;
18use subspace_data_retrieval::segment_downloading::download_segment_pieces;
19use subspace_erasure_coding::ErasureCoding;
20use tokio::task;
21
/// Largest allowed gap between the number of a block about to be queued and the client's best
/// block number. Once the gap reaches this value, `import_blocks_from_dsn` hands over whatever it
/// has buffered and waits for the best block to advance. The same value is used as the initial
/// capacity of the buffer of blocks pending import.
const QUEUED_BLOCKS_LIMIT: BlockNumber = 500;
/// How long `import_blocks_from_dsn` sleeps between re-reading the client's best block number
/// while the queue limit above is reached.
const WAIT_FOR_BLOCKS_TO_IMPORT: Duration = Duration::from_secs(1);
27
28#[allow(clippy::too_many_arguments)]
32pub(super) async fn import_blocks_from_dsn<Block, AS, Client, PG, IQS>(
33 segment_headers_store: &SegmentHeadersStore<AS>,
34 segment_header_downloader: &SegmentHeaderDownloader,
35 client: &Client,
36 piece_getter: &PG,
37 import_queue_service: &mut IQS,
38 last_completed_segment_index: &mut SegmentIndex,
39 last_processed_block_number: &mut NumberFor<Block>,
40 erasure_coding: &ErasureCoding,
41) -> Result<u64, Error>
42where
43 Block: BlockT,
44 AS: AuxStore + Send + Sync + 'static,
45 Client: HeaderBackend<Block> + BlockBackend<Block> + Send + Sync + 'static,
46 PG: PieceGetter,
47 IQS: ImportQueueService<Block> + ?Sized,
48{
49 {
50 let last_segment_header = segment_headers_store.last_segment_header().ok_or_else(|| {
51 Error::Other(
52 "Archiver needs to be initialized before syncing from DSN to populate the very \
53 first segment"
54 .to_string(),
55 )
56 })?;
57
58 let new_segment_headers = segment_header_downloader
59 .get_segment_headers(&last_segment_header)
60 .await
61 .map_err(|error| error.to_string())?;
62
63 debug!("Found {} new segment headers", new_segment_headers.len());
64
65 if !new_segment_headers.is_empty() {
66 segment_headers_store.add_segment_headers(&new_segment_headers)?;
67 }
68 }
69
70 let mut imported_blocks = 0;
71 let mut reconstructor = Arc::new(Mutex::new(Reconstructor::new(erasure_coding.clone())));
72 let segment_indices_iter = (*last_completed_segment_index + SegmentIndex::ONE)
74 ..=segment_headers_store
75 .max_segment_index()
76 .expect("Exists, we have inserted segment headers above; qed");
77 let mut segment_indices_iter = segment_indices_iter.peekable();
78
79 while let Some(segment_index) = segment_indices_iter.next() {
80 debug!( %segment_index, "Processing segment");
81
82 let segment_header = segment_headers_store
83 .get_segment_header(segment_index)
84 .expect("Statically guaranteed to exist, see checks above; qed");
85
86 let last_archived_maybe_partial_block_number = segment_header.last_archived_block().number;
87 let last_archived_block_partial = segment_header
88 .last_archived_block()
89 .archived_progress
90 .partial()
91 .is_some();
92
93 trace!(
94 %segment_index,
95 last_archived_maybe_partial_block_number,
96 last_archived_block_partial,
97 "Checking segment header"
98 );
99
100 let info = client.info();
101 let last_archived_maybe_partial_block_number =
102 NumberFor::<Block>::from(last_archived_maybe_partial_block_number);
103 if *last_processed_block_number >= last_archived_maybe_partial_block_number {
107 debug!(
108 %segment_index,
109 %last_processed_block_number,
110 %last_archived_maybe_partial_block_number,
111 %last_archived_block_partial,
112 "Already processed last (possibly partial) block in segment, resetting reconstructor",
113 );
114 *last_completed_segment_index = segment_index;
115 reconstructor = Arc::new(Mutex::new(Reconstructor::new(erasure_coding.clone())));
117 continue;
118 }
119 if last_archived_maybe_partial_block_number == *last_processed_block_number + One::one()
122 && last_archived_block_partial
123 {
124 if segment_indices_iter.peek().is_none() {
125 *last_completed_segment_index = segment_index.saturating_sub(SegmentIndex::ONE);
127
128 debug!(
133 %segment_index,
134 %last_processed_block_number,
135 %last_archived_maybe_partial_block_number,
136 %last_archived_block_partial,
137 "No more segments, snap sync is about to finish",
138 );
139 continue;
140 } else {
141 info!(
147 %segment_index,
148 %last_processed_block_number,
149 %last_archived_maybe_partial_block_number,
150 %last_archived_block_partial,
151 "Downloading entire segment for one partial block",
152 );
153 }
154 }
155
156 let segment_pieces = download_segment_pieces(segment_index, piece_getter)
157 .await
158 .map_err(|error| format!("Failed to download segment pieces: {error}"))?;
159 let segment_contents_fut = task::spawn_blocking({
161 let reconstructor = reconstructor.clone();
162
163 move || {
164 reconstructor
165 .lock()
166 .expect("Panic if previous thread panicked when holding the mutex")
167 .add_segment(segment_pieces.as_ref())
168 }
169 });
170 let blocks = segment_contents_fut
171 .await
172 .expect("Panic if blocking task panicked")
173 .map_err(|error| error.to_string())?
174 .blocks;
175 trace!( %segment_index, "Segment reconstructed successfully");
176
177 let mut blocks_to_import = Vec::with_capacity(QUEUED_BLOCKS_LIMIT as usize);
178
179 let mut best_block_number = info.best_number;
180 for (block_number, block_bytes) in blocks {
181 let block_number = block_number.into();
182 if block_number == 0u32.into() {
183 let signed_block = client
184 .block(
185 client
186 .hash(block_number)?
187 .expect("Genesis block hash must always be found; qed"),
188 )?
189 .expect("Genesis block data must always be found; qed");
190
191 if encode_block(signed_block) != block_bytes {
192 return Err(Error::Other(
193 "Wrong genesis block, block import failed".to_string(),
194 ));
195 }
196 }
197
198 while block_number.saturating_sub(best_block_number) >= QUEUED_BLOCKS_LIMIT.into() {
204 let just_queued_blocks_count = blocks_to_import.len();
205 if !blocks_to_import.is_empty() {
206 let importing_blocks = std::mem::replace(
211 &mut blocks_to_import,
212 Vec::with_capacity(QUEUED_BLOCKS_LIMIT as usize),
213 );
214 import_queue_service
216 .import_blocks(BlockOrigin::NetworkInitialSync, importing_blocks);
217 }
218 trace!(
219 %block_number,
220 %best_block_number,
221 %just_queued_blocks_count,
222 %QUEUED_BLOCKS_LIMIT,
223 "Number of importing blocks reached queue limit, waiting before retrying"
224 );
225 tokio::time::sleep(WAIT_FOR_BLOCKS_TO_IMPORT).await;
226 best_block_number = client.info().best_number;
227 }
228
229 let signed_block =
230 decode_block::<Block>(&block_bytes).map_err(|error| error.to_string())?;
231
232 *last_processed_block_number = block_number;
233
234 if client.expect_header(signed_block.block.hash()).is_ok() {
238 continue;
239 }
240
241 let SignedBlock {
242 block,
243 justifications,
244 } = signed_block;
245 let (header, extrinsics) = block.deconstruct();
246 let hash = header.hash();
247
248 blocks_to_import.push(IncomingBlock {
249 hash,
250 header: Some(header),
251 body: Some(extrinsics),
252 indexed_body: None,
253 justifications,
254 origin: None,
255 allow_missing_state: false,
256 import_existing: false,
257 state: None,
258 skip_execution: false,
259 });
260
261 imported_blocks += 1;
262
263 if imported_blocks % 1000 == 0 {
264 debug!("Adding block {} from DSN to the import queue", block_number);
265 }
266 }
267
268 if !blocks_to_import.is_empty() {
269 import_queue_service.import_blocks(BlockOrigin::NetworkInitialSync, blocks_to_import);
271 }
272
273 if last_archived_block_partial {
275 *last_completed_segment_index = segment_index.saturating_sub(SegmentIndex::ONE);
276 } else {
277 *last_completed_segment_index = segment_index;
278 }
279 }
280
281 Ok(imported_blocks)
282}