subspace_archiving/
reconstructor.rs

1#[cfg(not(feature = "std"))]
2extern crate alloc;
3
4use crate::archiver::{Segment, SegmentItem};
5#[cfg(not(feature = "std"))]
6use alloc::string::String;
7#[cfg(not(feature = "std"))]
8use alloc::vec::Vec;
9use core::mem;
10use parity_scale_codec::Decode;
11use subspace_core_primitives::pieces::{Piece, RawRecord};
12use subspace_core_primitives::segments::{
13    ArchivedBlockProgress, ArchivedHistorySegment, LastArchivedBlock, RecordedHistorySegment,
14    SegmentHeader, SegmentIndex,
15};
16use subspace_core_primitives::BlockNumber;
17use subspace_erasure_coding::ErasureCoding;
18use subspace_kzg::Scalar;
19
20/// Reconstructor-related instantiation error
21#[derive(Debug, Clone, PartialEq, thiserror::Error)]
22pub enum ReconstructorError {
23    /// Error during data shards reconstruction
24    #[error("Error during data shards reconstruction: {0}")]
25    DataShardsReconstruction(String),
26    /// Segment size is not bigger than record size
27    #[error("Error during segment decoding: {0}")]
28    SegmentDecoding(parity_scale_codec::Error),
29    /// Incorrect segment order, each next segment must have monotonically increasing segment index
30    #[error(
31        "Incorrect segment order, expected index {expected_segment_index}, actual \
32        {actual_segment_index}"
33    )]
34    IncorrectSegmentOrder {
35        expected_segment_index: SegmentIndex,
36        actual_segment_index: SegmentIndex,
37    },
38}
39
40/// Data structure that contains information reconstructed from given segment (potentially using
41/// information from segments that were added previously)
42#[derive(Debug, Default, Clone, Eq, PartialEq)]
43pub struct ReconstructedContents {
44    /// Segment header stored in a segment
45    pub segment_header: Option<SegmentHeader>,
46    /// Reconstructed encoded blocks with their block numbers
47    pub blocks: Vec<(BlockNumber, Vec<u8>)>,
48}
49
50/// Reconstructor helps to retrieve blocks from archived pieces.
51#[derive(Debug, Clone)]
52pub struct Reconstructor {
53    /// Erasure coding data structure
54    erasure_coding: ErasureCoding,
55    /// Index of last segment added to reconstructor
56    last_segment_index: Option<SegmentIndex>,
57    /// Partially reconstructed block waiting for more data
58    partial_block: Option<Vec<u8>>,
59}
60
61impl Reconstructor {
62    /// Create a new instance
63    pub fn new(erasure_coding: ErasureCoding) -> Self {
64        Self {
65            erasure_coding,
66            last_segment_index: None,
67            partial_block: None,
68        }
69    }
70
71    /// Given a set of pieces of a segment of the archived history (any half of all pieces are
72    /// required to be present, the rest will be recovered automatically due to use of erasure
73    /// coding if needed), reconstructs and returns the segment itself.
74    ///
75    /// Does not modify the internal state of the reconstructor.
76    pub fn reconstruct_segment(
77        &self,
78        segment_pieces: &[Option<Piece>],
79    ) -> Result<Segment, ReconstructorError> {
80        let mut segment_data = RecordedHistorySegment::new_boxed();
81
82        if !segment_pieces
83            .iter()
84            // Take each source shards here
85            .step_by(2)
86            .zip(segment_data.iter_mut())
87            .all(|(maybe_piece, raw_record)| {
88                if let Some(piece) = maybe_piece {
89                    piece
90                        .record()
91                        .to_raw_record_chunks()
92                        .zip(raw_record.iter_mut())
93                        .for_each(|(source, target)| {
94                            target.copy_from_slice(source);
95                        });
96                    true
97                } else {
98                    false
99                }
100            })
101        {
102            // If not all data pieces are available, need to reconstruct data shards using erasure
103            // coding.
104
105            // Scratch buffer to avoid re-allocation
106            let mut tmp_shards_scalars =
107                Vec::<Option<Scalar>>::with_capacity(ArchivedHistorySegment::NUM_PIECES);
108            // Iterate over the chunks of `ScalarBytes::SAFE_BYTES` bytes of all records
109            for record_offset in 0..RawRecord::NUM_CHUNKS {
110                // Collect chunks of each record at the same offset
111                for maybe_piece in segment_pieces.iter() {
112                    let maybe_scalar = maybe_piece
113                        .as_ref()
114                        .map(|piece| {
115                            piece
116                                .record()
117                                .get(record_offset)
118                                .expect("Statically guaranteed to exist in a piece; qed")
119                        })
120                        .map(Scalar::try_from)
121                        .transpose()
122                        .map_err(ReconstructorError::DataShardsReconstruction)?;
123
124                    tmp_shards_scalars.push(maybe_scalar);
125                }
126
127                self.erasure_coding
128                    .recover(&tmp_shards_scalars)
129                    .map_err(ReconstructorError::DataShardsReconstruction)?
130                    .into_iter()
131                    // Take each source shards here
132                    .step_by(2)
133                    .zip(segment_data.iter_mut().map(|raw_record| {
134                        raw_record
135                            .get_mut(record_offset)
136                            .expect("Statically guaranteed to exist in a piece; qed")
137                    }))
138                    .for_each(|(source_scalar, segment_data)| {
139                        segment_data.copy_from_slice(
140                            &source_scalar
141                                .try_to_safe_bytes()
142                                .expect("Source scalar has only safe bytes; qed"),
143                        );
144                    });
145
146                tmp_shards_scalars.clear();
147            }
148        }
149
150        let segment = Segment::decode(&mut AsRef::<[u8]>::as_ref(segment_data.as_ref()))
151            .map_err(ReconstructorError::SegmentDecoding)?;
152
153        Ok(segment)
154    }
155
156    /// Given a set of pieces of a segment of the archived history (any half of all pieces are
157    /// required to be present, the rest will be recovered automatically due to use of erasure
158    /// coding if needed), reconstructs and returns segment header and a list of encoded blocks with
159    /// corresponding block numbers.
160    ///
161    /// It is possible to start with any segment, but when next segment is pushed, it needs to
162    /// follow the previous one or else error will be returned.
163    pub fn add_segment(
164        &mut self,
165        segment_pieces: &[Option<Piece>],
166    ) -> Result<ReconstructedContents, ReconstructorError> {
167        let items = self.reconstruct_segment(segment_pieces)?.into_items();
168
169        let mut reconstructed_contents = ReconstructedContents::default();
170        let mut next_block_number = 0;
171        let mut partial_block = self.partial_block.take().unwrap_or_default();
172
173        for segment_item in items {
174            match segment_item {
175                SegmentItem::Padding => {
176                    // Doesn't contain anything
177                }
178                SegmentItem::Block { bytes, .. } => {
179                    if !partial_block.is_empty() {
180                        reconstructed_contents
181                            .blocks
182                            .push((next_block_number, mem::take(&mut partial_block)));
183
184                        next_block_number += 1;
185                    }
186
187                    reconstructed_contents
188                        .blocks
189                        .push((next_block_number, bytes));
190
191                    next_block_number += 1;
192                }
193                SegmentItem::BlockStart { bytes, .. } => {
194                    if !partial_block.is_empty() {
195                        reconstructed_contents
196                            .blocks
197                            .push((next_block_number, mem::take(&mut partial_block)));
198
199                        next_block_number += 1;
200                    }
201
202                    partial_block = bytes;
203                }
204                SegmentItem::BlockContinuation { bytes, .. } => {
205                    if partial_block.is_empty() {
206                        // This is continuation from previous segment, we don't have the beginning
207                        // of the block to continue.
208                        continue;
209                    }
210
211                    partial_block.extend_from_slice(&bytes);
212                }
213                SegmentItem::ParentSegmentHeader(segment_header) => {
214                    let segment_index = segment_header.segment_index();
215
216                    if let Some(last_segment_index) = self.last_segment_index {
217                        if last_segment_index != segment_index {
218                            return Err(ReconstructorError::IncorrectSegmentOrder {
219                                expected_segment_index: last_segment_index + SegmentIndex::ONE,
220                                actual_segment_index: segment_index + SegmentIndex::ONE,
221                            });
222                        }
223                    }
224
225                    self.last_segment_index
226                        .replace(segment_index + SegmentIndex::ONE);
227
228                    let LastArchivedBlock {
229                        number,
230                        archived_progress,
231                    } = segment_header.last_archived_block();
232
233                    reconstructed_contents
234                        .segment_header
235                        .replace(segment_header);
236
237                    match archived_progress {
238                        ArchivedBlockProgress::Complete => {
239                            reconstructed_contents
240                                .blocks
241                                .push((next_block_number, mem::take(&mut partial_block)));
242
243                            next_block_number = number + 1;
244                        }
245                        ArchivedBlockProgress::Partial(_bytes) => {
246                            next_block_number = number;
247
248                            if partial_block.is_empty() {
249                                // Will not be able to recover full block, bump right away.
250                                next_block_number += 1;
251                            }
252                        }
253                    }
254                }
255            }
256        }
257
258        if !partial_block.is_empty() {
259            self.partial_block.replace(partial_block);
260        }
261
262        if self.last_segment_index.is_none() {
263            self.last_segment_index.replace(SegmentIndex::ZERO);
264        }
265
266        Ok(reconstructed_contents)
267    }
268}