1use async_trait::async_trait;
4use futures::{stream, Stream, StreamExt};
5use std::fmt;
6use std::future::Future;
7use std::sync::Arc;
8use subspace_archiving::archiver::NewArchivedSegment;
9use subspace_core_primitives::pieces::{Piece, PieceIndex};
10
11#[async_trait]
13pub trait PieceGetter: fmt::Debug {
14 async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>>;
19
20 async fn get_pieces<'a>(
25 &'a self,
26 piece_indices: Vec<PieceIndex>,
27 ) -> anyhow::Result<
28 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
29 >;
30
31 fn with_fallback<U>(self, other: U) -> FallbackPieceGetter<Self, U>
34 where
35 Self: Send + Sync + Sized,
36 U: PieceGetter + Send + Sync,
37 {
38 FallbackPieceGetter {
39 first: self,
40 second: other,
41 }
42 }
43}
44
45#[derive(Debug)]
48pub struct FallbackPieceGetter<T, U>
49where
50 T: PieceGetter + Send + Sync,
51 U: PieceGetter + Send + Sync,
52{
53 first: T,
54 second: U,
55}
56
57#[async_trait]
58impl<T, U> PieceGetter for FallbackPieceGetter<T, U>
59where
60 T: PieceGetter + Send + Sync,
61 U: PieceGetter + Send + Sync,
62{
63 async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
64 if let Ok(Some(piece)) = self.first.get_piece(piece_index).await {
65 Ok(Some(piece))
66 } else {
67 self.second.get_piece(piece_index).await
68 }
69 }
70
71 async fn get_pieces<'a>(
72 &'a self,
73 piece_indices: Vec<PieceIndex>,
74 ) -> anyhow::Result<
75 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
76 > {
77 let first_stream = self.first.get_pieces(piece_indices.clone()).await;
78
79 let fallback_stream = if let Ok(first_stream) = first_stream {
80 Box::new(Box::pin(first_stream.then(
82 |(piece_index, piece_result)| fallback_to(piece_index, piece_result, &self.second),
83 )))
84 as Box<
85 dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin,
86 >
87 } else {
88 self.second.get_pieces(piece_indices).await?
90 };
91
92 Ok(fallback_stream)
93 }
94}
95
96async fn fallback_to<U>(
101 piece_index: PieceIndex,
102 piece_result: anyhow::Result<Option<Piece>>,
103 second: &U,
104) -> (PieceIndex, anyhow::Result<Option<Piece>>)
105where
106 U: PieceGetter,
107{
108 if let Ok(Some(piece)) = piece_result {
109 return (piece_index, Ok(Some(piece)));
110 }
111
112 (piece_index, second.get_piece(piece_index).await)
113}
114
115#[async_trait]
117impl<T> PieceGetter for Arc<T>
118where
119 T: PieceGetter + Send + Sync + ?Sized,
120{
121 #[inline]
122 async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
123 self.as_ref().get_piece(piece_index).await
124 }
125
126 #[inline]
127 async fn get_pieces<'a>(
128 &'a self,
129 piece_indices: Vec<PieceIndex>,
130 ) -> anyhow::Result<
131 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
132 > {
133 self.as_ref().get_pieces(piece_indices).await
134 }
135}
136
137#[async_trait]
138impl<T> PieceGetter for Box<T>
139where
140 T: PieceGetter + Send + Sync + ?Sized,
141{
142 #[inline]
143 async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
144 self.as_ref().get_piece(piece_index).await
145 }
146
147 #[inline]
148 async fn get_pieces<'a>(
149 &'a self,
150 piece_indices: Vec<PieceIndex>,
151 ) -> anyhow::Result<
152 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
153 > {
154 self.as_ref().get_pieces(piece_indices).await
155 }
156}
157
158#[async_trait]
159impl<T> PieceGetter for Option<T>
160where
161 T: PieceGetter + Send + Sync,
162{
163 #[inline]
164 async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
165 if let Some(piece_getter) = self.as_ref() {
166 piece_getter.get_piece(piece_index).await
167 } else {
168 Ok(None)
169 }
170 }
171
172 #[inline]
173 async fn get_pieces<'a>(
174 &'a self,
175 piece_indices: Vec<PieceIndex>,
176 ) -> anyhow::Result<
177 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
178 > {
179 if let Some(piece_getter) = self.as_ref() {
180 piece_getter.get_pieces(piece_indices).await
181 } else {
182 get_pieces_individually(|piece_index| self.get_piece(piece_index), piece_indices)
184 }
185 }
186}
187
188#[async_trait]
190impl PieceGetter for NewArchivedSegment {
191 async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
192 if piece_index.segment_index() == self.segment_header.segment_index() {
193 return Ok(Some(
194 self.pieces
195 .pieces()
196 .nth(piece_index.position() as usize)
197 .expect("Piece position always exists in a segment; qed"),
198 ));
199 }
200
201 Ok(None)
202 }
203
204 async fn get_pieces<'a>(
205 &'a self,
206 piece_indices: Vec<PieceIndex>,
207 ) -> anyhow::Result<
208 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
209 > {
210 get_pieces_individually(|piece_index| self.get_piece(piece_index), piece_indices)
211 }
212}
213
214#[async_trait]
216impl PieceGetter for (PieceIndex, Piece) {
217 async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
218 if self.0 == piece_index {
219 return Ok(Some(self.1.clone()));
220 }
221
222 Ok(None)
223 }
224
225 async fn get_pieces<'a>(
226 &'a self,
227 piece_indices: Vec<PieceIndex>,
228 ) -> anyhow::Result<
229 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
230 > {
231 get_pieces_individually(|piece_index| self.get_piece(piece_index), piece_indices)
232 }
233}
234
235#[async_trait]
236impl PieceGetter for Vec<(PieceIndex, Piece)> {
237 async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
238 Ok(self.iter().find_map(|(index, piece)| {
239 if *index == piece_index {
240 Some(piece.clone())
241 } else {
242 None
243 }
244 }))
245 }
246
247 async fn get_pieces<'a>(
248 &'a self,
249 piece_indices: Vec<PieceIndex>,
250 ) -> anyhow::Result<
251 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
252 > {
253 get_pieces_individually(|piece_index| self.get_piece(piece_index), piece_indices)
254 }
255}
256
257#[expect(clippy::type_complexity, reason = "type matches trait signature")]
263pub fn get_pieces_individually<'a, PieceIndices, Func, Fut>(
264 get_piece: Func,
267 piece_indices: PieceIndices,
268) -> anyhow::Result<
269 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
270>
271where
272 PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send> + Send + 'a,
273 Func: Fn(PieceIndex) -> Fut + Clone + Send + 'a,
274 Fut: Future<Output = anyhow::Result<Option<Piece>>> + Send + Unpin + 'a,
275{
276 Ok(Box::new(Box::pin(stream::iter(piece_indices).then(
277 move |piece_index| {
278 let get_piece = get_piece.clone();
279 async move { (piece_index, get_piece(piece_index).await) }
280 },
281 ))))
282}