subspace_data_retrieval/piece_fetcher.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
// Copyright (C) 2024 Subspace Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Fetching pieces of the archived history of Subspace Network.
use crate::object_fetcher::Error;
use crate::piece_getter::{BoxError, ObjectPieceGetter};
use futures::stream::FuturesOrdered;
use futures::TryStreamExt;
use subspace_core_primitives::pieces::{Piece, PieceIndex};
use tracing::{debug, trace};
/// Concurrently downloads the exact pieces in `piece_indexes`, returning them in that order.
/// Each piece index must be unique.
///
/// If any piece can't be downloaded, returns an error.
// This code was copied and modified from subspace_service::sync_from_dsn::download_and_reconstruct_blocks():
// <https://github.com/autonomys/subspace/blob/d71ca47e45e1b53cd2e472413caa23472a91cd74/crates/subspace-service/src/sync_from_dsn/import_blocks.rs#L236-L322>
pub async fn download_pieces<PG>(
piece_indexes: &[PieceIndex],
piece_getter: &PG,
) -> Result<Vec<Piece>, BoxError>
where
PG: ObjectPieceGetter,
{
debug!(
count = piece_indexes.len(),
?piece_indexes,
"Retrieving exact pieces"
);
// TODO:
// - consider using a semaphore to limit the number of concurrent requests, like
// download_segment_pieces()
// - if we're close to the number of pieces in a segment, use segment downloading and piece
// reconstruction instead
// Currently most objects are limited to 4 pieces, so this isn't needed yet.
let received_pieces = piece_indexes
.iter()
.map(|piece_index| async move {
match piece_getter.get_piece(*piece_index).await {
Ok(Some(piece)) => {
trace!(?piece_index, "Piece request succeeded",);
Ok(piece)
}
Ok(None) => {
trace!(?piece_index, "Piece not found");
Err(Error::PieceNotFound {
piece_index: *piece_index,
}
.into())
}
Err(error) => {
trace!(
%error,
?piece_index,
"Piece request caused an error",
);
Err(error)
}
}
})
.collect::<FuturesOrdered<_>>();
// We want exact pieces, so any errors are fatal.
let received_pieces: Vec<Piece> = received_pieces.try_collect().await?;
trace!(
count = piece_indexes.len(),
?piece_indexes,
"Successfully retrieved exact pieces"
);
Ok(received_pieces)
}