mod incremental_record_commitments;

extern crate alloc;

use crate::archiver::incremental_record_commitments::{
    update_record_commitments, IncrementalRecordCommitmentsState,
};
use alloc::collections::VecDeque;
#[cfg(not(feature = "std"))]
use alloc::vec;
#[cfg(not(feature = "std"))]
use alloc::vec::Vec;
use core::cmp::Ordering;
use parity_scale_codec::{Compact, CompactLen, Decode, Encode, Input, Output};
#[cfg(feature = "parallel")]
use rayon::prelude::*;
use subspace_core_primitives::hashes::{blake3_254_hash_to_scalar, Blake3Hash};
use subspace_core_primitives::objects::{BlockObject, BlockObjectMapping, GlobalObject};
use subspace_core_primitives::pieces::RawRecord;
use subspace_core_primitives::segments::{
    ArchivedBlockProgress, ArchivedHistorySegment, LastArchivedBlock, RecordedHistorySegment,
    SegmentCommitment, SegmentHeader, SegmentIndex,
};
use subspace_core_primitives::{BlockNumber, ScalarBytes};
use subspace_erasure_coding::ErasureCoding;
use subspace_kzg::{Kzg, Scalar};

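/// State of the last archived block before anything has been archived: block number 0 with zero
/// bytes of archiving progress.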
const INITIAL_LAST_ARCHIVED_BLOCK: LastArchivedBlock = LastArchivedBlock {
    number: 0,
    archived_progress: ArchivedBlockProgress::Partial(0),
};

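/// Segment of archived history: an ordered collection of [`SegmentItem`]s making up one recorded
/// history segment.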
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum Segment {
    V0 {
        items: Vec<SegmentItem>,
    },
}

impl Default for Segment {
    fn default() -> Self {
        Segment::V0 { items: Vec::new() }
    }
}

impl Encode for Segment {
    fn size_hint(&self) -> usize {
        RecordedHistorySegment::SIZE
    }

    fn encode_to<O: Output + ?Sized>(&self, dest: &mut O) {
        match self {
            Segment::V0 { items } => {
                dest.push_byte(0);
                for item in items {
                    item.encode_to(dest);
                }
            }
        }
    }
}

impl Decode for Segment {
    fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
        let variant = input
            .read_byte()
            .map_err(|e| e.chain("Could not decode `Segment`, failed to read variant byte"))?;
        match variant {
            0 => {
                let mut items = Vec::new();
                loop {
                    match input.remaining_len()? {
                        Some(0) => {
                            break;
                        }
                        Some(_) => {
                            // Some data is still left, presumably another segment item
                        }
                        None => {
                            return Err(
                                "Source doesn't report remaining length, decoding not possible"
                                    .into(),
                            );
                        }
                    }

                    match SegmentItem::decode(input) {
                        Ok(item) => {
                            items.push(item);
                        }
                        Err(error) => {
                            return Err(error.chain("Could not decode `Segment::V0::items`"));
                        }
                    }
                }

                Ok(Segment::V0 { items })
            }
            _ => Err("Could not decode `Segment`, variant doesn't exist".into()),
        }
    }
}

impl Segment {
    fn push_item(&mut self, segment_item: SegmentItem) {
        let Self::V0 { items } = self;
        items.push(segment_item);
    }

    /// Shared reference to segment items
    pub fn items(&self) -> &[SegmentItem] {
        match self {
            Segment::V0 { items } => items,
        }
    }

    pub(crate) fn items_mut(&mut self) -> &mut Vec<SegmentItem> {
        match self {
            Segment::V0 { items } => items,
        }
    }

    /// Consume the segment and return its items
    pub fn into_items(self) -> Vec<SegmentItem> {
        match self {
            Segment::V0 { items } => items,
        }
    }
}

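/// Kinds of items that can be stored in a segment: padding, whole blocks, partial blocks, and
/// parent segment headers.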
#[derive(Debug, Clone, Eq, PartialEq, Encode, Decode)]
pub enum SegmentItem {
    /// Special dummy enum variant only used as an implementation detail for padding purposes
    #[codec(index = 0)]
    Padding,
    /// Contains a full block inside
    #[codec(index = 1)]
    Block {
        /// Block bytes
        bytes: Vec<u8>,
        /// This is a convenience implementation detail and will not be available on decoding
        #[doc(hidden)]
        #[codec(skip)]
        object_mapping: BlockObjectMapping,
    },
    /// Contains the beginning of a block inside, the remainder will be found in subsequent
    /// segments
    #[codec(index = 2)]
    BlockStart {
        /// Block bytes
        bytes: Vec<u8>,
        /// This is a convenience implementation detail and will not be available on decoding
        #[doc(hidden)]
        #[codec(skip)]
        object_mapping: BlockObjectMapping,
    },
    /// Continuation of a partial block spilled over from the previous segment
    #[codec(index = 3)]
    BlockContinuation {
        /// Block bytes
        bytes: Vec<u8>,
        /// This is a convenience implementation detail and will not be available on decoding
        #[doc(hidden)]
        #[codec(skip)]
        object_mapping: BlockObjectMapping,
    },
    /// Segment header of the parent segment
    #[codec(index = 4)]
    ParentSegmentHeader(SegmentHeader),
}

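/// Newly archived segment as a combination of segment header and corresponding archived history
/// segment containing pieces.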
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct NewArchivedSegment {
    /// Segment header
    pub segment_header: SegmentHeader,
    /// Segment of archived history containing pieces
    pub pieces: ArchivedHistorySegment,
}

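/// The outcome of adding a block to the archiver.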
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ArchiveBlockOutcome {
    /// The new segments archived after adding the block, if any.
    pub archived_segments: Vec<NewArchivedSegment>,

    /// The new object mappings for those segments and any earlier blocks.
    pub object_mapping: Vec<GlobalObject>,
}

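/// Archiver instantiation error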
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, thiserror::Error)]
pub enum ArchiverInstantiationError {
    /// Invalid last archived block: its partially archived size equals the full block size, so it
    /// should have been marked complete instead of partially archived
    #[error("Invalid last archived block, its size {0} bytes is the same as the encoded block")]
    InvalidLastArchivedBlock(u32),
    /// Invalid block: its size is smaller than the number of bytes already archived for it
    #[error(
        "Invalid block, its size {block_bytes} bytes is smaller than the already archived block \
        {archived_block_bytes} bytes"
    )]
    InvalidBlockSmallSize {
        /// Full block size
        block_bytes: u32,
        /// Already archived portion of the block
        archived_block_bytes: u32,
    },
}

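/// Block archiver for Subspace blockchain.
///
/// It takes new confirmed (at some depth) blocks and concatenates them into a buffer, the buffer
/// is sliced into segments of [`RecordedHistorySegment::SIZE`] size, segments are sliced into
/// source records of [`RawRecord::SIZE`], records are erasure coded, committed to with [`Kzg`],
/// commitments with witnesses are appended, and records become pieces that are returned alongside
/// the corresponding segment header.
///
/// A minimal usage sketch (construction of `kzg` and `erasure_coding` is elided here and
/// `encoded_block` is an illustrative placeholder):
///
/// ```ignore
/// let mut archiver = Archiver::new(kzg, erasure_coding);
///
/// let outcome = archiver.add_block(encoded_block, BlockObjectMapping::default(), true);
/// for archived_segment in outcome.archived_segments {
///     // Persist `archived_segment.segment_header` and hand `archived_segment.pieces`
///     // over to the plotting/distribution layer.
/// }
/// ```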
#[derive(Debug, Clone)]
pub struct Archiver {
    /// Buffer containing blocks and other buffered items that are pending to be included into the
    /// next segment
    buffer: VecDeque<SegmentItem>,
    /// Intermediate record commitments that are built incrementally as the above buffer fills up
    incremental_record_commitments: IncrementalRecordCommitmentsState,
    /// Erasure coding data structure
    erasure_coding: ErasureCoding,
    /// KZG instance
    kzg: Kzg,
    /// Index of the current segment
    segment_index: SegmentIndex,
    /// Hash of the segment header of the previous segment
    prev_segment_header_hash: Blake3Hash,
    /// Last archived block
    last_archived_block: LastArchivedBlock,
}

impl Archiver {
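    /// Create a new instance with specified KZG and erasure coding instances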
    pub fn new(kzg: Kzg, erasure_coding: ErasureCoding) -> Self {
        Self {
            buffer: VecDeque::default(),
            incremental_record_commitments: IncrementalRecordCommitmentsState::with_capacity(
                RecordedHistorySegment::NUM_RAW_RECORDS,
            ),
            erasure_coding,
            kzg,
            segment_index: SegmentIndex::ZERO,
            prev_segment_header_hash: Blake3Hash::default(),
            last_archived_block: INITIAL_LAST_ARCHIVED_BLOCK,
        }
    }

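    /// Create a new instance of the archiver with initial state in case of restart.
    ///
    /// `encoded_block` must correspond to the last archived block recorded in `segment_header`
    /// and will be processed according to its archiving state (only the not-yet-archived tail is
    /// buffered).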
    pub fn with_initial_state(
        kzg: Kzg,
        erasure_coding: ErasureCoding,
        segment_header: SegmentHeader,
        encoded_block: &[u8],
        mut object_mapping: BlockObjectMapping,
    ) -> Result<Self, ArchiverInstantiationError> {
        let mut archiver = Self::new(kzg, erasure_coding);

        archiver.segment_index = segment_header.segment_index() + SegmentIndex::ONE;
        archiver.prev_segment_header_hash = segment_header.hash();
        archiver.last_archived_block = segment_header.last_archived_block();

        // The segment header of the parent segment is the first item in the next segment
        archiver
            .buffer
            .push_back(SegmentItem::ParentSegmentHeader(segment_header));

        if let Some(archived_block_bytes) = archiver.last_archived_block.partial_archived() {
            let encoded_block_bytes = u32::try_from(encoded_block.len())
                .expect("Block length is never bigger than u32; qed");

            match encoded_block_bytes.cmp(&archived_block_bytes) {
                Ordering::Less => {
                    return Err(ArchiverInstantiationError::InvalidBlockSmallSize {
                        block_bytes: encoded_block_bytes,
                        archived_block_bytes,
                    });
                }
                Ordering::Equal => {
                    return Err(ArchiverInstantiationError::InvalidLastArchivedBlock(
                        encoded_block_bytes,
                    ));
                }
                Ordering::Greater => {
                    // Drop object mappings that were already archived with the first part of the
                    // block and adjust offsets of the rest to be relative to the continuation
                    object_mapping
                        .objects_mut()
                        .retain_mut(|block_object: &mut BlockObject| {
                            if block_object.offset >= archived_block_bytes {
                                block_object.offset -= archived_block_bytes;
                                true
                            } else {
                                false
                            }
                        });
                    archiver.buffer.push_back(SegmentItem::BlockContinuation {
                        bytes: encoded_block[(archived_block_bytes as usize)..].to_vec(),
                        object_mapping,
                    });
                }
            }
        }

        Ok(archiver)
    }

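    /// Get the block number of the last archived block, if there was one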
    pub fn last_archived_block_number(&self) -> Option<BlockNumber> {
        if self.last_archived_block != INITIAL_LAST_ARCHIVED_BLOCK {
            Some(self.last_archived_block.number)
        } else {
            None
        }
    }

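    /// Adds new block to internal buffer, potentially producing pieces, segment headers, and
    /// object mappings as a result.
    ///
    /// When `incremental` is `true`, record commitments are built incrementally as the internal
    /// buffer fills up, spreading the commitment work over time rather than doing all of it at
    /// segment production time.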
    pub fn add_block(
        &mut self,
        bytes: Vec<u8>,
        object_mapping: BlockObjectMapping,
        incremental: bool,
    ) -> ArchiveBlockOutcome {
        // Append new block to the buffer
        self.buffer.push_back(SegmentItem::Block {
            bytes,
            object_mapping,
        });

        let mut archived_segments = Vec::new();
        let mut object_mapping = Vec::new();

        // Produce as many complete segments as the buffered data allows
        while let Some(mut segment) = self.produce_segment(incremental) {
            // Produce mappings for all objects contained in this segment
            object_mapping.extend(Self::produce_object_mappings(
                self.segment_index,
                segment.items_mut().iter_mut(),
            ));
            archived_segments.push(self.produce_archived_segment(segment));
        }

        // Produce mappings for objects that will end up in the next segment
        object_mapping.extend(self.produce_next_segment_mappings());

        ArchiveBlockOutcome {
            archived_segments,
            object_mapping,
        }
    }

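    /// Try to slice buffered items into a complete segment, returning `None` (and leaving the
    /// buffer intact) if there is not enough data yet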
    fn produce_segment(&mut self, incremental: bool) -> Option<Segment> {
        let mut segment = Segment::V0 {
            items: Vec::with_capacity(self.buffer.len()),
        };

        let mut last_archived_block = self.last_archived_block;

        let mut segment_size = segment.encoded_size();

        // `-2` because even the smallest segment item takes at least 2 bytes to encode, so it
        // makes sense to stop earlier here
        while segment_size < (RecordedHistorySegment::SIZE - 2) {
            let segment_item = match self.buffer.pop_front() {
                Some(segment_item) => segment_item,
                None => {
                    let existing_commitments = self.incremental_record_commitments.len();
                    let bytes_committed_to = existing_commitments * RawRecord::SIZE;
                    // Run incremental archiving only when there are at least two records' worth
                    // of uncommitted data, otherwise we would waste CPU cycles re-encoding the
                    // segment over and over again
                    if incremental && segment_size - bytes_committed_to >= RawRecord::SIZE * 2 {
                        update_record_commitments(
                            &mut self.incremental_record_commitments,
                            &segment,
                            &self.kzg,
                        );
                    }

                    // Not enough data to produce a segment yet, push the items back into the
                    // buffer
                    for segment_item in segment.into_items().into_iter().rev() {
                        self.buffer.push_front(segment_item);
                    }

                    return None;
                }
            };

            let segment_item_encoded_size = segment_item.encoded_size();
            segment_size += segment_item_encoded_size;

            // Check if there is an excess of data that would have to be spilled over into the
            // next segment
            if segment_size >= RecordedHistorySegment::SIZE {
                let spill_over = segment_size - RecordedHistorySegment::SIZE;

                // Due to compact length encoding in SCALE codec, the spill over may be as big as
                // or bigger than the item's inner bytes, in which case the item can't be split
                let inner_bytes_size = match &segment_item {
                    SegmentItem::Padding => {
                        unreachable!("Buffer never contains SegmentItem::Padding; qed");
                    }
                    SegmentItem::Block { bytes, .. } => bytes.len(),
                    SegmentItem::BlockStart { .. } => {
                        unreachable!("Buffer never contains SegmentItem::BlockStart; qed");
                    }
                    SegmentItem::BlockContinuation { bytes, .. } => bytes.len(),
                    SegmentItem::ParentSegmentHeader(_) => {
                        unreachable!(
                            "SegmentItem::ParentSegmentHeader is always the first element in the \
                            buffer and fits into the segment; qed",
                        );
                    }
                };

                // The item can't be split, return it to the buffer for the next segment
                if spill_over > inner_bytes_size {
                    self.buffer.push_front(segment_item);
                    segment_size -= segment_item_encoded_size;
                    break;
                }
            }

            match &segment_item {
                SegmentItem::Padding => {
                    unreachable!("Buffer never contains SegmentItem::Padding; qed");
                }
                SegmentItem::Block { .. } => {
                    // Skip the block number increment for the very first archived block
                    if last_archived_block != INITIAL_LAST_ARCHIVED_BLOCK {
                        last_archived_block.number += 1;
                    }
                    last_archived_block.set_complete();
                }
                SegmentItem::BlockStart { .. } => {
                    unreachable!("Buffer never contains SegmentItem::BlockStart; qed");
                }
                SegmentItem::BlockContinuation { bytes, .. } => {
                    let archived_bytes = last_archived_block.partial_archived().expect(
                        "Block continuation implies that there are some bytes \
                        archived already; qed",
                    );
                    last_archived_block.set_partial_archived(
                        archived_bytes
                            + u32::try_from(bytes.len())
                                .expect("Block length is never bigger than u32; qed"),
                    );
                }
                SegmentItem::ParentSegmentHeader(_) => {
                    // Segment header of the parent segment doesn't affect the last archived block
                }
            }

            segment.push_item(segment_item);
        }

        // Check if there is an excess of data that should be spilled over into the next segment
        let spill_over = segment_size
            .checked_sub(RecordedHistorySegment::SIZE)
            .unwrap_or_default();

        if spill_over > 0 {
            let items = segment.items_mut();
            let segment_item = items
                .pop()
                .expect("Segment over segment size always has at least one item; qed");

            let segment_item = match segment_item {
                SegmentItem::Padding => {
                    unreachable!("Buffer never contains SegmentItem::Padding; qed");
                }
                SegmentItem::Block {
                    mut bytes,
                    mut object_mapping,
                } => {
                    let split_point = bytes.len() - spill_over;
                    let continuation_bytes = bytes[split_point..].to_vec();

                    bytes.truncate(split_point);

                    // Move object mappings after the split point into the continuation, adjusting
                    // their offsets to be relative to it
                    let continuation_object_mapping = BlockObjectMapping::V0 {
                        objects: object_mapping
                            .objects_mut()
                            .extract_if(.., |block_object: &mut BlockObject| {
                                if block_object.offset >= split_point as u32 {
                                    block_object.offset -= split_point as u32;
                                    true
                                } else {
                                    false
                                }
                            })
                            .collect(),
                    };

                    // Update the last archived block to reflect that it is archived partially
                    last_archived_block.set_partial_archived(
                        u32::try_from(bytes.len())
                            .expect("Block length is never bigger than u32; qed"),
                    );

                    // Push the continuation of the block into the buffer to be archived in the
                    // next segment
                    self.buffer.push_front(SegmentItem::BlockContinuation {
                        bytes: continuation_bytes,
                        object_mapping: continuation_object_mapping,
                    });

                    SegmentItem::BlockStart {
                        bytes,
                        object_mapping,
                    }
                }
                SegmentItem::BlockStart { .. } => {
                    unreachable!("Buffer never contains SegmentItem::BlockStart; qed");
                }
                SegmentItem::BlockContinuation {
                    mut bytes,
                    mut object_mapping,
                } => {
                    let split_point = bytes.len() - spill_over;
                    let continuation_bytes = bytes[split_point..].to_vec();

                    bytes.truncate(split_point);

                    // Move object mappings after the split point into the continuation, adjusting
                    // their offsets to be relative to it
                    let continuation_object_mapping = BlockObjectMapping::V0 {
                        objects: object_mapping
                            .objects_mut()
                            .extract_if(.., |block_object: &mut BlockObject| {
                                if block_object.offset >= split_point as u32 {
                                    block_object.offset -= split_point as u32;
                                    true
                                } else {
                                    false
                                }
                            })
                            .collect(),
                    };

                    // The spilled-over bytes are no longer archived in this segment, subtract
                    // them from the archiving progress
                    let archived_bytes = last_archived_block.partial_archived().expect(
                        "Block continuation implies that there are some bytes archived \
                        already; qed",
                    );
                    last_archived_block.set_partial_archived(
                        archived_bytes
                            - u32::try_from(spill_over)
                                .expect("Block length is never bigger than u32; qed"),
                    );

                    // Push the continuation of the block into the buffer to be archived in the
                    // next segment
                    self.buffer.push_front(SegmentItem::BlockContinuation {
                        bytes: continuation_bytes,
                        object_mapping: continuation_object_mapping,
                    });

                    SegmentItem::BlockContinuation {
                        bytes,
                        object_mapping,
                    }
                }
                SegmentItem::ParentSegmentHeader(_) => {
                    unreachable!(
                        "SegmentItem::ParentSegmentHeader is always the first element in the \
                        buffer and fits into the segment; qed",
                    );
                }
            };

            // Push the shortened segment item back into the segment
            items.push(segment_item);
        } else {
            // No spill over means the last segment item fit fully into this segment, so the last
            // archived block is complete
            last_archived_block.set_complete();
        }

        self.last_archived_block = last_archived_block;

        Some(segment)
    }

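    /// Produce object mappings for the items currently buffered for the next segment, draining
    /// those mappings so they are not produced twice.
    ///
    /// Must only be called after all complete segments have been produced, when the buffer
    /// contains only items destined for the next segment.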
    fn produce_next_segment_mappings(&mut self) -> Vec<GlobalObject> {
        Self::produce_object_mappings(self.segment_index, self.buffer.iter_mut())
    }

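    /// Produce object mappings for `items` in `segment_index`, assuming their offsets are
    /// relative to the start of that segment. Mappings are drained from the items as they are
    /// turned into [`GlobalObject`]s.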
    fn produce_object_mappings<'a>(
        segment_index: SegmentIndex,
        items: impl Iterator<Item = &'a mut SegmentItem>,
    ) -> Vec<GlobalObject> {
        let source_piece_indexes = &segment_index.segment_piece_indexes_source_first()
            [..RecordedHistorySegment::NUM_RAW_RECORDS];

        let mut corrected_object_mapping = Vec::new();
        // Start with the encoded size of an empty segment, which accounts for the `Segment` enum
        // variant byte
        let mut base_offset_in_segment = Segment::default().encoded_size();
        for segment_item in items {
            match segment_item {
                SegmentItem::Padding => {
                    unreachable!(
                        "Segment during archiving never contains SegmentItem::Padding; qed"
                    );
                }
                SegmentItem::Block {
                    bytes,
                    object_mapping,
                }
                | SegmentItem::BlockStart {
                    bytes,
                    object_mapping,
                }
                | SegmentItem::BlockContinuation {
                    bytes,
                    object_mapping,
                } => {
                    for block_object in object_mapping.objects_mut().drain(..) {
                        let offset_in_segment = base_offset_in_segment
                            // `SegmentItem` enum variant byte
                            + 1
                            // Compact encoding of the length of the bytes
                            + Compact::compact_len(&(bytes.len() as u32))
                            + block_object.offset as usize;
                        let raw_piece_offset = (offset_in_segment % RawRecord::SIZE)
                            .try_into()
                            .expect("Offset within piece should always fit in 32-bit integer; qed");
                        corrected_object_mapping.push(GlobalObject {
                            hash: block_object.hash,
                            piece_index: source_piece_indexes[offset_in_segment / RawRecord::SIZE],
                            offset: raw_piece_offset,
                        });
                    }
                }
                SegmentItem::ParentSegmentHeader(_) => {
                    // Segment headers have no objects
                }
            }

            base_offset_in_segment += segment_item.encoded_size();
        }

        corrected_object_mapping
    }

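    /// Take a segment and turn it into an archived segment: erasure-code its records into pieces,
    /// compute record commitments and the segment commitment with witnesses, and produce the
    /// corresponding segment header.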
    fn produce_archived_segment(&mut self, segment: Segment) -> NewArchivedSegment {
        let mut pieces = {
            // Serialize the segment into raw bytes that will form the source record shards
            let mut raw_record_shards = Vec::<u8>::with_capacity(RecordedHistorySegment::SIZE);
            segment.encode_to(&mut raw_record_shards);
            // The segment may be slightly smaller than the recorded history segment size, pad it
            // with zeroes
            raw_record_shards.resize(raw_record_shards.capacity(), 0);

            // Segment is no longer necessary, this allows the backing memory to be reclaimed
            drop(segment);

            let mut pieces = ArchivedHistorySegment::default();

            // Scratch buffer for the scalars of a single chunk offset across all source records
            let mut tmp_source_shards_scalars =
                Vec::<Scalar>::with_capacity(RecordedHistorySegment::NUM_RAW_RECORDS);
            for record_offset in 0..RawRecord::NUM_CHUNKS {
                // Collect the chunk at `record_offset` from every source record
                raw_record_shards
                    .array_chunks::<{ RawRecord::SIZE }>()
                    .map(|record_bytes| {
                        record_bytes
                            .array_chunks::<{ ScalarBytes::SAFE_BYTES }>()
                            .nth(record_offset)
                            .expect("Statically known to exist in a record; qed")
                    })
                    .map(Scalar::from)
                    .collect_into(&mut tmp_source_shards_scalars);

                // Erasure-code the source chunks to produce the parity chunks
                let parity_shards = self
                    .erasure_coding
                    .extend(&tmp_source_shards_scalars)
                    .expect(
                        "Erasure coding instance is deliberately configured to support this \
                        input; qed",
                    );

                // Interleave source and parity chunks and write them into the corresponding
                // offset of every piece's record
                let interleaved_input_chunks = tmp_source_shards_scalars
                    .drain(..)
                    .zip(parity_shards)
                    .flat_map(|(a, b)| [a, b]);
                let output_chunks = pieces.iter_mut().map(|piece| {
                    piece
                        .record_mut()
                        .get_mut(record_offset)
                        .expect("Statically known to exist in a record; qed")
                });

                interleaved_input_chunks
                    .zip(output_chunks)
                    .for_each(|(input, output)| output.copy_from_slice(&input.to_bytes()));
            }

            pieces
        };

        let existing_commitments = self.incremental_record_commitments.len();
        {
            // Add commitments for the records that were not yet committed to incrementally
            #[cfg(not(feature = "parallel"))]
            let source_pieces = pieces.source();
            #[cfg(feature = "parallel")]
            let source_pieces = pieces.par_source();

            let iter = source_pieces.skip(existing_commitments).map(|piece| {
                let record_chunks = piece.record().iter().map(|scalar_bytes| {
                    Scalar::try_from(scalar_bytes).expect(
                        "Source pieces were just created and are guaranteed to contain \
                        valid scalars; qed",
                    )
                });

                let number_of_chunks = record_chunks.len();
                let mut scalars = Vec::with_capacity(number_of_chunks.next_power_of_two());

                record_chunks.collect_into(&mut scalars);

                // The number of scalars for KZG must be a power of two
                scalars.resize(scalars.capacity(), Scalar::default());

                let polynomial = self
                    .kzg
                    .poly(&scalars)
                    .expect("KZG instance must be configured to support this many scalars; qed");
                self.kzg
                    .commit(&polynomial)
                    .expect("KZG instance must be configured to support this many scalars; qed")
            });

            #[cfg(not(feature = "parallel"))]
            iter.collect_into(&mut *self.incremental_record_commitments);
            #[cfg(feature = "parallel")]
            self.incremental_record_commitments.par_extend(iter);
        }
        // Erasure-code record commitments so that parity records are covered as well
        let record_commitments = self
            .erasure_coding
            .extend_commitments(&self.incremental_record_commitments)
            .expect(
                "Erasure coding instance is deliberately configured to support this input; qed",
            );
        self.incremental_record_commitments.clear();

        // Build a polynomial over hashes of the record commitments and commit to it, producing
        // the segment commitment
        let polynomial = self
            .kzg
            .poly(
                &record_commitments
                    .iter()
                    .map(|commitment| {
                        Scalar::try_from(blake3_254_hash_to_scalar(&commitment.to_bytes()))
                            .expect("Created correctly by dedicated hash function; qed")
                    })
                    .collect::<Vec<_>>(),
            )
            .expect("Internally produced values must never fail; qed");

        let segment_commitment = SegmentCommitment::from(
            self.kzg
                .commit(&polynomial)
                .expect("Internally produced values must never fail; qed"),
        );

        // Write commitment and witness into every piece
        pieces
            .iter_mut()
            .zip(record_commitments)
            .enumerate()
            .for_each(|(position, (piece, commitment))| {
                let commitment_bytes = commitment.to_bytes();
                let (_record, commitment, witness) = piece.split_mut();
                commitment.copy_from_slice(&commitment_bytes);
                witness.copy_from_slice(
                    &self
                        .kzg
                        .create_witness(
                            &polynomial,
                            ArchivedHistorySegment::NUM_PIECES,
                            position as u32,
                        )
                        .expect("Position is statically known to be valid; qed")
                        .to_bytes(),
                );
            });

        // Now produce the segment header
        let segment_header = SegmentHeader::V0 {
            segment_index: self.segment_index,
            segment_commitment,
            prev_segment_header_hash: self.prev_segment_header_hash,
            last_archived_block: self.last_archived_block,
        };

        // Update state for the next segment
        self.segment_index += SegmentIndex::ONE;
        self.prev_segment_header_hash = segment_header.hash();

        // Add the segment header to the beginning of the buffer to be the first thing included
        // in the next segment
        self.buffer
            .push_front(SegmentItem::ParentSegmentHeader(segment_header));

        NewArchivedSegment {
            segment_header,
            pieces: pieces.to_shared(),
        }
    }
}