subspace_archiving/piece_reconstructor.rs

1#[cfg(not(feature = "std"))]
2extern crate alloc;
3
4#[cfg(not(feature = "std"))]
5use alloc::string::String;
6#[cfg(not(feature = "std"))]
7use alloc::vec::Vec;
8#[cfg(feature = "parallel")]
9use rayon::prelude::*;
10use subspace_core_primitives::hashes::blake3_254_hash_to_scalar;
11use subspace_core_primitives::pieces::{Piece, RawRecord};
12use subspace_core_primitives::segments::ArchivedHistorySegment;
13use subspace_erasure_coding::ErasureCoding;
14use subspace_kzg::{Commitment, Kzg, Polynomial, Scalar};
15
16/// Reconstructor-related instantiation error
/// Reconstructor-related instantiation error
#[derive(Debug, Clone, PartialEq, thiserror::Error)]
pub enum ReconstructorError {
    /// Error during data shards reconstruction (carries the underlying erasure-coding /
    /// scalar-conversion error message).
    #[error("Error during data shards reconstruction: {0}")]
    DataShardsReconstruction(String),

    /// Commitment of input piece is invalid.
    #[error("Commitment of input piece is invalid.")]
    InvalidInputPieceCommitment,

    /// Incorrect piece position provided.
    #[error("Incorrect piece position provided.")]
    IncorrectPiecePosition,
}
31
32/// Reconstructor helps to retrieve blocks from archived pieces.
33#[derive(Debug, Clone)]
34pub struct PiecesReconstructor {
35    /// Erasure coding data structure
36    erasure_coding: ErasureCoding,
37    /// KZG instance
38    kzg: Kzg,
39}
40
41impl PiecesReconstructor {
42    /// Create a new instance
43    pub fn new(kzg: Kzg, erasure_coding: ErasureCoding) -> Self {
44        Self {
45            erasure_coding,
46            kzg,
47        }
48    }
49
50    /// Returns incomplete pieces (witness missing) and polynomial that can be used to generate
51    /// necessary witnesses later.
52    fn reconstruct_shards(
53        &self,
54        input_pieces: &[Option<Piece>],
55    ) -> Result<(ArchivedHistorySegment, Polynomial), ReconstructorError> {
56        let mut reconstructed_pieces = ArchivedHistorySegment::default();
57
58        // Scratch buffer to avoid re-allocation
59        let mut tmp_shards_scalars =
60            Vec::<Option<Scalar>>::with_capacity(ArchivedHistorySegment::NUM_PIECES);
61        // Iterate over the chunks of `ScalarBytes::SAFE_BYTES` bytes of all records
62        for record_offset in 0..RawRecord::NUM_CHUNKS {
63            // Collect chunks of each record at the same offset
64            for maybe_piece in input_pieces.iter() {
65                let maybe_scalar = maybe_piece
66                    .as_ref()
67                    .map(|piece| {
68                        piece
69                            .record()
70                            .get(record_offset)
71                            .expect("Statically guaranteed to exist in a piece; qed")
72                    })
73                    .map(Scalar::try_from)
74                    .transpose()
75                    .map_err(ReconstructorError::DataShardsReconstruction)?;
76
77                tmp_shards_scalars.push(maybe_scalar);
78            }
79
80            self.erasure_coding
81                .recover(&tmp_shards_scalars)
82                .map_err(ReconstructorError::DataShardsReconstruction)?
83                .into_iter()
84                .zip(reconstructed_pieces.iter_mut().map(|piece| {
85                    piece
86                        .record_mut()
87                        .get_mut(record_offset)
88                        .expect("Statically guaranteed to exist in a piece; qed")
89                }))
90                .for_each(|(source_scalar, segment_data)| {
91                    segment_data.copy_from_slice(&source_scalar.to_bytes());
92                });
93
94            tmp_shards_scalars.clear();
95        }
96
97        let source_record_commitments = {
98            #[cfg(not(feature = "parallel"))]
99            let iter = reconstructed_pieces.iter_mut().zip(input_pieces).step_by(2);
100            #[cfg(feature = "parallel")]
101            let iter = reconstructed_pieces
102                .par_iter_mut()
103                .zip_eq(input_pieces)
104                .step_by(2);
105
106            iter.map(|(piece, maybe_input_piece)| {
107                if let Some(input_piece) = maybe_input_piece {
108                    Commitment::try_from_bytes(input_piece.commitment())
109                        .map_err(|_error| ReconstructorError::InvalidInputPieceCommitment)
110                } else {
111                    let scalars = {
112                        let mut scalars =
113                            Vec::with_capacity(piece.record().len().next_power_of_two());
114
115                        for record_chunk in piece.record().iter() {
116                            scalars.push(
117                                Scalar::try_from(record_chunk)
118                                    .map_err(ReconstructorError::DataShardsReconstruction)?,
119                            );
120                        }
121
122                        // Number of scalars for KZG must be a power of two elements
123                        scalars.resize(scalars.capacity(), Scalar::default());
124
125                        scalars
126                    };
127
128                    let polynomial = self.kzg.poly(&scalars).expect(
129                        "KZG instance must be configured to support this many scalars; qed",
130                    );
131                    let commitment = self.kzg.commit(&polynomial).expect(
132                        "KZG instance must be configured to support this many scalars; qed",
133                    );
134
135                    Ok(commitment)
136                }
137            })
138            .collect::<Result<Vec<_>, _>>()?
139        };
140        let record_commitments = self
141            .erasure_coding
142            .extend_commitments(&source_record_commitments)
143            .expect(
144                "Erasure coding instance is deliberately configured to support this input; qed",
145            );
146        drop(source_record_commitments);
147
148        let record_commitment_hashes = reconstructed_pieces
149            .iter_mut()
150            .zip(record_commitments)
151            .map(|(reconstructed_piece, commitment)| {
152                let commitment_bytes = commitment.to_bytes();
153                reconstructed_piece
154                    .commitment_mut()
155                    .copy_from_slice(&commitment_bytes);
156                Scalar::try_from(blake3_254_hash_to_scalar(&commitment_bytes))
157                    .expect("Create correctly by dedicated hash function; qed")
158            })
159            .collect::<Vec<_>>();
160
161        let polynomial = self
162            .kzg
163            .poly(&record_commitment_hashes)
164            .expect("Internally produced values must never fail; qed");
165
166        Ok((reconstructed_pieces, polynomial))
167    }
168
169    /// Returns all the pieces for a segment using given set of pieces of a segment of the archived
170    /// history (any half of all pieces are required to be present, the rest will be recovered
171    /// automatically due to use of erasure coding if needed).
172    pub fn reconstruct_segment(
173        &self,
174        segment_pieces: &[Option<Piece>],
175    ) -> Result<ArchivedHistorySegment, ReconstructorError> {
176        let (mut pieces, polynomial) = self.reconstruct_shards(segment_pieces)?;
177
178        #[cfg(not(feature = "parallel"))]
179        let iter = pieces.iter_mut().enumerate();
180        #[cfg(feature = "parallel")]
181        let iter = pieces.par_iter_mut().enumerate();
182
183        iter.for_each(|(position, piece)| {
184            piece.witness_mut().copy_from_slice(
185                &self
186                    .kzg
187                    .create_witness(
188                        &polynomial,
189                        ArchivedHistorySegment::NUM_PIECES,
190                        position as u32,
191                    )
192                    .expect("Position is statically known to be valid; qed")
193                    .to_bytes(),
194            );
195        });
196
197        Ok(pieces.to_shared())
198    }
199
200    /// Returns the missing piece for a segment using given set of pieces of a segment of the archived
201    /// history (any half of all pieces are required to be present).
202    pub fn reconstruct_piece(
203        &self,
204        segment_pieces: &[Option<Piece>],
205        piece_position: usize,
206    ) -> Result<Piece, ReconstructorError> {
207        if piece_position >= ArchivedHistorySegment::NUM_PIECES {
208            return Err(ReconstructorError::IncorrectPiecePosition);
209        }
210
211        // TODO: Early exit if position already exists and doesn't need reconstruction
212        let (reconstructed_records, polynomial) = self.reconstruct_shards(segment_pieces)?;
213
214        let mut piece = Piece::from(&reconstructed_records[piece_position]);
215
216        piece.witness_mut().copy_from_slice(
217            &self
218                .kzg
219                .create_witness(
220                    &polynomial,
221                    ArchivedHistorySegment::NUM_PIECES,
222                    piece_position as u32,
223                )
224                .expect("Position is verified to be valid above; qed")
225                .to_bytes(),
226        );
227
228        Ok(piece.to_shared())
229    }
230}