subspace_data_retrieval/
piece_getter.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
//! Getting object pieces from the Subspace Distributed Storage Network, or various caches.

use async_trait::async_trait;
use futures::{stream, Stream, StreamExt};
use std::fmt;
use std::future::Future;
use std::sync::Arc;
use subspace_archiving::archiver::NewArchivedSegment;
use subspace_core_primitives::pieces::{Piece, PieceIndex};

/// Trait representing a way to get pieces
#[async_trait]
pub trait PieceGetter: fmt::Debug {
    /// Get piece by index.
    ///
    /// Returns `Ok(None)` if the piece is not found.
    /// Returns `Err(_)` if trying to get the piece caused an error.
    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>>;

    /// Get pieces with provided indices.
    ///
    /// The number of elements in the returned stream is the same as the number of unique
    /// `piece_indices`.
    async fn get_pieces<'a>(
        &'a self,
        piece_indices: Vec<PieceIndex>,
    ) -> anyhow::Result<
        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
    >;
}

#[async_trait]
impl<T> PieceGetter for Arc<T>
where
    T: PieceGetter + Send + Sync + ?Sized,
{
    #[inline]
    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
        self.as_ref().get_piece(piece_index).await
    }

    #[inline]
    async fn get_pieces<'a>(
        &'a self,
        piece_indices: Vec<PieceIndex>,
    ) -> anyhow::Result<
        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
    > {
        self.as_ref().get_pieces(piece_indices).await
    }
}

// Convenience methods, mainly used in testing
#[async_trait]
impl PieceGetter for NewArchivedSegment {
    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
        if piece_index.segment_index() == self.segment_header.segment_index() {
            return Ok(Some(
                self.pieces
                    .pieces()
                    .nth(piece_index.position() as usize)
                    .expect("Piece position always exists in a segment; qed"),
            ));
        }

        Ok(None)
    }

    async fn get_pieces<'a>(
        &'a self,
        piece_indices: Vec<PieceIndex>,
    ) -> anyhow::Result<
        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
    > {
        get_pieces_individually(|piece_index| self.get_piece(piece_index), piece_indices)
    }
}

#[async_trait]
impl PieceGetter for (PieceIndex, Piece) {
    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
        if self.0 == piece_index {
            return Ok(Some(self.1.clone()));
        }

        Ok(None)
    }

    async fn get_pieces<'a>(
        &'a self,
        piece_indices: Vec<PieceIndex>,
    ) -> anyhow::Result<
        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
    > {
        get_pieces_individually(|piece_index| self.get_piece(piece_index), piece_indices)
    }
}

#[async_trait]
impl PieceGetter for Vec<(PieceIndex, Piece)> {
    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
        Ok(self.iter().find_map(|(index, piece)| {
            if *index == piece_index {
                Some(piece.clone())
            } else {
                None
            }
        }))
    }

    async fn get_pieces<'a>(
        &'a self,
        piece_indices: Vec<PieceIndex>,
    ) -> anyhow::Result<
        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
    > {
        get_pieces_individually(|piece_index| self.get_piece(piece_index), piece_indices)
    }
}

/// A default implementation which gets each piece individually, using the `get_piece` async
/// function.
///
/// This is mainly used for testing, most production implementations can fetch multiple pieces more
/// efficiently.
#[expect(clippy::type_complexity, reason = "type matches trait signature")]
pub fn get_pieces_individually<'a, PieceIndices, Func, Fut>(
    // TODO: replace with AsyncFn(PieceIndex) -> anyhow::Result<Option<Piece>> once it stabilises
    // https://github.com/rust-lang/rust/issues/62290
    get_piece: Func,
    piece_indices: PieceIndices,
) -> anyhow::Result<
    Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
>
where
    PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send> + Send + 'a,
    Func: Fn(PieceIndex) -> Fut + Clone + Send + 'a,
    Fut: Future<Output = anyhow::Result<Option<Piece>>> + Send + Unpin + 'a,
{
    Ok(Box::new(Box::pin(stream::iter(piece_indices).then(
        move |piece_index| {
            let get_piece = get_piece.clone();
            async move { (piece_index, get_piece(piece_index).await) }
        },
    ))))
}