1use crate::farm::plotted_pieces::PlottedPieces;
4use crate::farmer_cache::FarmerCaches;
5use crate::node_client::NodeClient;
6use async_lock::RwLock as AsyncRwLock;
7use async_trait::async_trait;
8use backoff::backoff::Backoff;
9use backoff::future::retry;
10use backoff::ExponentialBackoff;
11use futures::channel::mpsc;
12use futures::future::FusedFuture;
13use futures::stream::FuturesUnordered;
14use futures::{stream, FutureExt, Stream, StreamExt};
15use std::fmt;
16use std::hash::Hash;
17use std::pin::Pin;
18use std::sync::atomic::{AtomicU32, Ordering};
19use std::sync::{Arc, Weak};
20use std::task::{Context, Poll};
21use subspace_core_primitives::pieces::{Piece, PieceIndex};
22use subspace_data_retrieval::piece_getter::PieceGetter;
23use subspace_networking::utils::multihash::ToMultihash;
24use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator};
25use tracing::{debug, error, trace};
26
27pub mod piece_validator;
28
29const MAX_RANDOM_WALK_ROUNDS: usize = 15;
30
31#[derive(Debug)]
33pub struct DsnCacheRetryPolicy {
34 pub max_retries: u16,
36 pub backoff: ExponentialBackoff,
38}
39
40struct Inner<FarmIndex, PV, NC> {
41 piece_provider: PieceProvider<PV>,
42 farmer_caches: FarmerCaches,
43 node_client: NC,
44 plotted_pieces: Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
45 dsn_cache_retry_policy: DsnCacheRetryPolicy,
46}
47
48pub struct FarmerPieceGetter<FarmIndex, PV, NC> {
52 inner: Arc<Inner<FarmIndex, PV, NC>>,
53}
54
55impl<FarmIndex, PV, NC> fmt::Debug for FarmerPieceGetter<FarmIndex, PV, NC> {
56 #[inline]
57 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
58 f.debug_struct("FarmerPieceGetter").finish_non_exhaustive()
59 }
60}
61
62impl<FarmIndex, PV, NC> Clone for FarmerPieceGetter<FarmIndex, PV, NC> {
63 #[inline]
64 fn clone(&self) -> Self {
65 Self {
66 inner: Arc::clone(&self.inner),
67 }
68 }
69}
70
71impl<FarmIndex, PV, NC> FarmerPieceGetter<FarmIndex, PV, NC>
72where
73 FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static,
74 usize: From<FarmIndex>,
75 PV: PieceValidator + Send + 'static,
76 NC: NodeClient,
77{
78 pub fn new(
80 piece_provider: PieceProvider<PV>,
81 farmer_caches: FarmerCaches,
82 node_client: NC,
83 plotted_pieces: Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
84 dsn_cache_retry_policy: DsnCacheRetryPolicy,
85 ) -> Self {
86 Self {
87 inner: Arc::new(Inner {
88 piece_provider,
89 farmer_caches,
90 node_client,
91 plotted_pieces,
92 dsn_cache_retry_policy,
93 }),
94 }
95 }
96
97 pub async fn get_piece_fast(&self, piece_index: PieceIndex) -> Option<Piece> {
99 self.get_piece_fast_internal(piece_index).await
100 }
101
102 async fn get_piece_fast_internal(&self, piece_index: PieceIndex) -> Option<Piece> {
103 let inner = &self.inner;
104
105 trace!(%piece_index, "Getting piece from farmer cache");
106 if let Some(piece) = inner
107 .farmer_caches
108 .get_piece(piece_index.to_multihash())
109 .await
110 {
111 trace!(%piece_index, "Got piece from farmer cache successfully");
112 return Some(piece);
113 }
114
115 trace!(%piece_index, "Getting piece from DSN L2 cache");
117 if let Some(piece) = inner.piece_provider.get_piece_from_cache(piece_index).await {
118 trace!(%piece_index, "Got piece from DSN L2 cache");
119 inner
120 .farmer_caches
121 .maybe_store_additional_piece(piece_index, &piece)
122 .await;
123 return Some(piece);
124 }
125
126 trace!(%piece_index, "Getting piece from node");
128 match inner.node_client.piece(piece_index).await {
129 Ok(Some(piece)) => {
130 trace!(%piece_index, "Got piece from node successfully");
131 inner
132 .farmer_caches
133 .maybe_store_additional_piece(piece_index, &piece)
134 .await;
135 return Some(piece);
136 }
137 Ok(None) => {
138 }
140 Err(error) => {
141 error!(
142 %error,
143 %piece_index,
144 "Failed to retrieve first segment piece from node"
145 );
146 }
147 }
148
149 None
150 }
151
152 pub async fn get_piece_slow(&self, piece_index: PieceIndex) -> Option<Piece> {
154 self.get_piece_slow_internal(piece_index).await
155 }
156
157 async fn get_piece_slow_internal(&self, piece_index: PieceIndex) -> Option<Piece> {
159 let inner = &self.inner;
160
161 trace!(%piece_index, "Getting piece from local plot");
162 let maybe_read_piece_fut = inner
163 .plotted_pieces
164 .try_read()
165 .and_then(|plotted_pieces| plotted_pieces.read_piece(piece_index));
166
167 if let Some(read_piece_fut) = maybe_read_piece_fut {
168 if let Some(piece) = read_piece_fut.await {
169 trace!(%piece_index, "Got piece from local plot successfully");
170 inner
171 .farmer_caches
172 .maybe_store_additional_piece(piece_index, &piece)
173 .await;
174 return Some(piece);
175 }
176 }
177
178 trace!(%piece_index, "Getting piece from DSN L1.");
180
181 let archival_storage_search_result = inner
182 .piece_provider
183 .get_piece_from_archival_storage(piece_index, MAX_RANDOM_WALK_ROUNDS)
184 .await;
185
186 if let Some(piece) = archival_storage_search_result {
187 trace!(%piece_index, "DSN L1 lookup succeeded");
188 inner
189 .farmer_caches
190 .maybe_store_additional_piece(piece_index, &piece)
191 .await;
192 return Some(piece);
193 }
194
195 None
196 }
197
198 pub fn downgrade(&self) -> WeakFarmerPieceGetter<FarmIndex, PV, NC> {
201 WeakFarmerPieceGetter {
202 inner: Arc::downgrade(&self.inner),
203 }
204 }
205}
206
207#[async_trait]
208impl<FarmIndex, PV, NC> PieceGetter for FarmerPieceGetter<FarmIndex, PV, NC>
209where
210 FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static,
211 usize: From<FarmIndex>,
212 PV: PieceValidator + Send + 'static,
213 NC: NodeClient,
214{
215 async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
216 {
217 let retries = AtomicU32::new(0);
218 let max_retries = u32::from(self.inner.dsn_cache_retry_policy.max_retries);
219 let mut backoff = self.inner.dsn_cache_retry_policy.backoff.clone();
220 backoff.reset();
221
222 let maybe_piece_fut = retry(backoff, || async {
223 let current_attempt = retries.fetch_add(1, Ordering::Relaxed);
224
225 if let Some(piece) = self.get_piece_fast_internal(piece_index).await {
226 trace!(%piece_index, current_attempt, "Got piece fast");
227 return Ok(Some(piece));
228 }
229 if current_attempt >= max_retries {
230 if max_retries > 0 {
231 debug!(
232 %piece_index,
233 current_attempt,
234 max_retries,
235 "Couldn't get a piece fast. No retries left"
236 );
237 }
238 return Ok(None);
239 }
240
241 trace!(%piece_index, current_attempt, "Couldn't get a piece fast, retrying...");
242
243 Err(backoff::Error::transient("Couldn't get piece fast"))
244 });
245
246 if let Ok(Some(piece)) = maybe_piece_fut.await {
247 trace!(%piece_index, "Got piece from cache successfully");
248 return Ok(Some(piece));
249 }
250 };
251
252 if let Some(piece) = self.get_piece_slow_internal(piece_index).await {
253 return Ok(Some(piece));
254 }
255
256 debug!(
257 %piece_index,
258 "Cannot acquire piece: all methods yielded empty result"
259 );
260 Ok(None)
261 }
262
263 async fn get_pieces<'a>(
264 &'a self,
265 piece_indices: Vec<PieceIndex>,
266 ) -> anyhow::Result<
267 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
268 > {
269 let (tx, mut rx) = mpsc::unbounded();
270
271 let fut = async move {
272 let tx = &tx;
273
274 debug!("Getting pieces from farmer cache");
275 let mut pieces_not_found_in_farmer_cache = Vec::new();
276 let mut pieces_in_farmer_cache =
277 self.inner.farmer_caches.get_pieces(piece_indices).await;
278
279 while let Some((piece_index, maybe_piece)) = pieces_in_farmer_cache.next().await {
280 let Some(piece) = maybe_piece else {
281 pieces_not_found_in_farmer_cache.push(piece_index);
282 continue;
283 };
284 tx.unbounded_send((piece_index, Ok(Some(piece))))
285 .expect("This future isn't polled after receiver is dropped; qed");
286 }
287
288 if pieces_not_found_in_farmer_cache.is_empty() {
289 return;
290 }
291
292 debug!(
293 remaining_piece_count = %pieces_not_found_in_farmer_cache.len(),
294 "Getting pieces from DSN cache"
295 );
296 let mut pieces_not_found_in_dsn_cache = Vec::new();
297 let mut pieces_in_dsn_cache = self
298 .inner
299 .piece_provider
300 .get_from_cache(pieces_not_found_in_farmer_cache)
301 .await;
302
303 while let Some((piece_index, maybe_piece)) = pieces_in_dsn_cache.next().await {
304 let Some(piece) = maybe_piece else {
305 pieces_not_found_in_dsn_cache.push(piece_index);
306 continue;
307 };
308 self.inner
310 .farmer_caches
311 .maybe_store_additional_piece(piece_index, &piece)
312 .await;
313 tx.unbounded_send((piece_index, Ok(Some(piece))))
314 .expect("This future isn't polled after receiver is dropped; qed");
315 }
316
317 if pieces_not_found_in_dsn_cache.is_empty() {
318 return;
319 }
320
321 debug!(
322 remaining_piece_count = %pieces_not_found_in_dsn_cache.len(),
323 "Getting pieces from node"
324 );
325 let pieces_not_found_on_node = pieces_not_found_in_dsn_cache
326 .into_iter()
327 .map(|piece_index| async move {
328 match self.inner.node_client.piece(piece_index).await {
329 Ok(Some(piece)) => {
330 trace!(%piece_index, "Got piece from node successfully");
331 self.inner
332 .farmer_caches
333 .maybe_store_additional_piece(piece_index, &piece)
334 .await;
335
336 tx.unbounded_send((piece_index, Ok(Some(piece))))
337 .expect("This future isn't polled after receiver is dropped; qed");
338 None
339 }
340 Ok(None) => Some(piece_index),
341 Err(error) => {
342 error!(
343 %error,
344 %piece_index,
345 "Failed to retrieve first segment piece from node"
346 );
347 Some(piece_index)
348 }
349 }
350 })
351 .collect::<FuturesUnordered<_>>()
352 .filter_map(|maybe_piece_index| async move { maybe_piece_index })
353 .collect::<Vec<_>>()
354 .await;
355
356 if pieces_not_found_on_node.is_empty() {
357 return;
358 }
359
360 debug!(
361 remaining_piece_count = %pieces_not_found_on_node.len(),
362 "Some pieces were not easily reachable"
363 );
364 pieces_not_found_on_node
365 .into_iter()
366 .map(|piece_index| async move {
367 let maybe_piece = self.get_piece_slow_internal(piece_index).await;
368
369 tx.unbounded_send((piece_index, Ok(maybe_piece)))
370 .expect("This future isn't polled after receiver is dropped; qed");
371 })
372 .collect::<FuturesUnordered<_>>()
373 .for_each(|()| async {})
375 .await;
376 };
377 let mut fut = Box::pin(fut.fuse());
378
379 Ok(Box::new(stream::poll_fn(move |cx| {
381 if !fut.is_terminated() {
382 let _ = fut.poll_unpin(cx);
384 }
385
386 if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) {
387 return Poll::Ready(maybe_result);
388 }
389
390 Poll::Pending
392 })))
393 }
394}
395
396pub struct WeakFarmerPieceGetter<FarmIndex, PV, NC> {
398 inner: Weak<Inner<FarmIndex, PV, NC>>,
399}
400
401impl<FarmIndex, PV, NC> fmt::Debug for WeakFarmerPieceGetter<FarmIndex, PV, NC> {
402 #[inline]
403 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
404 f.debug_struct("WeakFarmerPieceGetter")
405 .finish_non_exhaustive()
406 }
407}
408
409impl<FarmIndex, PV, NC> Clone for WeakFarmerPieceGetter<FarmIndex, PV, NC> {
410 #[inline]
411 fn clone(&self) -> Self {
412 Self {
413 inner: self.inner.clone(),
414 }
415 }
416}
417
418#[ouroboros::self_referencing]
423struct StreamWithPieceGetter<FarmIndex, PV, NC>
424where
425 FarmIndex: 'static,
426 PV: 'static,
427 NC: 'static,
428{
429 piece_getter: FarmerPieceGetter<FarmIndex, PV, NC>,
430 #[borrows(piece_getter)]
431 #[covariant]
432 stream:
433 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'this>,
434}
435
436impl<FarmIndex, PV, NC> Stream for StreamWithPieceGetter<FarmIndex, PV, NC>
437where
438 FarmIndex: 'static,
439 PV: 'static,
440 NC: 'static,
441{
442 type Item = (PieceIndex, anyhow::Result<Option<Piece>>);
443
444 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
445 self.get_mut()
446 .with_stream_mut(|stream| stream.poll_next_unpin(cx))
447 }
448}
449
450#[async_trait]
451impl<FarmIndex, PV, NC> PieceGetter for WeakFarmerPieceGetter<FarmIndex, PV, NC>
452where
453 FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static,
454 usize: From<FarmIndex>,
455 PV: PieceValidator + Send + 'static,
456 NC: NodeClient,
457{
458 async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
459 let Some(piece_getter) = self.upgrade() else {
460 debug!("Farmer piece getter upgrade didn't succeed");
461 return Ok(None);
462 };
463
464 piece_getter.get_piece(piece_index).await
465 }
466
467 async fn get_pieces<'a>(
468 &'a self,
469 piece_indices: Vec<PieceIndex>,
470 ) -> anyhow::Result<
471 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
472 > {
473 let Some(piece_getter) = self.upgrade() else {
474 debug!("Farmer piece getter upgrade didn't succeed");
475 return Ok(Box::new(stream::iter(
476 piece_indices
477 .into_iter()
478 .map(|piece_index| (piece_index, Ok(None))),
479 )));
480 };
481
482 let stream_with_piece_getter =
485 StreamWithPieceGetter::try_new_async_send(piece_getter, move |piece_getter| {
486 piece_getter.get_pieces(piece_indices)
487 })
488 .await?;
489
490 Ok(Box::new(stream_with_piece_getter))
491 }
492}
493
494impl<FarmIndex, PV, NC> WeakFarmerPieceGetter<FarmIndex, PV, NC> {
495 pub fn upgrade(&self) -> Option<FarmerPieceGetter<FarmIndex, PV, NC>> {
497 Some(FarmerPieceGetter {
498 inner: self.inner.upgrade()?,
499 })
500 }
501}