subspace_data_retrieval/
segment_downloading.rs1use crate::piece_getter::PieceGetter;
4use futures::StreamExt;
5use std::time::Duration;
6use subspace_archiving::archiver::Segment;
7use subspace_archiving::reconstructor::{Reconstructor, ReconstructorError};
8use subspace_core_primitives::pieces::Piece;
9use subspace_core_primitives::segments::{
10 ArchivedHistorySegment, RecordedHistorySegment, SegmentIndex,
11};
12use subspace_erasure_coding::ErasureCoding;
13use tokio::task::spawn_blocking;
14use tokio::time::sleep;
15use tracing::debug;
16
17#[derive(Debug, thiserror::Error)]
19pub enum SegmentDownloadingError {
20 #[error(
22 "Not enough ({downloaded_pieces}/{}) pieces for segment {segment_index}",
23 RecordedHistorySegment::NUM_RAW_RECORDS
24 )]
25 NotEnoughPieces {
26 segment_index: SegmentIndex,
28 downloaded_pieces: usize,
30 },
31
32 #[error("Piece getter error: {source}")]
34 PieceGetterError {
35 #[from]
36 source: anyhow::Error,
37 },
38
39 #[error("Segment reconstruction error: {source}")]
41 SegmentReconstruction {
42 #[from]
43 source: ReconstructorError,
44 },
45
46 #[error("Segment data decoding error: {source}")]
48 SegmentDecoding {
49 #[from]
50 source: parity_scale_codec::Error,
51 },
52}
53
54pub async fn download_segment<PG>(
56 segment_index: SegmentIndex,
57 piece_getter: &PG,
58 erasure_coding: ErasureCoding,
59 retries: usize,
60 retry_delay: Option<Duration>,
61) -> Result<Segment, SegmentDownloadingError>
62where
63 PG: PieceGetter,
64{
65 let reconstructor = Reconstructor::new(erasure_coding);
66
67 let segment_pieces =
68 download_segment_pieces(segment_index, piece_getter, retries, retry_delay).await?;
69
70 let segment = spawn_blocking(move || reconstructor.reconstruct_segment(&segment_pieces))
71 .await
72 .expect("Panic if blocking task panicked")?;
73
74 Ok(segment)
75}
76
77pub async fn download_segment_pieces<PG>(
82 segment_index: SegmentIndex,
83 piece_getter: &PG,
84 retries: usize,
85 retry_delay: Option<Duration>,
86) -> Result<Vec<Option<Piece>>, SegmentDownloadingError>
87where
88 PG: PieceGetter,
89{
90 let mut existing_pieces = [const { None }; ArchivedHistorySegment::NUM_PIECES];
91
92 for retry in 0..=retries {
93 match download_missing_segment_pieces(segment_index, piece_getter, existing_pieces).await {
94 Ok(segment_pieces) => return Ok(segment_pieces),
95 Err((error, incomplete_segment_pieces)) => {
96 existing_pieces = incomplete_segment_pieces;
97
98 if retry < retries {
99 debug!(
100 %segment_index,
101 %retry,
102 ?retry_delay,
103 ?error,
104 "Failed to download segment pieces once, retrying"
105 );
106 if let Some(retry_delay) = retry_delay {
107 sleep(retry_delay).await;
109 }
110 }
111 }
112 }
113 }
114
115 debug!(
116 %segment_index,
117 %retries,
118 "Failed to download segment pieces"
119 );
120
121 Err(SegmentDownloadingError::NotEnoughPieces {
122 segment_index,
123 downloaded_pieces: existing_pieces
124 .iter()
125 .filter(|piece| piece.is_some())
126 .count(),
127 })
128}
129
130async fn download_missing_segment_pieces<PG>(
137 segment_index: SegmentIndex,
138 piece_getter: &PG,
139 existing_pieces: [Option<Piece>; ArchivedHistorySegment::NUM_PIECES],
140) -> Result<
141 Vec<Option<Piece>>,
142 (
143 SegmentDownloadingError,
144 [Option<Piece>; ArchivedHistorySegment::NUM_PIECES],
145 ),
146>
147where
148 PG: PieceGetter,
149{
150 let required_pieces_number = RecordedHistorySegment::NUM_RAW_RECORDS;
151 let mut downloaded_pieces = existing_pieces
152 .iter()
153 .filter(|piece| piece.is_some())
154 .count();
155
156 let mut first_success = None;
158 let mut last_success = None;
159 let mut first_failure = None;
160 let mut last_failure = None;
161
162 let mut segment_pieces = existing_pieces;
163
164 let mut pieces_iter = segment_index
165 .segment_piece_indexes_source_first()
166 .into_iter()
167 .peekable();
168
169 while !pieces_iter.is_empty() && downloaded_pieces != required_pieces_number {
171 let piece_indices = pieces_iter
172 .by_ref()
173 .filter(|piece_index| segment_pieces[piece_index.position() as usize].is_none())
174 .take(required_pieces_number - downloaded_pieces)
175 .collect();
176
177 let mut received_segment_pieces = match piece_getter.get_pieces(piece_indices).await {
178 Ok(pieces) => pieces,
179 Err(error) => return Err((error.into(), segment_pieces)),
180 };
181
182 while let Some((piece_index, result)) = received_segment_pieces.next().await {
183 match result {
184 Ok(Some(piece)) => {
185 downloaded_pieces += 1;
186 segment_pieces
187 .get_mut(piece_index.position() as usize)
188 .expect("Piece position is by definition within segment; qed")
189 .replace(piece);
190
191 if first_success.is_none() {
192 first_success = Some(piece_index.position());
193 }
194 last_success = Some(piece_index.position());
195 }
196 Ok(None) => {
201 debug!(%piece_index, "Piece was not found");
202 if first_failure.is_none() {
203 first_failure = Some(piece_index.position());
204 }
205 last_failure = Some(piece_index.position());
206 }
207 Err(error) => {
208 debug!(%error, %piece_index, "Failed to get piece");
209 if first_failure.is_none() {
210 first_failure = Some(piece_index.position());
211 }
212 last_failure = Some(piece_index.position());
213 }
214 }
215 }
216 }
217
218 if downloaded_pieces < required_pieces_number {
219 debug!(
220 %segment_index,
221 %downloaded_pieces,
222 %required_pieces_number,
223 ?first_success,
225 ?last_success,
226 ?first_failure,
227 ?last_failure,
228 "Failed to retrieve pieces for segment"
229 );
230
231 return Err((
232 SegmentDownloadingError::NotEnoughPieces {
233 segment_index,
234 downloaded_pieces: segment_pieces
235 .iter()
236 .filter(|piece| piece.is_some())
237 .count(),
238 },
239 segment_pieces,
240 ));
241 }
242
243 Ok(segment_pieces.to_vec())
244}