subspace_farmer/
farmer_piece_getter.rs

1//! Farmer-specific piece getter
2
3use crate::farm::plotted_pieces::PlottedPieces;
4use crate::farmer_cache::FarmerCaches;
5use crate::node_client::NodeClient;
6use async_lock::RwLock as AsyncRwLock;
7use async_trait::async_trait;
8use backoff::ExponentialBackoff;
9use backoff::backoff::Backoff;
10use backoff::future::retry;
11use futures::channel::mpsc;
12use futures::future::FusedFuture;
13use futures::stream::FuturesUnordered;
14use futures::{FutureExt, Stream, StreamExt, stream};
15use std::fmt;
16use std::hash::Hash;
17use std::pin::Pin;
18use std::sync::atomic::{AtomicU32, Ordering};
19use std::sync::{Arc, Weak};
20use std::task::{Context, Poll};
21use subspace_core_primitives::pieces::{Piece, PieceIndex};
22use subspace_data_retrieval::piece_getter::PieceGetter;
23use subspace_networking::utils::multihash::ToMultihash;
24use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator};
25use tracing::{debug, error, trace};
26
pub mod piece_validator;

/// Maximum number of random walk rounds used when searching DSN archival storage (L1) for a
/// piece; passed to `PieceProvider::get_piece_from_archival_storage` on the slow path.
const MAX_RANDOM_WALK_ROUNDS: usize = 15;
30
/// Retry policy for getting pieces from DSN cache.
///
/// Used by [`FarmerPieceGetter`]'s [`PieceGetter::get_piece`] implementation to retry the whole
/// "fast" acquisition path (farmer cache, DSN L2 cache, node RPC) before falling back to the
/// "slow" path (local plot, DSN archival storage).
#[derive(Debug)]
pub struct DsnCacheRetryPolicy {
    /// Max number of retries when trying to get piece from DSN cache.
    ///
    /// The fast path is attempted up to `max_retries + 1` times in total; `0` means a single
    /// attempt with no retries.
    pub max_retries: u16,
    /// Exponential backoff between retries.
    ///
    /// Every `get_piece` call works on its own clone of this value, reset before use. Retrying
    /// also stops early if the `backoff::future::retry` driver gets no further delay from this
    /// backoff (e.g. its maximum elapsed time is reached).
    pub backoff: ExponentialBackoff,
}
39
/// State shared by all clones of [`FarmerPieceGetter`] and by its [`WeakFarmerPieceGetter`]
/// handles.
struct Inner<FarmIndex, PV, NC> {
    /// DSN access: L2 cache lookups and L1 (archival storage) searches
    piece_provider: PieceProvider<PV>,
    /// Farmer caches: checked first, and offered pieces found in other sources
    farmer_caches: FarmerCaches,
    /// Node RPC client, the last source tried on the fast path
    node_client: NC,
    /// Pieces plotted locally, read without waiting for the lock on the slow path
    plotted_pieces: Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
    /// Retry policy applied to the fast path in `get_piece`
    dsn_cache_retry_policy: DsnCacheRetryPolicy,
}
47
/// Farmer-specific piece getter.
///
/// Implements [`PieceGetter`] for plotting purposes, but useful outside of that as well.
///
/// Cloning is cheap: all clones share the same state through an [`Arc`]. Use
/// [`FarmerPieceGetter::downgrade`] to obtain a handle that does not keep that state alive.
pub struct FarmerPieceGetter<FarmIndex, PV, NC> {
    inner: Arc<Inner<FarmIndex, PV, NC>>,
}
54
55impl<FarmIndex, PV, NC> fmt::Debug for FarmerPieceGetter<FarmIndex, PV, NC> {
56    #[inline]
57    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
58        f.debug_struct("FarmerPieceGetter").finish_non_exhaustive()
59    }
60}
61
62impl<FarmIndex, PV, NC> Clone for FarmerPieceGetter<FarmIndex, PV, NC> {
63    #[inline]
64    fn clone(&self) -> Self {
65        Self {
66            inner: Arc::clone(&self.inner),
67        }
68    }
69}
70
71impl<FarmIndex, PV, NC> FarmerPieceGetter<FarmIndex, PV, NC>
72where
73    FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static,
74    usize: From<FarmIndex>,
75    PV: PieceValidator + Send + 'static,
76    NC: NodeClient,
77{
78    /// Create new instance
79    pub fn new(
80        piece_provider: PieceProvider<PV>,
81        farmer_caches: FarmerCaches,
82        node_client: NC,
83        plotted_pieces: Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
84        dsn_cache_retry_policy: DsnCacheRetryPolicy,
85    ) -> Self {
86        Self {
87            inner: Arc::new(Inner {
88                piece_provider,
89                farmer_caches,
90                node_client,
91                plotted_pieces,
92                dsn_cache_retry_policy,
93            }),
94        }
95    }
96
97    /// Fast way to get piece using various caches
98    pub async fn get_piece_fast(&self, piece_index: PieceIndex) -> Option<Piece> {
99        self.get_piece_fast_internal(piece_index).await
100    }
101
102    async fn get_piece_fast_internal(&self, piece_index: PieceIndex) -> Option<Piece> {
103        let inner = &self.inner;
104
105        trace!(%piece_index, "Getting piece from farmer cache");
106        if let Some(piece) = inner
107            .farmer_caches
108            .get_piece(piece_index.to_multihash())
109            .await
110        {
111            trace!(%piece_index, "Got piece from farmer cache successfully");
112            return Some(piece);
113        }
114
115        // L2 piece acquisition
116        trace!(%piece_index, "Getting piece from DSN L2 cache");
117        if let Some(piece) = inner.piece_provider.get_piece_from_cache(piece_index).await {
118            let added_to_cache = inner
119                .farmer_caches
120                .maybe_store_additional_piece(piece_index, &piece)
121                .await;
122            trace!(%piece_index, %added_to_cache, "Got piece from DSN L2 cache");
123            return Some(piece);
124        }
125
126        // Try node's RPC before reaching to L1 (archival storage on DSN)
127        trace!(%piece_index, "Getting piece from node");
128        match inner.node_client.piece(piece_index).await {
129            Ok(Some(piece)) => {
130                let added_to_cache = inner
131                    .farmer_caches
132                    .maybe_store_additional_piece(piece_index, &piece)
133                    .await;
134                trace!(%piece_index, %added_to_cache, "Got piece from node successfully");
135                return Some(piece);
136            }
137            Ok(None) => {
138                // Nothing to do
139            }
140            Err(error) => {
141                error!(
142                    %error,
143                    %piece_index,
144                    "Failed to retrieve first segment piece from node"
145                );
146            }
147        }
148
149        None
150    }
151
152    /// Slow way to get piece using archival storage
153    pub async fn get_piece_slow(&self, piece_index: PieceIndex) -> Option<Piece> {
154        self.get_piece_slow_internal(piece_index).await
155    }
156
157    /// Slow way to get piece using archival storage
158    async fn get_piece_slow_internal(&self, piece_index: PieceIndex) -> Option<Piece> {
159        let inner = &self.inner;
160
161        trace!(%piece_index, "Getting piece from local plot");
162        let maybe_read_piece_fut = inner
163            .plotted_pieces
164            .try_read()
165            .and_then(|plotted_pieces| plotted_pieces.read_piece(piece_index));
166
167        if let Some(read_piece_fut) = maybe_read_piece_fut
168            && let Some(piece) = read_piece_fut.await
169        {
170            let added_to_cache = inner
171                .farmer_caches
172                .maybe_store_additional_piece(piece_index, &piece)
173                .await;
174            trace!(%piece_index, %added_to_cache, "Got piece from local plot successfully");
175            return Some(piece);
176        }
177
178        // L1 piece acquisition
179        trace!(%piece_index, "Getting piece from DSN L1.");
180
181        let archival_storage_search_result = inner
182            .piece_provider
183            .get_piece_from_archival_storage(piece_index, MAX_RANDOM_WALK_ROUNDS)
184            .await;
185
186        if let Some(piece) = archival_storage_search_result {
187            let added_to_cache = inner
188                .farmer_caches
189                .maybe_store_additional_piece(piece_index, &piece)
190                .await;
191            trace!(%piece_index, %added_to_cache, "DSN L1 lookup succeeded");
192            return Some(piece);
193        }
194
195        None
196    }
197
198    /// Downgrade to [`WeakFarmerPieceGetter`] in order to break reference cycles with internally
199    /// used [`Arc`]
200    pub fn downgrade(&self) -> WeakFarmerPieceGetter<FarmIndex, PV, NC> {
201        WeakFarmerPieceGetter {
202            inner: Arc::downgrade(&self.inner),
203        }
204    }
205}
206
207#[async_trait]
208impl<FarmIndex, PV, NC> PieceGetter for FarmerPieceGetter<FarmIndex, PV, NC>
209where
210    FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static,
211    usize: From<FarmIndex>,
212    PV: PieceValidator + Send + 'static,
213    NC: NodeClient,
214{
215    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
216        {
217            let retries = AtomicU32::new(0);
218            let max_retries = u32::from(self.inner.dsn_cache_retry_policy.max_retries);
219            let mut backoff = self.inner.dsn_cache_retry_policy.backoff.clone();
220            backoff.reset();
221
222            let maybe_piece_fut = retry(backoff, || async {
223                let current_attempt = retries.fetch_add(1, Ordering::Relaxed);
224
225                if let Some(piece) = self.get_piece_fast_internal(piece_index).await {
226                    trace!(%piece_index, current_attempt, "Got piece fast");
227                    return Ok(Some(piece));
228                }
229                if current_attempt >= max_retries {
230                    if max_retries > 0 {
231                        debug!(
232                            %piece_index,
233                            current_attempt,
234                            max_retries,
235                            "Couldn't get a piece fast. No retries left"
236                        );
237                    }
238                    return Ok(None);
239                }
240
241                trace!(%piece_index, current_attempt, "Couldn't get a piece fast, retrying...");
242
243                Err(backoff::Error::transient("Couldn't get piece fast"))
244            });
245
246            if let Ok(Some(piece)) = maybe_piece_fut.await {
247                trace!(%piece_index, "Got piece from cache successfully");
248                return Ok(Some(piece));
249            }
250        };
251
252        if let Some(piece) = self.get_piece_slow_internal(piece_index).await {
253            return Ok(Some(piece));
254        }
255
256        debug!(
257            %piece_index,
258            "Cannot acquire piece: all methods yielded empty result"
259        );
260        Ok(None)
261    }
262
263    async fn get_pieces<'a>(
264        &'a self,
265        piece_indices: Vec<PieceIndex>,
266    ) -> anyhow::Result<
267        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
268    > {
269        let (tx, mut rx) = mpsc::unbounded();
270
271        let fut = async move {
272            let tx = &tx;
273
274            let piece_count = piece_indices.len();
275            debug!(%piece_count, "Getting pieces from farmer cache");
276            let mut pieces_not_found_in_farmer_cache = Vec::new();
277            let mut pieces_in_farmer_cache =
278                self.inner.farmer_caches.get_pieces(piece_indices).await;
279
280            while let Some((piece_index, maybe_piece)) = pieces_in_farmer_cache.next().await {
281                let Some(piece) = maybe_piece else {
282                    pieces_not_found_in_farmer_cache.push(piece_index);
283                    continue;
284                };
285                tx.unbounded_send((piece_index, Ok(Some(piece))))
286                    .expect("This future isn't polled after receiver is dropped; qed");
287            }
288
289            if pieces_not_found_in_farmer_cache.is_empty() {
290                return;
291            }
292
293            debug!(
294                remaining_piece_count = %pieces_not_found_in_farmer_cache.len(),
295                %piece_count,
296                "Getting pieces from DSN cache",
297            );
298            let mut pieces_not_found_in_dsn_cache = Vec::new();
299            let mut pieces_in_dsn_cache = self
300                .inner
301                .piece_provider
302                .get_from_cache(pieces_not_found_in_farmer_cache)
303                .await;
304
305            while let Some((piece_index, maybe_piece)) = pieces_in_dsn_cache.next().await {
306                let Some(piece) = maybe_piece else {
307                    pieces_not_found_in_dsn_cache.push(piece_index);
308                    continue;
309                };
310                // TODO: Would be nice to have concurrency here
311                let added_to_cache = self
312                    .inner
313                    .farmer_caches
314                    .maybe_store_additional_piece(piece_index, &piece)
315                    .await;
316                trace!(%piece_index, %added_to_cache, "Got piece from DSN cache successfully");
317                tx.unbounded_send((piece_index, Ok(Some(piece))))
318                    .expect("This future isn't polled after receiver is dropped; qed");
319            }
320
321            if pieces_not_found_in_dsn_cache.is_empty() {
322                return;
323            }
324
325            debug!(
326                remaining_piece_count = %pieces_not_found_in_dsn_cache.len(),
327                %piece_count,
328                "Getting pieces from node",
329            );
330            let pieces_not_found_on_node = pieces_not_found_in_dsn_cache
331                .into_iter()
332                .map(|piece_index| async move {
333                    match self.inner.node_client.piece(piece_index).await {
334                        Ok(Some(piece)) => {
335                            let added_to_cache = self.inner
336                                .farmer_caches
337                                .maybe_store_additional_piece(piece_index, &piece)
338                                .await;
339                            trace!(%piece_index, %added_to_cache, "Got piece from node successfully");
340
341                            tx.unbounded_send((piece_index, Ok(Some(piece))))
342                                .expect("This future isn't polled after receiver is dropped; qed");
343                            None
344                        }
345                        Ok(None) => Some(piece_index),
346                        Err(error) => {
347                            error!(
348                                %error,
349                                %piece_index,
350                                "Failed to retrieve first segment piece from node"
351                            );
352                            Some(piece_index)
353                        }
354                    }
355                })
356                .collect::<FuturesUnordered<_>>()
357                .filter_map(|maybe_piece_index| async move { maybe_piece_index })
358                .collect::<Vec<_>>()
359                .await;
360
361            if pieces_not_found_on_node.is_empty() {
362                return;
363            }
364
365            debug!(
366                remaining_piece_count = %pieces_not_found_on_node.len(),
367                %piece_count,
368                "Some pieces were not easily reachable",
369            );
370            pieces_not_found_on_node
371                .into_iter()
372                .map(|piece_index| async move {
373                    let maybe_piece = self.get_piece_slow_internal(piece_index).await;
374
375                    tx.unbounded_send((piece_index, Ok(maybe_piece)))
376                        .expect("This future isn't polled after receiver is dropped; qed");
377                })
378                .collect::<FuturesUnordered<_>>()
379                // Simply drain everything
380                .for_each(|()| async {})
381                .await;
382        };
383        let mut fut = Box::pin(fut.fuse());
384
385        // Drive above future and stream back any pieces that were downloaded so far
386        Ok(Box::new(stream::poll_fn(move |cx| {
387            if !fut.is_terminated() {
388                // Result doesn't matter, we'll need to poll stream below anyway
389                let _ = fut.poll_unpin(cx);
390            }
391
392            if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) {
393                return Poll::Ready(maybe_result);
394            }
395
396            // Exit will be done by the stream above
397            Poll::Pending
398        })))
399    }
400}
401
/// Weak farmer piece getter, can be upgraded to [`FarmerPieceGetter`].
///
/// Holds only a weak reference to the shared state, so it does not keep that state alive. This
/// is what allows breaking reference cycles. Once all strong getters are dropped, its
/// [`PieceGetter`] methods log the failed upgrade and report pieces as not found (`Ok(None)`).
pub struct WeakFarmerPieceGetter<FarmIndex, PV, NC> {
    inner: Weak<Inner<FarmIndex, PV, NC>>,
}
406
407impl<FarmIndex, PV, NC> fmt::Debug for WeakFarmerPieceGetter<FarmIndex, PV, NC> {
408    #[inline]
409    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
410        f.debug_struct("WeakFarmerPieceGetter")
411            .finish_non_exhaustive()
412    }
413}
414
415impl<FarmIndex, PV, NC> Clone for WeakFarmerPieceGetter<FarmIndex, PV, NC> {
416    #[inline]
417    fn clone(&self) -> Self {
418        Self {
419            inner: self.inner.clone(),
420        }
421    }
422}
423
/// This wrapper allows us to return the stream, which in turn depends on `piece_getter` that was
/// previously on the stack of the inner function. What this wrapper does is create a
/// self-referential data structure, so we can move both together, while still implementing the
/// `Stream` trait as necessary.
///
/// Used by [`WeakFarmerPieceGetter`]'s `get_pieces`: the upgraded (strong) piece getter exists
/// only locally there, but the stream it returns borrows it.
#[ouroboros::self_referencing]
struct StreamWithPieceGetter<FarmIndex, PV, NC>
where
    FarmIndex: 'static,
    PV: 'static,
    NC: 'static,
{
    // Owned strong piece getter; keeps the shared state alive for as long as the stream exists
    piece_getter: FarmerPieceGetter<FarmIndex, PV, NC>,
    // Stream produced by `FarmerPieceGetter::get_pieces`, borrowing `piece_getter` above
    #[borrows(piece_getter)]
    #[covariant]
    stream:
        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'this>,
}
441
442impl<FarmIndex, PV, NC> Stream for StreamWithPieceGetter<FarmIndex, PV, NC>
443where
444    FarmIndex: 'static,
445    PV: 'static,
446    NC: 'static,
447{
448    type Item = (PieceIndex, anyhow::Result<Option<Piece>>);
449
450    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
451        self.get_mut()
452            .with_stream_mut(|stream| stream.poll_next_unpin(cx))
453    }
454}
455
456#[async_trait]
457impl<FarmIndex, PV, NC> PieceGetter for WeakFarmerPieceGetter<FarmIndex, PV, NC>
458where
459    FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static,
460    usize: From<FarmIndex>,
461    PV: PieceValidator + Send + 'static,
462    NC: NodeClient,
463{
464    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
465        let Some(piece_getter) = self.upgrade() else {
466            debug!("Farmer piece getter upgrade didn't succeed");
467            return Ok(None);
468        };
469
470        piece_getter.get_piece(piece_index).await
471    }
472
473    async fn get_pieces<'a>(
474        &'a self,
475        piece_indices: Vec<PieceIndex>,
476    ) -> anyhow::Result<
477        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
478    > {
479        let Some(piece_getter) = self.upgrade() else {
480            debug!("Farmer piece getter upgrade didn't succeed");
481            return Ok(Box::new(stream::iter(
482                piece_indices
483                    .into_iter()
484                    .map(|piece_index| (piece_index, Ok(None))),
485            )));
486        };
487
488        // TODO: This is necessary due to more complex lifetimes not yet supported by ouroboros, see
489        //  https://github.com/someguynamedjosh/ouroboros/issues/112
490        let stream_with_piece_getter =
491            StreamWithPieceGetter::try_new_async_send(piece_getter, move |piece_getter| {
492                piece_getter.get_pieces(piece_indices)
493            })
494            .await?;
495
496        Ok(Box::new(stream_with_piece_getter))
497    }
498}
499
500impl<FarmIndex, PV, NC> WeakFarmerPieceGetter<FarmIndex, PV, NC> {
501    /// Try to upgrade to [`FarmerPieceGetter`] if there is at least one other instance of it alive
502    pub fn upgrade(&self) -> Option<FarmerPieceGetter<FarmIndex, PV, NC>> {
503        Some(FarmerPieceGetter {
504            inner: self.inner.upgrade()?,
505        })
506    }
507}