subspace_farmer/
farmer_piece_getter.rs

1//! Farmer-specific piece getter
2
3use crate::farm::plotted_pieces::PlottedPieces;
4use crate::farmer_cache::FarmerCaches;
5use crate::node_client::NodeClient;
6use async_lock::RwLock as AsyncRwLock;
7use async_trait::async_trait;
8use backoff::backoff::Backoff;
9use backoff::future::retry;
10use backoff::ExponentialBackoff;
11use futures::channel::mpsc;
12use futures::future::FusedFuture;
13use futures::stream::FuturesUnordered;
14use futures::{stream, FutureExt, Stream, StreamExt};
15use std::fmt;
16use std::hash::Hash;
17use std::pin::Pin;
18use std::sync::atomic::{AtomicU32, Ordering};
19use std::sync::{Arc, Weak};
20use std::task::{Context, Poll};
21use subspace_core_primitives::pieces::{Piece, PieceIndex};
22use subspace_data_retrieval::piece_getter::PieceGetter;
23use subspace_networking::utils::multihash::ToMultihash;
24use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator};
25use tracing::{debug, error, trace};
26
27pub mod piece_validator;
28
/// Random-walk round limit handed to `PieceProvider::get_piece_from_archival_storage` when the
/// slow path searches DSN L1 (archival storage) for a piece.
const MAX_RANDOM_WALK_ROUNDS: usize = 15;
30
31/// Retry policy for getting pieces from DSN cache
32#[derive(Debug)]
33pub struct DsnCacheRetryPolicy {
34    /// Max number of retries when trying to get piece from DSN cache
35    pub max_retries: u16,
36    /// Exponential backoff between retries
37    pub backoff: ExponentialBackoff,
38}
39
40struct Inner<FarmIndex, PV, NC> {
41    piece_provider: PieceProvider<PV>,
42    farmer_caches: FarmerCaches,
43    node_client: NC,
44    plotted_pieces: Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
45    dsn_cache_retry_policy: DsnCacheRetryPolicy,
46}
47
48/// Farmer-specific piece getter.
49///
50/// Implements [`PieceGetter`] for plotting purposes, but useful outside of that as well.
51pub struct FarmerPieceGetter<FarmIndex, PV, NC> {
52    inner: Arc<Inner<FarmIndex, PV, NC>>,
53}
54
55impl<FarmIndex, PV, NC> fmt::Debug for FarmerPieceGetter<FarmIndex, PV, NC> {
56    #[inline]
57    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
58        f.debug_struct("FarmerPieceGetter").finish_non_exhaustive()
59    }
60}
61
62impl<FarmIndex, PV, NC> Clone for FarmerPieceGetter<FarmIndex, PV, NC> {
63    #[inline]
64    fn clone(&self) -> Self {
65        Self {
66            inner: Arc::clone(&self.inner),
67        }
68    }
69}
70
71impl<FarmIndex, PV, NC> FarmerPieceGetter<FarmIndex, PV, NC>
72where
73    FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static,
74    usize: From<FarmIndex>,
75    PV: PieceValidator + Send + 'static,
76    NC: NodeClient,
77{
78    /// Create new instance
79    pub fn new(
80        piece_provider: PieceProvider<PV>,
81        farmer_caches: FarmerCaches,
82        node_client: NC,
83        plotted_pieces: Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
84        dsn_cache_retry_policy: DsnCacheRetryPolicy,
85    ) -> Self {
86        Self {
87            inner: Arc::new(Inner {
88                piece_provider,
89                farmer_caches,
90                node_client,
91                plotted_pieces,
92                dsn_cache_retry_policy,
93            }),
94        }
95    }
96
97    /// Fast way to get piece using various caches
98    pub async fn get_piece_fast(&self, piece_index: PieceIndex) -> Option<Piece> {
99        self.get_piece_fast_internal(piece_index).await
100    }
101
102    async fn get_piece_fast_internal(&self, piece_index: PieceIndex) -> Option<Piece> {
103        let inner = &self.inner;
104
105        trace!(%piece_index, "Getting piece from farmer cache");
106        if let Some(piece) = inner
107            .farmer_caches
108            .get_piece(piece_index.to_multihash())
109            .await
110        {
111            trace!(%piece_index, "Got piece from farmer cache successfully");
112            return Some(piece);
113        }
114
115        // L2 piece acquisition
116        trace!(%piece_index, "Getting piece from DSN L2 cache");
117        if let Some(piece) = inner.piece_provider.get_piece_from_cache(piece_index).await {
118            trace!(%piece_index, "Got piece from DSN L2 cache");
119            inner
120                .farmer_caches
121                .maybe_store_additional_piece(piece_index, &piece)
122                .await;
123            return Some(piece);
124        }
125
126        // Try node's RPC before reaching to L1 (archival storage on DSN)
127        trace!(%piece_index, "Getting piece from node");
128        match inner.node_client.piece(piece_index).await {
129            Ok(Some(piece)) => {
130                trace!(%piece_index, "Got piece from node successfully");
131                inner
132                    .farmer_caches
133                    .maybe_store_additional_piece(piece_index, &piece)
134                    .await;
135                return Some(piece);
136            }
137            Ok(None) => {
138                // Nothing to do
139            }
140            Err(error) => {
141                error!(
142                    %error,
143                    %piece_index,
144                    "Failed to retrieve first segment piece from node"
145                );
146            }
147        }
148
149        None
150    }
151
152    /// Slow way to get piece using archival storage
153    pub async fn get_piece_slow(&self, piece_index: PieceIndex) -> Option<Piece> {
154        self.get_piece_slow_internal(piece_index).await
155    }
156
157    /// Slow way to get piece using archival storage
158    async fn get_piece_slow_internal(&self, piece_index: PieceIndex) -> Option<Piece> {
159        let inner = &self.inner;
160
161        trace!(%piece_index, "Getting piece from local plot");
162        let maybe_read_piece_fut = inner
163            .plotted_pieces
164            .try_read()
165            .and_then(|plotted_pieces| plotted_pieces.read_piece(piece_index));
166
167        if let Some(read_piece_fut) = maybe_read_piece_fut {
168            if let Some(piece) = read_piece_fut.await {
169                trace!(%piece_index, "Got piece from local plot successfully");
170                inner
171                    .farmer_caches
172                    .maybe_store_additional_piece(piece_index, &piece)
173                    .await;
174                return Some(piece);
175            }
176        }
177
178        // L1 piece acquisition
179        trace!(%piece_index, "Getting piece from DSN L1.");
180
181        let archival_storage_search_result = inner
182            .piece_provider
183            .get_piece_from_archival_storage(piece_index, MAX_RANDOM_WALK_ROUNDS)
184            .await;
185
186        if let Some(piece) = archival_storage_search_result {
187            trace!(%piece_index, "DSN L1 lookup succeeded");
188            inner
189                .farmer_caches
190                .maybe_store_additional_piece(piece_index, &piece)
191                .await;
192            return Some(piece);
193        }
194
195        None
196    }
197
198    /// Downgrade to [`WeakFarmerPieceGetter`] in order to break reference cycles with internally
199    /// used [`Arc`]
200    pub fn downgrade(&self) -> WeakFarmerPieceGetter<FarmIndex, PV, NC> {
201        WeakFarmerPieceGetter {
202            inner: Arc::downgrade(&self.inner),
203        }
204    }
205}
206
207#[async_trait]
208impl<FarmIndex, PV, NC> PieceGetter for FarmerPieceGetter<FarmIndex, PV, NC>
209where
210    FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static,
211    usize: From<FarmIndex>,
212    PV: PieceValidator + Send + 'static,
213    NC: NodeClient,
214{
215    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
216        {
217            let retries = AtomicU32::new(0);
218            let max_retries = u32::from(self.inner.dsn_cache_retry_policy.max_retries);
219            let mut backoff = self.inner.dsn_cache_retry_policy.backoff.clone();
220            backoff.reset();
221
222            let maybe_piece_fut = retry(backoff, || async {
223                let current_attempt = retries.fetch_add(1, Ordering::Relaxed);
224
225                if let Some(piece) = self.get_piece_fast_internal(piece_index).await {
226                    trace!(%piece_index, current_attempt, "Got piece fast");
227                    return Ok(Some(piece));
228                }
229                if current_attempt >= max_retries {
230                    if max_retries > 0 {
231                        debug!(
232                            %piece_index,
233                            current_attempt,
234                            max_retries,
235                            "Couldn't get a piece fast. No retries left"
236                        );
237                    }
238                    return Ok(None);
239                }
240
241                trace!(%piece_index, current_attempt, "Couldn't get a piece fast, retrying...");
242
243                Err(backoff::Error::transient("Couldn't get piece fast"))
244            });
245
246            if let Ok(Some(piece)) = maybe_piece_fut.await {
247                trace!(%piece_index, "Got piece from cache successfully");
248                return Ok(Some(piece));
249            }
250        };
251
252        if let Some(piece) = self.get_piece_slow_internal(piece_index).await {
253            return Ok(Some(piece));
254        }
255
256        debug!(
257            %piece_index,
258            "Cannot acquire piece: all methods yielded empty result"
259        );
260        Ok(None)
261    }
262
263    async fn get_pieces<'a>(
264        &'a self,
265        piece_indices: Vec<PieceIndex>,
266    ) -> anyhow::Result<
267        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
268    > {
269        let (tx, mut rx) = mpsc::unbounded();
270
271        let fut = async move {
272            let tx = &tx;
273
274            debug!("Getting pieces from farmer cache");
275            let mut pieces_not_found_in_farmer_cache = Vec::new();
276            let mut pieces_in_farmer_cache =
277                self.inner.farmer_caches.get_pieces(piece_indices).await;
278
279            while let Some((piece_index, maybe_piece)) = pieces_in_farmer_cache.next().await {
280                let Some(piece) = maybe_piece else {
281                    pieces_not_found_in_farmer_cache.push(piece_index);
282                    continue;
283                };
284                tx.unbounded_send((piece_index, Ok(Some(piece))))
285                    .expect("This future isn't polled after receiver is dropped; qed");
286            }
287
288            if pieces_not_found_in_farmer_cache.is_empty() {
289                return;
290            }
291
292            debug!(
293                remaining_piece_count = %pieces_not_found_in_farmer_cache.len(),
294                "Getting pieces from DSN cache"
295            );
296            let mut pieces_not_found_in_dsn_cache = Vec::new();
297            let mut pieces_in_dsn_cache = self
298                .inner
299                .piece_provider
300                .get_from_cache(pieces_not_found_in_farmer_cache)
301                .await;
302
303            while let Some((piece_index, maybe_piece)) = pieces_in_dsn_cache.next().await {
304                let Some(piece) = maybe_piece else {
305                    pieces_not_found_in_dsn_cache.push(piece_index);
306                    continue;
307                };
308                // TODO: Would be nice to have concurrency here
309                self.inner
310                    .farmer_caches
311                    .maybe_store_additional_piece(piece_index, &piece)
312                    .await;
313                tx.unbounded_send((piece_index, Ok(Some(piece))))
314                    .expect("This future isn't polled after receiver is dropped; qed");
315            }
316
317            if pieces_not_found_in_dsn_cache.is_empty() {
318                return;
319            }
320
321            debug!(
322                remaining_piece_count = %pieces_not_found_in_dsn_cache.len(),
323                "Getting pieces from node"
324            );
325            let pieces_not_found_on_node = pieces_not_found_in_dsn_cache
326                .into_iter()
327                .map(|piece_index| async move {
328                    match self.inner.node_client.piece(piece_index).await {
329                        Ok(Some(piece)) => {
330                            trace!(%piece_index, "Got piece from node successfully");
331                            self.inner
332                                .farmer_caches
333                                .maybe_store_additional_piece(piece_index, &piece)
334                                .await;
335
336                            tx.unbounded_send((piece_index, Ok(Some(piece))))
337                                .expect("This future isn't polled after receiver is dropped; qed");
338                            None
339                        }
340                        Ok(None) => Some(piece_index),
341                        Err(error) => {
342                            error!(
343                                %error,
344                                %piece_index,
345                                "Failed to retrieve first segment piece from node"
346                            );
347                            Some(piece_index)
348                        }
349                    }
350                })
351                .collect::<FuturesUnordered<_>>()
352                .filter_map(|maybe_piece_index| async move { maybe_piece_index })
353                .collect::<Vec<_>>()
354                .await;
355
356            if pieces_not_found_on_node.is_empty() {
357                return;
358            }
359
360            debug!(
361                remaining_piece_count = %pieces_not_found_on_node.len(),
362                "Some pieces were not easily reachable"
363            );
364            pieces_not_found_on_node
365                .into_iter()
366                .map(|piece_index| async move {
367                    let maybe_piece = self.get_piece_slow_internal(piece_index).await;
368
369                    tx.unbounded_send((piece_index, Ok(maybe_piece)))
370                        .expect("This future isn't polled after receiver is dropped; qed");
371                })
372                .collect::<FuturesUnordered<_>>()
373                // Simply drain everything
374                .for_each(|()| async {})
375                .await;
376        };
377        let mut fut = Box::pin(fut.fuse());
378
379        // Drive above future and stream back any pieces that were downloaded so far
380        Ok(Box::new(stream::poll_fn(move |cx| {
381            if !fut.is_terminated() {
382                // Result doesn't matter, we'll need to poll stream below anyway
383                let _ = fut.poll_unpin(cx);
384            }
385
386            if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) {
387                return Poll::Ready(maybe_result);
388            }
389
390            // Exit will be done by the stream above
391            Poll::Pending
392        })))
393    }
394}
395
396/// Weak farmer piece getter, can be upgraded to [`FarmerPieceGetter`]
397pub struct WeakFarmerPieceGetter<FarmIndex, PV, NC> {
398    inner: Weak<Inner<FarmIndex, PV, NC>>,
399}
400
401impl<FarmIndex, PV, NC> fmt::Debug for WeakFarmerPieceGetter<FarmIndex, PV, NC> {
402    #[inline]
403    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
404        f.debug_struct("WeakFarmerPieceGetter")
405            .finish_non_exhaustive()
406    }
407}
408
409impl<FarmIndex, PV, NC> Clone for WeakFarmerPieceGetter<FarmIndex, PV, NC> {
410    #[inline]
411    fn clone(&self) -> Self {
412        Self {
413            inner: self.inner.clone(),
414        }
415    }
416}
417
/// This wrapper allows us to return the stream, which in turn depends on `piece_getter` that was
/// previously on the stack of the inner function. What this wrapper does is create a
/// self-referential data structure, so we can move both together, while still implementing `Stream`
/// trait as necessary.
///
/// Constructed in `WeakFarmerPieceGetter::get_pieces`: the upgraded, owned [`FarmerPieceGetter`]
/// is stored here and `stream` is the result of calling `get_pieces` on it.
#[ouroboros::self_referencing]
struct StreamWithPieceGetter<FarmIndex, PV, NC>
where
    FarmIndex: 'static,
    PV: 'static,
    NC: 'static,
{
    // Owned getter that `stream` borrows from. Keep it declared before `stream`: ouroboros
    // requires a borrowing field to come after the fields it borrows.
    piece_getter: FarmerPieceGetter<FarmIndex, PV, NC>,
    // Results stream borrowing `piece_getter`; `'this` is the ouroboros lifetime of that borrow
    #[borrows(piece_getter)]
    #[covariant]
    stream:
        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'this>,
}
435
436impl<FarmIndex, PV, NC> Stream for StreamWithPieceGetter<FarmIndex, PV, NC>
437where
438    FarmIndex: 'static,
439    PV: 'static,
440    NC: 'static,
441{
442    type Item = (PieceIndex, anyhow::Result<Option<Piece>>);
443
444    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
445        self.get_mut()
446            .with_stream_mut(|stream| stream.poll_next_unpin(cx))
447    }
448}
449
450#[async_trait]
451impl<FarmIndex, PV, NC> PieceGetter for WeakFarmerPieceGetter<FarmIndex, PV, NC>
452where
453    FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static,
454    usize: From<FarmIndex>,
455    PV: PieceValidator + Send + 'static,
456    NC: NodeClient,
457{
458    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
459        let Some(piece_getter) = self.upgrade() else {
460            debug!("Farmer piece getter upgrade didn't succeed");
461            return Ok(None);
462        };
463
464        piece_getter.get_piece(piece_index).await
465    }
466
467    async fn get_pieces<'a>(
468        &'a self,
469        piece_indices: Vec<PieceIndex>,
470    ) -> anyhow::Result<
471        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
472    > {
473        let Some(piece_getter) = self.upgrade() else {
474            debug!("Farmer piece getter upgrade didn't succeed");
475            return Ok(Box::new(stream::iter(
476                piece_indices
477                    .into_iter()
478                    .map(|piece_index| (piece_index, Ok(None))),
479            )));
480        };
481
482        // TODO: This is necessary due to more complex lifetimes not yet supported by ouroboros, see
483        //  https://github.com/someguynamedjosh/ouroboros/issues/112
484        let stream_with_piece_getter =
485            StreamWithPieceGetter::try_new_async_send(piece_getter, move |piece_getter| {
486                piece_getter.get_pieces(piece_indices)
487            })
488            .await?;
489
490        Ok(Box::new(stream_with_piece_getter))
491    }
492}
493
494impl<FarmIndex, PV, NC> WeakFarmerPieceGetter<FarmIndex, PV, NC> {
495    /// Try to upgrade to [`FarmerPieceGetter`] if there is at least one other instance of it alive
496    pub fn upgrade(&self) -> Option<FarmerPieceGetter<FarmIndex, PV, NC>> {
497        Some(FarmerPieceGetter {
498            inner: self.inner.upgrade()?,
499        })
500    }
501}