subspace_data_retrieval/object_fetcher.rs

//! Fetching objects stored in the archived history of Subspace Network.

use crate::object_fetcher::partial_object::{PartialObject, RawPieceData};
use crate::object_fetcher::segment_header::{
    max_segment_header_encoded_size, min_segment_header_encoded_size, MAX_SEGMENT_PADDING,
};
use crate::piece_fetcher::download_pieces;
use crate::piece_getter::PieceGetter;
use parity_scale_codec::{Compact, CompactLen, Decode};
use std::sync::Arc;
use subspace_archiving::archiver::SegmentItem;
use subspace_core_primitives::hashes::Blake3Hash;
use subspace_core_primitives::objects::{GlobalObject, GlobalObjectMapping};
use subspace_core_primitives::pieces::{Piece, PieceIndex, RawRecord};
use subspace_core_primitives::segments::{RecordedHistorySegment, SegmentIndex};
use tracing::{debug, trace, warn};

mod partial_object;
mod segment_header;
#[cfg(test)]
mod tests;

pub use segment_header::MAX_BLOCK_LENGTH;

/// The maximum object length the implementation in this module can reliably handle.
///
/// Currently objects are limited by the largest block size in the consensus chain, which is 5 MB.
/// But this implementation can retrieve all objects smaller than a segment (up to 124 MB). Some
/// objects between 124 MB and 248 MB are supported, if they span 2 segments (but not 3 segments).
/// But objects that large don't currently exist, so we use the lower limit to avoid potential
/// security and reliability issues.
///
/// The maximum object length excludes segment padding, and the parent segment header at the start
/// of the next segment.
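///
/// For example, [`ObjectFetcher::new`] clamps any caller-supplied limit to this value; a minimal
/// sketch of that clamping (the variable name is illustrative):
///
/// ```ignore
/// let max_object_len = requested_max_object_len.min(max_supported_object_length());
/// ```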
//
// TODO: if the consensus chain supports larger block sizes, implement support for:
// - objects larger than 124 MB: reconstruct objects that span 3 or more segments, by
//   reconstructing each full segment
// - blocks larger than 1 GB: handle padding for blocks with encoded length prefixes that are
//   longer than 4 bytes, by increasing MAX_SEGMENT_PADDING
#[inline]
pub fn max_supported_object_length() -> usize {
    // segment - variable end padding - segment version variant - segment header item variant
    // - parent segment header - segment (block) item variant - block size - object size
    RecordedHistorySegment::SIZE
        - MAX_SEGMENT_PADDING
        - 1
        - 1
        - max_segment_header_encoded_size()
        - 1
        - MAX_ENCODED_LENGTH_SIZE * 2
}

/// The length of the compact encoding of `max_supported_object_length()`.
const MAX_ENCODED_LENGTH_SIZE: usize = 4;

/// Used to store the last piece downloaded in an object fetcher batch.
pub type LastPieceCache = (PieceIndex, Piece);

/// Object fetching errors.
#[derive(Debug, PartialEq, Eq, thiserror::Error)]
pub enum Error {
    /// Supplied piece index is not a source piece
    #[error("Piece index is not a source piece, object: {mapping:?}")]
    NotSourcePiece { mapping: GlobalObject },

    /// Supplied piece offset is too large
    #[error(
        "Piece offset is too large, must be less than {}, object: {mapping:?}",
        RawRecord::SIZE
    )]
    PieceOffsetTooLarge { mapping: GlobalObject },

    /// Object is too large error
    #[error(
        "Data length {data_length} exceeds maximum object size {max_object_len} \
         for object: {mapping:?}"
    )]
    ObjectTooLarge {
        data_length: usize,
        max_object_len: usize,
        mapping: GlobalObject,
    },

    /// Length prefix is too large error
    #[error(
        "Length prefix length {length_prefix_len} exceeds maximum object size {max_object_len} \
         for object: {mapping:?}"
    )]
    LengthPrefixTooLarge {
        length_prefix_len: usize,
        max_object_len: usize,
        mapping: GlobalObject,
    },

    /// Hash doesn't match data
    #[error("Incorrect data hash {data_hash:?} for {data_length} byte object: {mapping:?}")]
    InvalidDataHash {
        data_hash: Blake3Hash,
        data_length: usize,
        mapping: GlobalObject,
        // The hex-encoded object data, only used in tests
        #[cfg(test)]
        data: String,
    },

    /// Piece getter error
    #[error("Getting piece caused an error: {error}, object: {mapping:?}")]
    PieceGetterError {
        /// The original `anyhow::Error`, debug-printed as a string.
        /// This allows us to check errors for equality in tests.
        error: String,
        mapping: GlobalObject,
    },

    /// Piece getter couldn't find the piece
    #[error("Piece {piece_index:?} was not found by piece getter")]
    PieceNotFound { piece_index: PieceIndex },

    /// Supplied piece offset is inside the minimum segment header size
    #[error(
        "Piece offset is inside the segment header, min size of segment header: {}, \
         object: {mapping:?}",
        min_segment_header_encoded_size(),
    )]
    PieceOffsetInSegmentHeader { mapping: GlobalObject },

    /// Segment decoding error
    #[error("Segment {segment_index:?} data decoding error: {source:?}, object: {mapping:?}")]
    SegmentDecoding {
        source: parity_scale_codec::Error,
        segment_index: SegmentIndex,
        mapping: GlobalObject,
    },

    /// Unknown segment variant error
    #[error(
        "Decoding segment {segment_index:?} failed: unknown variant: {segment_variant}, \
         object: {mapping:?}"
    )]
    UnknownSegmentVariant {
        segment_variant: u8,
        segment_index: SegmentIndex,
        mapping: GlobalObject,
    },

    /// Unexpected segment item error
    #[error(
        "Segment {segment_index:?} has unexpected item, current progress: {segment_progress}, \
         object: {mapping:?}, item: {segment_item:?}"
    )]
    UnexpectedSegmentItem {
        segment_progress: usize,
        segment_index: SegmentIndex,
        segment_item: Box<SegmentItem>,
        mapping: GlobalObject,
    },

    /// Unexpected segment item variant error
    #[error(
        "Segment {segment_index:?} has unexpected item, current progress: {segment_progress}, \
         object: {mapping:?}, item: {segment_item_variant:?}, item size and data lengths: \
         {segment_item_lengths:?}"
    )]
    UnexpectedSegmentItemVariant {
        segment_progress: usize,
        segment_index: SegmentIndex,
        segment_item_variant: u8,
        segment_item_lengths: Option<(usize, usize)>,
        mapping: GlobalObject,
    },

    /// Object length couldn't be decoded after downloading two pieces
    #[error(
        "Invalid object: next source piece: {next_source_piece_index:?}, segment data length: \
         {segment_data_length:?}, object: {mapping:?}"
    )]
    InvalidObject {
        /// The next source piece index after the first two pieces
        next_source_piece_index: PieceIndex,
        /// The available object data in the current segment
        segment_data_length: Option<usize>,
        mapping: GlobalObject,
    },

    /// Object extends beyond block continuation, or the mapping is otherwise invalid
    #[error(
        "Invalid mapping: data length: {object_data_length:?}, next source piece: \
         {next_source_piece_index:?}, remaining_piece_count: {remaining_piece_count}, object: \
         {mapping:?}"
    )]
    InvalidMapping {
        /// The next source piece index, before we attempted concurrent downloads
        next_source_piece_index: PieceIndex,
        /// The number of pieces we concurrently downloaded
        remaining_piece_count: usize,
        /// The object data length, after the concurrent downloads
        object_data_length: usize,
        mapping: GlobalObject,
    },
}

/// Object fetcher for the Subspace DSN.
pub struct ObjectFetcher<PG>
where
    PG: PieceGetter + Send + Sync,
{
    /// The piece getter used to fetch pieces.
    piece_getter: Arc<PG>,

    /// The maximum number of data bytes we'll read for a single object.
    max_object_len: usize,
}

impl<PG> ObjectFetcher<PG>
where
    PG: PieceGetter + Send + Sync,
{
    /// Create a new object fetcher with the given configuration.
    ///
    /// `max_object_len` is the maximum number of data bytes we'll read for a single object before
    /// giving up and returning an error. In this implementation, it is limited to
    /// [`max_supported_object_length()`], which is much larger than the maximum consensus block
    /// size.
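    ///
    /// A minimal usage sketch (`piece_getter` stands in for any [`PieceGetter`] implementation,
    /// `mappings` is a [`GlobalObjectMapping`], and the size limit shown is just one possible
    /// choice):
    ///
    /// ```ignore
    /// let fetcher = ObjectFetcher::new(Arc::new(piece_getter), MAX_BLOCK_LENGTH as usize);
    /// let objects = fetcher.fetch_objects(mappings).await?;
    /// ```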
    pub fn new(piece_getter: Arc<PG>, mut max_object_len: usize) -> Self {
        if max_object_len > max_supported_object_length() {
            warn!(
                max_object_len,
                max_supported_object_length = ?max_supported_object_length(),
                "Object fetcher size limit exceeds maximum supported object size, \
                limiting to implementation-supported size"
            );

            max_object_len = max_supported_object_length();
        }

        Self {
            piece_getter,
            max_object_len,
        }
    }

    /// Assemble the objects in `mappings` by fetching necessary pieces using the piece getter, and
    /// putting the objects' bytes together.
    ///
    /// Checks the objects' hashes to make sure the correct bytes are returned.
    ///
    /// For efficiency, objects in a batch should be sorted by increasing piece index. Objects with
    /// the same piece index should be sorted by increasing offset. This allows the last piece to
    /// be re-used for the next object in the batch.
    ///
    /// Batches should be split if the gap between object piece indexes is 6 or more. Those objects
    /// can't share any pieces, because a maximum-sized object only uses 6 pieces.
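    ///
    /// For example, a caller preparing a batch might sort its mappings like this (a sketch,
    /// assuming `objects: Vec<GlobalObject>` collected by the caller):
    ///
    /// ```ignore
    /// // Sort by piece index, then offset, so consecutive objects can reuse the last piece.
    /// objects.sort_by_key(|object| (object.piece_index, object.offset));
    /// ```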
    pub async fn fetch_objects(
        &self,
        mappings: GlobalObjectMapping,
    ) -> Result<Vec<Vec<u8>>, Error> {
        let mut objects = Vec::with_capacity(mappings.objects().len());
        let mut piece_cache = None;

        // TODO:
        // - keep the last downloaded piece until it's no longer needed
        // - document sorting mappings in piece index order
        for &mapping in mappings.objects() {
            let GlobalObject {
                piece_index,
                offset,
                ..
            } = mapping;

            // Validate parameters
            if !piece_index.is_source() {
                debug!(
                    ?mapping,
                    "Invalid piece index for object: must be a source piece",
                );

                // Parity pieces contain effectively random data, and can't be used to fetch
                // objects
                return Err(Error::NotSourcePiece { mapping });
            }

            // We could parse each segment header to do this check perfectly, but it's an edge
            // case, so we just do a best-effort check
            if piece_index.source_position() == 0
                && offset < min_segment_header_encoded_size() as u32
            {
                debug!(
                    ?mapping,
                    min_segment_header_encoded_size = ?min_segment_header_encoded_size(),
                    "Invalid offset for object: must not be inside the segment header",
                );

                return Err(Error::PieceOffsetInSegmentHeader { mapping });
            }

            if offset >= RawRecord::SIZE as u32 {
                debug!(
                    ?mapping,
                    RawRecord_SIZE = RawRecord::SIZE,
                    "Invalid piece offset for object: must be less than the size of a raw record",
                );

                return Err(Error::PieceOffsetTooLarge { mapping });
            }

            // All objects can be assembled from individual pieces; we handle segments by checking
            // all possible padding, and by parsing and discarding segment headers.
            let data = self.fetch_object(mapping, &mut piece_cache).await?;

            objects.push(data);
        }

        Ok(objects)
    }

    /// Single object fetching and assembling.
    ///
    /// Each piece is initially turned into a `RawPieceData` struct. When there are enough pieces
    /// to calculate the object's length(s), those pieces are turned into a `PartialObject` struct.
    /// After that, each new piece becomes a `RawPieceData` (to track padding and segment headers),
    /// then gets added to the `PartialObject`.
    ///
    /// When the `PartialObject` has enough data for its shortest length, the data (and
    /// corresponding padding) is checked against the object hash. If that fails, we check more
    /// padding lengths, or fetch more data.
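    ///
    /// A simplified sketch of the flow (error handling, the second-piece length fallback, and the
    /// concurrent tail download are omitted):
    ///
    /// ```ignore
    /// let mut raw_data = RawPieceData::new_for_first_piece(mapping);
    /// raw_data.add_piece_data(piece_index, first_piece_data, mapping)?;
    /// let mut partial_object =
    ///     PartialObject::new_with_padding(&raw_data, max_object_len, mapping)?
    ///         .expect("enough data to decode the object's length");
    /// while partial_object.try_reconstruct_object(mapping)?.is_none() {
    ///     // Download the next source piece, wrap it in `RawPieceData::new_for_next_piece`,
    ///     // then add it with `partial_object.add_piece_data_with_padding(new_data)`.
    /// }
    /// ```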
    //
    // TODO: return the last downloaded piece from fetch_object() and pass it to the next
    // fetch_object()
    async fn fetch_object(
        &self,
        mapping: GlobalObject,
        piece_cache: &mut Option<LastPieceCache>,
    ) -> Result<Vec<u8>, Error> {
        let GlobalObject {
            piece_index,
            offset,
            ..
        } = mapping;

        // The next piece we want to download, starting with the piece at index `piece_index`
        let mut next_source_piece_index = piece_index;

        // The raw data we've read so far
        let mut raw_data = RawPieceData::new_for_first_piece(mapping);

        // Get pieces until we have enough data to calculate the object's length(s).
        // Objects with their length bytes at the end of a piece are a rare edge case.
        let piece = self
            .read_piece(next_source_piece_index, mapping, piece_cache)
            .await?;

        // Discard piece data before the offset.
        // If this is the first piece in a segment, this automatically skips the segment header.
        let piece_data = piece
            .record()
            .to_raw_record_chunks()
            .flatten()
            .skip(offset as usize)
            .copied()
            .collect::<Vec<u8>>();

        raw_data.add_piece_data(next_source_piece_index, piece_data, mapping)?;
        next_source_piece_index = next_source_piece_index.next_source_index();

        // Try to create a new partial object; this only works if we have enough data to find
        // its length
        let mut partial_object = if let Some(partial_object) =
            PartialObject::new_with_padding(&raw_data, self.max_object_len, mapping)?
        {
            // We've used up this data, so just drop it
            std::mem::drop(raw_data);

            trace!(
                %next_source_piece_index,
                ?mapping,
                ?partial_object,
                "Successfully decoded partial object length from first piece",
            );

            partial_object
        } else {
            // Need the next piece to read the length of the object data
            trace!(
                %next_source_piece_index,
                ?mapping,
                ?raw_data,
                "Part of object length bytes are in next piece, fetching",
            );

            // Get the second piece for the object
            let piece = self
                .read_piece(next_source_piece_index, mapping, piece_cache)
                .await?;
            // We want all the piece data
            let piece_data = piece
                .record()
                .to_raw_record_chunks()
                .flatten()
                .copied()
                .collect::<Vec<u8>>();

            raw_data.add_piece_data(next_source_piece_index, piece_data, mapping)?;
            next_source_piece_index = next_source_piece_index.next_source_index();

            // We should have enough data to create a partial object now
            if let Some(partial_object) =
                PartialObject::new_with_padding(&raw_data, self.max_object_len, mapping)?
            {
                // We've used up this data, so just drop it
                std::mem::drop(raw_data);

                trace!(
                    %next_source_piece_index,
                    ?mapping,
                    ?partial_object,
                    "Successfully decoded partial object length from first two pieces",
                );

                partial_object
            } else {
                // There's something wrong with the mapping, because we can't decode the object's
                // length after two pieces
                return Err(Error::InvalidObject {
                    next_source_piece_index,
                    segment_data_length: raw_data.segment_data_length(),
                    mapping,
                });
            }
        };

        // We might already have the whole object; let's check before downloading more pieces
        if let Some(data) = partial_object.try_reconstruct_object(mapping)? {
            return Ok(data);
        }

        // Read more pieces until we have enough data for all possible object lengths.
        //
        // Adding padding can change the size of the object up to 256x. But the maximum object size
        // is 6 pieces, so we get better latency by downloading any pieces that could be needed at
        // the same time. (Larger objects have already been rejected during length decoding.)
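        //
        // For example (illustration only): if `max_remaining_download_length()` returns two full
        // raw records plus one byte, `div_ceil(RawRecord::SIZE)` rounds up to 3, so we download
        // 3 more source pieces in one batch rather than risk extra round trips.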
        let remaining_piece_count = partial_object
            .max_remaining_download_length()
            .div_ceil(RawRecord::SIZE);

        if remaining_piece_count > 0 {
            let remaining_piece_indexes = (next_source_piece_index..)
                .filter(|i| i.is_source())
                .take(remaining_piece_count)
                .collect::<Arc<[PieceIndex]>>();
            // TODO: turn this into a concurrent stream, which cancels piece downloads if they
            // aren't needed
            let pieces = self
                .read_pieces(remaining_piece_indexes.clone(), mapping, piece_cache)
                .await?
                .into_iter()
                .zip(remaining_piece_indexes.iter().copied())
                .map(|(piece, piece_index)| {
                    (
                        piece_index,
                        piece
                            .record()
                            .to_raw_record_chunks()
                            .flatten()
                            .copied()
                            .collect::<Vec<u8>>(),
                    )
                });

            for (piece_index, piece_data) in pieces {
                let mut new_data = RawPieceData::new_for_next_piece(
                    partial_object.max_remaining_download_length(),
                    piece_index,
                );
                new_data.add_piece_data(piece_index, piece_data, mapping)?;
                partial_object.add_piece_data_with_padding(new_data);

                // We might already have the whole object; let's check before decoding more pieces
                if let Some(data) = partial_object.try_reconstruct_object(mapping)? {
                    return Ok(data);
                }
            }
        }

        // If the mapping is invalid, we can end up trying to read beyond the downloaded pieces.
        // Specifically, if a cross-segment object's offset is wrong, we can end up trying to read
        // beyond the block continuation at the start of the second segment.
        Err(Error::InvalidMapping {
            next_source_piece_index,
            remaining_piece_count,
            object_data_length: partial_object.fetched_data_length(),
            mapping,
        })
    }

    /// Concurrently read multiple pieces, and return them in the supplied order.
    ///
    /// The mapping is only used for error reporting.
    async fn read_pieces(
        &self,
        piece_indexes: Arc<[PieceIndex]>,
        mapping: GlobalObject,
        piece_cache: &mut Option<LastPieceCache>,
    ) -> Result<Vec<Piece>, Error> {
        download_pieces(
            piece_indexes.clone(),
            &piece_cache.clone().with_fallback(self.piece_getter.clone()),
        )
        .await
        .inspect(|pieces| {
            if let (Some(piece_index), Some(piece)) = (piece_indexes.last(), pieces.last()) {
                *piece_cache = Some((*piece_index, piece.clone()))
            }
        })
        .map_err(|source| {
            debug!(
                ?piece_indexes,
                error = ?source,
                ?mapping,
                "Error fetching pieces during object assembling"
            );

            Error::PieceGetterError {
                error: format!("{source:?}"),
                mapping,
            }
        })
    }

    /// Read and return a single piece.
    ///
    /// The mapping is only used for error reporting.
    async fn read_piece(
        &self,
        piece_index: PieceIndex,
        mapping: GlobalObject,
        piece_cache: &mut Option<LastPieceCache>,
    ) -> Result<Piece, Error> {
        let piece_indexes = Arc::<[PieceIndex]>::from(vec![piece_index]);
        download_pieces(
            piece_indexes.clone(),
            &piece_cache.clone().with_fallback(self.piece_getter.clone()),
        )
        .await
        .inspect(|pieces| *piece_cache = Some((piece_index, pieces[0].clone())))
        .map(|pieces| {
            pieces
                .first()
                .expect("download_pieces always returns exact pieces or error")
                .clone()
        })
        .map_err(|source| {
            debug!(
                %piece_index,
                error = ?source,
                ?mapping,
                "Error fetching piece during object assembling"
            );

            Error::PieceGetterError {
                error: format!("{source:?}"),
                mapping,
            }
        })
    }
}

/// Validate and decode the encoded length of `data`, including the encoded length bytes.
/// `data` may be incomplete.
///
/// Returns `Ok(Some((data_length_encoded_length, data_length)))` if the length is valid,
/// `Ok(None)` if there aren't enough bytes to decode the length, otherwise an error.
///
/// The mapping is only used for error reporting.
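///
/// For example (an illustration, not a doc test; `mapping` is any [`GlobalObject`], used only for
/// error reporting): with SCALE compact encoding, a 5 byte object is prefixed by the single byte
/// `0x14`, so the length and its encoded size decode from the first byte alone:
///
/// ```ignore
/// let data = [0x14, b'h', b'e', b'l', b'l', b'o'];
/// assert_eq!(decode_data_length(&data, 100, mapping)?, Some((1, 5)));
/// ```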
fn decode_data_length(
    mut data: &[u8],
    max_object_len: usize,
    mapping: GlobalObject,
) -> Result<Option<(usize, usize)>, Error> {
    let data_length = match Compact::<u32>::decode(&mut data) {
        Ok(Compact(data_length)) => {
            let data_length = data_length as usize;
            if data_length > max_object_len {
                debug!(
                    data_length,
                    max_object_len,
                    ?mapping,
                    "Data length exceeds object size limit for object fetcher"
                );

                return Err(Error::ObjectTooLarge {
                    data_length,
                    max_object_len,
                    mapping,
                });
            }

            data_length
        }
        Err(err) => {
            // Parity doesn't have an easily matched error enum, and all bit sequences are
            // valid compact encodings. So we assume we don't have enough bytes to decode the
            // length, unless we already have enough bytes to decode the maximum length.
            if data.len() >= Compact::<u32>::compact_len(&(max_object_len as u32)) {
                debug!(
                    length_prefix_len = data.len(),
                    max_object_len,
                    ?mapping,
                    "Length prefix exceeds object size limit for object fetcher"
                );

                return Err(Error::LengthPrefixTooLarge {
                    length_prefix_len: data.len(),
                    max_object_len,
                    mapping,
                });
            }

            debug!(
                ?err,
                ?mapping,
                "Not enough bytes to decode data length for object"
            );

            return Ok(None);
        }
    };

    let data_length_encoded_length = Compact::<u32>::compact_len(&(data_length as u32));

    trace!(
        data_length,
        data_length_encoded_length,
        ?mapping,
        "Decoded data length for object"
    );

    Ok(Some((data_length_encoded_length, data_length)))
}