1#[cfg(test)]
34mod tests;
35
36use crate::slot_worker::SubspaceSyncOracle;
37use crate::{SubspaceLink, SubspaceNotificationSender};
38use futures::StreamExt;
39use parity_scale_codec::{Decode, Encode};
40use parking_lot::RwLock;
41use rand::prelude::*;
42use rand_chacha::ChaCha8Rng;
43use rayon::ThreadPoolBuilder;
44use rayon::prelude::*;
45use sc_client_api::{
46    AuxStore, Backend as BackendT, BlockBackend, BlockchainEvents, Finalizer, LockImportRun,
47};
48use sc_telemetry::{CONSENSUS_INFO, TelemetryHandle, telemetry};
49use sc_utils::mpsc::{TracingUnboundedSender, tracing_unbounded};
50use sp_api::ProvideRuntimeApi;
51use sp_blockchain::HeaderBackend;
52use sp_consensus::SyncOracle;
53use sp_consensus_subspace::{SubspaceApi, SubspaceJustification};
54use sp_objects::ObjectsApi;
55use sp_runtime::Justifications;
56use sp_runtime::generic::SignedBlock;
57use sp_runtime::traits::{
58    Block as BlockT, BlockNumber as BlockNumberT, CheckedSub, Header, NumberFor, One, Zero,
59};
60use std::error::Error;
61use std::future::Future;
62use std::num::NonZeroU32;
63use std::slice;
64use std::sync::Arc;
65use std::sync::atomic::{AtomicU16, Ordering};
66use std::time::Duration;
67use subspace_archiving::archiver::{Archiver, NewArchivedSegment};
68use subspace_core_primitives::objects::{BlockObjectMapping, GlobalObject};
69use subspace_core_primitives::segments::{RecordedHistorySegment, SegmentHeader, SegmentIndex};
70use subspace_core_primitives::{BlockNumber, PublicKey};
71use subspace_erasure_coding::ErasureCoding;
72use subspace_kzg::Kzg;
73use tracing::{debug, info, trace, warn};
74
/// Number of threads in the dedicated thread pool that archiver initialization uses to load
/// already produced blocks (and extract their object mappings) in parallel.
const BLOCKS_TO_ARCHIVE_CONCURRENCY: usize = 6;
/// Two-minute timeout applied to acknowledgements.
// NOTE(review): the use site is not in the visible part of this file; presumably this bounds how
// long the archiver waits for archived segment acknowledgements — confirm.
const ACKNOWLEDGEMENT_TIMEOUT: Duration = Duration::from_mins(2);

/// Number of segments subtracted from the maximum segment index to select the segment whose last
/// archived block becomes a finalization candidate.
pub(crate) const FINALIZATION_DEPTH_IN_SEGMENTS: SegmentIndex = SegmentIndex::new(5);
90
/// Shared state behind [`SegmentHeadersStore`].
#[derive(Debug)]
struct SegmentHeadersStoreInner<AS> {
    /// Auxiliary storage in which batches of segment headers are persisted.
    aux_store: Arc<AS>,
    /// Index of the next vacant auxiliary storage key; each stored batch of headers takes one.
    next_key_index: AtomicU16,
    /// In-memory copy of all known segment headers, positioned by segment index.
    cache: RwLock<Vec<SegmentHeader>>,
}
98
/// Persistent storage of segment headers, backed by auxiliary storage with an in-memory cache.
///
/// Cloning is cheap: clones share the same inner state through an [`Arc`].
#[derive(Debug)]
pub struct SegmentHeadersStore<AS> {
    /// Shared auxiliary storage handle, key counter and header cache.
    inner: Arc<SegmentHeadersStoreInner<AS>>,
    /// Confirmation depth `k`, used to compute the block at which a segment header is expected.
    confirmation_depth_k: BlockNumber,
}
114
115impl<AS> Clone for SegmentHeadersStore<AS> {
116    fn clone(&self) -> Self {
117        Self {
118            inner: Arc::clone(&self.inner),
119            confirmation_depth_k: self.confirmation_depth_k,
120        }
121    }
122}
123
124impl<AS> SegmentHeadersStore<AS>
125where
126    AS: AuxStore,
127{
128    const KEY_PREFIX: &'static [u8] = b"segment-headers";
129    const INITIAL_CACHE_CAPACITY: usize = 1_000;
130
131    pub fn new(
133        aux_store: Arc<AS>,
134        confirmation_depth_k: BlockNumber,
135    ) -> sp_blockchain::Result<Self> {
136        let mut cache = Vec::with_capacity(Self::INITIAL_CACHE_CAPACITY);
137
138        debug!("Started loading segment headers into cache");
139        let mut next_key_index = 0;
143        while let Some(segment_headers) =
144            aux_store
145                .get_aux(&Self::key(next_key_index))?
146                .map(|segment_header| {
147                    Vec::<SegmentHeader>::decode(&mut segment_header.as_slice())
148                        .expect("Always correct segment header unless DB is corrupted; qed")
149                })
150        {
151            cache.extend(segment_headers);
152            next_key_index += 1;
153        }
154        debug!("Finished loading segment headers into cache");
155
156        Ok(Self {
157            inner: Arc::new(SegmentHeadersStoreInner {
158                aux_store,
159                next_key_index: AtomicU16::new(next_key_index),
160                cache: RwLock::new(cache),
161            }),
162            confirmation_depth_k,
163        })
164    }
165
166    pub fn last_segment_header(&self) -> Option<SegmentHeader> {
168        self.inner.cache.read().last().cloned()
169    }
170
171    pub fn max_segment_index(&self) -> Option<SegmentIndex> {
173        let segment_index = self.inner.cache.read().len().checked_sub(1)? as u64;
174        Some(SegmentIndex::from(segment_index))
175    }
176
177    pub fn add_segment_headers(
181        &self,
182        segment_headers: &[SegmentHeader],
183    ) -> sp_blockchain::Result<()> {
184        let mut maybe_last_segment_index = self.max_segment_index();
185        let mut segment_headers_to_store = Vec::with_capacity(segment_headers.len());
186        for segment_header in segment_headers {
189            let segment_index = segment_header.segment_index();
190            match maybe_last_segment_index {
191                Some(last_segment_index) => {
192                    if segment_index <= last_segment_index {
193                        continue;
195                    }
196
197                    if segment_index != last_segment_index + SegmentIndex::ONE {
198                        let error = format!(
199                            "Segment index {segment_index} must strictly follow {last_segment_index}, can't store segment header"
200                        );
201                        return Err(sp_blockchain::Error::Application(error.into()));
202                    }
203
204                    segment_headers_to_store.push(segment_header);
205                    maybe_last_segment_index.replace(segment_index);
206                }
207                None => {
208                    if segment_index != SegmentIndex::ZERO {
209                        let error = format!(
210                            "First segment header index must be zero, found index {segment_index}"
211                        );
212                        return Err(sp_blockchain::Error::Application(error.into()));
213                    }
214
215                    segment_headers_to_store.push(segment_header);
216                    maybe_last_segment_index.replace(segment_index);
217                }
218            }
219        }
220
221        if segment_headers_to_store.is_empty() {
222            return Ok(());
223        }
224
225        {
229            let key_index = self.inner.next_key_index.fetch_add(1, Ordering::SeqCst);
230            let key = Self::key(key_index);
231            let value = segment_headers_to_store.encode();
232            let insert_data = vec![(key.as_slice(), value.as_slice())];
233
234            self.inner.aux_store.insert_aux(&insert_data, &[])?;
235        }
236        self.inner.cache.write().extend(segment_headers_to_store);
237
238        Ok(())
239    }
240
241    pub fn get_segment_header(&self, segment_index: SegmentIndex) -> Option<SegmentHeader> {
243        self.inner
244            .cache
245            .read()
246            .get(u64::from(segment_index) as usize)
247            .copied()
248    }
249
250    fn key(key_index: u16) -> Vec<u8> {
251        (Self::KEY_PREFIX, key_index.to_le_bytes()).encode()
252    }
253
254    pub fn segment_headers_for_block(&self, block_number: BlockNumber) -> Vec<SegmentHeader> {
256        let Some(last_segment_index) = self.max_segment_index() else {
257            return Vec::new();
259        };
260
261        if block_number == 1 {
263            return vec![
266                self.get_segment_header(SegmentIndex::ZERO)
267                    .expect("Segment headers are stored in monotonically increasing order; qed"),
268            ];
269        }
270
271        if last_segment_index == SegmentIndex::ZERO {
272            return Vec::new();
274        }
275
276        let mut current_segment_index = last_segment_index;
277        loop {
278            let current_segment_header = self
281                .get_segment_header(current_segment_index)
282                .expect("Segment headers are stored in monotonically increasing order; qed");
283
284            let target_block_number =
286                current_segment_header.last_archived_block().number + 1 + self.confirmation_depth_k;
287            if target_block_number == block_number {
288                let mut headers_for_block = vec![current_segment_header];
289
290                let last_archived_block_number =
292                    current_segment_header.last_archived_block().number;
293                let mut segment_index = current_segment_index - SegmentIndex::ONE;
294
295                while let Some(segment_header) = self.get_segment_header(segment_index) {
296                    if segment_header.last_archived_block().number == last_archived_block_number {
297                        headers_for_block.insert(0, segment_header);
298                        segment_index -= SegmentIndex::ONE;
299                    } else {
300                        break;
301                    }
302                }
303
304                return headers_for_block;
305            }
306
307            if target_block_number > block_number {
309                if current_segment_index > SegmentIndex::ONE {
311                    current_segment_index -= SegmentIndex::ONE
312                } else {
313                    break;
314                }
315            } else {
316                return Vec::new();
318            }
319        }
320
321        Vec::new()
323    }
324}
325
/// Notification about a newly archived segment.
#[derive(Debug, Clone)]
pub struct ArchivedSegmentNotification {
    /// The archived segment, shared between receivers of the notification.
    pub archived_segment: Arc<NewArchivedSegment>,
    /// Sender over which a unit acknowledgement can be sent.
    // NOTE(review): presumably receivers must acknowledge the segment through this sender so the
    // archiver can proceed; the receiving side is not visible here — confirm.
    pub acknowledgement_sender: TracingUnboundedSender<()>,
}
336
/// Notification carrying object mappings together with the number of the block they relate to.
#[derive(Debug, Clone)]
pub struct ObjectMappingNotification {
    /// Global object mappings carried by this notification.
    pub object_mapping: Vec<GlobalObject>,
    /// Number of the block associated with these mappings.
    // NOTE(review): presumably the block whose archiving produced the mappings — confirm against
    // the sender.
    pub block_number: BlockNumber,
    }
349
/// Controls whether, and from which block, object mappings are created.
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
pub enum CreateObjectMappings {
    /// Create object mappings for blocks whose number is at or above the given (non-zero) block
    /// number.
    Block(NonZeroU32),

    /// Create object mappings for every block.
    Yes,

    /// Don't create object mappings (the default).
    #[default]
    No,
}
370
371impl CreateObjectMappings {
372    fn block(&self) -> Option<BlockNumber> {
375        match self {
376            CreateObjectMappings::Block(block) => Some(block.get()),
377            CreateObjectMappings::Yes => None,
378            CreateObjectMappings::No => None,
379        }
380    }
381
382    pub fn is_enabled(&self) -> bool {
384        !matches!(self, CreateObjectMappings::No)
385    }
386
387    pub fn is_enabled_for_block(&self, block: BlockNumber) -> bool {
389        if !self.is_enabled() {
390            return false;
391        }
392
393        if let Some(target_block) = self.block() {
394            return block >= target_block;
395        }
396
397        true
399    }
400}
401
402fn find_last_archived_block<Block, Client, AS>(
403    client: &Client,
404    segment_headers_store: &SegmentHeadersStore<AS>,
405    best_block_to_archive: NumberFor<Block>,
406    create_object_mappings: bool,
407) -> sp_blockchain::Result<Option<(SegmentHeader, SignedBlock<Block>, BlockObjectMapping)>>
408where
409    Block: BlockT,
410    Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + HeaderBackend<Block>,
411    Client::Api: SubspaceApi<Block, PublicKey> + ObjectsApi<Block>,
412    AS: AuxStore,
413{
414    let Some(max_segment_index) = segment_headers_store.max_segment_index() else {
415        return Ok(None);
416    };
417
418    if max_segment_index == SegmentIndex::ZERO {
419        return Ok(None);
421    }
422
423    for segment_header in (SegmentIndex::ZERO..=max_segment_index)
424        .rev()
425        .filter_map(|segment_index| segment_headers_store.get_segment_header(segment_index))
426    {
427        let last_archived_block_number = segment_header.last_archived_block().number;
428
429        if NumberFor::<Block>::from(last_archived_block_number) > best_block_to_archive {
430            continue;
434        }
435        let Some(last_archived_block_hash) = client.hash(last_archived_block_number.into())? else {
436            continue;
439        };
440
441        let Some(last_archived_block) = client.block(last_archived_block_hash)? else {
442            continue;
444        };
445
446        let block_object_mappings = if create_object_mappings {
448            client
449                .runtime_api()
450                .extract_block_object_mapping(
451                    *last_archived_block.block.header().parent_hash(),
452                    last_archived_block.block.clone(),
453                )
454                .unwrap_or_default()
455        } else {
456            BlockObjectMapping::default()
457        };
458
459        return Ok(Some((
460            segment_header,
461            last_archived_block,
462            block_object_mappings,
463        )));
464    }
465
466    Ok(None)
467}
468
469pub fn recreate_genesis_segment<Block, Client>(
471    client: &Client,
472    kzg: Kzg,
473    erasure_coding: ErasureCoding,
474) -> Result<Option<NewArchivedSegment>, Box<dyn Error>>
475where
476    Block: BlockT,
477    Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + HeaderBackend<Block>,
478    Client::Api: ObjectsApi<Block>,
479{
480    let genesis_hash = client.info().genesis_hash;
481    let Some(signed_block) = client.block(genesis_hash)? else {
482        return Ok(None);
483    };
484
485    let encoded_block = encode_block(signed_block);
486
487    let block_outcome = Archiver::new(kzg, erasure_coding).add_block(
489        encoded_block,
490        BlockObjectMapping::default(),
491        false,
492    );
493    let new_archived_segment = block_outcome
494        .archived_segments
495        .into_iter()
496        .next()
497        .expect("Genesis block always results in exactly one archived segment; qed");
498
499    Ok(Some(new_archived_segment))
500}
501
/// Result of archiver initialization.
struct InitializedArchiver<Block>
where
    Block: BlockT,
{
    /// Archiver instance, either restored from the last usable segment header or created fresh.
    archiver: Archiver,
    /// Hash and number of the most recent block that was fed into the archiver.
    best_archived_block: (Block::Hash, NumberFor<Block>),
}
509
510pub fn encode_block<Block>(mut signed_block: SignedBlock<Block>) -> Vec<u8>
515where
516    Block: BlockT,
517{
518    if signed_block.block.header().number().is_zero() {
519        let mut encoded_block = signed_block.encode();
520
521        let encoded_block_length = encoded_block.len();
522
523        encoded_block.resize(RecordedHistorySegment::SIZE, 0);
530        let mut rng = ChaCha8Rng::from_seed(
531            signed_block
532                .block
533                .header()
534                .state_root()
535                .as_ref()
536                .try_into()
537                .expect("State root in Subspace must be 32 bytes, panic otherwise; qed"),
538        );
539        rng.fill(&mut encoded_block[encoded_block_length..]);
540
541        encoded_block
542    } else {
543        if let Some(justifications) = signed_block.justifications.take() {
545            let mut filtered_justifications = justifications.into_iter().filter(|justification| {
546                let Some(subspace_justification) =
548                    SubspaceJustification::try_from_justification(justification)
549                        .and_then(|subspace_justification| subspace_justification.ok())
550                else {
551                    return false;
552                };
553
554                subspace_justification.must_be_archived()
555            });
556
557            if let Some(first_justification) = filtered_justifications.next() {
558                let mut justifications = Justifications::from(first_justification);
559
560                for justification in filtered_justifications {
561                    justifications.append(justification);
562                }
563
564                signed_block.justifications = Some(justifications);
565            }
566        }
567
568        signed_block.encode()
569    }
570}
571
572pub fn decode_block<Block>(
574    mut encoded_block: &[u8],
575) -> Result<SignedBlock<Block>, parity_scale_codec::Error>
576where
577    Block: BlockT,
578{
579    SignedBlock::<Block>::decode(&mut encoded_block)
580}
581
/// Creates an archiver and brings it up to date with already produced blocks.
///
/// The archiver is restored from the newest usable segment header (see
/// `find_last_archived_block`) or created fresh when none is found. Afterwards all blocks from
/// the one following the archiver's last archived block up to the best block minus the
/// confirmation depth are archived, with object mapping notifications sent and newly produced
/// segment headers added to `segment_headers_store`.
///
/// # Errors
///
/// Returns an error if object mappings are enabled and the hash, data or state of the block to
/// resume from is missing, if the archiver can't be instantiated from the stored segment header,
/// if the thread pool can't be created, or if the client or segment headers store fails.
///
/// # Panics
///
/// Panics if a block in the range to archive is missing from the client, or if no best archived
/// block was determined.
fn initialize_archiver<Block, Client, AS>(
    segment_headers_store: &SegmentHeadersStore<AS>,
    subspace_link: &SubspaceLink<Block>,
    client: &Client,
    create_object_mappings: CreateObjectMappings,
) -> sp_blockchain::Result<InitializedArchiver<Block>>
where
    Block: BlockT,
    Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + HeaderBackend<Block> + AuxStore,
    Client::Api: SubspaceApi<Block, PublicKey> + ObjectsApi<Block>,
    AS: AuxStore,
{
    let client_info = client.info();
    let best_block_number = TryInto::<BlockNumber>::try_into(client_info.best_number)
        .unwrap_or_else(|_| {
            unreachable!("sp_runtime::BlockNumber fits into subspace_primitives::BlockNumber; qed");
        });

    let confirmation_depth_k = subspace_link.chain_constants.confirmation_depth_k();

    // Newest block that is at least `confirmation_depth_k` below the best block
    let mut best_block_to_archive = best_block_number.saturating_sub(confirmation_depth_k);
    if let Some(block_number) = create_object_mappings.block() {
        // Never resume past the block object mappings were configured to start from
        // NOTE(review): presumably so that mappings for that block are (re)created — confirm
        best_block_to_archive = best_block_to_archive.min(block_number);
    }

    // If the hash of any block in `best_block_to_archive..best_block_number` can't be obtained,
    // fall back to the best block itself
    // NOTE(review): presumably this handles gaps in stored blocks after a non-full sync — confirm
    if (best_block_to_archive..best_block_number)
        .any(|block_number| client.hash(block_number.into()).ok().flatten().is_none())
    {
        best_block_to_archive = best_block_number;
    }

    // With object mappings enabled, verify up front that the hash, the data and the state of the
    // block to resume from are all available, failing early with an actionable message otherwise
    if create_object_mappings.is_enabled() && best_block_to_archive >= 1 {
        let Some(best_block_to_archive_hash) = client.hash(best_block_to_archive.into())? else {
            let error = format!(
                "Missing hash for mapping block {best_block_to_archive}, \
                try a higher block number, or wipe your node and restart with `--sync full`"
            );
            return Err(sp_blockchain::Error::Application(error.into()));
        };

        let Some(best_block_data) = client.block(best_block_to_archive_hash)? else {
            let error = format!(
                "Missing data for mapping block {best_block_to_archive} \
                hash {best_block_to_archive_hash}, \
                try a higher block number, or wipe your node and restart with `--sync full`"
            );
            return Err(sp_blockchain::Error::Application(error.into()));
        };

        // Only the success of the runtime call matters here, the mappings themselves are discarded
        client
            .runtime_api()
            .extract_block_object_mapping(
                *best_block_data.block.header().parent_hash(),
                best_block_data.block.clone(),
            )
            .map_err(|error| {
                sp_blockchain::Error::Application(
                    format!(
                        "Missing state for mapping block {best_block_to_archive} \
                        hash {best_block_to_archive_hash}: {error}, \
                        try a higher block number, or wipe your node and restart with `--sync full`"
                    )
                    .into(),
                )
            })?;
    }

    let maybe_last_archived_block = find_last_archived_block(
        client,
        segment_headers_store,
        best_block_to_archive.into(),
        create_object_mappings.is_enabled(),
    )?;

    let have_last_segment_header = maybe_last_archived_block.is_some();
    let mut best_archived_block = None;

    // Either restore the archiver state from the found segment header or start from scratch
    let mut archiver =
        if let Some((last_segment_header, last_archived_block, block_object_mappings)) =
            maybe_last_archived_block
        {
            let last_archived_block_number = last_segment_header.last_archived_block().number;
            info!(
                %last_archived_block_number,
                "Resuming archiver from last archived block",
            );

            best_archived_block.replace((
                last_archived_block.block.hash(),
                *last_archived_block.block.header().number(),
            ));

            let last_archived_block_encoded = encode_block(last_archived_block);

            Archiver::with_initial_state(
                subspace_link.kzg().clone(),
                subspace_link.erasure_coding().clone(),
                last_segment_header,
                &last_archived_block_encoded,
                block_object_mappings,
            )
            .map_err(|error| {
                sp_blockchain::Error::Application(
                    format!("Incorrect parameters for archiver: {error:?} {last_segment_header:?}")
                        .into(),
                )
            })?
        } else {
            info!("Starting archiving from genesis");

            Archiver::new(
                subspace_link.kzg().clone(),
                subspace_link.erasure_coding().clone(),
            )
        };

    {
        // First block not yet known to the archiver (`0` for a fresh archiver)
        let blocks_to_archive_from = archiver
            .last_archived_block_number()
            .map(|n| n + 1)
            .unwrap_or_default();
        // Last block deep enough to archive, if it isn't below the starting block. When nothing
        // was restored from a segment header, at least block `0` is archived.
        let blocks_to_archive_to = best_block_number
            .checked_sub(confirmation_depth_k)
            .filter(|&blocks_to_archive_to| blocks_to_archive_to >= blocks_to_archive_from)
            .or({
                if have_last_segment_header {
                    None
                } else {
                    Some(0)
                }
            });

        if let Some(blocks_to_archive_to) = blocks_to_archive_to {
            info!(
                "Archiving already produced blocks {}..={}",
                blocks_to_archive_from, blocks_to_archive_to,
            );

            // Dedicated pool with a fixed number of threads for loading blocks in parallel
            let thread_pool = ThreadPoolBuilder::new()
                .num_threads(BLOCKS_TO_ARCHIVE_CONCURRENCY)
                .build()
                .map_err(|error| {
                    sp_blockchain::Error::Backend(format!(
                        "Failed to create thread pool for archiver initialization: {error}"
                    ))
                })?;
            // Load blocks (and extract object mappings where enabled) in parallel; the collected
            // vector is consumed sequentially below
            let blocks_to_archive = thread_pool.install(|| {
                (blocks_to_archive_from..=blocks_to_archive_to)
                    .into_par_iter()
                    .map_init(
                        || client.runtime_api(),
                        |runtime_api, block_number| {
                            let block_hash = client
                                .hash(block_number.into())?
                                .expect("All blocks since last archived must be present; qed");

                            let block = client
                                .block(block_hash)?
                                .expect("All blocks since last archived must be present; qed");

                            // Runtime API failures result in empty mappings rather than an error
                            let block_object_mappings =
                                if create_object_mappings.is_enabled_for_block(block_number) {
                                    runtime_api
                                        .extract_block_object_mapping(
                                            *block.block.header().parent_hash(),
                                            block.block.clone(),
                                        )
                                        .unwrap_or_default()
                                } else {
                                    BlockObjectMapping::default()
                                };

                            Ok((block, block_object_mappings))
                        },
                    )
                    .collect::<sp_blockchain::Result<Vec<(SignedBlock<_>, _)>>>()
            })?;

            // The last loaded block becomes the best archived block; this overwrites the value
            // set when resuming (with `None` if no blocks were loaded)
            best_archived_block =
                blocks_to_archive
                    .last()
                    .map(|(block, _block_object_mappings)| {
                        (block.block.hash(), *block.block.header().number())
                    });

            // Feed the loaded blocks into the archiver one by one
            for (signed_block, block_object_mappings) in blocks_to_archive {
                let block_number_to_archive = *signed_block.block.header().number();
                let encoded_block = encode_block(signed_block);

                debug!(
                    "Encoded block {} has size of {}",
                    block_number_to_archive,
                    bytesize::to_string(encoded_block.len() as u64, true),
                );

                let block_outcome = archiver.add_block(encoded_block, block_object_mappings, false);
                send_object_mapping_notification(
                    &subspace_link.object_mapping_notification_sender,
                    block_outcome.object_mapping,
                    block_number_to_archive,
                );
                let new_segment_headers: Vec<SegmentHeader> = block_outcome
                    .archived_segments
                    .iter()
                    .map(|archived_segment| archived_segment.segment_header)
                    .collect();

                // Persist headers of any segments completed by this block
                if !new_segment_headers.is_empty() {
                    segment_headers_store.add_segment_headers(&new_segment_headers)?;
                }
            }
        }
    }

    Ok(InitializedArchiver {
        archiver,
        best_archived_block: best_archived_block
            .expect("Must always set if there is no logical error; qed"),
    })
}
821
822fn finalize_block<Block, Backend, Client>(
823    client: &Client,
824    telemetry: Option<&TelemetryHandle>,
825    hash: Block::Hash,
826    number: NumberFor<Block>,
827) where
828    Block: BlockT,
829    Backend: BackendT<Block>,
830    Client: LockImportRun<Block, Backend> + Finalizer<Block, Backend>,
831{
832    if number.is_zero() {
833        return;
835    }
836    let _result: sp_blockchain::Result<_> = client.lock_import_and_run(|import_op| {
839        client
842            .apply_finality(import_op, hash, None, true)
843            .map_err(|error| {
844                warn!(
845                    "Error applying finality to block {:?}: {}",
846                    (hash, number),
847                    error
848                );
849                error
850            })?;
851
852        debug!("Finalizing blocks up to ({:?}, {})", number, hash);
853
854        telemetry!(
855            telemetry;
856            CONSENSUS_INFO;
857            "subspace.finalized_blocks_up_to";
858            "number" => ?number, "hash" => ?hash,
859        );
860
861        Ok(())
862    });
863}
864
865pub fn create_subspace_archiver<Block, Backend, Client, AS, SO>(
895    segment_headers_store: SegmentHeadersStore<AS>,
896    subspace_link: SubspaceLink<Block>,
897    client: Arc<Client>,
898    sync_oracle: SubspaceSyncOracle<SO>,
899    telemetry: Option<TelemetryHandle>,
900    create_object_mappings: CreateObjectMappings,
901) -> sp_blockchain::Result<impl Future<Output = sp_blockchain::Result<()>> + Send + 'static>
902where
903    Block: BlockT,
904    Backend: BackendT<Block>,
905    Client: ProvideRuntimeApi<Block>
906        + BlockBackend<Block>
907        + HeaderBackend<Block>
908        + LockImportRun<Block, Backend>
909        + Finalizer<Block, Backend>
910        + BlockchainEvents<Block>
911        + AuxStore
912        + Send
913        + Sync
914        + 'static,
915    Client::Api: SubspaceApi<Block, PublicKey> + ObjectsApi<Block>,
916    AS: AuxStore + Send + Sync + 'static,
917    SO: SyncOracle + Send + Sync + 'static,
918{
919    if create_object_mappings.is_enabled() {
920        info!(
921            ?create_object_mappings,
922            "Creating object mappings from the configured block onwards"
923        );
924    } else {
925        info!("Not creating object mappings");
926    }
927
928    let maybe_archiver = if segment_headers_store.max_segment_index().is_none() {
929        Some(initialize_archiver(
930            &segment_headers_store,
931            &subspace_link,
932            client.as_ref(),
933            create_object_mappings,
934        )?)
935    } else {
936        None
937    };
938
939    let mut block_importing_notification_stream = subspace_link
941        .block_importing_notification_stream
942        .subscribe();
943
944    Ok(async move {
945        let archiver = match maybe_archiver {
946            Some(archiver) => archiver,
947            None => initialize_archiver(
948                &segment_headers_store,
949                &subspace_link,
950                client.as_ref(),
951                create_object_mappings,
952            )?,
953        };
954        let confirmation_depth_k = subspace_link.chain_constants.confirmation_depth_k().into();
955
956        let InitializedArchiver {
957            mut archiver,
958            best_archived_block,
959        } = archiver;
960        let (mut best_archived_block_hash, mut best_archived_block_number) = best_archived_block;
961
962        while let Some(block_importing_notification) =
963            block_importing_notification_stream.next().await
964        {
965            let importing_block_number = block_importing_notification.block_number;
966            let block_number_to_archive =
967                match importing_block_number.checked_sub(&confirmation_depth_k) {
968                    Some(block_number_to_archive) => block_number_to_archive,
969                    None => {
970                        continue;
972                    }
973                };
974
975            let last_archived_block_number = segment_headers_store
976                .last_segment_header()
977                .expect("Exists after archiver initialization; qed")
978                .last_archived_block()
979                .number;
980            let create_mappings =
981                create_object_mappings.is_enabled_for_block(last_archived_block_number);
982            let last_archived_block_number = NumberFor::<Block>::from(last_archived_block_number);
983            trace!(
984                %importing_block_number,
985                %block_number_to_archive,
986                %best_archived_block_number,
987                %last_archived_block_number,
988                "Checking if block needs to be skipped"
989            );
990
991            let skip_last_archived_blocks =
993                last_archived_block_number > block_number_to_archive && !create_mappings;
994            if best_archived_block_number >= block_number_to_archive || skip_last_archived_blocks {
995                debug!(
997                    %importing_block_number,
998                    %block_number_to_archive,
999                    %best_archived_block_number,
1000                    %last_archived_block_number,
1001                    "Skipping already archived block",
1002                );
1003                continue;
1004            }
1005
1006            if best_archived_block_number + One::one() != block_number_to_archive {
1011                InitializedArchiver {
1012                    archiver,
1013                    best_archived_block: (best_archived_block_hash, best_archived_block_number),
1014                } = initialize_archiver(
1015                    &segment_headers_store,
1016                    &subspace_link,
1017                    client.as_ref(),
1018                    create_object_mappings,
1019                )?;
1020
1021                if best_archived_block_number + One::one() == block_number_to_archive {
1022                    } else if best_archived_block_number >= block_number_to_archive {
1024                    continue;
1027                } else if client
1028                    .block_hash(importing_block_number - One::one())?
1029                    .is_none()
1030                {
1031                    continue;
1036                } else {
1037                    let error = format!(
1038                        "There was a gap in blockchain history and the last contiguous series of \
1039                        blocks starting with doesn't start with archived segment (best archived \
1040                        block number {best_archived_block_number}, block number to archive \
1041                        {block_number_to_archive}), block about to be imported \
1042                        {importing_block_number}), archiver can't continue",
1043                    );
1044                    return Err(sp_blockchain::Error::Consensus(sp_consensus::Error::Other(
1045                        error.into(),
1046                    )));
1047                }
1048            }
1049
1050            let max_segment_index_before = segment_headers_store.max_segment_index();
1051            (best_archived_block_hash, best_archived_block_number) = archive_block(
1052                &mut archiver,
1053                segment_headers_store.clone(),
1054                &*client,
1055                &sync_oracle,
1056                subspace_link.object_mapping_notification_sender.clone(),
1057                subspace_link.archived_segment_notification_sender.clone(),
1058                best_archived_block_hash,
1059                block_number_to_archive,
1060                create_object_mappings,
1061            )
1062            .await?;
1063
1064            let max_segment_index = segment_headers_store.max_segment_index();
1065            if max_segment_index_before != max_segment_index {
1066                let maybe_block_number_to_finalize = max_segment_index
1067                    .and_then(|max_segment_index| {
1069                        max_segment_index.checked_sub(FINALIZATION_DEPTH_IN_SEGMENTS)
1070                    })
1071                    .and_then(|segment_index| {
1072                        segment_headers_store.get_segment_header(segment_index)
1073                    })
1074                    .map(|segment_header| segment_header.last_archived_block().number)
1075                    .map(|block_number| block_number_to_archive.min(block_number.into()))
1078                    .filter(|block_number| *block_number > client.info().finalized_number);
1080
1081                if let Some(block_number_to_finalize) = maybe_block_number_to_finalize {
1082                    {
1083                        let mut import_notification = client.every_import_notification_stream();
1084
1085                        drop(block_importing_notification);
1088
1089                        while let Some(notification) = import_notification.next().await {
1090                            if notification.header.number() == &importing_block_number {
1092                                break;
1093                            }
1094                        }
1095                    }
1096
1097                    if let Some(block_hash_to_finalize) =
1100                        client.block_hash(block_number_to_finalize)?
1101                    {
1102                        finalize_block(
1103                            &*client,
1104                            telemetry.as_ref(),
1105                            block_hash_to_finalize,
1106                            block_number_to_finalize,
1107                        );
1108                    }
1109                }
1110            }
1111        }
1112
1113        Ok(())
1114    })
1115}
1116
1117#[allow(clippy::too_many_arguments)]
1119async fn archive_block<Block, Backend, Client, AS, SO>(
1120    archiver: &mut Archiver,
1121    segment_headers_store: SegmentHeadersStore<AS>,
1122    client: &Client,
1123    sync_oracle: &SubspaceSyncOracle<SO>,
1124    object_mapping_notification_sender: SubspaceNotificationSender<ObjectMappingNotification>,
1125    archived_segment_notification_sender: SubspaceNotificationSender<ArchivedSegmentNotification>,
1126    best_archived_block_hash: Block::Hash,
1127    block_number_to_archive: NumberFor<Block>,
1128    create_object_mappings: CreateObjectMappings,
1129) -> sp_blockchain::Result<(Block::Hash, NumberFor<Block>)>
1130where
1131    Block: BlockT,
1132    Backend: BackendT<Block>,
1133    Client: ProvideRuntimeApi<Block>
1134        + BlockBackend<Block>
1135        + HeaderBackend<Block>
1136        + LockImportRun<Block, Backend>
1137        + Finalizer<Block, Backend>
1138        + AuxStore
1139        + Send
1140        + Sync
1141        + 'static,
1142    Client::Api: SubspaceApi<Block, PublicKey> + ObjectsApi<Block>,
1143    AS: AuxStore + Send + Sync + 'static,
1144    SO: SyncOracle + Send + Sync + 'static,
1145{
1146    let block = client
1147        .block(
1148            client
1149                .block_hash(block_number_to_archive)?
1150                .expect("Older block by number must always exist"),
1151        )?
1152        .expect("Older block by number must always exist");
1153
1154    let parent_block_hash = *block.block.header().parent_hash();
1155    let block_hash_to_archive = block.block.hash();
1156
1157    debug!(
1158        "Archiving block {:?} ({})",
1159        block_number_to_archive, block_hash_to_archive
1160    );
1161
1162    if parent_block_hash != best_archived_block_hash {
1163        let error = format!(
1164            "Attempt to switch to a different fork beyond archiving depth, \
1165            can't do it: parent block hash {parent_block_hash}, best archived block hash {best_archived_block_hash}"
1166        );
1167        return Err(sp_blockchain::Error::Consensus(sp_consensus::Error::Other(
1168            error.into(),
1169        )));
1170    }
1171
1172    let create_mappings = create_object_mappings.is_enabled_for_block(
1173        block_number_to_archive.try_into().unwrap_or_else(|_| {
1174            unreachable!("sp_runtime::BlockNumber fits into subspace_primitives::BlockNumber; qed")
1175        }),
1176    );
1177
1178    let block_object_mappings = if create_mappings {
1179        client
1180            .runtime_api()
1181            .extract_block_object_mapping(parent_block_hash, block.block.clone())
1182            .map_err(|error| {
1183                sp_blockchain::Error::Application(
1184                    format!("Failed to retrieve block object mappings: {error}").into(),
1185                )
1186            })?
1187    } else {
1188        BlockObjectMapping::default()
1189    };
1190
1191    let encoded_block = encode_block(block);
1192    debug!(
1193        "Encoded block {} has size of {}",
1194        block_number_to_archive,
1195        bytesize::to_string(encoded_block.len() as u64, true),
1196    );
1197
1198    let block_outcome = archiver.add_block(
1199        encoded_block,
1200        block_object_mappings,
1201        !sync_oracle.is_major_syncing(),
1202    );
1203    send_object_mapping_notification(
1204        &object_mapping_notification_sender,
1205        block_outcome.object_mapping,
1206        block_number_to_archive,
1207    );
1208    for archived_segment in block_outcome.archived_segments {
1209        let segment_header = archived_segment.segment_header;
1210
1211        segment_headers_store.add_segment_headers(slice::from_ref(&segment_header))?;
1212
1213        send_archived_segment_notification(&archived_segment_notification_sender, archived_segment)
1214            .await;
1215    }
1216
1217    Ok((block_hash_to_archive, block_number_to_archive))
1218}
1219
1220fn send_object_mapping_notification<BlockNum>(
1221    object_mapping_notification_sender: &SubspaceNotificationSender<ObjectMappingNotification>,
1222    object_mapping: Vec<GlobalObject>,
1223    block_number: BlockNum,
1224) where
1225    BlockNum: BlockNumberT,
1226{
1227    if object_mapping.is_empty() {
1228        return;
1229    }
1230
1231    let block_number = TryInto::<BlockNumber>::try_into(block_number).unwrap_or_else(|_| {
1232        unreachable!("sp_runtime::BlockNumber fits into subspace_primitives::BlockNumber; qed");
1233    });
1234
1235    let object_mapping_notification = ObjectMappingNotification {
1236        object_mapping,
1237        block_number,
1238    };
1239
1240    object_mapping_notification_sender.notify(move || object_mapping_notification);
1241}
1242
1243async fn send_archived_segment_notification(
1244    archived_segment_notification_sender: &SubspaceNotificationSender<ArchivedSegmentNotification>,
1245    archived_segment: NewArchivedSegment,
1246) {
1247    let segment_index = archived_segment.segment_header.segment_index();
1248    let (acknowledgement_sender, mut acknowledgement_receiver) =
1249        tracing_unbounded::<()>("subspace_acknowledgement", 1000);
1250    let archived_segment = Arc::new(archived_segment);
1253    let archived_segment_notification = ArchivedSegmentNotification {
1254        archived_segment: Arc::clone(&archived_segment),
1255        acknowledgement_sender,
1256    };
1257
1258    archived_segment_notification_sender.notify(move || archived_segment_notification);
1259
1260    let wait_fut = async {
1261        while acknowledgement_receiver.next().await.is_some() {
1262            debug!(
1263                "Archived segment notification acknowledged: {}",
1264                segment_index
1265            );
1266        }
1267    };
1268
1269    if tokio::time::timeout(ACKNOWLEDGEMENT_TIMEOUT, wait_fut)
1270        .await
1271        .is_err()
1272    {
1273        warn!(
1274            "Archived segment notification was not acknowledged and reached timeout, continue \
1275            regardless"
1276        );
1277    }
1278}