subspace_data_retrieval/
segment_downloading.rs

1//! Fetching segments of the archived history of Subspace Network.
2
3use crate::piece_getter::PieceGetter;
4use futures::StreamExt;
5use subspace_archiving::archiver::Segment;
6use subspace_archiving::reconstructor::{Reconstructor, ReconstructorError};
7use subspace_core_primitives::pieces::Piece;
8use subspace_core_primitives::segments::{
9    ArchivedHistorySegment, RecordedHistorySegment, SegmentIndex,
10};
11use subspace_erasure_coding::ErasureCoding;
12use tokio::task::spawn_blocking;
13use tracing::debug;
14
/// Errors that can occur while downloading and reconstructing a segment.
#[derive(Debug, thiserror::Error)]
pub enum SegmentDownloadingError {
    /// Not enough pieces were retrieved to reconstruct the segment
    /// (fewer than `RecordedHistorySegment::NUM_RAW_RECORDS`).
    #[error(
        "Not enough ({downloaded_pieces}/{}) pieces for segment {segment_index}",
        RecordedHistorySegment::NUM_RAW_RECORDS
    )]
    NotEnoughPieces {
        /// The segment we were trying to download
        segment_index: SegmentIndex,
        /// Number of pieces that were downloaded
        downloaded_pieces: usize,
    },

    /// Piece getter error — wraps any error returned by `PieceGetter::get_pieces`.
    #[error("Piece getter error: {source}")]
    PieceGetterError {
        #[from]
        source: anyhow::Error,
    },

    /// Segment reconstruction error — converted automatically (via `#[from]`)
    /// when `Reconstructor::reconstruct_segment` fails.
    #[error("Segment reconstruction error: {source}")]
    SegmentReconstruction {
        #[from]
        source: ReconstructorError,
    },

    /// Segment decoding error — SCALE decoding of segment data failed.
    #[error("Segment data decoding error: {source}")]
    SegmentDecoding {
        #[from]
        source: parity_scale_codec::Error,
    },
}
51
52/// Concurrently downloads the pieces for `segment_index`, and reconstructs the segment.
53pub async fn download_segment<PG>(
54    segment_index: SegmentIndex,
55    piece_getter: &PG,
56    erasure_coding: ErasureCoding,
57) -> Result<Segment, SegmentDownloadingError>
58where
59    PG: PieceGetter,
60{
61    let reconstructor = Reconstructor::new(erasure_coding);
62
63    let segment_pieces = download_segment_pieces(segment_index, piece_getter).await?;
64
65    let segment = spawn_blocking(move || reconstructor.reconstruct_segment(&segment_pieces))
66        .await
67        .expect("Panic if blocking task panicked")?;
68
69    Ok(segment)
70}
71
72/// Downloads pieces of the segment such that segment can be reconstructed afterward.
73///
74/// Prefers source pieces if available, on error returns number of downloaded pieces.
75pub async fn download_segment_pieces<PG>(
76    segment_index: SegmentIndex,
77    piece_getter: &PG,
78) -> Result<Vec<Option<Piece>>, SegmentDownloadingError>
79where
80    PG: PieceGetter,
81{
82    let required_pieces_number = RecordedHistorySegment::NUM_RAW_RECORDS;
83    let mut downloaded_pieces = 0_usize;
84
85    // Debugging failure patterns in piece downloads
86    let mut first_success = None;
87    let mut last_success = None;
88    let mut first_failure = None;
89    let mut last_failure = None;
90
91    let mut segment_pieces = vec![None::<Piece>; ArchivedHistorySegment::NUM_PIECES];
92
93    let mut pieces_iter = segment_index
94        .segment_piece_indexes_source_first()
95        .into_iter()
96        .peekable();
97
98    // Download in batches until we get enough or exhaust available pieces
99    while !pieces_iter.is_empty() && downloaded_pieces != required_pieces_number {
100        let piece_indices = pieces_iter
101            .by_ref()
102            .take(required_pieces_number - downloaded_pieces)
103            .collect();
104
105        let mut received_segment_pieces = piece_getter.get_pieces(piece_indices).await?;
106
107        while let Some((piece_index, result)) = received_segment_pieces.next().await {
108            match result {
109                Ok(Some(piece)) => {
110                    downloaded_pieces += 1;
111                    segment_pieces
112                        .get_mut(piece_index.position() as usize)
113                        .expect("Piece position is by definition within segment; qed")
114                        .replace(piece);
115
116                    if first_success.is_none() {
117                        first_success = Some(piece_index.position());
118                    }
119                    last_success = Some(piece_index.position());
120                }
121                // We often see an error where 127 pieces are downloaded successfully, but the
122                // other 129 fail. It seems like 1 request in a 128 piece batch fails, then 128
123                // single piece requests are made, and also fail.
124                // Delaying requests after a failure gives the node a chance to find other peers.
125                Ok(None) => {
126                    debug!(%piece_index, "Piece was not found");
127                    if first_failure.is_none() {
128                        first_failure = Some(piece_index.position());
129                    }
130                    last_failure = Some(piece_index.position());
131                }
132                Err(error) => {
133                    debug!(%error, %piece_index, "Failed to get piece");
134                    if first_failure.is_none() {
135                        first_failure = Some(piece_index.position());
136                    }
137                    last_failure = Some(piece_index.position());
138                }
139            }
140        }
141    }
142
143    if downloaded_pieces < required_pieces_number {
144        debug!(
145            %segment_index,
146            %downloaded_pieces,
147            %required_pieces_number,
148            // Piece positions that succeeded/failed
149            ?first_success,
150            ?last_success,
151            ?first_failure,
152            ?last_failure,
153            "Failed to retrieve pieces for segment"
154        );
155
156        return Err(SegmentDownloadingError::NotEnoughPieces {
157            segment_index,
158            downloaded_pieces,
159        });
160    }
161
162    Ok(segment_pieces)
163}