sc_consensus_subspace/
archiver.rs

1//! Consensus archiver responsible for archival of blockchain history, it is driven by block import
2//! pipeline.
3//!
4//! Implements archiving process in Subspace blockchain that converts blockchain history (blocks)
5//! into archived history (pieces).
6//!
7//! The main entry point here is [`create_subspace_archiver`] that will create a task, which while
8//! driven will perform the archiving itself.
9//!
10//! Archiving is triggered by block importing notification ([`SubspaceLink::block_importing_notification_stream`])
11//! and tries to archive the block at [`ChainConstants::confirmation_depth_k`](sp_consensus_subspace::ChainConstants::confirmation_depth_k)
12//! depth from the block being imported. Block import will then wait for archiver to acknowledge
13//! processing, which is necessary for ensuring that when the next block is imported, inherents will
14//! contain segment header of newly archived block (must happen exactly in the next block).
15//!
16//! Archiving itself will also wait for acknowledgement by various subscribers before proceeding,
17//! which includes farmer subscription, in case of reference implementation via RPC
18//! (`sc-consensus-subspace-rpc`), but could also be in other ways.
19//!
//! [`SegmentHeadersStore`] is maintained as a data structure containing all known (including future
//! in case of syncing) segment headers. The contents of this data structure are then made available
//! to other parts of the protocol that need to know what the correct archival history of the
//! blockchain looks like. For example, it is used during node sync and farmer plotting in order to
//! verify pieces of archival history received from other network participants.
25//!
//! [`recreate_genesis_segment`] is a bit of a hack and is useful for deriving the genesis segment,
//! which is a special case since we don't have enough data in the blockchain history itself during
//! genesis in order to do the archiving.
29//!
30//! [`encode_block`] and [`decode_block`] are symmetric encoding/decoding functions turning
31//! [`SignedBlock`]s into bytes and back.
32
33#[cfg(test)]
34mod tests;
35
36use crate::slot_worker::SubspaceSyncOracle;
37use crate::{SubspaceLink, SubspaceNotificationSender};
38use futures::StreamExt;
39use parity_scale_codec::{Decode, Encode};
40use parking_lot::RwLock;
41use rand::prelude::*;
42use rand_chacha::ChaCha8Rng;
43use rayon::prelude::*;
44use rayon::ThreadPoolBuilder;
45use sc_client_api::{
46    AuxStore, Backend as BackendT, BlockBackend, BlockchainEvents, Finalizer, LockImportRun,
47};
48use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_INFO};
49use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
50use sp_api::ProvideRuntimeApi;
51use sp_blockchain::HeaderBackend;
52use sp_consensus::SyncOracle;
53use sp_consensus_subspace::{SubspaceApi, SubspaceJustification};
54use sp_objects::ObjectsApi;
55use sp_runtime::generic::SignedBlock;
56use sp_runtime::traits::{
57    Block as BlockT, BlockNumber as BlockNumberT, CheckedSub, Header, NumberFor, One, Zero,
58};
59use sp_runtime::Justifications;
60use std::error::Error;
61use std::future::Future;
62use std::num::NonZeroU32;
63use std::slice;
64use std::sync::atomic::{AtomicU16, Ordering};
65use std::sync::Arc;
66use std::time::Duration;
67use subspace_archiving::archiver::{Archiver, NewArchivedSegment};
68use subspace_core_primitives::objects::{BlockObjectMapping, GlobalObject};
69use subspace_core_primitives::segments::{RecordedHistorySegment, SegmentHeader, SegmentIndex};
70use subspace_core_primitives::{BlockNumber, PublicKey};
71use subspace_erasure_coding::ErasureCoding;
72use subspace_kzg::Kzg;
73use tracing::{debug, info, trace, warn};
74
75/// Number of WASM instances is 8, this is a bit lower to avoid warnings exceeding number of
76/// instances
77const BLOCKS_TO_ARCHIVE_CONCURRENCY: usize = 6;
78/// Do not wait for acknowledgements beyond this time limit
79const ACKNOWLEDGEMENT_TIMEOUT: Duration = Duration::from_mins(2);
80
81/// How deep (in segments) should block be in order to be finalized.
82///
83/// This is required for full nodes to not prune recent history such that keep-up sync in Substrate
84/// works even without archival nodes (initial sync will be done from DSN).
85///
86/// Ideally, we'd decouple pruning from finalization, but it may require invasive changes in
87/// Substrate and is not worth it right now.
/// <https://github.com/paritytech/substrate/discussions/14359>
89pub(crate) const FINALIZATION_DEPTH_IN_SEGMENTS: SegmentIndex = SegmentIndex::new(5);
90
/// State shared between all clones of [`SegmentHeadersStore`].
#[derive(Debug)]
struct SegmentHeadersStoreInner<AS> {
    /// Auxiliary storage that batches of segment headers are read from and written to
    aux_store: Arc<AS>,
    /// Index of the next vacant key under which a new batch of segment headers will be stored
    next_key_index: AtomicU16,
    /// In-memory cache of segment headers
    cache: RwLock<Vec<SegmentHeader>>,
}
98
/// Persistent storage of segment headers.
///
/// It maintains all known segment headers. During sync from DSN it is possible that this data
/// structure contains segment headers that from the point of view of the tip of the current chain
/// are "in the future". This is expected and must be accounted for in the archiver and other
/// places.
///
/// Segment headers are stored in batches (which is more efficient to store and retrieve). Each
/// next batch contains distinct segment headers with monotonically increasing segment indices.
/// During instantiation all previously stored batches will be read and in-memory representation of
/// the whole contents will be created such that queries to this data structure are quick and not
/// involving any disk I/O.
#[derive(Debug)]
pub struct SegmentHeadersStore<AS> {
    /// Shared state (storage handle, next batch key index and in-memory cache)
    inner: Arc<SegmentHeadersStoreInner<AS>>,
    /// Confirmation depth that is added to the last archived block number of a segment when
    /// deciding which block its segment header is expected in
    confirmation_depth_k: BlockNumber,
}
114
115impl<AS> Clone for SegmentHeadersStore<AS> {
116    fn clone(&self) -> Self {
117        Self {
118            inner: Arc::clone(&self.inner),
119            confirmation_depth_k: self.confirmation_depth_k,
120        }
121    }
122}
123
124impl<AS> SegmentHeadersStore<AS>
125where
126    AS: AuxStore,
127{
128    const KEY_PREFIX: &'static [u8] = b"segment-headers";
129    const INITIAL_CACHE_CAPACITY: usize = 1_000;
130
131    /// Create new instance
132    pub fn new(
133        aux_store: Arc<AS>,
134        confirmation_depth_k: BlockNumber,
135    ) -> sp_blockchain::Result<Self> {
136        let mut cache = Vec::with_capacity(Self::INITIAL_CACHE_CAPACITY);
137
138        debug!("Started loading segment headers into cache");
139        // Segment headers are stored in batches (which is more efficient to store and retrieve), this is why code deals
140        // with key indices here rather that segment indices. Essentially this iterates over keys from 0 until missing
141        // entry is hit, which becomes the next key index where additional segment headers will be stored.
142        let mut next_key_index = 0;
143        while let Some(segment_headers) =
144            aux_store
145                .get_aux(&Self::key(next_key_index))?
146                .map(|segment_header| {
147                    Vec::<SegmentHeader>::decode(&mut segment_header.as_slice())
148                        .expect("Always correct segment header unless DB is corrupted; qed")
149                })
150        {
151            cache.extend(segment_headers);
152            next_key_index += 1;
153        }
154        debug!("Finished loading segment headers into cache");
155
156        Ok(Self {
157            inner: Arc::new(SegmentHeadersStoreInner {
158                aux_store,
159                next_key_index: AtomicU16::new(next_key_index),
160                cache: RwLock::new(cache),
161            }),
162            confirmation_depth_k,
163        })
164    }
165
166    /// Returns last observed segment header
167    pub fn last_segment_header(&self) -> Option<SegmentHeader> {
168        self.inner.cache.read().last().cloned()
169    }
170
171    /// Returns last observed segment index
172    pub fn max_segment_index(&self) -> Option<SegmentIndex> {
173        let segment_index = self.inner.cache.read().len().checked_sub(1)? as u64;
174        Some(SegmentIndex::from(segment_index))
175    }
176
177    /// Add segment headers.
178    ///
179    /// Multiple can be inserted for efficiency purposes.
180    pub fn add_segment_headers(
181        &self,
182        segment_headers: &[SegmentHeader],
183    ) -> sp_blockchain::Result<()> {
184        let mut maybe_last_segment_index = self.max_segment_index();
185        let mut segment_headers_to_store = Vec::with_capacity(segment_headers.len());
186        // Check all input segment headers to see which ones are not stored yet and verifying that segment indices are
187        // monotonically increasing
188        for segment_header in segment_headers {
189            let segment_index = segment_header.segment_index();
190            match maybe_last_segment_index {
191                Some(last_segment_index) => {
192                    if segment_index <= last_segment_index {
193                        // Skip already stored segment headers
194                        continue;
195                    }
196
197                    if segment_index != last_segment_index + SegmentIndex::ONE {
198                        let error = format!(
199                            "Segment index {} must strictly follow {}, can't store segment header",
200                            segment_index, last_segment_index
201                        );
202                        return Err(sp_blockchain::Error::Application(error.into()));
203                    }
204
205                    segment_headers_to_store.push(segment_header);
206                    maybe_last_segment_index.replace(segment_index);
207                }
208                None => {
209                    if segment_index != SegmentIndex::ZERO {
210                        let error = format!(
211                            "First segment header index must be zero, found index {segment_index}"
212                        );
213                        return Err(sp_blockchain::Error::Application(error.into()));
214                    }
215
216                    segment_headers_to_store.push(segment_header);
217                    maybe_last_segment_index.replace(segment_index);
218                }
219            }
220        }
221
222        if segment_headers_to_store.is_empty() {
223            return Ok(());
224        }
225
226        // Insert all new segment headers into vacant key index for efficiency purposes
227        // TODO: Do compaction when we have too many keys: combine multiple segment headers into a
228        //  single entry for faster retrievals and more compact storage
229        {
230            let key_index = self.inner.next_key_index.fetch_add(1, Ordering::SeqCst);
231            let key = Self::key(key_index);
232            let value = segment_headers_to_store.encode();
233            let insert_data = vec![(key.as_slice(), value.as_slice())];
234
235            self.inner.aux_store.insert_aux(&insert_data, &[])?;
236        }
237        self.inner.cache.write().extend(segment_headers_to_store);
238
239        Ok(())
240    }
241
242    /// Get a single segment header
243    pub fn get_segment_header(&self, segment_index: SegmentIndex) -> Option<SegmentHeader> {
244        self.inner
245            .cache
246            .read()
247            .get(u64::from(segment_index) as usize)
248            .copied()
249    }
250
251    fn key(key_index: u16) -> Vec<u8> {
252        (Self::KEY_PREFIX, key_index.to_le_bytes()).encode()
253    }
254
255    /// Get segment headers that are expected to be included at specified block number.
256    pub fn segment_headers_for_block(&self, block_number: BlockNumber) -> Vec<SegmentHeader> {
257        let Some(last_segment_index) = self.max_segment_index() else {
258            // Not initialized
259            return Vec::new();
260        };
261
262        // Special case for the initial segment (for genesis block).
263        if block_number == 1 {
264            // If there is a segment index present, and we store monotonically increasing segment
265            // headers, then the first header exists.
266            return vec![self
267                .get_segment_header(SegmentIndex::ZERO)
268                .expect("Segment headers are stored in monotonically increasing order; qed")];
269        }
270
271        if last_segment_index == SegmentIndex::ZERO {
272            // Genesis segment already included in block #1
273            return Vec::new();
274        }
275
276        let mut current_segment_index = last_segment_index;
277        loop {
278            // If the current segment index present, and we store monotonically increasing segment
279            // headers, then the current segment header exists as well.
280            let current_segment_header = self
281                .get_segment_header(current_segment_index)
282                .expect("Segment headers are stored in monotonically increasing order; qed");
283
284            // The block immediately after the archived segment adding the confirmation depth
285            let target_block_number =
286                current_segment_header.last_archived_block().number + 1 + self.confirmation_depth_k;
287            if target_block_number == block_number {
288                let mut headers_for_block = vec![current_segment_header];
289
290                // Check block spanning multiple segments
291                let last_archived_block_number =
292                    current_segment_header.last_archived_block().number;
293                let mut segment_index = current_segment_index - SegmentIndex::ONE;
294
295                while let Some(segment_header) = self.get_segment_header(segment_index) {
296                    if segment_header.last_archived_block().number == last_archived_block_number {
297                        headers_for_block.insert(0, segment_header);
298                        segment_index -= SegmentIndex::ONE;
299                    } else {
300                        break;
301                    }
302                }
303
304                return headers_for_block;
305            }
306
307            // iterate segments further
308            if target_block_number > block_number {
309                // no need to check the initial segment
310                if current_segment_index > SegmentIndex::ONE {
311                    current_segment_index -= SegmentIndex::ONE
312                } else {
313                    break;
314                }
315            } else {
316                // No segment headers required
317                return Vec::new();
318            }
319        }
320
321        // No segment headers required
322        Vec::new()
323    }
324}
325
/// Notification about a newly archived segment, together with a sender used to acknowledge that
/// the segment was received.
#[derive(Debug, Clone)]
pub struct ArchivedSegmentNotification {
    /// Archived segment.
    pub archived_segment: Arc<NewArchivedSegment>,
    /// Sender that signifies the fact of receiving archived segment by farmer.
    ///
    /// This must be used to send a message or else block import pipeline will get stuck.
    pub acknowledgement_sender: TracingUnboundedSender<()>,
}
336
/// Notification with incrementally generated object mappings for a block (and any previous block
/// continuation).
#[derive(Debug, Clone)]
pub struct ObjectMappingNotification {
    /// Incremental object mappings for a block (and any previous block continuation).
    ///
    /// The archived data won't be available in pieces until the entire segment is full and
    /// archived.
    pub object_mapping: Vec<GlobalObject>,
    /// The block that these mappings are from.
    pub block_number: BlockNumber,
    // TODO: add an acknowledgement_sender for backpressure if needed
}
349
/// Whether to create object mappings.
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
pub enum CreateObjectMappings {
    /// Start creating object mappings from this block number.
    ///
    /// This can be lower than the latest archived block, but must be greater than genesis (which
    /// is why the block number is non-zero).
    ///
    /// The genesis block doesn't have mappings, so starting mappings at genesis is pointless.
    /// The archiver will fail if it can't get the data for this block, but snap sync doesn't store
    /// the genesis data on disk. So avoiding genesis also avoids this error.
    /// <https://github.com/paritytech/polkadot-sdk/issues/5366>
    Block(NonZeroU32),

    /// Create object mappings as archiving is happening, without a fixed starting block.
    Yes,

    /// Don't create object mappings.
    #[default]
    No,
}
370
371impl CreateObjectMappings {
372    /// The fixed block number to start creating object mappings from.
373    /// If there is no fixed block number, or mappings are disabled, returns None.
374    fn block(&self) -> Option<BlockNumber> {
375        match self {
376            CreateObjectMappings::Block(block) => Some(block.get()),
377            CreateObjectMappings::Yes => None,
378            CreateObjectMappings::No => None,
379        }
380    }
381
382    /// Returns true if object mappings will be created from a past or future block.
383    pub fn is_enabled(&self) -> bool {
384        !matches!(self, CreateObjectMappings::No)
385    }
386
387    /// Does the supplied block number need object mappings?
388    pub fn is_enabled_for_block(&self, block: BlockNumber) -> bool {
389        if !self.is_enabled() {
390            return false;
391        }
392
393        if let Some(target_block) = self.block() {
394            return block >= target_block;
395        }
396
397        // We're continuing where we left off, so all blocks get mappings.
398        true
399    }
400}
401
402fn find_last_archived_block<Block, Client, AS>(
403    client: &Client,
404    segment_headers_store: &SegmentHeadersStore<AS>,
405    best_block_to_archive: NumberFor<Block>,
406    create_object_mappings: bool,
407) -> sp_blockchain::Result<Option<(SegmentHeader, SignedBlock<Block>, BlockObjectMapping)>>
408where
409    Block: BlockT,
410    Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + HeaderBackend<Block>,
411    Client::Api: SubspaceApi<Block, PublicKey> + ObjectsApi<Block>,
412    AS: AuxStore,
413{
414    let Some(max_segment_index) = segment_headers_store.max_segment_index() else {
415        return Ok(None);
416    };
417
418    if max_segment_index == SegmentIndex::ZERO {
419        // Just genesis, nothing else to check
420        return Ok(None);
421    }
422
423    for segment_header in (SegmentIndex::ZERO..=max_segment_index)
424        .rev()
425        .filter_map(|segment_index| segment_headers_store.get_segment_header(segment_index))
426    {
427        let last_archived_block_number = segment_header.last_archived_block().number;
428
429        if NumberFor::<Block>::from(last_archived_block_number) > best_block_to_archive {
430            // Last archived block in segment header is too high for current state of the chain
431            // (segment headers store may know about more blocks in existence than is currently
432            // imported)
433            continue;
434        }
435        let Some(last_archived_block_hash) = client.hash(last_archived_block_number.into())? else {
436            // This block number is not in our chain yet (segment headers store may know about more
437            // blocks in existence than is currently imported)
438            continue;
439        };
440
441        let Some(last_archived_block) = client.block(last_archived_block_hash)? else {
442            // This block data was already pruned (but the headers weren't)
443            continue;
444        };
445
446        // If we're starting mapping creation at this block, return its mappings.
447        let block_object_mappings = if create_object_mappings {
448            client
449                .runtime_api()
450                .extract_block_object_mapping(
451                    *last_archived_block.block.header().parent_hash(),
452                    last_archived_block.block.clone(),
453                )
454                .unwrap_or_default()
455        } else {
456            BlockObjectMapping::default()
457        };
458
459        return Ok(Some((
460            segment_header,
461            last_archived_block,
462            block_object_mappings,
463        )));
464    }
465
466    Ok(None)
467}
468
469/// Derive genesis segment on demand, returns `Ok(None)` in case genesis block was already pruned
470pub fn recreate_genesis_segment<Block, Client>(
471    client: &Client,
472    kzg: Kzg,
473    erasure_coding: ErasureCoding,
474) -> Result<Option<NewArchivedSegment>, Box<dyn Error>>
475where
476    Block: BlockT,
477    Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + HeaderBackend<Block>,
478    Client::Api: ObjectsApi<Block>,
479{
480    let genesis_hash = client.info().genesis_hash;
481    let Some(signed_block) = client.block(genesis_hash)? else {
482        return Ok(None);
483    };
484
485    let encoded_block = encode_block(signed_block);
486
487    // There are no mappings in the genesis block, so they can be ignored
488    let block_outcome = Archiver::new(kzg, erasure_coding).add_block(
489        encoded_block,
490        BlockObjectMapping::default(),
491        false,
492    );
493    let new_archived_segment = block_outcome
494        .archived_segments
495        .into_iter()
496        .next()
497        .expect("Genesis block always results in exactly one archived segment; qed");
498
499    Ok(Some(new_archived_segment))
500}
501
/// Archiver together with information about the best block it has archived so far.
struct InitializedArchiver<Block>
where
    Block: BlockT,
{
    /// Archiver instance
    archiver: Archiver,
    /// Hash and number of the best archived block
    best_archived_block: (Block::Hash, NumberFor<Block>),
}
509
510/// Encode block for archiving purposes.
511///
512/// Only specific Subspace justifications are included in the encoding, determined by result of
513/// [`SubspaceJustification::must_be_archived`], other justifications are filtered-out.
514pub fn encode_block<Block>(mut signed_block: SignedBlock<Block>) -> Vec<u8>
515where
516    Block: BlockT,
517{
518    if signed_block.block.header().number().is_zero() {
519        let mut encoded_block = signed_block.encode();
520
521        let encoded_block_length = encoded_block.len();
522
523        // We extend encoding of genesis block with extra data such that the very first archived
524        // segment can be produced right away, bootstrapping the farming process.
525        //
526        // Note: we add it to the end of the encoded block, so during decoding it'll actually be
527        // ignored (unless `DecodeAll::decode_all()` is used) even though it is technically present
528        // in encoded form.
529        encoded_block.resize(RecordedHistorySegment::SIZE, 0);
530        let mut rng = ChaCha8Rng::from_seed(
531            signed_block
532                .block
533                .header()
534                .state_root()
535                .as_ref()
536                .try_into()
537                .expect("State root in Subspace must be 32 bytes, panic otherwise; qed"),
538        );
539        rng.fill(&mut encoded_block[encoded_block_length..]);
540
541        encoded_block
542    } else {
543        // Filter out non-canonical justifications
544        if let Some(justifications) = signed_block.justifications.take() {
545            let mut filtered_justifications = justifications.into_iter().filter(|justification| {
546                // Only Subspace justifications are to be archived
547                let Some(subspace_justification) =
548                    SubspaceJustification::try_from_justification(justification)
549                        .and_then(|subspace_justification| subspace_justification.ok())
550                else {
551                    return false;
552                };
553
554                subspace_justification.must_be_archived()
555            });
556
557            if let Some(first_justification) = filtered_justifications.next() {
558                let mut justifications = Justifications::from(first_justification);
559
560                for justification in filtered_justifications {
561                    justifications.append(justification);
562                }
563
564                signed_block.justifications = Some(justifications);
565            }
566        }
567
568        signed_block.encode()
569    }
570}
571
572/// Symmetrical to [`encode_block()`], used to decode previously encoded blocks
573pub fn decode_block<Block>(
574    mut encoded_block: &[u8],
575) -> Result<SignedBlock<Block>, parity_scale_codec::Error>
576where
577    Block: BlockT,
578{
579    SignedBlock::<Block>::decode(&mut encoded_block)
580}
581
582fn initialize_archiver<Block, Client, AS>(
583    segment_headers_store: &SegmentHeadersStore<AS>,
584    subspace_link: &SubspaceLink<Block>,
585    client: &Client,
586    create_object_mappings: CreateObjectMappings,
587) -> sp_blockchain::Result<InitializedArchiver<Block>>
588where
589    Block: BlockT,
590    Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + HeaderBackend<Block> + AuxStore,
591    Client::Api: SubspaceApi<Block, PublicKey> + ObjectsApi<Block>,
592    AS: AuxStore,
593{
594    let client_info = client.info();
595    let best_block_number = TryInto::<BlockNumber>::try_into(client_info.best_number)
596        .unwrap_or_else(|_| {
597            unreachable!("sp_runtime::BlockNumber fits into subspace_primitives::BlockNumber; qed");
598        });
599
600    let confirmation_depth_k = subspace_link.chain_constants.confirmation_depth_k();
601
602    let mut best_block_to_archive = best_block_number.saturating_sub(confirmation_depth_k);
603    // Choose a lower block number if we want to get mappings from that specific block.
604    // If we are continuing from where we left off, we don't need to change the block number to archive.
605    // If there is no path to this block from the tip due to snap sync, we'll start archiving from
606    // an earlier segment, then start mapping again once archiving reaches this block.
607    if let Some(block_number) = create_object_mappings.block() {
608        // There aren't any mappings in the genesis block, so starting there is pointless.
609        // (And causes errors on restart, because genesis block data is never stored during snap sync.)
610        best_block_to_archive = best_block_to_archive.min(block_number);
611    }
612
613    if (best_block_to_archive..best_block_number)
614        .any(|block_number| client.hash(block_number.into()).ok().flatten().is_none())
615    {
616        // If there are blocks missing headers between best block to archive and best block of the
617        // blockchain it means newer block was inserted in some special way and as such is by
618        // definition valid, so we can simply assume that is our best block to archive instead
619        best_block_to_archive = best_block_number;
620    }
621
622    // If the user chooses an object mapping start block we don't have data or state for, we can't
623    // create mappings for it, so the node must exit with an error. We ignore genesis here, because
624    // it doesn't have mappings.
625    if create_object_mappings.is_enabled() && best_block_to_archive >= 1 {
626        let Some(best_block_to_archive_hash) = client.hash(best_block_to_archive.into())? else {
627            let error = format!(
628                "Missing hash for mapping block {best_block_to_archive}, \
629                try a higher block number, or wipe your node and restart with `--sync full`"
630            );
631            return Err(sp_blockchain::Error::Application(error.into()));
632        };
633
634        let Some(best_block_data) = client.block(best_block_to_archive_hash)? else {
635            let error = format!(
636                "Missing data for mapping block {best_block_to_archive} \
637                hash {best_block_to_archive_hash}, \
638                try a higher block number, or wipe your node and restart with `--sync full`"
639            );
640            return Err(sp_blockchain::Error::Application(error.into()));
641        };
642
643        // Similarly, state can be pruned, even if the data is present
644        client
645            .runtime_api()
646            .extract_block_object_mapping(
647                *best_block_data.block.header().parent_hash(),
648                best_block_data.block.clone(),
649            )
650            .map_err(|error| {
651                sp_blockchain::Error::Application(
652                    format!(
653                        "Missing state for mapping block {best_block_to_archive} \
654                        hash {best_block_to_archive_hash}: {error}, \
655                        try a higher block number, or wipe your node and restart with `--sync full`"
656                    )
657                    .into(),
658                )
659            })?;
660    }
661
662    let maybe_last_archived_block = find_last_archived_block(
663        client,
664        segment_headers_store,
665        best_block_to_archive.into(),
666        create_object_mappings.is_enabled(),
667    )?;
668
669    let have_last_segment_header = maybe_last_archived_block.is_some();
670    let mut best_archived_block = None;
671
672    let mut archiver =
673        if let Some((last_segment_header, last_archived_block, block_object_mappings)) =
674            maybe_last_archived_block
675        {
676            // Continuing from existing initial state
677            let last_archived_block_number = last_segment_header.last_archived_block().number;
678            info!(
679                %last_archived_block_number,
680                "Resuming archiver from last archived block",
681            );
682
683            // Set initial value, this is needed in case only genesis block was archived and there
684            // is nothing else available
685            best_archived_block.replace((
686                last_archived_block.block.hash(),
687                *last_archived_block.block.header().number(),
688            ));
689
690            let last_archived_block_encoded = encode_block(last_archived_block);
691
692            Archiver::with_initial_state(
693                subspace_link.kzg().clone(),
694                subspace_link.erasure_coding().clone(),
695                last_segment_header,
696                &last_archived_block_encoded,
697                block_object_mappings,
698            )
699            .map_err(|error| {
700                sp_blockchain::Error::Application(
701                    format!("Incorrect parameters for archiver: {error:?} {last_segment_header:?}")
702                        .into(),
703                )
704            })?
705        } else {
706            info!("Starting archiving from genesis");
707
708            Archiver::new(
709                subspace_link.kzg().clone(),
710                subspace_link.erasure_coding().clone(),
711            )
712        };
713
714    // Process blocks since last fully archived block up to the current head minus K
715    {
716        let blocks_to_archive_from = archiver
717            .last_archived_block_number()
718            .map(|n| n + 1)
719            .unwrap_or_default();
720        let blocks_to_archive_to = best_block_number
721            .checked_sub(confirmation_depth_k)
722            .filter(|&blocks_to_archive_to| blocks_to_archive_to >= blocks_to_archive_from)
723            .or({
724                if have_last_segment_header {
725                    None
726                } else {
727                    // If not continuation, archive genesis block
728                    Some(0)
729                }
730            });
731
732        if let Some(blocks_to_archive_to) = blocks_to_archive_to {
733            info!(
734                "Archiving already produced blocks {}..={}",
735                blocks_to_archive_from, blocks_to_archive_to,
736            );
737
738            let thread_pool = ThreadPoolBuilder::new()
739                .num_threads(BLOCKS_TO_ARCHIVE_CONCURRENCY)
740                .build()
741                .map_err(|error| {
742                    sp_blockchain::Error::Backend(format!(
743                        "Failed to create thread pool for archiver initialization: {error}"
744                    ))
745                })?;
746            // We need to limit number of threads to avoid running out of WASM instances
747            let blocks_to_archive = thread_pool.install(|| {
748                (blocks_to_archive_from..=blocks_to_archive_to)
749                    .into_par_iter()
750                    .map_init(
751                        || client.runtime_api(),
752                        |runtime_api, block_number| {
753                            let block_hash = client
754                                .hash(block_number.into())?
755                                .expect("All blocks since last archived must be present; qed");
756
757                            let block = client
758                                .block(block_hash)?
759                                .expect("All blocks since last archived must be present; qed");
760
761                            let block_object_mappings =
762                                if create_object_mappings.is_enabled_for_block(block_number) {
763                                    runtime_api
764                                        .extract_block_object_mapping(
765                                            *block.block.header().parent_hash(),
766                                            block.block.clone(),
767                                        )
768                                        .unwrap_or_default()
769                                } else {
770                                    BlockObjectMapping::default()
771                                };
772
773                            Ok((block, block_object_mappings))
774                        },
775                    )
776                    .collect::<sp_blockchain::Result<Vec<(SignedBlock<_>, _)>>>()
777            })?;
778
779            best_archived_block =
780                blocks_to_archive
781                    .last()
782                    .map(|(block, _block_object_mappings)| {
783                        (block.block.hash(), *block.block.header().number())
784                    });
785
786            for (signed_block, block_object_mappings) in blocks_to_archive {
787                let block_number_to_archive = *signed_block.block.header().number();
788                let encoded_block = encode_block(signed_block);
789
790                debug!(
791                    "Encoded block {} has size of {:.2} kiB",
792                    block_number_to_archive,
793                    encoded_block.len() as f32 / 1024.0
794                );
795
796                let block_outcome = archiver.add_block(encoded_block, block_object_mappings, false);
797                send_object_mapping_notification(
798                    &subspace_link.object_mapping_notification_sender,
799                    block_outcome.object_mapping,
800                    block_number_to_archive,
801                );
802                let new_segment_headers: Vec<SegmentHeader> = block_outcome
803                    .archived_segments
804                    .iter()
805                    .map(|archived_segment| archived_segment.segment_header)
806                    .collect();
807
808                if !new_segment_headers.is_empty() {
809                    segment_headers_store.add_segment_headers(&new_segment_headers)?;
810                }
811            }
812        }
813    }
814
815    Ok(InitializedArchiver {
816        archiver,
817        best_archived_block: best_archived_block
818            .expect("Must always set if there is no logical error; qed"),
819    })
820}
821
822fn finalize_block<Block, Backend, Client>(
823    client: &Client,
824    telemetry: Option<&TelemetryHandle>,
825    hash: Block::Hash,
826    number: NumberFor<Block>,
827) where
828    Block: BlockT,
829    Backend: BackendT<Block>,
830    Client: LockImportRun<Block, Backend> + Finalizer<Block, Backend>,
831{
832    if number.is_zero() {
833        // Block zero is finalized already and generates unnecessary warning if called again
834        return;
835    }
836    // We don't have anything useful to do with this result yet, the only source of errors was
837    // logged already inside
838    let _result: sp_blockchain::Result<_> = client.lock_import_and_run(|import_op| {
839        // Ideally some handle to a synchronization oracle would be used to avoid unconditionally
840        // notifying.
841        client
842            .apply_finality(import_op, hash, None, true)
843            .map_err(|error| {
844                warn!(
845                    "Error applying finality to block {:?}: {}",
846                    (hash, number),
847                    error
848                );
849                error
850            })?;
851
852        debug!("Finalizing blocks up to ({:?}, {})", number, hash);
853
854        telemetry!(
855            telemetry;
856            CONSENSUS_INFO;
857            "subspace.finalized_blocks_up_to";
858            "number" => ?number, "hash" => ?hash,
859        );
860
861        Ok(())
862    });
863}
864
865/// Create an archiver task.
866///
867/// Archiver task will listen for importing blocks and archive blocks at `K` depth, producing pieces
868/// and segment headers (segment headers are then added back to the blockchain as
869/// `store_segment_header` extrinsic).
870///
871/// NOTE: Archiver is doing blocking operations and must run in a dedicated task.
872///
873/// Archiver is only able to move forward and doesn't support reorgs. Upon restart it will check
874/// [`SegmentHeadersStore`] and chain history to reconstruct "current" state it was in before last
875/// shutdown and continue incrementally archiving blockchain history from there.
876///
877/// Archiving is triggered by block importing notification ([`SubspaceLink::block_importing_notification_stream`])
878/// and tries to archive the block at [`ChainConstants::confirmation_depth_k`](sp_consensus_subspace::ChainConstants::confirmation_depth_k)
879/// depth from the block being imported. Block import will then wait for archiver to acknowledge
880/// processing, which is necessary for ensuring that when the next block is imported, inherents will
881/// contain segment header of newly archived block (must happen exactly in the next block).
882///
883/// `create_object_mappings` controls when object mappings are created for archived blocks. When
884/// these mappings are created, a ([`SubspaceLink::object_mapping_notification_stream`])
885/// notification will be sent.
886///
887/// Once segment header is archived, notification ([`SubspaceLink::archived_segment_notification_stream`])
888/// will be sent and archiver will be paused until all receivers have provided an acknowledgement
889/// for it.
890///
891/// Archiving will be incremental during normal operation to decrease impact on block import and
892/// non-incremental heavily parallel during sync process since parallel implementation is more
893/// efficient overall and during sync only total sync time matters.
894pub fn create_subspace_archiver<Block, Backend, Client, AS, SO>(
895    segment_headers_store: SegmentHeadersStore<AS>,
896    subspace_link: SubspaceLink<Block>,
897    client: Arc<Client>,
898    sync_oracle: SubspaceSyncOracle<SO>,
899    telemetry: Option<TelemetryHandle>,
900    create_object_mappings: CreateObjectMappings,
901) -> sp_blockchain::Result<impl Future<Output = sp_blockchain::Result<()>> + Send + 'static>
902where
903    Block: BlockT,
904    Backend: BackendT<Block>,
905    Client: ProvideRuntimeApi<Block>
906        + BlockBackend<Block>
907        + HeaderBackend<Block>
908        + LockImportRun<Block, Backend>
909        + Finalizer<Block, Backend>
910        + BlockchainEvents<Block>
911        + AuxStore
912        + Send
913        + Sync
914        + 'static,
915    Client::Api: SubspaceApi<Block, PublicKey> + ObjectsApi<Block>,
916    AS: AuxStore + Send + Sync + 'static,
917    SO: SyncOracle + Send + Sync + 'static,
918{
919    if create_object_mappings.is_enabled() {
920        info!(
921            ?create_object_mappings,
922            "Creating object mappings from the configured block onwards"
923        );
924    } else {
925        info!("Not creating object mappings");
926    }
927
928    let maybe_archiver = if segment_headers_store.max_segment_index().is_none() {
929        Some(initialize_archiver(
930            &segment_headers_store,
931            &subspace_link,
932            client.as_ref(),
933            create_object_mappings,
934        )?)
935    } else {
936        None
937    };
938
939    // Subscribing synchronously before returning
940    let mut block_importing_notification_stream = subspace_link
941        .block_importing_notification_stream
942        .subscribe();
943
944    Ok(async move {
945        let archiver = match maybe_archiver {
946            Some(archiver) => archiver,
947            None => initialize_archiver(
948                &segment_headers_store,
949                &subspace_link,
950                client.as_ref(),
951                create_object_mappings,
952            )?,
953        };
954        let confirmation_depth_k = subspace_link.chain_constants.confirmation_depth_k().into();
955
956        let InitializedArchiver {
957            mut archiver,
958            best_archived_block,
959        } = archiver;
960        let (mut best_archived_block_hash, mut best_archived_block_number) = best_archived_block;
961
962        while let Some(block_importing_notification) =
963            block_importing_notification_stream.next().await
964        {
965            let importing_block_number = block_importing_notification.block_number;
966            let block_number_to_archive =
967                match importing_block_number.checked_sub(&confirmation_depth_k) {
968                    Some(block_number_to_archive) => block_number_to_archive,
969                    None => {
970                        // Too early to archive blocks
971                        continue;
972                    }
973                };
974
975            let last_archived_block_number = segment_headers_store
976                .last_segment_header()
977                .expect("Exists after archiver initialization; qed")
978                .last_archived_block()
979                .number;
980            let create_mappings =
981                create_object_mappings.is_enabled_for_block(last_archived_block_number);
982            let last_archived_block_number = NumberFor::<Block>::from(last_archived_block_number);
983            trace!(
984                %importing_block_number,
985                %block_number_to_archive,
986                %best_archived_block_number,
987                %last_archived_block_number,
988                "Checking if block needs to be skipped"
989            );
990
991            // Skip archived blocks, unless we're producing object mappings for them
992            let skip_last_archived_blocks =
993                last_archived_block_number > block_number_to_archive && !create_mappings;
994            if best_archived_block_number >= block_number_to_archive || skip_last_archived_blocks {
995                // This block was already archived, skip
996                debug!(
997                    %importing_block_number,
998                    %block_number_to_archive,
999                    %best_archived_block_number,
1000                    %last_archived_block_number,
1001                    "Skipping already archived block",
1002                );
1003                continue;
1004            }
1005
1006            // In case there was a block gap re-initialize archiver and continue with current
1007            // block number (rather than block number at some depth) to allow for special sync
1008            // modes where pre-verified blocks are inserted at some point in the future comparing to
1009            // previously existing blocks
1010            if best_archived_block_number + One::one() != block_number_to_archive {
1011                InitializedArchiver {
1012                    archiver,
1013                    best_archived_block: (best_archived_block_hash, best_archived_block_number),
1014                } = initialize_archiver(
1015                    &segment_headers_store,
1016                    &subspace_link,
1017                    client.as_ref(),
1018                    create_object_mappings,
1019                )?;
1020
1021                if best_archived_block_number + One::one() == block_number_to_archive {
1022                    // As expected, can archive this block
1023                } else if best_archived_block_number >= block_number_to_archive {
1024                    // Special sync mode where verified blocks were inserted into blockchain
1025                    // directly, archiving of this block will naturally happen later
1026                    continue;
1027                } else if client
1028                    .block_hash(importing_block_number - One::one())?
1029                    .is_none()
1030                {
1031                    // We may have imported some block using special sync mode and block we're about
1032                    // to import is the first one after the gap at which archiver is supposed to be
1033                    // initialized, but we are only about to import it, so wait for the next block
1034                    // for now
1035                    continue;
1036                } else {
1037                    let error = format!(
1038                        "There was a gap in blockchain history and the last contiguous series of \
1039                        blocks starting with doesn't start with archived segment (best archived \
1040                        block number {best_archived_block_number}, block number to archive \
1041                        {block_number_to_archive}), block about to be imported \
1042                        {importing_block_number}), archiver can't continue",
1043                    );
1044                    return Err(sp_blockchain::Error::Consensus(sp_consensus::Error::Other(
1045                        error.into(),
1046                    )));
1047                }
1048            }
1049
1050            let max_segment_index_before = segment_headers_store.max_segment_index();
1051            (best_archived_block_hash, best_archived_block_number) = archive_block(
1052                &mut archiver,
1053                segment_headers_store.clone(),
1054                &*client,
1055                &sync_oracle,
1056                subspace_link.object_mapping_notification_sender.clone(),
1057                subspace_link.archived_segment_notification_sender.clone(),
1058                best_archived_block_hash,
1059                block_number_to_archive,
1060                create_object_mappings,
1061            )
1062            .await?;
1063
1064            let max_segment_index = segment_headers_store.max_segment_index();
1065            if max_segment_index_before != max_segment_index {
1066                let maybe_block_number_to_finalize = max_segment_index
1067                    // Skip last `FINALIZATION_DEPTH_IN_SEGMENTS` archived segments
1068                    .and_then(|max_segment_index| {
1069                        max_segment_index.checked_sub(FINALIZATION_DEPTH_IN_SEGMENTS)
1070                    })
1071                    .and_then(|segment_index| {
1072                        segment_headers_store.get_segment_header(segment_index)
1073                    })
1074                    .map(|segment_header| segment_header.last_archived_block().number)
1075                    // Make sure not to finalize block number that does not yet exist (segment
1076                    // headers store may contain future blocks during initial sync)
1077                    .map(|block_number| block_number_to_archive.min(block_number.into()))
1078                    // Do not finalize blocks twice
1079                    .filter(|block_number| *block_number > client.info().finalized_number);
1080
1081                if let Some(block_number_to_finalize) = maybe_block_number_to_finalize {
1082                    {
1083                        let mut import_notification = client.every_import_notification_stream();
1084
1085                        // Drop notification to drop acknowledgement and allow block import to
1086                        // proceed
1087                        drop(block_importing_notification);
1088
1089                        while let Some(notification) = import_notification.next().await {
1090                            // Wait for importing block to finish importing
1091                            if notification.header.number() == &importing_block_number {
1092                                break;
1093                            }
1094                        }
1095                    }
1096
1097                    // Block is not guaranteed to be present this deep if we have only synced recent
1098                    // blocks
1099                    if let Some(block_hash_to_finalize) =
1100                        client.block_hash(block_number_to_finalize)?
1101                    {
1102                        finalize_block(
1103                            &*client,
1104                            telemetry.as_ref(),
1105                            block_hash_to_finalize,
1106                            block_number_to_finalize,
1107                        );
1108                    }
1109                }
1110            }
1111        }
1112
1113        Ok(())
1114    })
1115}
1116
1117/// Tries to archive `block_number` and returns new (or old if not changed) best archived block
1118#[allow(clippy::too_many_arguments)]
1119async fn archive_block<Block, Backend, Client, AS, SO>(
1120    archiver: &mut Archiver,
1121    segment_headers_store: SegmentHeadersStore<AS>,
1122    client: &Client,
1123    sync_oracle: &SubspaceSyncOracle<SO>,
1124    object_mapping_notification_sender: SubspaceNotificationSender<ObjectMappingNotification>,
1125    archived_segment_notification_sender: SubspaceNotificationSender<ArchivedSegmentNotification>,
1126    best_archived_block_hash: Block::Hash,
1127    block_number_to_archive: NumberFor<Block>,
1128    create_object_mappings: CreateObjectMappings,
1129) -> sp_blockchain::Result<(Block::Hash, NumberFor<Block>)>
1130where
1131    Block: BlockT,
1132    Backend: BackendT<Block>,
1133    Client: ProvideRuntimeApi<Block>
1134        + BlockBackend<Block>
1135        + HeaderBackend<Block>
1136        + LockImportRun<Block, Backend>
1137        + Finalizer<Block, Backend>
1138        + AuxStore
1139        + Send
1140        + Sync
1141        + 'static,
1142    Client::Api: SubspaceApi<Block, PublicKey> + ObjectsApi<Block>,
1143    AS: AuxStore + Send + Sync + 'static,
1144    SO: SyncOracle + Send + Sync + 'static,
1145{
1146    let block = client
1147        .block(
1148            client
1149                .block_hash(block_number_to_archive)?
1150                .expect("Older block by number must always exist"),
1151        )?
1152        .expect("Older block by number must always exist");
1153
1154    let parent_block_hash = *block.block.header().parent_hash();
1155    let block_hash_to_archive = block.block.hash();
1156
1157    debug!(
1158        "Archiving block {:?} ({})",
1159        block_number_to_archive, block_hash_to_archive
1160    );
1161
1162    if parent_block_hash != best_archived_block_hash {
1163        let error = format!(
1164            "Attempt to switch to a different fork beyond archiving depth, \
1165            can't do it: parent block hash {}, best archived block hash {}",
1166            parent_block_hash, best_archived_block_hash
1167        );
1168        return Err(sp_blockchain::Error::Consensus(sp_consensus::Error::Other(
1169            error.into(),
1170        )));
1171    }
1172
1173    let create_mappings = create_object_mappings.is_enabled_for_block(
1174        block_number_to_archive.try_into().unwrap_or_else(|_| {
1175            unreachable!("sp_runtime::BlockNumber fits into subspace_primitives::BlockNumber; qed")
1176        }),
1177    );
1178
1179    let block_object_mappings = if create_mappings {
1180        client
1181            .runtime_api()
1182            .extract_block_object_mapping(parent_block_hash, block.block.clone())
1183            .map_err(|error| {
1184                sp_blockchain::Error::Application(
1185                    format!("Failed to retrieve block object mappings: {error}").into(),
1186                )
1187            })?
1188    } else {
1189        BlockObjectMapping::default()
1190    };
1191
1192    let encoded_block = encode_block(block);
1193    debug!(
1194        "Encoded block {} has size of {:.2} kiB",
1195        block_number_to_archive,
1196        encoded_block.len() as f32 / 1024.0
1197    );
1198
1199    let block_outcome = archiver.add_block(
1200        encoded_block,
1201        block_object_mappings,
1202        !sync_oracle.is_major_syncing(),
1203    );
1204    send_object_mapping_notification(
1205        &object_mapping_notification_sender,
1206        block_outcome.object_mapping,
1207        block_number_to_archive,
1208    );
1209    for archived_segment in block_outcome.archived_segments {
1210        let segment_header = archived_segment.segment_header;
1211
1212        segment_headers_store.add_segment_headers(slice::from_ref(&segment_header))?;
1213
1214        send_archived_segment_notification(&archived_segment_notification_sender, archived_segment)
1215            .await;
1216    }
1217
1218    Ok((block_hash_to_archive, block_number_to_archive))
1219}
1220
1221fn send_object_mapping_notification<BlockNum>(
1222    object_mapping_notification_sender: &SubspaceNotificationSender<ObjectMappingNotification>,
1223    object_mapping: Vec<GlobalObject>,
1224    block_number: BlockNum,
1225) where
1226    BlockNum: BlockNumberT,
1227{
1228    if object_mapping.is_empty() {
1229        return;
1230    }
1231
1232    let block_number = TryInto::<BlockNumber>::try_into(block_number).unwrap_or_else(|_| {
1233        unreachable!("sp_runtime::BlockNumber fits into subspace_primitives::BlockNumber; qed");
1234    });
1235
1236    let object_mapping_notification = ObjectMappingNotification {
1237        object_mapping,
1238        block_number,
1239    };
1240
1241    object_mapping_notification_sender.notify(move || object_mapping_notification);
1242}
1243
1244async fn send_archived_segment_notification(
1245    archived_segment_notification_sender: &SubspaceNotificationSender<ArchivedSegmentNotification>,
1246    archived_segment: NewArchivedSegment,
1247) {
1248    let segment_index = archived_segment.segment_header.segment_index();
1249    let (acknowledgement_sender, mut acknowledgement_receiver) =
1250        tracing_unbounded::<()>("subspace_acknowledgement", 1000);
1251    // Keep `archived_segment` around until all acknowledgements are received since some receivers
1252    // might use weak references
1253    let archived_segment = Arc::new(archived_segment);
1254    let archived_segment_notification = ArchivedSegmentNotification {
1255        archived_segment: Arc::clone(&archived_segment),
1256        acknowledgement_sender,
1257    };
1258
1259    archived_segment_notification_sender.notify(move || archived_segment_notification);
1260
1261    let wait_fut = async {
1262        while acknowledgement_receiver.next().await.is_some() {
1263            debug!(
1264                "Archived segment notification acknowledged: {}",
1265                segment_index
1266            );
1267        }
1268    };
1269
1270    if tokio::time::timeout(ACKNOWLEDGEMENT_TIMEOUT, wait_fut)
1271        .await
1272        .is_err()
1273    {
1274        warn!(
1275            "Archived segment notification was not acknowledged and reached timeout, continue \
1276            regardless"
1277        );
1278    }
1279}