// subspace_service/sync_from_dsn/import_blocks.rs

1use crate::sync_from_dsn::segment_header_downloader::SegmentHeaderDownloader;
2use crate::sync_from_dsn::{PieceGetter, LOG_TARGET};
3use sc_client_api::{AuxStore, BlockBackend, HeaderBackend};
4use sc_consensus::import_queue::ImportQueueService;
5use sc_consensus::IncomingBlock;
6use sc_consensus_subspace::archiver::{decode_block, encode_block, SegmentHeadersStore};
7use sc_service::Error;
8use sc_tracing::tracing::{debug, trace};
9use sp_consensus::BlockOrigin;
10use sp_runtime::generic::SignedBlock;
11use sp_runtime::traits::{Block as BlockT, Header, NumberFor, One};
12use sp_runtime::Saturating;
13use std::sync::{Arc, Mutex};
14use std::time::Duration;
15use subspace_archiving::reconstructor::Reconstructor;
16use subspace_core_primitives::segments::SegmentIndex;
17use subspace_core_primitives::BlockNumber;
18use subspace_data_retrieval::segment_downloading::download_segment_pieces;
19use subspace_erasure_coding::ErasureCoding;
20use tokio::task;
21
/// How many blocks to queue before pausing and waiting for blocks to be imported, this is
/// essentially used to ensure we use a bounded amount of RAM during sync process.
///
/// Measured as a distance (in blocks) between the block being queued and the client's current
/// best block; see the back-pressure loop in `import_blocks_from_dsn`.
const QUEUED_BLOCKS_LIMIT: BlockNumber = 500;
/// Time to wait between polls of the client's best block number when the import queue is full
/// and import is too slow.
const WAIT_FOR_BLOCKS_TO_IMPORT: Duration = Duration::from_secs(1);
27
/// Starts the process of importing blocks from DSN.
///
/// First refreshes the local [`SegmentHeadersStore`] with any newly available segment headers,
/// then walks every segment after `last_processed_segment_index`: downloads the segment's
/// pieces, reconstructs the blocks contained in it (on a blocking thread, since reconstruction
/// is CPU-intensive), and queues them on the import queue. Back-pressure is applied so that at
/// most roughly [`QUEUED_BLOCKS_LIMIT`] blocks are queued ahead of the client's best block.
///
/// Progress is reported through the `last_processed_segment_index` and
/// `last_processed_block_number` out-parameters so the caller can resume from the same position
/// on a subsequent call.
///
/// Returns the number of blocks queued for import (blocks already known to the client are
/// skipped and not counted).
///
/// # Errors
///
/// Fails if the archiver was never initialized (no segment header stored), if downloading
/// segment headers or segment pieces fails, if segment reconstruction or block decoding fails,
/// or if the reconstructed genesis block does not match the client's genesis block.
#[allow(clippy::too_many_arguments)]
pub(super) async fn import_blocks_from_dsn<Block, AS, Client, PG, IQS>(
    segment_headers_store: &SegmentHeadersStore<AS>,
    segment_header_downloader: &SegmentHeaderDownloader,
    client: &Client,
    piece_getter: &PG,
    import_queue_service: &mut IQS,
    last_processed_segment_index: &mut SegmentIndex,
    last_processed_block_number: &mut NumberFor<Block>,
    erasure_coding: &ErasureCoding,
) -> Result<u64, Error>
where
    Block: BlockT,
    AS: AuxStore + Send + Sync + 'static,
    Client: HeaderBackend<Block> + BlockBackend<Block> + Send + Sync + 'static,
    PG: PieceGetter,
    IQS: ImportQueueService<Block> + ?Sized,
{
    // Scope: refresh the segment headers store with any headers newer than the last stored one,
    // so the segment range below covers everything currently known.
    {
        let last_segment_header = segment_headers_store.last_segment_header().ok_or_else(|| {
            Error::Other(
                "Archiver needs to be initialized before syncing from DSN to populate the very \
                first segment"
                    .to_string(),
            )
        })?;

        let new_segment_headers = segment_header_downloader
            .get_segment_headers(&last_segment_header)
            .await
            .map_err(|error| error.to_string())?;

        debug!(target: LOG_TARGET, "Found {} new segment headers", new_segment_headers.len());

        if !new_segment_headers.is_empty() {
            segment_headers_store.add_segment_headers(&new_segment_headers)?;
        }
    }

    let mut imported_blocks = 0;
    // The reconstructor is stateful across segments (resetting it "loses any partial blocks",
    // see below), so it is shared with the blocking reconstruction tasks via `Arc<Mutex<_>>`
    // rather than recreated per segment.
    let mut reconstructor = Arc::new(Mutex::new(Reconstructor::new(erasure_coding.clone())));
    // Start from the first unprocessed segment and process all segments known so far
    let segment_indices_iter = (*last_processed_segment_index + SegmentIndex::ONE)
        ..=segment_headers_store
            .max_segment_index()
            .expect("Exists, we have inserted segment headers above; qed");
    // Peekable so the "last available segment" special case below can be detected.
    let mut segment_indices_iter = segment_indices_iter.peekable();

    while let Some(segment_index) = segment_indices_iter.next() {
        debug!(target: LOG_TARGET, %segment_index, "Processing segment");

        let segment_header = segment_headers_store
            .get_segment_header(segment_index)
            .expect("Statically guaranteed to exist, see checks above; qed");

        // Number of the last block archived in this segment; it may be only partially archived
        // (continuing into the next segment), hence "maybe partial".
        let last_archived_maybe_partial_block_number = segment_header.last_archived_block().number;
        let last_archived_block_partial = segment_header
            .last_archived_block()
            .archived_progress
            .partial()
            .is_some();

        trace!(
            target: LOG_TARGET,
            %segment_index,
            last_archived_maybe_partial_block_number,
            last_archived_block_partial,
            "Checking segment header"
        );

        let info = client.info();
        let last_archived_maybe_partial_block_number =
            NumberFor::<Block>::from(last_archived_maybe_partial_block_number);
        // We have already processed the last block in this segment, or one higher than it,
        // so it can't change. Resetting the reconstructor loses any partial blocks, so we
        // only reset if the (possibly partial) last block has been processed.
        if *last_processed_block_number >= last_archived_maybe_partial_block_number {
            *last_processed_segment_index = segment_index;
            // Reset reconstructor instance
            reconstructor = Arc::new(Mutex::new(Reconstructor::new(erasure_coding.clone())));
            continue;
        }
        // Just one partial unprocessed block and this was the last segment available, so nothing to
        // import
        if last_archived_maybe_partial_block_number == *last_processed_block_number + One::one()
            && last_archived_block_partial
            && segment_indices_iter.peek().is_none()
        {
            // Reset reconstructor instance
            reconstructor = Arc::new(Mutex::new(Reconstructor::new(erasure_coding.clone())));
            continue;
        }

        let segment_pieces = download_segment_pieces(segment_index, piece_getter)
            .await
            .map_err(|error| format!("Failed to download segment pieces: {error}"))?;
        // CPU-intensive piece and segment reconstruction code can block the async executor.
        let segment_contents_fut = task::spawn_blocking({
            let reconstructor = reconstructor.clone();

            move || {
                reconstructor
                    .lock()
                    .expect("Panic if previous thread panicked when holding the mutex")
                    .add_segment(segment_pieces.as_ref())
            }
        });
        let blocks = segment_contents_fut
            .await
            .expect("Panic if blocking task panicked")
            .map_err(|error| error.to_string())?
            .blocks;
        trace!(target: LOG_TARGET, %segment_index, "Segment reconstructed successfully");

        let mut blocks_to_import = Vec::with_capacity(QUEUED_BLOCKS_LIMIT as usize);

        let mut best_block_number = info.best_number;
        for (block_number, block_bytes) in blocks {
            let block_number = block_number.into();
            // Sanity check: a reconstructed genesis block must byte-for-byte match the client's
            // genesis block, otherwise we are syncing the wrong chain.
            if block_number == 0u32.into() {
                let signed_block = client
                    .block(
                        client
                            .hash(block_number)?
                            .expect("Genesis block hash must always be found; qed"),
                    )?
                    .expect("Genesis block data must always be found; qed");

                if encode_block(signed_block) != block_bytes {
                    return Err(Error::Other(
                        "Wrong genesis block, block import failed".to_string(),
                    ));
                }
            }

            // Limit number of queued blocks for import
            // NOTE: Since best block number might be non-canonical, we might actually have more
            // than `QUEUED_BLOCKS_LIMIT` elements in the queue, but it should be rare and
            // insignificant. Feel free to address this in case you have a good strategy, but it
            // seems like complexity is not worth it.
            while block_number.saturating_sub(best_block_number) >= QUEUED_BLOCKS_LIMIT.into() {
                let just_queued_blocks_count = blocks_to_import.len();
                if !blocks_to_import.is_empty() {
                    // This vector is quite large (~150kB), so replacing it with an uninitialized
                    // vector with the correct capacity is faster than cloning and clearing it.
                    // (Cloning requires a memcpy, which pages in and sets all the memory, which is
                    // a waste just before clearing it.)
                    let importing_blocks = std::mem::replace(
                        &mut blocks_to_import,
                        Vec::with_capacity(QUEUED_BLOCKS_LIMIT as usize),
                    );
                    // Import queue handles verification and importing it into the client
                    import_queue_service
                        .import_blocks(BlockOrigin::NetworkInitialSync, importing_blocks);
                }
                trace!(
                    target: LOG_TARGET,
                    %block_number,
                    %best_block_number,
                    %just_queued_blocks_count,
                    %QUEUED_BLOCKS_LIMIT,
                    "Number of importing blocks reached queue limit, waiting before retrying"
                );
                tokio::time::sleep(WAIT_FOR_BLOCKS_TO_IMPORT).await;
                best_block_number = client.info().best_number;
            }

            let signed_block =
                decode_block::<Block>(&block_bytes).map_err(|error| error.to_string())?;

            // Record progress even for blocks we end up skipping below, so a resumed sync does
            // not reprocess them.
            *last_processed_block_number = block_number;

            // No need to import blocks that are already present, if block is not present it might
            // correspond to a short fork, so we need to import it even if we already have another
            // block at this height
            if client.expect_header(signed_block.block.hash()).is_ok() {
                continue;
            }

            let SignedBlock {
                block,
                justifications,
            } = signed_block;
            let (header, extrinsics) = block.deconstruct();
            let hash = header.hash();

            blocks_to_import.push(IncomingBlock {
                hash,
                header: Some(header),
                body: Some(extrinsics),
                indexed_body: None,
                justifications,
                origin: None,
                allow_missing_state: false,
                import_existing: false,
                state: None,
                skip_execution: false,
            });

            imported_blocks += 1;

            // Periodic progress logging only, to avoid a debug line per block.
            if imported_blocks % 1000 == 0 {
                debug!(target: LOG_TARGET, "Adding block {} from DSN to the import queue", block_number);
            }
        }

        // Flush any blocks still queued after finishing this segment.
        if !blocks_to_import.is_empty() {
            // Import queue handles verification and importing it into the client
            import_queue_service.import_blocks(BlockOrigin::NetworkInitialSync, blocks_to_import);
        }

        *last_processed_segment_index = segment_index;
    }

    Ok(imported_blocks)
}