subspace_data_retrieval/
piece_getter.rs

1//! Getting object pieces from the Subspace Distributed Storage Network, or various caches.
2
3use async_trait::async_trait;
4use futures::{stream, Stream, StreamExt};
5use std::fmt;
6use std::future::Future;
7use std::sync::Arc;
8use subspace_archiving::archiver::NewArchivedSegment;
9use subspace_core_primitives::pieces::{Piece, PieceIndex};
10
11/// Trait representing a way to get pieces
12#[async_trait]
13pub trait PieceGetter: fmt::Debug {
14    /// Get piece by index.
15    ///
16    /// Returns `Ok(None)` if the piece is not found.
17    /// Returns `Err(_)` if trying to get the piece caused an error.
18    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>>;
19
20    /// Get pieces with provided indices.
21    ///
22    /// The number of elements in the returned stream is the same as the number of unique
23    /// `piece_indices`.
24    async fn get_pieces<'a>(
25        &'a self,
26        piece_indices: Vec<PieceIndex>,
27    ) -> anyhow::Result<
28        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
29    >;
30
31    /// Returns a piece getter that falls back to `other` if `self` does not return the piece.
32    /// Piece getters may need to be wrapped in `Arc` to be used with this method.
33    fn with_fallback<U>(self, other: U) -> FallbackPieceGetter<Self, U>
34    where
35        Self: Send + Sync + Sized,
36        U: PieceGetter + Send + Sync,
37    {
38        FallbackPieceGetter {
39            first: self,
40            second: other,
41        }
42    }
43}
44
/// A piece getter that falls back to another piece getter if the first one does not return the
/// piece.
///
/// "Does not return the piece" covers both an error and `Ok(None)` from the first piece getter.
/// If both piece getters don't return the piece, returns the result of the second piece getter.
#[derive(Debug)]
pub struct FallbackPieceGetter<T, U>
where
    T: PieceGetter + Send + Sync,
    U: PieceGetter + Send + Sync,
{
    // The piece getter that is always tried first.
    first: T,
    // The piece getter that is tried when `first` does not return the piece.
    second: U,
}
56
57#[async_trait]
58impl<T, U> PieceGetter for FallbackPieceGetter<T, U>
59where
60    T: PieceGetter + Send + Sync,
61    U: PieceGetter + Send + Sync,
62{
63    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
64        if let Ok(Some(piece)) = self.first.get_piece(piece_index).await {
65            Ok(Some(piece))
66        } else {
67            self.second.get_piece(piece_index).await
68        }
69    }
70
71    async fn get_pieces<'a>(
72        &'a self,
73        piece_indices: Vec<PieceIndex>,
74    ) -> anyhow::Result<
75        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
76    > {
77        let first_stream = self.first.get_pieces(piece_indices.clone()).await;
78
79        let fallback_stream = if let Ok(first_stream) = first_stream {
80            // For each missing piece, try the second piece getter
81            Box::new(Box::pin(first_stream.then(
82                |(piece_index, piece_result)| fallback_to(piece_index, piece_result, &self.second),
83            )))
84                as Box<
85                    dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin,
86                >
87        } else {
88            // If no pieces are available, just use the second piece getter
89            self.second.get_pieces(piece_indices).await?
90        };
91
92        Ok(fallback_stream)
93    }
94}
95
96/// A heler function which falls back to another piece getter if the first one does not return the
97/// piece.
98///
99/// We can't use an async closure, because async closures eagerly capture lifetimes.
100async fn fallback_to<U>(
101    piece_index: PieceIndex,
102    piece_result: anyhow::Result<Option<Piece>>,
103    second: &U,
104) -> (PieceIndex, anyhow::Result<Option<Piece>>)
105where
106    U: PieceGetter,
107{
108    if let Ok(Some(piece)) = piece_result {
109        return (piece_index, Ok(Some(piece)));
110    }
111
112    (piece_index, second.get_piece(piece_index).await)
113}
114
115// Generic wrapper methods
116#[async_trait]
117impl<T> PieceGetter for Arc<T>
118where
119    T: PieceGetter + Send + Sync + ?Sized,
120{
121    #[inline]
122    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
123        self.as_ref().get_piece(piece_index).await
124    }
125
126    #[inline]
127    async fn get_pieces<'a>(
128        &'a self,
129        piece_indices: Vec<PieceIndex>,
130    ) -> anyhow::Result<
131        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
132    > {
133        self.as_ref().get_pieces(piece_indices).await
134    }
135}
136
137#[async_trait]
138impl<T> PieceGetter for Box<T>
139where
140    T: PieceGetter + Send + Sync + ?Sized,
141{
142    #[inline]
143    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
144        self.as_ref().get_piece(piece_index).await
145    }
146
147    #[inline]
148    async fn get_pieces<'a>(
149        &'a self,
150        piece_indices: Vec<PieceIndex>,
151    ) -> anyhow::Result<
152        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
153    > {
154        self.as_ref().get_pieces(piece_indices).await
155    }
156}
157
158#[async_trait]
159impl<T> PieceGetter for Option<T>
160where
161    T: PieceGetter + Send + Sync,
162{
163    #[inline]
164    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
165        if let Some(piece_getter) = self.as_ref() {
166            piece_getter.get_piece(piece_index).await
167        } else {
168            Ok(None)
169        }
170    }
171
172    #[inline]
173    async fn get_pieces<'a>(
174        &'a self,
175        piece_indices: Vec<PieceIndex>,
176    ) -> anyhow::Result<
177        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
178    > {
179        if let Some(piece_getter) = self.as_ref() {
180            piece_getter.get_pieces(piece_indices).await
181        } else {
182            // The values will all be `Ok(None)`, but we need a stream of them
183            get_pieces_individually(|piece_index| self.get_piece(piece_index), piece_indices)
184        }
185    }
186}
187
188// Convenience methods, mainly used in testing
189#[async_trait]
190impl PieceGetter for NewArchivedSegment {
191    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
192        if piece_index.segment_index() == self.segment_header.segment_index() {
193            return Ok(Some(
194                self.pieces
195                    .pieces()
196                    .nth(piece_index.position() as usize)
197                    .expect("Piece position always exists in a segment; qed"),
198            ));
199        }
200
201        Ok(None)
202    }
203
204    async fn get_pieces<'a>(
205        &'a self,
206        piece_indices: Vec<PieceIndex>,
207    ) -> anyhow::Result<
208        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
209    > {
210        get_pieces_individually(|piece_index| self.get_piece(piece_index), piece_indices)
211    }
212}
213
214// Used for piece caches
215#[async_trait]
216impl PieceGetter for (PieceIndex, Piece) {
217    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
218        if self.0 == piece_index {
219            return Ok(Some(self.1.clone()));
220        }
221
222        Ok(None)
223    }
224
225    async fn get_pieces<'a>(
226        &'a self,
227        piece_indices: Vec<PieceIndex>,
228    ) -> anyhow::Result<
229        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
230    > {
231        get_pieces_individually(|piece_index| self.get_piece(piece_index), piece_indices)
232    }
233}
234
235#[async_trait]
236impl PieceGetter for Vec<(PieceIndex, Piece)> {
237    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
238        Ok(self.iter().find_map(|(index, piece)| {
239            if *index == piece_index {
240                Some(piece.clone())
241            } else {
242                None
243            }
244        }))
245    }
246
247    async fn get_pieces<'a>(
248        &'a self,
249        piece_indices: Vec<PieceIndex>,
250    ) -> anyhow::Result<
251        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
252    > {
253        get_pieces_individually(|piece_index| self.get_piece(piece_index), piece_indices)
254    }
255}
256
257/// A default implementation which gets each piece individually, using the `get_piece` async
258/// function.
259///
260/// This is mainly used for testing and caches. Most production implementations can fetch multiple
261/// pieces more efficiently.
262#[expect(clippy::type_complexity, reason = "type matches trait signature")]
263pub fn get_pieces_individually<'a, PieceIndices, Func, Fut>(
264    // TODO: replace with AsyncFn(PieceIndex) -> anyhow::Result<Option<Piece>> once it stabilises
265    // https://github.com/rust-lang/rust/issues/62290
266    get_piece: Func,
267    piece_indices: PieceIndices,
268) -> anyhow::Result<
269    Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
270>
271where
272    PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send> + Send + 'a,
273    Func: Fn(PieceIndex) -> Fut + Clone + Send + 'a,
274    Fut: Future<Output = anyhow::Result<Option<Piece>>> + Send + Unpin + 'a,
275{
276    Ok(Box::new(Box::pin(stream::iter(piece_indices).then(
277        move |piece_index| {
278            let get_piece = get_piece.clone();
279            async move { (piece_index, get_piece(piece_index).await) }
280        },
281    ))))
282}