subspace_service/sync_from_dsn/import_blocks.rs

use crate::sync_from_dsn::PieceGetter;
use crate::sync_from_dsn::segment_header_downloader::SegmentHeaderDownloader;
use sc_client_api::{AuxStore, BlockBackend, HeaderBackend};
use sc_consensus::IncomingBlock;
use sc_consensus::import_queue::ImportQueueService;
use sc_consensus_subspace::archiver::{SegmentHeadersStore, decode_block, encode_block};
use sc_service::Error;
use sc_tracing::tracing::{debug, info, trace};
use sp_consensus::BlockOrigin;
use sp_runtime::Saturating;
use sp_runtime::generic::SignedBlock;
use sp_runtime::traits::{Block as BlockT, Header, NumberFor, One};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use subspace_archiving::reconstructor::Reconstructor;
use subspace_core_primitives::BlockNumber;
use subspace_core_primitives::segments::SegmentIndex;
use subspace_data_retrieval::segment_downloading::download_segment_pieces;
use subspace_erasure_coding::ErasureCoding;
use tokio::task;

/// How many blocks to queue before pausing and waiting for blocks to be imported; this is
/// essentially used to ensure we use a bounded amount of RAM during the sync process.
const QUEUED_BLOCKS_LIMIT: BlockNumber = 500;
/// How long to wait for queued blocks to be imported before retrying, when import is too slow
const WAIT_FOR_BLOCKS_TO_IMPORT: Duration = Duration::from_secs(1);

/// Starts the process of importing blocks.
///
/// Returns the number of blocks that were queued for import.
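///
/// A minimal calling sketch; the surrounding handles and their construction are assumptions
/// about the caller, not the real call site:
/// ```ignore
/// // `last_completed_segment_index` and `last_processed_block_number` are the caller's
/// // persisted sync progress; they are updated in place as segments complete.
/// let queued_block_count = import_blocks_from_dsn(
///     &segment_headers_store,
///     &segment_header_downloader,
///     &*client,
///     &piece_getter,
///     &mut *import_queue_service,
///     &mut last_completed_segment_index,
///     &mut last_processed_block_number,
///     &erasure_coding,
/// )
/// .await?;
/// info!(%queued_block_count, "DSN sync pass queued blocks for import");
/// ```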
#[allow(clippy::too_many_arguments)]
pub(super) async fn import_blocks_from_dsn<Block, AS, Client, PG, IQS>(
    segment_headers_store: &SegmentHeadersStore<AS>,
    segment_header_downloader: &SegmentHeaderDownloader,
    client: &Client,
    piece_getter: &PG,
    import_queue_service: &mut IQS,
    last_completed_segment_index: &mut SegmentIndex,
    last_processed_block_number: &mut NumberFor<Block>,
    erasure_coding: &ErasureCoding,
) -> Result<u64, Error>
where
    Block: BlockT,
    AS: AuxStore + Send + Sync + 'static,
    Client: HeaderBackend<Block> + BlockBackend<Block> + Send + Sync + 'static,
    PG: PieceGetter,
    IQS: ImportQueueService<Block> + ?Sized,
{
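    // First, catch up on segment headers: the iteration below relies on
    // `segment_headers_store` containing every segment known to the network so far.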
    {
        let last_segment_header = segment_headers_store.last_segment_header().ok_or_else(|| {
            Error::Other(
                "Archiver needs to be initialized before syncing from DSN to populate the very \
                first segment"
                    .to_string(),
            )
        })?;

        let new_segment_headers = segment_header_downloader
            .get_segment_headers(&last_segment_header)
            .await
            .map_err(|error| error.to_string())?;

        debug!("Found {} new segment headers", new_segment_headers.len());

        if !new_segment_headers.is_empty() {
            segment_headers_store.add_segment_headers(&new_segment_headers)?;
        }
    }

    let mut imported_blocks = 0;
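    // The reconstructor is stateful: a block that spans a segment boundary is carried over as a
    // partial block until the next segment completes it. It's wrapped in `Arc<Mutex<_>>` so it
    // can be moved into the blocking reconstruction task below and reused across iterations.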
    let mut reconstructor = Arc::new(Mutex::new(Reconstructor::new(erasure_coding.clone())));
    // Start from the first unprocessed segment and process all segments known so far
    let segment_indices_iter = (*last_completed_segment_index + SegmentIndex::ONE)
        ..=segment_headers_store
            .max_segment_index()
            .expect("Exists, we have inserted segment headers above; qed");
    let mut segment_indices_iter = segment_indices_iter.peekable();

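    // `peekable()` lets the loop detect the last known segment, which needs special handling
    // when it ends in a partial block.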
    while let Some(segment_index) = segment_indices_iter.next() {
        debug!(%segment_index, "Processing segment");

        let segment_header = segment_headers_store
            .get_segment_header(segment_index)
            .expect("Statically guaranteed to exist, see checks above; qed");

        let last_archived_maybe_partial_block_number = segment_header.last_archived_block().number;
        let last_archived_block_partial = segment_header
            .last_archived_block()
            .archived_progress
            .partial()
            .is_some();

        trace!(
            %segment_index,
            last_archived_maybe_partial_block_number,
            last_archived_block_partial,
            "Checking segment header"
        );

        let info = client.info();
        let last_archived_maybe_partial_block_number =
            NumberFor::<Block>::from(last_archived_maybe_partial_block_number);
        // We have already processed the last block in this segment, or one higher than it,
        // so it can't change. Resetting the reconstructor loses any partial blocks, so we
        // only reset if the (possibly partial) last block has been processed.
        if *last_processed_block_number >= last_archived_maybe_partial_block_number {
            debug!(
                %segment_index,
                %last_processed_block_number,
                %last_archived_maybe_partial_block_number,
                %last_archived_block_partial,
                "Already processed last (possibly partial) block in segment, resetting reconstructor",
            );
            *last_completed_segment_index = segment_index;
            // Reset reconstructor instance
            reconstructor = Arc::new(Mutex::new(Reconstructor::new(erasure_coding.clone())));
            continue;
        }
        // Just one partial unprocessed block and this was the last segment available, so nothing to
        // import. (But we also haven't finished this segment yet, because of the partial block.)
        if last_archived_maybe_partial_block_number == *last_processed_block_number + One::one()
            && last_archived_block_partial
        {
            if segment_indices_iter.peek().is_none() {
                // We haven't fully processed this segment yet, because it ends with a partial block.
                *last_completed_segment_index = segment_index.saturating_sub(SegmentIndex::ONE);

                // We don't need to reset the reconstructor here. We've finished getting blocks, so
                // we're about to return and drop the reconstructor and its partial block anyway.
                // (Normally, we'd need that partial block to avoid a block gap. But we should be close
                // enough to the tip that normal syncing will fill any gaps.)
                debug!(
                    %segment_index,
                    %last_processed_block_number,
                    %last_archived_maybe_partial_block_number,
                    %last_archived_block_partial,
                    "No more segments, snap sync is about to finish",
                );
                continue;
            } else {
                // Downloading an entire segment for one partial block should be rare, but if it
                // happens a lot we want to see it in the logs.
                //
                // TODO: if this happens a lot, check for network/DSN sync bugs - we should be able
                // to sync to near the tip reliably, so we don't have to keep reconstructor state.
                info!(
                    %segment_index,
                    %last_processed_block_number,
                    %last_archived_maybe_partial_block_number,
                    %last_archived_block_partial,
                    "Downloading entire segment for one partial block",
                );
            }
        }

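        // Fetch enough pieces to reconstruct this segment. Erasure coding lets the reconstructor
        // recover the segment from a subset of its pieces, so not every piece must be retrieved.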
        let segment_pieces = download_segment_pieces(segment_index, piece_getter)
            .await
            .map_err(|error| format!("Failed to download segment pieces: {error}"))?;
        // CPU-intensive piece and segment reconstruction code can block the async executor.
        let segment_contents_fut = task::spawn_blocking({
            let reconstructor = reconstructor.clone();

            move || {
                reconstructor
                    .lock()
                    .expect("Panic if previous thread panicked when holding the mutex")
                    .add_segment(segment_pieces.as_ref())
            }
        });
        let blocks = segment_contents_fut
            .await
            .expect("Panic if blocking task panicked")
            .map_err(|error| error.to_string())?
            .blocks;
        trace!(%segment_index, "Segment reconstructed successfully");

        let mut blocks_to_import = Vec::with_capacity(QUEUED_BLOCKS_LIMIT as usize);

        let mut best_block_number = info.best_number;
        for (block_number, block_bytes) in blocks {
            let block_number = block_number.into();
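            // The genesis block is never imported. Instead, check that the reconstructed genesis
            // block matches the chain this client is on, which catches syncing from the wrong
            // network early.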
            if block_number == 0u32.into() {
                let signed_block = client
                    .block(
                        client
                            .hash(block_number)?
                            .expect("Genesis block hash must always be found; qed"),
                    )?
                    .expect("Genesis block data must always be found; qed");

                if encode_block(signed_block) != block_bytes {
                    return Err(Error::Other(
                        "Wrong genesis block, block import failed".to_string(),
                    ));
                }
            }

            // Limit number of queued blocks for import
            // NOTE: Since best block number might be non-canonical, we might actually have more
            // than `QUEUED_BLOCKS_LIMIT` elements in the queue, but it should be rare and
            // insignificant. Feel free to address this in case you have a good strategy, but it
            // seems like complexity is not worth it.
            while block_number.saturating_sub(best_block_number) >= QUEUED_BLOCKS_LIMIT.into() {
                let just_queued_blocks_count = blocks_to_import.len();
                if !blocks_to_import.is_empty() {
                    // This vector is quite large (~150kB), so replacing it with an uninitialized
                    // vector with the correct capacity is faster than cloning and clearing it.
                    // (Cloning requires a memcpy, which pages in and sets all the memory, which is
                    // a waste just before clearing it.)
                    let importing_blocks = std::mem::replace(
                        &mut blocks_to_import,
                        Vec::with_capacity(QUEUED_BLOCKS_LIMIT as usize),
                    );
                    // Import queue handles verification and importing it into the client
                    import_queue_service
                        .import_blocks(BlockOrigin::NetworkInitialSync, importing_blocks);
                }
                trace!(
                    %block_number,
                    %best_block_number,
                    %just_queued_blocks_count,
                    %QUEUED_BLOCKS_LIMIT,
                    "Number of importing blocks reached queue limit, waiting before retrying"
                );
                tokio::time::sleep(WAIT_FOR_BLOCKS_TO_IMPORT).await;
                best_block_number = client.info().best_number;
            }

            let signed_block =
                decode_block::<Block>(&block_bytes).map_err(|error| error.to_string())?;

            *last_processed_block_number = block_number;

            // There is no need to import blocks that are already present. But if a block is not
            // present, it might correspond to a short fork, so we need to import it even if we
            // already have another block at this height.
            if client.expect_header(signed_block.block.hash()).is_ok() {
                continue;
            }

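            // Split the signed block into the parts the import queue's `IncomingBlock` expects.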
            let SignedBlock {
                block,
                justifications,
            } = signed_block;
            let (header, extrinsics) = block.deconstruct();
            let hash = header.hash();

            blocks_to_import.push(IncomingBlock {
                hash,
                header: Some(header),
                body: Some(extrinsics),
                indexed_body: None,
                justifications,
                origin: None,
                allow_missing_state: false,
                import_existing: false,
                state: None,
                skip_execution: false,
            });

            imported_blocks += 1;

            if imported_blocks % 1000 == 0 {
                debug!("Adding block {} from DSN to the import queue", block_number);
            }
        }

        if !blocks_to_import.is_empty() {
            // Import queue handles verification and importing it into the client
            import_queue_service.import_blocks(BlockOrigin::NetworkInitialSync, blocks_to_import);
        }

        // Segments are only fully processed when all their blocks are fully processed.
        if last_archived_block_partial {
            *last_completed_segment_index = segment_index.saturating_sub(SegmentIndex::ONE);
        } else {
            *last_completed_segment_index = segment_index;
        }
    }

    Ok(imported_blocks)
}