1#[cfg(test)]
34mod tests;
35
36use crate::slot_worker::SubspaceSyncOracle;
37use crate::{SubspaceLink, SubspaceNotificationSender};
38use futures::StreamExt;
39use parity_scale_codec::{Decode, Encode};
40use parking_lot::RwLock;
41use rand::prelude::*;
42use rand_chacha::ChaCha8Rng;
43use rayon::ThreadPoolBuilder;
44use rayon::prelude::*;
45use sc_client_api::{
46 AuxStore, Backend as BackendT, BlockBackend, BlockImportOperation, BlockchainEvents, Finalizer,
47 LockImportRun,
48};
49use sc_telemetry::{CONSENSUS_INFO, TelemetryHandle, telemetry};
50use sc_utils::mpsc::{TracingUnboundedSender, tracing_unbounded};
51use sp_api::ProvideRuntimeApi;
52use sp_blockchain::{Backend as BlockchainBackend, HeaderBackend};
53use sp_consensus::SyncOracle;
54use sp_consensus_subspace::{SubspaceApi, SubspaceJustification};
55use sp_objects::ObjectsApi;
56use sp_runtime::Justifications;
57use sp_runtime::generic::SignedBlock;
58use sp_runtime::traits::{
59 Block as BlockT, BlockNumber as BlockNumberT, CheckedSub, Header, NumberFor, One, Zero,
60};
61use std::error::Error;
62use std::future::Future;
63use std::num::NonZeroU32;
64use std::slice;
65use std::sync::Arc;
66use std::sync::atomic::{AtomicU16, Ordering};
67use std::time::Duration;
68use subspace_archiving::archiver::{Archiver, NewArchivedSegment};
69use subspace_core_primitives::objects::{BlockObjectMapping, GlobalObject};
70use subspace_core_primitives::segments::{RecordedHistorySegment, SegmentHeader, SegmentIndex};
71use subspace_core_primitives::{BlockNumber, PublicKey};
72use subspace_erasure_coding::ErasureCoding;
73use subspace_kzg::Kzg;
74use tracing::{debug, info, trace, warn};
75
/// Size of the dedicated rayon thread pool used to load and process blocks in parallel during
/// archiver initialization.
const BLOCKS_TO_ARCHIVE_CONCURRENCY: usize = 6;
/// How long to wait for subscribers to acknowledge an archived segment notification before
/// continuing regardless.
const ACKNOWLEDGEMENT_TIMEOUT: Duration = Duration::from_mins(2);

/// Depth, in segments, behind the newest archived segment at which blocks are finalized once a
/// new segment is produced.
pub(crate) const FINALIZATION_DEPTH_IN_SEGMENTS: SegmentIndex = SegmentIndex::new(5);
91
/// Shared state behind [`SegmentHeadersStore`]: the backing aux store, the next aux key index to
/// persist to, and an in-memory cache of all known segment headers.
#[derive(Debug)]
struct SegmentHeadersStoreInner<AS> {
    // Persistent auxiliary storage where segment header batches are written.
    aux_store: Arc<AS>,
    // Index of the next aux storage key under which a new batch of headers will be stored.
    next_key_index: AtomicU16,
    // All known segment headers; the position in the vector is the segment index.
    cache: RwLock<Vec<SegmentHeader>>,
}
99
/// Cached persistent storage of segment headers, backed by an [`AuxStore`].
///
/// Cheap to clone: clones share the same inner state via `Arc`.
#[derive(Debug)]
pub struct SegmentHeadersStore<AS> {
    // Shared storage + cache state.
    inner: Arc<SegmentHeadersStoreInner<AS>>,
    // Archiving depth of the chain; used to compute at which block a segment header is included.
    confirmation_depth_k: BlockNumber,
}
115
116impl<AS> Clone for SegmentHeadersStore<AS> {
117 fn clone(&self) -> Self {
118 Self {
119 inner: Arc::clone(&self.inner),
120 confirmation_depth_k: self.confirmation_depth_k,
121 }
122 }
123}
124
impl<AS> SegmentHeadersStore<AS>
where
    AS: AuxStore,
{
    /// Prefix for aux storage keys under which segment header batches are persisted.
    const KEY_PREFIX: &'static [u8] = b"segment-headers";
    /// Initial capacity of the in-memory segment header cache.
    const INITIAL_CACHE_CAPACITY: usize = 1_000;

    /// Creates a new instance, loading all previously persisted segment headers from `aux_store`
    /// into the in-memory cache.
    ///
    /// Panics if persisted data fails to decode (indicates a corrupted database).
    pub fn new(
        aux_store: Arc<AS>,
        confirmation_depth_k: BlockNumber,
    ) -> sp_blockchain::Result<Self> {
        let mut cache = Vec::with_capacity(Self::INITIAL_CACHE_CAPACITY);

        debug!("Started loading segment headers into cache");
        let mut next_key_index = 0;
        // Headers are persisted as batches under sequentially numbered keys; read batches until
        // the first missing key.
        while let Some(segment_headers) =
            aux_store
                .get_aux(&Self::key(next_key_index))?
                .map(|segment_header| {
                    Vec::<SegmentHeader>::decode(&mut segment_header.as_slice())
                        .expect("Always correct segment header unless DB is corrupted; qed")
                })
        {
            cache.extend(segment_headers);
            next_key_index += 1;
        }
        debug!("Finished loading segment headers into cache");

        Ok(Self {
            inner: Arc::new(SegmentHeadersStoreInner {
                aux_store,
                next_key_index: AtomicU16::new(next_key_index),
                cache: RwLock::new(cache),
            }),
            confirmation_depth_k,
        })
    }

    /// Returns the last (highest segment index) segment header, if any are stored.
    pub fn last_segment_header(&self) -> Option<SegmentHeader> {
        self.inner.cache.read().last().cloned()
    }

    /// Returns the highest stored segment index, or `None` if no headers are stored.
    pub fn max_segment_index(&self) -> Option<SegmentIndex> {
        // Cache position corresponds to segment index, so the max index is `len - 1`.
        let segment_index = self.inner.cache.read().len().checked_sub(1)? as u64;
        Some(SegmentIndex::from(segment_index))
    }

    /// Adds segment headers (as one persisted batch).
    ///
    /// Headers at or below the current max segment index are skipped; the remaining headers must
    /// continue the existing sequence without gaps (and the very first stored header must have
    /// segment index zero), otherwise an error is returned and nothing is stored.
    pub fn add_segment_headers(
        &self,
        segment_headers: &[SegmentHeader],
    ) -> sp_blockchain::Result<()> {
        let mut maybe_last_segment_index = self.max_segment_index();
        let mut segment_headers_to_store = Vec::with_capacity(segment_headers.len());
        for segment_header in segment_headers {
            let segment_index = segment_header.segment_index();
            match maybe_last_segment_index {
                Some(last_segment_index) => {
                    if segment_index <= last_segment_index {
                        // Already stored, skip.
                        continue;
                    }

                    // New headers must extend the sequence contiguously.
                    if segment_index != last_segment_index + SegmentIndex::ONE {
                        let error = format!(
                            "Segment index {segment_index} must strictly follow {last_segment_index}, can't store segment header"
                        );
                        return Err(sp_blockchain::Error::Application(error.into()));
                    }

                    segment_headers_to_store.push(segment_header);
                    maybe_last_segment_index.replace(segment_index);
                }
                None => {
                    // Nothing stored yet: the sequence must start at segment index zero.
                    if segment_index != SegmentIndex::ZERO {
                        let error = format!(
                            "First segment header index must be zero, found index {segment_index}"
                        );
                        return Err(sp_blockchain::Error::Application(error.into()));
                    }

                    segment_headers_to_store.push(segment_header);
                    maybe_last_segment_index.replace(segment_index);
                }
            }
        }

        if segment_headers_to_store.is_empty() {
            return Ok(());
        }

        {
            // Persist the whole batch under the next sequential aux key.
            let key_index = self.inner.next_key_index.fetch_add(1, Ordering::SeqCst);
            let key = Self::key(key_index);
            let value = segment_headers_to_store.encode();
            let insert_data = vec![(key.as_slice(), value.as_slice())];

            self.inner.aux_store.insert_aux(&insert_data, &[])?;
        }
        // Only extend the cache after the batch was persisted successfully.
        self.inner.cache.write().extend(segment_headers_to_store);

        Ok(())
    }

    /// Returns the segment header for `segment_index`, if stored.
    pub fn get_segment_header(&self, segment_index: SegmentIndex) -> Option<SegmentHeader> {
        self.inner
            .cache
            .read()
            .get(u64::from(segment_index) as usize)
            .copied()
    }

    // Aux storage key for the batch with the given key index.
    fn key(key_index: u16) -> Vec<u8> {
        (Self::KEY_PREFIX, key_index.to_le_bytes()).encode()
    }

    /// Returns the segment headers that the block with `block_number` must contain (empty if
    /// none).
    ///
    /// A segment header is included `confirmation_depth_k + 1` blocks after the last block it
    /// archived; the genesis segment header is special-cased into block 1.
    pub fn segment_headers_for_block(&self, block_number: BlockNumber) -> Vec<SegmentHeader> {
        let Some(last_segment_index) = self.max_segment_index() else {
            // Store not initialized yet, nothing to include.
            return Vec::new();
        };

        // Block 1 always includes the genesis segment header.
        if block_number == 1 {
            return vec![
                self.get_segment_header(SegmentIndex::ZERO)
                    .expect("Segment headers are stored in monotonically increasing order; qed"),
            ];
        }

        if last_segment_index == SegmentIndex::ZERO {
            // Only the genesis segment exists and it was handled above.
            return Vec::new();
        }

        // Scan backwards from the newest segment looking for one whose inclusion block matches
        // `block_number`.
        let mut current_segment_index = last_segment_index;
        loop {
            let current_segment_header = self
                .get_segment_header(current_segment_index)
                .expect("Segment headers are stored in monotonically increasing order; qed");

            // Block at which this segment header is supposed to be included.
            let target_block_number =
                current_segment_header.last_archived_block().number + 1 + self.confirmation_depth_k;
            if target_block_number == block_number {
                let mut headers_for_block = vec![current_segment_header];

                // Collect older segment headers that archived the same last block (several
                // segments can end at the same block); they are included together.
                // NOTE(review): when `current_segment_index` is ZERO this subtraction would
                // underflow; reached only via the `> ONE` guard below, so it appears safe —
                // confirm SegmentIndex arithmetic semantics before refactoring.
                let last_archived_block_number =
                    current_segment_header.last_archived_block().number;
                let mut segment_index = current_segment_index - SegmentIndex::ONE;

                while let Some(segment_header) = self.get_segment_header(segment_index) {
                    if segment_header.last_archived_block().number == last_archived_block_number {
                        headers_for_block.insert(0, segment_header);
                        segment_index -= SegmentIndex::ONE;
                    } else {
                        break;
                    }
                }

                return headers_for_block;
            }

            if target_block_number > block_number {
                // Inclusion block is still ahead of the requested block, keep scanning backwards.
                if current_segment_index > SegmentIndex::ONE {
                    current_segment_index -= SegmentIndex::ONE
                } else {
                    break;
                }
            } else {
                // Inclusion block is behind the requested block: no headers for this block.
                return Vec::new();
            }
        }

        Vec::new()
    }
}
326
/// Notification sent to subscribers whenever the archiver produces a new archived segment.
#[derive(Debug, Clone)]
pub struct ArchivedSegmentNotification {
    /// The newly archived segment, shared via `Arc` to avoid copying.
    pub archived_segment: Arc<NewArchivedSegment>,
    /// Sender for acknowledging the notification; the archiver waits (with a timeout) for
    /// acknowledgements before proceeding.
    pub acknowledgement_sender: TracingUnboundedSender<()>,
}
337
/// Notification with the object mappings produced while archiving a block.
#[derive(Debug, Clone)]
pub struct ObjectMappingNotification {
    /// Object mappings extracted from the archived block.
    pub object_mapping: Vec<GlobalObject>,
    /// Number of the block the mappings belong to.
    pub block_number: BlockNumber,
 }
350
/// Configures whether (and from which block) object mappings are created during archiving.
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
pub enum CreateObjectMappings {
    /// Create object mappings starting from this block number onwards.
    Block(NonZeroU32),

    /// Create object mappings for every archived block.
    Yes,

    /// Do not create object mappings (the default).
    #[default]
    No,
}
371
372impl CreateObjectMappings {
373 fn block(&self) -> Option<BlockNumber> {
376 match self {
377 CreateObjectMappings::Block(block) => Some(block.get()),
378 CreateObjectMappings::Yes => None,
379 CreateObjectMappings::No => None,
380 }
381 }
382
383 pub fn is_enabled(&self) -> bool {
385 !matches!(self, CreateObjectMappings::No)
386 }
387
388 pub fn is_enabled_for_block(&self, block: BlockNumber) -> bool {
390 if !self.is_enabled() {
391 return false;
392 }
393
394 if let Some(target_block) = self.block() {
395 return block >= target_block;
396 }
397
398 true
400 }
401}
402
/// Finds the newest archived block at or below `best_block_to_archive` whose data is available in
/// `client`, returning its segment header, the signed block, and (when
/// `create_object_mappings` is set) its object mappings.
///
/// Returns `Ok(None)` when nothing suitable exists — e.g. no segment headers are stored, or only
/// the genesis segment exists.
fn find_last_archived_block<Block, Client, AS>(
    client: &Client,
    segment_headers_store: &SegmentHeadersStore<AS>,
    best_block_to_archive: NumberFor<Block>,
    create_object_mappings: bool,
) -> sp_blockchain::Result<Option<(SegmentHeader, SignedBlock<Block>, BlockObjectMapping)>>
where
    Block: BlockT,
    Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + HeaderBackend<Block>,
    Client::Api: SubspaceApi<Block, PublicKey> + ObjectsApi<Block>,
    AS: AuxStore,
{
    let Some(max_segment_index) = segment_headers_store.max_segment_index() else {
        return Ok(None);
    };

    if max_segment_index == SegmentIndex::ZERO {
        // Only the genesis segment exists: nothing usable to resume from.
        return Ok(None);
    }

    // Scan segments from newest to oldest for one whose last archived block is both at or below
    // the archiving target and actually present in the client's database.
    for segment_header in (SegmentIndex::ZERO..=max_segment_index)
        .rev()
        .filter_map(|segment_index| segment_headers_store.get_segment_header(segment_index))
    {
        let last_archived_block_number = segment_header.last_archived_block().number;

        if NumberFor::<Block>::from(last_archived_block_number) > best_block_to_archive {
            // This segment's last archived block is above the target; try an older segment.
            continue;
        }
        let Some(last_archived_block_hash) = client.hash(last_archived_block_number.into())? else {
            // Block number not present in the client's database.
            continue;
        };

        let Some(last_archived_block) = client.block(last_archived_block_hash)? else {
            // Block body not present in the client's database.
            continue;
        };

        // NOTE(review): runtime-api failures fall back to empty mappings here
        // (`unwrap_or_default`) — presumably deliberate best-effort behavior; confirm before
        // tightening.
        let block_object_mappings = if create_object_mappings {
            client
                .runtime_api()
                .extract_block_object_mapping(
                    *last_archived_block.block.header().parent_hash(),
                    last_archived_block.block.clone(),
                )
                .unwrap_or_default()
        } else {
            BlockObjectMapping::default()
        };

        return Ok(Some((
            segment_header,
            last_archived_block,
            block_object_mappings,
        )));
    }

    Ok(None)
}
469
/// Re-creates and returns the archived segment for the genesis block.
///
/// Returns `Ok(None)` when the genesis block body is not available in `client`.
pub fn recreate_genesis_segment<Block, Client>(
    client: &Client,
    kzg: Kzg,
    erasure_coding: ErasureCoding,
) -> Result<Option<NewArchivedSegment>, Box<dyn Error>>
where
    Block: BlockT,
    Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + HeaderBackend<Block>,
    Client::Api: ObjectsApi<Block>,
{
    let genesis_hash = client.info().genesis_hash;
    let Some(signed_block) = client.block(genesis_hash)? else {
        return Ok(None);
    };

    // `encode_block` pads the genesis block deterministically to a full recorded history segment.
    let encoded_block = encode_block(signed_block);

    // Archive the padded genesis block with no object mappings; a fresh archiver over a
    // full-segment-sized block yields exactly one segment.
    let block_outcome = Archiver::new(kzg, erasure_coding).add_block(
        encoded_block,
        BlockObjectMapping::default(),
        false,
    );
    let new_archived_segment = block_outcome
        .archived_segments
        .into_iter()
        .next()
        .expect("Genesis block always results in exactly one archived segment; qed");

    Ok(Some(new_archived_segment))
}
502
/// Result of archiver initialization: the archiver instance plus the hash and number of the best
/// block that has already been archived.
struct InitializedArchiver<Block>
where
    Block: BlockT,
{
    // The initialized archiver instance.
    archiver: Archiver,
    // Hash and number of the best archived block.
    best_archived_block: (Block::Hash, NumberFor<Block>),
}
510
/// Encodes a signed block for archiving purposes.
///
/// The genesis block is padded with deterministic pseudorandom bytes up to
/// [`RecordedHistorySegment::SIZE`] (ChaCha8 seeded from its 32-byte state root, so every node
/// produces identical bytes). For all other blocks, justifications are filtered so that only
/// [`SubspaceJustification`]s that must be archived are retained.
pub fn encode_block<Block>(mut signed_block: SignedBlock<Block>) -> Vec<u8>
where
    Block: BlockT,
{
    if signed_block.block.header().number().is_zero() {
        let mut encoded_block = signed_block.encode();

        let encoded_block_length = encoded_block.len();

        // Pad the genesis block to a full recorded history segment with deterministic bytes,
        // seeded from the genesis state root so the padding is identical on all nodes.
        encoded_block.resize(RecordedHistorySegment::SIZE, 0);
        let mut rng = ChaCha8Rng::from_seed(
            signed_block
                .block
                .header()
                .state_root()
                .as_ref()
                .try_into()
                .expect("State root in Subspace must be 32 bytes, panic otherwise; qed"),
        );
        rng.fill(&mut encoded_block[encoded_block_length..]);

        encoded_block
    } else {
        // Strip all justifications except Subspace justifications that must be archived.
        if let Some(justifications) = signed_block.justifications.take() {
            let mut filtered_justifications = justifications.into_iter().filter(|justification| {
                // Keep only decodable Subspace justifications that must be archived.
                let Some(subspace_justification) =
                    SubspaceJustification::try_from_justification(justification)
                        .and_then(|subspace_justification| subspace_justification.ok())
                else {
                    return false;
                };

                subspace_justification.must_be_archived()
            });

            // Rebuild the justifications from the survivors; if none survive, the block is left
            // without justifications (they were `take`n above).
            if let Some(first_justification) = filtered_justifications.next() {
                let mut justifications = Justifications::from(first_justification);

                for justification in filtered_justifications {
                    justifications.append(justification);
                }

                signed_block.justifications = Some(justifications);
            }
        }

        signed_block.encode()
    }
}
572
573pub fn decode_block<Block>(
575 mut encoded_block: &[u8],
576) -> Result<SignedBlock<Block>, parity_scale_codec::Error>
577where
578 Block: BlockT,
579{
580 SignedBlock::<Block>::decode(&mut encoded_block)
581}
582
/// Builds an [`InitializedArchiver`], either resuming from the last archived block found in both
/// the segment headers store and the client, or starting from genesis.
///
/// Any already-imported blocks past archiving depth that have not been archived yet are archived
/// here as well: produced segment headers are persisted and object mapping notifications are
/// sent.
fn initialize_archiver<Block, Client, AS>(
    segment_headers_store: &SegmentHeadersStore<AS>,
    subspace_link: &SubspaceLink<Block>,
    client: &Client,
    create_object_mappings: CreateObjectMappings,
) -> sp_blockchain::Result<InitializedArchiver<Block>>
where
    Block: BlockT,
    Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + HeaderBackend<Block> + AuxStore,
    Client::Api: SubspaceApi<Block, PublicKey> + ObjectsApi<Block>,
    AS: AuxStore,
{
    let client_info = client.info();
    let best_block_number = TryInto::<BlockNumber>::try_into(client_info.best_number)
        .unwrap_or_else(|_| {
            unreachable!("sp_runtime::BlockNumber fits into subspace_primitives::BlockNumber; qed");
        });

    let confirmation_depth_k = subspace_link.chain_constants.confirmation_depth_k();

    // Blocks this deep below the best block are eligible for archiving.
    let mut best_block_to_archive = best_block_number.saturating_sub(confirmation_depth_k);
    // If mappings were requested from a specific block, start no later than that block so its
    // mappings get created.
    if let Some(block_number) = create_object_mappings.block() {
        best_block_to_archive = best_block_to_archive.min(block_number);
    }

    // If any block in `best_block_to_archive..best_block_number` is missing from the database
    // (history gap), fall back to starting from the best block itself.
    if (best_block_to_archive..best_block_number)
        .any(|block_number| client.hash(block_number.into()).ok().flatten().is_none())
    {
        best_block_to_archive = best_block_number;
    }

    // When mappings are enabled, eagerly verify that the block (and its runtime state) needed for
    // mapping extraction is available, so the user gets an actionable error up front.
    if create_object_mappings.is_enabled() && best_block_to_archive >= 1 {
        let Some(best_block_to_archive_hash) = client.hash(best_block_to_archive.into())? else {
            let error = format!(
                "Missing hash for mapping block {best_block_to_archive}, \
                try a higher block number, or wipe your node and restart with `--sync full`"
            );
            return Err(sp_blockchain::Error::Application(error.into()));
        };

        let Some(best_block_data) = client.block(best_block_to_archive_hash)? else {
            let error = format!(
                "Missing data for mapping block {best_block_to_archive} \
                hash {best_block_to_archive_hash}, \
                try a higher block number, or wipe your node and restart with `--sync full`"
            );
            return Err(sp_blockchain::Error::Application(error.into()));
        };

        // Probe the runtime call; the result is discarded — only availability matters here.
        client
            .runtime_api()
            .extract_block_object_mapping(
                *best_block_data.block.header().parent_hash(),
                best_block_data.block.clone(),
            )
            .map_err(|error| {
                sp_blockchain::Error::Application(
                    format!(
                        "Missing state for mapping block {best_block_to_archive} \
                        hash {best_block_to_archive_hash}: {error}, \
                        try a higher block number, or wipe your node and restart with `--sync full`"
                    )
                    .into(),
                )
            })?;
    }

    let maybe_last_archived_block = find_last_archived_block(
        client,
        segment_headers_store,
        best_block_to_archive.into(),
        create_object_mappings.is_enabled(),
    )?;

    let have_last_segment_header = maybe_last_archived_block.is_some();
    let mut best_archived_block = None;

    // Resume from the last archived block if one was found, otherwise start from genesis.
    let mut archiver =
        if let Some((last_segment_header, last_archived_block, block_object_mappings)) =
            maybe_last_archived_block
        {
            let last_archived_block_number = last_segment_header.last_archived_block().number;
            info!(
                %last_archived_block_number,
                "Resuming archiver from last archived block",
            );

            // Record the resume point up front, in case no further blocks get archived below.
            best_archived_block.replace((
                last_archived_block.block.hash(),
                *last_archived_block.block.header().number(),
            ));

            let last_archived_block_encoded = encode_block(last_archived_block);

            Archiver::with_initial_state(
                subspace_link.kzg().clone(),
                subspace_link.erasure_coding().clone(),
                last_segment_header,
                &last_archived_block_encoded,
                block_object_mappings,
            )
            .map_err(|error| {
                sp_blockchain::Error::Application(
                    format!("Incorrect parameters for archiver: {error:?} {last_segment_header:?}")
                        .into(),
                )
            })?
        } else {
            info!("Starting archiving from genesis");

            Archiver::new(
                subspace_link.kzg().clone(),
                subspace_link.erasure_coding().clone(),
            )
        };

    // Archive all already-produced blocks between the archiver's last archived block and
    // archiving depth.
    {
        let blocks_to_archive_from = archiver
            .last_archived_block_number()
            .map(|n| n + 1)
            .unwrap_or_default();
        let blocks_to_archive_to = best_block_number
            .checked_sub(confirmation_depth_k)
            .filter(|&blocks_to_archive_to| blocks_to_archive_to >= blocks_to_archive_from)
            .or({
                if have_last_segment_header {
                    None
                } else {
                    // Not a continuation: archive at least the genesis block.
                    Some(0)
                }
            });

        if let Some(blocks_to_archive_to) = blocks_to_archive_to {
            info!(
                "Archiving already produced blocks {}..={}",
                blocks_to_archive_from, blocks_to_archive_to,
            );

            let thread_pool = ThreadPoolBuilder::new()
                .num_threads(BLOCKS_TO_ARCHIVE_CONCURRENCY)
                .build()
                .map_err(|error| {
                    sp_blockchain::Error::Backend(format!(
                        "Failed to create thread pool for archiver initialization: {error}"
                    ))
                })?;
            // Load blocks (and their object mappings, when enabled) in parallel.
            let blocks_to_archive = thread_pool.install(|| {
                (blocks_to_archive_from..=blocks_to_archive_to)
                    .into_par_iter()
                    .map_init(
                        || client.runtime_api(),
                        |runtime_api, block_number| {
                            let block_hash = client
                                .hash(block_number.into())?
                                .expect("All blocks since last archived must be present; qed");

                            let block = client
                                .block(block_hash)?
                                .expect("All blocks since last archived must be present; qed");

                            // NOTE(review): runtime-api errors fall back to empty mappings here,
                            // matching `find_last_archived_block` — confirm this best-effort
                            // behavior is intended.
                            let block_object_mappings =
                                if create_object_mappings.is_enabled_for_block(block_number) {
                                    runtime_api
                                        .extract_block_object_mapping(
                                            *block.block.header().parent_hash(),
                                            block.block.clone(),
                                        )
                                        .unwrap_or_default()
                                } else {
                                    BlockObjectMapping::default()
                                };

                            Ok((block, block_object_mappings))
                        },
                    )
                    .collect::<sp_blockchain::Result<Vec<(SignedBlock<_>, _)>>>()
            })?;

            best_archived_block =
                blocks_to_archive
                    .last()
                    .map(|(block, _block_object_mappings)| {
                        (block.block.hash(), *block.block.header().number())
                    });

            // Feed the blocks to the archiver in order, persisting produced segment headers and
            // notifying object mapping subscribers.
            for (signed_block, block_object_mappings) in blocks_to_archive {
                let block_number_to_archive = *signed_block.block.header().number();
                let encoded_block = encode_block(signed_block);

                debug!(
                    "Encoded block {} has size of {}",
                    block_number_to_archive,
                    bytesize::ByteSize::b(encoded_block.len() as u64)
                        .display()
                        .iec(),
                );

                let block_outcome = archiver.add_block(encoded_block, block_object_mappings, false);
                send_object_mapping_notification(
                    &subspace_link.object_mapping_notification_sender,
                    block_outcome.object_mapping,
                    block_number_to_archive,
                );
                let new_segment_headers: Vec<SegmentHeader> = block_outcome
                    .archived_segments
                    .iter()
                    .map(|archived_segment| archived_segment.segment_header)
                    .collect();

                if !new_segment_headers.is_empty() {
                    segment_headers_store.add_segment_headers(&new_segment_headers)?;
                }
            }
        }
    }

    Ok(InitializedArchiver {
        archiver,
        best_archived_block: best_archived_block
            .expect("Must always set if there is no logical error; qed"),
    })
}
824
/// Finalizes the block with `hash`/`number`, queueing removal of `block_weight` aux entries for
/// forks displaced by this finalization within the same import operation.
///
/// Failures are logged rather than propagated — finalization here is best-effort and must not
/// stop archiving.
fn finalize_block<Block, Backend, Client>(
    client: &Client,
    backend: &Backend,
    telemetry: Option<&TelemetryHandle>,
    hash: Block::Hash,
    number: NumberFor<Block>,
) where
    Block: BlockT,
    Backend: BackendT<Block>,
    Client: LockImportRun<Block, Backend> + Finalizer<Block, Backend> + HeaderBackend<Block>,
{
    if number.is_zero() {
        // Genesis needs no finalization.
        return;
    }

    // Compute the fork leaves that this finalization will displace, so their block_weight aux
    // entries can be cleaned up alongside it. Failures degrade to skipping the cleanup.
    let displaced = match client.header(hash) {
        Ok(Some(header)) => {
            let parent_hash = *header.parent_hash();
            match backend
                .blockchain()
                .displaced_leaves_after_finalizing(hash, number, parent_hash)
            {
                Ok(d) => Some(d),
                Err(error) => {
                    warn!(
                        ?error,
                        ?hash,
                        ?parent_hash,
                        ?number,
                        "Failed to compute displaced leaves; their block_weight entries will leak"
                    );
                    None
                }
            }
        }
        Ok(None) => {
            warn!(
                ?hash,
                ?number,
                "Header missing for finalized hash; skipping displaced-fork cleanup"
            );
            None
        }
        Err(error) => {
            warn!(
                ?error,
                ?hash,
                ?number,
                "Failed to load header for finalized hash; skipping displaced-fork cleanup"
            );
            None
        }
    };

    // Apply finality and the aux cleanup atomically within one locked import operation; the
    // overall result is deliberately ignored (errors were already logged in the closure).
    let _result: sp_blockchain::Result<_> = client.lock_import_and_run(|import_op| {
        client
            .apply_finality(import_op, hash, None, true)
            .map_err(|error| {
                warn!(
                    "Error applying finality to block {:?}: {}",
                    (hash, number),
                    error
                );
                error
            })?;

        // Queue tombstones (None values) for the block_weight entries of displaced fork blocks.
        if let Some(d) = &displaced
            && !d.displaced_blocks.is_empty()
        {
            let count = d.displaced_blocks.len();
            let tombstones = d
                .displaced_blocks
                .iter()
                .map(|h| (crate::aux_schema::block_weight_key(h), None));
            import_op.op.insert_aux(tombstones).map_err(|error| {
                warn!(
                    ?error,
                    count, "Failed to queue displaced fork tombstones; rolling back finalization",
                );
                error
            })?;
        }

        debug!("Finalizing blocks up to ({:?}, {})", number, hash);

        telemetry!(
            telemetry;
            CONSENSUS_INFO;
            "subspace.finalized_blocks_up_to";
            "number" => ?number, "hash" => ?hash,
        );

        Ok(())
    });
}
925
/// Creates the Subspace archiver task.
///
/// Returns a future that, for every block-importing notification, archives the block that is
/// `confirmation_depth_k` behind it, persists any newly produced segment headers, sends object
/// mapping and archived segment notifications, and — when a new segment was produced — finalizes
/// the chain [`FINALIZATION_DEPTH_IN_SEGMENTS`] segments deep.
///
/// NOTE(review): when no segment headers are stored yet, the archiver is initialized eagerly in
/// this function (before the future is polled); otherwise initialization is deferred into the
/// future — presumably so a fresh node has segment headers available immediately; confirm with
/// callers.
pub fn create_subspace_archiver<Block, Backend, Client, AS, SO>(
    segment_headers_store: SegmentHeadersStore<AS>,
    subspace_link: SubspaceLink<Block>,
    client: Arc<Client>,
    backend: Arc<Backend>,
    sync_oracle: SubspaceSyncOracle<SO>,
    telemetry: Option<TelemetryHandle>,
    create_object_mappings: CreateObjectMappings,
) -> sp_blockchain::Result<impl Future<Output = sp_blockchain::Result<()>> + Send + 'static>
where
    Block: BlockT,
    Backend: BackendT<Block> + 'static,
    Client: ProvideRuntimeApi<Block>
        + BlockBackend<Block>
        + HeaderBackend<Block>
        + LockImportRun<Block, Backend>
        + Finalizer<Block, Backend>
        + BlockchainEvents<Block>
        + AuxStore
        + Send
        + Sync
        + 'static,
    Client::Api: SubspaceApi<Block, PublicKey> + ObjectsApi<Block>,
    AS: AuxStore + Send + Sync + 'static,
    SO: SyncOracle + Send + Sync + 'static,
{
    if create_object_mappings.is_enabled() {
        info!(
            ?create_object_mappings,
            "Creating object mappings from the configured block onwards"
        );
    } else {
        info!("Not creating object mappings");
    }

    // Eager initialization when nothing is archived yet; otherwise defer to the task below.
    let maybe_archiver = if segment_headers_store.max_segment_index().is_none() {
        Some(initialize_archiver(
            &segment_headers_store,
            &subspace_link,
            client.as_ref(),
            create_object_mappings,
        )?)
    } else {
        None
    };

    // Subscribe synchronously, before returning the future, so no importing notifications
    // emitted in the meantime are missed.
    let mut block_importing_notification_stream = subspace_link
        .block_importing_notification_stream
        .subscribe();

    Ok(async move {
        let archiver = match maybe_archiver {
            Some(archiver) => archiver,
            None => initialize_archiver(
                &segment_headers_store,
                &subspace_link,
                client.as_ref(),
                create_object_mappings,
            )?,
        };
        let confirmation_depth_k = subspace_link.chain_constants.confirmation_depth_k().into();

        let InitializedArchiver {
            mut archiver,
            best_archived_block,
        } = archiver;
        let (mut best_archived_block_hash, mut best_archived_block_number) = best_archived_block;

        while let Some(block_importing_notification) =
            block_importing_notification_stream.next().await
        {
            let importing_block_number = block_importing_notification.block_number;
            // The block eligible for archiving is `confirmation_depth_k` behind the one being
            // imported; nothing to do while the chain is shorter than that.
            let block_number_to_archive =
                match importing_block_number.checked_sub(&confirmation_depth_k) {
                    Some(block_number_to_archive) => block_number_to_archive,
                    None => {
                        continue;
                    }
                };

            let last_archived_block_number = segment_headers_store
                .last_segment_header()
                .expect("Exists after archiver initialization; qed")
                .last_archived_block()
                .number;
            let create_mappings =
                create_object_mappings.is_enabled_for_block(last_archived_block_number);
            let last_archived_block_number = NumberFor::<Block>::from(last_archived_block_number);
            trace!(
                %importing_block_number,
                %block_number_to_archive,
                %best_archived_block_number,
                %last_archived_block_number,
                "Checking if block needs to be skipped"
            );

            // Skip blocks that were already archived — unless mappings are being created for
            // them, in which case they are processed again.
            let skip_last_archived_blocks =
                last_archived_block_number > block_number_to_archive && !create_mappings;
            if best_archived_block_number >= block_number_to_archive || skip_last_archived_blocks {
                debug!(
                    %importing_block_number,
                    %block_number_to_archive,
                    %best_archived_block_number,
                    %last_archived_block_number,
                    "Skipping already archived block",
                );
                continue;
            }

            // Gap between the best archived block and the block to archive: re-initialize the
            // archiver to resynchronize state.
            if best_archived_block_number + One::one() != block_number_to_archive {
                InitializedArchiver {
                    archiver,
                    best_archived_block: (best_archived_block_hash, best_archived_block_number),
                } = initialize_archiver(
                    &segment_headers_store,
                    &subspace_link,
                    client.as_ref(),
                    create_object_mappings,
                )?;

                if best_archived_block_number + One::one() == block_number_to_archive {
                    // Back in sync: proceed to archive this block below.
                } else if best_archived_block_number >= block_number_to_archive {
                    // Archiving already caught up past this block during re-initialization; it
                    // will be handled naturally by later notifications.
                    continue;
                } else if client
                    .block_hash(importing_block_number - One::one())?
                    .is_none()
                {
                    // Parent of the importing block is absent — presumably a sync mode that
                    // inserted blocks out of band; wait for more blocks before archiving.
                    continue;
                } else {
                    let error = format!(
                        "There was a gap in blockchain history and the last contiguous series of \
                        blocks starting with doesn't start with archived segment (best archived \
                        block number {best_archived_block_number}, block number to archive \
                        {block_number_to_archive}), block about to be imported \
                        {importing_block_number}), archiver can't continue",
                    );
                    return Err(sp_blockchain::Error::Consensus(sp_consensus::Error::Other(
                        error.into(),
                    )));
                }
            }

            let max_segment_index_before = segment_headers_store.max_segment_index();
            (best_archived_block_hash, best_archived_block_number) = archive_block(
                &mut archiver,
                segment_headers_store.clone(),
                &*client,
                &sync_oracle,
                subspace_link.object_mapping_notification_sender.clone(),
                subspace_link.archived_segment_notification_sender.clone(),
                best_archived_block_hash,
                block_number_to_archive,
                create_object_mappings,
            )
            .await?;

            let max_segment_index = segment_headers_store.max_segment_index();
            // A new segment was produced: finalize a block a few segments behind it.
            if max_segment_index_before != max_segment_index {
                let maybe_block_number_to_finalize = max_segment_index
                    // Skip the most recent FINALIZATION_DEPTH_IN_SEGMENTS segments.
                    .and_then(|max_segment_index| {
                        max_segment_index.checked_sub(FINALIZATION_DEPTH_IN_SEGMENTS)
                    })
                    .and_then(|segment_index| {
                        segment_headers_store.get_segment_header(segment_index)
                    })
                    .map(|segment_header| segment_header.last_archived_block().number)
                    // Never finalize beyond the block just archived.
                    .map(|block_number| block_number_to_archive.min(block_number.into()))
                    // Skip blocks that are already finalized.
                    .filter(|block_number| *block_number > client.info().finalized_number);

                if let Some(block_number_to_finalize) = maybe_block_number_to_finalize {
                    {
                        let mut import_notification = client.every_import_notification_stream();

                        // Drop the importing notification before waiting — NOTE(review):
                        // presumably this releases the import pipeline so the block can finish
                        // importing; confirm against the notification type's semantics.
                        drop(block_importing_notification);

                        // Wait until the block currently being imported has actually been
                        // imported.
                        while let Some(notification) = import_notification.next().await {
                            if notification.header.number() == &importing_block_number {
                                break;
                            }
                        }
                    }

                    // The block may legitimately be absent at this depth (e.g. only recent
                    // blocks were synced); in that case skip finalization.
                    if let Some(block_hash_to_finalize) =
                        client.block_hash(block_number_to_finalize)?
                    {
                        finalize_block(
                            &*client,
                            &*backend,
                            telemetry.as_ref(),
                            block_hash_to_finalize,
                            block_number_to_finalize,
                        );
                    }
                }
            }
        }

        Ok(())
    })
}
1179
/// Archives the block with number `block_number_to_archive`.
///
/// The block is fed to `archiver`; any produced segment headers are persisted to
/// `segment_headers_store` and announced via `archived_segment_notification_sender` (waiting for
/// acknowledgements), and object mappings are announced via
/// `object_mapping_notification_sender`.
///
/// Returns the hash and number of the archived block. Fails if the block's parent is not the
/// previously archived block, i.e. on an attempted fork switch beyond archiving depth.
#[allow(clippy::too_many_arguments)]
async fn archive_block<Block, Backend, Client, AS, SO>(
    archiver: &mut Archiver,
    segment_headers_store: SegmentHeadersStore<AS>,
    client: &Client,
    sync_oracle: &SubspaceSyncOracle<SO>,
    object_mapping_notification_sender: SubspaceNotificationSender<ObjectMappingNotification>,
    archived_segment_notification_sender: SubspaceNotificationSender<ArchivedSegmentNotification>,
    best_archived_block_hash: Block::Hash,
    block_number_to_archive: NumberFor<Block>,
    create_object_mappings: CreateObjectMappings,
) -> sp_blockchain::Result<(Block::Hash, NumberFor<Block>)>
where
    Block: BlockT,
    Backend: BackendT<Block>,
    Client: ProvideRuntimeApi<Block>
        + BlockBackend<Block>
        + HeaderBackend<Block>
        + LockImportRun<Block, Backend>
        + Finalizer<Block, Backend>
        + AuxStore
        + Send
        + Sync
        + 'static,
    Client::Api: SubspaceApi<Block, PublicKey> + ObjectsApi<Block>,
    AS: AuxStore + Send + Sync + 'static,
    SO: SyncOracle + Send + Sync + 'static,
{
    let block = client
        .block(
            client
                .block_hash(block_number_to_archive)?
                .expect("Older block by number must always exist"),
        )?
        .expect("Older block by number must always exist");

    let parent_block_hash = *block.block.header().parent_hash();
    let block_hash_to_archive = block.block.hash();

    debug!(
        "Archiving block {:?} ({})",
        block_number_to_archive, block_hash_to_archive
    );

    // Archiving must proceed along a single chain: the block being archived has to extend the
    // previously archived block.
    if parent_block_hash != best_archived_block_hash {
        let error = format!(
            "Attempt to switch to a different fork beyond archiving depth, \
            can't do it: parent block hash {parent_block_hash}, best archived block hash {best_archived_block_hash}"
        );
        return Err(sp_blockchain::Error::Consensus(sp_consensus::Error::Other(
            error.into(),
        )));
    }

    let create_mappings = create_object_mappings.is_enabled_for_block(
        block_number_to_archive.try_into().unwrap_or_else(|_| {
            unreachable!("sp_runtime::BlockNumber fits into subspace_primitives::BlockNumber; qed")
        }),
    );

    // Unlike during initialization, a mapping-extraction failure here is a hard error.
    let block_object_mappings = if create_mappings {
        client
            .runtime_api()
            .extract_block_object_mapping(parent_block_hash, block.block.clone())
            .map_err(|error| {
                sp_blockchain::Error::Application(
                    format!("Failed to retrieve block object mappings: {error}").into(),
                )
            })?
    } else {
        BlockObjectMapping::default()
    };

    let encoded_block = encode_block(block);
    debug!(
        "Encoded block {} has size of {}",
        block_number_to_archive,
        bytesize::ByteSize::b(encoded_block.len() as u64)
            .display()
            .iec(),
    );

    // NOTE(review): the third argument is tied to major-sync state — presumably it toggles some
    // per-block work in the archiver; confirm against `Archiver::add_block` documentation.
    let block_outcome = archiver.add_block(
        encoded_block,
        block_object_mappings,
        !sync_oracle.is_major_syncing(),
    );
    send_object_mapping_notification(
        &object_mapping_notification_sender,
        block_outcome.object_mapping,
        block_number_to_archive,
    );
    for archived_segment in block_outcome.archived_segments {
        let segment_header = archived_segment.segment_header;

        // Persist the segment header before announcing the segment to subscribers.
        segment_headers_store.add_segment_headers(slice::from_ref(&segment_header))?;

        send_archived_segment_notification(&archived_segment_notification_sender, archived_segment)
            .await;
    }

    Ok((block_hash_to_archive, block_number_to_archive))
}
1284
1285fn send_object_mapping_notification<BlockNum>(
1286 object_mapping_notification_sender: &SubspaceNotificationSender<ObjectMappingNotification>,
1287 object_mapping: Vec<GlobalObject>,
1288 block_number: BlockNum,
1289) where
1290 BlockNum: BlockNumberT,
1291{
1292 if object_mapping.is_empty() {
1293 return;
1294 }
1295
1296 let block_number = TryInto::<BlockNumber>::try_into(block_number).unwrap_or_else(|_| {
1297 unreachable!("sp_runtime::BlockNumber fits into subspace_primitives::BlockNumber; qed");
1298 });
1299
1300 let object_mapping_notification = ObjectMappingNotification {
1301 object_mapping,
1302 block_number,
1303 };
1304
1305 object_mapping_notification_sender.notify(move || object_mapping_notification);
1306}
1307
1308async fn send_archived_segment_notification(
1309 archived_segment_notification_sender: &SubspaceNotificationSender<ArchivedSegmentNotification>,
1310 archived_segment: NewArchivedSegment,
1311) {
1312 let segment_index = archived_segment.segment_header.segment_index();
1313 let (acknowledgement_sender, mut acknowledgement_receiver) =
1314 tracing_unbounded::<()>("subspace_acknowledgement", 1000);
1315 let archived_segment = Arc::new(archived_segment);
1318 let archived_segment_notification = ArchivedSegmentNotification {
1319 archived_segment: Arc::clone(&archived_segment),
1320 acknowledgement_sender,
1321 };
1322
1323 archived_segment_notification_sender.notify(move || archived_segment_notification);
1324
1325 let wait_fut = async {
1326 while acknowledgement_receiver.next().await.is_some() {
1327 debug!(
1328 "Archived segment notification acknowledged: {}",
1329 segment_index
1330 );
1331 }
1332 };
1333
1334 if tokio::time::timeout(ACKNOWLEDGEMENT_TIMEOUT, wait_fut)
1335 .await
1336 .is_err()
1337 {
1338 warn!(
1339 "Archived segment notification was not acknowledged and reached timeout, continue \
1340 regardless"
1341 );
1342 }
1343}