// subspace_data_retrieval/segment_downloading.rs

1//! Fetching segments of the archived history of Subspace Network.
2
3use crate::piece_getter::PieceGetter;
4use futures::StreamExt;
5use std::time::Duration;
6use subspace_archiving::archiver::Segment;
7use subspace_archiving::reconstructor::{Reconstructor, ReconstructorError};
8use subspace_core_primitives::pieces::Piece;
9use subspace_core_primitives::segments::{
10    ArchivedHistorySegment, RecordedHistorySegment, SegmentIndex,
11};
12use subspace_erasure_coding::ErasureCoding;
13use tokio::task::spawn_blocking;
14use tokio::time::sleep;
15use tracing::debug;
16
/// Errors that can occur while downloading or reconstructing a segment of the archived history.
#[derive(Debug, thiserror::Error)]
pub enum SegmentDownloadingError {
    /// Not enough pieces were downloaded to reconstruct the segment: fewer than
    /// `RecordedHistorySegment::NUM_RAW_RECORDS` pieces were recovered after all retries.
    #[error(
        "Not enough ({downloaded_pieces}/{}) pieces for segment {segment_index}",
        RecordedHistorySegment::NUM_RAW_RECORDS
    )]
    NotEnoughPieces {
        /// The segment we were trying to download
        segment_index: SegmentIndex,
        /// Number of pieces that were downloaded
        downloaded_pieces: usize,
    },

    /// Piece getter error, propagated from the underlying `PieceGetter` implementation
    #[error("Piece getter error: {source}")]
    PieceGetterError {
        #[from]
        source: anyhow::Error,
    },

    /// Segment reconstruction error, returned by the erasure-coding `Reconstructor`
    #[error("Segment reconstruction error: {source}")]
    SegmentReconstruction {
        #[from]
        source: ReconstructorError,
    },

    /// Segment decoding error, returned when SCALE-decoding the segment data fails
    #[error("Segment data decoding error: {source}")]
    SegmentDecoding {
        #[from]
        source: parity_scale_codec::Error,
    },
}
53
54/// Concurrently downloads the pieces for `segment_index`, and reconstructs the segment.
55pub async fn download_segment<PG>(
56    segment_index: SegmentIndex,
57    piece_getter: &PG,
58    erasure_coding: ErasureCoding,
59    retries: usize,
60    retry_delay: Option<Duration>,
61) -> Result<Segment, SegmentDownloadingError>
62where
63    PG: PieceGetter,
64{
65    let reconstructor = Reconstructor::new(erasure_coding);
66
67    let segment_pieces =
68        download_segment_pieces(segment_index, piece_getter, retries, retry_delay).await?;
69
70    let segment = spawn_blocking(move || reconstructor.reconstruct_segment(&segment_pieces))
71        .await
72        .expect("Panic if blocking task panicked")?;
73
74    Ok(segment)
75}
76
77/// Downloads pieces of a segment so that segment can be reconstructed afterward.
78/// Repeatedly attempts to download pieces until the required number of pieces is reached.
79///
80/// Prefers source pieces if available, on error returns the number of available pieces.
81pub async fn download_segment_pieces<PG>(
82    segment_index: SegmentIndex,
83    piece_getter: &PG,
84    retries: usize,
85    retry_delay: Option<Duration>,
86) -> Result<Vec<Option<Piece>>, SegmentDownloadingError>
87where
88    PG: PieceGetter,
89{
90    let mut existing_pieces = [const { None }; ArchivedHistorySegment::NUM_PIECES];
91
92    for retry in 0..=retries {
93        match download_missing_segment_pieces(segment_index, piece_getter, existing_pieces).await {
94            Ok(segment_pieces) => return Ok(segment_pieces),
95            Err((error, incomplete_segment_pieces)) => {
96                existing_pieces = incomplete_segment_pieces;
97
98                if retry < retries {
99                    debug!(
100                        %segment_index,
101                        %retry,
102                        ?retry_delay,
103                        ?error,
104                        "Failed to download segment pieces once, retrying"
105                    );
106                    if let Some(retry_delay) = retry_delay {
107                        // Wait before retrying to give the node a chance to find other peers
108                        sleep(retry_delay).await;
109                    }
110                }
111            }
112        }
113    }
114
115    debug!(
116        %segment_index,
117        %retries,
118        "Failed to download segment pieces"
119    );
120
121    Err(SegmentDownloadingError::NotEnoughPieces {
122        segment_index,
123        downloaded_pieces: existing_pieces
124            .iter()
125            .filter(|piece| piece.is_some())
126            .count(),
127    })
128}
129
130/// Tries to download pieces of a segment once, so that segment can be reconstructed afterward.
131/// Pass existing pieces in `existing_pieces`, or use
132/// `[const { None }; ArchivedHistorySegment::NUM_PIECES]` if no pieces are available.
133///
134/// Prefers source pieces if available, on error returns the incomplete piece download (including
135/// existing pieces).
136async fn download_missing_segment_pieces<PG>(
137    segment_index: SegmentIndex,
138    piece_getter: &PG,
139    existing_pieces: [Option<Piece>; ArchivedHistorySegment::NUM_PIECES],
140) -> Result<
141    Vec<Option<Piece>>,
142    (
143        SegmentDownloadingError,
144        [Option<Piece>; ArchivedHistorySegment::NUM_PIECES],
145    ),
146>
147where
148    PG: PieceGetter,
149{
150    let required_pieces_number = RecordedHistorySegment::NUM_RAW_RECORDS;
151    let mut downloaded_pieces = existing_pieces
152        .iter()
153        .filter(|piece| piece.is_some())
154        .count();
155
156    // Debugging failure patterns in piece downloads
157    let mut first_success = None;
158    let mut last_success = None;
159    let mut first_failure = None;
160    let mut last_failure = None;
161
162    let mut segment_pieces = existing_pieces;
163
164    let mut pieces_iter = segment_index
165        .segment_piece_indexes_source_first()
166        .into_iter()
167        .peekable();
168
169    // Download in batches until we get enough or exhaust available pieces
170    while !pieces_iter.is_empty() && downloaded_pieces != required_pieces_number {
171        let piece_indices = pieces_iter
172            .by_ref()
173            .filter(|piece_index| segment_pieces[piece_index.position() as usize].is_none())
174            .take(required_pieces_number - downloaded_pieces)
175            .collect();
176
177        let mut received_segment_pieces = match piece_getter.get_pieces(piece_indices).await {
178            Ok(pieces) => pieces,
179            Err(error) => return Err((error.into(), segment_pieces)),
180        };
181
182        while let Some((piece_index, result)) = received_segment_pieces.next().await {
183            match result {
184                Ok(Some(piece)) => {
185                    downloaded_pieces += 1;
186                    segment_pieces
187                        .get_mut(piece_index.position() as usize)
188                        .expect("Piece position is by definition within segment; qed")
189                        .replace(piece);
190
191                    if first_success.is_none() {
192                        first_success = Some(piece_index.position());
193                    }
194                    last_success = Some(piece_index.position());
195                }
196                // We often see an error where 127 pieces are downloaded successfully, but the
197                // other 129 fail. It seems like 1 request in a 128 piece batch fails, then 128
198                // single piece requests are made, and also fail.
199                // Delaying requests after a failure gives the node a chance to find other peers.
200                Ok(None) => {
201                    debug!(%piece_index, "Piece was not found");
202                    if first_failure.is_none() {
203                        first_failure = Some(piece_index.position());
204                    }
205                    last_failure = Some(piece_index.position());
206                }
207                Err(error) => {
208                    debug!(%error, %piece_index, "Failed to get piece");
209                    if first_failure.is_none() {
210                        first_failure = Some(piece_index.position());
211                    }
212                    last_failure = Some(piece_index.position());
213                }
214            }
215        }
216    }
217
218    if downloaded_pieces < required_pieces_number {
219        debug!(
220            %segment_index,
221            %downloaded_pieces,
222            %required_pieces_number,
223            // Piece positions that succeeded/failed
224            ?first_success,
225            ?last_success,
226            ?first_failure,
227            ?last_failure,
228            "Failed to retrieve pieces for segment"
229        );
230
231        return Err((
232            SegmentDownloadingError::NotEnoughPieces {
233                segment_index,
234                downloaded_pieces: segment_pieces
235                    .iter()
236                    .filter(|piece| piece.is_some())
237                    .count(),
238            },
239            segment_pieces,
240        ));
241    }
242
243    Ok(segment_pieces.to_vec())
244}