// subspace_data_retrieval/piece_getter.rs
use async_trait::async_trait;
use futures::{stream, Stream, StreamExt};
use std::fmt;
use std::future::Future;
use std::sync::Arc;
use subspace_archiving::archiver::NewArchivedSegment;
use subspace_core_primitives::pieces::{Piece, PieceIndex};
/// Asynchronous source of pieces, looked up by piece index.
///
/// Implementors must also implement [`fmt::Debug`]. The implementations visible in this file
/// return `Ok(None)` when they do not hold the requested piece.
#[async_trait]
pub trait PieceGetter: fmt::Debug {
/// Looks up a single piece by its index.
///
/// Returns `Ok(Some(piece))` when the piece is available, `Ok(None)` when it is not, and
/// `Err(_)` when the lookup itself failed.
async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>>;
/// Looks up several pieces and returns the results as a stream.
///
/// Each stream item pairs a piece index with the result of looking that piece up, using the
/// same `Ok(Some(_))` / `Ok(None)` / `Err(_)` shape as [`PieceGetter::get_piece`]. The stream
/// may borrow from `self` for the lifetime `'a`.
///
/// The outer `Err(_)` signals that the stream could not be created at all.
// NOTE(review): the trait does not state whether items arrive in the order of `piece_indices`
// or whether every requested index is guaranteed to be yielded — confirm per implementation.
async fn get_pieces<'a>(
&'a self,
piece_indices: Vec<PieceIndex>,
) -> anyhow::Result<
Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
>;
}
/// Forwarding implementation: an `Arc<T>` is a piece getter whenever `T` is one.
///
/// Both methods delegate to the wrapped value without adding any behavior of their own.
#[async_trait]
impl<T> PieceGetter for Arc<T>
where
    T: PieceGetter + Send + Sync + ?Sized,
{
    /// Delegates the single-piece lookup to the wrapped getter.
    #[inline]
    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
        // Deref-coerce `&Arc<T>` to `&T` so the call goes to the inner implementation.
        let inner: &T = self;
        inner.get_piece(piece_index).await
    }

    /// Delegates the multi-piece lookup to the wrapped getter.
    #[inline]
    async fn get_pieces<'a>(
        &'a self,
        piece_indices: Vec<PieceIndex>,
    ) -> anyhow::Result<
        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
    > {
        // Deref-coerce `&Arc<T>` to `&T` so the call goes to the inner implementation.
        let inner: &'a T = self;
        inner.get_pieces(piece_indices).await
    }
}
/// A newly archived segment can serve the pieces that belong to that segment.
#[async_trait]
impl PieceGetter for NewArchivedSegment {
    /// Returns the piece at `piece_index` if it belongs to this segment, `Ok(None)` otherwise.
    ///
    /// # Panics
    ///
    /// Panics if the segment's pieces do not contain an entry at the position encoded in
    /// `piece_index`.
    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
        // Pieces of any other segment are not held here.
        if piece_index.segment_index() != self.segment_header.segment_index() {
            return Ok(None);
        }

        let position = piece_index.position() as usize;
        let piece = self
            .pieces
            .pieces()
            .nth(position)
            .expect("Piece position always exists in a segment; qed");

        Ok(Some(piece))
    }

    /// Streams the result of looking up each requested index one at a time via
    /// [`PieceGetter::get_piece`].
    async fn get_pieces<'a>(
        &'a self,
        piece_indices: Vec<PieceIndex>,
    ) -> anyhow::Result<
        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
    > {
        get_pieces_individually(|index| self.get_piece(index), piece_indices)
    }
}
/// A single `(index, piece)` pair acts as a getter that knows exactly one piece.
#[async_trait]
impl PieceGetter for (PieceIndex, Piece) {
    /// Returns a clone of the stored piece when `piece_index` equals the stored index,
    /// `Ok(None)` otherwise.
    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
        let (stored_index, stored_piece) = self;
        let is_match = *stored_index == piece_index;

        // Clone only when the indices match.
        Ok(is_match.then(|| stored_piece.clone()))
    }

    /// Streams the result of looking up each requested index one at a time via
    /// [`PieceGetter::get_piece`].
    async fn get_pieces<'a>(
        &'a self,
        piece_indices: Vec<PieceIndex>,
    ) -> anyhow::Result<
        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
    > {
        get_pieces_individually(|index| self.get_piece(index), piece_indices)
    }
}
/// Builds a piece stream by calling `get_piece` once for every index in `piece_indices`.
///
/// Each item yielded by the returned stream is the requested index paired with whatever
/// `get_piece` resolved to for that index. Indices are fed to `get_piece` in iteration order,
/// and each lookup is awaited before the next index is taken from the iterator.
///
/// This function itself always returns `Ok(_)`; the `Result` wrapper exists so the return type
/// lines up with [`PieceGetter::get_pieces`].
#[expect(clippy::type_complexity, reason = "type matches trait signature")]
pub fn get_pieces_individually<'a, PieceIndices, Func, Fut>(
    get_piece: Func,
    piece_indices: PieceIndices,
) -> anyhow::Result<
    Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
>
where
    PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send> + Send + 'a,
    Func: Fn(PieceIndex) -> Fut + Clone + Send + 'a,
    Fut: Future<Output = anyhow::Result<Option<Piece>>> + Send + Unpin + 'a,
{
    let fetch_with_index = move |piece_index: PieceIndex| {
        // Every per-index future owns its own copy of the getter.
        let fetch = get_piece.clone();
        async move {
            let outcome = fetch(piece_index).await;
            (piece_index, outcome)
        }
    };

    let pieces = stream::iter(piece_indices).then(fetch_with_index);

    // The async block is not `Unpin`, so the stream is pinned on the heap before being boxed
    // as an `Unpin` trait object.
    Ok(Box::new(Box::pin(pieces)))
}