subspace_data_retrieval/piece_fetcher.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
//! Fetching pieces of the archived history of Subspace Network.
use crate::object_fetcher::Error;
use crate::piece_getter::PieceGetter;
use futures::StreamExt;
use subspace_core_primitives::pieces::{Piece, PieceIndex};
use tracing::{debug, trace};
/// Concurrently downloads the exact pieces in `piece_indexes`, returning them in that order.
/// Each piece index must be unique.
///
/// If any piece can't be downloaded, returns an error.
// This code was copied and modified from subspace_service::sync_from_dsn::download_and_reconstruct_blocks():
// <https://github.com/autonomys/subspace/blob/d71ca47e45e1b53cd2e472413caa23472a91cd74/crates/subspace-service/src/sync_from_dsn/import_blocks.rs#L236-L322>
pub async fn download_pieces<PG>(
piece_indexes: &Vec<PieceIndex>,
piece_getter: &PG,
) -> anyhow::Result<Vec<Piece>>
where
PG: PieceGetter,
{
debug!(
count = piece_indexes.len(),
?piece_indexes,
"Retrieving exact pieces"
);
// TODO:
// - if we're close to the number of pieces in a segment, or we can't find a piece, use segment downloading and piece
// reconstruction instead
// Currently most objects are limited to 4 pieces, so this isn't needed yet.
let mut received_pieces = piece_getter.get_pieces(piece_indexes.clone()).await?;
let mut pieces = Vec::new();
pieces.resize(piece_indexes.len(), Piece::default());
while let Some((piece_index, maybe_piece)) = received_pieces.next().await {
// We want exact pieces, so any errors are fatal.
let piece = maybe_piece?.ok_or(Error::PieceNotFound { piece_index })?;
let index_position = piece_indexes
.iter()
.position(|i| *i == piece_index)
.expect("get_pieces only returns indexes it was supplied; qed");
pieces[index_position] = piece;
}
trace!(
count = piece_indexes.len(),
?piece_indexes,
"Successfully retrieved exact pieces"
);
Ok(pieces)
}