subspace_farmer/farmer_cache.rs

1//! A container that caches pieces
2//!
3//! Farmer cache is a container that orchestrates a bunch of piece and plot caches that together
4//! persist pieces in a way that makes them easy to retrieve, compared to decoding pieces from plots.
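//!
//! A minimal wiring sketch (hypothetical setup; assumes a `node_client` implementing
//! `NodeClient`, a `piece_getter` implementing `PieceGetter`, a libp2p `peer_id` and a
//! `piece_index` are available, and that backing piece caches are handed over to the cache
//! separately; not compiled as a doc-test):
//!
//! ```ignore
//! let (farmer_cache, farmer_cache_worker) = FarmerCache::new(node_client, peer_id, None);
//! // The worker drives cache initialization and keep-up sync; it must run for the cache to work
//! tokio::spawn(farmer_cache_worker.run(piece_getter));
//! // Once pieces are cached, they can be retrieved by record key
//! let maybe_piece = farmer_cache
//!     .get_piece(RecordKey::from(piece_index.to_multihash()))
//!     .await;
//! ```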
5
6mod metrics;
7mod piece_cache_state;
8#[cfg(test)]
9mod tests;
10
11use crate::farm::{MaybePieceStoredResult, PieceCache, PieceCacheId, PieceCacheOffset, PlotCache};
12use crate::farmer_cache::metrics::FarmerCacheMetrics;
13use crate::farmer_cache::piece_cache_state::PieceCachesState;
14use crate::node_client::NodeClient;
15use async_lock::RwLock as AsyncRwLock;
16use event_listener_primitives::{Bag, HandlerId};
17use futures::channel::mpsc;
18use futures::future::{Either, FusedFuture};
19use futures::stream::{FuturesOrdered, FuturesUnordered};
20use futures::{FutureExt, SinkExt, Stream, StreamExt, select, stream};
21use parking_lot::{Mutex, RwLock};
22use prometheus_client::registry::Registry;
23use rand::prelude::*;
24use rayon::prelude::*;
25use std::collections::hash_map::Entry;
26use std::collections::{HashMap, HashSet};
27use std::future::join;
28use std::sync::Arc;
29use std::sync::atomic::{AtomicUsize, Ordering};
30use std::task::Poll;
31use std::time::Duration;
32use std::{fmt, mem};
33use subspace_core_primitives::pieces::{Piece, PieceIndex};
34use subspace_core_primitives::segments::{SegmentHeader, SegmentIndex};
35use subspace_data_retrieval::piece_getter::PieceGetter;
36use subspace_networking::KeyWithDistance;
37use subspace_networking::libp2p::PeerId;
38use subspace_networking::libp2p::kad::RecordKey;
39use subspace_networking::utils::multihash::ToMultihash;
40use subspace_process::run_future_in_dedicated_thread;
41use tokio::sync::Semaphore;
42use tokio::task::yield_now;
43use tracing::{Instrument, debug, error, info, info_span, trace, warn};
44
45const WORKER_CHANNEL_CAPACITY: usize = 100;
46const SYNC_BATCH_SIZE: usize = 256;
47const SYNC_CONCURRENT_BATCHES: usize = 4;
48/// Make caches available as they are built, without waiting for initialization to finish; this
49/// number defines the interval, in pieces, after which the cache is updated
50const INTERMEDIATE_CACHE_UPDATE_INTERVAL: usize = 100;
51const INITIAL_SYNC_FARM_INFO_CHECK_INTERVAL: Duration = Duration::from_secs(1);
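// Note: during the initial sync the downloading semaphore is created with
// `SYNC_BATCH_SIZE * SYNC_CONCURRENT_BATCHES` permits; each batch acquires `SYNC_BATCH_SIZE`
// permits up front and releases them one by one as pieces are downloaded, so roughly
// `SYNC_CONCURRENT_BATCHES` batches' worth of pieces are in flight at any time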
52
53type HandlerFn<A> = Arc<dyn Fn(&A) + Send + Sync + 'static>;
54type Handler<A> = Bag<HandlerFn<A>, A>;
55type CacheIndex = u8;
56
57#[derive(Default, Debug)]
58struct Handlers {
59    progress: Handler<f32>,
60}
61
62#[derive(Debug, Clone, Copy)]
63struct FarmerCacheOffset {
64    cache_index: CacheIndex,
65    piece_offset: PieceCacheOffset,
66}
67
68impl FarmerCacheOffset {
69    fn new(cache_index: CacheIndex, piece_offset: PieceCacheOffset) -> Self {
70        Self {
71            cache_index,
72            piece_offset,
73        }
74    }
75}
76
77#[derive(Debug, Clone)]
78struct CacheBackend {
79    backend: Arc<dyn PieceCache>,
80    used_capacity: u32,
81    total_capacity: u32,
82}
83
84impl std::ops::Deref for CacheBackend {
85    type Target = Arc<dyn PieceCache>;
86
87    fn deref(&self) -> &Self::Target {
88        &self.backend
89    }
90}
91
92impl CacheBackend {
93    fn new(backend: Arc<dyn PieceCache>, total_capacity: u32) -> Self {
94        Self {
95            backend,
96            used_capacity: 0,
97            total_capacity,
98        }
99    }
100
101    fn next_free(&mut self) -> Option<PieceCacheOffset> {
102        let offset = self.used_capacity;
103        if offset < self.total_capacity {
104            self.used_capacity += 1;
105            Some(PieceCacheOffset(offset))
106        } else {
107            debug!(?offset, total_capacity = ?self.total_capacity, "No free space in cache backend");
108            None
109        }
110    }
111
112    fn free_size(&self) -> u32 {
113        self.total_capacity - self.used_capacity
114    }
115}
116
117#[derive(Debug)]
118struct CacheState {
119    cache_stored_pieces: HashMap<KeyWithDistance, FarmerCacheOffset>,
120    cache_free_offsets: Vec<FarmerCacheOffset>,
121    backend: CacheBackend,
122}
123
124#[derive(Debug)]
125enum WorkerCommand {
126    ReplaceBackingCaches {
127        new_piece_caches: Vec<Arc<dyn PieceCache>>,
128    },
129    ForgetKey {
130        key: RecordKey,
131    },
132}
133
134/// Farmer cache worker used to drive the farmer cache backend
135#[derive(Debug)]
136#[must_use = "Farmer cache will not work unless its worker is running"]
137pub struct FarmerCacheWorker<NC>
138where
139    NC: fmt::Debug,
140{
141    peer_id: PeerId,
142    node_client: NC,
143    piece_caches: Arc<AsyncRwLock<PieceCachesState>>,
144    plot_caches: Arc<PlotCaches>,
145    handlers: Arc<Handlers>,
146    worker_receiver: Option<mpsc::Receiver<WorkerCommand>>,
147    metrics: Option<Arc<FarmerCacheMetrics>>,
148}
149
150impl<NC> FarmerCacheWorker<NC>
151where
152    NC: NodeClient,
153{
154    /// Run the cache worker with provided piece getter.
155    ///
156    /// NOTE: Piece getter must not depend on farmer cache in order to avoid reference cycles!
157    pub async fn run<PG>(mut self, piece_getter: PG)
158    where
159        PG: PieceGetter,
160    {
161        // Limit is dynamically set later
162        let mut last_segment_index_internal = SegmentIndex::ZERO;
163
164        let mut worker_receiver = self
165            .worker_receiver
166            .take()
167            .expect("Always set during worker instantiation");
168
169        if let Some(WorkerCommand::ReplaceBackingCaches { new_piece_caches }) =
170            worker_receiver.next().await
171        {
172            self.initialize(
173                &piece_getter,
174                &mut last_segment_index_internal,
175                new_piece_caches,
176            )
177            .await;
178        } else {
179            // Piece cache was dropped before backing caches were sent
180            return;
181        }
182
183        let mut segment_headers_notifications =
184            match self.node_client.subscribe_archived_segment_headers().await {
185                Ok(segment_headers_notifications) => segment_headers_notifications,
186                Err(error) => {
187                    error!(%error, "Failed to subscribe to archived segments notifications");
188                    return;
189                }
190            };
191
192        // Keep up with segment indices that were potentially created since reinitialization.
193        // Depending on the size of the diff, this may pause block production for a while (due to
194        // the subscription we created above)
195        self.keep_up_after_initial_sync(&piece_getter, &mut last_segment_index_internal)
196            .await;
197
198        loop {
199            select! {
200                maybe_command = worker_receiver.next() => {
201                    let Some(command) = maybe_command else {
202                        // Nothing else left to do
203                        return;
204                    };
205
206                    self.handle_command(command, &piece_getter, &mut last_segment_index_internal).await;
207                }
208                maybe_segment_header = segment_headers_notifications.next().fuse() => {
209                    if let Some(segment_header) = maybe_segment_header {
210                        self.process_segment_header(&piece_getter, segment_header, &mut last_segment_index_internal).await;
211                    } else {
212                        // Keep-up sync only ends when the subscription ends, and the subscription
213                        // lasts for the duration of the instance
214                        return;
215                    }
216                }
217            }
218        }
219    }
220
221    async fn handle_command<PG>(
222        &self,
223        command: WorkerCommand,
224        piece_getter: &PG,
225        last_segment_index_internal: &mut SegmentIndex,
226    ) where
227        PG: PieceGetter,
228    {
229        match command {
230            WorkerCommand::ReplaceBackingCaches { new_piece_caches } => {
231                self.initialize(piece_getter, last_segment_index_internal, new_piece_caches)
232                    .await;
233            }
234            // TODO: Consider implementing optional re-sync of the piece instead of just forgetting
235            WorkerCommand::ForgetKey { key } => {
236                let mut caches = self.piece_caches.write().await;
237                let key = KeyWithDistance::new_with_record_key(self.peer_id, key);
238                let Some(offset) = caches.remove_stored_piece(&key) else {
239                    // Key doesn't exist
240                    return;
241                };
242
243                let cache_index = offset.cache_index;
244                let piece_offset = offset.piece_offset;
245                let Some(backend) = caches.get_backend(cache_index).cloned() else {
246                    // Cache backend doesn't exist
247                    return;
248                };
249
250                caches.push_dangling_free_offset(offset);
251                match backend.read_piece_index(piece_offset).await {
252                    Ok(Some(piece_index)) => {
253                        trace!(%piece_index, %cache_index, %piece_offset, "Forget piece");
254                    }
255                    Ok(None) => {
256                        warn!(
257                            %cache_index,
258                            %piece_offset,
259                            "Piece index out of range, this is likely an implementation bug, \
260                            not freeing heap element"
261                        );
262                    }
263                    Err(error) => {
264                        error!(
265                            %error,
266                            %cache_index,
267                            ?key,
268                            %piece_offset,
269                            "Error while reading piece from cache"
270                        );
271                    }
272                }
273            }
274        }
275    }
276
277    async fn initialize<PG>(
278        &self,
279        piece_getter: &PG,
280        last_segment_index_internal: &mut SegmentIndex,
281        new_piece_caches: Vec<Arc<dyn PieceCache>>,
282    ) where
283        PG: PieceGetter,
284    {
285        info!("Initializing piece cache");
286
287        // Pull old cache state since it will be replaced with a new one and reuse its allocations
288        let (mut stored_pieces, mut dangling_free_offsets) =
289            mem::take(&mut *self.piece_caches.write().await).reuse();
290
291        debug!("Collecting pieces that were in the cache before");
292
293        if let Some(metrics) = &self.metrics {
294            metrics.piece_cache_capacity_total.set(0);
295            metrics.piece_cache_capacity_used.set(0);
296        }
297
298        let peer_id = self.peer_id;
299
300        // Build cache state of all backends
301        let piece_caches_number = new_piece_caches.len();
302        let maybe_caches_futures = new_piece_caches
303            .into_iter()
304            .enumerate()
305            .filter_map(|(cache_index, new_cache)| {
306                let total_capacity = new_cache.max_num_elements();
307                let mut backend = CacheBackend::new(new_cache, total_capacity);
308                let Ok(cache_index) = CacheIndex::try_from(cache_index) else {
309                    warn!(
310                        ?piece_caches_number,
311                        "Too many piece caches provided, {cache_index} cache will be ignored",
312                    );
313                    return None;
314                };
315
316                if let Some(metrics) = &self.metrics {
317                    metrics
318                        .piece_cache_capacity_total
319                        .inc_by(total_capacity as i64);
320                }
321
322                let init_fut = async move {
323                    let used_capacity = &mut backend.used_capacity;
324
325                    // The hack of first collecting into an `Option` and calling
326                    // `Option::take()` later satisfies the compiler, which otherwise gets
327                    // confused about ownership
328                    let mut maybe_contents = match backend.backend.contents().await {
329                        Ok(contents) => Some(contents),
330                        Err(error) => {
331                            warn!(%error, "Failed to get cache contents");
332
333                            None
334                        }
335                    };
336
337                    #[allow(clippy::mutable_key_type)]
338                    let mut cache_stored_pieces = HashMap::new();
339                    let mut cache_free_offsets = Vec::new();
340
341                    let Some(mut contents) = maybe_contents.take() else {
342                        drop(maybe_contents);
343
344                        return CacheState {
345                            cache_stored_pieces,
346                            cache_free_offsets,
347                            backend,
348                        };
349                    };
350
351                    while let Some(maybe_element_details) = contents.next().await {
352                        let (piece_offset, maybe_piece_index) = match maybe_element_details {
353                            Ok(element_details) => element_details,
354                            Err(error) => {
355                                warn!(%error, "Failed to get cache contents element details");
356                                break;
357                            }
358                        };
359                        let offset = FarmerCacheOffset::new(cache_index, piece_offset);
360                        match maybe_piece_index {
361                            Some(piece_index) => {
362                                *used_capacity = piece_offset.0 + 1;
363                                let record_key = RecordKey::from(piece_index.to_multihash());
364                                let key = KeyWithDistance::new_with_record_key(peer_id, record_key);
365                                cache_stored_pieces.insert(key, offset);
366                            }
367                            None => {
368                                // TODO: Optimize to not store all free offsets, only dangling
369                                //  offsets are actually necessary
370                                cache_free_offsets.push(offset);
371                            }
372                        }
373
374                        // Allow for task to be aborted
375                        yield_now().await;
376                    }
377
378                    drop(maybe_contents);
379                    drop(contents);
380
381                    CacheState {
382                        cache_stored_pieces,
383                        cache_free_offsets,
384                        backend,
385                    }
386                };
387
388                Some(run_future_in_dedicated_thread(
389                    move || init_fut.instrument(info_span!("", %cache_index)),
390                    format!("piece-cache.{cache_index:02}"),
391                ))
392            })
393            .collect::<Result<Vec<_>, _>>();
394
395        let caches_futures = match maybe_caches_futures {
396            Ok(caches_futures) => caches_futures,
397            Err(error) => {
398                error!(%error, "Failed to spawn piece cache reading thread");
399
400                return;
401            }
402        };
403
404        let mut backends = Vec::with_capacity(caches_futures.len());
405        let mut caches_futures = caches_futures.into_iter().collect::<FuturesOrdered<_>>();
406
407        while let Some(maybe_cache) = caches_futures.next().await {
408            match maybe_cache {
409                Ok(cache) => {
410                    let backend = cache.backend;
411                    for (key, cache_offset) in cache.cache_stored_pieces {
412                        if let Some(old_cache_offset) = stored_pieces.insert(key, cache_offset) {
413                            dangling_free_offsets.push_front(old_cache_offset);
414                        }
415                    }
416                    dangling_free_offsets.extend(
417                        cache.cache_free_offsets.into_iter().filter(|free_offset| {
418                            free_offset.piece_offset.0 < backend.used_capacity
419                        }),
420                    );
421                    backends.push(backend);
422                }
423                Err(_cancelled) => {
424                    error!("Piece cache reading thread panicked");
425
426                    return;
427                }
428            };
429        }
430
431        let mut caches = PieceCachesState::new(stored_pieces, dangling_free_offsets, backends);
432
433        info!("Synchronizing piece cache");
434
435        let last_segment_index = loop {
436            match self.node_client.farmer_app_info().await {
437                Ok(farmer_app_info) => {
438                    let last_segment_index =
439                        farmer_app_info.protocol_info.history_size.segment_index();
440                    // Wait for node to be either fully synced or to be aware of non-zero segment
441                    // index, which would indicate it has started DSN sync and knows about
442                    // up-to-date archived history.
443                    //
444                    // While this doesn't account for situations where node was offline for a long
445                    // time and is aware of old segment headers, this is good enough for piece cache
446                    // sync to proceed and should result in better user experience on average.
447                    if !farmer_app_info.syncing || last_segment_index > SegmentIndex::ZERO {
448                        break last_segment_index;
449                    }
450                }
451                Err(error) => {
452                    error!(
453                        %error,
454                        "Failed to get farmer app info from node, keeping old cache state without \
455                        updates"
456                    );
457
458                    // Not the latest, but at least something
459                    *self.piece_caches.write().await = caches;
460                    return;
461                }
462            }
463
464            tokio::time::sleep(INITIAL_SYNC_FARM_INFO_CHECK_INTERVAL).await;
465        };
466
467        debug!(%last_segment_index, "Identified last segment index");
468
469        // Collect all the piece indices that need to be stored; we will sort them later
470        let segment_indices = Vec::from_iter(SegmentIndex::ZERO..=last_segment_index);
471        // TODO: This may eventually be too much to store in memory, though right now it is a
472        //  non-issue in practice
473        let mut piece_indices_to_store = segment_indices
474            .into_par_iter()
475            .flat_map(|segment_index| {
476                segment_index
477                    .segment_piece_indexes()
478                    .into_par_iter()
479                    .map(|piece_index| {
480                        (
481                            KeyWithDistance::new(self.peer_id, piece_index.to_multihash()),
482                            piece_index,
483                        )
484                    })
485            })
486            .collect::<Vec<_>>();
487
488        // Sort pieces by distance from peer to piece in ascending order, so the closest pieces
489        // (the ones this cache will keep) have a higher chance of being downloaded
490        piece_indices_to_store.par_sort_unstable_by(|(a_key, _), (b_key, _)| a_key.cmp(b_key));
491
492        // `HashMap` is faster than `BTreeMap`
493        let mut piece_indices_to_store = piece_indices_to_store
494            .into_iter()
495            .take(caches.total_capacity())
496            .collect::<HashMap<_, _>>();
497
498        let mut piece_caches_capacity_used = vec![0u32; caches.backends().len()];
499        // Filter out piece indices that are stored but should not be, and remove already stored
500        // piece indices from `piece_indices_to_store`, leaving just those that are still missing
501        // from the cache
502        caches.free_unneeded_stored_pieces(&mut piece_indices_to_store);
503
504        if let Some(metrics) = &self.metrics {
505            for offset in caches.stored_pieces_offsets() {
506                piece_caches_capacity_used[usize::from(offset.cache_index)] += 1;
507            }
508
509            for cache_used in piece_caches_capacity_used {
510                metrics
511                    .piece_cache_capacity_used
512                    .inc_by(i64::from(cache_used));
513            }
514        }
515
516        // Store whatever correct pieces are immediately available after restart
517        self.piece_caches.write().await.clone_from(&caches);
518        let stored_count = caches.stored_pieces_offsets().len();
519
520        debug!(
521            %stored_count,
522            count = %piece_indices_to_store.len(),
523            "Identified piece indices that should be cached",
524        );
525
526        let pieces_to_download_total = piece_indices_to_store.len() + stored_count;
527        let piece_indices_to_store = piece_indices_to_store
528            .into_values()
529            .collect::<Vec<_>>()
530            // TODO: Allocating chunks here shouldn't be necessary, but otherwise it fails with
531            //  confusing error described in https://github.com/rust-lang/rust/issues/64552 and
532            //  similar upstream issues
533            .chunks(SYNC_BATCH_SIZE)
534            .map(|chunk| chunk.to_vec())
535            .collect::<Vec<_>>();
536
537        let downloaded_pieces_count = AtomicUsize::new(stored_count);
538        let caches = Mutex::new(caches);
539        self.handlers.progress.call_simple(&0.0);
540        let batch_count = piece_indices_to_store.len();
541        let piece_indices_to_store = piece_indices_to_store.into_iter().enumerate();
542
543        let downloading_semaphore = &Semaphore::new(SYNC_BATCH_SIZE * SYNC_CONCURRENT_BATCHES);
544        let ignored_cache_indices = &RwLock::new(HashSet::new());
545
546        let downloading_pieces_stream =
547            stream::iter(piece_indices_to_store.map(|(batch, piece_indices)| {
548                let downloaded_pieces_count = &downloaded_pieces_count;
549                let caches = &caches;
550                let num_pieces = piece_indices.len();
551
552                trace!(
553                    %num_pieces,
554                    %batch,
555                    %batch_count,
556                    first_piece_index = ?piece_indices.first().expect("chunks are never empty"),
557                    last_piece_index = ?piece_indices.last().expect("chunks are never empty"),
558                    downloaded_pieces_count = %downloaded_pieces_count.load(Ordering::Relaxed),
559                    %pieces_to_download_total,
560                    available_permits = %downloading_semaphore.available_permits(),
561                    "Started piece cache sync batch",
562                );
563
564                async move {
565                    let mut permit = downloading_semaphore
566                        .acquire_many(SYNC_BATCH_SIZE as u32)
567                        .await
568                        .expect("Semaphore is never closed; qed");
569                    debug!(%batch, %num_pieces, "Downloading pieces");
570
571                    let pieces_stream = match piece_getter.get_pieces(piece_indices).await {
572                        Ok(pieces_stream) => pieces_stream,
573                        Err(error) => {
574                            error!(
575                                %error,
576                                "Failed to get pieces from piece getter"
577                            );
578                            return;
579                        }
580                    };
581                    let mut pieces_stream = pieces_stream.enumerate();
582
583                    while let Some((index, (piece_index, result))) = pieces_stream.next().await {
584                        debug!(%batch, %index, %piece_index, "Downloaded piece");
585                        // Release slot for future batches, by dropping it along with the piece.
586                        let _permit = permit.split(1);
587
588                        let piece = match result {
589                            Ok(Some(piece)) => {
590                                trace!(%batch, %piece_index, "Downloaded piece successfully");
591                                piece
592                            }
593                            Ok(None) => {
594                                debug!(%batch, %piece_index, "Couldn't find piece");
595                                continue;
596                            }
597                            Err(error) => {
598                                debug!(
599                                    %batch,
600                                    %error,
601                                    %piece_index,
602                                    "Failed to get piece for piece cache"
603                                );
604                                continue;
605                            }
606                        };
607
608                        let (offset, maybe_backend) = {
609                            let mut caches = caches.lock();
610
611                            // Find a cache in which there is a place for the new piece to be stored
612                            let Some(offset) = caches.pop_free_offset() else {
613                                error!(
614                                    %batch,
615                                    %piece_index,
616                                    "Failed to store piece in cache, there was no space"
617                                );
618                                break;
619                            };
620
621                            (offset, caches.get_backend(offset.cache_index).cloned())
622                        };
623
624                        let cache_index = offset.cache_index;
625                        let piece_offset = offset.piece_offset;
626
627                        let skip_write = ignored_cache_indices.read().contains(&cache_index);
628                        if skip_write {
629                            trace!(
630                                %batch,
631                                %cache_index,
632                                %piece_index,
633                                %piece_offset,
634                                "Skipping known problematic cache index"
635                            );
636                        } else {
637                            if let Some(backend) = maybe_backend
638                                && let Err(error) =
639                                    backend.write_piece(piece_offset, piece_index, &piece).await
640                            {
641                                error!(
642                                    %error,
643                                    %batch,
644                                    %cache_index,
645                                    %piece_index,
646                                    %piece_offset,
647                                    "Failed to write piece into cache, ignoring this cache going \
648                                    forward"
649                                );
650                                ignored_cache_indices.write().insert(cache_index);
651                                continue;
652                            }
653
654                            let key =
655                                KeyWithDistance::new(self.peer_id, piece_index.to_multihash());
656                            caches.lock().push_stored_piece(key, offset);
657                        }
658
659                        let prev_downloaded_pieces_count =
660                            downloaded_pieces_count.fetch_add(1, Ordering::Relaxed);
661                        // Do not print anything or send progress notification after last piece
662                        // until piece cache is written fully below
663                        if prev_downloaded_pieces_count != pieces_to_download_total {
664                            let progress = prev_downloaded_pieces_count as f32
665                                / pieces_to_download_total as f32
666                                * 100.0;
667                            if prev_downloaded_pieces_count % INTERMEDIATE_CACHE_UPDATE_INTERVAL
668                                == 0
669                            {
670                                let mut piece_caches = self.piece_caches.write().await;
671                                piece_caches.clone_from(&caches.lock());
672
673                                info!(
674                                    "Piece cache sync {progress:.2}% complete ({} / {})",
675                                    bytesize::to_string(
676                                        (prev_downloaded_pieces_count * Piece::SIZE) as u64,
677                                        true,
678                                    ),
679                                    bytesize::to_string(
680                                        (pieces_to_download_total * Piece::SIZE) as u64,
681                                        true,
682                                    ),
683                                );
684                            }
685
686                            self.handlers.progress.call_simple(&progress);
687                        }
688                    }
689
690                    trace!(
691                        %num_pieces,
692                        %batch,
693                        %batch_count,
694                        downloaded_pieces_count = %downloaded_pieces_count.load(Ordering::Relaxed),
695                        %pieces_to_download_total,
696                        available_permits = %downloading_semaphore.available_permits(),
697                        "Finished piece cache sync batch",
698                    );
699                }
700            }));
701
702        // Download several batches concurrently to make sure the slow tail of one is compensated
703        // by another
704        downloading_pieces_stream
705            // This allows scheduling a new batch while previous batches are partially complete,
706            // but avoids the excessive memory usage of creating all futures upfront
707            .buffer_unordered(SYNC_CONCURRENT_BATCHES * 10)
708            // Simply drain everything
709            .for_each(|()| async {})
710            .await;
711
712        *self.piece_caches.write().await = caches.into_inner();
713        self.handlers.progress.call_simple(&100.0);
714        *last_segment_index_internal = last_segment_index;
715
716        info!("Finished piece cache synchronization");
717    }
718
719    async fn process_segment_header<PG>(
720        &self,
721        piece_getter: &PG,
722        segment_header: SegmentHeader,
723        last_segment_index_internal: &mut SegmentIndex,
724    ) where
725        PG: PieceGetter,
726    {
727        let segment_index = segment_header.segment_index();
728        debug!(%segment_index, "Starting to process newly archived segment");
729
730        if *last_segment_index_internal < segment_index {
731            debug!(%segment_index, "Downloading potentially useful pieces");
732
733            // We do not insert pieces into cache/heap yet, so we don't know if all of these pieces
734            // will be included, but there is a good chance they will be, and we want to acknowledge
735            // the new segment header as soon as possible
736            let pieces_to_maybe_include = segment_index
737                .segment_piece_indexes()
738                .into_iter()
739                .map(|piece_index| async move {
740                    let should_store_in_piece_cache = self
741                        .piece_caches
742                        .read()
743                        .await
744                        .should_include_key(self.peer_id, piece_index);
745
746                    let key = RecordKey::from(piece_index.to_multihash());
747                    let should_store_in_plot_cache =
748                        self.plot_caches.should_store(piece_index, &key).await;
749
750                    if !(should_store_in_piece_cache || should_store_in_plot_cache) {
751                        trace!(%piece_index, "Piece doesn't need to be cached #1");
752
753                        return None;
754                    }
755
756                    let maybe_piece_result =
757                        self.node_client
758                            .piece(piece_index)
759                            .await
760                            .inspect_err(|error| {
761                                debug!(
762                                    %error,
763                                    %segment_index,
764                                    %piece_index,
765                                    "Failed to retrieve piece from node right after archiving"
766                                );
767                            });
768
769                    if let Ok(Some(piece)) = maybe_piece_result {
770                        return Some((piece_index, piece));
771                    }
772
773                    match piece_getter.get_piece(piece_index).await {
774                        Ok(Some(piece)) => Some((piece_index, piece)),
775                        Ok(None) => {
776                            warn!(
777                                %segment_index,
778                                %piece_index,
779                                "Failed to retrieve piece right after archiving"
780                            );
781
782                            None
783                        }
784                        Err(error) => {
785                            warn!(
786                                %error,
787                                %segment_index,
788                                %piece_index,
789                                "Failed to retrieve piece right after archiving"
790                            );
791
792                            None
793                        }
794                    }
795                })
796                .collect::<FuturesUnordered<_>>()
797                .filter_map(|maybe_piece| async move { maybe_piece })
798                .collect::<Vec<_>>()
799                .await;
800
801            debug!(%segment_index, "Downloaded potentially useful pieces");
802
803            self.acknowledge_archived_segment_processing(segment_index)
804                .await;
805
806            // Go through potentially matching pieces again now that segment was acknowledged and
807            // try to persist them if necessary
808            for (piece_index, piece) in pieces_to_maybe_include {
809                if !self
810                    .plot_caches
811                    .store_additional_piece(piece_index, &piece)
812                    .await
813                {
814                    trace!(%piece_index, "Piece could not be cached in plot cache");
815                }
816
817                if !self
818                    .piece_caches
819                    .read()
820                    .await
821                    .should_include_key(self.peer_id, piece_index)
822                {
823                    trace!(%piece_index, "Piece doesn't need to be cached #2");
824
825                    continue;
826                }
827
828                trace!(%piece_index, "Piece needs to be cached #1");
829
830                self.persist_piece_in_cache(piece_index, piece).await;
831            }
832
833            *last_segment_index_internal = segment_index;
834        } else {
835            self.acknowledge_archived_segment_processing(segment_index)
836                .await;
837        }
838
839        debug!(%segment_index, "Finished processing newly archived segment");
840    }
841
842    async fn acknowledge_archived_segment_processing(&self, segment_index: SegmentIndex) {
843        match self
844            .node_client
845            .acknowledge_archived_segment_header(segment_index)
846            .await
847        {
848            Ok(()) => {
849                debug!(%segment_index, "Acknowledged archived segment");
850            }
851            Err(error) => {
852                error!(%segment_index, ?error, "Failed to acknowledge archived segment");
853            }
854        };
855    }
856
857    async fn keep_up_after_initial_sync<PG>(
858        &self,
859        piece_getter: &PG,
860        last_segment_index_internal: &mut SegmentIndex,
861    ) where
862        PG: PieceGetter,
863    {
864        let last_segment_index = match self.node_client.farmer_app_info().await {
865            Ok(farmer_app_info) => farmer_app_info.protocol_info.history_size.segment_index(),
866            Err(error) => {
867                error!(
868                    %error,
869                    "Failed to get farmer app info from node, keeping old cache state without \
870                    updates"
871                );
872                return;
873            }
874        };
875
876        if last_segment_index <= *last_segment_index_internal {
877            return;
878        }
879
880        info!(
881            "Syncing piece cache to the latest history size, this may pause block production if \
882            it takes too long"
883        );
884
885        // Keep up with segment indices that were potentially created since reinitialization
886        let piece_indices = (*last_segment_index_internal..=last_segment_index)
887            .flat_map(|segment_index| segment_index.segment_piece_indexes());
888
889        // TODO: Download pieces concurrently
890        for piece_index in piece_indices {
891            if !self
892                .piece_caches
893                .read()
894                .await
895                .should_include_key(self.peer_id, piece_index)
896            {
897                trace!(%piece_index, "Piece doesn't need to be cached #3");
898
899                continue;
900            }
901
902            trace!(%piece_index, "Piece needs to be cached #2");
903
904            let result = piece_getter.get_piece(piece_index).await;
905
906            let piece = match result {
907                Ok(Some(piece)) => piece,
908                Ok(None) => {
909                    debug!(%piece_index, "Couldn't find piece");
910                    continue;
911                }
912                Err(error) => {
913                    debug!(
914                        %error,
915                        %piece_index,
916                        "Failed to get piece for piece cache"
917                    );
918                    continue;
919                }
920            };
921
922            self.persist_piece_in_cache(piece_index, piece).await;
923        }
924
925        info!("Finished syncing piece cache to the latest history size");
926
927        *last_segment_index_internal = last_segment_index;
928    }
929
930    /// This assumes it was already checked that the piece needs to be stored; no verification for
931    /// this is done internally, and invariants will break if this assumption doesn't hold true
932    async fn persist_piece_in_cache(&self, piece_index: PieceIndex, piece: Piece) {
933        let key = KeyWithDistance::new(self.peer_id, piece_index.to_multihash());
934        let mut caches = self.piece_caches.write().await;
935        match caches.should_replace(&key) {
936            // Entry is already occupied; we need to find and replace the old piece with the new one
937            Some((old_key, offset)) => {
938                let cache_index = offset.cache_index;
939                let piece_offset = offset.piece_offset;
940                let Some(backend) = caches.get_backend(cache_index) else {
941                    // Cache backend doesn't exist
942                    warn!(
943                        %cache_index,
944                        %piece_index,
945                        "Should have a cache backend, but it didn't exist; this is an \
946                        implementation bug"
947                    );
948                    return;
949                };
950                if let Err(error) = backend.write_piece(piece_offset, piece_index, &piece).await {
951                    error!(
952                        %error,
953                        %cache_index,
954                        %piece_index,
955                        %piece_offset,
956                        "Failed to write piece into cache"
957                    );
958                } else {
959                    let old_piece_index = decode_piece_index_from_record_key(old_key.record_key());
960                    trace!(
961                        %cache_index,
962                        %old_piece_index,
963                        %piece_index,
964                        %piece_offset,
965                        "Successfully replaced old cached piece"
966                    );
967                    caches.push_stored_piece(key, offset);
968                }
969            }
970            // There is free space in the cache; we need to find a free spot and place the piece there
971            None => {
972                let Some(offset) = caches.pop_free_offset() else {
973                    warn!(
974                        %piece_index,
975                        "Should have inserted piece into cache, but it didn't happen; this is an \
976                        implementation bug"
977                    );
978                    return;
979                };
980                let cache_index = offset.cache_index;
981                let piece_offset = offset.piece_offset;
982                let Some(backend) = caches.get_backend(cache_index) else {
983                    // Cache backend doesn't exist
984                    warn!(
985                        %cache_index,
986                        %piece_index,
987                        "Should have a cache backend, but it didn't exist; this is an \
988                        implementation bug"
989                    );
990                    return;
991                };
992
993                if let Err(error) = backend.write_piece(piece_offset, piece_index, &piece).await {
994                    error!(
995                        %error,
996                        %cache_index,
997                        %piece_index,
998                        %piece_offset,
999                        "Failed to write piece into cache"
1000                    );
1001                } else {
1002                    trace!(
1003                        %cache_index,
1004                        %piece_index,
1005                        %piece_offset,
1006                        "Successfully stored piece in cache"
1007                    );
1008                    if let Some(metrics) = &self.metrics {
1009                        metrics.piece_cache_capacity_used.inc();
1010                    }
1011                    caches.push_stored_piece(key, offset);
1012                }
1013            }
1014        };
1015    }
1016}
1017
1018#[derive(Debug)]
1019struct PlotCaches {
1020    /// Additional piece caches
1021    caches: AsyncRwLock<Vec<Arc<dyn PlotCache>>>,
1022    /// Next plot cache to use for storing pieces
1023    next_plot_cache: AtomicUsize,
1024}
1025
1026impl PlotCaches {
1027    /// Returns true if there might be space to add a piece to a cache.
1028    /// Returns false if it is already in a cache, or it can't be added to any of the caches.
1029    ///
1030    /// Available space can be overwritten by a sector at any time, so the piece write can still
1031    /// fail even if this returns `true`.
1032    async fn should_store(&self, piece_index: PieceIndex, key: &RecordKey) -> bool {
1033        for (cache_index, cache) in self.caches.read().await.iter().enumerate() {
1034            match cache.is_piece_maybe_stored(key).await {
1035                Ok(MaybePieceStoredResult::No) => {
1036                    // Isn't stored or can't be stored, try another cache if there is one
1037                }
1038                Ok(MaybePieceStoredResult::Vacant) => {
1039                    return true;
1040                }
1041                Ok(MaybePieceStoredResult::Yes) => {
1042                    // Already stored, nothing else left to do
1043                    return false;
1044                }
1045                Err(error) => {
1046                    warn!(
1047                        %cache_index,
1048                        %piece_index,
1049                        %error,
1050                        "Failed to check piece stored in cache"
1051                    );
1052                }
1053            }
1054        }
1055
1056        false
1057    }
1058
1059    /// Store a piece in additional downloaded pieces, if there is space for it.
1060    /// Returns `true` if the piece was added to a cache, and `false` if it couldn't be stored,
1061    /// typically because the cache is full.
1062    async fn store_additional_piece(&self, piece_index: PieceIndex, piece: &Piece) -> bool {
1063        let plot_caches = self.caches.read().await;
1064        let plot_caches_len = plot_caches.len();
1065
1066        // Store pieces in plots using round-robin distribution
1067        for _ in 0..plot_caches_len {
1068            let plot_cache_index =
1069                self.next_plot_cache.fetch_add(1, Ordering::Relaxed) % plot_caches_len;
1070
1071            match plot_caches[plot_cache_index]
1072                .try_store_piece(piece_index, piece)
1073                .await
1074            {
1075                Ok(true) => {
1076                    return true;
1077                }
1078                Ok(false) => {
1079                    continue;
1080                }
1081                Err(error) => {
1082                    error!(
1083                        %error,
1084                        %piece_index,
1085                        %plot_cache_index,
1086                        "Failed to store additional piece in cache"
1087                    );
1088                    continue;
1089                }
1090            }
1091        }
1092
1093        false
1094    }
1095}
1096
1097/// Farmer cache that aggregates different kinds of caches of multiple disks.
1098///
1099/// Pieces in [`PieceCache`] are stored based on capacity and the proximity of the piece index to
1100/// the farmer's network identity. If capacity is not enough to store all pieces in the cache, then
1101/// pieces that are further from the network identity will be evicted; this is helpful for quick
1102/// retrieval of pieces from DSN as well as for plotting purposes.
1103///
1104/// [`PlotCache`] is used as a supplementary cache and is primarily helpful for smaller farmers
1105/// whose piece cache is not large enough to store all the pieces on the network, while there is a
1106/// lot of space in the plot not yet used by sectors that can be leveraged as extra caching space.
1107#[derive(Debug, Clone)]
1108pub struct FarmerCache {
1109    peer_id: PeerId,
1110    /// Individual dedicated piece caches
1111    piece_caches: Arc<AsyncRwLock<PieceCachesState>>,
1112    /// Additional piece caches
1113    plot_caches: Arc<PlotCaches>,
1114    handlers: Arc<Handlers>,
1115    // We do not want to increase capacity unnecessarily on clone
1116    worker_sender: mpsc::Sender<WorkerCommand>,
1117    metrics: Option<Arc<FarmerCacheMetrics>>,
1118}
1119
1120impl FarmerCache {
1121    /// Create a new piece cache instance and the corresponding worker.
1122    ///
1123    /// NOTE: The returned future is async, but does blocking operations and should be run in a
1124    /// dedicated thread.
1125    pub fn new<NC>(
1126        node_client: NC,
1127        peer_id: PeerId,
1128        registry: Option<&mut Registry>,
1129    ) -> (Self, FarmerCacheWorker<NC>)
1130    where
1131        NC: NodeClient,
1132    {
1133        let caches = Arc::default();
1134        let (worker_sender, worker_receiver) = mpsc::channel(WORKER_CHANNEL_CAPACITY);
1135        let handlers = Arc::new(Handlers::default());
1136
1137        let plot_caches = Arc::new(PlotCaches {
1138            caches: AsyncRwLock::default(),
1139            next_plot_cache: AtomicUsize::new(0),
1140        });
1141        let metrics = registry.map(|registry| Arc::new(FarmerCacheMetrics::new(registry)));
1142
1143        let instance = Self {
1144            peer_id,
1145            piece_caches: Arc::clone(&caches),
1146            plot_caches: Arc::clone(&plot_caches),
1147            handlers: Arc::clone(&handlers),
1148            worker_sender,
1149            metrics: metrics.clone(),
1150        };
1151        let worker = FarmerCacheWorker {
1152            peer_id,
1153            node_client,
1154            piece_caches: caches,
1155            plot_caches,
1156            handlers,
1157            worker_receiver: Some(worker_receiver),
1158            metrics,
1159        };
1160
1161        (instance, worker)
1162    }
1163
1164    /// Get piece from cache
1165    pub async fn get_piece<Key>(&self, key: Key) -> Option<Piece>
1166    where
1167        RecordKey: From<Key>,
1168    {
1169        let key = RecordKey::from(key);
1170        let maybe_piece_found = {
1171            let key = KeyWithDistance::new_with_record_key(self.peer_id, key.clone());
1172            let caches = self.piece_caches.read().await;
1173
1174            caches.get_stored_piece(&key).and_then(|offset| {
1175                let cache_index = offset.cache_index;
1176                let piece_offset = offset.piece_offset;
1177                Some((
1178                    piece_offset,
1179                    cache_index,
1180                    caches.get_backend(cache_index)?.clone(),
1181                ))
1182            })
1183        };
1184
1185        if let Some((piece_offset, cache_index, backend)) = maybe_piece_found {
1186            match backend.read_piece(piece_offset).await {
1187                Ok(maybe_piece) => {
1188                    return match maybe_piece {
1189                        Some((_piece_index, piece)) => {
1190                            if let Some(metrics) = &self.metrics {
1191                                metrics.cache_get_hit.inc();
1192                            }
1193                            Some(piece)
1194                        }
1195                        None => {
1196                            error!(
1197                                %cache_index,
1198                                %piece_offset,
1199                                ?key,
1200                                "Piece was expected to be in cache, but wasn't found there"
1201                            );
1202                            if let Some(metrics) = &self.metrics {
1203                                metrics.cache_get_error.inc();
1204                            }
1205                            None
1206                        }
1207                    };
1208                }
1209                Err(error) => {
1210                    error!(
1211                        %error,
1212                        %cache_index,
1213                        ?key,
1214                        %piece_offset,
1215                        "Error while reading piece from cache"
1216                    );
1217
1218                    if let Err(error) = self
1219                        .worker_sender
1220                        .clone()
1221                        .send(WorkerCommand::ForgetKey { key })
1222                        .await
1223                    {
1224                        trace!(%error, "Failed to send ForgetKey command to worker");
1225                    }
1226
1227                    if let Some(metrics) = &self.metrics {
1228                        metrics.cache_get_error.inc();
1229                    }
1230                    return None;
1231                }
1232            }
1233        }
1234
1235        for cache in self.plot_caches.caches.read().await.iter() {
1236            if let Ok(Some(piece)) = cache.read_piece(&key).await {
1237                if let Some(metrics) = &self.metrics {
1238                    metrics.cache_get_hit.inc();
1239                }
1240                return Some(piece);
1241            }
1242        }
1243
1244        if let Some(metrics) = &self.metrics {
1245            metrics.cache_get_miss.inc();
1246        }
1247        None
1248    }
1249
1250    /// Get pieces from cache.
1251    ///
1252    /// The number of elements in the returned stream is the same as the number of unique `piece_indices`.
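    ///
    /// A retrieval sketch (hypothetical `piece_indices: Vec<PieceIndex>`; `StreamExt` is needed
    /// for `next()`; not compiled as a doc-test):
    ///
    /// ```ignore
    /// let mut pieces = farmer_cache.get_pieces(piece_indices).await;
    /// while let Some((piece_index, maybe_piece)) = pieces.next().await {
    ///     // `None` means the piece wasn't found in any of the caches
    /// }
    /// ```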
1253    pub async fn get_pieces<'a, PieceIndices>(
1254        &'a self,
1255        piece_indices: PieceIndices,
1256    ) -> impl Stream<Item = (PieceIndex, Option<Piece>)> + Send + Unpin + 'a
1257    where
1258        PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send + 'a> + Send + 'a,
1259    {
1260        let mut pieces_to_get_from_plot_cache = Vec::new();
1261
1262        let pieces_to_read_from_piece_cache = {
1263            let caches = self.piece_caches.read().await;
1264            // Pieces to read from piece cache grouped by backend for efficiency reasons
1265            let mut pieces_to_read_from_piece_cache =
1266                HashMap::<CacheIndex, (CacheBackend, HashMap<_, _>)>::new();
1267
1268            for piece_index in piece_indices {
1269                let key = RecordKey::from(piece_index.to_multihash());
1270
1271                let offset = match caches.get_stored_piece(&KeyWithDistance::new_with_record_key(
1272                    self.peer_id,
1273                    key.clone(),
1274                )) {
1275                    Some(offset) => offset,
1276                    None => {
1277                        pieces_to_get_from_plot_cache.push((piece_index, key));
1278                        continue;
1279                    }
1280                };
1281
1282                let cache_index = offset.cache_index;
1283                let piece_offset = offset.piece_offset;
1284
1285                match pieces_to_read_from_piece_cache.entry(cache_index) {
1286                    Entry::Occupied(mut entry) => {
1287                        let (_backend, pieces) = entry.get_mut();
1288                        pieces.insert(piece_offset, (piece_index, key));
1289                    }
1290                    Entry::Vacant(entry) => {
1291                        let backend = match caches.get_backend(cache_index) {
1292                            Some(backend) => backend.clone(),
1293                            None => {
1294                                pieces_to_get_from_plot_cache.push((piece_index, key));
1295                                continue;
1296                            }
1297                        };
1298                        entry
1299                            .insert((backend, HashMap::from([(piece_offset, (piece_index, key))])));
1300                    }
1301                }
1302            }
1303
1304            pieces_to_read_from_piece_cache
1305        };
1306
1307        let (tx, mut rx) = mpsc::unbounded();
1308
1309        let fut = async move {
1310            let tx = &tx;
1311
1312            let mut reading_from_piece_cache = pieces_to_read_from_piece_cache
1313                .into_iter()
1314                .map(|(cache_index, (backend, mut pieces_to_get))| async move {
1315                    let mut pieces_stream = match backend
1316                        .read_pieces(Box::new(
1317                            pieces_to_get
1318                                .keys()
1319                                .copied()
1320                                .collect::<Vec<_>>()
1321                                .into_iter(),
1322                        ))
1323                        .await
1324                    {
1325                        Ok(pieces_stream) => pieces_stream,
1326                        Err(error) => {
1327                            error!(
1328                                %error,
1329                                %cache_index,
1330                                "Error while reading pieces from cache"
1331                            );
1332
1333                            if let Some(metrics) = &self.metrics {
1334                                metrics.cache_get_error.inc_by(pieces_to_get.len() as u64);
1335                            }
1336                            for (piece_index, _key) in pieces_to_get.into_values() {
1337                                tx.unbounded_send((piece_index, None)).expect(
1338                                    "This future isn't polled after receiver is dropped; qed",
1339                                );
1340                            }
1341                            return;
1342                        }
1343                    };
1344
1345                    while let Some(maybe_piece) = pieces_stream.next().await {
1346                        let result = match maybe_piece {
1347                            Ok((piece_offset, Some((piece_index, piece)))) => {
1348                                pieces_to_get.remove(&piece_offset);
1349
1350                                if let Some(metrics) = &self.metrics {
1351                                    metrics.cache_get_hit.inc();
1352                                }
1353                                (piece_index, Some(piece))
1354                            }
1355                            Ok((piece_offset, None)) => {
1356                                let Some((piece_index, key)) = pieces_to_get.remove(&piece_offset)
1357                                else {
1358                                    debug!(
1359                                        %cache_index,
1360                                        %piece_offset,
1361                                        "Received piece offset that was not expected"
1362                                    );
1363                                    continue;
1364                                };
1365
1366                                error!(
1367                                    %cache_index,
1368                                    %piece_index,
1369                                    %piece_offset,
1370                                    ?key,
1371                                    "Piece was expected to be in cache, but wasn't found there"
1372                                );
1373                                if let Some(metrics) = &self.metrics {
1374                                    metrics.cache_get_error.inc();
1375                                }
1376                                (piece_index, None)
1377                            }
1378                            Err(error) => {
1379                                error!(
1380                                    %error,
1381                                    %cache_index,
1382                                    "Error while reading piece from cache"
1383                                );
1384
1385                                if let Some(metrics) = &self.metrics {
1386                                    metrics.cache_get_error.inc();
1387                                }
1388                                continue;
1389                            }
1390                        };
1391
1392                        tx.unbounded_send(result)
1393                            .expect("This future isn't polled after receiver is dropped; qed");
1394                    }
1395
1396                    if pieces_to_get.is_empty() {
1397                        return;
1398                    }
1399
1400                    if let Some(metrics) = &self.metrics {
1401                        metrics.cache_get_error.inc_by(pieces_to_get.len() as u64);
1402                    }
1403                    for (piece_offset, (piece_index, key)) in pieces_to_get {
1404                        error!(
1405                            %cache_index,
1406                            %piece_index,
1407                            %piece_offset,
1408                            ?key,
1409                            "Piece cache didn't return an entry for offset"
1410                        );
1411
1412                        // Uphold the invariant of this method that some result is returned
1413                        // for every unique piece index
1414                        tx.unbounded_send((piece_index, None))
1415                            .expect("This future isn't polled after receiver is dropped; qed");
1416                    }
1417                })
1418                .collect::<FuturesUnordered<_>>();
1419            // TODO: Can't use this due to https://github.com/rust-lang/rust/issues/64650
1420            // Simply drain everything
1421            // .for_each(|()| async {})
1422
1423            // TODO: Remove once https://github.com/rust-lang/rust/issues/64650 is resolved
1424            let reading_from_piece_cache_fut = async move {
1425                while let Some(()) = reading_from_piece_cache.next().await {
1426                    // Simply drain everything
1427                }
1428            };
1429
1430            let reading_from_plot_cache_fut = async {
1431                if pieces_to_get_from_plot_cache.is_empty() {
1432                    return;
1433                }
1434
1435                for cache in self.plot_caches.caches.read().await.iter() {
1436                    // Iterate by offset, in reverse order, so that elements can be traversed in
1437                    // async code and removed efficiently with `swap_remove` without extra allocations
1438                    for offset in (0..pieces_to_get_from_plot_cache.len()).rev() {
1439                        let (piece_index, key) = &pieces_to_get_from_plot_cache[offset];
1440
1441                        if let Ok(Some(piece)) = cache.read_piece(key).await {
1442                            if let Some(metrics) = &self.metrics {
1443                                metrics.cache_get_hit.inc();
1444                            }
1445                            tx.unbounded_send((*piece_index, Some(piece)))
1446                                .expect("This future isn't polled after receiver is dropped; qed");
1447
1448                            // Because iteration is in reverse order and `swap_remove` fills the hole
1449                            // with an element from the end, elements yet to be processed are unaffected
1450                            pieces_to_get_from_plot_cache.swap_remove(offset);
1451                        }
1452                    }
1453
1454                    if pieces_to_get_from_plot_cache.is_empty() {
1455                        return;
1456                    }
1457                }
1458
1459                if let Some(metrics) = &self.metrics {
1460                    metrics
1461                        .cache_get_miss
1462                        .inc_by(pieces_to_get_from_plot_cache.len() as u64);
1463                }
1464
1465                for (piece_index, _key) in pieces_to_get_from_plot_cache {
1466                    tx.unbounded_send((piece_index, None))
1467                        .expect("This future isn't polled after receiver is dropped; qed");
1468                }
1469            };
1470
1471            join!(reading_from_piece_cache_fut, reading_from_plot_cache_fut).await
1472        };
1473        let mut fut = Box::pin(fut.fuse());
1474
1475        // Drive the future above and stream back any pieces that have been read from the caches so far
1476        stream::poll_fn(move |cx| {
1477            if !fut.is_terminated() {
1478                // The result doesn't matter; the receiver below needs to be polled anyway
1479                let _ = fut.poll_unpin(cx);
1480            }
1481
1482            if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) {
1483                return Poll::Ready(maybe_result);
1484            }
1485
1486            // Termination is handled above: once `fut` completes, `tx` is dropped and `rx` yields `None`
1487            Poll::Pending
1488        })
1489    }
1490
1491    /// Returns the subset of `piece_indices` that were found in the farmer cache; order is not guaranteed
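    ///
    /// A minimal usage sketch (not compiled as a doc test; `farmer_cache` and `piece_indices` are
    /// assumed to exist in an async context):
    ///
    /// ```ignore
    /// let cached = farmer_cache.has_pieces(piece_indices.clone()).await;
    /// let missing = piece_indices
    ///     .into_iter()
    ///     .filter(|piece_index| !cached.contains(piece_index))
    ///     .collect::<Vec<_>>();
    /// // `missing` now holds the indices that still need to be fetched from elsewhere
    /// ```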
1492    pub async fn has_pieces(&self, mut piece_indices: Vec<PieceIndex>) -> Vec<PieceIndex> {
1493        let mut pieces_to_find = HashMap::<PieceIndex, RecordKey>::from_iter(
1494            piece_indices
1495                .iter()
1496                .map(|piece_index| (*piece_index, RecordKey::from(piece_index.to_multihash()))),
1497        );
1498
1499        // Quick check in piece caches
1500        {
1501            let piece_caches = self.piece_caches.read().await;
1502            pieces_to_find.retain(|_piece_index, key| {
1503                let distance_key = KeyWithDistance::new(self.peer_id, key.clone());
1504                !piece_caches.contains_stored_piece(&distance_key)
1505            });
1506        }
1507
1508        // Early exit if everything is cached
1509        if pieces_to_find.is_empty() {
1510            return piece_indices;
1511        }
1512
1513        // Check plot caches concurrently
1514        if let Some(plot_caches) = self.plot_caches.caches.try_read() {
1515            let plot_caches = &plot_caches;
1516            let not_found = pieces_to_find
1517                .into_iter()
1518                .map(|(piece_index, key)| async move {
1519                    let key = &key;
1520
1521                    let found = plot_caches
1522                        .iter()
1523                        .map(|plot_cache| async {
1524                            matches!(
1525                                plot_cache.is_piece_maybe_stored(key).await,
1526                                Ok(MaybePieceStoredResult::Yes)
1527                            )
1528                        })
1529                        .collect::<FuturesUnordered<_>>()
1530                        .any(|found| async move { found })
1531                        .await;
1532
1533                    if found { None } else { Some(piece_index) }
1534                })
1535                .collect::<FuturesUnordered<_>>()
1536                .filter_map(|maybe_piece_index| async move { maybe_piece_index })
1537                .collect::<HashSet<_>>()
1538                .await;
1539            piece_indices.retain(|piece_index| !not_found.contains(piece_index));
1540        }
1541        piece_indices
1542    }
1543
1544    /// Find piece in cache and return its retrieval details
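    ///
    /// A minimal usage sketch (not compiled as a doc test; `farmer_cache` and `piece_index` are
    /// assumed to exist in an async context):
    ///
    /// ```ignore
    /// if let Some((cache_id, piece_offset)) = farmer_cache.find_piece(piece_index).await {
    ///     // The piece can be read directly from the piece cache identified by `cache_id`
    ///     // at `piece_offset`
    /// }
    /// ```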
1545    pub async fn find_piece(
1546        &self,
1547        piece_index: PieceIndex,
1548    ) -> Option<(PieceCacheId, PieceCacheOffset)> {
1549        let caches = self.piece_caches.read().await;
1550
1551        self.find_piece_internal(&caches, piece_index)
1552    }
1553
1554    /// Find pieces in cache and return their retrieval details
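    ///
    /// A minimal usage sketch (not compiled as a doc test; `farmer_cache` and `piece_indices` are
    /// assumed to exist in an async context):
    ///
    /// ```ignore
    /// for (piece_index, cache_id, piece_offset) in farmer_cache.find_pieces(piece_indices).await {
    ///     // Each entry points at the piece cache and offset where the piece is stored
    /// }
    /// ```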
1555    pub async fn find_pieces<PieceIndices>(
1556        &self,
1557        piece_indices: PieceIndices,
1558    ) -> Vec<(PieceIndex, PieceCacheId, PieceCacheOffset)>
1559    where
1560        PieceIndices: IntoIterator<Item = PieceIndex>,
1561    {
1562        let caches = self.piece_caches.read().await;
1563
1564        piece_indices
1565            .into_iter()
1566            .filter_map(|piece_index| {
1567                self.find_piece_internal(&caches, piece_index)
1568                    .map(|(cache_id, piece_offset)| (piece_index, cache_id, piece_offset))
1569            })
1570            .collect()
1571    }
1572
1573    fn find_piece_internal(
1574        &self,
1575        caches: &PieceCachesState,
1576        piece_index: PieceIndex,
1577    ) -> Option<(PieceCacheId, PieceCacheOffset)> {
1578        let key = KeyWithDistance::new(self.peer_id, piece_index.to_multihash());
1579
1580        let Some(offset) = caches.get_stored_piece(&key) else {
1581            if let Some(metrics) = &self.metrics {
1582                metrics.cache_find_miss.inc();
1583            }
1584
1585            return None;
1586        };
1587        let piece_offset = offset.piece_offset;
1588
1589        if let Some(backend) = caches.get_backend(offset.cache_index) {
1590            if let Some(metrics) = &self.metrics {
1591                metrics.cache_find_hit.inc();
1592            }
1593            return Some((*backend.id(), piece_offset));
1594        }
1595
1596        if let Some(metrics) = &self.metrics {
1597            metrics.cache_find_miss.inc();
1598        }
1599        None
1600    }
1601
1602    /// Try to store a piece in additional downloaded pieces, if there is space for it.
1603    /// Returns `true` if the piece was added to this cache, and `false` if it was already stored
1604    /// or there was no space.
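    ///
    /// A minimal usage sketch (not compiled as a doc test; `farmer_cache`, `piece_index` and
    /// `piece` are assumed to exist in an async context):
    ///
    /// ```ignore
    /// let stored = farmer_cache
    ///     .maybe_store_additional_piece(piece_index, &piece)
    ///     .await;
    /// if !stored {
    ///     // The piece was already cached or there was no free space left
    /// }
    /// ```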
1605    pub async fn maybe_store_additional_piece(
1606        &self,
1607        piece_index: PieceIndex,
1608        piece: &Piece,
1609    ) -> bool {
1610        let key = RecordKey::from(piece_index.to_multihash());
1611
1612        let should_store = self.plot_caches.should_store(piece_index, &key).await;
1613
1614        if !should_store {
1615            return false;
1616        }
1617
1618        self.plot_caches
1619            .store_additional_piece(piece_index, piece)
1620            .await
1621    }
1622
1623    /// Initiate replacement of the backing caches
1624    pub async fn replace_backing_caches(
1625        &self,
1626        new_piece_caches: Vec<Arc<dyn PieceCache>>,
1627        new_plot_caches: Vec<Arc<dyn PlotCache>>,
1628    ) {
1629        if let Err(error) = self
1630            .worker_sender
1631            .clone()
1632            .send(WorkerCommand::ReplaceBackingCaches { new_piece_caches })
1633            .await
1634        {
1635            warn!(%error, "Failed to replace backing caches, worker exited");
1636        }
1637
1638        *self.plot_caches.caches.write().await = new_plot_caches;
1639    }
1640
1641    /// Subscribe to cache sync progress notifications
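    ///
    /// A minimal usage sketch (not compiled as a doc test; `farmer_cache` is assumed to exist);
    /// the returned [`HandlerId`] typically needs to be kept alive for as long as progress
    /// notifications are wanted:
    ///
    /// ```ignore
    /// let _handler_id = farmer_cache.on_sync_progress(Arc::new(|progress: &f32| {
    ///     info!(%progress, "Cache sync progress");
    /// }));
    /// ```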
1642    pub fn on_sync_progress(&self, callback: HandlerFn<f32>) -> HandlerId {
1643        self.handlers.progress.add(callback)
1644    }
1645}
1646
1647/// Collection of [`FarmerCache`] instances for load balancing
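///
/// A minimal construction sketch (not compiled as a doc test; `single_cache`, `first_cache` and
/// `second_cache` are assumed to be existing [`FarmerCache`] instances):
///
/// ```ignore
/// // A single cache can be wrapped directly
/// let farmer_caches = FarmerCaches::from(single_cache);
/// // Multiple caches are load balanced by choosing one at random per request
/// let farmer_caches = FarmerCaches::from(Arc::from(vec![first_cache, second_cache]));
/// ```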
1648#[derive(Debug, Clone)]
1649pub struct FarmerCaches {
1650    caches: Arc<[FarmerCache]>,
1651}
1652
1653impl From<Arc<[FarmerCache]>> for FarmerCaches {
1654    fn from(caches: Arc<[FarmerCache]>) -> Self {
1655        Self { caches }
1656    }
1657}
1658
1659impl From<FarmerCache> for FarmerCaches {
1660    fn from(cache: FarmerCache) -> Self {
1661        Self {
1662            caches: Arc::new([cache]),
1663        }
1664    }
1665}
1666
1667impl FarmerCaches {
1668    /// Get piece from cache
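    ///
    /// A minimal usage sketch (not compiled as a doc test; `farmer_caches` and `piece_index` are
    /// assumed to exist in an async context):
    ///
    /// ```ignore
    /// let key = RecordKey::from(piece_index.to_multihash());
    /// let maybe_piece = farmer_caches.get_piece(key).await;
    /// ```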
1669    pub async fn get_piece<Key>(&self, key: Key) -> Option<Piece>
1670    where
1671        RecordKey: From<Key>,
1672    {
1673        let farmer_cache = self.caches.choose(&mut thread_rng())?;
1674        farmer_cache.get_piece(key).await
1675    }
1676
1677    /// Get pieces from cache.
1678    ///
1679    /// The number of elements in the returned stream is the same as the number of unique `piece_indices`.
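    ///
    /// A minimal consumption sketch (not compiled as a doc test; `farmer_caches` and
    /// `piece_indices` are assumed to exist in an async context):
    ///
    /// ```ignore
    /// use futures::StreamExt;
    ///
    /// let mut pieces = farmer_caches.get_pieces(piece_indices).await;
    /// while let Some((piece_index, maybe_piece)) = pieces.next().await {
    ///     if maybe_piece.is_none() {
    ///         // Not cached; the piece has to be retrieved from plots or the network instead
    ///     }
    /// }
    /// ```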
1680    pub async fn get_pieces<'a, PieceIndices>(
1681        &'a self,
1682        piece_indices: PieceIndices,
1683    ) -> impl Stream<Item = (PieceIndex, Option<Piece>)> + Send + Unpin + 'a
1684    where
1685        PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send + 'a> + Send + 'a,
1686    {
1687        let Some(farmer_cache) = self.caches.choose(&mut thread_rng()) else {
1688            return Either::Left(stream::iter(
1689                piece_indices
1690                    .into_iter()
1691                    .map(|piece_index| (piece_index, None)),
1692            ));
1693        };
1694
1695        Either::Right(farmer_cache.get_pieces(piece_indices).await)
1696    }
1697
1698    /// Returns the subset of `piece_indices` that were found in the farmer cache; order is not guaranteed
1699    pub async fn has_pieces(&self, piece_indices: Vec<PieceIndex>) -> Vec<PieceIndex> {
1700        let Some(farmer_cache) = self.caches.choose(&mut thread_rng()) else {
1701            return Vec::new();
1702        };
1703
1704        farmer_cache.has_pieces(piece_indices).await
1705    }
1706
1707    /// Try to store a piece in additional downloaded pieces, if there is space for it.
1708    /// Returns `true` if the piece was added to one or more caches, and `false` if it was already
1709    /// stored or there was no space.
1710    pub async fn maybe_store_additional_piece(
1711        &self,
1712        piece_index: PieceIndex,
1713        piece: &Piece,
1714    ) -> bool {
1715        // Run all the futures to completion, and take a non-short-circuiting any() on the results.
1716        self.caches
1717            .iter()
1718            .map(|farmer_cache| farmer_cache.maybe_store_additional_piece(piece_index, piece))
1719            .collect::<FuturesUnordered<_>>()
1720            .fold::<bool, _, _>(false, |acc, stored| async move { acc || stored })
1721            .await
1722    }
1723}
1724
1725/// Extracts the `PieceIndex` from the trailing `PieceIndex::SIZE` bytes of a `RecordKey`.
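///
/// A round-trip sketch (not compiled as a doc test): keys built the way this module builds them,
/// from a piece index multihash, are expected to decode back to the original index:
///
/// ```ignore
/// let key = RecordKey::from(piece_index.to_multihash());
/// assert_eq!(decode_piece_index_from_record_key(&key), piece_index);
/// ```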
1726fn decode_piece_index_from_record_key(key: &RecordKey) -> PieceIndex {
1727    let len = key.as_ref().len();
1728    let start = len - PieceIndex::SIZE;
1729
1730    let mut piece_index_bytes = [0u8; PieceIndex::SIZE];
1731    piece_index_bytes.copy_from_slice(&key.as_ref()[start..]);
1732
1733    PieceIndex::from_bytes(piece_index_bytes)
1734}