1#[cfg(test)]
34mod tests;
35
36use crate::slot_worker::SubspaceSyncOracle;
37use crate::{SubspaceLink, SubspaceNotificationSender};
38use futures::StreamExt;
39use parity_scale_codec::{Decode, Encode};
40use parking_lot::RwLock;
41use rand::prelude::*;
42use rand_chacha::ChaCha8Rng;
43use rayon::ThreadPoolBuilder;
44use rayon::prelude::*;
45use sc_client_api::{
46 AuxStore, Backend as BackendT, BlockBackend, BlockImportOperation, BlockchainEvents, Finalizer,
47 LockImportRun,
48};
49use sc_telemetry::{CONSENSUS_INFO, TelemetryHandle, telemetry};
50use sc_utils::mpsc::{TracingUnboundedSender, tracing_unbounded};
51use sp_api::ProvideRuntimeApi;
52use sp_blockchain::{Backend as BlockchainBackend, HeaderBackend};
53use sp_consensus::SyncOracle;
54use sp_consensus_subspace::{SubspaceApi, SubspaceJustification};
55use sp_objects::ObjectsApi;
56use sp_runtime::Justifications;
57use sp_runtime::generic::SignedBlock;
58use sp_runtime::traits::{
59 Block as BlockT, BlockNumber as BlockNumberT, CheckedSub, Header, NumberFor, One, Zero,
60};
61use std::error::Error;
62use std::future::Future;
63use std::num::NonZeroU32;
64use std::slice;
65use std::sync::Arc;
66use std::sync::atomic::{AtomicU16, Ordering};
67use std::time::Duration;
68use subspace_archiving::archiver::{Archiver, NewArchivedSegment};
69use subspace_core_primitives::objects::{BlockObjectMapping, GlobalObject};
70use subspace_core_primitives::segments::{RecordedHistorySegment, SegmentHeader, SegmentIndex};
71use subspace_core_primitives::{BlockNumber, PublicKey};
72use subspace_erasure_coding::ErasureCoding;
73use subspace_kzg::Kzg;
74use tracing::{debug, info, trace, warn};
75
/// Number of rayon worker threads used to fetch and map already-produced blocks in parallel
/// during archiver initialization.
const BLOCKS_TO_ARCHIVE_CONCURRENCY: usize = 6;
/// How long to wait for subscribers to acknowledge an archived segment notification before
/// continuing regardless (see `send_archived_segment_notification`).
const ACKNOWLEDGEMENT_TIMEOUT: Duration = Duration::from_mins(2);

/// How many segments behind the newest segment blocks are finalized at.
pub(crate) const FINALIZATION_DEPTH_IN_SEGMENTS: SegmentIndex = SegmentIndex::new(5);
91
/// Shared state behind [`SegmentHeadersStore`]: the backing aux store, the next aux key index
/// to write under, and an in-memory cache of all known segment headers.
#[derive(Debug)]
struct SegmentHeadersStoreInner<AS> {
    /// Persistent auxiliary storage where segment header batches are written.
    aux_store: Arc<AS>,
    /// Index of the next aux key to use; each `add_segment_headers` call persists one batch
    /// under a fresh key.
    next_key_index: AtomicU16,
    /// All known segment headers, ordered so that position in the `Vec` equals segment index.
    cache: RwLock<Vec<SegmentHeader>>,
}
99
/// Persistent storage of segment headers backed by an aux store, with a full in-memory cache
/// for cheap reads. Cloning shares the same underlying state.
#[derive(Debug)]
pub struct SegmentHeadersStore<AS> {
    // Shared between clones; all reads and writes go through this.
    inner: Arc<SegmentHeadersStoreInner<AS>>,
    // Number of blocks that must be built on top of a block before it is archived.
    confirmation_depth_k: BlockNumber,
}
115
116impl<AS> Clone for SegmentHeadersStore<AS> {
117 fn clone(&self) -> Self {
118 Self {
119 inner: Arc::clone(&self.inner),
120 confirmation_depth_k: self.confirmation_depth_k,
121 }
122 }
123}
124
impl<AS> SegmentHeadersStore<AS>
where
    AS: AuxStore,
{
    /// Prefix under which segment header batches are stored in the aux store.
    const KEY_PREFIX: &'static [u8] = b"segment-headers";
    /// Initial capacity of the in-memory segment header cache.
    const INITIAL_CACHE_CAPACITY: usize = 1_000;

    /// Creates a new store backed by `aux_store`, loading all previously persisted segment
    /// headers into the in-memory cache.
    ///
    /// Batches are read under consecutive key indices (0, 1, 2, …) until the first missing key.
    ///
    /// # Panics
    ///
    /// Panics if a persisted batch fails to decode (corrupted database).
    pub fn new(
        aux_store: Arc<AS>,
        confirmation_depth_k: BlockNumber,
    ) -> sp_blockchain::Result<Self> {
        let mut cache = Vec::with_capacity(Self::INITIAL_CACHE_CAPACITY);

        debug!("Started loading segment headers into cache");
        // Each aux key holds one SCALE-encoded `Vec<SegmentHeader>` batch.
        let mut next_key_index = 0;
        while let Some(segment_headers) =
            aux_store
                .get_aux(&Self::key(next_key_index))?
                .map(|segment_header| {
                    Vec::<SegmentHeader>::decode(&mut segment_header.as_slice())
                        .expect("Always correct segment header unless DB is corrupted; qed")
                })
        {
            cache.extend(segment_headers);
            next_key_index += 1;
        }
        debug!("Finished loading segment headers into cache");

        Ok(Self {
            inner: Arc::new(SegmentHeadersStoreInner {
                aux_store,
                next_key_index: AtomicU16::new(next_key_index),
                cache: RwLock::new(cache),
            }),
            confirmation_depth_k,
        })
    }

    /// Returns the last (highest segment index) segment header, if any are stored.
    pub fn last_segment_header(&self) -> Option<SegmentHeader> {
        self.inner.cache.read().last().cloned()
    }

    /// Returns the highest stored segment index, or `None` when the store is empty.
    pub fn max_segment_index(&self) -> Option<SegmentIndex> {
        // Cache position equals segment index, so the max index is `len - 1`.
        let segment_index = self.inner.cache.read().len().checked_sub(1)? as u64;
        Some(SegmentIndex::from(segment_index))
    }

    /// Adds segment headers to the store.
    ///
    /// Headers at or below the current max segment index are silently skipped. The remaining
    /// headers must continue the sequence without gaps (and the very first header stored must
    /// have segment index zero), otherwise an `Application` error is returned. New headers are
    /// persisted as one batch under a fresh key and appended to the cache.
    pub fn add_segment_headers(
        &self,
        segment_headers: &[SegmentHeader],
    ) -> sp_blockchain::Result<()> {
        let mut maybe_last_segment_index = self.max_segment_index();
        let mut segment_headers_to_store = Vec::with_capacity(segment_headers.len());
        for segment_header in segment_headers {
            let segment_index = segment_header.segment_index();
            match maybe_last_segment_index {
                Some(last_segment_index) => {
                    if segment_index <= last_segment_index {
                        // Already stored, nothing to do.
                        continue;
                    }

                    if segment_index != last_segment_index + SegmentIndex::ONE {
                        let error = format!(
                            "Segment index {segment_index} must strictly follow {last_segment_index}, can't store segment header"
                        );
                        return Err(sp_blockchain::Error::Application(error.into()));
                    }

                    segment_headers_to_store.push(segment_header);
                    maybe_last_segment_index.replace(segment_index);
                }
                None => {
                    if segment_index != SegmentIndex::ZERO {
                        let error = format!(
                            "First segment header index must be zero, found index {segment_index}"
                        );
                        return Err(sp_blockchain::Error::Application(error.into()));
                    }

                    segment_headers_to_store.push(segment_header);
                    maybe_last_segment_index.replace(segment_index);
                }
            }
        }

        if segment_headers_to_store.is_empty() {
            return Ok(());
        }

        {
            // Persist the whole batch under the next free key.
            // NOTE(review): `fetch_add` reserves a key atomically, but concurrent callers could
            // still persist and cache batches out of order — presumably callers are externally
            // serialized; confirm.
            let key_index = self.inner.next_key_index.fetch_add(1, Ordering::SeqCst);
            let key = Self::key(key_index);
            let value = segment_headers_to_store.encode();
            let insert_data = vec![(key.as_slice(), value.as_slice())];

            self.inner.aux_store.insert_aux(&insert_data, &[])?;
        }
        self.inner.cache.write().extend(segment_headers_to_store);

        Ok(())
    }

    /// Returns the segment header with the given segment index, if stored.
    pub fn get_segment_header(&self, segment_index: SegmentIndex) -> Option<SegmentHeader> {
        self.inner
            .cache
            .read()
            .get(u64::from(segment_index) as usize)
            .copied()
    }

    /// Builds the aux storage key for the batch with the given key index.
    fn key(key_index: u16) -> Vec<u8> {
        (Self::KEY_PREFIX, key_index.to_le_bytes()).encode()
    }

    /// Returns the segment headers that must be included in the block with the given number,
    /// or an empty `Vec` when there are none.
    pub fn segment_headers_for_block(&self, block_number: BlockNumber) -> Vec<SegmentHeader> {
        let Some(last_segment_index) = self.max_segment_index() else {
            // Not initialized yet, nothing to include.
            return Vec::new();
        };

        // Block one always includes the genesis segment header.
        if block_number == 1 {
            return vec![
                self.get_segment_header(SegmentIndex::ZERO)
                    .expect("Segment headers are stored in monotonically increasing order; qed"),
            ];
        }

        // Only the genesis segment exists and it was already handled above.
        if last_segment_index == SegmentIndex::ZERO {
            return Vec::new();
        }

        // Scan backwards from the newest segment for one whose inclusion block matches
        // `block_number`.
        let mut current_segment_index = last_segment_index;
        loop {
            let current_segment_header = self
                .get_segment_header(current_segment_index)
                .expect("Segment headers are stored in monotonically increasing order; qed");

            // A segment header is included `confirmation_depth_k` blocks after the block that
            // follows its last archived block.
            let target_block_number =
                current_segment_header.last_archived_block().number + 1 + self.confirmation_depth_k;
            if target_block_number == block_number {
                let mut headers_for_block = vec![current_segment_header];

                // Collect preceding segments that share the same last archived block (several
                // segments can end within one large block); they are included together.
                let last_archived_block_number =
                    current_segment_header.last_archived_block().number;
                let mut segment_index = current_segment_index - SegmentIndex::ONE;

                while let Some(segment_header) = self.get_segment_header(segment_index) {
                    if segment_header.last_archived_block().number == last_archived_block_number {
                        headers_for_block.insert(0, segment_header);
                        segment_index -= SegmentIndex::ONE;
                    } else {
                        break;
                    }
                }

                return headers_for_block;
            }

            if target_block_number > block_number {
                // Target not reached yet; keep walking backwards while possible.
                if current_segment_index > SegmentIndex::ONE {
                    current_segment_index -= SegmentIndex::ONE
                } else {
                    break;
                }
            } else {
                // Walked past the requested block: nothing to include for it.
                return Vec::new();
            }
        }

        Vec::new()
    }
}
326
/// Notification with a newly archived segment and a channel for subscribers to acknowledge
/// that they have processed it.
#[derive(Debug, Clone)]
pub struct ArchivedSegmentNotification {
    /// The archived segment (shared via `Arc` so cloning the notification stays cheap).
    pub archived_segment: Arc<NewArchivedSegment>,
    /// Sender used to acknowledge the notification; `send_archived_segment_notification`
    /// waits (with a timeout) on the receiving end.
    pub acknowledgement_sender: TracingUnboundedSender<()>,
}
337
/// Notification with the object mappings produced while archiving a block.
#[derive(Debug, Clone)]
pub struct ObjectMappingNotification {
    /// Global object mappings extracted from the block being archived.
    pub object_mapping: Vec<GlobalObject>,
    /// Number of the block the mappings belong to.
    pub block_number: BlockNumber,
}
350
/// Whether (and from which block) object mappings should be created during archiving.
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
pub enum CreateObjectMappings {
    /// Create mappings only for blocks at or above the given block number.
    Block(NonZeroU32),

    /// Create mappings for all blocks.
    Yes,

    /// Do not create any object mappings (the default).
    #[default]
    No,
}
371
372impl CreateObjectMappings {
373 fn block(&self) -> Option<BlockNumber> {
376 match self {
377 CreateObjectMappings::Block(block) => Some(block.get()),
378 CreateObjectMappings::Yes => None,
379 CreateObjectMappings::No => None,
380 }
381 }
382
383 pub fn is_enabled(&self) -> bool {
385 !matches!(self, CreateObjectMappings::No)
386 }
387
388 pub fn is_enabled_for_block(&self, block: BlockNumber) -> bool {
390 if !self.is_enabled() {
391 return false;
392 }
393
394 if let Some(target_block) = self.block() {
395 return block >= target_block;
396 }
397
398 true
400 }
401}
402
/// Walks stored segment headers from the newest down and returns the first one whose last
/// archived block both exists locally and is not past `best_block_to_archive`, together with
/// that block and (when `create_object_mappings` is set) its object mappings.
///
/// Returns `Ok(None)` when nothing suitable is found: no segment headers stored yet, or only
/// the genesis segment exists.
fn find_last_archived_block<Block, Client, AS>(
    client: &Client,
    segment_headers_store: &SegmentHeadersStore<AS>,
    best_block_to_archive: NumberFor<Block>,
    create_object_mappings: bool,
) -> sp_blockchain::Result<Option<(SegmentHeader, SignedBlock<Block>, BlockObjectMapping)>>
where
    Block: BlockT,
    Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + HeaderBackend<Block>,
    Client::Api: SubspaceApi<Block, PublicKey> + ObjectsApi<Block>,
    AS: AuxStore,
{
    let Some(max_segment_index) = segment_headers_store.max_segment_index() else {
        return Ok(None);
    };

    // Only the genesis segment exists; treat as "nothing archived yet".
    if max_segment_index == SegmentIndex::ZERO {
        return Ok(None);
    }

    // Newest first: prefer resuming from the most recent usable segment.
    for segment_header in (SegmentIndex::ZERO..=max_segment_index)
        .rev()
        .filter_map(|segment_index| segment_headers_store.get_segment_header(segment_index))
    {
        let last_archived_block_number = segment_header.last_archived_block().number;

        // Skip segments archived beyond the block we intend to resume from.
        if NumberFor::<Block>::from(last_archived_block_number) > best_block_to_archive {
            continue;
        }
        // Skip segments whose last archived block is not in the local database.
        let Some(last_archived_block_hash) = client.hash(last_archived_block_number.into())? else {
            continue;
        };

        let Some(last_archived_block) = client.block(last_archived_block_hash)? else {
            continue;
        };

        let block_object_mappings = if create_object_mappings {
            // Mapping extraction failures fall back to empty mappings here.
            client
                .runtime_api()
                .extract_block_object_mapping(
                    *last_archived_block.block.header().parent_hash(),
                    last_archived_block.block.clone(),
                )
                .unwrap_or_default()
        } else {
            BlockObjectMapping::default()
        };

        return Ok(Some((
            segment_header,
            last_archived_block,
            block_object_mappings,
        )));
    }

    Ok(None)
}
469
/// Re-creates the archived segment produced by archiving the genesis block alone.
///
/// Returns `Ok(None)` when the genesis block body is not available from `client`; storage
/// errors are propagated.
pub fn recreate_genesis_segment<Block, Client>(
    client: &Client,
    kzg: Kzg,
    erasure_coding: ErasureCoding,
) -> Result<Option<NewArchivedSegment>, Box<dyn Error>>
where
    Block: BlockT,
    Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + HeaderBackend<Block>,
    Client::Api: ObjectsApi<Block>,
{
    let genesis_hash = client.info().genesis_hash;
    let Some(signed_block) = client.block(genesis_hash)? else {
        return Ok(None);
    };

    // `encode_block` pads genesis to a full segment, so archiving it yields one segment.
    let encoded_block = encode_block(signed_block);

    let block_outcome = Archiver::new(kzg, erasure_coding).add_block(
        encoded_block,
        BlockObjectMapping::default(),
        false,
    );
    let new_archived_segment = block_outcome
        .archived_segments
        .into_iter()
        .next()
        .expect("Genesis block always results in exactly one archived segment; qed");

    Ok(Some(new_archived_segment))
}
502
/// Result of archiver initialization: the archiver itself plus the position it resumed at.
struct InitializedArchiver<Block>
where
    Block: BlockT,
{
    // Archiver resumed from (or created at) the last archived state.
    archiver: Archiver,
    // (hash, number) of the most recently archived block.
    best_archived_block: (Block::Hash, NumberFor<Block>),
}
510
/// Encodes a block for archiving.
///
/// The genesis block gets special treatment: its encoding is padded up to
/// `RecordedHistorySegment::SIZE` with deterministic pseudorandom bytes seeded from the
/// genesis state root, so every node produces an identical first segment. For all other
/// blocks, justifications that do not need to be archived are filtered out before encoding.
pub fn encode_block<Block>(mut signed_block: SignedBlock<Block>) -> Vec<u8>
where
    Block: BlockT,
{
    if signed_block.block.header().number().is_zero() {
        let mut encoded_block = signed_block.encode();

        let encoded_block_length = encoded_block.len();

        // Pad to a full recorded history segment; the padding is filled below with
        // deterministic pseudorandom bytes rather than zeroes.
        encoded_block.resize(RecordedHistorySegment::SIZE, 0);
        let mut rng = ChaCha8Rng::from_seed(
            signed_block
                .block
                .header()
                .state_root()
                .as_ref()
                .try_into()
                .expect("State root in Subspace must be 32 bytes, panic otherwise; qed"),
        );
        rng.fill(&mut encoded_block[encoded_block_length..]);

        encoded_block
    } else {
        // Keep only Subspace justifications that must be archived; everything else is
        // dropped from the encoded block.
        if let Some(justifications) = signed_block.justifications.take() {
            let mut filtered_justifications = justifications.into_iter().filter(|justification| {
                let Some(subspace_justification) =
                    SubspaceJustification::try_from_justification(justification)
                        .and_then(|subspace_justification| subspace_justification.ok())
                else {
                    return false;
                };

                subspace_justification.must_be_archived()
            });

            // `Justifications` can only be built from at least one justification, hence the
            // first/rest split.
            if let Some(first_justification) = filtered_justifications.next() {
                let mut justifications = Justifications::from(first_justification);

                for justification in filtered_justifications {
                    justifications.append(justification);
                }

                signed_block.justifications = Some(justifications);
            }
        }

        signed_block.encode()
    }
}
572
573pub fn decode_block<Block>(
575 mut encoded_block: &[u8],
576) -> Result<SignedBlock<Block>, parity_scale_codec::Error>
577where
578 Block: BlockT,
579{
580 SignedBlock::<Block>::decode(&mut encoded_block)
581}
582
/// Initializes the archiver: resumes from the last usable archived segment when one exists
/// (otherwise starts from genesis), then archives any already-produced blocks up to
/// `confirmation_depth_k` behind the current best block.
///
/// Returns the archiver together with the (hash, number) of the best archived block, or an
/// error when required blocks/state are missing or segment headers cannot be stored.
fn initialize_archiver<Block, Client, AS>(
    segment_headers_store: &SegmentHeadersStore<AS>,
    subspace_link: &SubspaceLink<Block>,
    client: &Client,
    create_object_mappings: CreateObjectMappings,
) -> sp_blockchain::Result<InitializedArchiver<Block>>
where
    Block: BlockT,
    Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + HeaderBackend<Block> + AuxStore,
    Client::Api: SubspaceApi<Block, PublicKey> + ObjectsApi<Block>,
    AS: AuxStore,
{
    let client_info = client.info();
    let best_block_number = TryInto::<BlockNumber>::try_into(client_info.best_number)
        .unwrap_or_else(|_| {
            unreachable!("sp_runtime::BlockNumber fits into subspace_primitives::BlockNumber; qed");
        });

    let confirmation_depth_k = subspace_link.chain_constants.confirmation_depth_k();

    // Archive everything at least `confirmation_depth_k` deep...
    let mut best_block_to_archive = best_block_number.saturating_sub(confirmation_depth_k);
    if let Some(block_number) = create_object_mappings.block() {
        // ...but when mappings are requested from a specific block, resume no later than that
        // block so its mappings get (re)created.
        best_block_to_archive = best_block_to_archive.min(block_number);
    }

    // If any block in the range we would re-archive is missing locally (e.g. fast sync),
    // give up on re-archiving and start from the current best block instead.
    if (best_block_to_archive..best_block_number)
        .any(|block_number| client.hash(block_number.into()).ok().flatten().is_none())
    {
        best_block_to_archive = best_block_number;
    }

    // Fail early (with actionable messages) if the block/state needed for mappings is absent.
    if create_object_mappings.is_enabled() && best_block_to_archive >= 1 {
        let Some(best_block_to_archive_hash) = client.hash(best_block_to_archive.into())? else {
            let error = format!(
                "Missing hash for mapping block {best_block_to_archive}, \
                try a higher block number, or wipe your node and restart with `--sync full`"
            );
            return Err(sp_blockchain::Error::Application(error.into()));
        };

        let Some(best_block_data) = client.block(best_block_to_archive_hash)? else {
            let error = format!(
                "Missing data for mapping block {best_block_to_archive} \
                hash {best_block_to_archive_hash}, \
                try a higher block number, or wipe your node and restart with `--sync full`"
            );
            return Err(sp_blockchain::Error::Application(error.into()));
        };

        // Result is discarded: this is purely a "do we have the state" probe.
        client
            .runtime_api()
            .extract_block_object_mapping(
                *best_block_data.block.header().parent_hash(),
                best_block_data.block.clone(),
            )
            .map_err(|error| {
                sp_blockchain::Error::Application(
                    format!(
                        "Missing state for mapping block {best_block_to_archive} \
                        hash {best_block_to_archive_hash}: {error}, \
                        try a higher block number, or wipe your node and restart with `--sync full`"
                    )
                    .into(),
                )
            })?;
    }

    let maybe_last_archived_block = find_last_archived_block(
        client,
        segment_headers_store,
        best_block_to_archive.into(),
        create_object_mappings.is_enabled(),
    )?;

    let have_last_segment_header = maybe_last_archived_block.is_some();
    let mut best_archived_block = None;

    // Either resume the archiver from the last archived segment's state or start fresh.
    let mut archiver =
        if let Some((last_segment_header, last_archived_block, block_object_mappings)) =
            maybe_last_archived_block
        {
            let last_archived_block_number = last_segment_header.last_archived_block().number;
            info!(
                %last_archived_block_number,
                "Resuming archiver from last archived block",
            );

            best_archived_block.replace((
                last_archived_block.block.hash(),
                *last_archived_block.block.header().number(),
            ));

            let last_archived_block_encoded = encode_block(last_archived_block);

            Archiver::with_initial_state(
                subspace_link.kzg().clone(),
                subspace_link.erasure_coding().clone(),
                last_segment_header,
                &last_archived_block_encoded,
                block_object_mappings,
            )
            .map_err(|error| {
                sp_blockchain::Error::Application(
                    format!("Incorrect parameters for archiver: {error:?} {last_segment_header:?}")
                        .into(),
                )
            })?
        } else {
            info!("Starting archiving from genesis");

            Archiver::new(
                subspace_link.kzg().clone(),
                subspace_link.erasure_coding().clone(),
            )
        };

    {
        // Catch up: archive blocks produced while the node was offline / before this point.
        let blocks_to_archive_from = archiver
            .last_archived_block_number()
            .map(|n| n + 1)
            .unwrap_or_default();
        let blocks_to_archive_to = best_block_number
            .checked_sub(confirmation_depth_k)
            .filter(|&blocks_to_archive_to| blocks_to_archive_to >= blocks_to_archive_from)
            .or({
                // Without a stored segment header we must still archive genesis (block 0).
                if have_last_segment_header {
                    None
                } else {
                    Some(0)
                }
            });

        if let Some(blocks_to_archive_to) = blocks_to_archive_to {
            info!(
                "Archiving already produced blocks {}..={}",
                blocks_to_archive_from, blocks_to_archive_to,
            );

            let thread_pool = ThreadPoolBuilder::new()
                .num_threads(BLOCKS_TO_ARCHIVE_CONCURRENCY)
                .build()
                .map_err(|error| {
                    sp_blockchain::Error::Backend(format!(
                        "Failed to create thread pool for archiver initialization: {error}"
                    ))
                })?;
            // Fetch blocks and extract mappings in parallel; collection preserves block order.
            let blocks_to_archive = thread_pool.install(|| {
                (blocks_to_archive_from..=blocks_to_archive_to)
                    .into_par_iter()
                    .map_init(
                        || client.runtime_api(),
                        |runtime_api, block_number| {
                            let block_hash = client
                                .hash(block_number.into())?
                                .expect("All blocks since last archived must be present; qed");

                            let block = client
                                .block(block_hash)?
                                .expect("All blocks since last archived must be present; qed");

                            let block_object_mappings =
                                if create_object_mappings.is_enabled_for_block(block_number) {
                                    runtime_api
                                        .extract_block_object_mapping(
                                            *block.block.header().parent_hash(),
                                            block.block.clone(),
                                        )
                                        .unwrap_or_default()
                                } else {
                                    BlockObjectMapping::default()
                                };

                            Ok((block, block_object_mappings))
                        },
                    )
                    .collect::<sp_blockchain::Result<Vec<(SignedBlock<_>, _)>>>()
            })?;

            best_archived_block =
                blocks_to_archive
                    .last()
                    .map(|(block, _block_object_mappings)| {
                        (block.block.hash(), *block.block.header().number())
                    });

            // Feed blocks into the archiver sequentially; persist any segment headers produced.
            for (signed_block, block_object_mappings) in blocks_to_archive {
                let block_number_to_archive = *signed_block.block.header().number();
                let encoded_block = encode_block(signed_block);

                debug!(
                    "Encoded block {} has size of {}",
                    block_number_to_archive,
                    bytesize::to_string(encoded_block.len() as u64, true),
                );

                let block_outcome = archiver.add_block(encoded_block, block_object_mappings, false);
                send_object_mapping_notification(
                    &subspace_link.object_mapping_notification_sender,
                    block_outcome.object_mapping,
                    block_number_to_archive,
                );
                let new_segment_headers: Vec<SegmentHeader> = block_outcome
                    .archived_segments
                    .iter()
                    .map(|archived_segment| archived_segment.segment_header)
                    .collect();

                if !new_segment_headers.is_empty() {
                    segment_headers_store.add_segment_headers(&new_segment_headers)?;
                }
            }
        }
    }

    Ok(InitializedArchiver {
        archiver,
        best_archived_block: best_archived_block
            .expect("Must always set if there is no logical error; qed"),
    })
}
822
/// Finalizes the block with the given hash/number, queueing aux-store tombstones for the
/// block-weight entries of forks displaced by the finalization (in the same import
/// transaction, so they roll back together). Errors are logged, not returned; the genesis
/// block is never finalized.
fn finalize_block<Block, Backend, Client>(
    client: &Client,
    backend: &Backend,
    telemetry: Option<&TelemetryHandle>,
    hash: Block::Hash,
    number: NumberFor<Block>,
) where
    Block: BlockT,
    Backend: BackendT<Block>,
    Client: LockImportRun<Block, Backend> + Finalizer<Block, Backend> + HeaderBackend<Block>,
{
    // Genesis is implicitly final; nothing to do.
    if number.is_zero() {
        return;
    }

    // Compute displaced leaves before taking the import lock; failures only degrade cleanup
    // (leaked block_weight entries), they don't block finalization.
    let displaced = match client.header(hash) {
        Ok(Some(header)) => {
            let parent_hash = *header.parent_hash();
            match backend
                .blockchain()
                .displaced_leaves_after_finalizing(hash, number, parent_hash)
            {
                Ok(d) => Some(d),
                Err(error) => {
                    warn!(
                        ?error,
                        ?hash,
                        ?parent_hash,
                        ?number,
                        "Failed to compute displaced leaves; their block_weight entries will leak"
                    );
                    None
                }
            }
        }
        Ok(None) => {
            warn!(
                ?hash,
                ?number,
                "Header missing for finalized hash; skipping displaced-fork cleanup"
            );
            None
        }
        Err(error) => {
            warn!(
                ?error,
                ?hash,
                ?number,
                "Failed to load header for finalized hash; skipping displaced-fork cleanup"
            );
            None
        }
    };

    // Apply finality and tombstones atomically within one import operation.
    let _result: sp_blockchain::Result<_> = client.lock_import_and_run(|import_op| {
        client
            .apply_finality(import_op, hash, None, true)
            .map_err(|error| {
                warn!(
                    "Error applying finality to block {:?}: {}",
                    (hash, number),
                    error
                );
                error
            })?;

        if let Some(d) = &displaced
            && !d.displaced_blocks.is_empty()
        {
            let count = d.displaced_blocks.len();
            // `None` values delete the corresponding block_weight aux entries.
            let tombstones = d
                .displaced_blocks
                .iter()
                .map(|h| (crate::aux_schema::block_weight_key(h), None));
            import_op.op.insert_aux(tombstones).map_err(|error| {
                warn!(
                    ?error,
                    count, "Failed to queue displaced fork tombstones; rolling back finalization",
                );
                error
            })?;
        }

        debug!("Finalizing blocks up to ({:?}, {})", number, hash);

        telemetry!(
            telemetry;
            CONSENSUS_INFO;
            "subspace.finalized_blocks_up_to";
            "number" => ?number, "hash" => ?hash,
        );

        Ok(())
    });
}
923
924pub fn create_subspace_archiver<Block, Backend, Client, AS, SO>(
954 segment_headers_store: SegmentHeadersStore<AS>,
955 subspace_link: SubspaceLink<Block>,
956 client: Arc<Client>,
957 backend: Arc<Backend>,
958 sync_oracle: SubspaceSyncOracle<SO>,
959 telemetry: Option<TelemetryHandle>,
960 create_object_mappings: CreateObjectMappings,
961) -> sp_blockchain::Result<impl Future<Output = sp_blockchain::Result<()>> + Send + 'static>
962where
963 Block: BlockT,
964 Backend: BackendT<Block> + 'static,
965 Client: ProvideRuntimeApi<Block>
966 + BlockBackend<Block>
967 + HeaderBackend<Block>
968 + LockImportRun<Block, Backend>
969 + Finalizer<Block, Backend>
970 + BlockchainEvents<Block>
971 + AuxStore
972 + Send
973 + Sync
974 + 'static,
975 Client::Api: SubspaceApi<Block, PublicKey> + ObjectsApi<Block>,
976 AS: AuxStore + Send + Sync + 'static,
977 SO: SyncOracle + Send + Sync + 'static,
978{
979 if create_object_mappings.is_enabled() {
980 info!(
981 ?create_object_mappings,
982 "Creating object mappings from the configured block onwards"
983 );
984 } else {
985 info!("Not creating object mappings");
986 }
987
988 let maybe_archiver = if segment_headers_store.max_segment_index().is_none() {
989 Some(initialize_archiver(
990 &segment_headers_store,
991 &subspace_link,
992 client.as_ref(),
993 create_object_mappings,
994 )?)
995 } else {
996 None
997 };
998
999 let mut block_importing_notification_stream = subspace_link
1001 .block_importing_notification_stream
1002 .subscribe();
1003
1004 Ok(async move {
1005 let archiver = match maybe_archiver {
1006 Some(archiver) => archiver,
1007 None => initialize_archiver(
1008 &segment_headers_store,
1009 &subspace_link,
1010 client.as_ref(),
1011 create_object_mappings,
1012 )?,
1013 };
1014 let confirmation_depth_k = subspace_link.chain_constants.confirmation_depth_k().into();
1015
1016 let InitializedArchiver {
1017 mut archiver,
1018 best_archived_block,
1019 } = archiver;
1020 let (mut best_archived_block_hash, mut best_archived_block_number) = best_archived_block;
1021
1022 while let Some(block_importing_notification) =
1023 block_importing_notification_stream.next().await
1024 {
1025 let importing_block_number = block_importing_notification.block_number;
1026 let block_number_to_archive =
1027 match importing_block_number.checked_sub(&confirmation_depth_k) {
1028 Some(block_number_to_archive) => block_number_to_archive,
1029 None => {
1030 continue;
1032 }
1033 };
1034
1035 let last_archived_block_number = segment_headers_store
1036 .last_segment_header()
1037 .expect("Exists after archiver initialization; qed")
1038 .last_archived_block()
1039 .number;
1040 let create_mappings =
1041 create_object_mappings.is_enabled_for_block(last_archived_block_number);
1042 let last_archived_block_number = NumberFor::<Block>::from(last_archived_block_number);
1043 trace!(
1044 %importing_block_number,
1045 %block_number_to_archive,
1046 %best_archived_block_number,
1047 %last_archived_block_number,
1048 "Checking if block needs to be skipped"
1049 );
1050
1051 let skip_last_archived_blocks =
1053 last_archived_block_number > block_number_to_archive && !create_mappings;
1054 if best_archived_block_number >= block_number_to_archive || skip_last_archived_blocks {
1055 debug!(
1057 %importing_block_number,
1058 %block_number_to_archive,
1059 %best_archived_block_number,
1060 %last_archived_block_number,
1061 "Skipping already archived block",
1062 );
1063 continue;
1064 }
1065
1066 if best_archived_block_number + One::one() != block_number_to_archive {
1071 InitializedArchiver {
1072 archiver,
1073 best_archived_block: (best_archived_block_hash, best_archived_block_number),
1074 } = initialize_archiver(
1075 &segment_headers_store,
1076 &subspace_link,
1077 client.as_ref(),
1078 create_object_mappings,
1079 )?;
1080
1081 if best_archived_block_number + One::one() == block_number_to_archive {
1082 } else if best_archived_block_number >= block_number_to_archive {
1084 continue;
1087 } else if client
1088 .block_hash(importing_block_number - One::one())?
1089 .is_none()
1090 {
1091 continue;
1096 } else {
1097 let error = format!(
1098 "There was a gap in blockchain history and the last contiguous series of \
1099 blocks starting with doesn't start with archived segment (best archived \
1100 block number {best_archived_block_number}, block number to archive \
1101 {block_number_to_archive}), block about to be imported \
1102 {importing_block_number}), archiver can't continue",
1103 );
1104 return Err(sp_blockchain::Error::Consensus(sp_consensus::Error::Other(
1105 error.into(),
1106 )));
1107 }
1108 }
1109
1110 let max_segment_index_before = segment_headers_store.max_segment_index();
1111 (best_archived_block_hash, best_archived_block_number) = archive_block(
1112 &mut archiver,
1113 segment_headers_store.clone(),
1114 &*client,
1115 &sync_oracle,
1116 subspace_link.object_mapping_notification_sender.clone(),
1117 subspace_link.archived_segment_notification_sender.clone(),
1118 best_archived_block_hash,
1119 block_number_to_archive,
1120 create_object_mappings,
1121 )
1122 .await?;
1123
1124 let max_segment_index = segment_headers_store.max_segment_index();
1125 if max_segment_index_before != max_segment_index {
1126 let maybe_block_number_to_finalize = max_segment_index
1127 .and_then(|max_segment_index| {
1129 max_segment_index.checked_sub(FINALIZATION_DEPTH_IN_SEGMENTS)
1130 })
1131 .and_then(|segment_index| {
1132 segment_headers_store.get_segment_header(segment_index)
1133 })
1134 .map(|segment_header| segment_header.last_archived_block().number)
1135 .map(|block_number| block_number_to_archive.min(block_number.into()))
1138 .filter(|block_number| *block_number > client.info().finalized_number);
1140
1141 if let Some(block_number_to_finalize) = maybe_block_number_to_finalize {
1142 {
1143 let mut import_notification = client.every_import_notification_stream();
1144
1145 drop(block_importing_notification);
1148
1149 while let Some(notification) = import_notification.next().await {
1150 if notification.header.number() == &importing_block_number {
1152 break;
1153 }
1154 }
1155 }
1156
1157 if let Some(block_hash_to_finalize) =
1160 client.block_hash(block_number_to_finalize)?
1161 {
1162 finalize_block(
1163 &*client,
1164 &*backend,
1165 telemetry.as_ref(),
1166 block_hash_to_finalize,
1167 block_number_to_finalize,
1168 );
1169 }
1170 }
1171 }
1172 }
1173
1174 Ok(())
1175 })
1176}
1177
#[allow(clippy::too_many_arguments)]
/// Archives a single block: fetches it by number, verifies it extends the previously archived
/// block (refusing to switch forks beyond archiving depth), extracts object mappings when
/// enabled, feeds the encoded block into the archiver, persists any produced segment headers,
/// and emits the corresponding notifications.
///
/// Returns the (hash, number) of the block just archived, which becomes the new best
/// archived block.
async fn archive_block<Block, Backend, Client, AS, SO>(
    archiver: &mut Archiver,
    segment_headers_store: SegmentHeadersStore<AS>,
    client: &Client,
    sync_oracle: &SubspaceSyncOracle<SO>,
    object_mapping_notification_sender: SubspaceNotificationSender<ObjectMappingNotification>,
    archived_segment_notification_sender: SubspaceNotificationSender<ArchivedSegmentNotification>,
    best_archived_block_hash: Block::Hash,
    block_number_to_archive: NumberFor<Block>,
    create_object_mappings: CreateObjectMappings,
) -> sp_blockchain::Result<(Block::Hash, NumberFor<Block>)>
where
    Block: BlockT,
    Backend: BackendT<Block>,
    Client: ProvideRuntimeApi<Block>
        + BlockBackend<Block>
        + HeaderBackend<Block>
        + LockImportRun<Block, Backend>
        + Finalizer<Block, Backend>
        + AuxStore
        + Send
        + Sync
        + 'static,
    Client::Api: SubspaceApi<Block, PublicKey> + ObjectsApi<Block>,
    AS: AuxStore + Send + Sync + 'static,
    SO: SyncOracle + Send + Sync + 'static,
{
    let block = client
        .block(
            client
                .block_hash(block_number_to_archive)?
                .expect("Older block by number must always exist"),
        )?
        .expect("Older block by number must always exist");

    let parent_block_hash = *block.block.header().parent_hash();
    let block_hash_to_archive = block.block.hash();

    debug!(
        "Archiving block {:?} ({})",
        block_number_to_archive, block_hash_to_archive
    );

    // Archived history is linear: the block to archive must be a child of the last archived
    // block, otherwise a fork switch happened beyond archiving depth.
    if parent_block_hash != best_archived_block_hash {
        let error = format!(
            "Attempt to switch to a different fork beyond archiving depth, \
            can't do it: parent block hash {parent_block_hash}, best archived block hash {best_archived_block_hash}"
        );
        return Err(sp_blockchain::Error::Consensus(sp_consensus::Error::Other(
            error.into(),
        )));
    }

    let create_mappings = create_object_mappings.is_enabled_for_block(
        block_number_to_archive.try_into().unwrap_or_else(|_| {
            unreachable!("sp_runtime::BlockNumber fits into subspace_primitives::BlockNumber; qed")
        }),
    );

    // Unlike during initialization, mapping extraction failures here are hard errors.
    let block_object_mappings = if create_mappings {
        client
            .runtime_api()
            .extract_block_object_mapping(parent_block_hash, block.block.clone())
            .map_err(|error| {
                sp_blockchain::Error::Application(
                    format!("Failed to retrieve block object mappings: {error}").into(),
                )
            })?
    } else {
        BlockObjectMapping::default()
    };

    let encoded_block = encode_block(block);
    debug!(
        "Encoded block {} has size of {}",
        block_number_to_archive,
        bytesize::to_string(encoded_block.len() as u64, true),
    );

    // Incremental archiving state is only committed when not in major sync.
    let block_outcome = archiver.add_block(
        encoded_block,
        block_object_mappings,
        !sync_oracle.is_major_syncing(),
    );
    send_object_mapping_notification(
        &object_mapping_notification_sender,
        block_outcome.object_mapping,
        block_number_to_archive,
    );
    for archived_segment in block_outcome.archived_segments {
        let segment_header = archived_segment.segment_header;

        // Persist first, then notify (and wait for acknowledgement or timeout).
        segment_headers_store.add_segment_headers(slice::from_ref(&segment_header))?;

        send_archived_segment_notification(&archived_segment_notification_sender, archived_segment)
            .await;
    }

    Ok((block_hash_to_archive, block_number_to_archive))
}
1280
1281fn send_object_mapping_notification<BlockNum>(
1282 object_mapping_notification_sender: &SubspaceNotificationSender<ObjectMappingNotification>,
1283 object_mapping: Vec<GlobalObject>,
1284 block_number: BlockNum,
1285) where
1286 BlockNum: BlockNumberT,
1287{
1288 if object_mapping.is_empty() {
1289 return;
1290 }
1291
1292 let block_number = TryInto::<BlockNumber>::try_into(block_number).unwrap_or_else(|_| {
1293 unreachable!("sp_runtime::BlockNumber fits into subspace_primitives::BlockNumber; qed");
1294 });
1295
1296 let object_mapping_notification = ObjectMappingNotification {
1297 object_mapping,
1298 block_number,
1299 };
1300
1301 object_mapping_notification_sender.notify(move || object_mapping_notification);
1302}
1303
/// Sends an [`ArchivedSegmentNotification`] for a newly archived segment, then waits for
/// subscribers to acknowledge it via the bundled sender, giving up after
/// [`ACKNOWLEDGEMENT_TIMEOUT`] with a warning.
async fn send_archived_segment_notification(
    archived_segment_notification_sender: &SubspaceNotificationSender<ArchivedSegmentNotification>,
    archived_segment: NewArchivedSegment,
) {
    let segment_index = archived_segment.segment_header.segment_index();
    // Acknowledgement channel; the sender travels inside the notification.
    let (acknowledgement_sender, mut acknowledgement_receiver) =
        tracing_unbounded::<()>("subspace_acknowledgement", 1000);
    let archived_segment = Arc::new(archived_segment);
    let archived_segment_notification = ArchivedSegmentNotification {
        archived_segment: Arc::clone(&archived_segment),
        acknowledgement_sender,
    };

    archived_segment_notification_sender.notify(move || archived_segment_notification);

    // Drains acknowledgements until every subscriber has dropped its sender clone (the
    // stream then ends because the channel has no senders left).
    let wait_fut = async {
        while acknowledgement_receiver.next().await.is_some() {
            debug!(
                "Archived segment notification acknowledged: {}",
                segment_index
            );
        }
    };

    // Don't stall archiving forever on unresponsive subscribers.
    if tokio::time::timeout(ACKNOWLEDGEMENT_TIMEOUT, wait_fut)
        .await
        .is_err()
    {
        warn!(
            "Archived segment notification was not acknowledged and reached timeout, continue \
            regardless"
        );
    }
}