subspace_archiving/archiver/incremental_record_commitments.rs

#[cfg(not(feature = "std"))]
extern crate alloc;

use crate::archiver::Segment;
#[cfg(not(feature = "std"))]
use alloc::vec::Vec;
use core::ops::{Deref, DerefMut};
use parity_scale_codec::{Encode, Output};
#[cfg(feature = "parallel")]
use rayon::prelude::*;
use subspace_core_primitives::pieces::RawRecord;
use subspace_core_primitives::ScalarBytes;
use subspace_kzg::{Commitment, Kzg, Scalar};

/// State of record commitments created incrementally while a segment is being built
#[derive(Debug, Default, Clone)]
pub(super) struct IncrementalRecordCommitmentsState {
    /// Commitments of the records processed so far
    state: Vec<Commitment>,
}

impl Deref for IncrementalRecordCommitmentsState {
    type Target = Vec<Commitment>;

    #[inline]
    fn deref(&self) -> &Self::Target {
        &self.state
    }
}

impl DerefMut for IncrementalRecordCommitmentsState {
    #[inline]
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.state
    }
}

impl IncrementalRecordCommitmentsState {
    /// Creates an empty state with space for at least `capacity` commitments
    pub(super) fn with_capacity(capacity: usize) -> Self {
        Self {
            state: Vec::with_capacity(capacity),
        }
    }

    /// Clears the internal state before the next segment is processed
    pub(super) fn clear(&mut self) {
        self.state.clear();
    }
}

/// Updates the record commitments state from a (potentially partial) segment, creating
/// commitments only for records that do not have one yet
pub(super) fn update_record_commitments(
    incremental_record_commitments: &mut IncrementalRecordCommitmentsState,
    segment: &Segment,
    kzg: &Kzg,
) {
    segment.encode_to(&mut IncrementalRecordCommitmentsProcessor::new(
        incremental_record_commitments,
        kzg,
    ));
}

/// Processor that receives SCALE-encoded segment bytes through the `Output` trait and turns
/// complete records into KZG commitments when dropped
struct IncrementalRecordCommitmentsProcessor<'a> {
    /// Number of bytes to skip, corresponding to records that already have commitments
    skip_bytes: usize,
    /// Buffer of bytes for which commitments were not created yet
    buffer: Vec<u8>,
    /// Commitments created so far
    incremental_record_commitments: &'a mut IncrementalRecordCommitmentsState,
    /// KZG instance used for commitment creation
    kzg: &'a Kzg,
}

impl Drop for IncrementalRecordCommitmentsProcessor<'_> {
    fn drop(&mut self) {
        // Only complete records are committed to; a trailing partial record stays uncommitted
        // and is processed again on the next update
        #[cfg(not(feature = "parallel"))]
        let raw_records_bytes = self.buffer.chunks_exact(RawRecord::SIZE);
        #[cfg(feature = "parallel")]
        let raw_records_bytes = self.buffer.par_chunks_exact(RawRecord::SIZE);

        let iter = raw_records_bytes
            .map(|raw_record_bytes| {
                raw_record_bytes
                    .array_chunks::<{ ScalarBytes::SAFE_BYTES }>()
                    .map(Scalar::from)
            })
            .map(|record_chunks| {
                let number_of_chunks = record_chunks.len();
                let mut scalars = Vec::with_capacity(number_of_chunks.next_power_of_two());

                record_chunks.collect_into(&mut scalars);

                // The number of scalars for KZG must be a power of two, so pad up to the
                // capacity requested above with zero scalars
                scalars.resize(scalars.capacity(), Scalar::default());

                let polynomial = self
                    .kzg
                    .poly(&scalars)
                    .expect("KZG instance must be configured to support this many scalars; qed");
                self.kzg
                    .commit(&polynomial)
                    .expect("KZG instance must be configured to support this many scalars; qed")
            });

        // Store the newly created commitments, sequentially or in parallel depending on the
        // `parallel` feature
        #[cfg(not(feature = "parallel"))]
        iter.collect_into(&mut self.incremental_record_commitments.state);
        #[cfg(feature = "parallel")]
        self.incremental_record_commitments.par_extend(iter);
    }
}

impl Output for IncrementalRecordCommitmentsProcessor<'_> {
    fn write(&mut self, mut bytes: &[u8]) {
        if self.skip_bytes >= bytes.len() {
            // These bytes belong to records that already have commitments, skip them entirely
            self.skip_bytes -= bytes.len();
        } else {
            // Skip the remaining already-committed prefix (if any) and buffer the rest
            bytes = &bytes[self.skip_bytes..];
            self.skip_bytes = 0;
            self.buffer.extend_from_slice(bytes);
        }
    }
}

impl<'a> IncrementalRecordCommitmentsProcessor<'a> {
    fn new(
        incremental_record_commitments: &'a mut IncrementalRecordCommitmentsState,
        kzg: &'a Kzg,
    ) -> Self {
        Self {
            // Bytes of records that already have commitments are skipped on write
            skip_bytes: incremental_record_commitments.len() * RawRecord::SIZE,
            // Default to one record size, the buffer will grow if necessary
            buffer: Vec::with_capacity(RawRecord::SIZE),
            incremental_record_commitments,
            kzg,
        }
    }
}
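
// The module below is not part of the original file: it is a minimal sketch, assuming it is
// compiled inside this crate, illustrating how the state wrapper behaves through its
// `Deref`/`DerefMut` implementations. It only exercises `with_capacity()` and `clear()`; the
// module and test names are hypothetical.
#[cfg(test)]
mod incremental_record_commitments_sketch {
    use super::*;

    #[test]
    fn state_starts_empty_and_clears() {
        let mut state = IncrementalRecordCommitmentsState::with_capacity(8);
        // `Deref` exposes the inner `Vec<Commitment>`, so `Vec` methods are available directly
        assert!(state.is_empty());
        assert!(state.capacity() >= 8);
        // `clear()` resets the state before the next segment is processed
        state.clear();
        assert!(state.is_empty());
    }
}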