1mod incremental_record_commitments;
2
3extern crate alloc;
4
5use crate::archiver::incremental_record_commitments::{
6 IncrementalRecordCommitmentsState, update_record_commitments,
7};
8use alloc::collections::VecDeque;
9#[cfg(not(feature = "std"))]
10use alloc::vec;
11#[cfg(not(feature = "std"))]
12use alloc::vec::Vec;
13use core::cmp::Ordering;
14use parity_scale_codec::{Compact, CompactLen, Decode, Encode, Input, Output};
15#[cfg(feature = "parallel")]
16use rayon::prelude::*;
17use subspace_core_primitives::hashes::{Blake3Hash, blake3_254_hash_to_scalar};
18use subspace_core_primitives::objects::{BlockObject, BlockObjectMapping, GlobalObject};
19use subspace_core_primitives::pieces::RawRecord;
20use subspace_core_primitives::segments::{
21 ArchivedBlockProgress, ArchivedHistorySegment, LastArchivedBlock, RecordedHistorySegment,
22 SegmentCommitment, SegmentHeader, SegmentIndex,
23};
24use subspace_core_primitives::{BlockNumber, ScalarBytes};
25use subspace_erasure_coding::ErasureCoding;
26use subspace_kzg::{Kzg, Scalar};
27
/// Sentinel "last archived block" used before any block has been archived: block number 0 with
/// zero bytes of partial progress.
///
/// NOTE(review): `Partial(0)` presumably doubles as the genesis special case — a real block with
/// zero archived bytes would be indistinguishable from this value; confirm against
/// `last_archived_block_number`, which treats equality with this constant as "nothing archived".
const INITIAL_LAST_ARCHIVED_BLOCK: LastArchivedBlock = LastArchivedBlock {
    number: 0,
    archived_progress: ArchivedBlockProgress::Partial(0),
};
38
/// A segment of the recorded history, assembled by the archiver from buffered segment items.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum Segment {
    /// V0 of the segment data structure.
    V0 {
        /// Items contained within the segment.
        items: Vec<SegmentItem>,
    },
}
48
49impl Default for Segment {
50 fn default() -> Self {
51 Segment::V0 { items: Vec::new() }
52 }
53}
54
55impl Encode for Segment {
56 fn size_hint(&self) -> usize {
57 RecordedHistorySegment::SIZE
58 }
59
60 fn encode_to<O: Output + ?Sized>(&self, dest: &mut O) {
61 match self {
62 Segment::V0 { items } => {
63 dest.push_byte(0);
64 for item in items {
65 item.encode_to(dest);
66 }
67 }
68 }
69 }
70}
71
72impl Decode for Segment {
73 fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
74 let variant = input
75 .read_byte()
76 .map_err(|e| e.chain("Could not decode `Segment`, failed to read variant byte"))?;
77 match variant {
78 0 => {
79 let mut items = Vec::new();
80 loop {
81 match input.remaining_len()? {
82 Some(0) => {
83 break;
84 }
85 Some(_) => {
86 }
88 None => {
89 return Err(
90 "Source doesn't report remaining length, decoding not possible"
91 .into(),
92 );
93 }
94 }
95
96 match SegmentItem::decode(input) {
97 Ok(item) => {
98 items.push(item);
99 }
100 Err(error) => {
101 return Err(error.chain("Could not decode `Segment::V0::items`"));
102 }
103 }
104 }
105
106 Ok(Segment::V0 { items })
107 }
108 _ => Err("Could not decode `Segment`, variant doesn't exist".into()),
109 }
110 }
111}
112
113impl Segment {
114 fn push_item(&mut self, segment_item: SegmentItem) {
115 let Self::V0 { items } = self;
116 items.push(segment_item);
117 }
118
119 pub fn items(&self) -> &[SegmentItem] {
120 match self {
121 Segment::V0 { items } => items,
122 }
123 }
124
125 pub(crate) fn items_mut(&mut self) -> &mut Vec<SegmentItem> {
126 match self {
127 Segment::V0 { items } => items,
128 }
129 }
130
131 pub fn into_items(self) -> Vec<SegmentItem> {
132 match self {
133 Segment::V0 { items } => items,
134 }
135 }
136}
137
/// Kinds of items that can appear inside a [`Segment`].
///
/// Note that the `object_mapping` fields are `#[codec(skip)]`: mappings exist only while the
/// archiver processes the segment in memory and are never part of the encoded segment.
#[derive(Debug, Clone, Eq, PartialEq, Encode, Decode)]
pub enum SegmentItem {
    /// Padding bytes (never stored in the archiver buffer; see `unreachable!` arms in
    /// `Archiver::produce_segment`).
    #[codec(index = 0)]
    Padding,
    /// A complete block.
    #[codec(index = 1)]
    Block {
        /// Encoded block bytes.
        bytes: Vec<u8>,
        /// In-memory object mapping for these bytes; not encoded.
        #[doc(hidden)]
        #[codec(skip)]
        object_mapping: BlockObjectMapping,
    },
    /// The leading part of a block that was split across segments.
    #[codec(index = 2)]
    BlockStart {
        /// Encoded bytes of the block prefix.
        bytes: Vec<u8>,
        /// In-memory object mapping for these bytes; not encoded.
        #[doc(hidden)]
        #[codec(skip)]
        object_mapping: BlockObjectMapping,
    },
    /// A continuation of a block whose earlier bytes were archived previously.
    #[codec(index = 3)]
    BlockContinuation {
        /// Encoded bytes of the block continuation.
        bytes: Vec<u8>,
        /// In-memory object mapping for these bytes; not encoded.
        #[doc(hidden)]
        #[codec(skip)]
        object_mapping: BlockObjectMapping,
    },
    /// Header of the parent (previously produced) segment; always the first item of a segment
    /// after the genesis segment.
    #[codec(index = 4)]
    ParentSegmentHeader(SegmentHeader),
}
178
/// A freshly archived segment: its header plus all erasure-coded pieces with commitments and
/// witnesses filled in.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct NewArchivedSegment {
    /// Header describing the produced segment.
    pub segment_header: SegmentHeader,
    /// All pieces of the archived segment (source and parity, interleaved).
    pub pieces: ArchivedHistorySegment,
}
188
/// Result of archiving a single block via [`Archiver::add_block`].
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ArchiveBlockOutcome {
    /// Segments that became complete while archiving this block (possibly empty).
    pub archived_segments: Vec<NewArchivedSegment>,

    /// Global object mappings produced while archiving this block, including mappings for data
    /// already buffered for the next, not yet complete, segment.
    pub object_mapping: Vec<GlobalObject>,
}
200
/// Errors that can occur when instantiating an [`Archiver`] from prior state
/// (see [`Archiver::with_initial_state`]).
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, thiserror::Error)]
pub enum ArchiverInstantiationError {
    /// The provided block is exactly the size the segment header claims was already archived,
    /// meaning it should have been recorded as fully (not partially) archived.
    #[error("Invalid last archived block, its size {0} bytes is the same as the encoded block")]
    InvalidLastArchivedBlock(u32),
    /// The provided block is smaller than the number of its bytes already archived.
    #[error(
        "Invalid block, its size {block_bytes} bytes is smaller than the already archived block \
        {archived_block_bytes} bytes"
    )]
    InvalidBlockSmallSize {
        /// Size of the provided encoded block, in bytes.
        block_bytes: u32,
        /// Number of bytes of that block the segment header claims were already archived.
        archived_block_bytes: u32,
    },
}
220
/// Incremental archiver of blockchain history: blocks are fed in via [`Archiver::add_block`] and
/// complete segments (with KZG commitments and witnesses) are emitted as they fill up.
#[derive(Debug, Clone)]
pub struct Archiver {
    // Items queued for the next segment(s); front of the queue is consumed first.
    buffer: VecDeque<SegmentItem>,
    // Record commitments computed incrementally for the segment currently being filled.
    incremental_record_commitments: IncrementalRecordCommitmentsState,
    // Erasure coding used to extend source records and commitments with parity.
    erasure_coding: ErasureCoding,
    // KZG instance used for record and segment commitments.
    kzg: Kzg,
    // Index of the segment that will be produced next.
    segment_index: SegmentIndex,
    // Hash of the previously produced segment header (default/zero before the first segment).
    prev_segment_header_hash: Blake3Hash,
    // Progress of the last (possibly partially) archived block.
    last_archived_block: LastArchivedBlock,
}
250
impl Archiver {
    /// Create a new archiver with clean state: archiving starts at [`SegmentIndex::ZERO`], with a
    /// default (all-zero) previous segment header hash and [`INITIAL_LAST_ARCHIVED_BLOCK`].
    pub fn new(kzg: Kzg, erasure_coding: ErasureCoding) -> Self {
        Self {
            buffer: VecDeque::default(),
            // One commitment per raw record of a segment, pre-allocated.
            incremental_record_commitments: IncrementalRecordCommitmentsState::with_capacity(
                RecordedHistorySegment::NUM_RAW_RECORDS,
            ),
            erasure_coding,
            kzg,
            segment_index: SegmentIndex::ZERO,
            prev_segment_header_hash: Blake3Hash::default(),
            last_archived_block: INITIAL_LAST_ARCHIVED_BLOCK,
        }
    }

    /// Create an archiver that continues from previously produced state.
    ///
    /// `segment_header` is the last produced segment header; `encoded_block` is the encoding of
    /// the block referenced by `segment_header.last_archived_block()`, with `object_mapping`
    /// being that block's object mapping.
    ///
    /// # Errors
    /// * [`ArchiverInstantiationError::InvalidBlockSmallSize`] when `encoded_block` is smaller
    ///   than the number of bytes the header claims were already archived.
    /// * [`ArchiverInstantiationError::InvalidLastArchivedBlock`] when the sizes are equal (the
    ///   block would then have been fully, not partially, archived).
    pub fn with_initial_state(
        kzg: Kzg,
        erasure_coding: ErasureCoding,
        segment_header: SegmentHeader,
        encoded_block: &[u8],
        mut object_mapping: BlockObjectMapping,
    ) -> Result<Self, ArchiverInstantiationError> {
        let mut archiver = Self::new(kzg, erasure_coding);

        // Resume right after the provided segment.
        archiver.segment_index = segment_header.segment_index() + SegmentIndex::ONE;
        archiver.prev_segment_header_hash = segment_header.hash();
        archiver.last_archived_block = segment_header.last_archived_block();

        // The parent segment header is always the first item of the next segment.
        archiver
            .buffer
            .push_back(SegmentItem::ParentSegmentHeader(segment_header));

        // If the last block was only partially archived, queue its remaining bytes as a
        // continuation after validating sizes.
        if let Some(archived_block_bytes) = archiver.last_archived_block.partial_archived() {
            let encoded_block_bytes = u32::try_from(encoded_block.len())
                .expect("Blocks length is never bigger than u32; qed");

            match encoded_block_bytes.cmp(&archived_block_bytes) {
                Ordering::Less => {
                    return Err(ArchiverInstantiationError::InvalidBlockSmallSize {
                        block_bytes: encoded_block_bytes,
                        archived_block_bytes,
                    });
                }
                Ordering::Equal => {
                    return Err(ArchiverInstantiationError::InvalidLastArchivedBlock(
                        encoded_block_bytes,
                    ));
                }
                Ordering::Greater => {
                    // Drop mappings for objects that start inside the already-archived prefix and
                    // rebase the remaining offsets onto the unarchived tail of the block.
                    object_mapping
                        .objects_mut()
                        .retain_mut(|block_object: &mut BlockObject| {
                            if block_object.offset >= archived_block_bytes {
                                block_object.offset -= archived_block_bytes;
                                true
                            } else {
                                false
                            }
                        });
                    archiver.buffer.push_back(SegmentItem::BlockContinuation {
                        bytes: encoded_block[(archived_block_bytes as usize)..].to_vec(),
                        object_mapping,
                    });
                }
            }
        }

        Ok(archiver)
    }

    /// Number of the last archived block, or `None` if nothing has been archived yet (state still
    /// equals [`INITIAL_LAST_ARCHIVED_BLOCK`]).
    pub fn last_archived_block_number(&self) -> Option<BlockNumber> {
        if self.last_archived_block != INITIAL_LAST_ARCHIVED_BLOCK {
            Some(self.last_archived_block.number)
        } else {
            None
        }
    }

    /// Queue a block for archiving and produce as many complete segments as the buffered data
    /// allows.
    ///
    /// `incremental` enables eager, incremental computation of record commitments inside
    /// [`Self::produce_segment`] while a segment is still filling up.
    ///
    /// Returns the archived segments (possibly none) together with global object mappings,
    /// including mappings for data already buffered for the next, not yet complete, segment.
    pub fn add_block(
        &mut self,
        bytes: Vec<u8>,
        object_mapping: BlockObjectMapping,
        incremental: bool,
    ) -> ArchiveBlockOutcome {
        self.buffer.push_back(SegmentItem::Block {
            bytes,
            object_mapping,
        });

        let mut archived_segments = Vec::new();
        let mut object_mapping = Vec::new();

        // Drain the buffer into complete segments for as long as possible; mappings must be
        // produced before `produce_archived_segment` consumes the segment (and before
        // `self.segment_index` is advanced).
        while let Some(mut segment) = self.produce_segment(incremental) {
            object_mapping.extend(Self::produce_object_mappings(
                self.segment_index,
                segment.items_mut().iter_mut(),
            ));
            archived_segments.push(self.produce_archived_segment(segment));
        }

        // Also emit mappings for data that will land in the next (not yet complete) segment.
        object_mapping.extend(self.produce_next_segment_mappings());

        ArchiveBlockOutcome {
            archived_segments,
            object_mapping,
        }
    }

    /// Try to assemble one full segment from buffered items.
    ///
    /// Returns `None` when there is not enough buffered data to fill a whole segment; in that
    /// case all items are pushed back onto the front of the buffer (and, when `incremental`,
    /// commitments for records that are already complete are computed eagerly so less work is
    /// left for segment finalization).
    fn produce_segment(&mut self, incremental: bool) -> Option<Segment> {
        let mut segment = Segment::V0 {
            items: Vec::with_capacity(self.buffer.len()),
        };

        // Work on a local copy; only committed back to `self` once a segment is produced.
        let mut last_archived_block = self.last_archived_block;

        let mut segment_size = segment.encoded_size();

        // NOTE(review): the `- 2` presumably accounts for the minimum encoded size of one more
        // segment item (variant byte + length prefix), so iteration can stop slightly early —
        // confirm against the `SegmentItem` encoding.
        while segment_size < (RecordedHistorySegment::SIZE - 2) {
            let segment_item = match self.buffer.pop_front() {
                Some(segment_item) => segment_item,
                None => {
                    // Not enough data for a full segment. Optionally commit incrementally to the
                    // records that are already complete before bailing out.
                    let existing_commitments = self.incremental_record_commitments.len();
                    let bytes_committed_to = existing_commitments * RawRecord::SIZE;
                    // Only worth doing when at least a couple of new records' worth of
                    // uncommitted bytes accumulated.
                    if incremental && segment_size - bytes_committed_to >= RawRecord::SIZE * 2 {
                        update_record_commitments(
                            &mut self.incremental_record_commitments,
                            &segment,
                            &self.kzg,
                        );
                    }

                    // Restore the buffer to its pre-call state (reverse order preserves the
                    // original front-to-back ordering).
                    for segment_item in segment.into_items().into_iter().rev() {
                        self.buffer.push_front(segment_item);
                    }

                    return None;
                }
            };

            let segment_item_encoded_size = segment_item.encoded_size();
            segment_size += segment_item_encoded_size;

            // Check if this item overflows the segment and, if so, whether it can be split.
            if segment_size >= RecordedHistorySegment::SIZE {
                let spill_over = segment_size - RecordedHistorySegment::SIZE;

                let inner_bytes_size = match &segment_item {
                    SegmentItem::Padding => {
                        unreachable!("Buffer never contains SegmentItem::Padding; qed");
                    }
                    SegmentItem::Block { bytes, .. } => bytes.len(),
                    SegmentItem::BlockStart { .. } => {
                        unreachable!("Buffer never contains SegmentItem::BlockStart; qed");
                    }
                    SegmentItem::BlockContinuation { bytes, .. } => bytes.len(),
                    SegmentItem::ParentSegmentHeader(_) => {
                        unreachable!(
                            "SegmentItem::SegmentHeader is always the first element in the buffer \
                            and fits into the segment; qed",
                        );
                    }
                };

                // The spill-over exceeds the item's payload, so it cannot be split at the byte
                // level — put it back whole and close the segment here.
                if spill_over > inner_bytes_size {
                    self.buffer.push_front(segment_item);
                    segment_size -= segment_item_encoded_size;
                    break;
                }
            }

            // Update last-archived-block progress for the item being added.
            match &segment_item {
                SegmentItem::Padding => {
                    unreachable!("Buffer never contains SegmentItem::Padding; qed");
                }
                SegmentItem::Block { .. } => {
                    // NOTE(review): the very first archived block keeps number 0 (the state still
                    // equals `INITIAL_LAST_ARCHIVED_BLOCK` at that point); every later complete
                    // block increments the counter.
                    if last_archived_block != INITIAL_LAST_ARCHIVED_BLOCK {
                        last_archived_block.number += 1;
                    }
                    last_archived_block.set_complete();
                }
                SegmentItem::BlockStart { .. } => {
                    unreachable!("Buffer never contains SegmentItem::BlockStart; qed");
                }
                SegmentItem::BlockContinuation { bytes, .. } => {
                    // A continuation extends the partially-archived byte count of the same block.
                    let archived_bytes = last_archived_block.partial_archived().expect(
                        "Block continuation implies that there are some bytes \
                        archived already; qed",
                    );
                    last_archived_block.set_partial_archived(
                        archived_bytes
                            + u32::try_from(bytes.len())
                                .expect("Blocks length is never bigger than u32; qed"),
                    );
                }
                SegmentItem::ParentSegmentHeader(_) => {
                    // Segment headers do not affect block archiving progress.
                }
            }

            segment.push_item(segment_item);
        }

        // If the segment ran over the target size, split the last item so the segment fits
        // exactly and queue the remainder as a continuation.
        let spill_over = segment_size.saturating_sub(RecordedHistorySegment::SIZE);

        if spill_over > 0 {
            let items = segment.items_mut();
            let segment_item = items
                .pop()
                .expect("Segment over segment size always has at least one item; qed");

            let segment_item = match segment_item {
                SegmentItem::Padding => {
                    unreachable!("Buffer never contains SegmentItem::Padding; qed");
                }
                SegmentItem::Block {
                    mut bytes,
                    mut object_mapping,
                } => {
                    // Keep the first `split_point` bytes in this segment, push the rest back as
                    // a continuation for the next segment.
                    let split_point = bytes.len() - spill_over;
                    let continuation_bytes = bytes[split_point..].to_vec();

                    bytes.truncate(split_point);

                    // Move mappings for objects starting past the split point into the
                    // continuation, rebasing their offsets.
                    let continuation_object_mapping = BlockObjectMapping::V0 {
                        objects: object_mapping
                            .objects_mut()
                            .extract_if(.., |block_object: &mut BlockObject| {
                                if block_object.offset >= split_point as u32 {
                                    block_object.offset -= split_point as u32;
                                    true
                                } else {
                                    false
                                }
                            })
                            .collect(),
                    };

                    // The block is now only partially archived up to the split point.
                    last_archived_block.set_partial_archived(
                        u32::try_from(bytes.len())
                            .expect("Blocks length is never bigger than u32; qed"),
                    );

                    self.buffer.push_front(SegmentItem::BlockContinuation {
                        bytes: continuation_bytes,
                        object_mapping: continuation_object_mapping,
                    });

                    SegmentItem::BlockStart {
                        bytes,
                        object_mapping,
                    }
                }
                SegmentItem::BlockStart { .. } => {
                    unreachable!("Buffer never contains SegmentItem::BlockStart; qed");
                }
                SegmentItem::BlockContinuation {
                    mut bytes,
                    mut object_mapping,
                } => {
                    // Same splitting logic as for `Block`, but progress accounting differs: the
                    // spilled bytes were already counted in the loop above, so subtract them.
                    let split_point = bytes.len() - spill_over;
                    let continuation_bytes = bytes[split_point..].to_vec();

                    bytes.truncate(split_point);

                    let continuation_object_mapping = BlockObjectMapping::V0 {
                        objects: object_mapping
                            .objects_mut()
                            .extract_if(.., |block_object: &mut BlockObject| {
                                if block_object.offset >= split_point as u32 {
                                    block_object.offset -= split_point as u32;
                                    true
                                } else {
                                    false
                                }
                            })
                            .collect(),
                    };

                    let archived_bytes = last_archived_block.partial_archived().expect(
                        "Block continuation implies that there are some bytes archived \
                        already; qed",
                    );
                    last_archived_block.set_partial_archived(
                        archived_bytes
                            - u32::try_from(spill_over)
                                .expect("Blocks length is never bigger than u32; qed"),
                    );

                    self.buffer.push_front(SegmentItem::BlockContinuation {
                        bytes: continuation_bytes,
                        object_mapping: continuation_object_mapping,
                    });

                    SegmentItem::BlockContinuation {
                        bytes,
                        object_mapping,
                    }
                }
                SegmentItem::ParentSegmentHeader(_) => {
                    unreachable!(
                        "SegmentItem::SegmentHeader is always the first element in the buffer and \
                        fits into the segment; qed",
                    );
                }
            };

            items.push(segment_item);
        } else {
            // Segment closed exactly on an item boundary: the last block in it is complete.
            last_archived_block.set_complete();
        }

        // Only now commit the local progress back to the archiver.
        self.last_archived_block = last_archived_block;

        Some(segment)
    }

    /// Produce object mappings for the data currently buffered for the next (not yet complete)
    /// segment. Mappings are drained from the buffered items, so each object is returned at most
    /// once across calls.
    fn produce_next_segment_mappings(&mut self) -> Vec<GlobalObject> {
        Self::produce_object_mappings(self.segment_index, self.buffer.iter_mut())
    }

    /// Convert block-relative object mappings in `items` into global mappings relative to
    /// `segment_index`, draining the block-level mappings in the process.
    ///
    /// The global offset of an object is computed against the segment's SCALE encoding: the
    /// encoded size of all preceding items, plus 1 byte for the item's enum variant, plus the
    /// compact-encoded length prefix of the item's bytes, plus the object's offset within those
    /// bytes. That offset is then translated into a source piece index and an offset within the
    /// raw record.
    fn produce_object_mappings<'a>(
        segment_index: SegmentIndex,
        items: impl Iterator<Item = &'a mut SegmentItem>,
    ) -> Vec<GlobalObject> {
        // Only source (raw-record) pieces can contain object data.
        let source_piece_indexes = &segment_index.segment_piece_indexes_source_first()
            [..RecordedHistorySegment::NUM_RAW_RECORDS];

        let mut corrected_object_mapping = Vec::new();
        // Start past the segment's own enum variant prefix.
        let mut base_offset_in_segment = Segment::default().encoded_size();
        for segment_item in items {
            match segment_item {
                SegmentItem::Padding => {
                    unreachable!(
                        "Segment during archiving never contains SegmentItem::Padding; qed"
                    );
                }
                SegmentItem::Block {
                    bytes,
                    object_mapping,
                }
                | SegmentItem::BlockStart {
                    bytes,
                    object_mapping,
                }
                | SegmentItem::BlockContinuation {
                    bytes,
                    object_mapping,
                } => {
                    for block_object in object_mapping.objects_mut().drain(..) {
                        // 1 byte for the `SegmentItem` enum variant, then the compact length
                        // prefix of `bytes`, then the object's offset inside `bytes`.
                        let offset_in_segment = base_offset_in_segment
                            + 1
                            + Compact::compact_len(&(bytes.len() as u32))
                            + block_object.offset as usize;
                        let raw_piece_offset = (offset_in_segment % RawRecord::SIZE)
                            .try_into()
                            .expect("Offset within piece should always fit in 32-bit integer; qed");
                        corrected_object_mapping.push(GlobalObject {
                            hash: block_object.hash,
                            piece_index: source_piece_indexes[offset_in_segment / RawRecord::SIZE],
                            offset: raw_piece_offset,
                        });
                    }
                }
                SegmentItem::ParentSegmentHeader(_) => {
                    // Segment headers contain no object mappings.
                }
            }

            base_offset_in_segment += segment_item.encoded_size();
        }

        corrected_object_mapping
    }

    /// Finalize a complete segment: erasure-code it into pieces, compute record and segment
    /// commitments and per-piece witnesses, build the segment header, and advance the archiver's
    /// segment index / previous header hash. The produced header is pushed to the front of the
    /// buffer as the `ParentSegmentHeader` of the next segment.
    fn produce_archived_segment(&mut self, segment: Segment) -> NewArchivedSegment {
        let mut pieces = {
            // Encode the segment and zero-pad it up to the full recorded history segment size.
            // NOTE(review): relies on `with_capacity`/`resize(capacity)` yielding exactly
            // `RecordedHistorySegment::SIZE` bytes — confirm `with_capacity` allocates exactly.
            let mut raw_record_shards = Vec::<u8>::with_capacity(RecordedHistorySegment::SIZE);
            segment.encode_to(&mut raw_record_shards);
            raw_record_shards.resize(raw_record_shards.capacity(), 0);

            // Segment is no longer needed; free it before the heavy erasure-coding work.
            drop(segment);

            let mut pieces = ArchivedHistorySegment::default();

            // Erasure-code column by column: the `record_offset`-th scalar chunk of every source
            // record forms one input vector that is extended with parity.
            let mut tmp_source_shards_scalars =
                Vec::<Scalar>::with_capacity(RecordedHistorySegment::NUM_RAW_RECORDS);
            for record_offset in 0..RawRecord::NUM_CHUNKS {
                raw_record_shards
                    .as_chunks::<{ RawRecord::SIZE }>()
                    .0
                    .iter()
                    .map(|record_bytes| {
                        record_bytes
                            .as_chunks::<{ ScalarBytes::SAFE_BYTES }>()
                            .0
                            .get(record_offset)
                            .expect("Statically known to exist in a record; qed")
                    })
                    .map(Scalar::from)
                    .collect_into(&mut tmp_source_shards_scalars);

                let parity_shards = self
                    .erasure_coding
                    .extend(&tmp_source_shards_scalars)
                    .expect(
                        "Erasure coding instance is deliberately configured to support this \
                        input; qed",
                    );

                // Interleave source and parity shards so that pieces alternate source/parity.
                let interleaved_input_chunks = tmp_source_shards_scalars
                    .drain(..)
                    .zip(parity_shards)
                    .flat_map(|(a, b)| [a, b]);
                let output_chunks = pieces.iter_mut().map(|piece| {
                    piece
                        .record_mut()
                        .get_mut(record_offset)
                        .expect("Statically known to exist in a record; qed")
                });

                interleaved_input_chunks
                    .zip(output_chunks)
                    .for_each(|(input, output)| output.copy_from_slice(&input.to_bytes()));
            }

            pieces
        };

        // Commitments may have been computed incrementally in `produce_segment`; only compute
        // the ones that are still missing.
        let existing_commitments = self.incremental_record_commitments.len();
        {
            #[cfg(not(feature = "parallel"))]
            let source_pieces = pieces.source();
            #[cfg(feature = "parallel")]
            let source_pieces = pieces.par_source();

            let iter = source_pieces.skip(existing_commitments).map(|piece| {
                let record_chunks = piece.record().iter().map(|scalar_bytes| {
                    Scalar::try_from(scalar_bytes).expect(
                        "Source pieces were just created and are guaranteed to contain \
                        valid scalars; qed",
                    )
                });

                let number_of_chunks = record_chunks.len();
                // Zero-pad the scalars to the next power of two for the KZG polynomial.
                let mut scalars = Vec::with_capacity(number_of_chunks.next_power_of_two());

                record_chunks.collect_into(&mut scalars);

                scalars.resize(scalars.capacity(), Scalar::default());

                let polynomial = self
                    .kzg
                    .poly(&scalars)
                    .expect("KZG instance must be configured to support this many scalars; qed");
                self.kzg
                    .commit(&polynomial)
                    .expect("KZG instance must be configured to support this many scalars; qed")
            });

            #[cfg(not(feature = "parallel"))]
            iter.collect_into(&mut *self.incremental_record_commitments);
            #[cfg(feature = "parallel")]
            self.incremental_record_commitments.par_extend(iter);
        }
        // Extend the source record commitments with parity commitments, then reset the
        // incremental state for the next segment.
        let record_commitments = self
            .erasure_coding
            .extend_commitments(&self.incremental_record_commitments)
            .expect(
                "Erasure coding instance is deliberately configured to support this input; qed",
            );
        self.incremental_record_commitments.clear();

        // The segment commitment is a commitment to the polynomial over hashes of all record
        // commitments.
        let polynomial = self
            .kzg
            .poly(
                &record_commitments
                    .iter()
                    .map(|commitment| {
                        Scalar::try_from(blake3_254_hash_to_scalar(&commitment.to_bytes()))
                            .expect("Create correctly by dedicated hash function; qed")
                    })
                    .collect::<Vec<_>>(),
            )
            .expect("Internally produced values must never fail; qed");

        let segment_commitment = SegmentCommitment::from(
            self.kzg
                .commit(&polynomial)
                .expect("Internally produced values must never fail; qed"),
        );

        // Write each record's commitment and its witness (proof against the segment polynomial)
        // into the corresponding piece.
        pieces
            .iter_mut()
            .zip(record_commitments)
            .enumerate()
            .for_each(|(position, (piece, commitment))| {
                let commitment_bytes = commitment.to_bytes();
                let (_record, commitment, witness) = piece.split_mut();
                commitment.copy_from_slice(&commitment_bytes);
                witness.copy_from_slice(
                    &self
                        .kzg
                        .create_witness(
                            &polynomial,
                            ArchivedHistorySegment::NUM_PIECES,
                            position as u32,
                        )
                        .expect("Position is statically known to be valid; qed")
                        .to_bytes(),
                );
            });

        let segment_header = SegmentHeader::V0 {
            segment_index: self.segment_index,
            segment_commitment,
            prev_segment_header_hash: self.prev_segment_header_hash,
            last_archived_block: self.last_archived_block,
        };

        // Advance archiver state for the next segment.
        self.segment_index += SegmentIndex::ONE;
        self.prev_segment_header_hash = segment_header.hash();

        // The just-produced header becomes the first item of the next segment.
        self.buffer
            .push_front(SegmentItem::ParentSegmentHeader(segment_header));

        NewArchivedSegment {
            segment_header,
            pieces: pieces.to_shared(),
        }
    }
}