subspace_data_retrieval/
segment_downloading.rs1use crate::piece_getter::PieceGetter;
4use futures::StreamExt;
5use subspace_archiving::archiver::Segment;
6use subspace_archiving::reconstructor::{Reconstructor, ReconstructorError};
7use subspace_core_primitives::pieces::Piece;
8use subspace_core_primitives::segments::{
9 ArchivedHistorySegment, RecordedHistorySegment, SegmentIndex,
10};
11use subspace_erasure_coding::ErasureCoding;
12use tokio::task::spawn_blocking;
13use tracing::debug;
14
15#[derive(Debug, thiserror::Error)]
17pub enum SegmentDownloadingError {
18 #[error(
20 "Not enough ({downloaded_pieces}/{}) pieces for segment {segment_index}",
21 RecordedHistorySegment::NUM_RAW_RECORDS
22 )]
23 NotEnoughPieces {
24 segment_index: SegmentIndex,
26 downloaded_pieces: usize,
28 },
29
30 #[error("Piece getter error: {source}")]
32 PieceGetterError {
33 #[from]
34 source: anyhow::Error,
35 },
36
37 #[error("Segment reconstruction error: {source}")]
39 SegmentReconstruction {
40 #[from]
41 source: ReconstructorError,
42 },
43
44 #[error("Segment data decoding error: {source}")]
46 SegmentDecoding {
47 #[from]
48 source: parity_scale_codec::Error,
49 },
50}
51
52pub async fn download_segment<PG>(
54 segment_index: SegmentIndex,
55 piece_getter: &PG,
56 erasure_coding: ErasureCoding,
57) -> Result<Segment, SegmentDownloadingError>
58where
59 PG: PieceGetter,
60{
61 let reconstructor = Reconstructor::new(erasure_coding);
62
63 let segment_pieces = download_segment_pieces(segment_index, piece_getter).await?;
64
65 let segment = spawn_blocking(move || reconstructor.reconstruct_segment(&segment_pieces))
66 .await
67 .expect("Panic if blocking task panicked")?;
68
69 Ok(segment)
70}
71
72pub async fn download_segment_pieces<PG>(
76 segment_index: SegmentIndex,
77 piece_getter: &PG,
78) -> Result<Vec<Option<Piece>>, SegmentDownloadingError>
79where
80 PG: PieceGetter,
81{
82 let required_pieces_number = RecordedHistorySegment::NUM_RAW_RECORDS;
83 let mut downloaded_pieces = 0_usize;
84
85 let mut first_success = None;
87 let mut last_success = None;
88 let mut first_failure = None;
89 let mut last_failure = None;
90
91 let mut segment_pieces = vec![None::<Piece>; ArchivedHistorySegment::NUM_PIECES];
92
93 let mut pieces_iter = segment_index
94 .segment_piece_indexes_source_first()
95 .into_iter()
96 .peekable();
97
98 while !pieces_iter.is_empty() && downloaded_pieces != required_pieces_number {
100 let piece_indices = pieces_iter
101 .by_ref()
102 .take(required_pieces_number - downloaded_pieces)
103 .collect();
104
105 let mut received_segment_pieces = piece_getter.get_pieces(piece_indices).await?;
106
107 while let Some((piece_index, result)) = received_segment_pieces.next().await {
108 match result {
109 Ok(Some(piece)) => {
110 downloaded_pieces += 1;
111 segment_pieces
112 .get_mut(piece_index.position() as usize)
113 .expect("Piece position is by definition within segment; qed")
114 .replace(piece);
115
116 if first_success.is_none() {
117 first_success = Some(piece_index.position());
118 }
119 last_success = Some(piece_index.position());
120 }
121 Ok(None) => {
126 debug!(%piece_index, "Piece was not found");
127 if first_failure.is_none() {
128 first_failure = Some(piece_index.position());
129 }
130 last_failure = Some(piece_index.position());
131 }
132 Err(error) => {
133 debug!(%error, %piece_index, "Failed to get piece");
134 if first_failure.is_none() {
135 first_failure = Some(piece_index.position());
136 }
137 last_failure = Some(piece_index.position());
138 }
139 }
140 }
141 }
142
143 if downloaded_pieces < required_pieces_number {
144 debug!(
145 %segment_index,
146 %downloaded_pieces,
147 %required_pieces_number,
148 ?first_success,
150 ?last_success,
151 ?first_failure,
152 ?last_failure,
153 "Failed to retrieve pieces for segment"
154 );
155
156 return Err(SegmentDownloadingError::NotEnoughPieces {
157 segment_index,
158 downloaded_pieces,
159 });
160 }
161
162 Ok(segment_pieces)
163}