subspace_archiving/archiver/
incremental_record_commitments.rs

1#[cfg(not(feature = "std"))]
2extern crate alloc;
3
4use crate::archiver::Segment;
5#[cfg(not(feature = "std"))]
6use alloc::vec::Vec;
7use core::ops::{Deref, DerefMut};
8use parity_scale_codec::{Encode, Output};
9#[cfg(feature = "parallel")]
10use rayon::prelude::*;
11use subspace_core_primitives::ScalarBytes;
12use subspace_core_primitives::pieces::RawRecord;
13use subspace_kzg::{Commitment, Kzg, Scalar};
14
/// State of incremental record commitments, encapsulated to hide implementation details and
/// encapsulate tricky logic
#[derive(Debug, Default, Clone)]
pub(super) struct IncrementalRecordCommitmentsState {
    /// State contains record commitments.
    ///
    /// NOTE: Until full segment is processed, this will not contain commitment to the first record
    /// since it is not ready yet. This in turn means all commitments will be at `-1` offset.
    ///
    /// Kept private; read access goes through `Deref` and new commitments are appended by the
    /// processor in this module when it is dropped.
    state: Vec<Commitment>,
}
25
impl Deref for IncrementalRecordCommitmentsState {
    type Target = Vec<Commitment>;

    // Read-only access to the commitments collected so far; lets callers use `Vec` methods
    // (e.g. `len()`, iteration) directly on the state
    #[inline]
    fn deref(&self) -> &Self::Target {
        &self.state
    }
}
34
impl DerefMut for IncrementalRecordCommitmentsState {
    // Mutable access to the underlying commitments for module-internal updates
    #[inline]
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.state
    }
}
41
impl IncrementalRecordCommitmentsState {
    /// Creates an empty state with space for at least capacity records.
    ///
    /// `capacity` is a lower bound on the number of record commitments that can be stored
    /// without reallocation.
    pub(super) fn with_capacity(capacity: usize) -> Self {
        Self {
            state: Vec::with_capacity(capacity),
        }
    }

    /// Clears internal state before start of the next segment
    ///
    /// Only the contents are cleared; allocated capacity is retained for reuse by the next
    /// segment.
    pub(super) fn clear(&mut self) {
        self.state.clear();
    }
}
55
56/// Update internal record commitments state based on provided segment.
57pub(super) fn update_record_commitments(
58    incremental_record_commitments: &mut IncrementalRecordCommitmentsState,
59    segment: &Segment,
60    kzg: &Kzg,
61) {
62    segment.encode_to(&mut IncrementalRecordCommitmentsProcessor::new(
63        incremental_record_commitments,
64        kzg,
65    ));
66}
67
/// Processor is hidden to not expose unnecessary implementation details (like `Output` trait
/// implementation)
struct IncrementalRecordCommitmentsProcessor<'a> {
    /// Number of bytes of recorded history segment for which commitments were already created
    ///
    /// These bytes are skipped when the segment encoding is written into this processor, so
    /// only not-yet-committed records land in `buffer`.
    skip_bytes: usize,
    /// Buffer where new bytes for which commitments need to be created are pushed
    buffer: Vec<u8>,
    /// Record commitments already created
    incremental_record_commitments: &'a mut IncrementalRecordCommitmentsState,
    /// Kzg instance used for commitments creation
    kzg: &'a Kzg,
}
80
81impl Drop for IncrementalRecordCommitmentsProcessor<'_> {
82    fn drop(&mut self) {
83        #[cfg(not(feature = "parallel"))]
84        let raw_records_bytes = self.buffer.chunks_exact(RawRecord::SIZE);
85        #[cfg(feature = "parallel")]
86        let raw_records_bytes = self.buffer.par_chunks_exact(RawRecord::SIZE);
87
88        let iter = raw_records_bytes
89            .map(|raw_record_bytes| {
90                raw_record_bytes
91                    .as_chunks::<{ ScalarBytes::SAFE_BYTES }>()
92                    .0
93                    .iter()
94                    .map(Scalar::from)
95            })
96            .map(|record_chunks| {
97                let number_of_chunks = record_chunks.len();
98                let mut scalars = Vec::with_capacity(number_of_chunks.next_power_of_two());
99
100                record_chunks.collect_into(&mut scalars);
101
102                // Number of scalars for KZG must be a power of two elements
103                scalars.resize(scalars.capacity(), Scalar::default());
104
105                let polynomial = self
106                    .kzg
107                    .poly(&scalars)
108                    .expect("KZG instance must be configured to support this many scalars; qed");
109                self.kzg
110                    .commit(&polynomial)
111                    .expect("KZG instance must be configured to support this many scalars; qed")
112            });
113
114        #[cfg(not(feature = "parallel"))]
115        iter.collect_into(&mut self.incremental_record_commitments.state);
116        // TODO: `collect_into_vec()`, unfortunately, truncates input, which is not what we want
117        //  can be unified when https://github.com/rayon-rs/rayon/issues/1039 is resolved
118        #[cfg(feature = "parallel")]
119        self.incremental_record_commitments.par_extend(iter);
120    }
121}
122
123impl Output for IncrementalRecordCommitmentsProcessor<'_> {
124    fn write(&mut self, mut bytes: &[u8]) {
125        if self.skip_bytes >= bytes.len() {
126            self.skip_bytes -= bytes.len();
127        } else {
128            bytes = &bytes[self.skip_bytes..];
129            self.skip_bytes = 0;
130            self.buffer.extend_from_slice(bytes);
131        }
132    }
133}
134
135impl<'a> IncrementalRecordCommitmentsProcessor<'a> {
136    fn new(
137        incremental_record_commitments: &'a mut IncrementalRecordCommitmentsState,
138        kzg: &'a Kzg,
139    ) -> Self {
140        Self {
141            skip_bytes: incremental_record_commitments.len() * RawRecord::SIZE,
142            // Default to record size, may grow if necessary
143            buffer: Vec::with_capacity(RawRecord::SIZE),
144            incremental_record_commitments,
145            kzg,
146        }
147    }
148}