1use crate::sector::{
8 sector_record_chunks_size, RecordMetadata, SectorContentsMap, SectorContentsMapFromBytesError,
9 SectorMetadataChecksummed,
10};
11use crate::{ReadAt, ReadAtAsync, ReadAtSync};
12use futures::stream::FuturesUnordered;
13use futures::StreamExt;
14use parity_scale_codec::Decode;
15use rayon::prelude::*;
16use std::mem::ManuallyDrop;
17use std::simd::Simd;
18use std::str::FromStr;
19use std::{fmt, io};
20use subspace_core_primitives::hashes::blake3_hash;
21use subspace_core_primitives::pieces::{Piece, PieceOffset, Record};
22use subspace_core_primitives::sectors::{SBucket, SectorId};
23use subspace_core_primitives::ScalarBytes;
24use subspace_erasure_coding::ErasureCoding;
25use subspace_kzg::Scalar;
26use subspace_proof_of_space::{Table, TableGenerator};
27use thiserror::Error;
28use tracing::debug;
29
30#[derive(Debug, Error)]
32pub enum ReadingError {
33 #[error("Failed to read chunk at location {chunk_location}: {error}")]
38 FailedToReadChunk {
39 chunk_location: u64,
41 error: io::Error,
43 },
44 #[error("Missing PoS proof for s-bucket {s_bucket}")]
49 MissingPosProof {
50 s_bucket: SBucket,
52 },
53 #[error(
55 "Invalid chunk at location {chunk_location} s-bucket {s_bucket} encoded \
56 {encoded_chunk_used}, possible disk corruption: {error}"
57 )]
58 InvalidChunk {
59 s_bucket: SBucket,
61 encoded_chunk_used: bool,
63 chunk_location: u64,
65 error: String,
67 },
68 #[error("Failed to erasure-decode record at offset {piece_offset}: {error}")]
70 FailedToErasureDecodeRecord {
71 piece_offset: PieceOffset,
73 error: String,
75 },
76 #[error("Wrong record size after decoding: expected {expected}, actual {actual}")]
78 WrongRecordSizeAfterDecoding {
79 expected: usize,
81 actual: usize,
83 },
84 #[error("Failed to decode sector contents map: {0}")]
86 FailedToDecodeSectorContentsMap(#[from] SectorContentsMapFromBytesError),
87 #[error("Reading I/O error: {0}")]
89 Io(#[from] io::Error),
90 #[error("Checksum mismatch")]
92 ChecksumMismatch,
93}
94
95impl ReadingError {
96 pub fn is_fatal(&self) -> bool {
98 match self {
99 ReadingError::FailedToReadChunk { .. } => false,
100 ReadingError::MissingPosProof { .. } => false,
101 ReadingError::InvalidChunk { .. } => false,
102 ReadingError::FailedToErasureDecodeRecord { .. } => false,
103 ReadingError::WrongRecordSizeAfterDecoding { .. } => false,
104 ReadingError::FailedToDecodeSectorContentsMap(_) => false,
105 ReadingError::Io(_) => true,
106 ReadingError::ChecksumMismatch => false,
107 }
108 }
109}
110
111#[derive(Debug, Copy, Clone)]
116pub enum ReadSectorRecordChunksMode {
117 ConcurrentChunks,
120 WholeSector,
123}
124
125impl fmt::Display for ReadSectorRecordChunksMode {
126 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
127 match self {
128 Self::ConcurrentChunks => {
129 write!(f, "ConcurrentChunks")
130 }
131 Self::WholeSector => {
132 write!(f, "WholeSector")
133 }
134 }
135 }
136}
137
138impl FromStr for ReadSectorRecordChunksMode {
139 type Err = String;
140
141 fn from_str(s: &str) -> Result<Self, Self::Err> {
142 match s {
143 "ConcurrentChunks" => Ok(Self::ConcurrentChunks),
144 "WholeSector" => Ok(Self::WholeSector),
145 s => Err(format!("Can't parse {s} as `ReadSectorRecordChunksMode`")),
146 }
147 }
148}
149
150pub async fn read_sector_record_chunks<PosTable, S, A>(
155 piece_offset: PieceOffset,
156 pieces_in_sector: u16,
157 s_bucket_offsets: &[u32; Record::NUM_S_BUCKETS],
158 sector_contents_map: &SectorContentsMap,
159 pos_table: &PosTable,
160 sector: &ReadAt<S, A>,
161 mode: ReadSectorRecordChunksMode,
162) -> Result<Box<[Option<Scalar>; Record::NUM_S_BUCKETS]>, ReadingError>
163where
164 PosTable: Table,
165 S: ReadAtSync,
166 A: ReadAtAsync,
167{
168 let mut record_chunks = Box::<[Option<Scalar>; Record::NUM_S_BUCKETS]>::try_from(
169 vec![None::<Scalar>; Record::NUM_S_BUCKETS].into_boxed_slice(),
170 )
171 .expect("Correct size; qed");
172
173 let read_chunks_inputs = record_chunks
174 .par_iter_mut()
175 .zip(sector_contents_map.par_iter_record_chunk_to_plot(piece_offset))
176 .zip(
177 (u16::from(SBucket::ZERO)..=u16::from(SBucket::MAX))
178 .into_par_iter()
179 .map(SBucket::from)
180 .zip(s_bucket_offsets.par_iter()),
181 )
182 .map(
183 |((maybe_record_chunk, maybe_chunk_details), (s_bucket, &s_bucket_offset))| {
184 let (chunk_offset, encoded_chunk_used) = maybe_chunk_details?;
185
186 let chunk_location = chunk_offset as u64 + u64::from(s_bucket_offset);
187
188 Some((
189 maybe_record_chunk,
190 chunk_location,
191 encoded_chunk_used,
192 s_bucket,
193 ))
194 },
195 )
196 .collect::<Vec<_>>();
197
198 let sector_contents_map_size = SectorContentsMap::encoded_size(pieces_in_sector) as u64;
199 let sector_bytes = match mode {
200 ReadSectorRecordChunksMode::ConcurrentChunks => None,
201 ReadSectorRecordChunksMode::WholeSector => {
202 Some(vec![0u8; crate::sector::sector_size(pieces_in_sector)])
203 }
204 };
205 match sector {
206 ReadAt::Sync(sector) => {
207 let sector_bytes = {
208 if let Some(mut sector_bytes) = sector_bytes {
209 sector.read_at(&mut sector_bytes, 0)?;
210 Some(sector_bytes)
211 } else {
212 None
213 }
214 };
215 read_chunks_inputs.into_par_iter().flatten().try_for_each(
216 |(maybe_record_chunk, chunk_location, encoded_chunk_used, s_bucket)| {
217 let mut record_chunk = [0; ScalarBytes::FULL_BYTES];
218 if let Some(sector_bytes) = §or_bytes {
219 record_chunk.copy_from_slice(
220 §or_bytes[sector_contents_map_size as usize
221 + chunk_location as usize * ScalarBytes::FULL_BYTES..]
222 [..ScalarBytes::FULL_BYTES],
223 );
224 } else {
225 sector
226 .read_at(
227 &mut record_chunk,
228 sector_contents_map_size
229 + chunk_location * ScalarBytes::FULL_BYTES as u64,
230 )
231 .map_err(|error| ReadingError::FailedToReadChunk {
232 chunk_location,
233 error,
234 })?;
235 }
236
237 if encoded_chunk_used {
239 let proof = pos_table
240 .find_proof(s_bucket.into())
241 .ok_or(ReadingError::MissingPosProof { s_bucket })?;
242
243 record_chunk =
244 Simd::to_array(Simd::from(record_chunk) ^ Simd::from(*proof.hash()));
245 }
246
247 maybe_record_chunk.replace(Scalar::try_from(record_chunk).map_err(
248 |error| ReadingError::InvalidChunk {
249 s_bucket,
250 encoded_chunk_used,
251 chunk_location,
252 error,
253 },
254 )?);
255
256 Ok::<_, ReadingError>(())
257 },
258 )?;
259 }
260 ReadAt::Async(sector) => {
261 let sector_bytes = &{
262 if let Some(sector_bytes) = sector_bytes {
263 Some(sector.read_at(sector_bytes, 0).await?)
264 } else {
265 None
266 }
267 };
268 let processing_chunks = read_chunks_inputs
269 .into_iter()
270 .flatten()
271 .map(
272 |(maybe_record_chunk, chunk_location, encoded_chunk_used, s_bucket)| async move {
273 let mut record_chunk = [0; ScalarBytes::FULL_BYTES];
274 if let Some(sector_bytes) = §or_bytes {
275 record_chunk.copy_from_slice(
276 §or_bytes[sector_contents_map_size as usize
277 + chunk_location as usize * ScalarBytes::FULL_BYTES..]
278 [..ScalarBytes::FULL_BYTES],
279 );
280 } else {
281 record_chunk.copy_from_slice(
282 §or
283 .read_at(
284 vec![0; ScalarBytes::FULL_BYTES],
285 sector_contents_map_size + chunk_location * ScalarBytes::FULL_BYTES as u64,
286 )
287 .await
288 .map_err(|error| ReadingError::FailedToReadChunk {
289 chunk_location,
290 error,
291 })?
292 );
293 }
294
295
296 if encoded_chunk_used {
298 let proof = pos_table.find_proof(s_bucket.into())
299 .ok_or(ReadingError::MissingPosProof { s_bucket })?;
300
301 record_chunk = Simd::to_array(
302 Simd::from(record_chunk) ^ Simd::from(*proof.hash()),
303 );
304 }
305
306 maybe_record_chunk.replace(Scalar::try_from(record_chunk).map_err(
307 |error| ReadingError::InvalidChunk {
308 s_bucket,
309 encoded_chunk_used,
310 chunk_location,
311 error,
312 },
313 )?);
314
315 Ok::<_, ReadingError>(())
316 },
317 )
318 .collect::<FuturesUnordered<_>>()
319 .filter_map(|result| async move {
320 result.err()
321 });
322
323 std::pin::pin!(processing_chunks)
324 .next()
325 .await
326 .map_or(Ok(()), Err)?;
327 }
328 }
329
330 Ok(record_chunks)
331}
332
333pub fn recover_extended_record_chunks(
335 sector_record_chunks: &[Option<Scalar>; Record::NUM_S_BUCKETS],
336 piece_offset: PieceOffset,
337 erasure_coding: &ErasureCoding,
338) -> Result<Box<[Scalar; Record::NUM_S_BUCKETS]>, ReadingError> {
339 let record_chunks = erasure_coding
342 .recover(sector_record_chunks)
343 .map_err(|error| ReadingError::FailedToErasureDecodeRecord {
344 piece_offset,
345 error,
346 })?;
347
348 if record_chunks.len() != Record::NUM_S_BUCKETS {
350 return Err(ReadingError::WrongRecordSizeAfterDecoding {
351 expected: Record::NUM_S_BUCKETS,
352 actual: record_chunks.len(),
353 });
354 }
355
356 let record_chunks = record_chunks.into_iter().collect::<Box<_>>();
359 let mut record_chunks = ManuallyDrop::new(record_chunks);
360 let record_chunks = unsafe { Box::from_raw(record_chunks.as_mut_ptr() as *mut _) };
362
363 Ok(record_chunks)
364}
365
366pub fn recover_source_record_chunks(
368 sector_record_chunks: &[Option<Scalar>; Record::NUM_S_BUCKETS],
369 piece_offset: PieceOffset,
370 erasure_coding: &ErasureCoding,
371) -> Result<impl ExactSizeIterator<Item = Scalar>, ReadingError> {
372 let record_chunks = erasure_coding
374 .recover_source(sector_record_chunks)
375 .map_err(|error| ReadingError::FailedToErasureDecodeRecord {
376 piece_offset,
377 error,
378 })?;
379
380 if record_chunks.len() != Record::NUM_CHUNKS {
382 return Err(ReadingError::WrongRecordSizeAfterDecoding {
383 expected: Record::NUM_CHUNKS,
384 actual: record_chunks.len(),
385 });
386 }
387
388 Ok(record_chunks)
389}
390
391pub(crate) async fn read_record_metadata<S, A>(
393 piece_offset: PieceOffset,
394 pieces_in_sector: u16,
395 sector: &ReadAt<S, A>,
396) -> Result<RecordMetadata, ReadingError>
397where
398 S: ReadAtSync,
399 A: ReadAtAsync,
400{
401 let sector_metadata_start = SectorContentsMap::encoded_size(pieces_in_sector) as u64
402 + sector_record_chunks_size(pieces_in_sector) as u64;
403 let record_metadata_offset =
405 sector_metadata_start + RecordMetadata::encoded_size() as u64 * u64::from(piece_offset);
406
407 let mut record_metadata_bytes = vec![0; RecordMetadata::encoded_size()];
408 match sector {
409 ReadAt::Sync(sector) => {
410 sector.read_at(&mut record_metadata_bytes, record_metadata_offset)?;
411 }
412 ReadAt::Async(sector) => {
413 record_metadata_bytes = sector
414 .read_at(record_metadata_bytes, record_metadata_offset)
415 .await?;
416 }
417 }
418 let record_metadata = RecordMetadata::decode(&mut record_metadata_bytes.as_ref())
419 .expect("Length is correct, contents doesn't have specific structure to it; qed");
420
421 Ok(record_metadata)
422}
423
424pub async fn read_piece<PosTable, S, A>(
429 piece_offset: PieceOffset,
430 sector_id: &SectorId,
431 sector_metadata: &SectorMetadataChecksummed,
432 sector: &ReadAt<S, A>,
433 erasure_coding: &ErasureCoding,
434 mode: ReadSectorRecordChunksMode,
435 table_generator: &mut PosTable::Generator,
436) -> Result<Piece, ReadingError>
437where
438 PosTable: Table,
439 S: ReadAtSync,
440 A: ReadAtAsync,
441{
442 let pieces_in_sector = sector_metadata.pieces_in_sector;
443
444 let sector_contents_map = {
445 let mut sector_contents_map_bytes =
446 vec![0; SectorContentsMap::encoded_size(pieces_in_sector)];
447 match sector {
448 ReadAt::Sync(sector) => {
449 sector.read_at(&mut sector_contents_map_bytes, 0)?;
450 }
451 ReadAt::Async(sector) => {
452 sector_contents_map_bytes = sector.read_at(sector_contents_map_bytes, 0).await?;
453 }
454 }
455
456 SectorContentsMap::from_bytes(§or_contents_map_bytes, pieces_in_sector)?
457 };
458
459 let sector_record_chunks = read_sector_record_chunks(
460 piece_offset,
461 pieces_in_sector,
462 §or_metadata.s_bucket_offsets(),
463 §or_contents_map,
464 &table_generator.generate(§or_id.derive_evaluation_seed(piece_offset)),
465 sector,
466 mode,
467 )
468 .await?;
469 let record_chunks =
471 recover_source_record_chunks(§or_record_chunks, piece_offset, erasure_coding)?;
472
473 let record_metadata = read_record_metadata(piece_offset, pieces_in_sector, sector).await?;
474
475 let mut piece = Piece::default();
476
477 piece
478 .record_mut()
479 .iter_mut()
480 .zip(record_chunks)
481 .for_each(|(output, input)| {
482 *output = input.to_bytes();
483 });
484
485 *piece.commitment_mut() = record_metadata.commitment;
486 *piece.witness_mut() = record_metadata.witness;
487
488 let actual_checksum = blake3_hash(piece.as_ref());
490 if actual_checksum != record_metadata.piece_checksum {
491 debug!(
492 ?sector_id,
493 %piece_offset,
494 actual_checksum = %hex::encode(actual_checksum),
495 expected_checksum = %hex::encode(record_metadata.piece_checksum),
496 "Hash doesn't match, plotted piece is corrupted"
497 );
498
499 return Err(ReadingError::ChecksumMismatch);
500 }
501
502 Ok(piece.to_shared())
503}