subspace_archiving/
reconstructor.rs1#[cfg(not(feature = "std"))]
2extern crate alloc;
3
4use crate::archiver::{Segment, SegmentItem};
5#[cfg(not(feature = "std"))]
6use alloc::string::String;
7#[cfg(not(feature = "std"))]
8use alloc::vec::Vec;
9use core::mem;
10use parity_scale_codec::Decode;
11use subspace_core_primitives::pieces::{Piece, RawRecord};
12use subspace_core_primitives::segments::{
13 ArchivedBlockProgress, ArchivedHistorySegment, LastArchivedBlock, RecordedHistorySegment,
14 SegmentHeader, SegmentIndex,
15};
16use subspace_core_primitives::BlockNumber;
17use subspace_erasure_coding::ErasureCoding;
18use subspace_kzg::Scalar;
19
20#[derive(Debug, Clone, PartialEq, thiserror::Error)]
22pub enum ReconstructorError {
23 #[error("Error during data shards reconstruction: {0}")]
25 DataShardsReconstruction(String),
26 #[error("Error during segment decoding: {0}")]
28 SegmentDecoding(parity_scale_codec::Error),
29 #[error(
31 "Incorrect segment order, expected index {expected_segment_index}, actual \
32 {actual_segment_index}"
33 )]
34 IncorrectSegmentOrder {
35 expected_segment_index: SegmentIndex,
36 actual_segment_index: SegmentIndex,
37 },
38}
39
40#[derive(Debug, Default, Clone, Eq, PartialEq)]
43pub struct ReconstructedContents {
44 pub segment_header: Option<SegmentHeader>,
46 pub blocks: Vec<(BlockNumber, Vec<u8>)>,
48}
49
50#[derive(Debug, Clone)]
52pub struct Reconstructor {
53 erasure_coding: ErasureCoding,
55 last_segment_index: Option<SegmentIndex>,
57 partial_block: Option<Vec<u8>>,
59}
60
61impl Reconstructor {
62 pub fn new(erasure_coding: ErasureCoding) -> Self {
64 Self {
65 erasure_coding,
66 last_segment_index: None,
67 partial_block: None,
68 }
69 }
70
71 pub fn reconstruct_segment(
77 &self,
78 segment_pieces: &[Option<Piece>],
79 ) -> Result<Segment, ReconstructorError> {
80 let mut segment_data = RecordedHistorySegment::new_boxed();
81
82 if !segment_pieces
83 .iter()
84 .step_by(2)
86 .zip(segment_data.iter_mut())
87 .all(|(maybe_piece, raw_record)| {
88 if let Some(piece) = maybe_piece {
89 piece
90 .record()
91 .to_raw_record_chunks()
92 .zip(raw_record.iter_mut())
93 .for_each(|(source, target)| {
94 target.copy_from_slice(source);
95 });
96 true
97 } else {
98 false
99 }
100 })
101 {
102 let mut tmp_shards_scalars =
107 Vec::<Option<Scalar>>::with_capacity(ArchivedHistorySegment::NUM_PIECES);
108 for record_offset in 0..RawRecord::NUM_CHUNKS {
110 for maybe_piece in segment_pieces.iter() {
112 let maybe_scalar = maybe_piece
113 .as_ref()
114 .map(|piece| {
115 piece
116 .record()
117 .get(record_offset)
118 .expect("Statically guaranteed to exist in a piece; qed")
119 })
120 .map(Scalar::try_from)
121 .transpose()
122 .map_err(ReconstructorError::DataShardsReconstruction)?;
123
124 tmp_shards_scalars.push(maybe_scalar);
125 }
126
127 self.erasure_coding
128 .recover(&tmp_shards_scalars)
129 .map_err(ReconstructorError::DataShardsReconstruction)?
130 .into_iter()
131 .step_by(2)
133 .zip(segment_data.iter_mut().map(|raw_record| {
134 raw_record
135 .get_mut(record_offset)
136 .expect("Statically guaranteed to exist in a piece; qed")
137 }))
138 .for_each(|(source_scalar, segment_data)| {
139 segment_data.copy_from_slice(
140 &source_scalar
141 .try_to_safe_bytes()
142 .expect("Source scalar has only safe bytes; qed"),
143 );
144 });
145
146 tmp_shards_scalars.clear();
147 }
148 }
149
150 let segment = Segment::decode(&mut AsRef::<[u8]>::as_ref(segment_data.as_ref()))
151 .map_err(ReconstructorError::SegmentDecoding)?;
152
153 Ok(segment)
154 }
155
156 pub fn add_segment(
164 &mut self,
165 segment_pieces: &[Option<Piece>],
166 ) -> Result<ReconstructedContents, ReconstructorError> {
167 let items = self.reconstruct_segment(segment_pieces)?.into_items();
168
169 let mut reconstructed_contents = ReconstructedContents::default();
170 let mut next_block_number = 0;
171 let mut partial_block = self.partial_block.take().unwrap_or_default();
172
173 for segment_item in items {
174 match segment_item {
175 SegmentItem::Padding => {
176 }
178 SegmentItem::Block { bytes, .. } => {
179 if !partial_block.is_empty() {
180 reconstructed_contents
181 .blocks
182 .push((next_block_number, mem::take(&mut partial_block)));
183
184 next_block_number += 1;
185 }
186
187 reconstructed_contents
188 .blocks
189 .push((next_block_number, bytes));
190
191 next_block_number += 1;
192 }
193 SegmentItem::BlockStart { bytes, .. } => {
194 if !partial_block.is_empty() {
195 reconstructed_contents
196 .blocks
197 .push((next_block_number, mem::take(&mut partial_block)));
198
199 next_block_number += 1;
200 }
201
202 partial_block = bytes;
203 }
204 SegmentItem::BlockContinuation { bytes, .. } => {
205 if partial_block.is_empty() {
206 continue;
209 }
210
211 partial_block.extend_from_slice(&bytes);
212 }
213 SegmentItem::ParentSegmentHeader(segment_header) => {
214 let segment_index = segment_header.segment_index();
215
216 if let Some(last_segment_index) = self.last_segment_index {
217 if last_segment_index != segment_index {
218 return Err(ReconstructorError::IncorrectSegmentOrder {
219 expected_segment_index: last_segment_index + SegmentIndex::ONE,
220 actual_segment_index: segment_index + SegmentIndex::ONE,
221 });
222 }
223 }
224
225 self.last_segment_index
226 .replace(segment_index + SegmentIndex::ONE);
227
228 let LastArchivedBlock {
229 number,
230 archived_progress,
231 } = segment_header.last_archived_block();
232
233 reconstructed_contents
234 .segment_header
235 .replace(segment_header);
236
237 match archived_progress {
238 ArchivedBlockProgress::Complete => {
239 reconstructed_contents
240 .blocks
241 .push((next_block_number, mem::take(&mut partial_block)));
242
243 next_block_number = number + 1;
244 }
245 ArchivedBlockProgress::Partial(_bytes) => {
246 next_block_number = number;
247
248 if partial_block.is_empty() {
249 next_block_number += 1;
251 }
252 }
253 }
254 }
255 }
256 }
257
258 if !partial_block.is_empty() {
259 self.partial_block.replace(partial_block);
260 }
261
262 if self.last_segment_index.is_none() {
263 self.last_segment_index.replace(SegmentIndex::ZERO);
264 }
265
266 Ok(reconstructed_contents)
267 }
268}