1use crate::farm::plotted_pieces::PlottedPieces;
4use crate::farmer_cache::FarmerCaches;
5use crate::node_client::NodeClient;
6use async_lock::RwLock as AsyncRwLock;
7use async_trait::async_trait;
8use backoff::ExponentialBackoff;
9use backoff::backoff::Backoff;
10use backoff::future::retry;
11use futures::channel::mpsc;
12use futures::future::FusedFuture;
13use futures::stream::FuturesUnordered;
14use futures::{FutureExt, Stream, StreamExt, stream};
15use std::fmt;
16use std::hash::Hash;
17use std::pin::Pin;
18use std::sync::atomic::{AtomicU32, Ordering};
19use std::sync::{Arc, Weak};
20use std::task::{Context, Poll};
21use subspace_core_primitives::pieces::{Piece, PieceIndex};
22use subspace_data_retrieval::piece_getter::PieceGetter;
23use subspace_networking::utils::multihash::ToMultihash;
24use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator};
25use tracing::{debug, error, trace};
26
27pub mod piece_validator;
28
29const MAX_RANDOM_WALK_ROUNDS: usize = 15;
30
31#[derive(Debug)]
33pub struct DsnCacheRetryPolicy {
34 pub max_retries: u16,
36 pub backoff: ExponentialBackoff,
38}
39
40struct Inner<FarmIndex, PV, NC> {
41 piece_provider: PieceProvider<PV>,
42 farmer_caches: FarmerCaches,
43 node_client: NC,
44 plotted_pieces: Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
45 dsn_cache_retry_policy: DsnCacheRetryPolicy,
46}
47
48pub struct FarmerPieceGetter<FarmIndex, PV, NC> {
52 inner: Arc<Inner<FarmIndex, PV, NC>>,
53}
54
55impl<FarmIndex, PV, NC> fmt::Debug for FarmerPieceGetter<FarmIndex, PV, NC> {
56 #[inline]
57 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
58 f.debug_struct("FarmerPieceGetter").finish_non_exhaustive()
59 }
60}
61
62impl<FarmIndex, PV, NC> Clone for FarmerPieceGetter<FarmIndex, PV, NC> {
63 #[inline]
64 fn clone(&self) -> Self {
65 Self {
66 inner: Arc::clone(&self.inner),
67 }
68 }
69}
70
71impl<FarmIndex, PV, NC> FarmerPieceGetter<FarmIndex, PV, NC>
72where
73 FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static,
74 usize: From<FarmIndex>,
75 PV: PieceValidator + Send + 'static,
76 NC: NodeClient,
77{
78 pub fn new(
80 piece_provider: PieceProvider<PV>,
81 farmer_caches: FarmerCaches,
82 node_client: NC,
83 plotted_pieces: Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
84 dsn_cache_retry_policy: DsnCacheRetryPolicy,
85 ) -> Self {
86 Self {
87 inner: Arc::new(Inner {
88 piece_provider,
89 farmer_caches,
90 node_client,
91 plotted_pieces,
92 dsn_cache_retry_policy,
93 }),
94 }
95 }
96
97 pub async fn get_piece_fast(&self, piece_index: PieceIndex) -> Option<Piece> {
99 self.get_piece_fast_internal(piece_index).await
100 }
101
102 async fn get_piece_fast_internal(&self, piece_index: PieceIndex) -> Option<Piece> {
103 let inner = &self.inner;
104
105 trace!(%piece_index, "Getting piece from farmer cache");
106 if let Some(piece) = inner
107 .farmer_caches
108 .get_piece(piece_index.to_multihash())
109 .await
110 {
111 trace!(%piece_index, "Got piece from farmer cache successfully");
112 return Some(piece);
113 }
114
115 trace!(%piece_index, "Getting piece from DSN L2 cache");
117 if let Some(piece) = inner.piece_provider.get_piece_from_cache(piece_index).await {
118 let added_to_cache = inner
119 .farmer_caches
120 .maybe_store_additional_piece(piece_index, &piece)
121 .await;
122 trace!(%piece_index, %added_to_cache, "Got piece from DSN L2 cache");
123 return Some(piece);
124 }
125
126 trace!(%piece_index, "Getting piece from node");
128 match inner.node_client.piece(piece_index).await {
129 Ok(Some(piece)) => {
130 let added_to_cache = inner
131 .farmer_caches
132 .maybe_store_additional_piece(piece_index, &piece)
133 .await;
134 trace!(%piece_index, %added_to_cache, "Got piece from node successfully");
135 return Some(piece);
136 }
137 Ok(None) => {
138 }
140 Err(error) => {
141 error!(
142 %error,
143 %piece_index,
144 "Failed to retrieve first segment piece from node"
145 );
146 }
147 }
148
149 None
150 }
151
152 pub async fn get_piece_slow(&self, piece_index: PieceIndex) -> Option<Piece> {
154 self.get_piece_slow_internal(piece_index).await
155 }
156
157 async fn get_piece_slow_internal(&self, piece_index: PieceIndex) -> Option<Piece> {
159 let inner = &self.inner;
160
161 trace!(%piece_index, "Getting piece from local plot");
162 let maybe_read_piece_fut = inner
163 .plotted_pieces
164 .try_read()
165 .and_then(|plotted_pieces| plotted_pieces.read_piece(piece_index));
166
167 if let Some(read_piece_fut) = maybe_read_piece_fut
168 && let Some(piece) = read_piece_fut.await
169 {
170 let added_to_cache = inner
171 .farmer_caches
172 .maybe_store_additional_piece(piece_index, &piece)
173 .await;
174 trace!(%piece_index, %added_to_cache, "Got piece from local plot successfully");
175 return Some(piece);
176 }
177
178 trace!(%piece_index, "Getting piece from DSN L1.");
180
181 let archival_storage_search_result = inner
182 .piece_provider
183 .get_piece_from_archival_storage(piece_index, MAX_RANDOM_WALK_ROUNDS)
184 .await;
185
186 if let Some(piece) = archival_storage_search_result {
187 let added_to_cache = inner
188 .farmer_caches
189 .maybe_store_additional_piece(piece_index, &piece)
190 .await;
191 trace!(%piece_index, %added_to_cache, "DSN L1 lookup succeeded");
192 return Some(piece);
193 }
194
195 None
196 }
197
198 pub fn downgrade(&self) -> WeakFarmerPieceGetter<FarmIndex, PV, NC> {
201 WeakFarmerPieceGetter {
202 inner: Arc::downgrade(&self.inner),
203 }
204 }
205}
206
207#[async_trait]
208impl<FarmIndex, PV, NC> PieceGetter for FarmerPieceGetter<FarmIndex, PV, NC>
209where
210 FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static,
211 usize: From<FarmIndex>,
212 PV: PieceValidator + Send + 'static,
213 NC: NodeClient,
214{
215 async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
216 {
217 let retries = AtomicU32::new(0);
218 let max_retries = u32::from(self.inner.dsn_cache_retry_policy.max_retries);
219 let mut backoff = self.inner.dsn_cache_retry_policy.backoff.clone();
220 backoff.reset();
221
222 let maybe_piece_fut = retry(backoff, || async {
223 let current_attempt = retries.fetch_add(1, Ordering::Relaxed);
224
225 if let Some(piece) = self.get_piece_fast_internal(piece_index).await {
226 trace!(%piece_index, current_attempt, "Got piece fast");
227 return Ok(Some(piece));
228 }
229 if current_attempt >= max_retries {
230 if max_retries > 0 {
231 debug!(
232 %piece_index,
233 current_attempt,
234 max_retries,
235 "Couldn't get a piece fast. No retries left"
236 );
237 }
238 return Ok(None);
239 }
240
241 trace!(%piece_index, current_attempt, "Couldn't get a piece fast, retrying...");
242
243 Err(backoff::Error::transient("Couldn't get piece fast"))
244 });
245
246 if let Ok(Some(piece)) = maybe_piece_fut.await {
247 trace!(%piece_index, "Got piece from cache successfully");
248 return Ok(Some(piece));
249 }
250 };
251
252 if let Some(piece) = self.get_piece_slow_internal(piece_index).await {
253 return Ok(Some(piece));
254 }
255
256 debug!(
257 %piece_index,
258 "Cannot acquire piece: all methods yielded empty result"
259 );
260 Ok(None)
261 }
262
263 async fn get_pieces<'a>(
264 &'a self,
265 piece_indices: Vec<PieceIndex>,
266 ) -> anyhow::Result<
267 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
268 > {
269 let (tx, mut rx) = mpsc::unbounded();
270
271 let fut = async move {
272 let tx = &tx;
273
274 let piece_count = piece_indices.len();
275 debug!(%piece_count, "Getting pieces from farmer cache");
276 let mut pieces_not_found_in_farmer_cache = Vec::new();
277 let mut pieces_in_farmer_cache =
278 self.inner.farmer_caches.get_pieces(piece_indices).await;
279
280 while let Some((piece_index, maybe_piece)) = pieces_in_farmer_cache.next().await {
281 let Some(piece) = maybe_piece else {
282 pieces_not_found_in_farmer_cache.push(piece_index);
283 continue;
284 };
285 tx.unbounded_send((piece_index, Ok(Some(piece))))
286 .expect("This future isn't polled after receiver is dropped; qed");
287 }
288
289 if pieces_not_found_in_farmer_cache.is_empty() {
290 return;
291 }
292
293 debug!(
294 remaining_piece_count = %pieces_not_found_in_farmer_cache.len(),
295 %piece_count,
296 "Getting pieces from DSN cache",
297 );
298 let mut pieces_not_found_in_dsn_cache = Vec::new();
299 let mut pieces_in_dsn_cache = self
300 .inner
301 .piece_provider
302 .get_from_cache(pieces_not_found_in_farmer_cache)
303 .await;
304
305 while let Some((piece_index, maybe_piece)) = pieces_in_dsn_cache.next().await {
306 let Some(piece) = maybe_piece else {
307 pieces_not_found_in_dsn_cache.push(piece_index);
308 continue;
309 };
310 let added_to_cache = self
312 .inner
313 .farmer_caches
314 .maybe_store_additional_piece(piece_index, &piece)
315 .await;
316 trace!(%piece_index, %added_to_cache, "Got piece from DSN cache successfully");
317 tx.unbounded_send((piece_index, Ok(Some(piece))))
318 .expect("This future isn't polled after receiver is dropped; qed");
319 }
320
321 if pieces_not_found_in_dsn_cache.is_empty() {
322 return;
323 }
324
325 debug!(
326 remaining_piece_count = %pieces_not_found_in_dsn_cache.len(),
327 %piece_count,
328 "Getting pieces from node",
329 );
330 let pieces_not_found_on_node = pieces_not_found_in_dsn_cache
331 .into_iter()
332 .map(|piece_index| async move {
333 match self.inner.node_client.piece(piece_index).await {
334 Ok(Some(piece)) => {
335 let added_to_cache = self.inner
336 .farmer_caches
337 .maybe_store_additional_piece(piece_index, &piece)
338 .await;
339 trace!(%piece_index, %added_to_cache, "Got piece from node successfully");
340
341 tx.unbounded_send((piece_index, Ok(Some(piece))))
342 .expect("This future isn't polled after receiver is dropped; qed");
343 None
344 }
345 Ok(None) => Some(piece_index),
346 Err(error) => {
347 error!(
348 %error,
349 %piece_index,
350 "Failed to retrieve first segment piece from node"
351 );
352 Some(piece_index)
353 }
354 }
355 })
356 .collect::<FuturesUnordered<_>>()
357 .filter_map(|maybe_piece_index| async move { maybe_piece_index })
358 .collect::<Vec<_>>()
359 .await;
360
361 if pieces_not_found_on_node.is_empty() {
362 return;
363 }
364
365 debug!(
366 remaining_piece_count = %pieces_not_found_on_node.len(),
367 %piece_count,
368 "Some pieces were not easily reachable",
369 );
370 pieces_not_found_on_node
371 .into_iter()
372 .map(|piece_index| async move {
373 let maybe_piece = self.get_piece_slow_internal(piece_index).await;
374
375 tx.unbounded_send((piece_index, Ok(maybe_piece)))
376 .expect("This future isn't polled after receiver is dropped; qed");
377 })
378 .collect::<FuturesUnordered<_>>()
379 .for_each(|()| async {})
381 .await;
382 };
383 let mut fut = Box::pin(fut.fuse());
384
385 Ok(Box::new(stream::poll_fn(move |cx| {
387 if !fut.is_terminated() {
388 let _ = fut.poll_unpin(cx);
390 }
391
392 if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) {
393 return Poll::Ready(maybe_result);
394 }
395
396 Poll::Pending
398 })))
399 }
400}
401
402pub struct WeakFarmerPieceGetter<FarmIndex, PV, NC> {
404 inner: Weak<Inner<FarmIndex, PV, NC>>,
405}
406
407impl<FarmIndex, PV, NC> fmt::Debug for WeakFarmerPieceGetter<FarmIndex, PV, NC> {
408 #[inline]
409 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
410 f.debug_struct("WeakFarmerPieceGetter")
411 .finish_non_exhaustive()
412 }
413}
414
415impl<FarmIndex, PV, NC> Clone for WeakFarmerPieceGetter<FarmIndex, PV, NC> {
416 #[inline]
417 fn clone(&self) -> Self {
418 Self {
419 inner: self.inner.clone(),
420 }
421 }
422}
423
424#[ouroboros::self_referencing]
429struct StreamWithPieceGetter<FarmIndex, PV, NC>
430where
431 FarmIndex: 'static,
432 PV: 'static,
433 NC: 'static,
434{
435 piece_getter: FarmerPieceGetter<FarmIndex, PV, NC>,
436 #[borrows(piece_getter)]
437 #[covariant]
438 stream:
439 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'this>,
440}
441
442impl<FarmIndex, PV, NC> Stream for StreamWithPieceGetter<FarmIndex, PV, NC>
443where
444 FarmIndex: 'static,
445 PV: 'static,
446 NC: 'static,
447{
448 type Item = (PieceIndex, anyhow::Result<Option<Piece>>);
449
450 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
451 self.get_mut()
452 .with_stream_mut(|stream| stream.poll_next_unpin(cx))
453 }
454}
455
456#[async_trait]
457impl<FarmIndex, PV, NC> PieceGetter for WeakFarmerPieceGetter<FarmIndex, PV, NC>
458where
459 FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static,
460 usize: From<FarmIndex>,
461 PV: PieceValidator + Send + 'static,
462 NC: NodeClient,
463{
464 async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
465 let Some(piece_getter) = self.upgrade() else {
466 debug!("Farmer piece getter upgrade didn't succeed");
467 return Ok(None);
468 };
469
470 piece_getter.get_piece(piece_index).await
471 }
472
473 async fn get_pieces<'a>(
474 &'a self,
475 piece_indices: Vec<PieceIndex>,
476 ) -> anyhow::Result<
477 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
478 > {
479 let Some(piece_getter) = self.upgrade() else {
480 debug!("Farmer piece getter upgrade didn't succeed");
481 return Ok(Box::new(stream::iter(
482 piece_indices
483 .into_iter()
484 .map(|piece_index| (piece_index, Ok(None))),
485 )));
486 };
487
488 let stream_with_piece_getter =
491 StreamWithPieceGetter::try_new_async_send(piece_getter, move |piece_getter| {
492 piece_getter.get_pieces(piece_indices)
493 })
494 .await?;
495
496 Ok(Box::new(stream_with_piece_getter))
497 }
498}
499
500impl<FarmIndex, PV, NC> WeakFarmerPieceGetter<FarmIndex, PV, NC> {
501 pub fn upgrade(&self) -> Option<FarmerPieceGetter<FarmIndex, PV, NC>> {
503 Some(FarmerPieceGetter {
504 inner: self.inner.upgrade()?,
505 })
506 }
507}