1#[cfg(test)]
34mod tests;
35
36use crate::slot_worker::SubspaceSyncOracle;
37use crate::{SubspaceLink, SubspaceNotificationSender};
38use futures::StreamExt;
39use parity_scale_codec::{Decode, Encode};
40use parking_lot::RwLock;
41use rand::prelude::*;
42use rand_chacha::ChaCha8Rng;
43use rayon::prelude::*;
44use rayon::ThreadPoolBuilder;
45use sc_client_api::{
46 AuxStore, Backend as BackendT, BlockBackend, BlockchainEvents, Finalizer, LockImportRun,
47};
48use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_INFO};
49use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
50use sp_api::ProvideRuntimeApi;
51use sp_blockchain::HeaderBackend;
52use sp_consensus::SyncOracle;
53use sp_consensus_subspace::{SubspaceApi, SubspaceJustification};
54use sp_objects::ObjectsApi;
55use sp_runtime::generic::SignedBlock;
56use sp_runtime::traits::{
57 Block as BlockT, BlockNumber as BlockNumberT, CheckedSub, Header, NumberFor, One, Zero,
58};
59use sp_runtime::Justifications;
60use std::error::Error;
61use std::future::Future;
62use std::num::NonZeroU32;
63use std::slice;
64use std::sync::atomic::{AtomicU16, Ordering};
65use std::sync::Arc;
66use std::time::Duration;
67use subspace_archiving::archiver::{Archiver, NewArchivedSegment};
68use subspace_core_primitives::objects::{BlockObjectMapping, GlobalObject};
69use subspace_core_primitives::segments::{RecordedHistorySegment, SegmentHeader, SegmentIndex};
70use subspace_core_primitives::{BlockNumber, PublicKey};
71use subspace_erasure_coding::ErasureCoding;
72use subspace_kzg::Kzg;
73use tracing::{debug, info, trace, warn};
74
/// Number of threads in the dedicated thread pool that `initialize_archiver` builds in order to
/// load already produced blocks (and extract their object mappings) in parallel.
const BLOCKS_TO_ARCHIVE_CONCURRENCY: usize = 6;
/// Longest time `send_archived_segment_notification` waits for an archived segment notification
/// to be acknowledged before it logs a warning and moves on.
const ACKNOWLEDGEMENT_TIMEOUT: Duration = Duration::from_mins(2);

/// How many segments behind the newest stored segment header `create_subspace_archiver` looks when
/// choosing a block to finalize: the last archived block of segment
/// `max_segment_index - FINALIZATION_DEPTH_IN_SEGMENTS` is the finalization candidate.
pub(crate) const FINALIZATION_DEPTH_IN_SEGMENTS: SegmentIndex = SegmentIndex::new(5);
90
/// State shared between all clones of `SegmentHeadersStore` (it is held behind an `Arc` there).
#[derive(Debug)]
struct SegmentHeadersStoreInner<AS> {
    /// Auxiliary storage that batches of segment headers are read from and written to.
    aux_store: Arc<AS>,
    /// Index used to derive the auxiliary storage key of the next batch of segment headers.
    next_key_index: AtomicU16,
    /// In-memory copy of every stored segment header; lookups index this vector by segment index.
    cache: RwLock<Vec<SegmentHeader>>,
}
98
/// Store of segment headers that persists them in auxiliary storage and serves reads from an
/// in-memory cache.
///
/// Cloning is cheap: clones share the same underlying state.
#[derive(Debug)]
pub struct SegmentHeadersStore<AS> {
    /// Shared state (auxiliary storage handle, next key index and header cache).
    inner: Arc<SegmentHeadersStoreInner<AS>>,
    /// Confirmation depth, in blocks, added on top of a segment's last archived block number when
    /// `segment_headers_for_block` computes the block a segment header belongs to.
    confirmation_depth_k: BlockNumber,
}
114
115impl<AS> Clone for SegmentHeadersStore<AS> {
116 fn clone(&self) -> Self {
117 Self {
118 inner: Arc::clone(&self.inner),
119 confirmation_depth_k: self.confirmation_depth_k,
120 }
121 }
122}
123
124impl<AS> SegmentHeadersStore<AS>
125where
126 AS: AuxStore,
127{
128 const KEY_PREFIX: &'static [u8] = b"segment-headers";
129 const INITIAL_CACHE_CAPACITY: usize = 1_000;
130
131 pub fn new(
133 aux_store: Arc<AS>,
134 confirmation_depth_k: BlockNumber,
135 ) -> sp_blockchain::Result<Self> {
136 let mut cache = Vec::with_capacity(Self::INITIAL_CACHE_CAPACITY);
137
138 debug!("Started loading segment headers into cache");
139 let mut next_key_index = 0;
143 while let Some(segment_headers) =
144 aux_store
145 .get_aux(&Self::key(next_key_index))?
146 .map(|segment_header| {
147 Vec::<SegmentHeader>::decode(&mut segment_header.as_slice())
148 .expect("Always correct segment header unless DB is corrupted; qed")
149 })
150 {
151 cache.extend(segment_headers);
152 next_key_index += 1;
153 }
154 debug!("Finished loading segment headers into cache");
155
156 Ok(Self {
157 inner: Arc::new(SegmentHeadersStoreInner {
158 aux_store,
159 next_key_index: AtomicU16::new(next_key_index),
160 cache: RwLock::new(cache),
161 }),
162 confirmation_depth_k,
163 })
164 }
165
166 pub fn last_segment_header(&self) -> Option<SegmentHeader> {
168 self.inner.cache.read().last().cloned()
169 }
170
171 pub fn max_segment_index(&self) -> Option<SegmentIndex> {
173 let segment_index = self.inner.cache.read().len().checked_sub(1)? as u64;
174 Some(SegmentIndex::from(segment_index))
175 }
176
177 pub fn add_segment_headers(
181 &self,
182 segment_headers: &[SegmentHeader],
183 ) -> sp_blockchain::Result<()> {
184 let mut maybe_last_segment_index = self.max_segment_index();
185 let mut segment_headers_to_store = Vec::with_capacity(segment_headers.len());
186 for segment_header in segment_headers {
189 let segment_index = segment_header.segment_index();
190 match maybe_last_segment_index {
191 Some(last_segment_index) => {
192 if segment_index <= last_segment_index {
193 continue;
195 }
196
197 if segment_index != last_segment_index + SegmentIndex::ONE {
198 let error = format!(
199 "Segment index {} must strictly follow {}, can't store segment header",
200 segment_index, last_segment_index
201 );
202 return Err(sp_blockchain::Error::Application(error.into()));
203 }
204
205 segment_headers_to_store.push(segment_header);
206 maybe_last_segment_index.replace(segment_index);
207 }
208 None => {
209 if segment_index != SegmentIndex::ZERO {
210 let error = format!(
211 "First segment header index must be zero, found index {segment_index}"
212 );
213 return Err(sp_blockchain::Error::Application(error.into()));
214 }
215
216 segment_headers_to_store.push(segment_header);
217 maybe_last_segment_index.replace(segment_index);
218 }
219 }
220 }
221
222 if segment_headers_to_store.is_empty() {
223 return Ok(());
224 }
225
226 {
230 let key_index = self.inner.next_key_index.fetch_add(1, Ordering::SeqCst);
231 let key = Self::key(key_index);
232 let value = segment_headers_to_store.encode();
233 let insert_data = vec![(key.as_slice(), value.as_slice())];
234
235 self.inner.aux_store.insert_aux(&insert_data, &[])?;
236 }
237 self.inner.cache.write().extend(segment_headers_to_store);
238
239 Ok(())
240 }
241
242 pub fn get_segment_header(&self, segment_index: SegmentIndex) -> Option<SegmentHeader> {
244 self.inner
245 .cache
246 .read()
247 .get(u64::from(segment_index) as usize)
248 .copied()
249 }
250
251 fn key(key_index: u16) -> Vec<u8> {
252 (Self::KEY_PREFIX, key_index.to_le_bytes()).encode()
253 }
254
255 pub fn segment_headers_for_block(&self, block_number: BlockNumber) -> Vec<SegmentHeader> {
257 let Some(last_segment_index) = self.max_segment_index() else {
258 return Vec::new();
260 };
261
262 if block_number == 1 {
264 return vec![self
267 .get_segment_header(SegmentIndex::ZERO)
268 .expect("Segment headers are stored in monotonically increasing order; qed")];
269 }
270
271 if last_segment_index == SegmentIndex::ZERO {
272 return Vec::new();
274 }
275
276 let mut current_segment_index = last_segment_index;
277 loop {
278 let current_segment_header = self
281 .get_segment_header(current_segment_index)
282 .expect("Segment headers are stored in monotonically increasing order; qed");
283
284 let target_block_number =
286 current_segment_header.last_archived_block().number + 1 + self.confirmation_depth_k;
287 if target_block_number == block_number {
288 let mut headers_for_block = vec![current_segment_header];
289
290 let last_archived_block_number =
292 current_segment_header.last_archived_block().number;
293 let mut segment_index = current_segment_index - SegmentIndex::ONE;
294
295 while let Some(segment_header) = self.get_segment_header(segment_index) {
296 if segment_header.last_archived_block().number == last_archived_block_number {
297 headers_for_block.insert(0, segment_header);
298 segment_index -= SegmentIndex::ONE;
299 } else {
300 break;
301 }
302 }
303
304 return headers_for_block;
305 }
306
307 if target_block_number > block_number {
309 if current_segment_index > SegmentIndex::ONE {
311 current_segment_index -= SegmentIndex::ONE
312 } else {
313 break;
314 }
315 } else {
316 return Vec::new();
318 }
319 }
320
321 Vec::new()
323 }
324}
325
/// Notification sent to subscribers when a new segment has been archived.
#[derive(Debug, Clone)]
pub struct ArchivedSegmentNotification {
    /// The newly archived segment, shared with the sender of the notification.
    pub archived_segment: Arc<NewArchivedSegment>,
    /// Channel used to acknowledge the notification; the archiver waits (with a timeout) until
    /// every clone of this sender has been dropped.
    pub acknowledgement_sender: TracingUnboundedSender<()>,
}
336
/// Notification sent to subscribers when archiving a block produced object mappings.
#[derive(Debug, Clone)]
pub struct ObjectMappingNotification {
    /// Object mappings returned by the archiver for the block; never empty when sent by
    /// `send_object_mapping_notification`.
    pub object_mapping: Vec<GlobalObject>,
    /// Number of the block that was being archived when the mappings were produced.
    pub block_number: BlockNumber,
}
349
350#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
352pub enum CreateObjectMappings {
353 Block(NonZeroU32),
362
363 Yes,
365
366 #[default]
368 No,
369}
370
371impl CreateObjectMappings {
372 fn block(&self) -> Option<BlockNumber> {
375 match self {
376 CreateObjectMappings::Block(block) => Some(block.get()),
377 CreateObjectMappings::Yes => None,
378 CreateObjectMappings::No => None,
379 }
380 }
381
382 pub fn is_enabled(&self) -> bool {
384 !matches!(self, CreateObjectMappings::No)
385 }
386
387 pub fn is_enabled_for_block(&self, block: BlockNumber) -> bool {
389 if !self.is_enabled() {
390 return false;
391 }
392
393 if let Some(target_block) = self.block() {
394 return block >= target_block;
395 }
396
397 true
399 }
400}
401
402fn find_last_archived_block<Block, Client, AS>(
403 client: &Client,
404 segment_headers_store: &SegmentHeadersStore<AS>,
405 best_block_to_archive: NumberFor<Block>,
406 create_object_mappings: bool,
407) -> sp_blockchain::Result<Option<(SegmentHeader, SignedBlock<Block>, BlockObjectMapping)>>
408where
409 Block: BlockT,
410 Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + HeaderBackend<Block>,
411 Client::Api: SubspaceApi<Block, PublicKey> + ObjectsApi<Block>,
412 AS: AuxStore,
413{
414 let Some(max_segment_index) = segment_headers_store.max_segment_index() else {
415 return Ok(None);
416 };
417
418 if max_segment_index == SegmentIndex::ZERO {
419 return Ok(None);
421 }
422
423 for segment_header in (SegmentIndex::ZERO..=max_segment_index)
424 .rev()
425 .filter_map(|segment_index| segment_headers_store.get_segment_header(segment_index))
426 {
427 let last_archived_block_number = segment_header.last_archived_block().number;
428
429 if NumberFor::<Block>::from(last_archived_block_number) > best_block_to_archive {
430 continue;
434 }
435 let Some(last_archived_block_hash) = client.hash(last_archived_block_number.into())? else {
436 continue;
439 };
440
441 let Some(last_archived_block) = client.block(last_archived_block_hash)? else {
442 continue;
444 };
445
446 let block_object_mappings = if create_object_mappings {
448 client
449 .runtime_api()
450 .extract_block_object_mapping(
451 *last_archived_block.block.header().parent_hash(),
452 last_archived_block.block.clone(),
453 )
454 .unwrap_or_default()
455 } else {
456 BlockObjectMapping::default()
457 };
458
459 return Ok(Some((
460 segment_header,
461 last_archived_block,
462 block_object_mappings,
463 )));
464 }
465
466 Ok(None)
467}
468
469pub fn recreate_genesis_segment<Block, Client>(
471 client: &Client,
472 kzg: Kzg,
473 erasure_coding: ErasureCoding,
474) -> Result<Option<NewArchivedSegment>, Box<dyn Error>>
475where
476 Block: BlockT,
477 Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + HeaderBackend<Block>,
478 Client::Api: ObjectsApi<Block>,
479{
480 let genesis_hash = client.info().genesis_hash;
481 let Some(signed_block) = client.block(genesis_hash)? else {
482 return Ok(None);
483 };
484
485 let encoded_block = encode_block(signed_block);
486
487 let block_outcome = Archiver::new(kzg, erasure_coding).add_block(
489 encoded_block,
490 BlockObjectMapping::default(),
491 false,
492 );
493 let new_archived_segment = block_outcome
494 .archived_segments
495 .into_iter()
496 .next()
497 .expect("Genesis block always results in exactly one archived segment; qed");
498
499 Ok(Some(new_archived_segment))
500}
501
/// Result of `initialize_archiver`: a ready-to-use archiver together with the position it has
/// reached in the chain.
struct InitializedArchiver<Block>
where
    Block: BlockT,
{
    /// Archiver instance that subsequent blocks are added to.
    archiver: Archiver,
    /// Hash and number of the most recent block that was fed into `archiver`.
    best_archived_block: (Block::Hash, NumberFor<Block>),
}
509
510pub fn encode_block<Block>(mut signed_block: SignedBlock<Block>) -> Vec<u8>
515where
516 Block: BlockT,
517{
518 if signed_block.block.header().number().is_zero() {
519 let mut encoded_block = signed_block.encode();
520
521 let encoded_block_length = encoded_block.len();
522
523 encoded_block.resize(RecordedHistorySegment::SIZE, 0);
530 let mut rng = ChaCha8Rng::from_seed(
531 signed_block
532 .block
533 .header()
534 .state_root()
535 .as_ref()
536 .try_into()
537 .expect("State root in Subspace must be 32 bytes, panic otherwise; qed"),
538 );
539 rng.fill(&mut encoded_block[encoded_block_length..]);
540
541 encoded_block
542 } else {
543 if let Some(justifications) = signed_block.justifications.take() {
545 let mut filtered_justifications = justifications.into_iter().filter(|justification| {
546 let Some(subspace_justification) =
548 SubspaceJustification::try_from_justification(justification)
549 .and_then(|subspace_justification| subspace_justification.ok())
550 else {
551 return false;
552 };
553
554 subspace_justification.must_be_archived()
555 });
556
557 if let Some(first_justification) = filtered_justifications.next() {
558 let mut justifications = Justifications::from(first_justification);
559
560 for justification in filtered_justifications {
561 justifications.append(justification);
562 }
563
564 signed_block.justifications = Some(justifications);
565 }
566 }
567
568 signed_block.encode()
569 }
570}
571
572pub fn decode_block<Block>(
574 mut encoded_block: &[u8],
575) -> Result<SignedBlock<Block>, parity_scale_codec::Error>
576where
577 Block: BlockT,
578{
579 SignedBlock::<Block>::decode(&mut encoded_block)
580}
581
/// Creates an [`Archiver`] and brings it up to date with the blocks already present in the chain.
///
/// Steps, in order:
/// 1. Determine the best block that may be archived (best block minus confirmation depth, capped
///    by the configured object mapping start block, or the best block itself if hashes are missing
///    in between).
/// 2. If object mappings are enabled, verify that mappings can be extracted for that block and
///    fail otherwise.
/// 3. Resume from the newest usable segment header (see `find_last_archived_block`) or start a
///    fresh archiver.
/// 4. Archive all already produced blocks up to best block minus confirmation depth, storing newly
///    produced segment headers and sending object mapping notifications along the way.
///
/// # Errors
///
/// Returns an error if block data or state needed for object mappings is missing, if the archiver
/// can't be restored from the stored state, if the thread pool can't be created, or if client or
/// segment header store operations fail.
///
/// # Panics
///
/// Panics if a block in the range to archive is missing, or if no best archived block could be
/// determined.
fn initialize_archiver<Block, Client, AS>(
    segment_headers_store: &SegmentHeadersStore<AS>,
    subspace_link: &SubspaceLink<Block>,
    client: &Client,
    create_object_mappings: CreateObjectMappings,
) -> sp_blockchain::Result<InitializedArchiver<Block>>
where
    Block: BlockT,
    Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + HeaderBackend<Block> + AuxStore,
    Client::Api: SubspaceApi<Block, PublicKey> + ObjectsApi<Block>,
    AS: AuxStore,
{
    let client_info = client.info();
    let best_block_number = TryInto::<BlockNumber>::try_into(client_info.best_number)
        .unwrap_or_else(|_| {
            unreachable!("sp_runtime::BlockNumber fits into subspace_primitives::BlockNumber; qed");
        });

    let confirmation_depth_k = subspace_link.chain_constants.confirmation_depth_k();

    // Blocks are only archived once they are `confirmation_depth_k` blocks deep
    let mut best_block_to_archive = best_block_number.saturating_sub(confirmation_depth_k);
    if let Some(block_number) = create_object_mappings.block() {
        // An explicit mapping start block can only lower the block to resume from
        best_block_to_archive = best_block_to_archive.min(block_number);
    }

    // If any block hash between the candidate and the best block is unavailable, fall back to the
    // best block itself as the block to archive
    if (best_block_to_archive..best_block_number)
        .any(|block_number| client.hash(block_number.into()).ok().flatten().is_none())
    {
        best_block_to_archive = best_block_number;
    }

    // With mappings enabled, make sure hash, data and state of the chosen block are available
    // before going any further (block zero is exempt from this check)
    if create_object_mappings.is_enabled() && best_block_to_archive >= 1 {
        let Some(best_block_to_archive_hash) = client.hash(best_block_to_archive.into())? else {
            let error = format!(
                "Missing hash for mapping block {best_block_to_archive}, \
                try a higher block number, or wipe your node and restart with `--sync full`"
            );
            return Err(sp_blockchain::Error::Application(error.into()));
        };

        let Some(best_block_data) = client.block(best_block_to_archive_hash)? else {
            let error = format!(
                "Missing data for mapping block {best_block_to_archive} \
                hash {best_block_to_archive_hash}, \
                try a higher block number, or wipe your node and restart with `--sync full`"
            );
            return Err(sp_blockchain::Error::Application(error.into()));
        };

        // The extracted mappings are discarded, only success or failure matters here
        client
            .runtime_api()
            .extract_block_object_mapping(
                *best_block_data.block.header().parent_hash(),
                best_block_data.block.clone(),
            )
            .map_err(|error| {
                sp_blockchain::Error::Application(
                    format!(
                        "Missing state for mapping block {best_block_to_archive} \
                        hash {best_block_to_archive_hash}: {error}, \
                        try a higher block number, or wipe your node and restart with `--sync full`"
                    )
                    .into(),
                )
            })?;
    }

    let maybe_last_archived_block = find_last_archived_block(
        client,
        segment_headers_store,
        best_block_to_archive.into(),
        create_object_mappings.is_enabled(),
    )?;

    let have_last_segment_header = maybe_last_archived_block.is_some();
    let mut best_archived_block = None;

    let mut archiver =
        if let Some((last_segment_header, last_archived_block, block_object_mappings)) =
            maybe_last_archived_block
        {
            // Resume from the state described by the last usable segment header
            let last_archived_block_number = last_segment_header.last_archived_block().number;
            info!(
                %last_archived_block_number,
                "Resuming archiver from last archived block",
            );

            // Initial value; it is overwritten below if further blocks get archived
            best_archived_block.replace((
                last_archived_block.block.hash(),
                *last_archived_block.block.header().number(),
            ));

            let last_archived_block_encoded = encode_block(last_archived_block);

            Archiver::with_initial_state(
                subspace_link.kzg().clone(),
                subspace_link.erasure_coding().clone(),
                last_segment_header,
                &last_archived_block_encoded,
                block_object_mappings,
            )
            .map_err(|error| {
                sp_blockchain::Error::Application(
                    format!("Incorrect parameters for archiver: {error:?} {last_segment_header:?}")
                        .into(),
                )
            })?
        } else {
            info!("Starting archiving from genesis");

            Archiver::new(
                subspace_link.kzg().clone(),
                subspace_link.erasure_coding().clone(),
            )
        };

    // Catch up with blocks that were produced before the archiver was (re)started
    {
        let blocks_to_archive_from = archiver
            .last_archived_block_number()
            .map(|n| n + 1)
            .unwrap_or_default();
        let blocks_to_archive_to = best_block_number
            .checked_sub(confirmation_depth_k)
            .filter(|&blocks_to_archive_to| blocks_to_archive_to >= blocks_to_archive_from)
            .or({
                if have_last_segment_header {
                    None
                } else {
                    // Not resuming from a segment header: block zero is archived at the very least
                    Some(0)
                }
            });

        if let Some(blocks_to_archive_to) = blocks_to_archive_to {
            info!(
                "Archiving already produced blocks {}..={}",
                blocks_to_archive_from, blocks_to_archive_to,
            );

            // Dedicated pool so that at most `BLOCKS_TO_ARCHIVE_CONCURRENCY` blocks are loaded
            // concurrently
            let thread_pool = ThreadPoolBuilder::new()
                .num_threads(BLOCKS_TO_ARCHIVE_CONCURRENCY)
                .build()
                .map_err(|error| {
                    sp_blockchain::Error::Backend(format!(
                        "Failed to create thread pool for archiver initialization: {error}"
                    ))
                })?;
            // Blocks and their object mappings are loaded in parallel, collected in block order
            let blocks_to_archive = thread_pool.install(|| {
                (blocks_to_archive_from..=blocks_to_archive_to)
                    .into_par_iter()
                    .map_init(
                        || client.runtime_api(),
                        |runtime_api, block_number| {
                            let block_hash = client
                                .hash(block_number.into())?
                                .expect("All blocks since last archived must be present; qed");

                            let block = client
                                .block(block_hash)?
                                .expect("All blocks since last archived must be present; qed");

                            // Extraction failures result in empty mappings for the block
                            let block_object_mappings =
                                if create_object_mappings.is_enabled_for_block(block_number) {
                                    runtime_api
                                        .extract_block_object_mapping(
                                            *block.block.header().parent_hash(),
                                            block.block.clone(),
                                        )
                                        .unwrap_or_default()
                                } else {
                                    BlockObjectMapping::default()
                                };

                            Ok((block, block_object_mappings))
                        },
                    )
                    .collect::<sp_blockchain::Result<Vec<(SignedBlock<_>, _)>>>()
            })?;

            best_archived_block =
                blocks_to_archive
                    .last()
                    .map(|(block, _block_object_mappings)| {
                        (block.block.hash(), *block.block.header().number())
                    });

            // Archiving itself is sequential and happens in block order
            for (signed_block, block_object_mappings) in blocks_to_archive {
                let block_number_to_archive = *signed_block.block.header().number();
                let encoded_block = encode_block(signed_block);

                debug!(
                    "Encoded block {} has size of {:.2} kiB",
                    block_number_to_archive,
                    encoded_block.len() as f32 / 1024.0
                );

                let block_outcome = archiver.add_block(encoded_block, block_object_mappings, false);
                send_object_mapping_notification(
                    &subspace_link.object_mapping_notification_sender,
                    block_outcome.object_mapping,
                    block_number_to_archive,
                );
                let new_segment_headers: Vec<SegmentHeader> = block_outcome
                    .archived_segments
                    .iter()
                    .map(|archived_segment| archived_segment.segment_header)
                    .collect();

                if !new_segment_headers.is_empty() {
                    segment_headers_store.add_segment_headers(&new_segment_headers)?;
                }
            }
        }
    }

    Ok(InitializedArchiver {
        archiver,
        best_archived_block: best_archived_block
            .expect("Must always set if there is no logical error; qed"),
    })
}
821
822fn finalize_block<Block, Backend, Client>(
823 client: &Client,
824 telemetry: Option<&TelemetryHandle>,
825 hash: Block::Hash,
826 number: NumberFor<Block>,
827) where
828 Block: BlockT,
829 Backend: BackendT<Block>,
830 Client: LockImportRun<Block, Backend> + Finalizer<Block, Backend>,
831{
832 if number.is_zero() {
833 return;
835 }
836 let _result: sp_blockchain::Result<_> = client.lock_import_and_run(|import_op| {
839 client
842 .apply_finality(import_op, hash, None, true)
843 .map_err(|error| {
844 warn!(
845 "Error applying finality to block {:?}: {}",
846 (hash, number),
847 error
848 );
849 error
850 })?;
851
852 debug!("Finalizing blocks up to ({:?}, {})", number, hash);
853
854 telemetry!(
855 telemetry;
856 CONSENSUS_INFO;
857 "subspace.finalized_blocks_up_to";
858 "number" => ?number, "hash" => ?hash,
859 );
860
861 Ok(())
862 });
863}
864
/// Creates the archiver task.
///
/// The returned future consumes block importing notifications and, for every imported block,
/// archives the block that is `confirmation_depth_k` blocks below it. Whenever archiving produced
/// new segment headers, it may also finalize an older block (see
/// `FINALIZATION_DEPTH_IN_SEGMENTS`).
///
/// If the segment headers store is empty, the archiver is initialized synchronously before this
/// function returns; otherwise initialization happens when the returned future is first polled.
/// The subscription to block importing notifications is always created before returning.
///
/// # Errors
///
/// The function itself fails if the synchronous initialization fails. The returned future
/// resolves to an error if (re-)initialization or archiving of a block fails, or if a gap in the
/// chain is detected that the archiver can't recover from. It resolves to `Ok(())` when the
/// notification stream ends.
pub fn create_subspace_archiver<Block, Backend, Client, AS, SO>(
    segment_headers_store: SegmentHeadersStore<AS>,
    subspace_link: SubspaceLink<Block>,
    client: Arc<Client>,
    sync_oracle: SubspaceSyncOracle<SO>,
    telemetry: Option<TelemetryHandle>,
    create_object_mappings: CreateObjectMappings,
) -> sp_blockchain::Result<impl Future<Output = sp_blockchain::Result<()>> + Send + 'static>
where
    Block: BlockT,
    Backend: BackendT<Block>,
    Client: ProvideRuntimeApi<Block>
        + BlockBackend<Block>
        + HeaderBackend<Block>
        + LockImportRun<Block, Backend>
        + Finalizer<Block, Backend>
        + BlockchainEvents<Block>
        + AuxStore
        + Send
        + Sync
        + 'static,
    Client::Api: SubspaceApi<Block, PublicKey> + ObjectsApi<Block>,
    AS: AuxStore + Send + Sync + 'static,
    SO: SyncOracle + Send + Sync + 'static,
{
    if create_object_mappings.is_enabled() {
        info!(
            ?create_object_mappings,
            "Creating object mappings from the configured block onwards"
        );
    } else {
        info!("Not creating object mappings");
    }

    // With an empty store, initialize right away so that segment headers exist once this function
    // returns; otherwise initialization is deferred to the returned future
    let maybe_archiver = if segment_headers_store.max_segment_index().is_none() {
        Some(initialize_archiver(
            &segment_headers_store,
            &subspace_link,
            client.as_ref(),
            create_object_mappings,
        )?)
    } else {
        None
    };

    // Subscribe before returning rather than inside the future
    let mut block_importing_notification_stream = subspace_link
        .block_importing_notification_stream
        .subscribe();

    Ok(async move {
        let archiver = match maybe_archiver {
            Some(archiver) => archiver,
            None => initialize_archiver(
                &segment_headers_store,
                &subspace_link,
                client.as_ref(),
                create_object_mappings,
            )?,
        };
        let confirmation_depth_k = subspace_link.chain_constants.confirmation_depth_k().into();

        let InitializedArchiver {
            mut archiver,
            best_archived_block,
        } = archiver;
        let (mut best_archived_block_hash, mut best_archived_block_number) = best_archived_block;

        while let Some(block_importing_notification) =
            block_importing_notification_stream.next().await
        {
            let importing_block_number = block_importing_notification.block_number;
            // Blocks shallower than the confirmation depth have nothing to archive yet
            let block_number_to_archive =
                match importing_block_number.checked_sub(&confirmation_depth_k) {
                    Some(block_number_to_archive) => block_number_to_archive,
                    None => {
                        continue;
                    }
                };

            let last_archived_block_number = segment_headers_store
                .last_segment_header()
                .expect("Exists after archiver initialization; qed")
                .last_archived_block()
                .number;
            let create_mappings =
                create_object_mappings.is_enabled_for_block(last_archived_block_number);
            let last_archived_block_number = NumberFor::<Block>::from(last_archived_block_number);
            trace!(
                %importing_block_number,
                %block_number_to_archive,
                %best_archived_block_number,
                %last_archived_block_number,
                "Checking if block needs to be skipped"
            );

            // A block at or below the best archived block is always skipped. A block below the
            // last archived block of the newest segment is skipped too, unless mappings are
            // enabled for that last archived block.
            let skip_last_archived_blocks =
                last_archived_block_number > block_number_to_archive && !create_mappings;
            if best_archived_block_number >= block_number_to_archive || skip_last_archived_blocks {
                debug!(
                    %importing_block_number,
                    %block_number_to_archive,
                    %best_archived_block_number,
                    %last_archived_block_number,
                    "Skipping already archived block",
                );
                continue;
            }

            // The block to archive doesn't directly follow the best archived block, so the
            // archiver is re-initialized from the current chain state
            if best_archived_block_number + One::one() != block_number_to_archive {
                InitializedArchiver {
                    archiver,
                    best_archived_block: (best_archived_block_hash, best_archived_block_number),
                } = initialize_archiver(
                    &segment_headers_store,
                    &subspace_link,
                    client.as_ref(),
                    create_object_mappings,
                )?;

                if best_archived_block_number + One::one() == block_number_to_archive {
                    // Re-initialization closed the gap, proceed with archiving below
                } else if best_archived_block_number >= block_number_to_archive {
                    // Re-initialization already covered this block
                    continue;
                } else if client
                    .block_hash(importing_block_number - One::one())?
                    .is_none()
                {
                    // The parent of the block being imported isn't available, skip for now
                    continue;
                } else {
                    // NOTE(review): the message below reads garbled ("starting with doesn't start
                    // with") and has an unbalanced `)`; left untouched since it is runtime output
                    let error = format!(
                        "There was a gap in blockchain history and the last contiguous series of \
                        blocks starting with doesn't start with archived segment (best archived \
                        block number {best_archived_block_number}, block number to archive \
                        {block_number_to_archive}), block about to be imported \
                        {importing_block_number}), archiver can't continue",
                    );
                    return Err(sp_blockchain::Error::Consensus(sp_consensus::Error::Other(
                        error.into(),
                    )));
                }
            }

            let max_segment_index_before = segment_headers_store.max_segment_index();
            (best_archived_block_hash, best_archived_block_number) = archive_block(
                &mut archiver,
                segment_headers_store.clone(),
                &*client,
                &sync_oracle,
                subspace_link.object_mapping_notification_sender.clone(),
                subspace_link.archived_segment_notification_sender.clone(),
                best_archived_block_hash,
                block_number_to_archive,
                create_object_mappings,
            )
            .await?;

            // Finalization is only considered when archiving produced at least one new segment
            let max_segment_index = segment_headers_store.max_segment_index();
            if max_segment_index_before != max_segment_index {
                let maybe_block_number_to_finalize = max_segment_index
                    .and_then(|max_segment_index| {
                        max_segment_index.checked_sub(FINALIZATION_DEPTH_IN_SEGMENTS)
                    })
                    .and_then(|segment_index| {
                        segment_headers_store.get_segment_header(segment_index)
                    })
                    .map(|segment_header| segment_header.last_archived_block().number)
                    // Never finalize beyond the block that was just archived
                    .map(|block_number| block_number_to_archive.min(block_number.into()))
                    // Only blocks above the currently finalized one are of interest
                    .filter(|block_number| *block_number > client.info().finalized_number);

                if let Some(block_number_to_finalize) = maybe_block_number_to_finalize {
                    {
                        let mut import_notification = client.every_import_notification_stream();

                        // NOTE(review): dropping the notification presumably lets the pending
                        // block import proceed — confirm against the notification sender
                        drop(block_importing_notification);

                        // Wait until a block with the importing block's number has been imported
                        while let Some(notification) = import_notification.next().await {
                            if notification.header.number() == &importing_block_number {
                                break;
                            }
                        }
                    }

                    // Nothing is finalized if the block hash can't be found
                    if let Some(block_hash_to_finalize) =
                        client.block_hash(block_number_to_finalize)?
                    {
                        finalize_block(
                            &*client,
                            telemetry.as_ref(),
                            block_hash_to_finalize,
                            block_number_to_finalize,
                        );
                    }
                }
            }
        }

        Ok(())
    })
}
1116
1117#[allow(clippy::too_many_arguments)]
1119async fn archive_block<Block, Backend, Client, AS, SO>(
1120 archiver: &mut Archiver,
1121 segment_headers_store: SegmentHeadersStore<AS>,
1122 client: &Client,
1123 sync_oracle: &SubspaceSyncOracle<SO>,
1124 object_mapping_notification_sender: SubspaceNotificationSender<ObjectMappingNotification>,
1125 archived_segment_notification_sender: SubspaceNotificationSender<ArchivedSegmentNotification>,
1126 best_archived_block_hash: Block::Hash,
1127 block_number_to_archive: NumberFor<Block>,
1128 create_object_mappings: CreateObjectMappings,
1129) -> sp_blockchain::Result<(Block::Hash, NumberFor<Block>)>
1130where
1131 Block: BlockT,
1132 Backend: BackendT<Block>,
1133 Client: ProvideRuntimeApi<Block>
1134 + BlockBackend<Block>
1135 + HeaderBackend<Block>
1136 + LockImportRun<Block, Backend>
1137 + Finalizer<Block, Backend>
1138 + AuxStore
1139 + Send
1140 + Sync
1141 + 'static,
1142 Client::Api: SubspaceApi<Block, PublicKey> + ObjectsApi<Block>,
1143 AS: AuxStore + Send + Sync + 'static,
1144 SO: SyncOracle + Send + Sync + 'static,
1145{
1146 let block = client
1147 .block(
1148 client
1149 .block_hash(block_number_to_archive)?
1150 .expect("Older block by number must always exist"),
1151 )?
1152 .expect("Older block by number must always exist");
1153
1154 let parent_block_hash = *block.block.header().parent_hash();
1155 let block_hash_to_archive = block.block.hash();
1156
1157 debug!(
1158 "Archiving block {:?} ({})",
1159 block_number_to_archive, block_hash_to_archive
1160 );
1161
1162 if parent_block_hash != best_archived_block_hash {
1163 let error = format!(
1164 "Attempt to switch to a different fork beyond archiving depth, \
1165 can't do it: parent block hash {}, best archived block hash {}",
1166 parent_block_hash, best_archived_block_hash
1167 );
1168 return Err(sp_blockchain::Error::Consensus(sp_consensus::Error::Other(
1169 error.into(),
1170 )));
1171 }
1172
1173 let create_mappings = create_object_mappings.is_enabled_for_block(
1174 block_number_to_archive.try_into().unwrap_or_else(|_| {
1175 unreachable!("sp_runtime::BlockNumber fits into subspace_primitives::BlockNumber; qed")
1176 }),
1177 );
1178
1179 let block_object_mappings = if create_mappings {
1180 client
1181 .runtime_api()
1182 .extract_block_object_mapping(parent_block_hash, block.block.clone())
1183 .map_err(|error| {
1184 sp_blockchain::Error::Application(
1185 format!("Failed to retrieve block object mappings: {error}").into(),
1186 )
1187 })?
1188 } else {
1189 BlockObjectMapping::default()
1190 };
1191
1192 let encoded_block = encode_block(block);
1193 debug!(
1194 "Encoded block {} has size of {:.2} kiB",
1195 block_number_to_archive,
1196 encoded_block.len() as f32 / 1024.0
1197 );
1198
1199 let block_outcome = archiver.add_block(
1200 encoded_block,
1201 block_object_mappings,
1202 !sync_oracle.is_major_syncing(),
1203 );
1204 send_object_mapping_notification(
1205 &object_mapping_notification_sender,
1206 block_outcome.object_mapping,
1207 block_number_to_archive,
1208 );
1209 for archived_segment in block_outcome.archived_segments {
1210 let segment_header = archived_segment.segment_header;
1211
1212 segment_headers_store.add_segment_headers(slice::from_ref(&segment_header))?;
1213
1214 send_archived_segment_notification(&archived_segment_notification_sender, archived_segment)
1215 .await;
1216 }
1217
1218 Ok((block_hash_to_archive, block_number_to_archive))
1219}
1220
1221fn send_object_mapping_notification<BlockNum>(
1222 object_mapping_notification_sender: &SubspaceNotificationSender<ObjectMappingNotification>,
1223 object_mapping: Vec<GlobalObject>,
1224 block_number: BlockNum,
1225) where
1226 BlockNum: BlockNumberT,
1227{
1228 if object_mapping.is_empty() {
1229 return;
1230 }
1231
1232 let block_number = TryInto::<BlockNumber>::try_into(block_number).unwrap_or_else(|_| {
1233 unreachable!("sp_runtime::BlockNumber fits into subspace_primitives::BlockNumber; qed");
1234 });
1235
1236 let object_mapping_notification = ObjectMappingNotification {
1237 object_mapping,
1238 block_number,
1239 };
1240
1241 object_mapping_notification_sender.notify(move || object_mapping_notification);
1242}
1243
1244async fn send_archived_segment_notification(
1245 archived_segment_notification_sender: &SubspaceNotificationSender<ArchivedSegmentNotification>,
1246 archived_segment: NewArchivedSegment,
1247) {
1248 let segment_index = archived_segment.segment_header.segment_index();
1249 let (acknowledgement_sender, mut acknowledgement_receiver) =
1250 tracing_unbounded::<()>("subspace_acknowledgement", 1000);
1251 let archived_segment = Arc::new(archived_segment);
1254 let archived_segment_notification = ArchivedSegmentNotification {
1255 archived_segment: Arc::clone(&archived_segment),
1256 acknowledgement_sender,
1257 };
1258
1259 archived_segment_notification_sender.notify(move || archived_segment_notification);
1260
1261 let wait_fut = async {
1262 while acknowledgement_receiver.next().await.is_some() {
1263 debug!(
1264 "Archived segment notification acknowledged: {}",
1265 segment_index
1266 );
1267 }
1268 };
1269
1270 if tokio::time::timeout(ACKNOWLEDGEMENT_TIMEOUT, wait_fut)
1271 .await
1272 .is_err()
1273 {
1274 warn!(
1275 "Archived segment notification was not acknowledged and reached timeout, continue \
1276 regardless"
1277 );
1278 }
1279}