subspace_farmer_components/reading.rs

//! Reading utilities
//!
//! This module contains utilities for extracting data from plots/sectors created by functions in
//! the [`plotting`](crate::plotting) module earlier. This is a relatively expensive operation and
//! is only used for cold storage purposes or when there is a need to prove a solution to
//! consensus.

use crate::sector::{
    sector_record_chunks_size, RecordMetadata, SectorContentsMap, SectorContentsMapFromBytesError,
    SectorMetadataChecksummed,
};
use crate::{ReadAt, ReadAtAsync, ReadAtSync};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use parity_scale_codec::Decode;
use rayon::prelude::*;
use std::mem::ManuallyDrop;
use std::simd::Simd;
use std::str::FromStr;
use std::{fmt, io};
use subspace_core_primitives::hashes::blake3_hash;
use subspace_core_primitives::pieces::{Piece, PieceOffset, Record};
use subspace_core_primitives::sectors::{SBucket, SectorId};
use subspace_core_primitives::ScalarBytes;
use subspace_erasure_coding::ErasureCoding;
use subspace_kzg::Scalar;
use subspace_proof_of_space::{Table, TableGenerator};
use thiserror::Error;
use tracing::debug;

/// Errors that happen during reading
#[derive(Debug, Error)]
pub enum ReadingError {
    /// Failed to read chunk.
    ///
    /// This is an implementation bug, most likely due to a mismatch between the sector contents
    /// map and other farming parameters.
    #[error("Failed to read chunk at location {chunk_location}: {error}")]
    FailedToReadChunk {
        /// Chunk location
        chunk_location: u64,
        /// Low-level error
        error: io::Error,
    },
    /// Missing proof of space proof.
    ///
    /// This is either a hardware issue or, if it happens for everyone all the time, an
    /// implementation bug.
    #[error("Missing PoS proof for s-bucket {s_bucket}")]
    MissingPosProof {
        /// S-bucket
        s_bucket: SBucket,
    },
    /// Invalid chunk, possible disk corruption
    #[error(
        "Invalid chunk at location {chunk_location} s-bucket {s_bucket} encoded \
        {encoded_chunk_used}, possible disk corruption: {error}"
    )]
    InvalidChunk {
        /// S-bucket
        s_bucket: SBucket,
        /// Indicates whether chunk was encoded
        encoded_chunk_used: bool,
        /// Chunk location
        chunk_location: u64,
        /// Lower-level error
        error: String,
    },
    /// Failed to erasure-decode record
    #[error("Failed to erasure-decode record at offset {piece_offset}: {error}")]
    FailedToErasureDecodeRecord {
        /// Piece offset
        piece_offset: PieceOffset,
        /// Lower-level error
        error: String,
    },
    /// Wrong record size after decoding
    #[error("Wrong record size after decoding: expected {expected}, actual {actual}")]
    WrongRecordSizeAfterDecoding {
        /// Expected size in bytes
        expected: usize,
        /// Actual size in bytes
        actual: usize,
    },
    /// Failed to decode sector contents map
    #[error("Failed to decode sector contents map: {0}")]
    FailedToDecodeSectorContentsMap(#[from] SectorContentsMapFromBytesError),
    /// I/O error occurred
    #[error("Reading I/O error: {0}")]
    Io(#[from] io::Error),
    /// Checksum mismatch
    #[error("Checksum mismatch")]
    ChecksumMismatch,
}

impl ReadingError {
    /// Whether this error is fatal and renders the farm unusable
    pub fn is_fatal(&self) -> bool {
        match self {
            ReadingError::FailedToReadChunk { .. } => false,
            ReadingError::MissingPosProof { .. } => false,
            ReadingError::InvalidChunk { .. } => false,
            ReadingError::FailedToErasureDecodeRecord { .. } => false,
            ReadingError::WrongRecordSizeAfterDecoding { .. } => false,
            ReadingError::FailedToDecodeSectorContentsMap(_) => false,
            ReadingError::Io(_) => true,
            ReadingError::ChecksumMismatch => false,
        }
    }
}

/// Defines a mode of reading chunks in [`read_sector_record_chunks`].
///
/// Which option is faster depends on the disk used; there is no one-size-fits-all here,
/// unfortunately.
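///
/// # Example
///
/// A minimal sketch of round-tripping the mode through its `FromStr`/`Display` implementations
/// (assuming the public `subspace_farmer_components::reading` module path):
///
/// ```
/// // Path assumed from the crate/module layout; adjust if the re-export differs
/// use subspace_farmer_components::reading::ReadSectorRecordChunksMode;
///
/// let mode: ReadSectorRecordChunksMode = "WholeSector".parse().unwrap();
/// assert_eq!(mode.to_string(), "WholeSector");
/// ```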
#[derive(Debug, Copy, Clone)]
pub enum ReadSectorRecordChunksMode {
    /// Read individual chunks ([`ScalarBytes::FULL_BYTES`] in size) concurrently, which results in
    /// lower total data transfer, but requires the SSD to support high concurrency and low latency
    ConcurrentChunks,
    /// Read the whole sector at once and extract chunks from the in-memory buffer, which uses more
    /// memory, but only requires the disk's linear read speed to be decent
    WholeSector,
}

impl fmt::Display for ReadSectorRecordChunksMode {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::ConcurrentChunks => {
                write!(f, "ConcurrentChunks")
            }
            Self::WholeSector => {
                write!(f, "WholeSector")
            }
        }
    }
}

impl FromStr for ReadSectorRecordChunksMode {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "ConcurrentChunks" => Ok(Self::ConcurrentChunks),
            "WholeSector" => Ok(Self::WholeSector),
            s => Err(format!("Can't parse {s} as `ReadSectorRecordChunksMode`")),
        }
    }
}

/// Read sector record chunks, only plotted s-buckets are returned (in decoded form).
///
/// NOTE: This is an async function, but it also does CPU-intensive work internally; while that
/// work is not very long, make sure it is okay to do so in your context.
pub async fn read_sector_record_chunks<PosTable, S, A>(
    piece_offset: PieceOffset,
    pieces_in_sector: u16,
    s_bucket_offsets: &[u32; Record::NUM_S_BUCKETS],
    sector_contents_map: &SectorContentsMap,
    pos_table: &PosTable,
    sector: &ReadAt<S, A>,
    mode: ReadSectorRecordChunksMode,
) -> Result<Box<[Option<Scalar>; Record::NUM_S_BUCKETS]>, ReadingError>
where
    PosTable: Table,
    S: ReadAtSync,
    A: ReadAtAsync,
{
    let mut record_chunks = Box::<[Option<Scalar>; Record::NUM_S_BUCKETS]>::try_from(
        vec![None::<Scalar>; Record::NUM_S_BUCKETS].into_boxed_slice(),
    )
    .expect("Correct size; qed");

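    // For every s-bucket, look up whether this record has a chunk plotted there and, if so,
    // compute that chunk's location within the sector (in parallel); unplotted s-buckets yield
    // `None` and are skipped later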
    let read_chunks_inputs = record_chunks
        .par_iter_mut()
        .zip(sector_contents_map.par_iter_record_chunk_to_plot(piece_offset))
        .zip(
            (u16::from(SBucket::ZERO)..=u16::from(SBucket::MAX))
                .into_par_iter()
                .map(SBucket::from)
                .zip(s_bucket_offsets.par_iter()),
        )
        .map(
            |((maybe_record_chunk, maybe_chunk_details), (s_bucket, &s_bucket_offset))| {
                let (chunk_offset, encoded_chunk_used) = maybe_chunk_details?;

                let chunk_location = chunk_offset as u64 + u64::from(s_bucket_offset);

                Some((
                    maybe_record_chunk,
                    chunk_location,
                    encoded_chunk_used,
                    s_bucket,
                ))
            },
        )
        .collect::<Vec<_>>();

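    // Chunk data in the sector starts right after the encoded sector contents map, so every chunk
    // location below is offset by `sector_contents_map_size`. In `WholeSector` mode the whole
    // sector is read into memory up front instead of reading chunks individually.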
    let sector_contents_map_size = SectorContentsMap::encoded_size(pieces_in_sector) as u64;
    let sector_bytes = match mode {
        ReadSectorRecordChunksMode::ConcurrentChunks => None,
        ReadSectorRecordChunksMode::WholeSector => {
            Some(vec![0u8; crate::sector::sector_size(pieces_in_sector)])
        }
    };
    match sector {
        ReadAt::Sync(sector) => {
            let sector_bytes = {
                if let Some(mut sector_bytes) = sector_bytes {
                    sector.read_at(&mut sector_bytes, 0)?;
                    Some(sector_bytes)
                } else {
                    None
                }
            };
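            // Process all plotted chunks on the rayon thread pool: in `WholeSector` mode each
            // chunk is copied out of the pre-read buffer, otherwise it is read from disk
            // individually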
            read_chunks_inputs.into_par_iter().flatten().try_for_each(
                |(maybe_record_chunk, chunk_location, encoded_chunk_used, s_bucket)| {
                    let mut record_chunk = [0; ScalarBytes::FULL_BYTES];
                    if let Some(sector_bytes) = &sector_bytes {
                        record_chunk.copy_from_slice(
                            &sector_bytes[sector_contents_map_size as usize
                                + chunk_location as usize * ScalarBytes::FULL_BYTES..]
                                [..ScalarBytes::FULL_BYTES],
                        );
                    } else {
                        sector
                            .read_at(
                                &mut record_chunk,
                                sector_contents_map_size
                                    + chunk_location * ScalarBytes::FULL_BYTES as u64,
                            )
                            .map_err(|error| ReadingError::FailedToReadChunk {
                                chunk_location,
                                error,
                            })?;
                    }

                    // Decode chunk if necessary
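                    // During plotting this chunk was XOR-encoded with the hash of the PoS proof
                    // for its s-bucket, so XOR-ing with the same hash below restores the original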
                    if encoded_chunk_used {
                        let proof = pos_table
                            .find_proof(s_bucket.into())
                            .ok_or(ReadingError::MissingPosProof { s_bucket })?;

                        record_chunk =
                            Simd::to_array(Simd::from(record_chunk) ^ Simd::from(*proof.hash()));
                    }

                    maybe_record_chunk.replace(Scalar::try_from(record_chunk).map_err(
                        |error| ReadingError::InvalidChunk {
                            s_bucket,
                            encoded_chunk_used,
                            chunk_location,
                            error,
                        },
                    )?);

                    Ok::<_, ReadingError>(())
                },
            )?;
        }
        ReadAt::Async(sector) => {
            let sector_bytes = &{
                if let Some(sector_bytes) = sector_bytes {
                    Some(sector.read_at(sector_bytes, 0).await?)
                } else {
                    None
                }
            };
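            // Read and decode all chunks concurrently; the stream below keeps only errors, so the
            // first error (if any) aborts processing, while a fully successful run yields `None`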
            let processing_chunks = read_chunks_inputs
                .into_iter()
                .flatten()
                .map(
                    |(maybe_record_chunk, chunk_location, encoded_chunk_used, s_bucket)| async move {
                        let mut record_chunk = [0; ScalarBytes::FULL_BYTES];
                        if let Some(sector_bytes) = &sector_bytes {
                            record_chunk.copy_from_slice(
                                &sector_bytes[sector_contents_map_size as usize
                                    + chunk_location as usize * ScalarBytes::FULL_BYTES..]
                                    [..ScalarBytes::FULL_BYTES],
                            );
                        } else {
                            record_chunk.copy_from_slice(
                                &sector
                                    .read_at(
                                        vec![0; ScalarBytes::FULL_BYTES],
                                        sector_contents_map_size + chunk_location * ScalarBytes::FULL_BYTES as u64,
                                    )
                                    .await
                                    .map_err(|error| ReadingError::FailedToReadChunk {
                                        chunk_location,
                                        error,
                                    })?
                            );
                        }

                        // Decode chunk if necessary
                        if encoded_chunk_used {
                            let proof = pos_table.find_proof(s_bucket.into())
                                .ok_or(ReadingError::MissingPosProof { s_bucket })?;

                            record_chunk = Simd::to_array(
                                Simd::from(record_chunk) ^ Simd::from(*proof.hash()),
                            );
                        }

                        maybe_record_chunk.replace(Scalar::try_from(record_chunk).map_err(
                            |error| ReadingError::InvalidChunk {
                                s_bucket,
                                encoded_chunk_used,
                                chunk_location,
                                error,
                            },
                        )?);

                        Ok::<_, ReadingError>(())
                    },
                )
                .collect::<FuturesUnordered<_>>()
                .filter_map(|result| async move {
                    result.err()
                });

            std::pin::pin!(processing_chunks)
                .next()
                .await
                .map_or(Ok(()), Err)?;
        }
    }

    Ok(record_chunks)
}

/// Given sector record chunks, recover extended record chunks (both source and parity)
pub fn recover_extended_record_chunks(
    sector_record_chunks: &[Option<Scalar>; Record::NUM_S_BUCKETS],
    piece_offset: PieceOffset,
    erasure_coding: &ErasureCoding,
) -> Result<Box<[Scalar; Record::NUM_S_BUCKETS]>, ReadingError> {
    // Restore all record scalars (both source and parity)
    // TODO: Would be nice to recover directly into `Box<[Scalar; Record::NUM_S_BUCKETS]>`
    let record_chunks = erasure_coding
        .recover(sector_record_chunks)
        .map_err(|error| ReadingError::FailedToErasureDecodeRecord {
            piece_offset,
            error,
        })?;

    // Required for safety invariant below
    if record_chunks.len() != Record::NUM_S_BUCKETS {
        return Err(ReadingError::WrongRecordSizeAfterDecoding {
            expected: Record::NUM_S_BUCKETS,
            actual: record_chunks.len(),
        });
    }

    // The allocation of a vector can be larger than its contents, so we need to make sure the
    // allocation matches the contents exactly; collecting into a boxed slice also has a fast path
    // when the allocation already matches the contents
    let record_chunks = record_chunks.into_iter().collect::<Box<_>>();
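    // Re-interpret `Box<[Scalar]>` as `Box<[Scalar; Record::NUM_S_BUCKETS]>` without copying:
    // `ManuallyDrop` prevents the original box from freeing the allocation that `Box::from_raw`
    // takes ownership of below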
    let mut record_chunks = ManuallyDrop::new(record_chunks);
    // SAFETY: Original memory is not dropped, size of the data checked above
    let record_chunks = unsafe { Box::from_raw(record_chunks.as_mut_ptr() as *mut _) };

    Ok(record_chunks)
}

/// Given sector record chunks, recover source record chunks in the form of an iterator.
pub fn recover_source_record_chunks(
    sector_record_chunks: &[Option<Scalar>; Record::NUM_S_BUCKETS],
    piece_offset: PieceOffset,
    erasure_coding: &ErasureCoding,
) -> Result<impl ExactSizeIterator<Item = Scalar>, ReadingError> {
    // Restore source record scalars
    let record_chunks = erasure_coding
        .recover_source(sector_record_chunks)
        .map_err(|error| ReadingError::FailedToErasureDecodeRecord {
            piece_offset,
            error,
        })?;

    // Required for safety invariant below
    if record_chunks.len() != Record::NUM_CHUNKS {
        return Err(ReadingError::WrongRecordSizeAfterDecoding {
            expected: Record::NUM_CHUNKS,
            actual: record_chunks.len(),
        });
    }

    Ok(record_chunks)
}

/// Read metadata (commitment and witness) for record
pub(crate) async fn read_record_metadata<S, A>(
    piece_offset: PieceOffset,
    pieces_in_sector: u16,
    sector: &ReadAt<S, A>,
) -> Result<RecordMetadata, ReadingError>
where
    S: ReadAtSync,
    A: ReadAtAsync,
{
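    // Per-record metadata is stored after the sector contents map and all record chunks, with one
    // fixed-size entry per piece in the sector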
    let sector_metadata_start = SectorContentsMap::encoded_size(pieces_in_sector) as u64
        + sector_record_chunks_size(pieces_in_sector) as u64;
    // Move to the beginning of the commitment and witness we care about
    let record_metadata_offset =
        sector_metadata_start + RecordMetadata::encoded_size() as u64 * u64::from(piece_offset);

    let mut record_metadata_bytes = vec![0; RecordMetadata::encoded_size()];
    match sector {
        ReadAt::Sync(sector) => {
            sector.read_at(&mut record_metadata_bytes, record_metadata_offset)?;
        }
        ReadAt::Async(sector) => {
            record_metadata_bytes = sector
                .read_at(record_metadata_bytes, record_metadata_offset)
                .await?;
        }
    }
    let record_metadata = RecordMetadata::decode(&mut record_metadata_bytes.as_ref())
        .expect("Length is correct, contents don't have a specific structure to them; qed");

    Ok(record_metadata)
}

/// Read piece from sector.
///
/// NOTE: Even though this function is async, proof of space table generation is expensive and
/// should be done in a dedicated thread where blocking is allowed.
pub async fn read_piece<PosTable, S, A>(
    piece_offset: PieceOffset,
    sector_id: &SectorId,
    sector_metadata: &SectorMetadataChecksummed,
    sector: &ReadAt<S, A>,
    erasure_coding: &ErasureCoding,
    mode: ReadSectorRecordChunksMode,
    table_generator: &mut PosTable::Generator,
) -> Result<Piece, ReadingError>
where
    PosTable: Table,
    S: ReadAtSync,
    A: ReadAtAsync,
{
    let pieces_in_sector = sector_metadata.pieces_in_sector;

    let sector_contents_map = {
        let mut sector_contents_map_bytes =
            vec![0; SectorContentsMap::encoded_size(pieces_in_sector)];
        match sector {
            ReadAt::Sync(sector) => {
                sector.read_at(&mut sector_contents_map_bytes, 0)?;
            }
            ReadAt::Async(sector) => {
                sector_contents_map_bytes = sector.read_at(sector_contents_map_bytes, 0).await?;
            }
        }

        SectorContentsMap::from_bytes(&sector_contents_map_bytes, pieces_in_sector)?
    };

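    // The proof-of-space table is regenerated deterministically from the sector id and piece
    // offset, which is what allows encoded chunks to be decoded below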
    let sector_record_chunks = read_sector_record_chunks(
        piece_offset,
        pieces_in_sector,
        &sector_metadata.s_bucket_offsets(),
        &sector_contents_map,
        &table_generator.generate(&sector_id.derive_evaluation_seed(piece_offset)),
        sector,
        mode,
    )
    .await?;
    // Restore source record scalars
    let record_chunks =
        recover_source_record_chunks(&sector_record_chunks, piece_offset, erasure_coding)?;

    let record_metadata = read_record_metadata(piece_offset, pieces_in_sector, sector).await?;

    let mut piece = Piece::default();

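    // Write the recovered source record chunks into the piece's record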
    piece
        .record_mut()
        .iter_mut()
        .zip(record_chunks)
        .for_each(|(output, input)| {
            *output = input.to_bytes();
        });

    *piece.commitment_mut() = record_metadata.commitment;
    *piece.witness_mut() = record_metadata.witness;

    // Verify checksum
    let actual_checksum = blake3_hash(piece.as_ref());
    if actual_checksum != record_metadata.piece_checksum {
        debug!(
            ?sector_id,
            %piece_offset,
            actual_checksum = %hex::encode(actual_checksum),
            expected_checksum = %hex::encode(record_metadata.piece_checksum),
            "Hash doesn't match, plotted piece is corrupted"
        );

        return Err(ReadingError::ChecksumMismatch);
    }

    Ok(piece.to_shared())
}