subspace_data_retrieval/
segment_downloading.rs1use crate::piece_getter::PieceGetter;
4use futures::StreamExt;
5use std::time::Duration;
6use subspace_archiving::archiver::Segment;
7use subspace_archiving::reconstructor::{Reconstructor, ReconstructorError};
8use subspace_core_primitives::pieces::Piece;
9use subspace_core_primitives::segments::{
10 ArchivedHistorySegment, RecordedHistorySegment, SegmentIndex,
11};
12use subspace_erasure_coding::ErasureCoding;
13use tokio::task::spawn_blocking;
14use tokio::time::sleep;
15use tracing::debug;
16
/// Number of retries for a segment download (`3`).
///
/// NOTE(review): not read by the functions in this file, which take the retry count as an
/// argument; presumably callers pass this constant as `retries` — confirm against call sites.
pub const SEGMENT_DOWNLOAD_RETRIES: usize = 3;
20
/// Delay between segment download retries (10 seconds).
///
/// NOTE(review): not read by the functions in this file, which take the delay as an argument;
/// presumably callers pass this constant as `retry_delay` — confirm against call sites.
pub const SEGMENT_DOWNLOAD_RETRY_DELAY: Duration = Duration::from_secs(10);
24
25#[derive(Debug, thiserror::Error)]
27pub enum SegmentDownloadingError {
28 #[error(
30 "Not enough ({downloaded_pieces}/{}) pieces for segment {segment_index}",
31 RecordedHistorySegment::NUM_RAW_RECORDS
32 )]
33 NotEnoughPieces {
34 segment_index: SegmentIndex,
36 downloaded_pieces: usize,
38 },
39
40 #[error("Piece getter error: {source}")]
42 PieceGetterError {
43 #[from]
44 source: anyhow::Error,
45 },
46
47 #[error("Segment reconstruction error: {source}")]
49 SegmentReconstruction {
50 #[from]
51 source: ReconstructorError,
52 },
53
54 #[error("Segment data decoding error: {source}")]
56 SegmentDecoding {
57 #[from]
58 source: parity_scale_codec::Error,
59 },
60}
61
62pub async fn download_segment<PG>(
64 segment_index: SegmentIndex,
65 piece_getter: &PG,
66 erasure_coding: ErasureCoding,
67 retries: usize,
68 retry_delay: Option<Duration>,
69) -> Result<Segment, SegmentDownloadingError>
70where
71 PG: PieceGetter,
72{
73 let reconstructor = Reconstructor::new(erasure_coding);
74
75 let segment_pieces =
76 download_segment_pieces(segment_index, piece_getter, retries, retry_delay).await?;
77
78 let segment = spawn_blocking(move || reconstructor.reconstruct_segment(&segment_pieces))
79 .await
80 .expect("Panic if blocking task panicked")?;
81
82 Ok(segment)
83}
84
85pub async fn download_segment_pieces<PG>(
90 segment_index: SegmentIndex,
91 piece_getter: &PG,
92 retries: usize,
93 retry_delay: Option<Duration>,
94) -> Result<Vec<Option<Piece>>, SegmentDownloadingError>
95where
96 PG: PieceGetter,
97{
98 let mut existing_pieces = [const { None }; ArchivedHistorySegment::NUM_PIECES];
99
100 for retry in 0..=retries {
101 match download_missing_segment_pieces(segment_index, piece_getter, existing_pieces).await {
102 Ok(segment_pieces) => return Ok(segment_pieces),
103 Err((error, incomplete_segment_pieces)) => {
104 existing_pieces = incomplete_segment_pieces;
105
106 if retry < retries {
107 debug!(
108 %segment_index,
109 %retry,
110 ?retry_delay,
111 ?error,
112 "Failed to download segment pieces once, retrying"
113 );
114 if let Some(retry_delay) = retry_delay {
115 sleep(retry_delay).await;
117 }
118 }
119 }
120 }
121 }
122
123 debug!(
124 %segment_index,
125 %retries,
126 "Failed to download segment pieces"
127 );
128
129 Err(SegmentDownloadingError::NotEnoughPieces {
130 segment_index,
131 downloaded_pieces: existing_pieces
132 .iter()
133 .filter(|piece| piece.is_some())
134 .count(),
135 })
136}
137
138async fn download_missing_segment_pieces<PG>(
145 segment_index: SegmentIndex,
146 piece_getter: &PG,
147 existing_pieces: [Option<Piece>; ArchivedHistorySegment::NUM_PIECES],
148) -> Result<
149 Vec<Option<Piece>>,
150 (
151 SegmentDownloadingError,
152 [Option<Piece>; ArchivedHistorySegment::NUM_PIECES],
153 ),
154>
155where
156 PG: PieceGetter,
157{
158 let required_pieces_number = RecordedHistorySegment::NUM_RAW_RECORDS;
159 let mut downloaded_pieces = existing_pieces
160 .iter()
161 .filter(|piece| piece.is_some())
162 .count();
163
164 let mut first_success = None;
166 let mut last_success = None;
167 let mut first_failure = None;
168 let mut last_failure = None;
169
170 let mut segment_pieces = existing_pieces;
171
172 let mut pieces_iter = segment_index
173 .segment_piece_indexes_source_first()
174 .into_iter();
175
176 while !pieces_iter.is_empty() && downloaded_pieces != required_pieces_number {
178 let piece_indices = pieces_iter
179 .by_ref()
180 .filter(|piece_index| segment_pieces[piece_index.position() as usize].is_none())
181 .take(required_pieces_number - downloaded_pieces)
182 .collect();
183
184 let mut received_segment_pieces = match piece_getter.get_pieces(piece_indices).await {
185 Ok(pieces) => pieces,
186 Err(error) => return Err((error.into(), segment_pieces)),
187 };
188
189 while let Some((piece_index, result)) = received_segment_pieces.next().await {
190 match result {
191 Ok(Some(piece)) => {
192 downloaded_pieces += 1;
193 segment_pieces
194 .get_mut(piece_index.position() as usize)
195 .expect("Piece position is by definition within segment; qed")
196 .replace(piece);
197
198 if first_success.is_none() {
199 first_success = Some(piece_index.position());
200 }
201 last_success = Some(piece_index.position());
202 }
203 Ok(None) => {
208 debug!(%piece_index, "Piece was not found");
209 if first_failure.is_none() {
210 first_failure = Some(piece_index.position());
211 }
212 last_failure = Some(piece_index.position());
213 }
214 Err(error) => {
215 debug!(%error, %piece_index, "Failed to get piece");
216 if first_failure.is_none() {
217 first_failure = Some(piece_index.position());
218 }
219 last_failure = Some(piece_index.position());
220 }
221 }
222 }
223 }
224
225 if downloaded_pieces < required_pieces_number {
226 debug!(
227 %segment_index,
228 %downloaded_pieces,
229 %required_pieces_number,
230 ?first_success,
232 ?last_success,
233 ?first_failure,
234 ?last_failure,
235 "Failed to retrieve pieces for segment"
236 );
237
238 return Err((
239 SegmentDownloadingError::NotEnoughPieces {
240 segment_index,
241 downloaded_pieces: segment_pieces
242 .iter()
243 .filter(|piece| piece.is_some())
244 .count(),
245 },
246 segment_pieces,
247 ));
248 }
249
250 Ok(segment_pieces.to_vec())
251}