subspace_data_retrieval/
segment_downloading.rsuse crate::piece_getter::PieceGetter;
use futures::StreamExt;
use subspace_archiving::archiver::Segment;
use subspace_archiving::reconstructor::{Reconstructor, ReconstructorError};
use subspace_core_primitives::pieces::Piece;
use subspace_core_primitives::segments::{
ArchivedHistorySegment, RecordedHistorySegment, SegmentIndex,
};
use subspace_erasure_coding::ErasureCoding;
use tokio::task::spawn_blocking;
use tracing::debug;
#[derive(Debug, thiserror::Error)]
pub enum SegmentDownloadingError {
#[error("Not enough ({downloaded_pieces}) pieces")]
NotEnoughPieces {
downloaded_pieces: usize,
},
#[error("Piece getter error: {source}")]
PieceGetterError {
#[from]
source: anyhow::Error,
},
#[error("Segment reconstruction error: {source}")]
SegmentReconstruction {
#[from]
source: ReconstructorError,
},
#[error("Segment data decoding error: {source}")]
SegmentDecoding {
#[from]
source: parity_scale_codec::Error,
},
}
pub async fn download_segment<PG>(
segment_index: SegmentIndex,
piece_getter: &PG,
erasure_coding: ErasureCoding,
) -> Result<Segment, SegmentDownloadingError>
where
PG: PieceGetter,
{
let reconstructor = Reconstructor::new(erasure_coding);
let segment_pieces = download_segment_pieces(segment_index, piece_getter).await?;
let segment = spawn_blocking(move || reconstructor.reconstruct_segment(&segment_pieces))
.await
.expect("Panic if blocking task panicked")?;
Ok(segment)
}
pub async fn download_segment_pieces<PG>(
segment_index: SegmentIndex,
piece_getter: &PG,
) -> Result<Vec<Option<Piece>>, SegmentDownloadingError>
where
PG: PieceGetter,
{
let required_pieces_number = RecordedHistorySegment::NUM_RAW_RECORDS;
let mut downloaded_pieces = 0_usize;
let mut segment_pieces = vec![None::<Piece>; ArchivedHistorySegment::NUM_PIECES];
let mut pieces_iter = segment_index
.segment_piece_indexes_source_first()
.into_iter();
while !pieces_iter.is_empty() && downloaded_pieces != required_pieces_number {
let piece_indices = pieces_iter
.by_ref()
.take(required_pieces_number - downloaded_pieces)
.collect();
let mut received_segment_pieces = piece_getter.get_pieces(piece_indices).await?;
while let Some((piece_index, result)) = received_segment_pieces.next().await {
match result {
Ok(Some(piece)) => {
downloaded_pieces += 1;
segment_pieces
.get_mut(piece_index.position() as usize)
.expect("Piece position is by definition within segment; qed")
.replace(piece);
}
Ok(None) => {
debug!(%piece_index, "Piece was not found");
}
Err(error) => {
debug!(%error, %piece_index, "Failed to get piece");
}
}
}
}
if downloaded_pieces < required_pieces_number {
debug!(
%segment_index,
%downloaded_pieces,
%required_pieces_number,
"Failed to retrieve pieces for segment"
);
return Err(SegmentDownloadingError::NotEnoughPieces { downloaded_pieces });
}
Ok(segment_pieces)
}