subspace_data_retrieval/segment_downloading.rs

//! Fetching segments of the archived history of Subspace Network.

use crate::piece_getter::PieceGetter;
use futures::StreamExt;
use std::time::Duration;
use subspace_archiving::archiver::Segment;
use subspace_archiving::reconstructor::{Reconstructor, ReconstructorError};
use subspace_core_primitives::pieces::Piece;
use subspace_core_primitives::segments::{
    ArchivedHistorySegment, RecordedHistorySegment, SegmentIndex,
};
use subspace_erasure_coding::ErasureCoding;
use tokio::task::spawn_blocking;
use tokio::time::sleep;
use tracing::debug;

/// The number of times we retry downloading a segment before giving up.
/// This is a suggested default; callers can supply their own value if needed.
pub const SEGMENT_DOWNLOAD_RETRIES: usize = 3;

/// The amount of time we wait between segment download retries.
/// This is a suggested default; callers can supply their own value if needed.
pub const SEGMENT_DOWNLOAD_RETRY_DELAY: Duration = Duration::from_secs(10);

/// Segment getter errors.
#[derive(Debug, thiserror::Error)]
pub enum SegmentDownloadingError {
    /// Not enough pieces
    #[error(
        "Not enough ({downloaded_pieces}/{}) pieces for segment {segment_index}",
        RecordedHistorySegment::NUM_RAW_RECORDS
    )]
    NotEnoughPieces {
        /// The segment we were trying to download
        segment_index: SegmentIndex,
        /// Number of pieces that were downloaded
        downloaded_pieces: usize,
    },

    /// Piece getter error
    #[error("Piece getter error: {source}")]
    PieceGetterError {
        #[from]
        source: anyhow::Error,
    },

    /// Segment reconstruction error
    #[error("Segment reconstruction error: {source}")]
    SegmentReconstruction {
        #[from]
        source: ReconstructorError,
    },

    /// Segment decoding error
    #[error("Segment data decoding error: {source}")]
    SegmentDecoding {
        #[from]
        source: parity_scale_codec::Error,
    },
}

/// Concurrently downloads the pieces for `segment_index`, and reconstructs the segment.
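///
/// # Example
///
/// A minimal usage sketch; `segment_index`, `piece_getter`, and `erasure_coding` are
/// assumed to already exist in the caller's context (a [`SegmentIndex`], a [`PieceGetter`]
/// implementation, and an [`ErasureCoding`] instance).
///
/// ```ignore
/// let segment = download_segment(
///     segment_index,
///     &piece_getter,
///     erasure_coding,
///     SEGMENT_DOWNLOAD_RETRIES,
///     Some(SEGMENT_DOWNLOAD_RETRY_DELAY),
/// )
/// .await?;
/// ```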
pub async fn download_segment<PG>(
    segment_index: SegmentIndex,
    piece_getter: &PG,
    erasure_coding: ErasureCoding,
    retries: usize,
    retry_delay: Option<Duration>,
) -> Result<Segment, SegmentDownloadingError>
where
    PG: PieceGetter,
{
    let reconstructor = Reconstructor::new(erasure_coding);

    let segment_pieces =
        download_segment_pieces(segment_index, piece_getter, retries, retry_delay).await?;

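    // Reconstruction is CPU-intensive, so run it on the blocking thread pool rather than
    // stalling the async executor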
    let segment = spawn_blocking(move || reconstructor.reconstruct_segment(&segment_pieces))
        .await
        .expect("Panic if blocking task panicked")?;

    Ok(segment)
}

/// Downloads pieces of a segment so that the segment can be reconstructed afterward.
/// Repeatedly attempts to download pieces until the required number of pieces is reached.
///
/// Prefers source pieces if available; on error, returns the number of pieces that were downloaded.
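///
/// # Example
///
/// A minimal sketch with a custom retry policy; `segment_index` and `piece_getter` are
/// assumed to already exist in the caller's context:
///
/// ```ignore
/// let segment_pieces =
///     download_segment_pieces(segment_index, &piece_getter, 5, Some(Duration::from_secs(1)))
///         .await?;
/// assert_eq!(segment_pieces.len(), ArchivedHistorySegment::NUM_PIECES);
/// ```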
pub async fn download_segment_pieces<PG>(
    segment_index: SegmentIndex,
    piece_getter: &PG,
    retries: usize,
    retry_delay: Option<Duration>,
) -> Result<Vec<Option<Piece>>, SegmentDownloadingError>
where
    PG: PieceGetter,
{
    let mut existing_pieces = [const { None }; ArchivedHistorySegment::NUM_PIECES];

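    // Each attempt only needs to fetch the pieces that are still missing: pieces downloaded
    // by earlier attempts are kept in `existing_pieces` and passed back in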
    for retry in 0..=retries {
        match download_missing_segment_pieces(segment_index, piece_getter, existing_pieces).await {
            Ok(segment_pieces) => return Ok(segment_pieces),
            Err((error, incomplete_segment_pieces)) => {
                existing_pieces = incomplete_segment_pieces;

                if retry < retries {
                    debug!(
                        %segment_index,
                        %retry,
                        ?retry_delay,
                        ?error,
                        "Failed to download segment pieces on this attempt, retrying"
                    );
                    if let Some(retry_delay) = retry_delay {
                        // Wait before retrying to give the node a chance to find other peers
                        sleep(retry_delay).await;
                    }
                }
            }
        }
    }

    debug!(
        %segment_index,
        %retries,
        "Failed to download segment pieces"
    );

    Err(SegmentDownloadingError::NotEnoughPieces {
        segment_index,
        downloaded_pieces: existing_pieces
            .iter()
            .filter(|piece| piece.is_some())
            .count(),
    })
}

/// Tries to download pieces of a segment once, so that the segment can be reconstructed afterward.
/// Pass existing pieces in `existing_pieces`, or use
/// `[const { None }; ArchivedHistorySegment::NUM_PIECES]` if no pieces are available.
///
/// Prefers source pieces if available; on error, returns the incomplete piece download (including
/// the existing pieces).
async fn download_missing_segment_pieces<PG>(
    segment_index: SegmentIndex,
    piece_getter: &PG,
    existing_pieces: [Option<Piece>; ArchivedHistorySegment::NUM_PIECES],
) -> Result<
    Vec<Option<Piece>>,
    (
        SegmentDownloadingError,
        [Option<Piece>; ArchivedHistorySegment::NUM_PIECES],
    ),
>
where
    PG: PieceGetter,
{
    let required_pieces_number = RecordedHistorySegment::NUM_RAW_RECORDS;
    let mut downloaded_pieces = existing_pieces
        .iter()
        .filter(|piece| piece.is_some())
        .count();

    // Debugging failure patterns in piece downloads
    let mut first_success = None;
    let mut last_success = None;
    let mut first_failure = None;
    let mut last_failure = None;

    let mut segment_pieces = existing_pieces;

    let mut pieces_iter = segment_index
        .segment_piece_indexes_source_first()
        .into_iter();

    // Download in batches until we get enough or exhaust available pieces
    while !pieces_iter.is_empty() && downloaded_pieces != required_pieces_number {
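        // Only request pieces that are still missing, and no more than the number still needed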
        let piece_indices = pieces_iter
            .by_ref()
            .filter(|piece_index| segment_pieces[piece_index.position() as usize].is_none())
            .take(required_pieces_number - downloaded_pieces)
            .collect();

        let mut received_segment_pieces = match piece_getter.get_pieces(piece_indices).await {
            Ok(pieces) => pieces,
            Err(error) => return Err((error.into(), segment_pieces)),
        };

        while let Some((piece_index, result)) = received_segment_pieces.next().await {
            match result {
                Ok(Some(piece)) => {
                    downloaded_pieces += 1;
                    segment_pieces
                        .get_mut(piece_index.position() as usize)
                        .expect("Piece position is by definition within segment; qed")
                        .replace(piece);

                    if first_success.is_none() {
                        first_success = Some(piece_index.position());
                    }
                    last_success = Some(piece_index.position());
                }
                // We often see an error where 127 pieces are downloaded successfully, but the
                // other 129 fail. It seems like 1 request in a 128 piece batch fails, then 128
                // single piece requests are made, and also fail.
                // Delaying requests after a failure gives the node a chance to find other peers.
                Ok(None) => {
                    debug!(%piece_index, "Piece was not found");
                    if first_failure.is_none() {
                        first_failure = Some(piece_index.position());
                    }
                    last_failure = Some(piece_index.position());
                }
                Err(error) => {
                    debug!(%error, %piece_index, "Failed to get piece");
                    if first_failure.is_none() {
                        first_failure = Some(piece_index.position());
                    }
                    last_failure = Some(piece_index.position());
                }
            }
        }
    }

    if downloaded_pieces < required_pieces_number {
        debug!(
            %segment_index,
            %downloaded_pieces,
            %required_pieces_number,
            // Piece positions that succeeded/failed
            ?first_success,
            ?last_success,
            ?first_failure,
            ?last_failure,
            "Failed to retrieve pieces for segment"
        );

        return Err((
            SegmentDownloadingError::NotEnoughPieces {
                segment_index,
                downloaded_pieces: segment_pieces
                    .iter()
                    .filter(|piece| piece.is_some())
                    .count(),
            },
            segment_pieces,
        ));
    }

    Ok(segment_pieces.to_vec())
}