// sc-consensus-subspace/src/archiver.rs
//! Consensus archiver responsible for archival of blockchain history, it is driven by block import
//! pipeline.
//!
//! Implements archiving process in Subspace blockchain that converts blockchain history (blocks)
//! into archived history (pieces).
//!
//! The main entry point here is [`create_subspace_archiver`] that will create a task, which while
//! driven will perform the archiving itself.
//!
//! Archiving is triggered by block importing notification ([`SubspaceLink::block_importing_notification_stream`])
//! and tries to archive the block at [`ChainConstants::confirmation_depth_k`](sp_consensus_subspace::ChainConstants::confirmation_depth_k)
//! depth from the block being imported. Block import will then wait for archiver to acknowledge
//! processing, which is necessary for ensuring that when the next block is imported, inherents will
//! contain segment header of newly archived block (must happen exactly in the next block).
//!
//! Archiving itself will also wait for acknowledgement by various subscribers before proceeding,
//! which includes farmer subscription, in case of reference implementation via RPC
//! (`sc-consensus-subspace-rpc`), but could also be in other ways.
//!
//! [`SegmentHeadersStore`] is maintained as a data structure containing all known (including future
//! in case of syncing) segment headers. This data structure contents is then made available to
//! other parts of the protocol that need to know what correct archival history of the blockchain
//! looks like. For example, it is used during node sync and farmer plotting in order to verify
//! pieces of archival history received from other network participants.
//!
//! [`recreate_genesis_segment`] is a bit of a hack and is useful for deriving of the genesis
//! segment that is special case since we don't have enough data in the blockchain history itself
//! during genesis in order to do the archiving.
//!
//! [`encode_block`] and [`decode_block`] are symmetric encoding/decoding functions turning
//! [`SignedBlock`]s into bytes and back.

33#[cfg(test)]
34mod tests;
35
use crate::slot_worker::SubspaceSyncOracle;
use crate::{SubspaceLink, SubspaceNotificationSender};
use futures::StreamExt;
use parity_scale_codec::{Decode, Encode};
use parking_lot::RwLock;
use rand::prelude::*;
use rand_chacha::ChaCha8Rng;
use rayon::ThreadPoolBuilder;
use rayon::prelude::*;
use sc_client_api::{
    AuxStore, Backend as BackendT, BlockBackend, BlockImportOperation, BlockchainEvents, Finalizer,
    LockImportRun,
};
use sc_telemetry::{CONSENSUS_INFO, TelemetryHandle, telemetry};
use sc_utils::mpsc::{TracingUnboundedSender, tracing_unbounded};
use sp_api::ProvideRuntimeApi;
use sp_blockchain::{Backend as BlockchainBackend, HeaderBackend};
use sp_consensus::SyncOracle;
use sp_consensus_subspace::{SubspaceApi, SubspaceJustification};
use sp_objects::ObjectsApi;
use sp_runtime::Justifications;
use sp_runtime::generic::SignedBlock;
use sp_runtime::traits::{
    Block as BlockT, BlockNumber as BlockNumberT, CheckedSub, Header, NumberFor, One, Zero,
};
use std::error::Error;
use std::future::Future;
use std::num::NonZeroU32;
use std::slice;
use std::sync::Arc;
use std::sync::atomic::{AtomicU16, Ordering};
use std::time::Duration;
use subspace_archiving::archiver::{Archiver, NewArchivedSegment};
use subspace_core_primitives::objects::{BlockObjectMapping, GlobalObject};
use subspace_core_primitives::segments::{RecordedHistorySegment, SegmentHeader, SegmentIndex};
use subspace_core_primitives::{BlockNumber, PublicKey};
use subspace_erasure_coding::ErasureCoding;
use subspace_kzg::Kzg;
use tracing::{debug, info, trace, warn};
75
76/// Number of WASM instances is 8, this is a bit lower to avoid warnings exceeding number of
77/// instances
78const BLOCKS_TO_ARCHIVE_CONCURRENCY: usize = 6;
79/// Do not wait for acknowledgements beyond this time limit
80const ACKNOWLEDGEMENT_TIMEOUT: Duration = Duration::from_mins(2);
81
82/// How deep (in segments) should block be in order to be finalized.
83///
84/// This is required for full nodes to not prune recent history such that keep-up sync in Substrate
85/// works even without archival nodes (initial sync will be done from DSN).
86///
87/// Ideally, we'd decouple pruning from finalization, but it may require invasive changes in
88/// Substrate and is not worth it right now.
89/// https://github.com/paritytech/substrate/discussions/14359
90pub(crate) const FINALIZATION_DEPTH_IN_SEGMENTS: SegmentIndex = SegmentIndex::new(5);
91
92#[derive(Debug)]
93struct SegmentHeadersStoreInner<AS> {
94    aux_store: Arc<AS>,
95    next_key_index: AtomicU16,
96    /// In-memory cache of segment headers
97    cache: RwLock<Vec<SegmentHeader>>,
98}
99
100/// Persistent storage of segment headers.
101///
102/// It maintains all known segment headers. During sync from DSN it is possible that this data structure contains
103/// segment headers that from the point of view of the tip of the current chain are "in the future". This is expected
104/// and must be accounted for in the archiver and other places.
105///
106/// Segment headers are stored in batches (which is more efficient to store and retrieve). Each next batch contains
107/// distinct segment headers with monotonically increasing segment indices. During instantiation all previously stored
108/// batches will be read and in-memory representation of the whole contents will be created such that queries to this
109/// data structure are quick and not involving any disk I/O.
110#[derive(Debug)]
111pub struct SegmentHeadersStore<AS> {
112    inner: Arc<SegmentHeadersStoreInner<AS>>,
113    confirmation_depth_k: BlockNumber,
114}
115
116impl<AS> Clone for SegmentHeadersStore<AS> {
117    fn clone(&self) -> Self {
118        Self {
119            inner: Arc::clone(&self.inner),
120            confirmation_depth_k: self.confirmation_depth_k,
121        }
122    }
123}
124
125impl<AS> SegmentHeadersStore<AS>
126where
127    AS: AuxStore,
128{
129    const KEY_PREFIX: &'static [u8] = b"segment-headers";
130    const INITIAL_CACHE_CAPACITY: usize = 1_000;
131
132    /// Create new instance
133    pub fn new(
134        aux_store: Arc<AS>,
135        confirmation_depth_k: BlockNumber,
136    ) -> sp_blockchain::Result<Self> {
137        let mut cache = Vec::with_capacity(Self::INITIAL_CACHE_CAPACITY);
138
139        debug!("Started loading segment headers into cache");
140        // Segment headers are stored in batches (which is more efficient to store and retrieve), this is why code deals
141        // with key indices here rather that segment indices. Essentially this iterates over keys from 0 until missing
142        // entry is hit, which becomes the next key index where additional segment headers will be stored.
143        let mut next_key_index = 0;
144        while let Some(segment_headers) =
145            aux_store
146                .get_aux(&Self::key(next_key_index))?
147                .map(|segment_header| {
148                    Vec::<SegmentHeader>::decode(&mut segment_header.as_slice())
149                        .expect("Always correct segment header unless DB is corrupted; qed")
150                })
151        {
152            cache.extend(segment_headers);
153            next_key_index += 1;
154        }
155        debug!("Finished loading segment headers into cache");
156
157        Ok(Self {
158            inner: Arc::new(SegmentHeadersStoreInner {
159                aux_store,
160                next_key_index: AtomicU16::new(next_key_index),
161                cache: RwLock::new(cache),
162            }),
163            confirmation_depth_k,
164        })
165    }
166
167    /// Returns last observed segment header
168    pub fn last_segment_header(&self) -> Option<SegmentHeader> {
169        self.inner.cache.read().last().cloned()
170    }
171
172    /// Returns last observed segment index
173    pub fn max_segment_index(&self) -> Option<SegmentIndex> {
174        let segment_index = self.inner.cache.read().len().checked_sub(1)? as u64;
175        Some(SegmentIndex::from(segment_index))
176    }
177
178    /// Add segment headers.
179    ///
180    /// Multiple can be inserted for efficiency purposes.
181    pub fn add_segment_headers(
182        &self,
183        segment_headers: &[SegmentHeader],
184    ) -> sp_blockchain::Result<()> {
185        let mut maybe_last_segment_index = self.max_segment_index();
186        let mut segment_headers_to_store = Vec::with_capacity(segment_headers.len());
187        // Check all input segment headers to see which ones are not stored yet and verifying that segment indices are
188        // monotonically increasing
189        for segment_header in segment_headers {
190            let segment_index = segment_header.segment_index();
191            match maybe_last_segment_index {
192                Some(last_segment_index) => {
193                    if segment_index <= last_segment_index {
194                        // Skip already stored segment headers
195                        continue;
196                    }
197
198                    if segment_index != last_segment_index + SegmentIndex::ONE {
199                        let error = format!(
200                            "Segment index {segment_index} must strictly follow {last_segment_index}, can't store segment header"
201                        );
202                        return Err(sp_blockchain::Error::Application(error.into()));
203                    }
204
205                    segment_headers_to_store.push(segment_header);
206                    maybe_last_segment_index.replace(segment_index);
207                }
208                None => {
209                    if segment_index != SegmentIndex::ZERO {
210                        let error = format!(
211                            "First segment header index must be zero, found index {segment_index}"
212                        );
213                        return Err(sp_blockchain::Error::Application(error.into()));
214                    }
215
216                    segment_headers_to_store.push(segment_header);
217                    maybe_last_segment_index.replace(segment_index);
218                }
219            }
220        }
221
222        if segment_headers_to_store.is_empty() {
223            return Ok(());
224        }
225
226        // Insert all new segment headers into vacant key index for efficiency purposes
227        // TODO: Do compaction when we have too many keys: combine multiple segment headers into a
228        //  single entry for faster retrievals and more compact storage
229        {
230            let key_index = self.inner.next_key_index.fetch_add(1, Ordering::SeqCst);
231            let key = Self::key(key_index);
232            let value = segment_headers_to_store.encode();
233            let insert_data = vec![(key.as_slice(), value.as_slice())];
234
235            self.inner.aux_store.insert_aux(&insert_data, &[])?;
236        }
237        self.inner.cache.write().extend(segment_headers_to_store);
238
239        Ok(())
240    }
241
242    /// Get a single segment header
243    pub fn get_segment_header(&self, segment_index: SegmentIndex) -> Option<SegmentHeader> {
244        self.inner
245            .cache
246            .read()
247            .get(u64::from(segment_index) as usize)
248            .copied()
249    }
250
251    fn key(key_index: u16) -> Vec<u8> {
252        (Self::KEY_PREFIX, key_index.to_le_bytes()).encode()
253    }
254
255    /// Get segment headers that are expected to be included at specified block number.
256    pub fn segment_headers_for_block(&self, block_number: BlockNumber) -> Vec<SegmentHeader> {
257        let Some(last_segment_index) = self.max_segment_index() else {
258            // Not initialized
259            return Vec::new();
260        };
261
262        // Special case for the initial segment (for genesis block).
263        if block_number == 1 {
264            // If there is a segment index present, and we store monotonically increasing segment
265            // headers, then the first header exists.
266            return vec![
267                self.get_segment_header(SegmentIndex::ZERO)
268                    .expect("Segment headers are stored in monotonically increasing order; qed"),
269            ];
270        }
271
272        if last_segment_index == SegmentIndex::ZERO {
273            // Genesis segment already included in block #1
274            return Vec::new();
275        }
276
277        let mut current_segment_index = last_segment_index;
278        loop {
279            // If the current segment index present, and we store monotonically increasing segment
280            // headers, then the current segment header exists as well.
281            let current_segment_header = self
282                .get_segment_header(current_segment_index)
283                .expect("Segment headers are stored in monotonically increasing order; qed");
284
285            // The block immediately after the archived segment adding the confirmation depth
286            let target_block_number =
287                current_segment_header.last_archived_block().number + 1 + self.confirmation_depth_k;
288            if target_block_number == block_number {
289                let mut headers_for_block = vec![current_segment_header];
290
291                // Check block spanning multiple segments
292                let last_archived_block_number =
293                    current_segment_header.last_archived_block().number;
294                let mut segment_index = current_segment_index - SegmentIndex::ONE;
295
296                while let Some(segment_header) = self.get_segment_header(segment_index) {
297                    if segment_header.last_archived_block().number == last_archived_block_number {
298                        headers_for_block.insert(0, segment_header);
299                        segment_index -= SegmentIndex::ONE;
300                    } else {
301                        break;
302                    }
303                }
304
305                return headers_for_block;
306            }
307
308            // iterate segments further
309            if target_block_number > block_number {
310                // no need to check the initial segment
311                if current_segment_index > SegmentIndex::ONE {
312                    current_segment_index -= SegmentIndex::ONE
313                } else {
314                    break;
315                }
316            } else {
317                // No segment headers required
318                return Vec::new();
319            }
320        }
321
322        // No segment headers required
323        Vec::new()
324    }
325}
326
327/// Notification with block header hash that needs to be signed and sender for signature.
328#[derive(Debug, Clone)]
329pub struct ArchivedSegmentNotification {
330    /// Archived segment.
331    pub archived_segment: Arc<NewArchivedSegment>,
332    /// Sender that signified the fact of receiving archived segment by farmer.
333    ///
334    /// This must be used to send a message or else block import pipeline will get stuck.
335    pub acknowledgement_sender: TracingUnboundedSender<()>,
336}
337
338/// Notification with incrementally generated object mappings for a block (and any previous block
339/// continuation)
340#[derive(Debug, Clone)]
341pub struct ObjectMappingNotification {
342    /// Incremental object mappings for a block (and any previous block continuation).
343    ///
344    /// The archived data won't be available in pieces until the entire segment is full and archived.
345    pub object_mapping: Vec<GlobalObject>,
346    /// The block that these mappings are from.
347    pub block_number: BlockNumber,
348    // TODO: add an acknowledgement_sender for backpressure if needed
349}
350
351/// Whether to create object mappings.
352#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
353pub enum CreateObjectMappings {
354    /// Start creating object mappings from this block number.
355    ///
356    /// This can be lower than the latest archived block, but must be greater than genesis.
357    ///
358    /// The genesis block doesn't have mappings, so starting mappings at genesis is pointless.
359    /// The archiver will fail if it can't get the data for this block, but snap sync doesn't store
360    /// the genesis data on disk.  So avoiding genesis also avoids this error.
361    /// <https://github.com/paritytech/polkadot-sdk/issues/5366>
362    Block(NonZeroU32),
363
364    /// Create object mappings as archiving is happening.
365    Yes,
366
367    /// Don't create object mappings.
368    #[default]
369    No,
370}
371
372impl CreateObjectMappings {
373    /// The fixed block number to start creating object mappings from.
374    /// If there is no fixed block number, or mappings are disabled, returns None.
375    fn block(&self) -> Option<BlockNumber> {
376        match self {
377            CreateObjectMappings::Block(block) => Some(block.get()),
378            CreateObjectMappings::Yes => None,
379            CreateObjectMappings::No => None,
380        }
381    }
382
383    /// Returns true if object mappings will be created from a past or future block.
384    pub fn is_enabled(&self) -> bool {
385        !matches!(self, CreateObjectMappings::No)
386    }
387
388    /// Does the supplied block number need object mappings?
389    pub fn is_enabled_for_block(&self, block: BlockNumber) -> bool {
390        if !self.is_enabled() {
391            return false;
392        }
393
394        if let Some(target_block) = self.block() {
395            return block >= target_block;
396        }
397
398        // We're continuing where we left off, so all blocks get mappings.
399        true
400    }
401}
402
403fn find_last_archived_block<Block, Client, AS>(
404    client: &Client,
405    segment_headers_store: &SegmentHeadersStore<AS>,
406    best_block_to_archive: NumberFor<Block>,
407    create_object_mappings: bool,
408) -> sp_blockchain::Result<Option<(SegmentHeader, SignedBlock<Block>, BlockObjectMapping)>>
409where
410    Block: BlockT,
411    Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + HeaderBackend<Block>,
412    Client::Api: SubspaceApi<Block, PublicKey> + ObjectsApi<Block>,
413    AS: AuxStore,
414{
415    let Some(max_segment_index) = segment_headers_store.max_segment_index() else {
416        return Ok(None);
417    };
418
419    if max_segment_index == SegmentIndex::ZERO {
420        // Just genesis, nothing else to check
421        return Ok(None);
422    }
423
424    for segment_header in (SegmentIndex::ZERO..=max_segment_index)
425        .rev()
426        .filter_map(|segment_index| segment_headers_store.get_segment_header(segment_index))
427    {
428        let last_archived_block_number = segment_header.last_archived_block().number;
429
430        if NumberFor::<Block>::from(last_archived_block_number) > best_block_to_archive {
431            // Last archived block in segment header is too high for current state of the chain
432            // (segment headers store may know about more blocks in existence than is currently
433            // imported)
434            continue;
435        }
436        let Some(last_archived_block_hash) = client.hash(last_archived_block_number.into())? else {
437            // This block number is not in our chain yet (segment headers store may know about more
438            // blocks in existence than is currently imported)
439            continue;
440        };
441
442        let Some(last_archived_block) = client.block(last_archived_block_hash)? else {
443            // This block data was already pruned (but the headers weren't)
444            continue;
445        };
446
447        // If we're starting mapping creation at this block, return its mappings.
448        let block_object_mappings = if create_object_mappings {
449            client
450                .runtime_api()
451                .extract_block_object_mapping(
452                    *last_archived_block.block.header().parent_hash(),
453                    last_archived_block.block.clone(),
454                )
455                .unwrap_or_default()
456        } else {
457            BlockObjectMapping::default()
458        };
459
460        return Ok(Some((
461            segment_header,
462            last_archived_block,
463            block_object_mappings,
464        )));
465    }
466
467    Ok(None)
468}
469
470/// Derive genesis segment on demand, returns `Ok(None)` in case genesis block was already pruned
471pub fn recreate_genesis_segment<Block, Client>(
472    client: &Client,
473    kzg: Kzg,
474    erasure_coding: ErasureCoding,
475) -> Result<Option<NewArchivedSegment>, Box<dyn Error>>
476where
477    Block: BlockT,
478    Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + HeaderBackend<Block>,
479    Client::Api: ObjectsApi<Block>,
480{
481    let genesis_hash = client.info().genesis_hash;
482    let Some(signed_block) = client.block(genesis_hash)? else {
483        return Ok(None);
484    };
485
486    let encoded_block = encode_block(signed_block);
487
488    // There are no mappings in the genesis block, so they can be ignored
489    let block_outcome = Archiver::new(kzg, erasure_coding).add_block(
490        encoded_block,
491        BlockObjectMapping::default(),
492        false,
493    );
494    let new_archived_segment = block_outcome
495        .archived_segments
496        .into_iter()
497        .next()
498        .expect("Genesis block always results in exactly one archived segment; qed");
499
500    Ok(Some(new_archived_segment))
501}
502
503struct InitializedArchiver<Block>
504where
505    Block: BlockT,
506{
507    archiver: Archiver,
508    best_archived_block: (Block::Hash, NumberFor<Block>),
509}
510
511/// Encode block for archiving purposes.
512///
513/// Only specific Subspace justifications are included in the encoding, determined by result of
514/// [`SubspaceJustification::must_be_archived`], other justifications are filtered-out.
515pub fn encode_block<Block>(mut signed_block: SignedBlock<Block>) -> Vec<u8>
516where
517    Block: BlockT,
518{
519    if signed_block.block.header().number().is_zero() {
520        let mut encoded_block = signed_block.encode();
521
522        let encoded_block_length = encoded_block.len();
523
524        // We extend encoding of genesis block with extra data such that the very first archived
525        // segment can be produced right away, bootstrapping the farming process.
526        //
527        // Note: we add it to the end of the encoded block, so during decoding it'll actually be
528        // ignored (unless `DecodeAll::decode_all()` is used) even though it is technically present
529        // in encoded form.
530        encoded_block.resize(RecordedHistorySegment::SIZE, 0);
531        let mut rng = ChaCha8Rng::from_seed(
532            signed_block
533                .block
534                .header()
535                .state_root()
536                .as_ref()
537                .try_into()
538                .expect("State root in Subspace must be 32 bytes, panic otherwise; qed"),
539        );
540        rng.fill(&mut encoded_block[encoded_block_length..]);
541
542        encoded_block
543    } else {
544        // Filter out non-canonical justifications
545        if let Some(justifications) = signed_block.justifications.take() {
546            let mut filtered_justifications = justifications.into_iter().filter(|justification| {
547                // Only Subspace justifications are to be archived
548                let Some(subspace_justification) =
549                    SubspaceJustification::try_from_justification(justification)
550                        .and_then(|subspace_justification| subspace_justification.ok())
551                else {
552                    return false;
553                };
554
555                subspace_justification.must_be_archived()
556            });
557
558            if let Some(first_justification) = filtered_justifications.next() {
559                let mut justifications = Justifications::from(first_justification);
560
561                for justification in filtered_justifications {
562                    justifications.append(justification);
563                }
564
565                signed_block.justifications = Some(justifications);
566            }
567        }
568
569        signed_block.encode()
570    }
571}
572
573/// Symmetrical to [`encode_block()`], used to decode previously encoded blocks
574pub fn decode_block<Block>(
575    mut encoded_block: &[u8],
576) -> Result<SignedBlock<Block>, parity_scale_codec::Error>
577where
578    Block: BlockT,
579{
580    SignedBlock::<Block>::decode(&mut encoded_block)
581}
582
583fn initialize_archiver<Block, Client, AS>(
584    segment_headers_store: &SegmentHeadersStore<AS>,
585    subspace_link: &SubspaceLink<Block>,
586    client: &Client,
587    create_object_mappings: CreateObjectMappings,
588) -> sp_blockchain::Result<InitializedArchiver<Block>>
589where
590    Block: BlockT,
591    Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + HeaderBackend<Block> + AuxStore,
592    Client::Api: SubspaceApi<Block, PublicKey> + ObjectsApi<Block>,
593    AS: AuxStore,
594{
595    let client_info = client.info();
596    let best_block_number = TryInto::<BlockNumber>::try_into(client_info.best_number)
597        .unwrap_or_else(|_| {
598            unreachable!("sp_runtime::BlockNumber fits into subspace_primitives::BlockNumber; qed");
599        });
600
601    let confirmation_depth_k = subspace_link.chain_constants.confirmation_depth_k();
602
603    let mut best_block_to_archive = best_block_number.saturating_sub(confirmation_depth_k);
604    // Choose a lower block number if we want to get mappings from that specific block.
605    // If we are continuing from where we left off, we don't need to change the block number to archive.
606    // If there is no path to this block from the tip due to snap sync, we'll start archiving from
607    // an earlier segment, then start mapping again once archiving reaches this block.
608    if let Some(block_number) = create_object_mappings.block() {
609        // There aren't any mappings in the genesis block, so starting there is pointless.
610        // (And causes errors on restart, because genesis block data is never stored during snap sync.)
611        best_block_to_archive = best_block_to_archive.min(block_number);
612    }
613
614    if (best_block_to_archive..best_block_number)
615        .any(|block_number| client.hash(block_number.into()).ok().flatten().is_none())
616    {
617        // If there are blocks missing headers between best block to archive and best block of the
618        // blockchain it means newer block was inserted in some special way and as such is by
619        // definition valid, so we can simply assume that is our best block to archive instead
620        best_block_to_archive = best_block_number;
621    }
622
623    // If the user chooses an object mapping start block we don't have data or state for, we can't
624    // create mappings for it, so the node must exit with an error. We ignore genesis here, because
625    // it doesn't have mappings.
626    if create_object_mappings.is_enabled() && best_block_to_archive >= 1 {
627        let Some(best_block_to_archive_hash) = client.hash(best_block_to_archive.into())? else {
628            let error = format!(
629                "Missing hash for mapping block {best_block_to_archive}, \
630                try a higher block number, or wipe your node and restart with `--sync full`"
631            );
632            return Err(sp_blockchain::Error::Application(error.into()));
633        };
634
635        let Some(best_block_data) = client.block(best_block_to_archive_hash)? else {
636            let error = format!(
637                "Missing data for mapping block {best_block_to_archive} \
638                hash {best_block_to_archive_hash}, \
639                try a higher block number, or wipe your node and restart with `--sync full`"
640            );
641            return Err(sp_blockchain::Error::Application(error.into()));
642        };
643
644        // Similarly, state can be pruned, even if the data is present
645        client
646            .runtime_api()
647            .extract_block_object_mapping(
648                *best_block_data.block.header().parent_hash(),
649                best_block_data.block.clone(),
650            )
651            .map_err(|error| {
652                sp_blockchain::Error::Application(
653                    format!(
654                        "Missing state for mapping block {best_block_to_archive} \
655                        hash {best_block_to_archive_hash}: {error}, \
656                        try a higher block number, or wipe your node and restart with `--sync full`"
657                    )
658                    .into(),
659                )
660            })?;
661    }
662
663    let maybe_last_archived_block = find_last_archived_block(
664        client,
665        segment_headers_store,
666        best_block_to_archive.into(),
667        create_object_mappings.is_enabled(),
668    )?;
669
670    let have_last_segment_header = maybe_last_archived_block.is_some();
671    let mut best_archived_block = None;
672
673    let mut archiver =
674        if let Some((last_segment_header, last_archived_block, block_object_mappings)) =
675            maybe_last_archived_block
676        {
677            // Continuing from existing initial state
678            let last_archived_block_number = last_segment_header.last_archived_block().number;
679            info!(
680                %last_archived_block_number,
681                "Resuming archiver from last archived block",
682            );
683
684            // Set initial value, this is needed in case only genesis block was archived and there
685            // is nothing else available
686            best_archived_block.replace((
687                last_archived_block.block.hash(),
688                *last_archived_block.block.header().number(),
689            ));
690
691            let last_archived_block_encoded = encode_block(last_archived_block);
692
693            Archiver::with_initial_state(
694                subspace_link.kzg().clone(),
695                subspace_link.erasure_coding().clone(),
696                last_segment_header,
697                &last_archived_block_encoded,
698                block_object_mappings,
699            )
700            .map_err(|error| {
701                sp_blockchain::Error::Application(
702                    format!("Incorrect parameters for archiver: {error:?} {last_segment_header:?}")
703                        .into(),
704                )
705            })?
706        } else {
707            info!("Starting archiving from genesis");
708
709            Archiver::new(
710                subspace_link.kzg().clone(),
711                subspace_link.erasure_coding().clone(),
712            )
713        };
714
715    // Process blocks since last fully archived block up to the current head minus K
716    {
717        let blocks_to_archive_from = archiver
718            .last_archived_block_number()
719            .map(|n| n + 1)
720            .unwrap_or_default();
721        let blocks_to_archive_to = best_block_number
722            .checked_sub(confirmation_depth_k)
723            .filter(|&blocks_to_archive_to| blocks_to_archive_to >= blocks_to_archive_from)
724            .or({
725                if have_last_segment_header {
726                    None
727                } else {
728                    // If not continuation, archive genesis block
729                    Some(0)
730                }
731            });
732
733        if let Some(blocks_to_archive_to) = blocks_to_archive_to {
734            info!(
735                "Archiving already produced blocks {}..={}",
736                blocks_to_archive_from, blocks_to_archive_to,
737            );
738
739            let thread_pool = ThreadPoolBuilder::new()
740                .num_threads(BLOCKS_TO_ARCHIVE_CONCURRENCY)
741                .build()
742                .map_err(|error| {
743                    sp_blockchain::Error::Backend(format!(
744                        "Failed to create thread pool for archiver initialization: {error}"
745                    ))
746                })?;
747            // We need to limit number of threads to avoid running out of WASM instances
748            let blocks_to_archive = thread_pool.install(|| {
749                (blocks_to_archive_from..=blocks_to_archive_to)
750                    .into_par_iter()
751                    .map_init(
752                        || client.runtime_api(),
753                        |runtime_api, block_number| {
754                            let block_hash = client
755                                .hash(block_number.into())?
756                                .expect("All blocks since last archived must be present; qed");
757
758                            let block = client
759                                .block(block_hash)?
760                                .expect("All blocks since last archived must be present; qed");
761
762                            let block_object_mappings =
763                                if create_object_mappings.is_enabled_for_block(block_number) {
764                                    runtime_api
765                                        .extract_block_object_mapping(
766                                            *block.block.header().parent_hash(),
767                                            block.block.clone(),
768                                        )
769                                        .unwrap_or_default()
770                                } else {
771                                    BlockObjectMapping::default()
772                                };
773
774                            Ok((block, block_object_mappings))
775                        },
776                    )
777                    .collect::<sp_blockchain::Result<Vec<(SignedBlock<_>, _)>>>()
778            })?;
779
780            best_archived_block =
781                blocks_to_archive
782                    .last()
783                    .map(|(block, _block_object_mappings)| {
784                        (block.block.hash(), *block.block.header().number())
785                    });
786
787            for (signed_block, block_object_mappings) in blocks_to_archive {
788                let block_number_to_archive = *signed_block.block.header().number();
789                let encoded_block = encode_block(signed_block);
790
791                debug!(
792                    "Encoded block {} has size of {}",
793                    block_number_to_archive,
794                    bytesize::to_string(encoded_block.len() as u64, true),
795                );
796
797                let block_outcome = archiver.add_block(encoded_block, block_object_mappings, false);
798                send_object_mapping_notification(
799                    &subspace_link.object_mapping_notification_sender,
800                    block_outcome.object_mapping,
801                    block_number_to_archive,
802                );
803                let new_segment_headers: Vec<SegmentHeader> = block_outcome
804                    .archived_segments
805                    .iter()
806                    .map(|archived_segment| archived_segment.segment_header)
807                    .collect();
808
809                if !new_segment_headers.is_empty() {
810                    segment_headers_store.add_segment_headers(&new_segment_headers)?;
811                }
812            }
813        }
814    }
815
816    Ok(InitializedArchiver {
817        archiver,
818        best_archived_block: best_archived_block
819            .expect("Must always set if there is no logical error; qed"),
820    })
821}
822
/// Finalize the block `hash` at height `number` and clean up `block_weight` auxiliary entries of
/// forks displaced by that finalization.
///
/// Best-effort by design: every failure path (missing header, displaced-leaves computation,
/// applying finality, queueing tombstones) is logged rather than propagated to the caller.
fn finalize_block<Block, Backend, Client>(
    client: &Client,
    backend: &Backend,
    telemetry: Option<&TelemetryHandle>,
    hash: Block::Hash,
    number: NumberFor<Block>,
) where
    Block: BlockT,
    Backend: BackendT<Block>,
    Client: LockImportRun<Block, Backend> + Finalizer<Block, Backend> + HeaderBackend<Block>,
{
    if number.is_zero() {
        // Block zero is finalized already and generates unnecessary warning if called again
        return;
    }

    // Determine which fork tips (leaves) this finalization will displace so that their
    // `block_weight` aux entries can be tombstoned in the same import operation as finality
    // below; `None` means cleanup is skipped for this call (the corresponding warning explains
    // the consequence in each arm)
    let displaced = match client.header(hash) {
        Ok(Some(header)) => {
            let parent_hash = *header.parent_hash();
            match backend
                .blockchain()
                .displaced_leaves_after_finalizing(hash, number, parent_hash)
            {
                Ok(d) => Some(d),
                Err(error) => {
                    warn!(
                        ?error,
                        ?hash,
                        ?parent_hash,
                        ?number,
                        "Failed to compute displaced leaves; their block_weight entries will leak"
                    );
                    None
                }
            }
        }
        Ok(None) => {
            warn!(
                ?hash,
                ?number,
                "Header missing for finalized hash; skipping displaced-fork cleanup"
            );
            None
        }
        Err(error) => {
            warn!(
                ?error,
                ?hash,
                ?number,
                "Failed to load header for finalized hash; skipping displaced-fork cleanup"
            );
            None
        }
    };

    // We don't have anything useful to do with this result yet, the only source of errors was
    // logged already inside
    let _result: sp_blockchain::Result<_> = client.lock_import_and_run(|import_op| {
        // Ideally some handle to a synchronization oracle would be used to avoid unconditionally
        // notifying.
        client
            .apply_finality(import_op, hash, None, true)
            .map_err(|error| {
                warn!(
                    "Error applying finality to block {:?}: {}",
                    (hash, number),
                    error
                );
                error
            })?;

        // Queue tombstones (`None` values) for displaced blocks' weight entries in the same
        // import operation, so the cleanup commits atomically with finalization; a queueing
        // failure aborts the whole operation via `?`
        if let Some(d) = &displaced
            && !d.displaced_blocks.is_empty()
        {
            let count = d.displaced_blocks.len();
            let tombstones = d
                .displaced_blocks
                .iter()
                .map(|h| (crate::aux_schema::block_weight_key(h), None));
            import_op.op.insert_aux(tombstones).map_err(|error| {
                warn!(
                    ?error,
                    count, "Failed to queue displaced fork tombstones; rolling back finalization",
                );
                error
            })?;
        }

        debug!("Finalizing blocks up to ({:?}, {})", number, hash);

        telemetry!(
            telemetry;
            CONSENSUS_INFO;
            "subspace.finalized_blocks_up_to";
            "number" => ?number, "hash" => ?hash,
        );

        Ok(())
    });
}
923
924/// Create an archiver task.
925///
926/// Archiver task will listen for importing blocks and archive blocks at `K` depth, producing pieces
927/// and segment headers (segment headers are then added back to the blockchain as
928/// `store_segment_header` extrinsic).
929///
930/// NOTE: Archiver is doing blocking operations and must run in a dedicated task.
931///
932/// Archiver is only able to move forward and doesn't support reorgs. Upon restart it will check
933/// [`SegmentHeadersStore`] and chain history to reconstruct "current" state it was in before last
934/// shutdown and continue incrementally archiving blockchain history from there.
935///
936/// Archiving is triggered by block importing notification ([`SubspaceLink::block_importing_notification_stream`])
937/// and tries to archive the block at [`ChainConstants::confirmation_depth_k`](sp_consensus_subspace::ChainConstants::confirmation_depth_k)
938/// depth from the block being imported. Block import will then wait for archiver to acknowledge
939/// processing, which is necessary for ensuring that when the next block is imported, inherents will
940/// contain segment header of newly archived block (must happen exactly in the next block).
941///
942/// `create_object_mappings` controls when object mappings are created for archived blocks. When
943/// these mappings are created, a ([`SubspaceLink::object_mapping_notification_stream`])
944/// notification will be sent.
945///
946/// Once segment header is archived, notification ([`SubspaceLink::archived_segment_notification_stream`])
947/// will be sent and archiver will be paused until all receivers have provided an acknowledgement
948/// for it.
949///
950/// Archiving will be incremental during normal operation to decrease impact on block import and
951/// non-incremental heavily parallel during sync process since parallel implementation is more
952/// efficient overall and during sync only total sync time matters.
953pub fn create_subspace_archiver<Block, Backend, Client, AS, SO>(
954    segment_headers_store: SegmentHeadersStore<AS>,
955    subspace_link: SubspaceLink<Block>,
956    client: Arc<Client>,
957    backend: Arc<Backend>,
958    sync_oracle: SubspaceSyncOracle<SO>,
959    telemetry: Option<TelemetryHandle>,
960    create_object_mappings: CreateObjectMappings,
961) -> sp_blockchain::Result<impl Future<Output = sp_blockchain::Result<()>> + Send + 'static>
962where
963    Block: BlockT,
964    Backend: BackendT<Block> + 'static,
965    Client: ProvideRuntimeApi<Block>
966        + BlockBackend<Block>
967        + HeaderBackend<Block>
968        + LockImportRun<Block, Backend>
969        + Finalizer<Block, Backend>
970        + BlockchainEvents<Block>
971        + AuxStore
972        + Send
973        + Sync
974        + 'static,
975    Client::Api: SubspaceApi<Block, PublicKey> + ObjectsApi<Block>,
976    AS: AuxStore + Send + Sync + 'static,
977    SO: SyncOracle + Send + Sync + 'static,
978{
979    if create_object_mappings.is_enabled() {
980        info!(
981            ?create_object_mappings,
982            "Creating object mappings from the configured block onwards"
983        );
984    } else {
985        info!("Not creating object mappings");
986    }
987
988    let maybe_archiver = if segment_headers_store.max_segment_index().is_none() {
989        Some(initialize_archiver(
990            &segment_headers_store,
991            &subspace_link,
992            client.as_ref(),
993            create_object_mappings,
994        )?)
995    } else {
996        None
997    };
998
999    // Subscribing synchronously before returning
1000    let mut block_importing_notification_stream = subspace_link
1001        .block_importing_notification_stream
1002        .subscribe();
1003
1004    Ok(async move {
1005        let archiver = match maybe_archiver {
1006            Some(archiver) => archiver,
1007            None => initialize_archiver(
1008                &segment_headers_store,
1009                &subspace_link,
1010                client.as_ref(),
1011                create_object_mappings,
1012            )?,
1013        };
1014        let confirmation_depth_k = subspace_link.chain_constants.confirmation_depth_k().into();
1015
1016        let InitializedArchiver {
1017            mut archiver,
1018            best_archived_block,
1019        } = archiver;
1020        let (mut best_archived_block_hash, mut best_archived_block_number) = best_archived_block;
1021
1022        while let Some(block_importing_notification) =
1023            block_importing_notification_stream.next().await
1024        {
1025            let importing_block_number = block_importing_notification.block_number;
1026            let block_number_to_archive =
1027                match importing_block_number.checked_sub(&confirmation_depth_k) {
1028                    Some(block_number_to_archive) => block_number_to_archive,
1029                    None => {
1030                        // Too early to archive blocks
1031                        continue;
1032                    }
1033                };
1034
1035            let last_archived_block_number = segment_headers_store
1036                .last_segment_header()
1037                .expect("Exists after archiver initialization; qed")
1038                .last_archived_block()
1039                .number;
1040            let create_mappings =
1041                create_object_mappings.is_enabled_for_block(last_archived_block_number);
1042            let last_archived_block_number = NumberFor::<Block>::from(last_archived_block_number);
1043            trace!(
1044                %importing_block_number,
1045                %block_number_to_archive,
1046                %best_archived_block_number,
1047                %last_archived_block_number,
1048                "Checking if block needs to be skipped"
1049            );
1050
1051            // Skip archived blocks, unless we're producing object mappings for them
1052            let skip_last_archived_blocks =
1053                last_archived_block_number > block_number_to_archive && !create_mappings;
1054            if best_archived_block_number >= block_number_to_archive || skip_last_archived_blocks {
1055                // This block was already archived, skip
1056                debug!(
1057                    %importing_block_number,
1058                    %block_number_to_archive,
1059                    %best_archived_block_number,
1060                    %last_archived_block_number,
1061                    "Skipping already archived block",
1062                );
1063                continue;
1064            }
1065
1066            // In case there was a block gap re-initialize archiver and continue with current
1067            // block number (rather than block number at some depth) to allow for special sync
1068            // modes where pre-verified blocks are inserted at some point in the future comparing to
1069            // previously existing blocks
1070            if best_archived_block_number + One::one() != block_number_to_archive {
1071                InitializedArchiver {
1072                    archiver,
1073                    best_archived_block: (best_archived_block_hash, best_archived_block_number),
1074                } = initialize_archiver(
1075                    &segment_headers_store,
1076                    &subspace_link,
1077                    client.as_ref(),
1078                    create_object_mappings,
1079                )?;
1080
1081                if best_archived_block_number + One::one() == block_number_to_archive {
1082                    // As expected, can archive this block
1083                } else if best_archived_block_number >= block_number_to_archive {
1084                    // Special sync mode where verified blocks were inserted into blockchain
1085                    // directly, archiving of this block will naturally happen later
1086                    continue;
1087                } else if client
1088                    .block_hash(importing_block_number - One::one())?
1089                    .is_none()
1090                {
1091                    // We may have imported some block using special sync mode and block we're about
1092                    // to import is the first one after the gap at which archiver is supposed to be
1093                    // initialized, but we are only about to import it, so wait for the next block
1094                    // for now
1095                    continue;
1096                } else {
1097                    let error = format!(
1098                        "There was a gap in blockchain history and the last contiguous series of \
1099                        blocks starting with doesn't start with archived segment (best archived \
1100                        block number {best_archived_block_number}, block number to archive \
1101                        {block_number_to_archive}), block about to be imported \
1102                        {importing_block_number}), archiver can't continue",
1103                    );
1104                    return Err(sp_blockchain::Error::Consensus(sp_consensus::Error::Other(
1105                        error.into(),
1106                    )));
1107                }
1108            }
1109
1110            let max_segment_index_before = segment_headers_store.max_segment_index();
1111            (best_archived_block_hash, best_archived_block_number) = archive_block(
1112                &mut archiver,
1113                segment_headers_store.clone(),
1114                &*client,
1115                &sync_oracle,
1116                subspace_link.object_mapping_notification_sender.clone(),
1117                subspace_link.archived_segment_notification_sender.clone(),
1118                best_archived_block_hash,
1119                block_number_to_archive,
1120                create_object_mappings,
1121            )
1122            .await?;
1123
1124            let max_segment_index = segment_headers_store.max_segment_index();
1125            if max_segment_index_before != max_segment_index {
1126                let maybe_block_number_to_finalize = max_segment_index
1127                    // Skip last `FINALIZATION_DEPTH_IN_SEGMENTS` archived segments
1128                    .and_then(|max_segment_index| {
1129                        max_segment_index.checked_sub(FINALIZATION_DEPTH_IN_SEGMENTS)
1130                    })
1131                    .and_then(|segment_index| {
1132                        segment_headers_store.get_segment_header(segment_index)
1133                    })
1134                    .map(|segment_header| segment_header.last_archived_block().number)
1135                    // Make sure not to finalize block number that does not yet exist (segment
1136                    // headers store may contain future blocks during initial sync)
1137                    .map(|block_number| block_number_to_archive.min(block_number.into()))
1138                    // Do not finalize blocks twice
1139                    .filter(|block_number| *block_number > client.info().finalized_number);
1140
1141                if let Some(block_number_to_finalize) = maybe_block_number_to_finalize {
1142                    {
1143                        let mut import_notification = client.every_import_notification_stream();
1144
1145                        // Drop notification to drop acknowledgement and allow block import to
1146                        // proceed
1147                        drop(block_importing_notification);
1148
1149                        while let Some(notification) = import_notification.next().await {
1150                            // Wait for importing block to finish importing
1151                            if notification.header.number() == &importing_block_number {
1152                                break;
1153                            }
1154                        }
1155                    }
1156
1157                    // Block is not guaranteed to be present this deep if we have only synced recent
1158                    // blocks.
1159                    if let Some(block_hash_to_finalize) =
1160                        client.block_hash(block_number_to_finalize)?
1161                    {
1162                        finalize_block(
1163                            &*client,
1164                            &*backend,
1165                            telemetry.as_ref(),
1166                            block_hash_to_finalize,
1167                            block_number_to_finalize,
1168                        );
1169                    }
1170                }
1171            }
1172        }
1173
1174        Ok(())
1175    })
1176}
1177
1178/// Tries to archive `block_number` and returns new (or old if not changed) best archived block
1179#[allow(clippy::too_many_arguments)]
1180async fn archive_block<Block, Backend, Client, AS, SO>(
1181    archiver: &mut Archiver,
1182    segment_headers_store: SegmentHeadersStore<AS>,
1183    client: &Client,
1184    sync_oracle: &SubspaceSyncOracle<SO>,
1185    object_mapping_notification_sender: SubspaceNotificationSender<ObjectMappingNotification>,
1186    archived_segment_notification_sender: SubspaceNotificationSender<ArchivedSegmentNotification>,
1187    best_archived_block_hash: Block::Hash,
1188    block_number_to_archive: NumberFor<Block>,
1189    create_object_mappings: CreateObjectMappings,
1190) -> sp_blockchain::Result<(Block::Hash, NumberFor<Block>)>
1191where
1192    Block: BlockT,
1193    Backend: BackendT<Block>,
1194    Client: ProvideRuntimeApi<Block>
1195        + BlockBackend<Block>
1196        + HeaderBackend<Block>
1197        + LockImportRun<Block, Backend>
1198        + Finalizer<Block, Backend>
1199        + AuxStore
1200        + Send
1201        + Sync
1202        + 'static,
1203    Client::Api: SubspaceApi<Block, PublicKey> + ObjectsApi<Block>,
1204    AS: AuxStore + Send + Sync + 'static,
1205    SO: SyncOracle + Send + Sync + 'static,
1206{
1207    let block = client
1208        .block(
1209            client
1210                .block_hash(block_number_to_archive)?
1211                .expect("Older block by number must always exist"),
1212        )?
1213        .expect("Older block by number must always exist");
1214
1215    let parent_block_hash = *block.block.header().parent_hash();
1216    let block_hash_to_archive = block.block.hash();
1217
1218    debug!(
1219        "Archiving block {:?} ({})",
1220        block_number_to_archive, block_hash_to_archive
1221    );
1222
1223    if parent_block_hash != best_archived_block_hash {
1224        let error = format!(
1225            "Attempt to switch to a different fork beyond archiving depth, \
1226            can't do it: parent block hash {parent_block_hash}, best archived block hash {best_archived_block_hash}"
1227        );
1228        return Err(sp_blockchain::Error::Consensus(sp_consensus::Error::Other(
1229            error.into(),
1230        )));
1231    }
1232
1233    let create_mappings = create_object_mappings.is_enabled_for_block(
1234        block_number_to_archive.try_into().unwrap_or_else(|_| {
1235            unreachable!("sp_runtime::BlockNumber fits into subspace_primitives::BlockNumber; qed")
1236        }),
1237    );
1238
1239    let block_object_mappings = if create_mappings {
1240        client
1241            .runtime_api()
1242            .extract_block_object_mapping(parent_block_hash, block.block.clone())
1243            .map_err(|error| {
1244                sp_blockchain::Error::Application(
1245                    format!("Failed to retrieve block object mappings: {error}").into(),
1246                )
1247            })?
1248    } else {
1249        BlockObjectMapping::default()
1250    };
1251
1252    let encoded_block = encode_block(block);
1253    debug!(
1254        "Encoded block {} has size of {}",
1255        block_number_to_archive,
1256        bytesize::to_string(encoded_block.len() as u64, true),
1257    );
1258
1259    let block_outcome = archiver.add_block(
1260        encoded_block,
1261        block_object_mappings,
1262        !sync_oracle.is_major_syncing(),
1263    );
1264    send_object_mapping_notification(
1265        &object_mapping_notification_sender,
1266        block_outcome.object_mapping,
1267        block_number_to_archive,
1268    );
1269    for archived_segment in block_outcome.archived_segments {
1270        let segment_header = archived_segment.segment_header;
1271
1272        segment_headers_store.add_segment_headers(slice::from_ref(&segment_header))?;
1273
1274        send_archived_segment_notification(&archived_segment_notification_sender, archived_segment)
1275            .await;
1276    }
1277
1278    Ok((block_hash_to_archive, block_number_to_archive))
1279}
1280
1281fn send_object_mapping_notification<BlockNum>(
1282    object_mapping_notification_sender: &SubspaceNotificationSender<ObjectMappingNotification>,
1283    object_mapping: Vec<GlobalObject>,
1284    block_number: BlockNum,
1285) where
1286    BlockNum: BlockNumberT,
1287{
1288    if object_mapping.is_empty() {
1289        return;
1290    }
1291
1292    let block_number = TryInto::<BlockNumber>::try_into(block_number).unwrap_or_else(|_| {
1293        unreachable!("sp_runtime::BlockNumber fits into subspace_primitives::BlockNumber; qed");
1294    });
1295
1296    let object_mapping_notification = ObjectMappingNotification {
1297        object_mapping,
1298        block_number,
1299    };
1300
1301    object_mapping_notification_sender.notify(move || object_mapping_notification);
1302}
1303
1304async fn send_archived_segment_notification(
1305    archived_segment_notification_sender: &SubspaceNotificationSender<ArchivedSegmentNotification>,
1306    archived_segment: NewArchivedSegment,
1307) {
1308    let segment_index = archived_segment.segment_header.segment_index();
1309    let (acknowledgement_sender, mut acknowledgement_receiver) =
1310        tracing_unbounded::<()>("subspace_acknowledgement", 1000);
1311    // Keep `archived_segment` around until all acknowledgements are received since some receivers
1312    // might use weak references
1313    let archived_segment = Arc::new(archived_segment);
1314    let archived_segment_notification = ArchivedSegmentNotification {
1315        archived_segment: Arc::clone(&archived_segment),
1316        acknowledgement_sender,
1317    };
1318
1319    archived_segment_notification_sender.notify(move || archived_segment_notification);
1320
1321    let wait_fut = async {
1322        while acknowledgement_receiver.next().await.is_some() {
1323            debug!(
1324                "Archived segment notification acknowledged: {}",
1325                segment_index
1326            );
1327        }
1328    };
1329
1330    if tokio::time::timeout(ACKNOWLEDGEMENT_TIMEOUT, wait_fut)
1331        .await
1332        .is_err()
1333    {
1334        warn!(
1335            "Archived segment notification was not acknowledged and reached timeout, continue \
1336            regardless"
1337        );
1338    }
1339}