// sc_consensus_subspace/archiver.rs
1//! Consensus archiver responsible for archival of blockchain history, it is driven by block import
2//! pipeline.
3//!
4//! Implements archiving process in Subspace blockchain that converts blockchain history (blocks)
5//! into archived history (pieces).
6//!
7//! The main entry point here is [`create_subspace_archiver`] that will create a task, which while
8//! driven will perform the archiving itself.
9//!
10//! Archiving is triggered by block importing notification ([`SubspaceLink::block_importing_notification_stream`])
11//! and tries to archive the block at [`ChainConstants::confirmation_depth_k`](sp_consensus_subspace::ChainConstants::confirmation_depth_k)
12//! depth from the block being imported. Block import will then wait for archiver to acknowledge
13//! processing, which is necessary for ensuring that when the next block is imported, inherents will
14//! contain segment header of newly archived block (must happen exactly in the next block).
15//!
16//! Archiving itself will also wait for acknowledgement by various subscribers before proceeding,
17//! which includes farmer subscription, in case of reference implementation via RPC
18//! (`sc-consensus-subspace-rpc`), but could also be in other ways.
19//!
20//! [`SegmentHeadersStore`] is maintained as a data structure containing all known (including future
//! in case of syncing) segment headers. The contents of this data structure are then made available
22//! other parts of the protocol that need to know what correct archival history of the blockchain
23//! looks like. For example, it is used during node sync and farmer plotting in order to verify
24//! pieces of archival history received from other network participants.
25//!
26//! [`recreate_genesis_segment`] is a bit of a hack and is useful for deriving of the genesis
27//! segment that is special case since we don't have enough data in the blockchain history itself
28//! during genesis in order to do the archiving.
29//!
30//! [`encode_block`] and [`decode_block`] are symmetric encoding/decoding functions turning
31//! [`SignedBlock`]s into bytes and back.
32
33#[cfg(test)]
34mod tests;
35
36use crate::slot_worker::SubspaceSyncOracle;
37use crate::{SubspaceLink, SubspaceNotificationSender};
38use futures::StreamExt;
39use parity_scale_codec::{Decode, Encode};
40use parking_lot::RwLock;
41use rand::prelude::*;
42use rand_chacha::ChaCha8Rng;
43use rayon::ThreadPoolBuilder;
44use rayon::prelude::*;
45use sc_client_api::{
46    AuxStore, Backend as BackendT, BlockBackend, BlockImportOperation, BlockchainEvents, Finalizer,
47    LockImportRun,
48};
49use sc_telemetry::{CONSENSUS_INFO, TelemetryHandle, telemetry};
50use sc_utils::mpsc::{TracingUnboundedSender, tracing_unbounded};
51use sp_api::ProvideRuntimeApi;
52use sp_blockchain::{Backend as BlockchainBackend, HeaderBackend};
53use sp_consensus::SyncOracle;
54use sp_consensus_subspace::{SubspaceApi, SubspaceJustification};
55use sp_objects::ObjectsApi;
56use sp_runtime::Justifications;
57use sp_runtime::generic::SignedBlock;
58use sp_runtime::traits::{
59    Block as BlockT, BlockNumber as BlockNumberT, CheckedSub, Header, NumberFor, One, Zero,
60};
61use std::error::Error;
62use std::future::Future;
63use std::num::NonZeroU32;
64use std::slice;
65use std::sync::Arc;
66use std::sync::atomic::{AtomicU16, Ordering};
67use std::time::Duration;
68use subspace_archiving::archiver::{Archiver, NewArchivedSegment};
69use subspace_core_primitives::objects::{BlockObjectMapping, GlobalObject};
70use subspace_core_primitives::segments::{RecordedHistorySegment, SegmentHeader, SegmentIndex};
71use subspace_core_primitives::{BlockNumber, PublicKey};
72use subspace_erasure_coding::ErasureCoding;
73use subspace_kzg::Kzg;
74use tracing::{debug, info, trace, warn};
75
/// Number of WASM instances is 8, this is a bit lower to avoid warnings exceeding number of
/// instances
const BLOCKS_TO_ARCHIVE_CONCURRENCY: usize = 6;
/// Do not wait for acknowledgements beyond this time limit
const ACKNOWLEDGEMENT_TIMEOUT: Duration = Duration::from_mins(2);

/// How deep (in segments) should block be in order to be finalized.
///
/// This is required for full nodes to not prune recent history such that keep-up sync in Substrate
/// works even without archival nodes (initial sync will be done from DSN).
///
/// Ideally, we'd decouple pruning from finalization, but it may require invasive changes in
/// Substrate and is not worth it right now.
/// <https://github.com/paritytech/substrate/discussions/14359>
pub(crate) const FINALIZATION_DEPTH_IN_SEGMENTS: SegmentIndex = SegmentIndex::new(5);
91
/// Inner state of [`SegmentHeadersStore`], shared between its clones via `Arc`
#[derive(Debug)]
struct SegmentHeadersStoreInner<AS> {
    /// Persistent storage backend where batches of segment headers are written
    aux_store: Arc<AS>,
    /// Vacant key index at which the next batch of segment headers will be stored
    next_key_index: AtomicU16,
    /// In-memory cache of segment headers
    cache: RwLock<Vec<SegmentHeader>>,
}
99
/// Persistent storage of segment headers.
///
/// It maintains all known segment headers. During sync from DSN it is possible that this data structure contains
/// segment headers that from the point of view of the tip of the current chain are "in the future". This is expected
/// and must be accounted for in the archiver and other places.
///
/// Segment headers are stored in batches (which is more efficient to store and retrieve). Each next batch contains
/// distinct segment headers with monotonically increasing segment indices. During instantiation all previously stored
/// batches will be read and in-memory representation of the whole contents will be created such that queries to this
/// data structure are quick and not involving any disk I/O.
#[derive(Debug)]
pub struct SegmentHeadersStore<AS> {
    /// Shared state: aux store handle, next vacant key index and the in-memory cache
    inner: Arc<SegmentHeadersStoreInner<AS>>,
    /// Chain's `confirmation_depth_k`, used to compute at which block number a given segment
    /// header is expected to be included (see [`Self::segment_headers_for_block`])
    confirmation_depth_k: BlockNumber,
}
115
116impl<AS> Clone for SegmentHeadersStore<AS> {
117    fn clone(&self) -> Self {
118        Self {
119            inner: Arc::clone(&self.inner),
120            confirmation_depth_k: self.confirmation_depth_k,
121        }
122    }
123}
124
impl<AS> SegmentHeadersStore<AS>
where
    AS: AuxStore,
{
    /// Prefix shared by all aux-store keys used by this data structure
    const KEY_PREFIX: &'static [u8] = b"segment-headers";
    /// Initial capacity of the in-memory cache, avoids repeated reallocations early on
    const INITIAL_CACHE_CAPACITY: usize = 1_000;

    /// Create new instance
    ///
    /// Reads all previously stored batches of segment headers from `aux_store` and builds an
    /// in-memory cache out of them, so that later queries don't involve disk I/O.
    pub fn new(
        aux_store: Arc<AS>,
        confirmation_depth_k: BlockNumber,
    ) -> sp_blockchain::Result<Self> {
        let mut cache = Vec::with_capacity(Self::INITIAL_CACHE_CAPACITY);

        debug!("Started loading segment headers into cache");
        // Segment headers are stored in batches (which is more efficient to store and retrieve), this is why code deals
        // with key indices here rather than segment indices. Essentially this iterates over keys from 0 until missing
        // entry is hit, which becomes the next key index where additional segment headers will be stored.
        let mut next_key_index = 0;
        while let Some(segment_headers) =
            aux_store
                .get_aux(&Self::key(next_key_index))?
                .map(|segment_header| {
                    Vec::<SegmentHeader>::decode(&mut segment_header.as_slice())
                        .expect("Always correct segment header unless DB is corrupted; qed")
                })
        {
            cache.extend(segment_headers);
            next_key_index += 1;
        }
        debug!("Finished loading segment headers into cache");

        Ok(Self {
            inner: Arc::new(SegmentHeadersStoreInner {
                aux_store,
                next_key_index: AtomicU16::new(next_key_index),
                cache: RwLock::new(cache),
            }),
            confirmation_depth_k,
        })
    }

    /// Returns last observed segment header
    pub fn last_segment_header(&self) -> Option<SegmentHeader> {
        self.inner.cache.read().last().cloned()
    }

    /// Returns last observed segment index
    ///
    /// Segment indices are contiguous from zero, so the index of the last segment equals
    /// `cache.len() - 1`; `None` means no segment headers are stored yet.
    pub fn max_segment_index(&self) -> Option<SegmentIndex> {
        let segment_index = self.inner.cache.read().len().checked_sub(1)? as u64;
        Some(SegmentIndex::from(segment_index))
    }

    /// Add segment headers.
    ///
    /// Multiple can be inserted for efficiency purposes.
    ///
    /// Already stored segment headers are silently skipped. Returns an error if the segment
    /// indices of the new headers do not strictly continue the stored sequence (which must start
    /// at segment index zero).
    pub fn add_segment_headers(
        &self,
        segment_headers: &[SegmentHeader],
    ) -> sp_blockchain::Result<()> {
        let mut maybe_last_segment_index = self.max_segment_index();
        let mut segment_headers_to_store = Vec::with_capacity(segment_headers.len());
        // Check all input segment headers to see which ones are not stored yet and verifying that segment indices are
        // monotonically increasing
        for segment_header in segment_headers {
            let segment_index = segment_header.segment_index();
            match maybe_last_segment_index {
                Some(last_segment_index) => {
                    if segment_index <= last_segment_index {
                        // Skip already stored segment headers
                        continue;
                    }

                    if segment_index != last_segment_index + SegmentIndex::ONE {
                        let error = format!(
                            "Segment index {segment_index} must strictly follow {last_segment_index}, can't store segment header"
                        );
                        return Err(sp_blockchain::Error::Application(error.into()));
                    }

                    segment_headers_to_store.push(segment_header);
                    maybe_last_segment_index.replace(segment_index);
                }
                None => {
                    // Nothing stored yet, the very first segment header must have index zero
                    if segment_index != SegmentIndex::ZERO {
                        let error = format!(
                            "First segment header index must be zero, found index {segment_index}"
                        );
                        return Err(sp_blockchain::Error::Application(error.into()));
                    }

                    segment_headers_to_store.push(segment_header);
                    maybe_last_segment_index.replace(segment_index);
                }
            }
        }

        if segment_headers_to_store.is_empty() {
            return Ok(());
        }

        // Insert all new segment headers into vacant key index for efficiency purposes
        // TODO: Do compaction when we have too many keys: combine multiple segment headers into a
        //  single entry for faster retrievals and more compact storage
        {
            let key_index = self.inner.next_key_index.fetch_add(1, Ordering::SeqCst);
            let key = Self::key(key_index);
            let value = segment_headers_to_store.encode();
            let insert_data = vec![(key.as_slice(), value.as_slice())];

            self.inner.aux_store.insert_aux(&insert_data, &[])?;
        }
        self.inner.cache.write().extend(segment_headers_to_store);

        Ok(())
    }

    /// Get a single segment header
    ///
    /// Since segment indices are contiguous from zero, the segment index doubles as the position
    /// in the cache.
    pub fn get_segment_header(&self, segment_index: SegmentIndex) -> Option<SegmentHeader> {
        self.inner
            .cache
            .read()
            .get(u64::from(segment_index) as usize)
            .copied()
    }

    /// Aux-store key for the batch of segment headers stored at `key_index`
    fn key(key_index: u16) -> Vec<u8> {
        (Self::KEY_PREFIX, key_index.to_le_bytes()).encode()
    }

    /// Get segment headers that are expected to be included at specified block number.
    pub fn segment_headers_for_block(&self, block_number: BlockNumber) -> Vec<SegmentHeader> {
        let Some(last_segment_index) = self.max_segment_index() else {
            // Not initialized
            return Vec::new();
        };

        // Special case for the initial segment (for genesis block).
        if block_number == 1 {
            // If there is a segment index present, and we store monotonically increasing segment
            // headers, then the first header exists.
            return vec![
                self.get_segment_header(SegmentIndex::ZERO)
                    .expect("Segment headers are stored in monotonically increasing order; qed"),
            ];
        }

        if last_segment_index == SegmentIndex::ZERO {
            // Genesis segment already included in block #1
            return Vec::new();
        }

        // Walk backwards from the newest known segment, looking for the one whose expected
        // inclusion block matches `block_number`
        let mut current_segment_index = last_segment_index;
        loop {
            // If the current segment index present, and we store monotonically increasing segment
            // headers, then the current segment header exists as well.
            let current_segment_header = self
                .get_segment_header(current_segment_index)
                .expect("Segment headers are stored in monotonically increasing order; qed");

            // The block immediately after the archived segment adding the confirmation depth
            let target_block_number =
                current_segment_header.last_archived_block().number + 1 + self.confirmation_depth_k;
            if target_block_number == block_number {
                let mut headers_for_block = vec![current_segment_header];

                // Check block spanning multiple segments: all preceding segments that end on the
                // same last archived block belong to this block too (oldest first)
                let last_archived_block_number =
                    current_segment_header.last_archived_block().number;
                let mut segment_index = current_segment_index - SegmentIndex::ONE;

                while let Some(segment_header) = self.get_segment_header(segment_index) {
                    if segment_header.last_archived_block().number == last_archived_block_number {
                        headers_for_block.insert(0, segment_header);
                        segment_index -= SegmentIndex::ONE;
                    } else {
                        break;
                    }
                }

                return headers_for_block;
            }

            // iterate segments further
            if target_block_number > block_number {
                // no need to check the initial segment
                if current_segment_index > SegmentIndex::ONE {
                    current_segment_index -= SegmentIndex::ONE
                } else {
                    break;
                }
            } else {
                // No segment headers required
                return Vec::new();
            }
        }

        // No segment headers required
        Vec::new()
    }
}
326
/// Notification with a newly archived segment and a sender for acknowledging its receipt.
#[derive(Debug, Clone)]
pub struct ArchivedSegmentNotification {
    /// Archived segment.
    pub archived_segment: Arc<NewArchivedSegment>,
    /// Sender that signified the fact of receiving archived segment by farmer.
    ///
    /// This must be used to send a message or else block import pipeline will get stuck.
    pub acknowledgement_sender: TracingUnboundedSender<()>,
}
337
/// Notification with incrementally generated object mappings for a block (and any previous block
/// continuation)
#[derive(Debug, Clone)]
pub struct ObjectMappingNotification {
    /// Incremental object mappings for a block (and any previous block continuation).
    ///
    /// The archived data won't be available in pieces until the entire segment is full and archived.
    pub object_mapping: Vec<GlobalObject>,
    /// The block that these mappings are from.
    pub block_number: BlockNumber,
    // TODO: add an acknowledgement_sender for backpressure if needed
}
350
/// Whether to create object mappings.
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
pub enum CreateObjectMappings {
    /// Start creating object mappings from this block number.
    ///
    /// This can be lower than the latest archived block, but must be greater than genesis.
    ///
    /// The genesis block doesn't have mappings, so starting mappings at genesis is pointless.
    /// The archiver will fail if it can't get the data for this block, but snap sync doesn't store
    /// the genesis data on disk.  So avoiding genesis also avoids this error.
    /// <https://github.com/paritytech/polkadot-sdk/issues/5366>
    Block(NonZeroU32),

    /// Create object mappings as archiving is happening.
    Yes,

    /// Don't create object mappings.
    #[default]
    No,
}
371
372impl CreateObjectMappings {
373    /// The fixed block number to start creating object mappings from.
374    /// If there is no fixed block number, or mappings are disabled, returns None.
375    fn block(&self) -> Option<BlockNumber> {
376        match self {
377            CreateObjectMappings::Block(block) => Some(block.get()),
378            CreateObjectMappings::Yes => None,
379            CreateObjectMappings::No => None,
380        }
381    }
382
383    /// Returns true if object mappings will be created from a past or future block.
384    pub fn is_enabled(&self) -> bool {
385        !matches!(self, CreateObjectMappings::No)
386    }
387
388    /// Does the supplied block number need object mappings?
389    pub fn is_enabled_for_block(&self, block: BlockNumber) -> bool {
390        if !self.is_enabled() {
391            return false;
392        }
393
394        if let Some(target_block) = self.block() {
395            return block >= target_block;
396        }
397
398        // We're continuing where we left off, so all blocks get mappings.
399        true
400    }
401}
402
/// Find the newest archived block (according to `segment_headers_store`) that is also present in
/// the local chain and not above `best_block_to_archive`.
///
/// Returns the corresponding segment header, the signed block itself and (when
/// `create_object_mappings` is set) the block's object mappings; `Ok(None)` when only the genesis
/// segment is known, no segment headers are stored, or all candidate blocks are missing/pruned
/// locally.
fn find_last_archived_block<Block, Client, AS>(
    client: &Client,
    segment_headers_store: &SegmentHeadersStore<AS>,
    best_block_to_archive: NumberFor<Block>,
    create_object_mappings: bool,
) -> sp_blockchain::Result<Option<(SegmentHeader, SignedBlock<Block>, BlockObjectMapping)>>
where
    Block: BlockT,
    Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + HeaderBackend<Block>,
    Client::Api: SubspaceApi<Block, PublicKey> + ObjectsApi<Block>,
    AS: AuxStore,
{
    let Some(max_segment_index) = segment_headers_store.max_segment_index() else {
        return Ok(None);
    };

    if max_segment_index == SegmentIndex::ZERO {
        // Just genesis, nothing else to check
        return Ok(None);
    }

    // Walk segments from the newest to the oldest until a usable block is found
    for segment_header in (SegmentIndex::ZERO..=max_segment_index)
        .rev()
        .filter_map(|segment_index| segment_headers_store.get_segment_header(segment_index))
    {
        let last_archived_block_number = segment_header.last_archived_block().number;

        if NumberFor::<Block>::from(last_archived_block_number) > best_block_to_archive {
            // Last archived block in segment header is too high for current state of the chain
            // (segment headers store may know about more blocks in existence than is currently
            // imported)
            continue;
        }
        let Some(last_archived_block_hash) = client.hash(last_archived_block_number.into())? else {
            // This block number is not in our chain yet (segment headers store may know about more
            // blocks in existence than is currently imported)
            continue;
        };

        let Some(last_archived_block) = client.block(last_archived_block_hash)? else {
            // This block data was already pruned (but the headers weren't)
            continue;
        };

        // If we're starting mapping creation at this block, return its mappings.
        let block_object_mappings = if create_object_mappings {
            // NOTE(review): a runtime-api failure here silently falls back to empty mappings —
            // presumably deliberate best-effort behavior, confirm before relying on mappings of
            // this block being complete
            client
                .runtime_api()
                .extract_block_object_mapping(
                    *last_archived_block.block.header().parent_hash(),
                    last_archived_block.block.clone(),
                )
                .unwrap_or_default()
        } else {
            BlockObjectMapping::default()
        };

        return Ok(Some((
            segment_header,
            last_archived_block,
            block_object_mappings,
        )));
    }

    Ok(None)
}
469
470/// Derive genesis segment on demand, returns `Ok(None)` in case genesis block was already pruned
471pub fn recreate_genesis_segment<Block, Client>(
472    client: &Client,
473    kzg: Kzg,
474    erasure_coding: ErasureCoding,
475) -> Result<Option<NewArchivedSegment>, Box<dyn Error>>
476where
477    Block: BlockT,
478    Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + HeaderBackend<Block>,
479    Client::Api: ObjectsApi<Block>,
480{
481    let genesis_hash = client.info().genesis_hash;
482    let Some(signed_block) = client.block(genesis_hash)? else {
483        return Ok(None);
484    };
485
486    let encoded_block = encode_block(signed_block);
487
488    // There are no mappings in the genesis block, so they can be ignored
489    let block_outcome = Archiver::new(kzg, erasure_coding).add_block(
490        encoded_block,
491        BlockObjectMapping::default(),
492        false,
493    );
494    let new_archived_segment = block_outcome
495        .archived_segments
496        .into_iter()
497        .next()
498        .expect("Genesis block always results in exactly one archived segment; qed");
499
500    Ok(Some(new_archived_segment))
501}
502
/// State produced by [`initialize_archiver`]
struct InitializedArchiver<Block>
where
    Block: BlockT,
{
    /// Low-level archiver instance, seeded with the last archived segment (if any)
    archiver: Archiver,
    /// Hash and number of the best block that has already been archived
    best_archived_block: (Block::Hash, NumberFor<Block>),
}
510
/// Encode block for archiving purposes.
///
/// Only specific Subspace justifications are included in the encoding, determined by result of
/// [`SubspaceJustification::must_be_archived`], other justifications are filtered-out.
///
/// The genesis block is special-cased: its encoding is padded with deterministic pseudorandom
/// bytes up to [`RecordedHistorySegment::SIZE`], so that the very first archived segment can be
/// produced right away.
///
/// # Panics
///
/// Panics if the genesis block's state root is not exactly 32 bytes.
pub fn encode_block<Block>(mut signed_block: SignedBlock<Block>) -> Vec<u8>
where
    Block: BlockT,
{
    if signed_block.block.header().number().is_zero() {
        let mut encoded_block = signed_block.encode();

        let encoded_block_length = encoded_block.len();

        // We extend encoding of genesis block with extra data such that the very first archived
        // segment can be produced right away, bootstrapping the farming process.
        //
        // Note: we add it to the end of the encoded block, so during decoding it'll actually be
        // ignored (unless `DecodeAll::decode_all()` is used) even though it is technically present
        // in encoded form.
        encoded_block.resize(RecordedHistorySegment::SIZE, 0);
        // Seeding with the state root makes the padding deterministic across all nodes
        let mut rng = ChaCha8Rng::from_seed(
            signed_block
                .block
                .header()
                .state_root()
                .as_ref()
                .try_into()
                .expect("State root in Subspace must be 32 bytes, panic otherwise; qed"),
        );
        rng.fill(&mut encoded_block[encoded_block_length..]);

        encoded_block
    } else {
        // Filter out non-canonical justifications
        if let Some(justifications) = signed_block.justifications.take() {
            let mut filtered_justifications = justifications.into_iter().filter(|justification| {
                // Only Subspace justifications are to be archived
                let Some(subspace_justification) =
                    SubspaceJustification::try_from_justification(justification)
                        .and_then(|subspace_justification| subspace_justification.ok())
                else {
                    return false;
                };

                subspace_justification.must_be_archived()
            });

            // Rebuild `Justifications` from the survivors (it requires at least one justification
            // to construct, hence the split between first and rest)
            if let Some(first_justification) = filtered_justifications.next() {
                let mut justifications = Justifications::from(first_justification);

                for justification in filtered_justifications {
                    justifications.append(justification);
                }

                signed_block.justifications = Some(justifications);
            }
        }

        signed_block.encode()
    }
}
572
573/// Symmetrical to [`encode_block()`], used to decode previously encoded blocks
574pub fn decode_block<Block>(
575    mut encoded_block: &[u8],
576) -> Result<SignedBlock<Block>, parity_scale_codec::Error>
577where
578    Block: BlockT,
579{
580    SignedBlock::<Block>::decode(&mut encoded_block)
581}
582
583fn initialize_archiver<Block, Client, AS>(
584    segment_headers_store: &SegmentHeadersStore<AS>,
585    subspace_link: &SubspaceLink<Block>,
586    client: &Client,
587    create_object_mappings: CreateObjectMappings,
588) -> sp_blockchain::Result<InitializedArchiver<Block>>
589where
590    Block: BlockT,
591    Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + HeaderBackend<Block> + AuxStore,
592    Client::Api: SubspaceApi<Block, PublicKey> + ObjectsApi<Block>,
593    AS: AuxStore,
594{
595    let client_info = client.info();
596    let best_block_number = TryInto::<BlockNumber>::try_into(client_info.best_number)
597        .unwrap_or_else(|_| {
598            unreachable!("sp_runtime::BlockNumber fits into subspace_primitives::BlockNumber; qed");
599        });
600
601    let confirmation_depth_k = subspace_link.chain_constants.confirmation_depth_k();
602
603    let mut best_block_to_archive = best_block_number.saturating_sub(confirmation_depth_k);
604    // Choose a lower block number if we want to get mappings from that specific block.
605    // If we are continuing from where we left off, we don't need to change the block number to archive.
606    // If there is no path to this block from the tip due to snap sync, we'll start archiving from
607    // an earlier segment, then start mapping again once archiving reaches this block.
608    if let Some(block_number) = create_object_mappings.block() {
609        // There aren't any mappings in the genesis block, so starting there is pointless.
610        // (And causes errors on restart, because genesis block data is never stored during snap sync.)
611        best_block_to_archive = best_block_to_archive.min(block_number);
612    }
613
614    if (best_block_to_archive..best_block_number)
615        .any(|block_number| client.hash(block_number.into()).ok().flatten().is_none())
616    {
617        // If there are blocks missing headers between best block to archive and best block of the
618        // blockchain it means newer block was inserted in some special way and as such is by
619        // definition valid, so we can simply assume that is our best block to archive instead
620        best_block_to_archive = best_block_number;
621    }
622
623    // If the user chooses an object mapping start block we don't have data or state for, we can't
624    // create mappings for it, so the node must exit with an error. We ignore genesis here, because
625    // it doesn't have mappings.
626    if create_object_mappings.is_enabled() && best_block_to_archive >= 1 {
627        let Some(best_block_to_archive_hash) = client.hash(best_block_to_archive.into())? else {
628            let error = format!(
629                "Missing hash for mapping block {best_block_to_archive}, \
630                try a higher block number, or wipe your node and restart with `--sync full`"
631            );
632            return Err(sp_blockchain::Error::Application(error.into()));
633        };
634
635        let Some(best_block_data) = client.block(best_block_to_archive_hash)? else {
636            let error = format!(
637                "Missing data for mapping block {best_block_to_archive} \
638                hash {best_block_to_archive_hash}, \
639                try a higher block number, or wipe your node and restart with `--sync full`"
640            );
641            return Err(sp_blockchain::Error::Application(error.into()));
642        };
643
644        // Similarly, state can be pruned, even if the data is present
645        client
646            .runtime_api()
647            .extract_block_object_mapping(
648                *best_block_data.block.header().parent_hash(),
649                best_block_data.block.clone(),
650            )
651            .map_err(|error| {
652                sp_blockchain::Error::Application(
653                    format!(
654                        "Missing state for mapping block {best_block_to_archive} \
655                        hash {best_block_to_archive_hash}: {error}, \
656                        try a higher block number, or wipe your node and restart with `--sync full`"
657                    )
658                    .into(),
659                )
660            })?;
661    }
662
663    let maybe_last_archived_block = find_last_archived_block(
664        client,
665        segment_headers_store,
666        best_block_to_archive.into(),
667        create_object_mappings.is_enabled(),
668    )?;
669
670    let have_last_segment_header = maybe_last_archived_block.is_some();
671    let mut best_archived_block = None;
672
673    let mut archiver =
674        if let Some((last_segment_header, last_archived_block, block_object_mappings)) =
675            maybe_last_archived_block
676        {
677            // Continuing from existing initial state
678            let last_archived_block_number = last_segment_header.last_archived_block().number;
679            info!(
680                %last_archived_block_number,
681                "Resuming archiver from last archived block",
682            );
683
684            // Set initial value, this is needed in case only genesis block was archived and there
685            // is nothing else available
686            best_archived_block.replace((
687                last_archived_block.block.hash(),
688                *last_archived_block.block.header().number(),
689            ));
690
691            let last_archived_block_encoded = encode_block(last_archived_block);
692
693            Archiver::with_initial_state(
694                subspace_link.kzg().clone(),
695                subspace_link.erasure_coding().clone(),
696                last_segment_header,
697                &last_archived_block_encoded,
698                block_object_mappings,
699            )
700            .map_err(|error| {
701                sp_blockchain::Error::Application(
702                    format!("Incorrect parameters for archiver: {error:?} {last_segment_header:?}")
703                        .into(),
704                )
705            })?
706        } else {
707            info!("Starting archiving from genesis");
708
709            Archiver::new(
710                subspace_link.kzg().clone(),
711                subspace_link.erasure_coding().clone(),
712            )
713        };
714
715    // Process blocks since last fully archived block up to the current head minus K
716    {
717        let blocks_to_archive_from = archiver
718            .last_archived_block_number()
719            .map(|n| n + 1)
720            .unwrap_or_default();
721        let blocks_to_archive_to = best_block_number
722            .checked_sub(confirmation_depth_k)
723            .filter(|&blocks_to_archive_to| blocks_to_archive_to >= blocks_to_archive_from)
724            .or({
725                if have_last_segment_header {
726                    None
727                } else {
728                    // If not continuation, archive genesis block
729                    Some(0)
730                }
731            });
732
733        if let Some(blocks_to_archive_to) = blocks_to_archive_to {
734            info!(
735                "Archiving already produced blocks {}..={}",
736                blocks_to_archive_from, blocks_to_archive_to,
737            );
738
739            let thread_pool = ThreadPoolBuilder::new()
740                .num_threads(BLOCKS_TO_ARCHIVE_CONCURRENCY)
741                .build()
742                .map_err(|error| {
743                    sp_blockchain::Error::Backend(format!(
744                        "Failed to create thread pool for archiver initialization: {error}"
745                    ))
746                })?;
747            // We need to limit number of threads to avoid running out of WASM instances
748            let blocks_to_archive = thread_pool.install(|| {
749                (blocks_to_archive_from..=blocks_to_archive_to)
750                    .into_par_iter()
751                    .map_init(
752                        || client.runtime_api(),
753                        |runtime_api, block_number| {
754                            let block_hash = client
755                                .hash(block_number.into())?
756                                .expect("All blocks since last archived must be present; qed");
757
758                            let block = client
759                                .block(block_hash)?
760                                .expect("All blocks since last archived must be present; qed");
761
762                            let block_object_mappings =
763                                if create_object_mappings.is_enabled_for_block(block_number) {
764                                    runtime_api
765                                        .extract_block_object_mapping(
766                                            *block.block.header().parent_hash(),
767                                            block.block.clone(),
768                                        )
769                                        .unwrap_or_default()
770                                } else {
771                                    BlockObjectMapping::default()
772                                };
773
774                            Ok((block, block_object_mappings))
775                        },
776                    )
777                    .collect::<sp_blockchain::Result<Vec<(SignedBlock<_>, _)>>>()
778            })?;
779
780            best_archived_block =
781                blocks_to_archive
782                    .last()
783                    .map(|(block, _block_object_mappings)| {
784                        (block.block.hash(), *block.block.header().number())
785                    });
786
787            for (signed_block, block_object_mappings) in blocks_to_archive {
788                let block_number_to_archive = *signed_block.block.header().number();
789                let encoded_block = encode_block(signed_block);
790
791                debug!(
792                    "Encoded block {} has size of {}",
793                    block_number_to_archive,
794                    bytesize::ByteSize::b(encoded_block.len() as u64)
795                        .display()
796                        .iec(),
797                );
798
799                let block_outcome = archiver.add_block(encoded_block, block_object_mappings, false);
800                send_object_mapping_notification(
801                    &subspace_link.object_mapping_notification_sender,
802                    block_outcome.object_mapping,
803                    block_number_to_archive,
804                );
805                let new_segment_headers: Vec<SegmentHeader> = block_outcome
806                    .archived_segments
807                    .iter()
808                    .map(|archived_segment| archived_segment.segment_header)
809                    .collect();
810
811                if !new_segment_headers.is_empty() {
812                    segment_headers_store.add_segment_headers(&new_segment_headers)?;
813                }
814            }
815        }
816    }
817
818    Ok(InitializedArchiver {
819        archiver,
820        best_archived_block: best_archived_block
821            .expect("Must always set if there is no logical error; qed"),
822    })
823}
824
/// Finalizes the block identified by `hash`/`number` and cleans up `block_weight` auxiliary
/// entries for fork blocks displaced by the finalization.
///
/// Does nothing for block zero (already finalized at genesis; re-finalizing would only produce a
/// warning). Errors while applying finality or queuing cleanup are logged inside and deliberately
/// not propagated to the caller.
fn finalize_block<Block, Backend, Client>(
    client: &Client,
    backend: &Backend,
    telemetry: Option<&TelemetryHandle>,
    hash: Block::Hash,
    number: NumberFor<Block>,
) where
    Block: BlockT,
    Backend: BackendT<Block>,
    Client: LockImportRun<Block, Backend> + Finalizer<Block, Backend> + HeaderBackend<Block>,
{
    if number.is_zero() {
        // Block zero is finalized already and generates unnecessary warning if called again
        return;
    }

    // Compute the fork leaves that become unreachable once `hash` is finalized, so their
    // `block_weight` aux entries can be removed below. Every failure path here is non-fatal: we
    // only leak some aux entries and finalization itself still proceeds with `displaced == None`.
    let displaced = match client.header(hash) {
        Ok(Some(header)) => {
            let parent_hash = *header.parent_hash();
            match backend
                .blockchain()
                .displaced_leaves_after_finalizing(hash, number, parent_hash)
            {
                Ok(d) => Some(d),
                Err(error) => {
                    warn!(
                        ?error,
                        ?hash,
                        ?parent_hash,
                        ?number,
                        "Failed to compute displaced leaves; their block_weight entries will leak"
                    );
                    None
                }
            }
        }
        Ok(None) => {
            warn!(
                ?hash,
                ?number,
                "Header missing for finalized hash; skipping displaced-fork cleanup"
            );
            None
        }
        Err(error) => {
            warn!(
                ?error,
                ?hash,
                ?number,
                "Failed to load header for finalized hash; skipping displaced-fork cleanup"
            );
            None
        }
    };

    // We don't have anything useful to do with this result yet, the only source of errors was
    // logged already inside
    let _result: sp_blockchain::Result<_> = client.lock_import_and_run(|import_op| {
        // Ideally some handle to a synchronization oracle would be used to avoid unconditionally
        // notifying.
        client
            .apply_finality(import_op, hash, None, true)
            .map_err(|error| {
                warn!(
                    "Error applying finality to block {:?}: {}",
                    (hash, number),
                    error
                );
                error
            })?;

        // Queue `None` values ("tombstones") for the displaced blocks' weight keys so the deletion
        // is committed atomically with the finality operation; on failure the `?` aborts the
        // closure, which rolls the finalization back as well.
        if let Some(d) = &displaced
            && !d.displaced_blocks.is_empty()
        {
            let count = d.displaced_blocks.len();
            let tombstones = d
                .displaced_blocks
                .iter()
                .map(|h| (crate::aux_schema::block_weight_key(h), None));
            import_op.op.insert_aux(tombstones).map_err(|error| {
                warn!(
                    ?error,
                    count, "Failed to queue displaced fork tombstones; rolling back finalization",
                );
                error
            })?;
        }

        debug!("Finalizing blocks up to ({:?}, {})", number, hash);

        telemetry!(
            telemetry;
            CONSENSUS_INFO;
            "subspace.finalized_blocks_up_to";
            "number" => ?number, "hash" => ?hash,
        );

        Ok(())
    });
}
925
926/// Create an archiver task.
927///
928/// Archiver task will listen for importing blocks and archive blocks at `K` depth, producing pieces
929/// and segment headers (segment headers are then added back to the blockchain as
930/// `store_segment_header` extrinsic).
931///
932/// NOTE: Archiver is doing blocking operations and must run in a dedicated task.
933///
934/// Archiver is only able to move forward and doesn't support reorgs. Upon restart it will check
935/// [`SegmentHeadersStore`] and chain history to reconstruct "current" state it was in before last
936/// shutdown and continue incrementally archiving blockchain history from there.
937///
938/// Archiving is triggered by block importing notification ([`SubspaceLink::block_importing_notification_stream`])
939/// and tries to archive the block at [`ChainConstants::confirmation_depth_k`](sp_consensus_subspace::ChainConstants::confirmation_depth_k)
940/// depth from the block being imported. Block import will then wait for archiver to acknowledge
941/// processing, which is necessary for ensuring that when the next block is imported, inherents will
942/// contain segment header of newly archived block (must happen exactly in the next block).
943///
/// `create_object_mappings` controls when object mappings are created for archived blocks. When
/// these mappings are created, a notification ([`SubspaceLink::object_mapping_notification_stream`])
/// will be sent.
947///
948/// Once segment header is archived, notification ([`SubspaceLink::archived_segment_notification_stream`])
949/// will be sent and archiver will be paused until all receivers have provided an acknowledgement
950/// for it.
951///
952/// Archiving will be incremental during normal operation to decrease impact on block import and
953/// non-incremental heavily parallel during sync process since parallel implementation is more
954/// efficient overall and during sync only total sync time matters.
955pub fn create_subspace_archiver<Block, Backend, Client, AS, SO>(
956    segment_headers_store: SegmentHeadersStore<AS>,
957    subspace_link: SubspaceLink<Block>,
958    client: Arc<Client>,
959    backend: Arc<Backend>,
960    sync_oracle: SubspaceSyncOracle<SO>,
961    telemetry: Option<TelemetryHandle>,
962    create_object_mappings: CreateObjectMappings,
963) -> sp_blockchain::Result<impl Future<Output = sp_blockchain::Result<()>> + Send + 'static>
964where
965    Block: BlockT,
966    Backend: BackendT<Block> + 'static,
967    Client: ProvideRuntimeApi<Block>
968        + BlockBackend<Block>
969        + HeaderBackend<Block>
970        + LockImportRun<Block, Backend>
971        + Finalizer<Block, Backend>
972        + BlockchainEvents<Block>
973        + AuxStore
974        + Send
975        + Sync
976        + 'static,
977    Client::Api: SubspaceApi<Block, PublicKey> + ObjectsApi<Block>,
978    AS: AuxStore + Send + Sync + 'static,
979    SO: SyncOracle + Send + Sync + 'static,
980{
981    if create_object_mappings.is_enabled() {
982        info!(
983            ?create_object_mappings,
984            "Creating object mappings from the configured block onwards"
985        );
986    } else {
987        info!("Not creating object mappings");
988    }
989
990    let maybe_archiver = if segment_headers_store.max_segment_index().is_none() {
991        Some(initialize_archiver(
992            &segment_headers_store,
993            &subspace_link,
994            client.as_ref(),
995            create_object_mappings,
996        )?)
997    } else {
998        None
999    };
1000
1001    // Subscribing synchronously before returning
1002    let mut block_importing_notification_stream = subspace_link
1003        .block_importing_notification_stream
1004        .subscribe();
1005
1006    Ok(async move {
1007        let archiver = match maybe_archiver {
1008            Some(archiver) => archiver,
1009            None => initialize_archiver(
1010                &segment_headers_store,
1011                &subspace_link,
1012                client.as_ref(),
1013                create_object_mappings,
1014            )?,
1015        };
1016        let confirmation_depth_k = subspace_link.chain_constants.confirmation_depth_k().into();
1017
1018        let InitializedArchiver {
1019            mut archiver,
1020            best_archived_block,
1021        } = archiver;
1022        let (mut best_archived_block_hash, mut best_archived_block_number) = best_archived_block;
1023
1024        while let Some(block_importing_notification) =
1025            block_importing_notification_stream.next().await
1026        {
1027            let importing_block_number = block_importing_notification.block_number;
1028            let block_number_to_archive =
1029                match importing_block_number.checked_sub(&confirmation_depth_k) {
1030                    Some(block_number_to_archive) => block_number_to_archive,
1031                    None => {
1032                        // Too early to archive blocks
1033                        continue;
1034                    }
1035                };
1036
1037            let last_archived_block_number = segment_headers_store
1038                .last_segment_header()
1039                .expect("Exists after archiver initialization; qed")
1040                .last_archived_block()
1041                .number;
1042            let create_mappings =
1043                create_object_mappings.is_enabled_for_block(last_archived_block_number);
1044            let last_archived_block_number = NumberFor::<Block>::from(last_archived_block_number);
1045            trace!(
1046                %importing_block_number,
1047                %block_number_to_archive,
1048                %best_archived_block_number,
1049                %last_archived_block_number,
1050                "Checking if block needs to be skipped"
1051            );
1052
1053            // Skip archived blocks, unless we're producing object mappings for them
1054            let skip_last_archived_blocks =
1055                last_archived_block_number > block_number_to_archive && !create_mappings;
1056            if best_archived_block_number >= block_number_to_archive || skip_last_archived_blocks {
1057                // This block was already archived, skip
1058                debug!(
1059                    %importing_block_number,
1060                    %block_number_to_archive,
1061                    %best_archived_block_number,
1062                    %last_archived_block_number,
1063                    "Skipping already archived block",
1064                );
1065                continue;
1066            }
1067
1068            // In case there was a block gap re-initialize archiver and continue with current
1069            // block number (rather than block number at some depth) to allow for special sync
1070            // modes where pre-verified blocks are inserted at some point in the future comparing to
1071            // previously existing blocks
1072            if best_archived_block_number + One::one() != block_number_to_archive {
1073                InitializedArchiver {
1074                    archiver,
1075                    best_archived_block: (best_archived_block_hash, best_archived_block_number),
1076                } = initialize_archiver(
1077                    &segment_headers_store,
1078                    &subspace_link,
1079                    client.as_ref(),
1080                    create_object_mappings,
1081                )?;
1082
1083                if best_archived_block_number + One::one() == block_number_to_archive {
1084                    // As expected, can archive this block
1085                } else if best_archived_block_number >= block_number_to_archive {
1086                    // Special sync mode where verified blocks were inserted into blockchain
1087                    // directly, archiving of this block will naturally happen later
1088                    continue;
1089                } else if client
1090                    .block_hash(importing_block_number - One::one())?
1091                    .is_none()
1092                {
1093                    // We may have imported some block using special sync mode and block we're about
1094                    // to import is the first one after the gap at which archiver is supposed to be
1095                    // initialized, but we are only about to import it, so wait for the next block
1096                    // for now
1097                    continue;
1098                } else {
1099                    let error = format!(
1100                        "There was a gap in blockchain history and the last contiguous series of \
1101                        blocks starting with doesn't start with archived segment (best archived \
1102                        block number {best_archived_block_number}, block number to archive \
1103                        {block_number_to_archive}), block about to be imported \
1104                        {importing_block_number}), archiver can't continue",
1105                    );
1106                    return Err(sp_blockchain::Error::Consensus(sp_consensus::Error::Other(
1107                        error.into(),
1108                    )));
1109                }
1110            }
1111
1112            let max_segment_index_before = segment_headers_store.max_segment_index();
1113            (best_archived_block_hash, best_archived_block_number) = archive_block(
1114                &mut archiver,
1115                segment_headers_store.clone(),
1116                &*client,
1117                &sync_oracle,
1118                subspace_link.object_mapping_notification_sender.clone(),
1119                subspace_link.archived_segment_notification_sender.clone(),
1120                best_archived_block_hash,
1121                block_number_to_archive,
1122                create_object_mappings,
1123            )
1124            .await?;
1125
1126            let max_segment_index = segment_headers_store.max_segment_index();
1127            if max_segment_index_before != max_segment_index {
1128                let maybe_block_number_to_finalize = max_segment_index
1129                    // Skip last `FINALIZATION_DEPTH_IN_SEGMENTS` archived segments
1130                    .and_then(|max_segment_index| {
1131                        max_segment_index.checked_sub(FINALIZATION_DEPTH_IN_SEGMENTS)
1132                    })
1133                    .and_then(|segment_index| {
1134                        segment_headers_store.get_segment_header(segment_index)
1135                    })
1136                    .map(|segment_header| segment_header.last_archived_block().number)
1137                    // Make sure not to finalize block number that does not yet exist (segment
1138                    // headers store may contain future blocks during initial sync)
1139                    .map(|block_number| block_number_to_archive.min(block_number.into()))
1140                    // Do not finalize blocks twice
1141                    .filter(|block_number| *block_number > client.info().finalized_number);
1142
1143                if let Some(block_number_to_finalize) = maybe_block_number_to_finalize {
1144                    {
1145                        let mut import_notification = client.every_import_notification_stream();
1146
1147                        // Drop notification to drop acknowledgement and allow block import to
1148                        // proceed
1149                        drop(block_importing_notification);
1150
1151                        while let Some(notification) = import_notification.next().await {
1152                            // Wait for importing block to finish importing
1153                            if notification.header.number() == &importing_block_number {
1154                                break;
1155                            }
1156                        }
1157                    }
1158
1159                    // Block is not guaranteed to be present this deep if we have only synced recent
1160                    // blocks.
1161                    if let Some(block_hash_to_finalize) =
1162                        client.block_hash(block_number_to_finalize)?
1163                    {
1164                        finalize_block(
1165                            &*client,
1166                            &*backend,
1167                            telemetry.as_ref(),
1168                            block_hash_to_finalize,
1169                            block_number_to_finalize,
1170                        );
1171                    }
1172                }
1173            }
1174        }
1175
1176        Ok(())
1177    })
1178}
1179
1180/// Tries to archive `block_number` and returns new (or old if not changed) best archived block
1181#[allow(clippy::too_many_arguments)]
1182async fn archive_block<Block, Backend, Client, AS, SO>(
1183    archiver: &mut Archiver,
1184    segment_headers_store: SegmentHeadersStore<AS>,
1185    client: &Client,
1186    sync_oracle: &SubspaceSyncOracle<SO>,
1187    object_mapping_notification_sender: SubspaceNotificationSender<ObjectMappingNotification>,
1188    archived_segment_notification_sender: SubspaceNotificationSender<ArchivedSegmentNotification>,
1189    best_archived_block_hash: Block::Hash,
1190    block_number_to_archive: NumberFor<Block>,
1191    create_object_mappings: CreateObjectMappings,
1192) -> sp_blockchain::Result<(Block::Hash, NumberFor<Block>)>
1193where
1194    Block: BlockT,
1195    Backend: BackendT<Block>,
1196    Client: ProvideRuntimeApi<Block>
1197        + BlockBackend<Block>
1198        + HeaderBackend<Block>
1199        + LockImportRun<Block, Backend>
1200        + Finalizer<Block, Backend>
1201        + AuxStore
1202        + Send
1203        + Sync
1204        + 'static,
1205    Client::Api: SubspaceApi<Block, PublicKey> + ObjectsApi<Block>,
1206    AS: AuxStore + Send + Sync + 'static,
1207    SO: SyncOracle + Send + Sync + 'static,
1208{
1209    let block = client
1210        .block(
1211            client
1212                .block_hash(block_number_to_archive)?
1213                .expect("Older block by number must always exist"),
1214        )?
1215        .expect("Older block by number must always exist");
1216
1217    let parent_block_hash = *block.block.header().parent_hash();
1218    let block_hash_to_archive = block.block.hash();
1219
1220    debug!(
1221        "Archiving block {:?} ({})",
1222        block_number_to_archive, block_hash_to_archive
1223    );
1224
1225    if parent_block_hash != best_archived_block_hash {
1226        let error = format!(
1227            "Attempt to switch to a different fork beyond archiving depth, \
1228            can't do it: parent block hash {parent_block_hash}, best archived block hash {best_archived_block_hash}"
1229        );
1230        return Err(sp_blockchain::Error::Consensus(sp_consensus::Error::Other(
1231            error.into(),
1232        )));
1233    }
1234
1235    let create_mappings = create_object_mappings.is_enabled_for_block(
1236        block_number_to_archive.try_into().unwrap_or_else(|_| {
1237            unreachable!("sp_runtime::BlockNumber fits into subspace_primitives::BlockNumber; qed")
1238        }),
1239    );
1240
1241    let block_object_mappings = if create_mappings {
1242        client
1243            .runtime_api()
1244            .extract_block_object_mapping(parent_block_hash, block.block.clone())
1245            .map_err(|error| {
1246                sp_blockchain::Error::Application(
1247                    format!("Failed to retrieve block object mappings: {error}").into(),
1248                )
1249            })?
1250    } else {
1251        BlockObjectMapping::default()
1252    };
1253
1254    let encoded_block = encode_block(block);
1255    debug!(
1256        "Encoded block {} has size of {}",
1257        block_number_to_archive,
1258        bytesize::ByteSize::b(encoded_block.len() as u64)
1259            .display()
1260            .iec(),
1261    );
1262
1263    let block_outcome = archiver.add_block(
1264        encoded_block,
1265        block_object_mappings,
1266        !sync_oracle.is_major_syncing(),
1267    );
1268    send_object_mapping_notification(
1269        &object_mapping_notification_sender,
1270        block_outcome.object_mapping,
1271        block_number_to_archive,
1272    );
1273    for archived_segment in block_outcome.archived_segments {
1274        let segment_header = archived_segment.segment_header;
1275
1276        segment_headers_store.add_segment_headers(slice::from_ref(&segment_header))?;
1277
1278        send_archived_segment_notification(&archived_segment_notification_sender, archived_segment)
1279            .await;
1280    }
1281
1282    Ok((block_hash_to_archive, block_number_to_archive))
1283}
1284
1285fn send_object_mapping_notification<BlockNum>(
1286    object_mapping_notification_sender: &SubspaceNotificationSender<ObjectMappingNotification>,
1287    object_mapping: Vec<GlobalObject>,
1288    block_number: BlockNum,
1289) where
1290    BlockNum: BlockNumberT,
1291{
1292    if object_mapping.is_empty() {
1293        return;
1294    }
1295
1296    let block_number = TryInto::<BlockNumber>::try_into(block_number).unwrap_or_else(|_| {
1297        unreachable!("sp_runtime::BlockNumber fits into subspace_primitives::BlockNumber; qed");
1298    });
1299
1300    let object_mapping_notification = ObjectMappingNotification {
1301        object_mapping,
1302        block_number,
1303    };
1304
1305    object_mapping_notification_sender.notify(move || object_mapping_notification);
1306}
1307
1308async fn send_archived_segment_notification(
1309    archived_segment_notification_sender: &SubspaceNotificationSender<ArchivedSegmentNotification>,
1310    archived_segment: NewArchivedSegment,
1311) {
1312    let segment_index = archived_segment.segment_header.segment_index();
1313    let (acknowledgement_sender, mut acknowledgement_receiver) =
1314        tracing_unbounded::<()>("subspace_acknowledgement", 1000);
1315    // Keep `archived_segment` around until all acknowledgements are received since some receivers
1316    // might use weak references
1317    let archived_segment = Arc::new(archived_segment);
1318    let archived_segment_notification = ArchivedSegmentNotification {
1319        archived_segment: Arc::clone(&archived_segment),
1320        acknowledgement_sender,
1321    };
1322
1323    archived_segment_notification_sender.notify(move || archived_segment_notification);
1324
1325    let wait_fut = async {
1326        while acknowledgement_receiver.next().await.is_some() {
1327            debug!(
1328                "Archived segment notification acknowledged: {}",
1329                segment_index
1330            );
1331        }
1332    };
1333
1334    if tokio::time::timeout(ACKNOWLEDGEMENT_TIMEOUT, wait_fut)
1335        .await
1336        .is_err()
1337    {
1338        warn!(
1339            "Archived segment notification was not acknowledged and reached timeout, continue \
1340            regardless"
1341        );
1342    }
1343}