subspace_archiving/piece_reconstructor.rs

#[cfg(not(feature = "std"))]
extern crate alloc;

#[cfg(not(feature = "std"))]
use alloc::string::String;
#[cfg(not(feature = "std"))]
use alloc::vec::Vec;
#[cfg(feature = "parallel")]
use rayon::prelude::*;
use subspace_core_primitives::hashes::blake3_254_hash_to_scalar;
use subspace_core_primitives::pieces::{Piece, RawRecord};
use subspace_core_primitives::segments::ArchivedHistorySegment;
use subspace_erasure_coding::ErasureCoding;
use subspace_kzg::{Commitment, Kzg, Polynomial, Scalar};

/// Errors that can occur during reconstruction of pieces.
#[derive(Debug, Clone, PartialEq, thiserror::Error)]
pub enum ReconstructorError {
    /// Erasure coding failed to recover data shards
    #[error("Error during data shards reconstruction: {0}")]
    DataShardsReconstruction(String),

    /// Commitment of input piece is invalid
    #[error("Commitment of input piece is invalid.")]
    InvalidInputPieceCommitment,

    /// Incorrect piece position
    #[error("Incorrect piece position provided.")]
    IncorrectPiecePosition,
}

/// Reconstructs missing pieces of an archived history segment from enough of its pieces.
#[derive(Debug, Clone)]
pub struct PiecesReconstructor {
    /// Erasure coding data structure
    erasure_coding: ErasureCoding,
    /// KZG instance
    kzg: Kzg,
}

impl PiecesReconstructor {
    /// Creates a new reconstructor from the given KZG and erasure coding instances.
    pub fn new(kzg: Kzg, erasure_coding: ErasureCoding) -> Self {
        Self {
            erasure_coding,
            kzg,
        }
    }

    /// Reconstructs all records of a segment and computes the polynomial over record commitment
    /// hashes; witnesses are not filled in here, callers derive them from the returned polynomial.
    fn reconstruct_shards(
        &self,
        input_pieces: &[Option<Piece>],
    ) -> Result<(ArchivedHistorySegment, Polynomial), ReconstructorError> {
        let mut reconstructed_pieces = ArchivedHistorySegment::default();

        // Scratch buffer holding one scalar (or `None` for a missing piece) per shard, reused
        // for every chunk offset within a record.
        let mut tmp_shards_scalars =
            Vec::<Option<Scalar>>::with_capacity(ArchivedHistorySegment::NUM_PIECES);
        for record_offset in 0..RawRecord::NUM_CHUNKS {
            // Collect the chunk at `record_offset` from every piece that is present.
            for maybe_piece in input_pieces.iter() {
                let maybe_scalar = maybe_piece
                    .as_ref()
                    .map(|piece| {
                        piece
                            .record()
                            .get(record_offset)
                            .expect("Statically guaranteed to exist in a piece; qed")
                    })
                    .map(Scalar::try_from)
                    .transpose()
                    .map_err(ReconstructorError::DataShardsReconstruction)?;

                tmp_shards_scalars.push(maybe_scalar);
            }

            // Recover the missing scalars and write the complete row of chunks back into the
            // reconstructed pieces.
            self.erasure_coding
                .recover(&tmp_shards_scalars)
                .map_err(ReconstructorError::DataShardsReconstruction)?
                .into_iter()
                .zip(reconstructed_pieces.iter_mut().map(|piece| {
                    piece
                        .record_mut()
                        .get_mut(record_offset)
                        .expect("Statically guaranteed to exist in a piece; qed")
                }))
                .for_each(|(source_scalar, segment_data)| {
                    segment_data.copy_from_slice(&source_scalar.to_bytes());
                });

            tmp_shards_scalars.clear();
        }
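
        // To illustrate the loop above: with four shards where shards 1 and 3 are missing,
        // a single iteration at `record_offset = 0` turns
        //
        //   [Some(a0), None, Some(c0), None]
        //
        // into the recovered row `[a0, b0, c0, d0]`, and each scalar is written back into
        // chunk 0 of the corresponding reconstructed piece. Repeating this for every offset
        // reconstructs all records column by column.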

        // Source records live at even positions (source and parity pieces are interleaved in
        // a segment), so commitments are only computed for every second record.
        let source_record_commitments = {
            #[cfg(not(feature = "parallel"))]
            let iter = reconstructed_pieces.iter_mut().zip(input_pieces).step_by(2);
            #[cfg(feature = "parallel")]
            let iter = reconstructed_pieces
                .par_iter_mut()
                .zip_eq(input_pieces)
                .step_by(2);

            iter.map(|(piece, maybe_input_piece)| {
                if let Some(input_piece) = maybe_input_piece {
                    // The input piece already carries its commitment, reuse it instead of
                    // recomputing it.
                    Commitment::try_from_bytes(input_piece.commitment())
                        .map_err(|_error| ReconstructorError::InvalidInputPieceCommitment)
                } else {
                    // The record was reconstructed, so its commitment has to be computed from
                    // scratch.
                    let scalars = {
                        let mut scalars =
                            Vec::with_capacity(piece.record().len().next_power_of_two());

                        for record_chunk in piece.record().iter() {
                            scalars.push(
                                Scalar::try_from(record_chunk)
                                    .map_err(ReconstructorError::DataShardsReconstruction)?,
                            );
                        }

                        // The number of scalars must be a power of two for KZG.
                        scalars.resize(scalars.capacity(), Scalar::default());

                        scalars
                    };

                    let polynomial = self.kzg.poly(&scalars).expect(
                        "KZG instance must be configured to support this many scalars; qed",
                    );
                    let commitment = self.kzg.commit(&polynomial).expect(
                        "KZG instance must be configured to support this many scalars; qed",
                    );

                    Ok(commitment)
                }
            })
            .collect::<Result<Vec<_>, _>>()?
        };
        // Extend source record commitments with parity commitments to cover the full segment.
        let record_commitments = self
            .erasure_coding
            .extend_commitments(&source_record_commitments)
            .expect(
                "Erasure coding instance is deliberately configured to support this input; qed",
            );
        drop(source_record_commitments);

        // Write each commitment into its piece and hash it to a scalar for the segment-level
        // polynomial.
        let record_commitment_hashes = reconstructed_pieces
            .iter_mut()
            .zip(record_commitments)
            .map(|(reconstructed_piece, commitment)| {
                let commitment_bytes = commitment.to_bytes();
                reconstructed_piece
                    .commitment_mut()
                    .copy_from_slice(&commitment_bytes);
                Scalar::try_from(blake3_254_hash_to_scalar(&commitment_bytes))
                    .expect("Created correctly by dedicated hash function; qed")
            })
            .collect::<Vec<_>>();

        let polynomial = self
            .kzg
            .poly(&record_commitment_hashes)
            .expect("Internally produced values must never fail; qed");

        Ok((reconstructed_pieces, polynomial))
    }

    /// Returns all pieces of the segment with witnesses, reconstructing any that are missing.
    /// At least half of `segment_pieces` must be `Some` for erasure recovery to succeed.
    pub fn reconstruct_segment(
        &self,
        segment_pieces: &[Option<Piece>],
    ) -> Result<ArchivedHistorySegment, ReconstructorError> {
        let (mut pieces, polynomial) = self.reconstruct_shards(segment_pieces)?;

        #[cfg(not(feature = "parallel"))]
        let iter = pieces.iter_mut().enumerate();
        #[cfg(feature = "parallel")]
        let iter = pieces.par_iter_mut().enumerate();

        iter.for_each(|(position, piece)| {
            // Attach the KZG witness proving that this piece belongs to the segment.
            piece.witness_mut().copy_from_slice(
                &self
                    .kzg
                    .create_witness(
                        &polynomial,
                        ArchivedHistorySegment::NUM_PIECES,
                        position as u32,
                    )
                    .expect("Position is statically known to be valid; qed")
                    .to_bytes(),
            );
        });

        Ok(pieces.to_shared())
    }

    /// Returns the piece at `piece_position` in the segment, reconstructing it if necessary.
    /// At least half of `segment_pieces` must be `Some` for erasure recovery to succeed.
    pub fn reconstruct_piece(
        &self,
        segment_pieces: &[Option<Piece>],
        piece_position: usize,
    ) -> Result<Piece, ReconstructorError> {
        if piece_position >= ArchivedHistorySegment::NUM_PIECES {
            return Err(ReconstructorError::IncorrectPiecePosition);
        }

        let (reconstructed_records, polynomial) = self.reconstruct_shards(segment_pieces)?;

        let mut piece = Piece::from(&reconstructed_records[piece_position]);

        piece.witness_mut().copy_from_slice(
            &self
                .kzg
                .create_witness(
                    &polynomial,
                    ArchivedHistorySegment::NUM_PIECES,
                    piece_position as u32,
                )
                .expect("Position is verified to be valid above; qed")
                .to_bytes(),
        );

        Ok(piece.to_shared())
    }
}
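
// Usage sketch (illustrative, not part of the original module): recover a missing piece
// from half of a segment. It assumes `kzg` and `erasure_coding` were constructed elsewhere
// with parameters matching archiving, and that iterating the segment yields stored pieces
// that `Piece::from` accepts, as the indexing in `reconstruct_piece` above suggests.
// `example_recover_piece` is a hypothetical helper, not an API of this crate.
#[allow(dead_code)]
fn example_recover_piece(
    kzg: Kzg,
    erasure_coding: ErasureCoding,
    segment: &ArchivedHistorySegment,
) -> Result<Piece, ReconstructorError> {
    let reconstructor = PiecesReconstructor::new(kzg, erasure_coding);

    // Drop every odd-positioned piece; the remaining half is still enough for recovery.
    let segment_pieces = segment
        .iter()
        .enumerate()
        .map(|(position, piece)| (position % 2 == 0).then(|| Piece::from(piece)))
        .collect::<Vec<Option<Piece>>>();

    // Position 1 was dropped above, so it gets reconstructed, witness included.
    reconstructor.reconstruct_piece(&segment_pieces, 1)
}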