// subspace_service/sync_from_dsn/import_blocks.rs

use crate::sync_from_dsn::segment_header_downloader::SegmentHeaderDownloader;
use crate::sync_from_dsn::{PieceGetter, LOG_TARGET};
use sc_client_api::{AuxStore, BlockBackend, HeaderBackend};
use sc_consensus::import_queue::ImportQueueService;
use sc_consensus::IncomingBlock;
use sc_consensus_subspace::archiver::{decode_block, encode_block, SegmentHeadersStore};
use sc_service::Error;
use sc_tracing::tracing::{debug, trace};
use sp_consensus::BlockOrigin;
use sp_runtime::generic::SignedBlock;
use sp_runtime::traits::{Block as BlockT, Header, NumberFor, One};
use sp_runtime::Saturating;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use subspace_archiving::reconstructor::Reconstructor;
use subspace_core_primitives::segments::SegmentIndex;
use subspace_core_primitives::BlockNumber;
use subspace_data_retrieval::segment_downloading::download_segment_pieces;
use subspace_erasure_coding::ErasureCoding;
use tokio::task;
21
/// Maximum distance (in blocks) the import queue is allowed to run ahead of the client's best
/// block before we pause queueing and wait for imports to catch up (bounds queue memory usage;
/// also used as the capacity hint for each batch of queued blocks).
const QUEUED_BLOCKS_LIMIT: BlockNumber = 500;

/// How long to sleep before re-reading the client's best block number while waiting for the
/// import queue to drain.
const WAIT_FOR_BLOCKS_TO_IMPORT: Duration = Duration::from_secs(1);
/// Imports historical blocks from DSN into the node's import queue.
///
/// Downloads any new segment headers, then walks every unprocessed segment: downloads its
/// pieces, reconstructs the blocks on a blocking thread, and queues them for import with
/// [`BlockOrigin::NetworkInitialSync`]. Returns the number of blocks queued for import.
///
/// `last_processed_segment_index` and `last_processed_block_number` are progress cursors that
/// are updated in place, so the caller can resume from where a previous call stopped.
#[allow(clippy::too_many_arguments)]
pub(super) async fn import_blocks_from_dsn<Block, AS, Client, PG, IQS>(
    segment_headers_store: &SegmentHeadersStore<AS>,
    segment_header_downloader: &SegmentHeaderDownloader,
    client: &Client,
    piece_getter: &PG,
    import_queue_service: &mut IQS,
    last_processed_segment_index: &mut SegmentIndex,
    last_processed_block_number: &mut NumberFor<Block>,
    erasure_coding: &ErasureCoding,
) -> Result<u64, Error>
where
    Block: BlockT,
    AS: AuxStore + Send + Sync + 'static,
    Client: HeaderBackend<Block> + BlockBackend<Block> + Send + Sync + 'static,
    PG: PieceGetter,
    IQS: ImportQueueService<Block> + ?Sized,
{
    {
        // A locally initialized archiver is required: it supplies the last known segment header
        // from which newer headers are downloaded.
        let last_segment_header = segment_headers_store.last_segment_header().ok_or_else(|| {
            Error::Other(
                "Archiver needs to be initialized before syncing from DSN to populate the very \
                first segment"
                    .to_string(),
            )
        })?;

        let new_segment_headers = segment_header_downloader
            .get_segment_headers(&last_segment_header)
            .await
            .map_err(|error| error.to_string())?;

        debug!(target: LOG_TARGET, "Found {} new segment headers", new_segment_headers.len());

        if !new_segment_headers.is_empty() {
            segment_headers_store.add_segment_headers(&new_segment_headers)?;
        }
    }

    let mut imported_blocks = 0;
    // Wrapped in Arc<Mutex<..>> so the reconstructor can be moved into `spawn_blocking` closures
    // below while still being replaceable (reset) from this async context between segments.
    let mut reconstructor = Arc::new(Mutex::new(Reconstructor::new(erasure_coding.clone())));
    // Iterate all segments that were added after the last processed one.
    let segment_indices_iter = (*last_processed_segment_index + SegmentIndex::ONE)
        ..=segment_headers_store
            .max_segment_index()
            .expect("Exists, we have inserted segment headers above; qed");
    let mut segment_indices_iter = segment_indices_iter.peekable();

    while let Some(segment_index) = segment_indices_iter.next() {
        debug!(target: LOG_TARGET, %segment_index, "Processing segment");

        let segment_header = segment_headers_store
            .get_segment_header(segment_index)
            .expect("Statically guaranteed to exist, see checks above; qed");

        let last_archived_maybe_partial_block_number = segment_header.last_archived_block().number;
        // Whether the last block in this segment is only partially archived (continues in the
        // next segment).
        let last_archived_block_partial = segment_header
            .last_archived_block()
            .archived_progress
            .partial()
            .is_some();

        trace!(
            target: LOG_TARGET,
            %segment_index,
            last_archived_maybe_partial_block_number,
            last_archived_block_partial,
            "Checking segment header"
        );

        let info = client.info();
        let last_archived_maybe_partial_block_number =
            NumberFor::<Block>::from(last_archived_maybe_partial_block_number);
        // Skip segments that contain nothing newer than what was already processed.
        if *last_processed_block_number >= last_archived_maybe_partial_block_number {
            *last_processed_segment_index = segment_index;
            // Reset the reconstructor: segment continuity is broken by the skip.
            reconstructor = Arc::new(Mutex::new(Reconstructor::new(erasure_coding.clone())));
            continue;
        }
        // If this is the final segment and its only new block is a partial one, there is nothing
        // complete to import from it yet.
        if last_archived_maybe_partial_block_number == *last_processed_block_number + One::one()
            && last_archived_block_partial
            && segment_indices_iter.peek().is_none()
        {
            // Reset the reconstructor before bailing out of this segment.
            reconstructor = Arc::new(Mutex::new(Reconstructor::new(erasure_coding.clone())));
            continue;
        }

        let segment_pieces = download_segment_pieces(segment_index, piece_getter)
            .await
            .map_err(|error| format!("Failed to download segment pieces: {error}"))?;
        // Reconstruction is CPU-intensive, so offload it to a blocking thread pool instead of
        // stalling the async executor.
        let segment_contents_fut = task::spawn_blocking({
            let reconstructor = reconstructor.clone();

            move || {
                reconstructor
                    .lock()
                    .expect("Panic if previous thread panicked when holding the mutex")
                    .add_segment(segment_pieces.as_ref())
            }
        });
        let blocks = segment_contents_fut
            .await
            .expect("Panic if blocking task panicked")
            .map_err(|error| error.to_string())?
            .blocks;
        trace!(target: LOG_TARGET, %segment_index, "Segment reconstructed successfully");

        let mut blocks_to_import = Vec::with_capacity(QUEUED_BLOCKS_LIMIT as usize);

        let mut best_block_number = info.best_number;
        for (block_number, block_bytes) in blocks {
            let block_number = block_number.into();
            if block_number == 0u32.into() {
                // The genesis block is never imported; instead verify that the archived copy
                // matches the node's own genesis block byte-for-byte.
                let signed_block = client
                    .block(
                        client
                            .hash(block_number)?
                            .expect("Genesis block hash must always be found; qed"),
                    )?
                    .expect("Genesis block data must always be found; qed");

                if encode_block(signed_block) != block_bytes {
                    return Err(Error::Other(
                        "Wrong genesis block, block import failed".to_string(),
                    ));
                }
            }

            // Back-pressure: once we are QUEUED_BLOCKS_LIMIT blocks ahead of the client's best
            // block, flush what is batched and sleep until the import queue catches up.
            while block_number.saturating_sub(best_block_number) >= QUEUED_BLOCKS_LIMIT.into() {
                let just_queued_blocks_count = blocks_to_import.len();
                if !blocks_to_import.is_empty() {
                    // Hand the current batch to the import queue and start a fresh one.
                    let importing_blocks = std::mem::replace(
                        &mut blocks_to_import,
                        Vec::with_capacity(QUEUED_BLOCKS_LIMIT as usize),
                    );
                    import_queue_service
                        .import_blocks(BlockOrigin::NetworkInitialSync, importing_blocks);
                }
                trace!(
                    target: LOG_TARGET,
                    %block_number,
                    %best_block_number,
                    %just_queued_blocks_count,
                    %QUEUED_BLOCKS_LIMIT,
                    "Number of importing blocks reached queue limit, waiting before retrying"
                );
                tokio::time::sleep(WAIT_FOR_BLOCKS_TO_IMPORT).await;
                best_block_number = client.info().best_number;
            }

            let signed_block =
                decode_block::<Block>(&block_bytes).map_err(|error| error.to_string())?;

            // Advance the cursor even for blocks we end up skipping below.
            *last_processed_block_number = block_number;

            // Skip blocks whose header the client already has locally.
            if client.expect_header(signed_block.block.hash()).is_ok() {
                continue;
            }

            let SignedBlock {
                block,
                justifications,
            } = signed_block;
            let (header, extrinsics) = block.deconstruct();
            let hash = header.hash();

            blocks_to_import.push(IncomingBlock {
                hash,
                header: Some(header),
                body: Some(extrinsics),
                indexed_body: None,
                justifications,
                origin: None,
                allow_missing_state: false,
                import_existing: false,
                state: None,
                skip_execution: false,
            });

            imported_blocks += 1;

            // Periodic progress logging only, to avoid log spam during long syncs.
            if imported_blocks % 1000 == 0 {
                debug!(target: LOG_TARGET, "Adding block {} from DSN to the import queue", block_number);
            }
        }

        // Queue whatever remains batched for this segment.
        if !blocks_to_import.is_empty() {
            import_queue_service.import_blocks(BlockOrigin::NetworkInitialSync, blocks_to_import);
        }

        // Segment fully processed; record progress for the caller.
        *last_processed_segment_index = segment_index;
    }

    Ok(imported_blocks)
}