subspace_farmer/
farmer_cache.rs

1//! A container that caches pieces
2//!
3//! Farmer cache is a container that orchestrates a bunch of piece and plot caches that together
4//! persist pieces in a way that makes them easier to retrieve than decoding them from plots.
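//!
//! A minimal usage sketch (hedged: `node_client`, `peer_id`, `piece_getter` and the use of
//! `tokio::spawn` here are illustrative assumptions, not prescribed by this module):
//!
//! ```rust,ignore
//! // Create the cache together with the worker that drives it; the metrics registry is optional.
//! let (farmer_cache, farmer_cache_worker) = FarmerCache::new(node_client, peer_id, None);
//!
//! // The worker performs heavy I/O and must be polled for the cache to function; it is
//! // expected to run for the lifetime of the farmer, ideally in a dedicated thread.
//! tokio::spawn(farmer_cache_worker.run(piece_getter));
//!
//! // Once backing caches have been provided and synchronization has made progress, pieces
//! // can be retrieved by key (here derived from a piece index).
//! let maybe_piece = farmer_cache.get_piece(piece_index.to_multihash()).await;
//! ```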
5
6mod metrics;
7mod piece_cache_state;
8#[cfg(test)]
9mod tests;
10
11use crate::farm::{MaybePieceStoredResult, PieceCache, PieceCacheId, PieceCacheOffset, PlotCache};
12use crate::farmer_cache::metrics::FarmerCacheMetrics;
13use crate::farmer_cache::piece_cache_state::PieceCachesState;
14use crate::node_client::NodeClient;
15use crate::utils::run_future_in_dedicated_thread;
16use async_lock::RwLock as AsyncRwLock;
17use event_listener_primitives::{Bag, HandlerId};
18use futures::channel::mpsc;
19use futures::future::{Either, FusedFuture};
20use futures::stream::{FuturesOrdered, FuturesUnordered};
21use futures::{select, stream, FutureExt, SinkExt, Stream, StreamExt};
22use parking_lot::{Mutex, RwLock};
23use prometheus_client::registry::Registry;
24use rand::prelude::*;
25use rayon::prelude::*;
26use std::collections::hash_map::Entry;
27use std::collections::{HashMap, HashSet};
28use std::future::join;
29use std::sync::atomic::{AtomicUsize, Ordering};
30use std::sync::Arc;
31use std::task::Poll;
32use std::time::Duration;
33use std::{fmt, mem};
34use subspace_core_primitives::pieces::{Piece, PieceIndex};
35use subspace_core_primitives::segments::{SegmentHeader, SegmentIndex};
36use subspace_data_retrieval::piece_getter::PieceGetter;
37use subspace_networking::libp2p::kad::RecordKey;
38use subspace_networking::libp2p::PeerId;
39use subspace_networking::utils::multihash::ToMultihash;
40use subspace_networking::KeyWithDistance;
41use tokio::sync::Semaphore;
42use tokio::task::yield_now;
43use tracing::{debug, error, info, info_span, trace, warn, Instrument};
44
45const WORKER_CHANNEL_CAPACITY: usize = 100;
46const SYNC_BATCH_SIZE: usize = 256;
47const SYNC_CONCURRENT_BATCHES: usize = 4;
48/// Make caches available as they are being built, without waiting for initialization to finish;
49/// this number defines the interval, in pieces, after which the cache state is updated
50const INTERMEDIATE_CACHE_UPDATE_INTERVAL: usize = 100;
51const INITIAL_SYNC_FARM_INFO_CHECK_INTERVAL: Duration = Duration::from_secs(1);
52
53type HandlerFn<A> = Arc<dyn Fn(&A) + Send + Sync + 'static>;
54type Handler<A> = Bag<HandlerFn<A>, A>;
55type CacheIndex = u8;
56
57#[derive(Default, Debug)]
58struct Handlers {
59    progress: Handler<f32>,
60}
61
62#[derive(Debug, Clone, Copy)]
63struct FarmerCacheOffset {
64    cache_index: CacheIndex,
65    piece_offset: PieceCacheOffset,
66}
67
68impl FarmerCacheOffset {
69    fn new(cache_index: CacheIndex, piece_offset: PieceCacheOffset) -> Self {
70        Self {
71            cache_index,
72            piece_offset,
73        }
74    }
75}
76
77#[derive(Debug, Clone)]
78struct CacheBackend {
79    backend: Arc<dyn PieceCache>,
80    used_capacity: u32,
81    total_capacity: u32,
82}
83
84impl std::ops::Deref for CacheBackend {
85    type Target = Arc<dyn PieceCache>;
86
87    fn deref(&self) -> &Self::Target {
88        &self.backend
89    }
90}
91
92impl CacheBackend {
93    fn new(backend: Arc<dyn PieceCache>, total_capacity: u32) -> Self {
94        Self {
95            backend,
96            used_capacity: 0,
97            total_capacity,
98        }
99    }
100
101    fn next_free(&mut self) -> Option<PieceCacheOffset> {
102        let offset = self.used_capacity;
103        if offset < self.total_capacity {
104            self.used_capacity += 1;
105            Some(PieceCacheOffset(offset))
106        } else {
107            debug!(?offset, total_capacity = ?self.total_capacity, "No free space in cache backend");
108            None
109        }
110    }
111
112    fn free_size(&self) -> u32 {
113        self.total_capacity - self.used_capacity
114    }
115}
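// A hedged illustration of the allocation pattern above (hypothetical `piece_cache` handle and
// capacity, not taken from the tests module): `next_free()` hands out sequential offsets until
// the backend runs dry, while `free_size()` reports the remaining headroom.
//
//     let mut backend = CacheBackend::new(piece_cache, 2);
//     backend.next_free(); // Some(PieceCacheOffset(0)), free_size() == 1
//     backend.next_free(); // Some(PieceCacheOffset(1)), free_size() == 0
//     backend.next_free(); // None, logs "No free space in cache backend" at debug level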
116
117#[derive(Debug)]
118struct CacheState {
119    cache_stored_pieces: HashMap<KeyWithDistance, FarmerCacheOffset>,
120    cache_free_offsets: Vec<FarmerCacheOffset>,
121    backend: CacheBackend,
122}
123
124#[derive(Debug)]
125enum WorkerCommand {
126    ReplaceBackingCaches {
127        new_piece_caches: Vec<Arc<dyn PieceCache>>,
128    },
129    ForgetKey {
130        key: RecordKey,
131    },
132}
133
134/// Farmer cache worker used to drive the farmer cache backend
135#[derive(Debug)]
136#[must_use = "Farmer cache will not work unless its worker is running"]
137pub struct FarmerCacheWorker<NC>
138where
139    NC: fmt::Debug,
140{
141    peer_id: PeerId,
142    node_client: NC,
143    piece_caches: Arc<AsyncRwLock<PieceCachesState>>,
144    plot_caches: Arc<PlotCaches>,
145    handlers: Arc<Handlers>,
146    worker_receiver: Option<mpsc::Receiver<WorkerCommand>>,
147    metrics: Option<Arc<FarmerCacheMetrics>>,
148}
149
150impl<NC> FarmerCacheWorker<NC>
151where
152    NC: NodeClient,
153{
154    /// Run the cache worker with provided piece getter.
155    ///
156    /// NOTE: Piece getter must not depend on farmer cache in order to avoid reference cycles!
157    pub async fn run<PG>(mut self, piece_getter: PG)
158    where
159        PG: PieceGetter,
160    {
161        // Limit is dynamically set later
162        let mut last_segment_index_internal = SegmentIndex::ZERO;
163
164        let mut worker_receiver = self
165            .worker_receiver
166            .take()
167            .expect("Always set during worker instantiation");
168
169        if let Some(WorkerCommand::ReplaceBackingCaches { new_piece_caches }) =
170            worker_receiver.next().await
171        {
172            self.initialize(
173                &piece_getter,
174                &mut last_segment_index_internal,
175                new_piece_caches,
176            )
177            .await;
178        } else {
179            // Piece cache was dropped before backing caches were sent
180            return;
181        }
182
183        let mut segment_headers_notifications =
184            match self.node_client.subscribe_archived_segment_headers().await {
185                Ok(segment_headers_notifications) => segment_headers_notifications,
186                Err(error) => {
187                    error!(%error, "Failed to subscribe to archived segments notifications");
188                    return;
189                }
190            };
191
192        // Keep up with segment indices that were potentially created since reinitialization;
193        // depending on the size of the diff, this may pause block production for a while (due to
194        // the subscription we created above)
195        self.keep_up_after_initial_sync(&piece_getter, &mut last_segment_index_internal)
196            .await;
197
198        loop {
199            select! {
200                maybe_command = worker_receiver.next() => {
201                    let Some(command) = maybe_command else {
202                        // Nothing else left to do
203                        return;
204                    };
205
206                    self.handle_command(command, &piece_getter, &mut last_segment_index_internal).await;
207                }
208                maybe_segment_header = segment_headers_notifications.next().fuse() => {
209                    if let Some(segment_header) = maybe_segment_header {
210                        self.process_segment_header(&piece_getter, segment_header, &mut last_segment_index_internal).await;
211                    } else {
212                        // Keep-up sync only ends with the subscription, which lasts for the
213                        // duration of the instance
214                        return;
215                    }
216                }
217            }
218        }
219    }
220
221    async fn handle_command<PG>(
222        &self,
223        command: WorkerCommand,
224        piece_getter: &PG,
225        last_segment_index_internal: &mut SegmentIndex,
226    ) where
227        PG: PieceGetter,
228    {
229        match command {
230            WorkerCommand::ReplaceBackingCaches { new_piece_caches } => {
231                self.initialize(piece_getter, last_segment_index_internal, new_piece_caches)
232                    .await;
233            }
234            // TODO: Consider implementing optional re-sync of the piece instead of just forgetting
235            WorkerCommand::ForgetKey { key } => {
236                let mut caches = self.piece_caches.write().await;
237                let key = KeyWithDistance::new_with_record_key(self.peer_id, key);
238                let Some(offset) = caches.remove_stored_piece(&key) else {
239                    // Key doesn't exist
240                    return;
241                };
242
243                let cache_index = offset.cache_index;
244                let piece_offset = offset.piece_offset;
245                let Some(backend) = caches.get_backend(cache_index).cloned() else {
246                    // Cache backend doesn't exist
247                    return;
248                };
249
250                caches.push_dangling_free_offset(offset);
251                match backend.read_piece_index(piece_offset).await {
252                    Ok(Some(piece_index)) => {
253                        trace!(%piece_index, %cache_index, %piece_offset, "Forget piece");
254                    }
255                    Ok(None) => {
256                        warn!(
257                            %cache_index,
258                            %piece_offset,
259                            "Piece index out of range, this is likely an implementation bug, \
260                            not freeing heap element"
261                        );
262                    }
263                    Err(error) => {
264                        error!(
265                            %error,
266                            %cache_index,
267                            ?key,
268                            %piece_offset,
269                            "Error while reading piece from cache"
270                        );
271                    }
272                }
273            }
274        }
275    }
276
277    async fn initialize<PG>(
278        &self,
279        piece_getter: &PG,
280        last_segment_index_internal: &mut SegmentIndex,
281        new_piece_caches: Vec<Arc<dyn PieceCache>>,
282    ) where
283        PG: PieceGetter,
284    {
285        info!("Initializing piece cache");
286
287        // Pull old cache state since it will be replaced with a new one and reuse its allocations
288        let (mut stored_pieces, mut dangling_free_offsets) =
289            mem::take(&mut *self.piece_caches.write().await).reuse();
290
291        debug!("Collecting pieces that were in the cache before");
292
293        if let Some(metrics) = &self.metrics {
294            metrics.piece_cache_capacity_total.set(0);
295            metrics.piece_cache_capacity_used.set(0);
296        }
297
298        let peer_id = self.peer_id;
299
300        // Build cache state of all backends
301        let piece_caches_number = new_piece_caches.len();
302        let maybe_caches_futures = new_piece_caches
303            .into_iter()
304            .enumerate()
305            .filter_map(|(cache_index, new_cache)| {
306                let total_capacity = new_cache.max_num_elements();
307                let mut backend = CacheBackend::new(new_cache, total_capacity);
308                let Ok(cache_index) = CacheIndex::try_from(cache_index) else {
309                    warn!(
310                        ?piece_caches_number,
311                        "Too many piece caches provided, {cache_index} cache will be ignored",
312                    );
313                    return None;
314                };
315
316                if let Some(metrics) = &self.metrics {
317                    metrics
318                        .piece_cache_capacity_total
319                        .inc_by(total_capacity as i64);
320                }
321
322                let init_fut = async move {
323                    let used_capacity = &mut backend.used_capacity;
324
325                    // Hack: first collecting into `Option` and calling `Option::take()` later
326                    // is needed to satisfy the compiler, which otherwise gets confused about
327                    // ownership
328                    let mut maybe_contents = match backend.backend.contents().await {
329                        Ok(contents) => Some(contents),
330                        Err(error) => {
331                            warn!(%error, "Failed to get cache contents");
332
333                            None
334                        }
335                    };
336
337                    #[allow(clippy::mutable_key_type)]
338                    let mut cache_stored_pieces = HashMap::new();
339                    let mut cache_free_offsets = Vec::new();
340
341                    let Some(mut contents) = maybe_contents.take() else {
342                        drop(maybe_contents);
343
344                        return CacheState {
345                            cache_stored_pieces,
346                            cache_free_offsets,
347                            backend,
348                        };
349                    };
350
351                    while let Some(maybe_element_details) = contents.next().await {
352                        let (piece_offset, maybe_piece_index) = match maybe_element_details {
353                            Ok(element_details) => element_details,
354                            Err(error) => {
355                                warn!(%error, "Failed to get cache contents element details");
356                                break;
357                            }
358                        };
359                        let offset = FarmerCacheOffset::new(cache_index, piece_offset);
360                        match maybe_piece_index {
361                            Some(piece_index) => {
362                                *used_capacity = piece_offset.0 + 1;
363                                let record_key = RecordKey::from(piece_index.to_multihash());
364                                let key = KeyWithDistance::new_with_record_key(peer_id, record_key);
365                                cache_stored_pieces.insert(key, offset);
366                            }
367                            None => {
368                                // TODO: Optimize to not store all free offsets, only dangling
369                                //  offsets are actually necessary
370                                cache_free_offsets.push(offset);
371                            }
372                        }
373
374                        // Allow for task to be aborted
375                        yield_now().await;
376                    }
377
378                    drop(maybe_contents);
379                    drop(contents);
380
381                    CacheState {
382                        cache_stored_pieces,
383                        cache_free_offsets,
384                        backend,
385                    }
386                };
387
388                Some(run_future_in_dedicated_thread(
389                    move || init_fut.instrument(info_span!("", %cache_index)),
390                    format!("piece-cache.{cache_index}"),
391                ))
392            })
393            .collect::<Result<Vec<_>, _>>();
394
395        let caches_futures = match maybe_caches_futures {
396            Ok(caches_futures) => caches_futures,
397            Err(error) => {
398                error!(%error, "Failed to spawn piece cache reading thread");
399
400                return;
401            }
402        };
403
404        let mut backends = Vec::with_capacity(caches_futures.len());
405        let mut caches_futures = caches_futures.into_iter().collect::<FuturesOrdered<_>>();
406
407        while let Some(maybe_cache) = caches_futures.next().await {
408            match maybe_cache {
409                Ok(cache) => {
410                    let backend = cache.backend;
411                    for (key, cache_offset) in cache.cache_stored_pieces {
412                        if let Some(old_cache_offset) = stored_pieces.insert(key, cache_offset) {
413                            dangling_free_offsets.push_front(old_cache_offset);
414                        }
415                    }
416                    dangling_free_offsets.extend(
417                        cache.cache_free_offsets.into_iter().filter(|free_offset| {
418                            free_offset.piece_offset.0 < backend.used_capacity
419                        }),
420                    );
421                    backends.push(backend);
422                }
423                Err(_cancelled) => {
424                    error!("Piece cache reading thread panicked");
425
426                    return;
427                }
428            };
429        }
430
431        let mut caches = PieceCachesState::new(stored_pieces, dangling_free_offsets, backends);
432
433        info!("Synchronizing piece cache");
434
435        let last_segment_index = loop {
436            match self.node_client.farmer_app_info().await {
437                Ok(farmer_app_info) => {
438                    let last_segment_index =
439                        farmer_app_info.protocol_info.history_size.segment_index();
440                    // Wait for node to be either fully synced or to be aware of non-zero segment
441                    // index, which would indicate it has started DSN sync and knows about
442                    // up-to-date archived history.
443                    //
444                    // While this doesn't account for situations where node was offline for a long
445                    // time and is aware of old segment headers, this is good enough for piece cache
446                    // sync to proceed and should result in better user experience on average.
447                    if !farmer_app_info.syncing || last_segment_index > SegmentIndex::ZERO {
448                        break last_segment_index;
449                    }
450                }
451                Err(error) => {
452                    error!(
453                        %error,
454                        "Failed to get farmer app info from node, keeping old cache state without \
455                        updates"
456                    );
457
458                    // Not the latest, but at least something
459                    *self.piece_caches.write().await = caches;
460                    return;
461                }
462            }
463
464            tokio::time::sleep(INITIAL_SYNC_FARM_INFO_CHECK_INTERVAL).await;
465        };
466
467        debug!(%last_segment_index, "Identified last segment index");
468
469        // Collect all the piece indices that need to be stored; we will sort them later
470        let segment_indices = Vec::from_iter(SegmentIndex::ZERO..=last_segment_index);
471        // TODO: This may eventually be too much to store in memory, though right now it is a
472        //  non-issue in practice
473        let mut piece_indices_to_store = segment_indices
474            .into_par_iter()
475            .flat_map(|segment_index| {
476                segment_index
477                    .segment_piece_indexes()
478                    .into_par_iter()
479                    .map(|piece_index| {
480                        (
481                            KeyWithDistance::new(self.peer_id, piece_index.to_multihash()),
482                            piece_index,
483                        )
484                    })
485            })
486            .collect::<Vec<_>>();
487
488        // Sort pieces by distance from the peer to the piece in ascending order, so that the
489        // closest pieces, which have a higher chance of being downloaded, come first
490        piece_indices_to_store.par_sort_unstable_by(|(a_key, _), (b_key, _)| a_key.cmp(b_key));
491
492        // `HashMap` is faster than `BTreeMap`
493        let mut piece_indices_to_store = piece_indices_to_store
494            .into_iter()
495            .take(caches.total_capacity())
496            .collect::<HashMap<_, _>>();
497
498        let mut piece_caches_capacity_used = vec![0u32; caches.backends().len()];
499        // Filter out piece indices that are stored but should not be, and also remove already
500        // stored piece indices from `piece_indices_to_store`, leaving just those that are
501        // still missing from the cache
502        caches.free_unneeded_stored_pieces(&mut piece_indices_to_store);
503
504        if let Some(metrics) = &self.metrics {
505            for offset in caches.stored_pieces_offsets() {
506                piece_caches_capacity_used[usize::from(offset.cache_index)] += 1;
507            }
508
509            for cache_used in piece_caches_capacity_used {
510                metrics
511                    .piece_cache_capacity_used
512                    .inc_by(i64::from(cache_used));
513            }
514        }
515
516        // Store whatever correct pieces are immediately available after restart
517        self.piece_caches.write().await.clone_from(&caches);
518
519        debug!(
520            count = %piece_indices_to_store.len(),
521            "Identified piece indices that should be cached",
522        );
523
524        let pieces_to_download_total = piece_indices_to_store.len();
525        let piece_indices_to_store = piece_indices_to_store
526            .into_values()
527            .collect::<Vec<_>>()
528            // TODO: Allocating chunks here shouldn't be necessary, but otherwise it fails with
529            //  confusing error described in https://github.com/rust-lang/rust/issues/64552 and
530            //  similar upstream issues
531            .chunks(SYNC_BATCH_SIZE)
532            .map(|chunk| chunk.to_vec())
533            .collect::<Vec<_>>();
534
535        let downloaded_pieces_count = AtomicUsize::new(0);
536        let caches = Mutex::new(caches);
537        self.handlers.progress.call_simple(&0.0);
538        let piece_indices_to_store = piece_indices_to_store.into_iter().enumerate();
539
540        let downloading_semaphore = &Semaphore::new(SYNC_BATCH_SIZE * SYNC_CONCURRENT_BATCHES);
541        let ignored_cache_indices = &RwLock::new(HashSet::new());
542
543        let downloading_pieces_stream =
544            stream::iter(piece_indices_to_store.map(|(batch, piece_indices)| {
545                let downloaded_pieces_count = &downloaded_pieces_count;
546                let caches = &caches;
547
548                async move {
549                    let mut permit = downloading_semaphore
550                        .acquire_many(SYNC_BATCH_SIZE as u32)
551                        .await
552                        .expect("Semaphore is never closed; qed");
553                    debug!(%batch, num_pieces = %piece_indices.len(), "Downloading pieces");
554
555                    let pieces_stream = match piece_getter.get_pieces(piece_indices).await {
556                        Ok(pieces_stream) => pieces_stream,
557                        Err(error) => {
558                            error!(
559                                %error,
560                                "Failed to get pieces from piece getter"
561                            );
562                            return;
563                        }
564                    };
565                    let mut pieces_stream = pieces_stream.enumerate();
566
567                    while let Some((index, (piece_index, result))) = pieces_stream.next().await {
568                        debug!(%batch, %index, %piece_index, "Downloaded piece");
569
570                        let piece = match result {
571                            Ok(Some(piece)) => {
572                                trace!(%batch, %piece_index, "Downloaded piece successfully");
573                                piece
574                            }
575                            Ok(None) => {
576                                debug!(%batch, %piece_index, "Couldn't find piece");
577                                continue;
578                            }
579                            Err(error) => {
580                                debug!(
581                                    %batch,
582                                    %error,
583                                    %piece_index,
584                                    "Failed to get piece for piece cache"
585                                );
586                                continue;
587                            }
588                        };
589                        // Release slot for future batches
590                        permit.split(1);
591
592                        let (offset, maybe_backend) = {
593                            let mut caches = caches.lock();
594
595                            // Find a cache in which there is a place for the new piece to be stored
596                            let Some(offset) = caches.pop_free_offset() else {
597                                error!(
598                                    %batch,
599                                    %piece_index,
600                                    "Failed to store piece in cache, there was no space"
601                                );
602                                break;
603                            };
604
605                            (offset, caches.get_backend(offset.cache_index).cloned())
606                        };
607
608                        let cache_index = offset.cache_index;
609                        let piece_offset = offset.piece_offset;
610
611                        let skip_write = ignored_cache_indices.read().contains(&cache_index);
612                        if skip_write {
613                            trace!(
614                                %batch,
615                                %cache_index,
616                                %piece_index,
617                                %piece_offset,
618                                "Skipping known problematic cache index"
619                            );
620                        } else {
621                            if let Some(backend) = maybe_backend
622                                && let Err(error) =
623                                    backend.write_piece(piece_offset, piece_index, &piece).await
624                            {
625                                error!(
626                                    %error,
627                                    %batch,
628                                    %cache_index,
629                                    %piece_index,
630                                    %piece_offset,
631                                    "Failed to write piece into cache, ignoring this cache going \
632                                    forward"
633                                );
634                                ignored_cache_indices.write().insert(cache_index);
635                                continue;
636                            }
637
638                            let key =
639                                KeyWithDistance::new(self.peer_id, piece_index.to_multihash());
640                            caches.lock().push_stored_piece(key, offset);
641                        }
642
643                        let prev_downloaded_pieces_count =
644                            downloaded_pieces_count.fetch_add(1, Ordering::Relaxed);
645                        // Do not print anything or send a progress notification after the last
646                        // piece until the piece cache is fully written below
647                        if prev_downloaded_pieces_count != pieces_to_download_total {
648                            let progress = prev_downloaded_pieces_count as f32
649                                / pieces_to_download_total as f32
650                                * 100.0;
651                            if prev_downloaded_pieces_count % INTERMEDIATE_CACHE_UPDATE_INTERVAL
652                                == 0
653                            {
654                                let mut piece_caches = self.piece_caches.write().await;
655                                piece_caches.clone_from(&caches.lock());
656
657                                info!("Piece cache sync {progress:.2}% complete");
658                            }
659
660                            self.handlers.progress.call_simple(&progress);
661                        }
662                    }
663                }
664            }));
665
666        // Download several batches concurrently to make sure the slow tail of one batch is
667        // compensated by another
668        downloading_pieces_stream
669            // This allows scheduling a new batch while previous batches are partially complete,
670            // but avoids the excessive memory usage of creating all futures upfront
671            .buffer_unordered(SYNC_CONCURRENT_BATCHES * 10)
672            // Simply drain everything
673            .for_each(|()| async {})
674            .await;
675
676        *self.piece_caches.write().await = caches.into_inner();
677        self.handlers.progress.call_simple(&100.0);
678        *last_segment_index_internal = last_segment_index;
679
680        info!("Finished piece cache synchronization");
681    }
682
683    async fn process_segment_header<PG>(
684        &self,
685        piece_getter: &PG,
686        segment_header: SegmentHeader,
687        last_segment_index_internal: &mut SegmentIndex,
688    ) where
689        PG: PieceGetter,
690    {
691        let segment_index = segment_header.segment_index();
692        debug!(%segment_index, "Starting to process newly archived segment");
693
694        if *last_segment_index_internal < segment_index {
695            debug!(%segment_index, "Downloading potentially useful pieces");
696
697            // We do not insert pieces into cache/heap yet, so we don't know if all of these pieces
698            // will be included, but there is a good chance they will be, and we want to acknowledge
699            // new segment header as soon as possible
700            let pieces_to_maybe_include = segment_index
701                .segment_piece_indexes()
702                .into_iter()
703                .map(|piece_index| async move {
704                    let should_store_in_piece_cache = self
705                        .piece_caches
706                        .read()
707                        .await
708                        .should_include_key(self.peer_id, piece_index);
709
710                    let key = RecordKey::from(piece_index.to_multihash());
711                    let should_store_in_plot_cache =
712                        self.plot_caches.should_store(piece_index, &key).await;
713
714                    if !(should_store_in_piece_cache || should_store_in_plot_cache) {
715                        trace!(%piece_index, "Piece doesn't need to be cached #1");
716
717                        return None;
718                    }
719
720                    let maybe_piece_result =
721                        self.node_client
722                            .piece(piece_index)
723                            .await
724                            .inspect_err(|error| {
725                                debug!(
726                                    %error,
727                                    %segment_index,
728                                    %piece_index,
729                                    "Failed to retrieve piece from node right after archiving"
730                                );
731                            });
732
733                    if let Ok(Some(piece)) = maybe_piece_result {
734                        return Some((piece_index, piece));
735                    }
736
737                    match piece_getter.get_piece(piece_index).await {
738                        Ok(Some(piece)) => Some((piece_index, piece)),
739                        Ok(None) => {
740                            warn!(
741                                %segment_index,
742                                %piece_index,
743                                "Failed to retrieve piece right after archiving"
744                            );
745
746                            None
747                        }
748                        Err(error) => {
749                            warn!(
750                                %error,
751                                %segment_index,
752                                %piece_index,
753                                "Failed to retrieve piece right after archiving"
754                            );
755
756                            None
757                        }
758                    }
759                })
760                .collect::<FuturesUnordered<_>>()
761                .filter_map(|maybe_piece| async move { maybe_piece })
762                .collect::<Vec<_>>()
763                .await;
764
765            debug!(%segment_index, "Downloaded potentially useful pieces");
766
767            self.acknowledge_archived_segment_processing(segment_index)
768                .await;
769
770            // Go through potentially matching pieces again now that segment was acknowledged and
771            // try to persist them if necessary
772            for (piece_index, piece) in pieces_to_maybe_include {
773                if !self
774                    .plot_caches
775                    .store_additional_piece(piece_index, &piece)
776                    .await
777                {
778                    trace!(%piece_index, "Piece doesn't need to be cached in plot cache");
779                }
780
781                if !self
782                    .piece_caches
783                    .read()
784                    .await
785                    .should_include_key(self.peer_id, piece_index)
786                {
787                    trace!(%piece_index, "Piece doesn't need to be cached #2");
788
789                    continue;
790                }
791
792                trace!(%piece_index, "Piece needs to be cached #1");
793
794                self.persist_piece_in_cache(piece_index, piece).await;
795            }
796
797            *last_segment_index_internal = segment_index;
798        } else {
799            self.acknowledge_archived_segment_processing(segment_index)
800                .await;
801        }
802
803        debug!(%segment_index, "Finished processing newly archived segment");
804    }
805
806    async fn acknowledge_archived_segment_processing(&self, segment_index: SegmentIndex) {
807        match self
808            .node_client
809            .acknowledge_archived_segment_header(segment_index)
810            .await
811        {
812            Ok(()) => {
813                debug!(%segment_index, "Acknowledged archived segment");
814            }
815            Err(error) => {
816                error!(%segment_index, ?error, "Failed to acknowledge archived segment");
817            }
818        };
819    }
820
821    async fn keep_up_after_initial_sync<PG>(
822        &self,
823        piece_getter: &PG,
824        last_segment_index_internal: &mut SegmentIndex,
825    ) where
826        PG: PieceGetter,
827    {
828        let last_segment_index = match self.node_client.farmer_app_info().await {
829            Ok(farmer_app_info) => farmer_app_info.protocol_info.history_size.segment_index(),
830            Err(error) => {
831                error!(
832                    %error,
833                    "Failed to get farmer app info from node, keeping old cache state without \
834                    updates"
835                );
836                return;
837            }
838        };
839
840        if last_segment_index <= *last_segment_index_internal {
841            return;
842        }
843
844        info!(
845            "Syncing piece cache to the latest history size, this may pause block production if \
846            it takes too long"
847        );
848
849        // Keep up with segment indices that were potentially created since reinitialization
850        let piece_indices = (*last_segment_index_internal..=last_segment_index)
851            .flat_map(|segment_index| segment_index.segment_piece_indexes());
852
853        // TODO: Download pieces concurrently
854        for piece_index in piece_indices {
855            if !self
856                .piece_caches
857                .read()
858                .await
859                .should_include_key(self.peer_id, piece_index)
860            {
861                trace!(%piece_index, "Piece doesn't need to be cached #3");
862
863                continue;
864            }
865
866            trace!(%piece_index, "Piece needs to be cached #2");
867
868            let result = piece_getter.get_piece(piece_index).await;
869
870            let piece = match result {
871                Ok(Some(piece)) => piece,
872                Ok(None) => {
873                    debug!(%piece_index, "Couldn't find piece");
874                    continue;
875                }
876                Err(error) => {
877                    debug!(
878                        %error,
879                        %piece_index,
880                        "Failed to get piece for piece cache"
881                    );
882                    continue;
883                }
884            };
885
886            self.persist_piece_in_cache(piece_index, piece).await;
887        }
888
889        info!("Finished syncing piece cache to the latest history size");
890
891        *last_segment_index_internal = last_segment_index;
892    }
893
894    /// This assumes it was already checked that the piece needs to be stored; no verification for
895    /// this is done internally, and invariants will break if this assumption doesn't hold true
896    async fn persist_piece_in_cache(&self, piece_index: PieceIndex, piece: Piece) {
897        let key = KeyWithDistance::new(self.peer_id, piece_index.to_multihash());
898        let mut caches = self.piece_caches.write().await;
899        match caches.should_replace(&key) {
900            // Entry is already occupied; we need to find and replace the old piece with the new one
901            Some((old_key, offset)) => {
902                let cache_index = offset.cache_index;
903                let piece_offset = offset.piece_offset;
904                let Some(backend) = caches.get_backend(cache_index) else {
905                    // Cache backend doesn't exist
906                    warn!(
907                        %cache_index,
908                        %piece_index,
909                        "Should have a cache backend, but it didn't exist; this is an \
910                        implementation bug"
911                    );
912                    return;
913                };
914                if let Err(error) = backend.write_piece(piece_offset, piece_index, &piece).await {
915                    error!(
916                        %error,
917                        %cache_index,
918                        %piece_index,
919                        %piece_offset,
920                        "Failed to write piece into cache"
921                    );
922                } else {
923                    let old_piece_index = decode_piece_index_from_record_key(old_key.record_key());
924                    trace!(
925                        %cache_index,
926                        %old_piece_index,
927                        %piece_index,
928                        %piece_offset,
929                        "Successfully replaced old cached piece"
930                    );
931                    caches.push_stored_piece(key, offset);
932                }
933            }
934            // There is free space in the cache, so find a free spot and place the piece there
935            None => {
936                let Some(offset) = caches.pop_free_offset() else {
937                    warn!(
938                        %piece_index,
939                        "Should have inserted the piece into the cache, but it didn't happen; this \
940                        is an implementation bug"
941                    );
942                    return;
943                };
944                let cache_index = offset.cache_index;
945                let piece_offset = offset.piece_offset;
946                let Some(backend) = caches.get_backend(cache_index) else {
947                    // Cache backend doesn't exist
948                    warn!(
949                        %cache_index,
950                        %piece_index,
951                        "Should have a cache backend, but it didn't exist; this is an \
952                        implementation bug"
953                    );
954                    return;
955                };
956
957                if let Err(error) = backend.write_piece(piece_offset, piece_index, &piece).await {
958                    error!(
959                        %error,
960                        %cache_index,
961                        %piece_index,
962                        %piece_offset,
963                        "Failed to write piece into cache"
964                    );
965                } else {
966                    trace!(
967                        %cache_index,
968                        %piece_index,
969                        %piece_offset,
970                        "Successfully stored piece in cache"
971                    );
972                    if let Some(metrics) = &self.metrics {
973                        metrics.piece_cache_capacity_used.inc();
974                    }
975                    caches.push_stored_piece(key, offset);
976                }
977            }
978        };
979    }
980}
981
982#[derive(Debug)]
983struct PlotCaches {
984    /// Additional piece caches
985    caches: AsyncRwLock<Vec<Arc<dyn PlotCache>>>,
986    /// Next plot cache to use for storing pieces
987    next_plot_cache: AtomicUsize,
988}
989
990impl PlotCaches {
991    async fn should_store(&self, piece_index: PieceIndex, key: &RecordKey) -> bool {
992        for (cache_index, cache) in self.caches.read().await.iter().enumerate() {
993            match cache.is_piece_maybe_stored(key).await {
994                Ok(MaybePieceStoredResult::No) => {
995                    // Try another one if there is any
996                }
997                Ok(MaybePieceStoredResult::Vacant) => {
998                    return true;
999                }
1000                Ok(MaybePieceStoredResult::Yes) => {
1001                    // Already stored, nothing else left to do
1002                    return false;
1003                }
1004                Err(error) => {
1005                    warn!(
1006                        %cache_index,
1007                        %piece_index,
1008                        %error,
1009                        "Failed to check piece stored in cache"
1010                    );
1011                }
1012            }
1013        }
1014
1015        false
1016    }
1017
1018    /// Store a piece in one of the additional (plot) caches, if there is space for it
1019    async fn store_additional_piece(&self, piece_index: PieceIndex, piece: &Piece) -> bool {
1020        let plot_caches = self.caches.read().await;
1021        let plot_caches_len = plot_caches.len();
1022
1023        // Store pieces in plots using round-robin distribution
1024        for _ in 0..plot_caches_len {
1025            let plot_cache_index =
1026                self.next_plot_cache.fetch_add(1, Ordering::Relaxed) % plot_caches_len;
1027
1028            match plot_caches[plot_cache_index]
1029                .try_store_piece(piece_index, piece)
1030                .await
1031            {
1032                Ok(true) => {
1033                    return true;
1034                }
1035                Ok(false) => {
1036                    continue;
1037                }
1038                Err(error) => {
1039                    error!(
1040                        %error,
1041                        %piece_index,
1042                        %plot_cache_index,
1043                        "Failed to store additional piece in cache"
1044                    );
1045                    continue;
1046                }
1047            }
1048        }
1049
1050        false
1051    }
1052}
1053
1054/// Farmer cache that aggregates different kinds of caches of multiple disks.
1055///
1056/// Pieces in [`PieceCache`] are stored based on capacity and proximity of the piece index to the
1057/// farmer's network identity. If capacity is not enough to store all pieces in the cache, then
1058/// pieces that are further from the network identity will be evicted; this is helpful for quick
1059/// retrieval of pieces from DSN as well as for plotting purposes.
1060///
1061/// [`PlotCache`] is used as a supplementary cache and is primarily helpful for smaller farmers
1062/// where the piece cache is not enough to store all the pieces on the network, while there is a
1063/// lot of space in the plot not yet used by sectors that can be leveraged as extra caching space.
1064#[derive(Debug, Clone)]
1065pub struct FarmerCache {
1066    peer_id: PeerId,
1067    /// Individual dedicated piece caches
1068    piece_caches: Arc<AsyncRwLock<PieceCachesState>>,
1069    /// Additional piece caches
1070    plot_caches: Arc<PlotCaches>,
1071    handlers: Arc<Handlers>,
1072    // We do not want to increase capacity unnecessarily on clone
1073    worker_sender: mpsc::Sender<WorkerCommand>,
1074    metrics: Option<Arc<FarmerCacheMetrics>>,
1075}
1076
1077impl FarmerCache {
1078    /// Create new piece cache instance and corresponding worker.
1079    ///
1080    /// NOTE: The returned worker's future does blocking operations and should be run in a
1081    /// dedicated thread.
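    ///
    /// A hedged sketch of wiring this up; the spawning helper and its arguments below mirror how
    /// cache initialization threads are spawned elsewhere in this module, but are assumptions
    /// rather than a prescribed pattern:
    ///
    /// ```rust,ignore
    /// let (farmer_cache, farmer_cache_worker) = FarmerCache::new(node_client, peer_id, None);
    /// let worker_fut = run_future_in_dedicated_thread(
    ///     move || farmer_cache_worker.run(piece_getter),
    ///     "farmer-cache-worker".to_string(),
    /// )?;
    /// // `worker_fut` resolves once the worker shuts down, e.g. after the cache is dropped.
    /// ```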
1082    pub fn new<NC>(
1083        node_client: NC,
1084        peer_id: PeerId,
1085        registry: Option<&mut Registry>,
1086    ) -> (Self, FarmerCacheWorker<NC>)
1087    where
1088        NC: NodeClient,
1089    {
1090        let caches = Arc::default();
1091        let (worker_sender, worker_receiver) = mpsc::channel(WORKER_CHANNEL_CAPACITY);
1092        let handlers = Arc::new(Handlers::default());
1093
1094        let plot_caches = Arc::new(PlotCaches {
1095            caches: AsyncRwLock::default(),
1096            next_plot_cache: AtomicUsize::new(0),
1097        });
1098        let metrics = registry.map(|registry| Arc::new(FarmerCacheMetrics::new(registry)));
1099
1100        let instance = Self {
1101            peer_id,
1102            piece_caches: Arc::clone(&caches),
1103            plot_caches: Arc::clone(&plot_caches),
1104            handlers: Arc::clone(&handlers),
1105            worker_sender,
1106            metrics: metrics.clone(),
1107        };
1108        let worker = FarmerCacheWorker {
1109            peer_id,
1110            node_client,
1111            piece_caches: caches,
1112            plot_caches,
1113            handlers,
1114            worker_receiver: Some(worker_receiver),
1115            metrics,
1116        };
1117
1118        (instance, worker)
1119    }
1120
1121    /// Get piece from cache
1122    pub async fn get_piece<Key>(&self, key: Key) -> Option<Piece>
1123    where
1124        RecordKey: From<Key>,
1125    {
1126        let key = RecordKey::from(key);
1127        let maybe_piece_found = {
1128            let key = KeyWithDistance::new_with_record_key(self.peer_id, key.clone());
1129            let caches = self.piece_caches.read().await;
1130
1131            caches.get_stored_piece(&key).and_then(|offset| {
1132                let cache_index = offset.cache_index;
1133                let piece_offset = offset.piece_offset;
1134                Some((
1135                    piece_offset,
1136                    cache_index,
1137                    caches.get_backend(cache_index)?.clone(),
1138                ))
1139            })
1140        };
1141
1142        if let Some((piece_offset, cache_index, backend)) = maybe_piece_found {
1143            match backend.read_piece(piece_offset).await {
1144                Ok(maybe_piece) => {
1145                    return match maybe_piece {
1146                        Some((_piece_index, piece)) => {
1147                            if let Some(metrics) = &self.metrics {
1148                                metrics.cache_get_hit.inc();
1149                            }
1150                            Some(piece)
1151                        }
1152                        None => {
1153                            error!(
1154                                %cache_index,
1155                                %piece_offset,
1156                                ?key,
1157                                "Piece was expected to be in cache, but wasn't found there"
1158                            );
1159                            if let Some(metrics) = &self.metrics {
1160                                metrics.cache_get_error.inc();
1161                            }
1162                            None
1163                        }
1164                    };
1165                }
1166                Err(error) => {
1167                    error!(
1168                        %error,
1169                        %cache_index,
1170                        ?key,
1171                        %piece_offset,
1172                        "Error while reading piece from cache"
1173                    );
1174
1175                    if let Err(error) = self
1176                        .worker_sender
1177                        .clone()
1178                        .send(WorkerCommand::ForgetKey { key })
1179                        .await
1180                    {
1181                        trace!(%error, "Failed to send ForgetKey command to worker");
1182                    }
1183
1184                    if let Some(metrics) = &self.metrics {
1185                        metrics.cache_get_error.inc();
1186                    }
1187                    return None;
1188                }
1189            }
1190        }
1191
1192        for cache in self.plot_caches.caches.read().await.iter() {
1193            if let Ok(Some(piece)) = cache.read_piece(&key).await {
1194                if let Some(metrics) = &self.metrics {
1195                    metrics.cache_get_hit.inc();
1196                }
1197                return Some(piece);
1198            }
1199        }
1200
1201        if let Some(metrics) = &self.metrics {
1202            metrics.cache_get_miss.inc();
1203        }
1204        None
1205    }
1206
1207    /// Get pieces from cache.
1208    ///
1209    /// The number of elements in the returned stream matches the number of unique `piece_indices`.
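    ///
    /// A hedged consumption sketch (`piece_indices` is any iterator of piece indices; missing
    /// pieces are reported as `None` rather than being skipped):
    ///
    /// ```rust,ignore
    /// use futures::StreamExt;
    ///
    /// let mut pieces = farmer_cache.get_pieces(piece_indices).await;
    /// while let Some((piece_index, maybe_piece)) = pieces.next().await {
    ///     match maybe_piece {
    ///         Some(piece) => { /* piece was found in one of the caches */ }
    ///         None => { /* piece is not cached, fall back to another piece getter */ }
    ///     }
    /// }
    /// ```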
1210    pub async fn get_pieces<'a, PieceIndices>(
1211        &'a self,
1212        piece_indices: PieceIndices,
1213    ) -> impl Stream<Item = (PieceIndex, Option<Piece>)> + Send + Unpin + 'a
1214    where
1215        PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send + 'a> + Send + 'a,
1216    {
1217        let mut pieces_to_get_from_plot_cache = Vec::new();
1218
1219        let pieces_to_read_from_piece_cache = {
1220            let caches = self.piece_caches.read().await;
1221            // Pieces to read from piece cache grouped by backend for efficiency reasons
1222            let mut pieces_to_read_from_piece_cache =
1223                HashMap::<CacheIndex, (CacheBackend, HashMap<_, _>)>::new();
1224
1225            for piece_index in piece_indices {
1226                let key = RecordKey::from(piece_index.to_multihash());
1227
1228                let offset = match caches.get_stored_piece(&KeyWithDistance::new_with_record_key(
1229                    self.peer_id,
1230                    key.clone(),
1231                )) {
1232                    Some(offset) => offset,
1233                    None => {
1234                        pieces_to_get_from_plot_cache.push((piece_index, key));
1235                        continue;
1236                    }
1237                };
1238
1239                let cache_index = offset.cache_index;
1240                let piece_offset = offset.piece_offset;
1241
1242                match pieces_to_read_from_piece_cache.entry(cache_index) {
1243                    Entry::Occupied(mut entry) => {
1244                        let (_backend, pieces) = entry.get_mut();
1245                        pieces.insert(piece_offset, (piece_index, key));
1246                    }
1247                    Entry::Vacant(entry) => {
1248                        let backend = match caches.get_backend(cache_index) {
1249                            Some(backend) => backend.clone(),
1250                            None => {
1251                                pieces_to_get_from_plot_cache.push((piece_index, key));
1252                                continue;
1253                            }
1254                        };
1255                        entry
1256                            .insert((backend, HashMap::from([(piece_offset, (piece_index, key))])));
1257                    }
1258                }
1259            }
1260
1261            pieces_to_read_from_piece_cache
1262        };
1263
1264        let (tx, mut rx) = mpsc::unbounded();
1265
1266        let fut = async move {
1267            let tx = &tx;
1268
1269            let mut reading_from_piece_cache = pieces_to_read_from_piece_cache
1270                .into_iter()
1271                .map(|(cache_index, (backend, mut pieces_to_get))| async move {
1272                    let mut pieces_stream = match backend
1273                        .read_pieces(Box::new(
1274                            pieces_to_get
1275                                .keys()
1276                                .copied()
1277                                .collect::<Vec<_>>()
1278                                .into_iter(),
1279                        ))
1280                        .await
1281                    {
1282                        Ok(pieces_stream) => pieces_stream,
1283                        Err(error) => {
1284                            error!(
1285                                %error,
1286                                %cache_index,
1287                                "Error while reading pieces from cache"
1288                            );
1289
1290                            if let Some(metrics) = &self.metrics {
1291                                metrics.cache_get_error.inc_by(pieces_to_get.len() as u64);
1292                            }
1293                            for (piece_index, _key) in pieces_to_get.into_values() {
1294                                tx.unbounded_send((piece_index, None)).expect(
1295                                    "This future isn't polled after receiver is dropped; qed",
1296                                );
1297                            }
1298                            return;
1299                        }
1300                    };
1301
1302                    while let Some(maybe_piece) = pieces_stream.next().await {
1303                        let result = match maybe_piece {
1304                            Ok((piece_offset, Some((piece_index, piece)))) => {
1305                                pieces_to_get.remove(&piece_offset);
1306
1307                                if let Some(metrics) = &self.metrics {
1308                                    metrics.cache_get_hit.inc();
1309                                }
1310                                (piece_index, Some(piece))
1311                            }
1312                            Ok((piece_offset, None)) => {
1313                                let Some((piece_index, key)) = pieces_to_get.remove(&piece_offset)
1314                                else {
1315                                    debug!(
1316                                        %cache_index,
1317                                        %piece_offset,
1318                                        "Received piece offset that was not expected"
1319                                    );
1320                                    continue;
1321                                };
1322
1323                                error!(
1324                                    %cache_index,
1325                                    %piece_index,
1326                                    %piece_offset,
1327                                    ?key,
1328                                    "Piece was expected to be in cache, but wasn't found there"
1329                                );
1330                                if let Some(metrics) = &self.metrics {
1331                                    metrics.cache_get_error.inc();
1332                                }
1333                                (piece_index, None)
1334                            }
1335                            Err(error) => {
1336                                error!(
1337                                    %error,
1338                                    %cache_index,
1339                                    "Error while reading piece from cache"
1340                                );
1341
1342                                if let Some(metrics) = &self.metrics {
1343                                    metrics.cache_get_error.inc();
1344                                }
1345                                continue;
1346                            }
1347                        };
1348
1349                        tx.unbounded_send(result)
1350                            .expect("This future isn't polled after receiver is dropped; qed");
1351                    }
1352
1353                    if pieces_to_get.is_empty() {
1354                        return;
1355                    }
1356
1357                    if let Some(metrics) = &self.metrics {
1358                        metrics.cache_get_error.inc_by(pieces_to_get.len() as u64);
1359                    }
1360                    for (piece_offset, (piece_index, key)) in pieces_to_get {
1361                        error!(
1362                            %cache_index,
1363                            %piece_index,
1364                            %piece_offset,
1365                            ?key,
1366                            "Piece cache didn't return an entry for offset"
1367                        );
1368
1369                        // Uphold the method's invariant that a result is returned for every
1370                        // unique piece index
1371                        tx.unbounded_send((piece_index, None))
1372                            .expect("This future isn't polled after receiver is dropped; qed");
1373                    }
1374                })
1375                .collect::<FuturesUnordered<_>>();
1376            // TODO: Can't simply chain the following to drain everything due to
1377            // https://github.com/rust-lang/rust/issues/64650:
1378            // .for_each(|()| async {})
1379
1380            // TODO: Remove once https://github.com/rust-lang/rust/issues/64650 is resolved
1381            let reading_from_piece_cache_fut = async move {
1382                while let Some(()) = reading_from_piece_cache.next().await {
1383                    // Simply drain everything
1384                }
1385            };
1386
1387            let reading_from_plot_cache_fut = async {
1388                if pieces_to_get_from_plot_cache.is_empty() {
1389                    return;
1390                }
1391
1392                for cache in self.plot_caches.caches.read().await.iter() {
1393                    // Iterate over offsets (in reverse order) rather than by iterator so the loop body can
1394                    // await, and so entries can be removed via `swap_remove` without extra allocations
1395                    for offset in (0..pieces_to_get_from_plot_cache.len()).rev() {
1396                        let (piece_index, key) = &pieces_to_get_from_plot_cache[offset];
1397
1398                        if let Ok(Some(piece)) = cache.read_piece(key).await {
1399                            if let Some(metrics) = &self.metrics {
1400                                metrics.cache_get_hit.inc();
1401                            }
1402                            tx.unbounded_send((*piece_index, Some(piece)))
1403                                .expect("This future isn't polled after receiver is dropped; qed");
1404
1405                            // Because iteration is in reverse order and `swap_remove` fills the gap with
1406                            // an element from the end, removal doesn't affect offsets yet to be processed
1407                            pieces_to_get_from_plot_cache.swap_remove(offset);
1408                        }
1409                    }
1410
1411                    if pieces_to_get_from_plot_cache.is_empty() {
1412                        return;
1413                    }
1414                }
1415
1416                if let Some(metrics) = &self.metrics {
1417                    metrics
1418                        .cache_get_miss
1419                        .inc_by(pieces_to_get_from_plot_cache.len() as u64);
1420                }
1421
1422                for (piece_index, _key) in pieces_to_get_from_plot_cache {
1423                    tx.unbounded_send((piece_index, None))
1424                        .expect("This future isn't polled after receiver is dropped; qed");
1425                }
1426            };
1427
1428            join!(reading_from_piece_cache_fut, reading_from_plot_cache_fut).await
1429        };
1430        let mut fut = Box::pin(fut.fuse());
1431
1432        // Drive the future above and stream back any pieces that have been read so far
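        // Note: the future only makes progress while the returned stream is polled; results are
        // forwarded through the unbounded channel as soon as they are available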
1433        stream::poll_fn(move |cx| {
1434            if !fut.is_terminated() {
1435                // Result doesn't matter, we'll need to poll stream below anyway
1436                let _ = fut.poll_unpin(cx);
1437            }
1438
1439            if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) {
1440                return Poll::Ready(maybe_result);
1441            }
1442
1443            // Termination is handled above: once `fut` completes, `tx` is dropped and `rx` yields `None`
1444            Poll::Pending
1445        })
1446    }
1447
1448    /// Returns the subset of `piece_indices` that were found in the farmer cache; order is not guaranteed
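    ///
    /// A minimal usage sketch (not part of the original source), assuming `farmer_cache` is an
    /// initialized [`FarmerCache`] and `piece_indices` is a `Vec<PieceIndex>` obtained elsewhere:
    ///
    /// ```ignore
    /// let cached_piece_indices = farmer_cache.has_pieces(piece_indices).await;
    /// for piece_index in &cached_piece_indices {
    ///     // `piece_index` is currently available in the farmer cache
    /// }
    /// ```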
1449    pub async fn has_pieces(&self, mut piece_indices: Vec<PieceIndex>) -> Vec<PieceIndex> {
1450        let mut pieces_to_find = HashMap::<PieceIndex, RecordKey>::from_iter(
1451            piece_indices
1452                .iter()
1453                .map(|piece_index| (*piece_index, RecordKey::from(piece_index.to_multihash()))),
1454        );
1455
1456        // Quick check in piece caches
1457        {
1458            let piece_caches = self.piece_caches.read().await;
1459            pieces_to_find.retain(|_piece_index, key| {
1460                let distance_key = KeyWithDistance::new(self.peer_id, key.clone());
1461                !piece_caches.contains_stored_piece(&distance_key)
1462            });
1463        }
1464
1465        // Early exit if everything is cached
1466        if pieces_to_find.is_empty() {
1467            return piece_indices;
1468        }
1469
1470        // Check plot caches concurrently
1471        if let Some(plot_caches) = self.plot_caches.caches.try_read() {
1472            let plot_caches = &plot_caches;
1473            let not_found = pieces_to_find
1474                .into_iter()
1475                .map(|(piece_index, key)| async move {
1476                    let key = &key;
1477
1478                    let found = plot_caches
1479                        .iter()
1480                        .map(|plot_cache| async {
1481                            matches!(
1482                                plot_cache.is_piece_maybe_stored(key).await,
1483                                Ok(MaybePieceStoredResult::Yes)
1484                            )
1485                        })
1486                        .collect::<FuturesUnordered<_>>()
1487                        .any(|found| async move { found })
1488                        .await;
1489
1490                    if found {
1491                        None
1492                    } else {
1493                        Some(piece_index)
1494                    }
1495                })
1496                .collect::<FuturesUnordered<_>>()
1497                .filter_map(|maybe_piece_index| async move { maybe_piece_index })
1498                .collect::<HashSet<_>>()
1499                .await;
1500            piece_indices.retain(|piece_index| !not_found.contains(piece_index));
1501        }
1502        piece_indices
1503    }
1504
1505    /// Find piece in cache and return its retrieval details
1506    pub async fn find_piece(
1507        &self,
1508        piece_index: PieceIndex,
1509    ) -> Option<(PieceCacheId, PieceCacheOffset)> {
1510        let caches = self.piece_caches.read().await;
1511
1512        self.find_piece_internal(&caches, piece_index)
1513    }
1514
1515    /// Find pieces in cache and return their retrieval details
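    ///
    /// A minimal usage sketch (not part of the original source), assuming an initialized
    /// `farmer_cache` and an iterator of `PieceIndex` values:
    ///
    /// ```ignore
    /// for (piece_index, cache_id, piece_offset) in farmer_cache.find_pieces(piece_indices).await {
    ///     // The piece can be read from the piece cache identified by `cache_id` at `piece_offset`
    /// }
    /// ```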
1516    pub async fn find_pieces<PieceIndices>(
1517        &self,
1518        piece_indices: PieceIndices,
1519    ) -> Vec<(PieceIndex, PieceCacheId, PieceCacheOffset)>
1520    where
1521        PieceIndices: IntoIterator<Item = PieceIndex>,
1522    {
1523        let caches = self.piece_caches.read().await;
1524
1525        piece_indices
1526            .into_iter()
1527            .filter_map(|piece_index| {
1528                self.find_piece_internal(&caches, piece_index)
1529                    .map(|(cache_id, piece_offset)| (piece_index, cache_id, piece_offset))
1530            })
1531            .collect()
1532    }
1533
1534    fn find_piece_internal(
1535        &self,
1536        caches: &PieceCachesState,
1537        piece_index: PieceIndex,
1538    ) -> Option<(PieceCacheId, PieceCacheOffset)> {
1539        let key = KeyWithDistance::new(self.peer_id, piece_index.to_multihash());
1540
1541        let Some(offset) = caches.get_stored_piece(&key) else {
1542            if let Some(metrics) = &self.metrics {
1543                metrics.cache_find_miss.inc();
1544            }
1545
1546            return None;
1547        };
1548        let piece_offset = offset.piece_offset;
1549
1550        if let Some(backend) = caches.get_backend(offset.cache_index) {
1551            if let Some(metrics) = &self.metrics {
1552                metrics.cache_find_hit.inc();
1553            }
1554            return Some((*backend.id(), piece_offset));
1555        }
1556
1557        if let Some(metrics) = &self.metrics {
1558            metrics.cache_find_miss.inc();
1559        }
1560        None
1561    }
1562
1563    /// Try to store a piece in the plot cache for additional downloaded pieces, if there is space for it
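    ///
    /// A minimal usage sketch (not part of the original source), assuming an initialized
    /// `farmer_cache` and a freshly downloaded `piece`:
    ///
    /// ```ignore
    /// farmer_cache
    ///     .maybe_store_additional_piece(piece_index, &piece)
    ///     .await;
    /// ```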
1564    pub async fn maybe_store_additional_piece(&self, piece_index: PieceIndex, piece: &Piece) {
1565        let key = RecordKey::from(piece_index.to_multihash());
1566
1567        let should_store = self.plot_caches.should_store(piece_index, &key).await;
1568
1569        if !should_store {
1570            return;
1571        }
1572
1573        self.plot_caches
1574            .store_additional_piece(piece_index, piece)
1575            .await;
1576    }
1577
1578    /// Initialize replacement of backing caches
1579    pub async fn replace_backing_caches(
1580        &self,
1581        new_piece_caches: Vec<Arc<dyn PieceCache>>,
1582        new_plot_caches: Vec<Arc<dyn PlotCache>>,
1583    ) {
1584        if let Err(error) = self
1585            .worker_sender
1586            .clone()
1587            .send(WorkerCommand::ReplaceBackingCaches { new_piece_caches })
1588            .await
1589        {
1590            warn!(%error, "Failed to replace backing caches, worker exited");
1591        }
1592
1593        *self.plot_caches.caches.write().await = new_plot_caches;
1594    }
1595
1596    /// Subscribe to cache sync notifications
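    ///
    /// A minimal usage sketch (not part of the original source); dropping the returned
    /// [`HandlerId`] without detaching it typically unregisters the callback:
    ///
    /// ```ignore
    /// let _progress_handler_id = farmer_cache.on_sync_progress(Arc::new(|progress: &f32| {
    ///     info!(%progress, "Cache sync progress");
    /// }));
    /// ```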
1597    pub fn on_sync_progress(&self, callback: HandlerFn<f32>) -> HandlerId {
1598        self.handlers.progress.add(callback)
1599    }
1600}
1601
1602/// Collection of [`FarmerCache`] instances for load balancing
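///
/// A minimal construction sketch (not part of the original source), assuming `farmer_cache` is an
/// initialized [`FarmerCache`]:
///
/// ```ignore
/// let farmer_caches = FarmerCaches::from(farmer_cache.clone());
/// ```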
1603#[derive(Debug, Clone)]
1604pub struct FarmerCaches {
1605    caches: Arc<[FarmerCache]>,
1606}
1607
1608impl From<Arc<[FarmerCache]>> for FarmerCaches {
1609    fn from(caches: Arc<[FarmerCache]>) -> Self {
1610        Self { caches }
1611    }
1612}
1613
1614impl From<FarmerCache> for FarmerCaches {
1615    fn from(cache: FarmerCache) -> Self {
1616        Self {
1617            caches: Arc::new([cache]),
1618        }
1619    }
1620}
1621
1622impl FarmerCaches {
1623    /// Get a piece from the cache, delegating to a randomly chosen [`FarmerCache`] instance
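    ///
    /// A minimal usage sketch (not part of the original source), assuming an initialized
    /// `farmer_caches` and a known `piece_index`:
    ///
    /// ```ignore
    /// let key = RecordKey::from(piece_index.to_multihash());
    /// if let Some(piece) = farmer_caches.get_piece(key).await {
    ///     // The piece was found in the randomly selected cache instance
    /// }
    /// ```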
1624    pub async fn get_piece<Key>(&self, key: Key) -> Option<Piece>
1625    where
1626        RecordKey: From<Key>,
1627    {
1628        let farmer_cache = self.caches.choose(&mut thread_rng())?;
1629        farmer_cache.get_piece(key).await
1630    }
1631
1632    /// Get pieces from the cache, delegating to a randomly chosen [`FarmerCache`] instance.
1633    ///
1634    /// The number of elements in the returned stream equals the number of unique `piece_indices`.
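    ///
    /// A minimal usage sketch (not part of the original source), assuming `futures::StreamExt` is
    /// in scope and `farmer_caches` is initialized:
    ///
    /// ```ignore
    /// let mut pieces = farmer_caches.get_pieces(piece_indices).await;
    /// while let Some((piece_index, maybe_piece)) = pieces.next().await {
    ///     // `maybe_piece` is `None` when the piece is not in the cache
    /// }
    /// ```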
1635    pub async fn get_pieces<'a, PieceIndices>(
1636        &'a self,
1637        piece_indices: PieceIndices,
1638    ) -> impl Stream<Item = (PieceIndex, Option<Piece>)> + Send + Unpin + 'a
1639    where
1640        PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send + 'a> + Send + 'a,
1641    {
1642        let Some(farmer_cache) = self.caches.choose(&mut thread_rng()) else {
1643            return Either::Left(stream::iter(
1644                piece_indices
1645                    .into_iter()
1646                    .map(|piece_index| (piece_index, None)),
1647            ));
1648        };
1649
1650        Either::Right(farmer_cache.get_pieces(piece_indices).await)
1651    }
1652
1653    /// Returns the subset of `piece_indices` that were found in the farmer cache; order is not guaranteed
1654    pub async fn has_pieces(&self, piece_indices: Vec<PieceIndex>) -> Vec<PieceIndex> {
1655        let Some(farmer_cache) = self.caches.choose(&mut thread_rng()) else {
1656            return Vec::new();
1657        };
1658
1659        farmer_cache.has_pieces(piece_indices).await
1660    }
1661
1662    /// Try to store a piece in the plot cache for additional downloaded pieces, if there is space for it (offered to every underlying cache)
1663    pub async fn maybe_store_additional_piece(&self, piece_index: PieceIndex, piece: &Piece) {
1664        self.caches
1665            .iter()
1666            .map(|farmer_cache| farmer_cache.maybe_store_additional_piece(piece_index, piece))
1667            .collect::<FuturesUnordered<_>>()
1668            .for_each(|()| async {})
1669            .await;
1670    }
1671}
1672
1673/// Extracts the `PieceIndex` from a `RecordKey`.
1674fn decode_piece_index_from_record_key(key: &RecordKey) -> PieceIndex {
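    // Assumes `key` was derived from a piece index multihash, so the encoded piece index occupies
    // the last `PieceIndex::SIZE` bytes of the key; keys shorter than that will panic below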
1675    let len = key.as_ref().len();
1676    let s = len - PieceIndex::SIZE;
1677
1678    let mut piece_index_bytes = [0u8; PieceIndex::SIZE];
1679    piece_index_bytes.copy_from_slice(&key.as_ref()[s..]);
1680
1681    PieceIndex::from_bytes(piece_index_bytes)
1682}