subspace_archiving/
archiver.rs

mod incremental_record_commitments;

extern crate alloc;

use crate::archiver::incremental_record_commitments::{
    update_record_commitments, IncrementalRecordCommitmentsState,
};
use alloc::collections::VecDeque;
#[cfg(not(feature = "std"))]
use alloc::vec;
#[cfg(not(feature = "std"))]
use alloc::vec::Vec;
use core::cmp::Ordering;
use parity_scale_codec::{Compact, CompactLen, Decode, Encode, Input, Output};
#[cfg(feature = "parallel")]
use rayon::prelude::*;
use subspace_core_primitives::hashes::{blake3_254_hash_to_scalar, Blake3Hash};
use subspace_core_primitives::objects::{BlockObject, BlockObjectMapping, GlobalObject};
use subspace_core_primitives::pieces::RawRecord;
use subspace_core_primitives::segments::{
    ArchivedBlockProgress, ArchivedHistorySegment, LastArchivedBlock, RecordedHistorySegment,
    SegmentCommitment, SegmentHeader, SegmentIndex,
};
use subspace_core_primitives::{BlockNumber, ScalarBytes};
use subspace_erasure_coding::ErasureCoding;
use subspace_kzg::{Kzg, Scalar};

const INITIAL_LAST_ARCHIVED_BLOCK: LastArchivedBlock = LastArchivedBlock {
    number: 0,
    // Special case for the genesis block.
    //
    // When we start the archiving process with pre-genesis objects, we do not yet have any blocks
    // archived, but `LastArchivedBlock` is required for `SegmentHeader`s to be produced, so
    // `ArchivedBlockProgress::Partial(0)` indicates that we have archived 0 bytes of block `0` (see
    // the field above), meaning no actual blocks have been archived yet.
    archived_progress: ArchivedBlockProgress::Partial(0),
};

/// Segment represents a collection of items stored in archival history of the Subspace blockchain
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum Segment {
    /// V0 of the segment data structure
    V0 {
        /// Segment items
        items: Vec<SegmentItem>,
    },
}

impl Default for Segment {
    fn default() -> Self {
        Segment::V0 { items: Vec::new() }
    }
}

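// Note: `Segment` is encoded without the usual SCALE length prefix for `items`: the encoding is
// the variant byte followed by a plain concatenation of the encoded items. Decoding
// correspondingly consumes items until the input is exhausted instead of reading a length (see
// the `Decode` impl below).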
impl Encode for Segment {
    fn size_hint(&self) -> usize {
        RecordedHistorySegment::SIZE
    }

    fn encode_to<O: Output + ?Sized>(&self, dest: &mut O) {
        match self {
            Segment::V0 { items } => {
                dest.push_byte(0);
                for item in items {
                    item.encode_to(dest);
                }
            }
        }
    }
}

impl Decode for Segment {
    fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
        let variant = input
            .read_byte()
            .map_err(|e| e.chain("Could not decode `Segment`, failed to read variant byte"))?;
        match variant {
            0 => {
                let mut items = Vec::new();
                loop {
                    match input.remaining_len()? {
                        Some(0) => {
                            break;
                        }
                        Some(_) => {
                            // Processing continues below
                        }
                        None => {
                            return Err(
                                "Source doesn't report remaining length, decoding not possible"
                                    .into(),
                            );
                        }
                    }

                    match SegmentItem::decode(input) {
                        Ok(item) => {
                            items.push(item);
                        }
                        Err(error) => {
                            return Err(error.chain("Could not decode `Segment::V0::items`"));
                        }
                    }
                }

                Ok(Segment::V0 { items })
            }
            _ => Err("Could not decode `Segment`, variant doesn't exist".into()),
        }
    }
}

impl Segment {
    fn push_item(&mut self, segment_item: SegmentItem) {
        let Self::V0 { items } = self;
        items.push(segment_item);
    }

    /// Segment items
    pub fn items(&self) -> &[SegmentItem] {
        match self {
            Segment::V0 { items } => items,
        }
    }

    pub(crate) fn items_mut(&mut self) -> &mut Vec<SegmentItem> {
        match self {
            Segment::V0 { items } => items,
        }
    }

    /// Consume the segment and return its items
    pub fn into_items(self) -> Vec<SegmentItem> {
        match self {
            Segment::V0 { items } => items,
        }
    }
}

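// Note: the explicit `#[codec(index = ...)]` attributes below pin each variant's SCALE
// discriminant, so the wire format stays stable even if variants are reordered.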
/// Kinds of items that are contained within a segment
#[derive(Debug, Clone, Eq, PartialEq, Encode, Decode)]
pub enum SegmentItem {
    /// Special dummy enum variant only used as an implementation detail for padding purposes
    #[codec(index = 0)]
    Padding,
    /// Contains full block inside
    #[codec(index = 1)]
    Block {
        /// Block bytes
        bytes: Vec<u8>,
        /// This is a convenience implementation detail and will not be available on decoding
        #[doc(hidden)]
        #[codec(skip)]
        object_mapping: BlockObjectMapping,
    },
    /// Contains the beginning of the block inside, remainder will be found in subsequent segments
    #[codec(index = 2)]
    BlockStart {
        /// Block bytes
        bytes: Vec<u8>,
        /// This is a convenience implementation detail and will not be available on decoding
        #[doc(hidden)]
        #[codec(skip)]
        object_mapping: BlockObjectMapping,
    },
    /// Continuation of the partial block spilled over into the next segment
    #[codec(index = 3)]
    BlockContinuation {
        /// Block bytes
        bytes: Vec<u8>,
        /// This is a convenience implementation detail and will not be available on decoding
        #[doc(hidden)]
        #[codec(skip)]
        object_mapping: BlockObjectMapping,
    },
    /// Segment header of the parent
    #[codec(index = 4)]
    ParentSegmentHeader(SegmentHeader),
}

/// Newly archived segment as a combination of segment header and corresponding archived history
/// segment containing pieces
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct NewArchivedSegment {
    /// Segment header
    pub segment_header: SegmentHeader,
    /// Segment of archived history containing pieces
    pub pieces: ArchivedHistorySegment,
}

/// The outcome of adding a block to the archiver.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ArchiveBlockOutcome {
    /// The new segments archived after adding the block.
    /// There can be zero or more segments created after each block.
    pub archived_segments: Vec<NewArchivedSegment>,

    /// The new object mappings for those segments.
    /// There can be zero or more mappings created after each block.
    pub object_mapping: Vec<GlobalObject>,
}

/// Archiver instantiation error
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, thiserror::Error)]
pub enum ArchiverInstantiationError {
    /// Invalid last archived block, its size is the same as the encoded block
    /// (so it should have been completely archived, not partially archived)
    #[error("Invalid last archived block, its size {0} bytes is the same as the encoded block")]
    InvalidLastArchivedBlock(u32),
    /// Invalid block, its size is smaller than the already archived number of bytes
    #[error(
        "Invalid block, its size {block_bytes} bytes is smaller than the already archived block \
        {archived_block_bytes} bytes"
    )]
    InvalidBlockSmallSize {
        /// Full block size
        block_bytes: u32,
        /// Already archived portion of the block
        archived_block_bytes: u32,
    },
}

/// Block archiver for the Subspace blockchain.
///
/// It takes new confirmed (at `K` depth) blocks and concatenates them into a buffer; the buffer is
/// sliced into segments of [`RecordedHistorySegment::SIZE`] size, segments are sliced into source
/// records of [`RawRecord::SIZE`], records are erasure coded, committed to with [`Kzg`], then
/// commitments with witnesses are appended and records become pieces that are returned alongside
/// the corresponding segment header.
///
/// ## Panics
/// Panics when operating on blocks whose length doesn't fit into `u32` (should never be the case
/// in a blockchain context anyway).
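///
/// # Example
///
/// A minimal sketch of driving the archiver; `kzg`, `erasure_coding`, `block_bytes` and
/// `object_mapping` are assumed to be constructed by the caller:
///
/// ```ignore
/// let mut archiver = Archiver::new(kzg, erasure_coding);
/// let outcome = archiver.add_block(block_bytes, object_mapping, false);
/// for archived_segment in &outcome.archived_segments {
///     // Each archived segment pairs a segment header with the corresponding pieces
///     let _ = (&archived_segment.segment_header, &archived_segment.pieces);
/// }
/// ```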
#[derive(Debug, Clone)]
pub struct Archiver {
    /// Buffer containing blocks and other buffered items that are pending to be included into the
    /// next segment
    buffer: VecDeque<SegmentItem>,
    /// Intermediate record commitments that are built incrementally as the buffer above fills up.
    incremental_record_commitments: IncrementalRecordCommitmentsState,
    /// Erasure coding data structure
    erasure_coding: ErasureCoding,
    /// KZG instance
    kzg: Kzg,
    /// An index of the current segment
    segment_index: SegmentIndex,
    /// Hash of the segment header of the previous segment
    prev_segment_header_hash: Blake3Hash,
    /// Last archived block
    last_archived_block: LastArchivedBlock,
}

impl Archiver {
    /// Create a new instance
    pub fn new(kzg: Kzg, erasure_coding: ErasureCoding) -> Self {
        Self {
            buffer: VecDeque::default(),
            incremental_record_commitments: IncrementalRecordCommitmentsState::with_capacity(
                RecordedHistorySegment::NUM_RAW_RECORDS,
            ),
            erasure_coding,
            kzg,
            segment_index: SegmentIndex::ZERO,
            prev_segment_header_hash: Blake3Hash::default(),
            last_archived_block: INITIAL_LAST_ARCHIVED_BLOCK,
        }
    }

    /// Create a new instance of the archiver with initial state in case of restart.
    ///
    /// `encoded_block` corresponds to `last_archived_block` and will be processed according to its
    /// state.
    pub fn with_initial_state(
        kzg: Kzg,
        erasure_coding: ErasureCoding,
        segment_header: SegmentHeader,
        encoded_block: &[u8],
        mut object_mapping: BlockObjectMapping,
    ) -> Result<Self, ArchiverInstantiationError> {
        let mut archiver = Self::new(kzg, erasure_coding);

        archiver.segment_index = segment_header.segment_index() + SegmentIndex::ONE;
        archiver.prev_segment_header_hash = segment_header.hash();
        archiver.last_archived_block = segment_header.last_archived_block();

        // The first thing in the buffer should be the segment header
        archiver
            .buffer
            .push_back(SegmentItem::ParentSegmentHeader(segment_header));

        if let Some(archived_block_bytes) = archiver.last_archived_block.partial_archived() {
            let encoded_block_bytes = u32::try_from(encoded_block.len())
                .expect("Blocks length is never bigger than u32; qed");

            match encoded_block_bytes.cmp(&archived_block_bytes) {
                Ordering::Less => {
                    return Err(ArchiverInstantiationError::InvalidBlockSmallSize {
                        block_bytes: encoded_block_bytes,
                        archived_block_bytes,
                    });
                }
                Ordering::Equal => {
                    return Err(ArchiverInstantiationError::InvalidLastArchivedBlock(
                        encoded_block_bytes,
                    ));
                }
                Ordering::Greater => {
                    // Take the part of the encoded block that wasn't archived yet and push it to
                    // the buffer as a block continuation; keep only mappings that point into the
                    // unarchived part and rebase their offsets onto the continuation
                    object_mapping
                        .objects_mut()
                        .retain_mut(|block_object: &mut BlockObject| {
                            if block_object.offset >= archived_block_bytes {
                                block_object.offset -= archived_block_bytes;
                                true
                            } else {
                                false
                            }
                        });
                    archiver.buffer.push_back(SegmentItem::BlockContinuation {
                        bytes: encoded_block[(archived_block_bytes as usize)..].to_vec(),
                        object_mapping,
                    });
                }
            }
        }

        Ok(archiver)
    }

    /// Get the last archived block number, if there was any
    pub fn last_archived_block_number(&self) -> Option<BlockNumber> {
        if self.last_archived_block != INITIAL_LAST_ARCHIVED_BLOCK {
            Some(self.last_archived_block.number)
        } else {
            None
        }
    }

    /// Adds a new block to the internal buffer, potentially producing pieces, segment headers, and
    /// object mappings.
    ///
    /// Incremental archiving can be enabled if amortized block addition cost is preferred over
    /// throughput.
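    ///
    /// A hedged usage sketch; `block_bytes` and `object_mapping` are assumed to come from the
    /// caller:
    ///
    /// ```ignore
    /// let outcome = archiver.add_block(block_bytes, object_mapping, false);
    /// // Zero or more segments and global object mappings can be produced per block
    /// let ArchiveBlockOutcome { archived_segments, object_mapping } = outcome;
    /// ```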
    pub fn add_block(
        &mut self,
        bytes: Vec<u8>,
        object_mapping: BlockObjectMapping,
        incremental: bool,
    ) -> ArchiveBlockOutcome {
        // Append new block to the buffer
        self.buffer.push_back(SegmentItem::Block {
            bytes,
            object_mapping,
        });

        let mut archived_segments = Vec::new();
        let mut object_mapping = Vec::new();

        // Add completed segments and their mappings for this block.
        while let Some(mut segment) = self.produce_segment(incremental) {
            // Produce any segment mappings that haven't already been produced.
            object_mapping.extend(Self::produce_object_mappings(
                self.segment_index,
                segment.items_mut().iter_mut(),
            ));
            archived_segments.push(self.produce_archived_segment(segment));
        }

        // Produce any next segment buffer mappings that haven't already been produced.
        object_mapping.extend(self.produce_next_segment_mappings());

        ArchiveBlockOutcome {
            archived_segments,
            object_mapping,
        }
    }

    /// Try to slice the buffer contents into segments if there is enough data, producing one
    /// segment at a time
    fn produce_segment(&mut self, incremental: bool) -> Option<Segment> {
        let mut segment = Segment::V0 {
            items: Vec::with_capacity(self.buffer.len()),
        };

        let mut last_archived_block = self.last_archived_block;

        let mut segment_size = segment.encoded_size();

        // `-2` because even the smallest segment item will take 2 bytes to encode, so it makes
        // sense to stop earlier here
        while segment_size < (RecordedHistorySegment::SIZE - 2) {
            let segment_item = match self.buffer.pop_front() {
                Some(segment_item) => segment_item,
                None => {
                    let existing_commitments = self.incremental_record_commitments.len();
                    let bytes_committed_to = existing_commitments * RawRecord::SIZE;
                    // Run the incremental archiver only when there are at least two records to
                    // archive, otherwise we're wasting CPU cycles encoding the segment over and
                    // over again
                    if incremental && segment_size - bytes_committed_to >= RawRecord::SIZE * 2 {
                        update_record_commitments(
                            &mut self.incremental_record_commitments,
                            &segment,
                            &self.kzg,
                        );
                    }

                    // Push all of the items back into the buffer; we don't have enough data yet
                    for segment_item in segment.into_items().into_iter().rev() {
                        self.buffer.push_front(segment_item);
                    }

                    return None;
                }
            };

            let segment_item_encoded_size = segment_item.encoded_size();
            segment_size += segment_item_encoded_size;

            // Check if there would be enough data collected with the above segment item inserted
            if segment_size >= RecordedHistorySegment::SIZE {
                // Check if there is an excess of data that should be spilled over into the next
                // segment
                let spill_over = segment_size - RecordedHistorySegment::SIZE;

                // Due to the compact vector length encoding in the SCALE codec, the spill over
                // might be the same as or even bigger than the inserted segment item's bytes, in
                // which case the last segment item insertion needs to be skipped to avoid an
                // out-of-range panic when trying to cut the segment item's internal bytes.
                let inner_bytes_size = match &segment_item {
                    SegmentItem::Padding => {
                        unreachable!("Buffer never contains SegmentItem::Padding; qed");
                    }
                    SegmentItem::Block { bytes, .. } => bytes.len(),
                    SegmentItem::BlockStart { .. } => {
                        unreachable!("Buffer never contains SegmentItem::BlockStart; qed");
                    }
                    SegmentItem::BlockContinuation { bytes, .. } => bytes.len(),
                    SegmentItem::ParentSegmentHeader(_) => {
                        unreachable!(
                            "SegmentItem::ParentSegmentHeader is always the first element in the \
                            buffer and fits into the segment; qed",
                        );
                    }
                };

                if spill_over > inner_bytes_size {
                    self.buffer.push_front(segment_item);
                    segment_size -= segment_item_encoded_size;
                    break;
                }
            }

            match &segment_item {
                SegmentItem::Padding => {
                    unreachable!("Buffer never contains SegmentItem::Padding; qed");
                }
                SegmentItem::Block { .. } => {
                    // Skip the block number increase in case of the very first block
                    if last_archived_block != INITIAL_LAST_ARCHIVED_BLOCK {
                        // Increase the archived block number and assume the whole block was
                        // archived
                        last_archived_block.number += 1;
                    }
                    last_archived_block.set_complete();
                }
                SegmentItem::BlockStart { .. } => {
                    unreachable!("Buffer never contains SegmentItem::BlockStart; qed");
                }
                SegmentItem::BlockContinuation { bytes, .. } => {
                    // Same block, but assume for now that the whole block was archived; also
                    // store the number of bytes as opposed to `None`, we'll transform it into
                    // `None` if needed later
                    let archived_bytes = last_archived_block.partial_archived().expect(
                        "Block continuation implies that there are some bytes \
                            archived already; qed",
                    );
                    last_archived_block.set_partial_archived(
                        archived_bytes
                            + u32::try_from(bytes.len())
                                .expect("Blocks length is never bigger than u32; qed"),
                    );
                }
                SegmentItem::ParentSegmentHeader(_) => {
                    // We are not interested in the segment header here
                }
            }

            segment.push_item(segment_item);
        }

        // Check if there is an excess of data that should be spilled over into the next segment
        let spill_over = segment_size
            .checked_sub(RecordedHistorySegment::SIZE)
            .unwrap_or_default();

        if spill_over > 0 {
            let items = segment.items_mut();
            let segment_item = items
                .pop()
                .expect("Segment over segment size always has at least one item; qed");

            let segment_item = match segment_item {
                SegmentItem::Padding => {
                    unreachable!("Buffer never contains SegmentItem::Padding; qed");
                }
                SegmentItem::Block {
                    mut bytes,
                    mut object_mapping,
                } => {
                    let split_point = bytes.len() - spill_over;
                    let continuation_bytes = bytes[split_point..].to_vec();

                    bytes.truncate(split_point);

                    let continuation_object_mapping = BlockObjectMapping::V0 {
                        objects: object_mapping
                            .objects_mut()
                            .extract_if(.., |block_object: &mut BlockObject| {
                                if block_object.offset >= split_point as u32 {
                                    block_object.offset -= split_point as u32;
                                    true
                                } else {
                                    false
                                }
                            })
                            .collect(),
                    };

                    // Update the last archived block to include partial archiving info
                    last_archived_block.set_partial_archived(
                        u32::try_from(bytes.len())
                            .expect("Blocks length is never bigger than u32; qed"),
                    );

                    // Push the continuation element back into the buffer where the removed
                    // segment item was
                    self.buffer.push_front(SegmentItem::BlockContinuation {
                        bytes: continuation_bytes,
                        object_mapping: continuation_object_mapping,
                    });

                    SegmentItem::BlockStart {
                        bytes,
                        object_mapping,
                    }
                }
                SegmentItem::BlockStart { .. } => {
                    unreachable!("Buffer never contains SegmentItem::BlockStart; qed");
                }
                SegmentItem::BlockContinuation {
                    mut bytes,
                    mut object_mapping,
                } => {
                    let split_point = bytes.len() - spill_over;
                    let continuation_bytes = bytes[split_point..].to_vec();

                    bytes.truncate(split_point);

                    let continuation_object_mapping = BlockObjectMapping::V0 {
                        objects: object_mapping
                            .objects_mut()
                            .extract_if(.., |block_object: &mut BlockObject| {
                                if block_object.offset >= split_point as u32 {
                                    block_object.offset -= split_point as u32;
                                    true
                                } else {
                                    false
                                }
                            })
                            .collect(),
                    };

                    // The code above assumed that the block was archived fully; now remove the
                    // spilled-over bytes from the size
                    let archived_bytes = last_archived_block.partial_archived().expect(
                        "Block continuation implies that there are some bytes archived \
                        already; qed",
                    );
                    last_archived_block.set_partial_archived(
                        archived_bytes
                            - u32::try_from(spill_over)
                                .expect("Blocks length is never bigger than u32; qed"),
                    );

                    // Push the continuation element back into the buffer where the removed
                    // segment item was
                    self.buffer.push_front(SegmentItem::BlockContinuation {
                        bytes: continuation_bytes,
                        object_mapping: continuation_object_mapping,
                    });

                    SegmentItem::BlockContinuation {
                        bytes,
                        object_mapping,
                    }
                }
                SegmentItem::ParentSegmentHeader(_) => {
                    unreachable!(
                        "SegmentItem::ParentSegmentHeader is always the first element in the \
                        buffer and fits into the segment; qed",
                    );
                }
            };

            // Push the shortened segment item back
            items.push(segment_item);
        } else {
            // The loop above recorded the continuation bytes as partially archived even though
            // all of them fit into this segment; since there is no spill over, mark the last
            // archived block as complete
            last_archived_block.set_complete();
        }

        self.last_archived_block = last_archived_block;

        Some(segment)
    }

    /// Produce object mappings for the buffered items for the next segment, then remove those
    /// mappings from the buffered items.
    ///
    /// Must only be called after all complete segments for a block have been produced. Before
    /// that, the buffer can contain a `BlockContinuation` which spans multiple segments.
    fn produce_next_segment_mappings(&mut self) -> Vec<GlobalObject> {
        Self::produce_object_mappings(self.segment_index, self.buffer.iter_mut())
    }


    /// Produce object mappings for `items` in `segment_index`, then remove those mappings from
    /// the items.
    ///
    /// This method can be called on a `Segment`'s items, or on the `Archiver`'s internal buffer.
    fn produce_object_mappings<'a>(
        segment_index: SegmentIndex,
        items: impl Iterator<Item = &'a mut SegmentItem>,
    ) -> Vec<GlobalObject> {
        let source_piece_indexes = &segment_index.segment_piece_indexes_source_first()
            [..RecordedHistorySegment::NUM_RAW_RECORDS];

        let mut corrected_object_mapping = Vec::new();
        let mut base_offset_in_segment = Segment::default().encoded_size();
        for segment_item in items {
            match segment_item {
                SegmentItem::Padding => {
                    unreachable!(
                        "Segment during archiving never contains SegmentItem::Padding; qed"
                    );
                }
                SegmentItem::Block {
                    bytes,
                    object_mapping,
                }
                | SegmentItem::BlockStart {
                    bytes,
                    object_mapping,
                }
                | SegmentItem::BlockContinuation {
                    bytes,
                    object_mapping,
                } => {
                    for block_object in object_mapping.objects_mut().drain(..) {
                        // `+1` corresponds to the `SegmentItem::X {}` enum variant encoding, and
                        // the compact length accounts for the SCALE length prefix that precedes
                        // the block bytes
                        let offset_in_segment = base_offset_in_segment
                            + 1
                            + Compact::compact_len(&(bytes.len() as u32))
                            + block_object.offset as usize;
                        // The segment is laid out as a concatenation of raw records, so the piece
                        // index and the offset within the piece follow from integer division
                        let raw_piece_offset = (offset_in_segment % RawRecord::SIZE)
                            .try_into()
                            .expect("Offset within piece should always fit in 32-bit integer; qed");
                        corrected_object_mapping.push(GlobalObject {
                            hash: block_object.hash,
                            piece_index: source_piece_indexes[offset_in_segment / RawRecord::SIZE],
                            offset: raw_piece_offset,
                        });
                    }
                }
                SegmentItem::ParentSegmentHeader(_) => {
                    // Ignore, no object mappings here
                }
            }

            base_offset_in_segment += segment_item.encoded_size();
        }

        corrected_object_mapping
    }

    /// Take a segment as input, apply the necessary transformations, and produce an archived
    /// segment
    fn produce_archived_segment(&mut self, segment: Segment) -> NewArchivedSegment {
        let mut pieces = {
            // Serialize the segment into a concatenation of raw records
            let mut raw_record_shards = Vec::<u8>::with_capacity(RecordedHistorySegment::SIZE);
            segment.encode_to(&mut raw_record_shards);
            // Segment might require some padding (see [`Self::produce_segment`] for details)
            raw_record_shards.resize(raw_record_shards.capacity(), 0);

            // Segment is quite big and no longer necessary
            drop(segment);

            let mut pieces = ArchivedHistorySegment::default();

            // Scratch buffer to avoid re-allocation
            let mut tmp_source_shards_scalars =
                Vec::<Scalar>::with_capacity(RecordedHistorySegment::NUM_RAW_RECORDS);
            // Iterate over the chunks of `ScalarBytes::SAFE_BYTES` bytes of all records
            for record_offset in 0..RawRecord::NUM_CHUNKS {
                // Collect chunks of each record at the same offset
                raw_record_shards
                    .array_chunks::<{ RawRecord::SIZE }>()
                    .map(|record_bytes| {
                        record_bytes
                            .array_chunks::<{ ScalarBytes::SAFE_BYTES }>()
                            .nth(record_offset)
                            .expect("Statically known to exist in a record; qed")
                    })
                    .map(Scalar::from)
                    .collect_into(&mut tmp_source_shards_scalars);

                // Extend to obtain the corresponding parity shards
                let parity_shards = self
                    .erasure_coding
                    .extend(&tmp_source_shards_scalars)
                    .expect(
                        "Erasure coding instance is deliberately configured to support this \
                        input; qed",
                    );
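
                // Interleave the shards: even-indexed pieces receive source chunks, odd-indexed
                // pieces receive the corresponding parity chunks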
                let interleaved_input_chunks = tmp_source_shards_scalars
                    .drain(..)
                    .zip(parity_shards)
                    .flat_map(|(a, b)| [a, b]);
                let output_chunks = pieces.iter_mut().map(|piece| {
                    piece
                        .record_mut()
                        .get_mut(record_offset)
                        .expect("Statically known to exist in a record; qed")
                });

                interleaved_input_chunks
                    .zip(output_chunks)
                    .for_each(|(input, output)| output.copy_from_slice(&input.to_bytes()));
            }

            pieces
        };

        let existing_commitments = self.incremental_record_commitments.len();
        // Add commitments for pieces that were not created incrementally
        {
            #[cfg(not(feature = "parallel"))]
            let source_pieces = pieces.source();
            #[cfg(feature = "parallel")]
            let source_pieces = pieces.par_source();

            let iter = source_pieces.skip(existing_commitments).map(|piece| {
                let record_chunks = piece.record().iter().map(|scalar_bytes| {
                    Scalar::try_from(scalar_bytes).expect(
                        "Source pieces were just created and are guaranteed to contain \
                            valid scalars; qed",
                    )
                });

                let number_of_chunks = record_chunks.len();
                let mut scalars = Vec::with_capacity(number_of_chunks.next_power_of_two());

                record_chunks.collect_into(&mut scalars);

                // The number of scalars for KZG must be a power of two
                scalars.resize(scalars.capacity(), Scalar::default());

                let polynomial = self
                    .kzg
                    .poly(&scalars)
                    .expect("KZG instance must be configured to support this many scalars; qed");
                self.kzg
                    .commit(&polynomial)
                    .expect("KZG instance must be configured to support this many scalars; qed")
            });

            #[cfg(not(feature = "parallel"))]
            iter.collect_into(&mut *self.incremental_record_commitments);
            // TODO: `collect_into_vec()`, unfortunately, truncates input, which is not what we want
            //  can be unified when https://github.com/rayon-rs/rayon/issues/1039 is resolved
            #[cfg(feature = "parallel")]
            self.incremental_record_commitments.par_extend(iter);
        }
        // Erasure code the source record commitments to obtain commitments for all records
        let record_commitments = self
            .erasure_coding
            .extend_commitments(&self.incremental_record_commitments)
            .expect(
                "Erasure coding instance is deliberately configured to support this input; qed",
            );
        self.incremental_record_commitments.clear();

        let polynomial = self
            .kzg
            .poly(
                &record_commitments
                    .iter()
                    .map(|commitment| {
                        Scalar::try_from(blake3_254_hash_to_scalar(&commitment.to_bytes()))
                            .expect("Created correctly by dedicated hash function; qed")
                    })
                    .collect::<Vec<_>>(),
            )
            .expect("Internally produced values must never fail; qed");

        let segment_commitment = SegmentCommitment::from(
            self.kzg
                .commit(&polynomial)
                .expect("Internally produced values must never fail; qed"),
        );

        // Create a witness for every record and write it to the corresponding piece.
        pieces
            .iter_mut()
            .zip(record_commitments)
            .enumerate()
            .for_each(|(position, (piece, commitment))| {
                let commitment_bytes = commitment.to_bytes();
                let (_record, commitment, witness) = piece.split_mut();
                commitment.copy_from_slice(&commitment_bytes);
                // TODO: Consider batch witness creation for improved performance
                witness.copy_from_slice(
                    &self
                        .kzg
                        .create_witness(
                            &polynomial,
                            ArchivedHistorySegment::NUM_PIECES,
                            position as u32,
                        )
                        .expect("Position is statically known to be valid; qed")
                        .to_bytes(),
                );
            });

        // Now produce the segment header
        let segment_header = SegmentHeader::V0 {
            segment_index: self.segment_index,
            segment_commitment,
            prev_segment_header_hash: self.prev_segment_header_hash,
            last_archived_block: self.last_archived_block,
        };

        // Update state
        self.segment_index += SegmentIndex::ONE;
        self.prev_segment_header_hash = segment_header.hash();

        // Add the segment header to the beginning of the buffer to be the first thing included in
        // the next segment
        self.buffer
            .push_front(SegmentItem::ParentSegmentHeader(segment_header));

        NewArchivedSegment {
            segment_header,
            pieces: pieces.to_shared(),
        }
    }
}