subspace_archiving/archiver/
incremental_record_commitments.rs

1#[cfg(not(feature = "std"))]
2extern crate alloc;
3
4use crate::archiver::Segment;
5#[cfg(not(feature = "std"))]
6use alloc::vec::Vec;
7use core::ops::{Deref, DerefMut};
8use parity_scale_codec::{Encode, Output};
9#[cfg(feature = "parallel")]
10use rayon::prelude::*;
11use subspace_core_primitives::pieces::RawRecord;
12use subspace_core_primitives::ScalarBytes;
13use subspace_kzg::{Commitment, Kzg, Scalar};
14
/// State of incremental record commitments, encapsulated to hide implementation details and
/// encapsulate tricky logic
///
/// Derefs to the inner `Vec<Commitment>` for read access; internals are only mutated by this
/// module.
#[derive(Debug, Default, Clone)]
pub(super) struct IncrementalRecordCommitmentsState {
    /// State contains record commitments.
    ///
    /// NOTE: Until full segment is processed, this will not contain commitment to the first record
    /// since it is not ready yet. This in turn means all commitments will be at `-1` offset.
    state: Vec<Commitment>,
}
25
26impl Deref for IncrementalRecordCommitmentsState {
27    type Target = Vec<Commitment>;
28
29    #[inline]
30    fn deref(&self) -> &Self::Target {
31        &self.state
32    }
33}
34
35impl DerefMut for IncrementalRecordCommitmentsState {
36    #[inline]
37    fn deref_mut(&mut self) -> &mut Self::Target {
38        &mut self.state
39    }
40}
41
42impl IncrementalRecordCommitmentsState {
43    /// Creates an empty state with space for at least capacity records.
44    pub(super) fn with_capacity(capacity: usize) -> Self {
45        Self {
46            state: Vec::with_capacity(capacity),
47        }
48    }
49
50    /// Clears internal state before start of the next segment
51    pub(super) fn clear(&mut self) {
52        self.state.clear();
53    }
54}
55
56/// Update internal record commitments state based on provided segment.
57pub(super) fn update_record_commitments(
58    incremental_record_commitments: &mut IncrementalRecordCommitmentsState,
59    segment: &Segment,
60    kzg: &Kzg,
61) {
62    segment.encode_to(&mut IncrementalRecordCommitmentsProcessor::new(
63        incremental_record_commitments,
64        kzg,
65    ));
66}
67
/// Processor is hidden to not expose unnecessary implementation details (like `Output` trait
/// implementation)
///
/// Bytes are accumulated via the `Output` impl; the actual commitments are created in the `Drop`
/// impl, once the whole segment has been streamed through.
struct IncrementalRecordCommitmentsProcessor<'a> {
    /// Number of bytes of recorded history segment for which commitments were already created
    skip_bytes: usize,
    /// Buffer where new bytes for which commitments need to be created are pushed
    buffer: Vec<u8>,
    /// Record commitments already created
    incremental_record_commitments: &'a mut IncrementalRecordCommitmentsState,
    /// Kzg instance used for commitments creation
    kzg: &'a Kzg,
}
80
81impl Drop for IncrementalRecordCommitmentsProcessor<'_> {
82    fn drop(&mut self) {
83        #[cfg(not(feature = "parallel"))]
84        let raw_records_bytes = self.buffer.chunks_exact(RawRecord::SIZE);
85        #[cfg(feature = "parallel")]
86        let raw_records_bytes = self.buffer.par_chunks_exact(RawRecord::SIZE);
87
88        let iter = raw_records_bytes
89            .map(|raw_record_bytes| {
90                raw_record_bytes
91                    .array_chunks::<{ ScalarBytes::SAFE_BYTES }>()
92                    .map(Scalar::from)
93            })
94            .map(|record_chunks| {
95                let number_of_chunks = record_chunks.len();
96                let mut scalars = Vec::with_capacity(number_of_chunks.next_power_of_two());
97
98                record_chunks.collect_into(&mut scalars);
99
100                // Number of scalars for KZG must be a power of two elements
101                scalars.resize(scalars.capacity(), Scalar::default());
102
103                let polynomial = self
104                    .kzg
105                    .poly(&scalars)
106                    .expect("KZG instance must be configured to support this many scalars; qed");
107                self.kzg
108                    .commit(&polynomial)
109                    .expect("KZG instance must be configured to support this many scalars; qed")
110            });
111
112        #[cfg(not(feature = "parallel"))]
113        iter.collect_into(&mut self.incremental_record_commitments.state);
114        // TODO: `collect_into_vec()`, unfortunately, truncates input, which is not what we want
115        //  can be unified when https://github.com/rayon-rs/rayon/issues/1039 is resolved
116        #[cfg(feature = "parallel")]
117        self.incremental_record_commitments.par_extend(iter);
118    }
119}
120
121impl Output for IncrementalRecordCommitmentsProcessor<'_> {
122    fn write(&mut self, mut bytes: &[u8]) {
123        if self.skip_bytes >= bytes.len() {
124            self.skip_bytes -= bytes.len();
125        } else {
126            bytes = &bytes[self.skip_bytes..];
127            self.skip_bytes = 0;
128            self.buffer.extend_from_slice(bytes);
129        }
130    }
131}
132
133impl<'a> IncrementalRecordCommitmentsProcessor<'a> {
134    fn new(
135        incremental_record_commitments: &'a mut IncrementalRecordCommitmentsState,
136        kzg: &'a Kzg,
137    ) -> Self {
138        Self {
139            skip_bytes: incremental_record_commitments.len() * RawRecord::SIZE,
140            // Default to record size, may grow if necessary
141            buffer: Vec::with_capacity(RawRecord::SIZE),
142            incremental_record_commitments,
143            kzg,
144        }
145    }
146}