1mod metrics;
7mod piece_cache_state;
8#[cfg(test)]
9mod tests;
10
11use crate::farm::{MaybePieceStoredResult, PieceCache, PieceCacheId, PieceCacheOffset, PlotCache};
12use crate::farmer_cache::metrics::FarmerCacheMetrics;
13use crate::farmer_cache::piece_cache_state::PieceCachesState;
14use crate::node_client::NodeClient;
15use async_lock::RwLock as AsyncRwLock;
16use event_listener_primitives::{Bag, HandlerId};
17use futures::channel::mpsc;
18use futures::future::{Either, FusedFuture};
19use futures::stream::{FuturesOrdered, FuturesUnordered};
20use futures::{FutureExt, SinkExt, Stream, StreamExt, select, stream};
21use parking_lot::{Mutex, RwLock};
22use prometheus_client::registry::Registry;
23use rand::prelude::*;
24use rayon::prelude::*;
25use std::collections::hash_map::Entry;
26use std::collections::{HashMap, HashSet};
27use std::future::join;
28use std::sync::Arc;
29use std::sync::atomic::{AtomicUsize, Ordering};
30use std::task::Poll;
31use std::time::Duration;
32use std::{fmt, mem};
33use subspace_core_primitives::pieces::{Piece, PieceIndex};
34use subspace_core_primitives::segments::{SegmentHeader, SegmentIndex};
35use subspace_data_retrieval::piece_getter::PieceGetter;
36use subspace_networking::KeyWithDistance;
37use subspace_networking::libp2p::PeerId;
38use subspace_networking::libp2p::kad::RecordKey;
39use subspace_networking::utils::multihash::ToMultihash;
40use subspace_process::run_future_in_dedicated_thread;
41use tokio::sync::Semaphore;
42use tokio::task::yield_now;
43use tracing::{Instrument, debug, error, info, info_span, trace, warn};
44
/// Capacity of the bounded channel over which `WorkerCommand`s are sent to the worker.
45const WORKER_CHANNEL_CAPACITY: usize = 100;
/// Number of piece indices grouped into one download batch during cache synchronization; also
/// the number of semaphore permits a batch acquires before it starts downloading.
46const SYNC_BATCH_SIZE: usize = 256;
/// Together with `SYNC_BATCH_SIZE` determines the number of download semaphore permits, and
/// (multiplied by 10) how many batch futures are buffered at once.
47const SYNC_CONCURRENT_BATCHES: usize = 4;
/// Interval, in downloaded pieces, after which the shared cache state is refreshed from the
/// in-progress synchronization state and a progress message is logged.
48const INTERMEDIATE_CACHE_UPDATE_INTERVAL: usize = 100;
/// Delay between polls of farmer app info while waiting for the node to become usable for the
/// initial synchronization.
51const INITIAL_SYNC_FARM_INFO_CHECK_INTERVAL: Duration = Duration::from_secs(1);
52
/// Shared, thread-safe event handler callback taking a reference to the event argument.
53type HandlerFn<A> = Arc<dyn Fn(&A) + Send + Sync + 'static>;
/// Collection of registered `HandlerFn` callbacks.
54type Handler<A> = Bag<HandlerFn<A>, A>;
/// Index of a piece cache backend; being `u8`, backends whose position does not fit are ignored
/// during initialization.
55type CacheIndex = u8;
56
/// Event handlers shared between the farmer cache and its worker.
57#[derive(Default, Debug)]
58struct Handlers {
// Called with synchronization progress expressed as a percentage (`0.0` when sync starts,
// `100.0` when it finishes).
59 progress: Handler<f32>,
60}
61
/// Location of a piece slot: which cache backend it lives in and the offset within that backend.
62#[derive(Debug, Clone, Copy)]
63struct FarmerCacheOffset {
// Index of the cache backend that owns the slot
64 cache_index: CacheIndex,
// Offset of the slot inside that backend
65 piece_offset: PieceCacheOffset,
66}
67
68impl FarmerCacheOffset {
// Builds an offset from the backend index and the offset within that backend.
69 fn new(cache_index: CacheIndex, piece_offset: PieceCacheOffset) -> Self {
70 Self {
71 cache_index,
72 piece_offset,
73 }
74 }
75}
76
77#[derive(Debug, Clone)]
78struct CacheBackend {
79 backend: Arc<dyn PieceCache>,
80 used_capacity: u32,
81 total_capacity: u32,
82}
83
84impl std::ops::Deref for CacheBackend {
85 type Target = Arc<dyn PieceCache>;
86
87 fn deref(&self) -> &Self::Target {
88 &self.backend
89 }
90}
91
92impl CacheBackend {
93 fn new(backend: Arc<dyn PieceCache>, total_capacity: u32) -> Self {
94 Self {
95 backend,
96 used_capacity: 0,
97 total_capacity,
98 }
99 }
100
101 fn next_free(&mut self) -> Option<PieceCacheOffset> {
102 let offset = self.used_capacity;
103 if offset < self.total_capacity {
104 self.used_capacity += 1;
105 Some(PieceCacheOffset(offset))
106 } else {
107 debug!(?offset, total_capacity = ?self.total_capacity, "No free space in cache backend");
108 None
109 }
110 }
111
112 fn free_size(&self) -> u32 {
113 self.total_capacity - self.used_capacity
114 }
115}
116
/// Result of scanning one cache backend during initialization.
117#[derive(Debug)]
118struct CacheState {
// Pieces found in the backend, keyed by their distance-aware key
119 cache_stored_pieces: HashMap<KeyWithDistance, FarmerCacheOffset>,
// Offsets reported by the backend without a piece index (unoccupied slots)
120 cache_free_offsets: Vec<FarmerCacheOffset>,
// The scanned backend with its `used_capacity` updated by the scan
121 backend: CacheBackend,
122}
123
/// Commands sent from `FarmerCache` to `FarmerCacheWorker` over the worker channel.
124#[derive(Debug)]
125enum WorkerCommand {
// Replace the set of backing piece caches and (re)initialize the cache from them
126 ReplaceBackingCaches {
127 new_piece_caches: Vec<Arc<dyn PieceCache>>,
128 },
// Remove a key from the stored pieces and make its slot available again; sent when reading the
// piece from its backend failed
129 ForgetKey {
130 key: RecordKey,
131 },
132}
133
/// Background worker of the farmer cache: initializes and synchronizes the piece cache and keeps
/// it up to date as new segments are archived. Must be driven with `run()`.
134#[derive(Debug)]
136#[must_use = "Farmer cache will not work unless its worker is running"]
137pub struct FarmerCacheWorker<NC>
138where
139 NC: fmt::Debug,
140{
// Local peer ID, used to build distance-aware keys for pieces
141 peer_id: PeerId,
// Client used to query farmer app info, segment headers and pieces from the node
142 node_client: NC,
// Piece cache state shared with `FarmerCache`
143 piece_caches: Arc<AsyncRwLock<PieceCachesState>>,
// Plot caches shared with `FarmerCache`
144 plot_caches: Arc<PlotCaches>,
// Event handlers shared with `FarmerCache`
145 handlers: Arc<Handlers>,
// Receiving side of the command channel; taken out once when `run()` starts
146 worker_receiver: Option<mpsc::Receiver<WorkerCommand>>,
// Optional metrics, present when a registry was provided at construction
147 metrics: Option<Arc<FarmerCacheMetrics>>,
148}
149
150impl<NC> FarmerCacheWorker<NC>
151where
152 NC: NodeClient,
153{
154 pub async fn run<PG>(mut self, piece_getter: PG)
158 where
159 PG: PieceGetter,
160 {
161 let mut last_segment_index_internal = SegmentIndex::ZERO;
163
164 let mut worker_receiver = self
165 .worker_receiver
166 .take()
167 .expect("Always set during worker instantiation");
168
169 if let Some(WorkerCommand::ReplaceBackingCaches { new_piece_caches }) =
170 worker_receiver.next().await
171 {
172 self.initialize(
173 &piece_getter,
174 &mut last_segment_index_internal,
175 new_piece_caches,
176 )
177 .await;
178 } else {
179 return;
181 }
182
183 let mut segment_headers_notifications =
184 match self.node_client.subscribe_archived_segment_headers().await {
185 Ok(segment_headers_notifications) => segment_headers_notifications,
186 Err(error) => {
187 error!(%error, "Failed to subscribe to archived segments notifications");
188 return;
189 }
190 };
191
192 self.keep_up_after_initial_sync(&piece_getter, &mut last_segment_index_internal)
196 .await;
197
198 loop {
199 select! {
200 maybe_command = worker_receiver.next() => {
201 let Some(command) = maybe_command else {
202 return;
204 };
205
206 self.handle_command(command, &piece_getter, &mut last_segment_index_internal).await;
207 }
208 maybe_segment_header = segment_headers_notifications.next().fuse() => {
209 if let Some(segment_header) = maybe_segment_header {
210 self.process_segment_header(&piece_getter, segment_header, &mut last_segment_index_internal).await;
211 } else {
212 return;
215 }
216 }
217 }
218 }
219 }
220
221 async fn handle_command<PG>(
222 &self,
223 command: WorkerCommand,
224 piece_getter: &PG,
225 last_segment_index_internal: &mut SegmentIndex,
226 ) where
227 PG: PieceGetter,
228 {
229 match command {
230 WorkerCommand::ReplaceBackingCaches { new_piece_caches } => {
231 self.initialize(piece_getter, last_segment_index_internal, new_piece_caches)
232 .await;
233 }
234 WorkerCommand::ForgetKey { key } => {
236 let mut caches = self.piece_caches.write().await;
237 let key = KeyWithDistance::new_with_record_key(self.peer_id, key);
238 let Some(offset) = caches.remove_stored_piece(&key) else {
239 return;
241 };
242
243 let cache_index = offset.cache_index;
244 let piece_offset = offset.piece_offset;
245 let Some(backend) = caches.get_backend(cache_index).cloned() else {
246 return;
248 };
249
250 caches.push_dangling_free_offset(offset);
251 match backend.read_piece_index(piece_offset).await {
252 Ok(Some(piece_index)) => {
253 trace!(%piece_index, %cache_index, %piece_offset, "Forget piece");
254 }
255 Ok(None) => {
256 warn!(
257 %cache_index,
258 %piece_offset,
259 "Piece index out of range, this is likely an implementation bug, \
260 not freeing heap element"
261 );
262 }
263 Err(error) => {
264 error!(
265 %error,
266 %cache_index,
267 ?key,
268 %piece_offset,
269 "Error while reading piece from cache"
270 );
271 }
272 }
273 }
274 }
275 }
276
/// (Re)initializes the piece cache from `new_piece_caches` and synchronizes it with the node.
///
/// Steps, in order:
/// 1. Take the previous shared state and reuse its containers.
/// 2. Read contents of every backend on a dedicated thread and merge them into a new state.
/// 3. Wait until the node reports usable farmer app info and determine the last segment index.
/// 4. Compute which piece indices should be cached (as many as fit, in key order), publish the
///    already available state and download the missing pieces in batches.
/// 5. Publish the final state, report 100% progress and store the last segment index in
///    `last_segment_index_internal`.
///
/// Returns early (after logging) if reading threads can't be spawned or panic, or if farmer app
/// info can't be retrieved; in the latter case the state read from backends is still published.
277 async fn initialize<PG>(
278 &self,
279 piece_getter: &PG,
280 last_segment_index_internal: &mut SegmentIndex,
281 new_piece_caches: Vec<Arc<dyn PieceCache>>,
282 ) where
283 PG: PieceGetter,
284 {
285 info!("Initializing piece cache");
286
// Take the old state out of the shared lock (a default value is left in its place) and turn it
// into the containers that are filled below.
// NOTE(review): presumably `reuse()` returns emptied containers to reuse allocations — confirm
// in `PieceCachesState`.
287 let (mut stored_pieces, mut dangling_free_offsets) =
289 mem::take(&mut *self.piece_caches.write().await).reuse();
290
291 debug!("Collecting pieces that were in the cache before");
292
// Capacity metrics are rebuilt from scratch for the new set of caches
293 if let Some(metrics) = &self.metrics {
294 metrics.piece_cache_capacity_total.set(0);
295 metrics.piece_cache_capacity_used.set(0);
296 }
297
298 let peer_id = self.peer_id;
299
// Create one contents-reading future per backend, each running in its own dedicated thread
300 let piece_caches_number = new_piece_caches.len();
302 let maybe_caches_futures = new_piece_caches
303 .into_iter()
304 .enumerate()
305 .filter_map(|(cache_index, new_cache)| {
306 let total_capacity = new_cache.max_num_elements();
307 let mut backend = CacheBackend::new(new_cache, total_capacity);
// Backends whose position doesn't fit into `CacheIndex` are skipped with a warning
308 let Ok(cache_index) = CacheIndex::try_from(cache_index) else {
309 warn!(
310 ?piece_caches_number,
311 "Too many piece caches provided, {cache_index} cache will be ignored",
312 );
313 return None;
314 };
315
316 if let Some(metrics) = &self.metrics {
317 metrics
318 .piece_cache_capacity_total
319 .inc_by(total_capacity as i64);
320 }
321
322 let init_fut = async move {
323 let used_capacity = &mut backend.used_capacity;
324
// Failure to list contents is not fatal: the backend is then treated as having no known
// contents
325 let mut maybe_contents = match backend.backend.contents().await {
329 Ok(contents) => Some(contents),
330 Err(error) => {
331 warn!(%error, "Failed to get cache contents");
332
333 None
334 }
335 };
336
337 #[allow(clippy::mutable_key_type)]
338 let mut cache_stored_pieces = HashMap::new();
339 let mut cache_free_offsets = Vec::new();
340
341 let Some(mut contents) = maybe_contents.take() else {
342 drop(maybe_contents);
343
344 return CacheState {
345 cache_stored_pieces,
346 cache_free_offsets,
347 backend,
348 };
349 };
350
// Walk over backend contents; the first element error stops the scan and whatever was
// collected so far is kept
351 while let Some(maybe_element_details) = contents.next().await {
352 let (piece_offset, maybe_piece_index) = match maybe_element_details {
353 Ok(element_details) => element_details,
354 Err(error) => {
355 warn!(%error, "Failed to get cache contents element details");
356 break;
357 }
358 };
359 let offset = FarmerCacheOffset::new(cache_index, piece_offset);
360 match maybe_piece_index {
361 Some(piece_index) => {
// Used capacity becomes one past the most recently seen occupied offset.
// NOTE(review): this is an assignment, not a maximum — assumes contents are yielded in
// ascending offset order; confirm against `PieceCache::contents()`.
362 *used_capacity = piece_offset.0 + 1;
363 let record_key = RecordKey::from(piece_index.to_multihash());
364 let key = KeyWithDistance::new_with_record_key(peer_id, record_key);
365 cache_stored_pieces.insert(key, offset);
366 }
367 None => {
// Slot without a piece; whether it is kept is decided later based on `used_capacity`
368 cache_free_offsets.push(offset);
371 }
372 }
373
// Give other tasks a chance to run between elements
374 yield_now().await;
376 }
377
378 drop(maybe_contents);
379 drop(contents);
380
381 CacheState {
382 cache_stored_pieces,
383 cache_free_offsets,
384 backend,
385 }
386 };
387
388 Some(run_future_in_dedicated_thread(
389 move || init_fut.instrument(info_span!("", %cache_index)),
390 format!("piece-cache.{cache_index:02}"),
391 ))
392 })
// Collecting into `Result` stops at the first thread that failed to spawn
393 .collect::<Result<Vec<_>, _>>();
394
395 let caches_futures = match maybe_caches_futures {
396 Ok(caches_futures) => caches_futures,
397 Err(error) => {
398 error!(%error, "Failed to spawn piece cache reading thread");
399
400 return;
401 }
402 };
403
// `FuturesOrdered` yields results in the order futures were added, so the position of a backend
// in `backends` matches the order in which backends were provided
404 let mut backends = Vec::with_capacity(caches_futures.len());
405 let mut caches_futures = caches_futures.into_iter().collect::<FuturesOrdered<_>>();
406
407 while let Some(maybe_cache) = caches_futures.next().await {
408 match maybe_cache {
409 Ok(cache) => {
410 let backend = cache.backend;
411 for (key, cache_offset) in cache.cache_stored_pieces {
// If the key was already known, the offset it previously pointed to is recorded as free
412 if let Some(old_cache_offset) = stored_pieces.insert(key, cache_offset) {
413 dangling_free_offsets.push_front(old_cache_offset);
414 }
415 }
// Only free offsets below `used_capacity` are tracked explicitly
416 dangling_free_offsets.extend(
417 cache.cache_free_offsets.into_iter().filter(|free_offset| {
418 free_offset.piece_offset.0 < backend.used_capacity
419 }),
420 );
421 backends.push(backend);
422 }
423 Err(_cancelled) => {
424 error!("Piece cache reading thread panicked");
425
426 return;
427 }
428 };
429 }
430
431 let mut caches = PieceCachesState::new(stored_pieces, dangling_free_offsets, backends);
432
433 info!("Synchronizing piece cache");
434
// Poll the node until it is either not syncing or reports a non-zero last segment index
435 let last_segment_index = loop {
436 match self.node_client.farmer_app_info().await {
437 Ok(farmer_app_info) => {
438 let last_segment_index =
439 farmer_app_info.protocol_info.history_size.segment_index();
440 if !farmer_app_info.syncing || last_segment_index > SegmentIndex::ZERO {
448 break last_segment_index;
449 }
450 }
451 Err(error) => {
452 error!(
453 %error,
454 "Failed to get farmer app info from node, keeping old cache state without \
455 updates"
456 );
457
// Publish what was read from backends even though it could not be synchronized
458 *self.piece_caches.write().await = caches;
460 return;
461 }
462 }
463
464 tokio::time::sleep(INITIAL_SYNC_FARM_INFO_CHECK_INTERVAL).await;
465 };
466
467 debug!(%last_segment_index, "Identified last segment index");
468
// Build `(key, piece index)` pairs for every piece of every segment up to and including the
// last one, in parallel
469 let segment_indices = Vec::from_iter(SegmentIndex::ZERO..=last_segment_index);
471 let mut piece_indices_to_store = segment_indices
474 .into_par_iter()
475 .flat_map(|segment_index| {
476 segment_index
477 .segment_piece_indexes()
478 .into_par_iter()
479 .map(|piece_index| {
480 (
481 KeyWithDistance::new(self.peer_id, piece_index.to_multihash()),
482 piece_index,
483 )
484 })
485 })
486 .collect::<Vec<_>>();
487
// Order candidates by key so that the first `total_capacity()` of them can be selected below
488 piece_indices_to_store.par_sort_unstable_by(|(a_key, _), (b_key, _)| a_key.cmp(b_key));
491
492 let mut piece_indices_to_store = piece_indices_to_store
494 .into_iter()
495 .take(caches.total_capacity())
496 .collect::<HashMap<_, _>>();
497
498 let mut piece_caches_capacity_used = vec![0u32; caches.backends().len()];
// NOTE(review): presumably drops stored pieces that are not among the candidates and removes
// already stored pieces from the candidates, leaving only missing ones (the code below treats
// the remaining map as pieces to download) — confirm in `PieceCachesState`.
499 caches.free_unneeded_stored_pieces(&mut piece_indices_to_store);
503
// Count stored pieces per backend and add them to the used capacity metric
504 if let Some(metrics) = &self.metrics {
505 for offset in caches.stored_pieces_offsets() {
506 piece_caches_capacity_used[usize::from(offset.cache_index)] += 1;
507 }
508
509 for cache_used in piece_caches_capacity_used {
510 metrics
511 .piece_cache_capacity_used
512 .inc_by(i64::from(cache_used));
513 }
514 }
515
// Make already available pieces visible to readers before downloading the rest
516 self.piece_caches.write().await.clone_from(&caches);
518 let stored_count = caches.stored_pieces_offsets().len();
519
520 debug!(
521 %stored_count,
522 count = %piece_indices_to_store.len(),
523 "Identified piece indices that should be cached",
524 );
525
// Total includes pieces that are already stored, so progress starts from `stored_count`
526 let pieces_to_download_total = piece_indices_to_store.len() + stored_count;
// Split piece indices into owned batches of at most `SYNC_BATCH_SIZE` elements
527 let piece_indices_to_store = piece_indices_to_store
528 .into_values()
529 .collect::<Vec<_>>()
530 .chunks(SYNC_BATCH_SIZE)
534 .map(|chunk| chunk.to_vec())
535 .collect::<Vec<_>>();
536
537 let downloaded_pieces_count = AtomicUsize::new(stored_count);
538 let caches = Mutex::new(caches);
539 self.handlers.progress.call_simple(&0.0);
540 let batch_count = piece_indices_to_store.len();
541 let piece_indices_to_store = piece_indices_to_store.into_iter().enumerate();
542
// Each batch acquires `SYNC_BATCH_SIZE` permits before downloading, so at most
// `SYNC_CONCURRENT_BATCHES` full batches hold permits at the same time
543 let downloading_semaphore = &Semaphore::new(SYNC_BATCH_SIZE * SYNC_CONCURRENT_BATCHES);
// Indices of caches that failed a write during this sync; further writes to them are skipped
544 let ignored_cache_indices = &RwLock::new(HashSet::new());
545
546 let downloading_pieces_stream =
547 stream::iter(piece_indices_to_store.map(|(batch, piece_indices)| {
548 let downloaded_pieces_count = &downloaded_pieces_count;
549 let caches = &caches;
550 let num_pieces = piece_indices.len();
551
552 trace!(
553 %num_pieces,
554 %batch,
555 %batch_count,
556 first_piece_index = ?piece_indices.first().expect("chunks are never empty"),
557 last_piece_index = ?piece_indices.last().expect("chunks are never empty"),
558 downloaded_pieces_count = %downloaded_pieces_count.load(Ordering::Relaxed),
559 %pieces_to_download_total,
560 available_permits = %downloading_semaphore.available_permits(),
561 "Started piece cache sync batch",
562 );
563
564 async move {
565 let mut permit = downloading_semaphore
566 .acquire_many(SYNC_BATCH_SIZE as u32)
567 .await
568 .expect("Semaphore is never closed; qed");
569 debug!(%batch, %num_pieces, "Downloading pieces");
570
571 let pieces_stream = match piece_getter.get_pieces(piece_indices).await {
572 Ok(pieces_stream) => pieces_stream,
573 Err(error) => {
574 error!(
575 %error,
576 "Failed to get pieces from piece getter"
577 );
578 return;
579 }
580 };
581 let mut pieces_stream = pieces_stream.enumerate();
582
583 while let Some((index, (piece_index, result))) = pieces_stream.next().await {
584 debug!(%batch, %index, %piece_index, "Downloaded piece");
// Split one permit off the batch permit; it is released when `_permit` is dropped at the end of
// this iteration
585 let _permit = permit.split(1);
587
588 let piece = match result {
589 Ok(Some(piece)) => {
590 trace!(%batch, %piece_index, "Downloaded piece successfully");
591 piece
592 }
593 Ok(None) => {
594 debug!(%batch, %piece_index, "Couldn't find piece");
595 continue;
596 }
597 Err(error) => {
598 debug!(
599 %batch,
600 %error,
601 %piece_index,
602 "Failed to get piece for piece cache"
603 );
604 continue;
605 }
606 };
607
// Pick a slot for the piece; the synchronous mutex is released before any `.await` below
608 let (offset, maybe_backend) = {
609 let mut caches = caches.lock();
610
611 let Some(offset) = caches.pop_free_offset() else {
613 error!(
614 %batch,
615 %piece_index,
616 "Failed to store piece in cache, there was no space"
617 );
618 break;
619 };
620
621 (offset, caches.get_backend(offset.cache_index).cloned())
622 };
623
624 let cache_index = offset.cache_index;
625 let piece_offset = offset.piece_offset;
626
627 let skip_write = ignored_cache_indices.read().contains(&cache_index);
628 if skip_write {
629 trace!(
630 %batch,
631 %cache_index,
632 %piece_index,
633 %piece_offset,
634 "Skipping known problematic cache index"
635 );
636 } else {
// A failed write marks the whole cache as ignored for the rest of this sync; the piece is
// neither recorded as stored nor counted as downloaded
637 if let Some(backend) = maybe_backend
638 && let Err(error) =
639 backend.write_piece(piece_offset, piece_index, &piece).await
640 {
641 error!(
642 %error,
643 %batch,
644 %cache_index,
645 %piece_index,
646 %piece_offset,
647 "Failed to write piece into cache, ignoring this cache going \
648 forward"
649 );
650 ignored_cache_indices.write().insert(cache_index);
651 continue;
652 }
653
654 let key =
655 KeyWithDistance::new(self.peer_id, piece_index.to_multihash());
656 caches.lock().push_stored_piece(key, offset);
657 }
658
// `fetch_add` returns the counter value from before the increment
659 let prev_downloaded_pieces_count =
660 downloaded_pieces_count.fetch_add(1, Ordering::Relaxed);
// NOTE(review): since the previous value is compared, this condition looks like it can never be
// false as long as each requested piece is yielded at most once — confirm the intent.
661 if prev_downloaded_pieces_count != pieces_to_download_total {
664 let progress = prev_downloaded_pieces_count as f32
665 / pieces_to_download_total as f32
666 * 100.0;
// Periodically publish the in-progress state to readers and log progress
667 if prev_downloaded_pieces_count % INTERMEDIATE_CACHE_UPDATE_INTERVAL
668 == 0
669 {
670 let mut piece_caches = self.piece_caches.write().await;
671 piece_caches.clone_from(&caches.lock());
672
673 info!(
674 "Piece cache sync {progress:.2}% complete ({} / {})",
675 bytesize::to_string(
676 (prev_downloaded_pieces_count * Piece::SIZE) as u64,
677 true,
678 ),
679 bytesize::to_string(
680 (pieces_to_download_total * Piece::SIZE) as u64,
681 true,
682 ),
683 );
684 }
685
686 self.handlers.progress.call_simple(&progress);
687 }
688 }
689
690 trace!(
691 %num_pieces,
692 %batch,
693 %batch_count,
694 downloaded_pieces_count = %downloaded_pieces_count.load(Ordering::Relaxed),
695 %pieces_to_download_total,
696 available_permits = %downloading_semaphore.available_permits(),
697 "Finished piece cache sync batch",
698 );
699 }
700 }));
701
// Drive up to `SYNC_CONCURRENT_BATCHES * 10` batch futures at once and drain their results
702 downloading_pieces_stream
705 .buffer_unordered(SYNC_CONCURRENT_BATCHES * 10)
708 .for_each(|()| async {})
710 .await;
711
// Publish the final state and remember which segment the cache is synchronized to
712 *self.piece_caches.write().await = caches.into_inner();
713 self.handlers.progress.call_simple(&100.0);
714 *last_segment_index_internal = last_segment_index;
715
716 info!("Finished piece cache synchronization");
717 }
718
719 async fn process_segment_header<PG>(
720 &self,
721 piece_getter: &PG,
722 segment_header: SegmentHeader,
723 last_segment_index_internal: &mut SegmentIndex,
724 ) where
725 PG: PieceGetter,
726 {
727 let segment_index = segment_header.segment_index();
728 debug!(%segment_index, "Starting to process newly archived segment");
729
730 if *last_segment_index_internal < segment_index {
731 debug!(%segment_index, "Downloading potentially useful pieces");
732
733 let pieces_to_maybe_include = segment_index
737 .segment_piece_indexes()
738 .into_iter()
739 .map(|piece_index| async move {
740 let should_store_in_piece_cache = self
741 .piece_caches
742 .read()
743 .await
744 .should_include_key(self.peer_id, piece_index);
745
746 let key = RecordKey::from(piece_index.to_multihash());
747 let should_store_in_plot_cache =
748 self.plot_caches.should_store(piece_index, &key).await;
749
750 if !(should_store_in_piece_cache || should_store_in_plot_cache) {
751 trace!(%piece_index, "Piece doesn't need to be cached #1");
752
753 return None;
754 }
755
756 let maybe_piece_result =
757 self.node_client
758 .piece(piece_index)
759 .await
760 .inspect_err(|error| {
761 debug!(
762 %error,
763 %segment_index,
764 %piece_index,
765 "Failed to retrieve piece from node right after archiving"
766 );
767 });
768
769 if let Ok(Some(piece)) = maybe_piece_result {
770 return Some((piece_index, piece));
771 }
772
773 match piece_getter.get_piece(piece_index).await {
774 Ok(Some(piece)) => Some((piece_index, piece)),
775 Ok(None) => {
776 warn!(
777 %segment_index,
778 %piece_index,
779 "Failed to retrieve piece right after archiving"
780 );
781
782 None
783 }
784 Err(error) => {
785 warn!(
786 %error,
787 %segment_index,
788 %piece_index,
789 "Failed to retrieve piece right after archiving"
790 );
791
792 None
793 }
794 }
795 })
796 .collect::<FuturesUnordered<_>>()
797 .filter_map(|maybe_piece| async move { maybe_piece })
798 .collect::<Vec<_>>()
799 .await;
800
801 debug!(%segment_index, "Downloaded potentially useful pieces");
802
803 self.acknowledge_archived_segment_processing(segment_index)
804 .await;
805
806 for (piece_index, piece) in pieces_to_maybe_include {
809 if !self
810 .plot_caches
811 .store_additional_piece(piece_index, &piece)
812 .await
813 {
814 trace!(%piece_index, "Piece could not be cached in plot cache");
815 }
816
817 if !self
818 .piece_caches
819 .read()
820 .await
821 .should_include_key(self.peer_id, piece_index)
822 {
823 trace!(%piece_index, "Piece doesn't need to be cached #2");
824
825 continue;
826 }
827
828 trace!(%piece_index, "Piece needs to be cached #1");
829
830 self.persist_piece_in_cache(piece_index, piece).await;
831 }
832
833 *last_segment_index_internal = segment_index;
834 } else {
835 self.acknowledge_archived_segment_processing(segment_index)
836 .await;
837 }
838
839 debug!(%segment_index, "Finished processing newly archived segment");
840 }
841
842 async fn acknowledge_archived_segment_processing(&self, segment_index: SegmentIndex) {
843 match self
844 .node_client
845 .acknowledge_archived_segment_header(segment_index)
846 .await
847 {
848 Ok(()) => {
849 debug!(%segment_index, "Acknowledged archived segment");
850 }
851 Err(error) => {
852 error!(%segment_index, ?error, "Failed to acknowledge archived segment");
853 }
854 };
855 }
856
857 async fn keep_up_after_initial_sync<PG>(
858 &self,
859 piece_getter: &PG,
860 last_segment_index_internal: &mut SegmentIndex,
861 ) where
862 PG: PieceGetter,
863 {
864 let last_segment_index = match self.node_client.farmer_app_info().await {
865 Ok(farmer_app_info) => farmer_app_info.protocol_info.history_size.segment_index(),
866 Err(error) => {
867 error!(
868 %error,
869 "Failed to get farmer app info from node, keeping old cache state without \
870 updates"
871 );
872 return;
873 }
874 };
875
876 if last_segment_index <= *last_segment_index_internal {
877 return;
878 }
879
880 info!(
881 "Syncing piece cache to the latest history size, this may pause block production if \
882 takes too long"
883 );
884
885 let piece_indices = (*last_segment_index_internal..=last_segment_index)
887 .flat_map(|segment_index| segment_index.segment_piece_indexes());
888
889 for piece_index in piece_indices {
891 if !self
892 .piece_caches
893 .read()
894 .await
895 .should_include_key(self.peer_id, piece_index)
896 {
897 trace!(%piece_index, "Piece doesn't need to be cached #3");
898
899 continue;
900 }
901
902 trace!(%piece_index, "Piece needs to be cached #2");
903
904 let result = piece_getter.get_piece(piece_index).await;
905
906 let piece = match result {
907 Ok(Some(piece)) => piece,
908 Ok(None) => {
909 debug!(%piece_index, "Couldn't find piece");
910 continue;
911 }
912 Err(error) => {
913 debug!(
914 %error,
915 %piece_index,
916 "Failed to get piece for piece cache"
917 );
918 continue;
919 }
920 };
921
922 self.persist_piece_in_cache(piece_index, piece).await;
923 }
924
925 info!("Finished syncing piece cache to the latest history size");
926
927 *last_segment_index_internal = last_segment_index;
928 }
929
930 async fn persist_piece_in_cache(&self, piece_index: PieceIndex, piece: Piece) {
933 let key = KeyWithDistance::new(self.peer_id, piece_index.to_multihash());
934 let mut caches = self.piece_caches.write().await;
935 match caches.should_replace(&key) {
936 Some((old_key, offset)) => {
938 let cache_index = offset.cache_index;
939 let piece_offset = offset.piece_offset;
940 let Some(backend) = caches.get_backend(cache_index) else {
941 warn!(
943 %cache_index,
944 %piece_index,
945 "Should have a cached backend, but it didn't exist, this is an \
946 implementation bug"
947 );
948 return;
949 };
950 if let Err(error) = backend.write_piece(piece_offset, piece_index, &piece).await {
951 error!(
952 %error,
953 %cache_index,
954 %piece_index,
955 %piece_offset,
956 "Failed to write piece into cache"
957 );
958 } else {
959 let old_piece_index = decode_piece_index_from_record_key(old_key.record_key());
960 trace!(
961 %cache_index,
962 %old_piece_index,
963 %piece_index,
964 %piece_offset,
965 "Successfully replaced old cached piece"
966 );
967 caches.push_stored_piece(key, offset);
968 }
969 }
970 None => {
972 let Some(offset) = caches.pop_free_offset() else {
973 warn!(
974 %piece_index,
975 "Should have inserted piece into cache, but it didn't happen, this is an \
976 implementation bug"
977 );
978 return;
979 };
980 let cache_index = offset.cache_index;
981 let piece_offset = offset.piece_offset;
982 let Some(backend) = caches.get_backend(cache_index) else {
983 warn!(
985 %cache_index,
986 %piece_index,
987 "Should have a cached backend, but it didn't exist, this is an \
988 implementation bug"
989 );
990 return;
991 };
992
993 if let Err(error) = backend.write_piece(piece_offset, piece_index, &piece).await {
994 error!(
995 %error,
996 %cache_index,
997 %piece_index,
998 %piece_offset,
999 "Failed to write piece into cache"
1000 );
1001 } else {
1002 trace!(
1003 %cache_index,
1004 %piece_index,
1005 %piece_offset,
1006 "Successfully stored piece in cache"
1007 );
1008 if let Some(metrics) = &self.metrics {
1009 metrics.piece_cache_capacity_used.inc();
1010 }
1011 caches.push_stored_piece(key, offset);
1012 }
1013 }
1014 };
1015 }
1016}
1017
/// Collection of plot caches used as additional storage for pieces.
1018#[derive(Debug)]
1019struct PlotCaches {
// Plot caches that pieces can be checked against, stored into and read from
1020 caches: AsyncRwLock<Vec<Arc<dyn PlotCache>>>,
// Monotonically increasing counter; taken modulo the number of caches to pick the next cache
// to try when storing a piece
1022 next_plot_cache: AtomicUsize,
1024}
1025
1026impl PlotCaches {
1027 async fn should_store(&self, piece_index: PieceIndex, key: &RecordKey) -> bool {
1033 for (cache_index, cache) in self.caches.read().await.iter().enumerate() {
1034 match cache.is_piece_maybe_stored(key).await {
1035 Ok(MaybePieceStoredResult::No) => {
1036 }
1038 Ok(MaybePieceStoredResult::Vacant) => {
1039 return true;
1040 }
1041 Ok(MaybePieceStoredResult::Yes) => {
1042 return false;
1044 }
1045 Err(error) => {
1046 warn!(
1047 %cache_index,
1048 %piece_index,
1049 %error,
1050 "Failed to check piece stored in cache"
1051 );
1052 }
1053 }
1054 }
1055
1056 false
1057 }
1058
1059 async fn store_additional_piece(&self, piece_index: PieceIndex, piece: &Piece) -> bool {
1063 let plot_caches = self.caches.read().await;
1064 let plot_caches_len = plot_caches.len();
1065
1066 for _ in 0..plot_caches_len {
1068 let plot_cache_index =
1069 self.next_plot_cache.fetch_add(1, Ordering::Relaxed) % plot_caches_len;
1070
1071 match plot_caches[plot_cache_index]
1072 .try_store_piece(piece_index, piece)
1073 .await
1074 {
1075 Ok(true) => {
1076 return true;
1077 }
1078 Ok(false) => {
1079 continue;
1080 }
1081 Err(error) => {
1082 error!(
1083 %error,
1084 %piece_index,
1085 %plot_cache_index,
1086 "Failed to store additional piece in cache"
1087 );
1088 continue;
1089 }
1090 }
1091 }
1092
1093 false
1094 }
1095}
1096
/// Handle for reading pieces from the farmer cache; the cache is populated and maintained by the
/// accompanying `FarmerCacheWorker` created together with it in `FarmerCache::new()`.
1097#[derive(Debug, Clone)]
1108pub struct FarmerCache {
// Local peer ID, used to build distance-aware keys for lookups
1109 peer_id: PeerId,
// Piece cache state shared with the worker
1110 piece_caches: Arc<AsyncRwLock<PieceCachesState>>,
// Plot caches shared with the worker
1112 plot_caches: Arc<PlotCaches>,
// Event handlers shared with the worker
1114 handlers: Arc<Handlers>,
// Sending side of the worker command channel
1115 worker_sender: mpsc::Sender<WorkerCommand>,
// Optional metrics, present when a registry was provided at construction
1117 metrics: Option<Arc<FarmerCacheMetrics>>,
1118}
1119
1120impl FarmerCache {
1121 pub fn new<NC>(
1126 node_client: NC,
1127 peer_id: PeerId,
1128 registry: Option<&mut Registry>,
1129 ) -> (Self, FarmerCacheWorker<NC>)
1130 where
1131 NC: NodeClient,
1132 {
1133 let caches = Arc::default();
1134 let (worker_sender, worker_receiver) = mpsc::channel(WORKER_CHANNEL_CAPACITY);
1135 let handlers = Arc::new(Handlers::default());
1136
1137 let plot_caches = Arc::new(PlotCaches {
1138 caches: AsyncRwLock::default(),
1139 next_plot_cache: AtomicUsize::new(0),
1140 });
1141 let metrics = registry.map(|registry| Arc::new(FarmerCacheMetrics::new(registry)));
1142
1143 let instance = Self {
1144 peer_id,
1145 piece_caches: Arc::clone(&caches),
1146 plot_caches: Arc::clone(&plot_caches),
1147 handlers: Arc::clone(&handlers),
1148 worker_sender,
1149 metrics: metrics.clone(),
1150 };
1151 let worker = FarmerCacheWorker {
1152 peer_id,
1153 node_client,
1154 piece_caches: caches,
1155 plot_caches,
1156 handlers,
1157 worker_receiver: Some(worker_receiver),
1158 metrics,
1159 };
1160
1161 (instance, worker)
1162 }
1163
1164 pub async fn get_piece<Key>(&self, key: Key) -> Option<Piece>
1166 where
1167 RecordKey: From<Key>,
1168 {
1169 let key = RecordKey::from(key);
1170 let maybe_piece_found = {
1171 let key = KeyWithDistance::new_with_record_key(self.peer_id, key.clone());
1172 let caches = self.piece_caches.read().await;
1173
1174 caches.get_stored_piece(&key).and_then(|offset| {
1175 let cache_index = offset.cache_index;
1176 let piece_offset = offset.piece_offset;
1177 Some((
1178 piece_offset,
1179 cache_index,
1180 caches.get_backend(cache_index)?.clone(),
1181 ))
1182 })
1183 };
1184
1185 if let Some((piece_offset, cache_index, backend)) = maybe_piece_found {
1186 match backend.read_piece(piece_offset).await {
1187 Ok(maybe_piece) => {
1188 return match maybe_piece {
1189 Some((_piece_index, piece)) => {
1190 if let Some(metrics) = &self.metrics {
1191 metrics.cache_get_hit.inc();
1192 }
1193 Some(piece)
1194 }
1195 None => {
1196 error!(
1197 %cache_index,
1198 %piece_offset,
1199 ?key,
1200 "Piece was expected to be in cache, but wasn't found there"
1201 );
1202 if let Some(metrics) = &self.metrics {
1203 metrics.cache_get_error.inc();
1204 }
1205 None
1206 }
1207 };
1208 }
1209 Err(error) => {
1210 error!(
1211 %error,
1212 %cache_index,
1213 ?key,
1214 %piece_offset,
1215 "Error while reading piece from cache"
1216 );
1217
1218 if let Err(error) = self
1219 .worker_sender
1220 .clone()
1221 .send(WorkerCommand::ForgetKey { key })
1222 .await
1223 {
1224 trace!(%error, "Failed to send ForgetKey command to worker");
1225 }
1226
1227 if let Some(metrics) = &self.metrics {
1228 metrics.cache_get_error.inc();
1229 }
1230 return None;
1231 }
1232 }
1233 }
1234
1235 for cache in self.plot_caches.caches.read().await.iter() {
1236 if let Ok(Some(piece)) = cache.read_piece(&key).await {
1237 if let Some(metrics) = &self.metrics {
1238 metrics.cache_get_hit.inc();
1239 }
1240 return Some(piece);
1241 }
1242 }
1243
1244 if let Some(metrics) = &self.metrics {
1245 metrics.cache_get_miss.inc();
1246 }
1247 None
1248 }
1249
/// Get multiple pieces from the cache.
///
/// Every requested piece index is first looked up in the piece caches state. Indices that have
/// a stored offset and an available backend are grouped by cache index and read with a single
/// `read_pieces()` call per backend; all remaining indices are looked up in plot caches with
/// `read_piece()`.
///
/// Returns a stream of `(piece_index, maybe_piece)` pairs where `None` stands for a piece that
/// could not be retrieved. Results are yielded as they become available, which is not
/// necessarily the order of `piece_indices`.
pub async fn get_pieces<'a, PieceIndices>(
    &'a self,
    piece_indices: PieceIndices,
) -> impl Stream<Item = (PieceIndex, Option<Piece>)> + Send + Unpin + 'a
where
    PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send + 'a> + Send + 'a,
{
    // Pieces without a usable piece cache entry, they are looked up in plot caches instead
    let mut pieces_to_get_from_plot_cache = Vec::new();

    let pieces_to_read_from_piece_cache = {
        let caches = self.piece_caches.read().await;
        // Pieces to read from piece caches, grouped by cache index so that every backend is
        // asked once for all of its offsets
        let mut pieces_to_read_from_piece_cache =
            HashMap::<CacheIndex, (CacheBackend, HashMap<_, _>)>::new();

        for piece_index in piece_indices {
            let key = RecordKey::from(piece_index.to_multihash());

            let offset = match caches.get_stored_piece(&KeyWithDistance::new_with_record_key(
                self.peer_id,
                key.clone(),
            )) {
                Some(offset) => offset,
                None => {
                    // Not known to piece caches
                    pieces_to_get_from_plot_cache.push((piece_index, key));
                    continue;
                }
            };

            let cache_index = offset.cache_index;
            let piece_offset = offset.piece_offset;

            match pieces_to_read_from_piece_cache.entry(cache_index) {
                Entry::Occupied(mut entry) => {
                    let (_backend, pieces) = entry.get_mut();
                    pieces.insert(piece_offset, (piece_index, key));
                }
                Entry::Vacant(entry) => {
                    let backend = match caches.get_backend(cache_index) {
                        Some(backend) => backend.clone(),
                        None => {
                            // No backend for this cache index, fall back to plot caches
                            pieces_to_get_from_plot_cache.push((piece_index, key));
                            continue;
                        }
                    };
                    entry
                        .insert((backend, HashMap::from([(piece_offset, (piece_index, key))])));
                }
            }
        }

        pieces_to_read_from_piece_cache
    };

    // Results are sent into the channel by the future below and received by the returned stream
    let (tx, mut rx) = mpsc::unbounded();

    let fut = async move {
        // Shared reference can be captured by every `async move` block below
        let tx = &tx;

        // One future per cache backend, all of them are driven concurrently
        let mut reading_from_piece_cache = pieces_to_read_from_piece_cache
            .into_iter()
            .map(|(cache_index, (backend, mut pieces_to_get))| async move {
                let mut pieces_stream = match backend
                    .read_pieces(Box::new(
                        pieces_to_get
                            .keys()
                            .copied()
                            .collect::<Vec<_>>()
                            .into_iter(),
                    ))
                    .await
                {
                    Ok(pieces_stream) => pieces_stream,
                    Err(error) => {
                        error!(
                            %error,
                            %cache_index,
                            "Error while reading pieces from cache"
                        );

                        // The whole request failed, report every piece of it as missing
                        if let Some(metrics) = &self.metrics {
                            metrics.cache_get_error.inc_by(pieces_to_get.len() as u64);
                        }
                        for (piece_index, _key) in pieces_to_get.into_values() {
                            tx.unbounded_send((piece_index, None)).expect(
                                "This future isn't polled after receiver is dropped; qed",
                            );
                        }
                        return;
                    }
                };

                while let Some(maybe_piece) = pieces_stream.next().await {
                    let result = match maybe_piece {
                        Ok((piece_offset, Some((piece_index, piece)))) => {
                            // Offset was answered, nothing left to report for it later
                            pieces_to_get.remove(&piece_offset);

                            if let Some(metrics) = &self.metrics {
                                metrics.cache_get_hit.inc();
                            }
                            (piece_index, Some(piece))
                        }
                        Ok((piece_offset, None)) => {
                            // Offsets that are not (or no longer) pending are skipped
                            let Some((piece_index, key)) = pieces_to_get.remove(&piece_offset)
                            else {
                                debug!(
                                    %cache_index,
                                    %piece_offset,
                                    "Received piece offset that was not expected"
                                );
                                continue;
                            };

                            error!(
                                %cache_index,
                                %piece_index,
                                %piece_offset,
                                ?key,
                                "Piece was expected to be in cache, but wasn't found there"
                            );
                            if let Some(metrics) = &self.metrics {
                                metrics.cache_get_error.inc();
                            }
                            (piece_index, None)
                        }
                        Err(error) => {
                            error!(
                                %error,
                                %cache_index,
                                "Error while reading piece from cache"
                            );

                            // The error carries no offset, so nothing is removed from
                            // `pieces_to_get` and nothing is sent here; whatever remains is
                            // reported once the stream ends.
                            // NOTE(review): leftovers are counted in `cache_get_error` again
                            // below, which looks like double counting - confirm
                            if let Some(metrics) = &self.metrics {
                                metrics.cache_get_error.inc();
                            }
                            continue;
                        }
                    };

                    tx.unbounded_send(result)
                        .expect("This future isn't polled after receiver is dropped; qed");
                }

                if pieces_to_get.is_empty() {
                    return;
                }

                // Stream ended without a result for these offsets
                if let Some(metrics) = &self.metrics {
                    metrics.cache_get_error.inc_by(pieces_to_get.len() as u64);
                }
                for (piece_offset, (piece_index, key)) in pieces_to_get {
                    error!(
                        %cache_index,
                        %piece_index,
                        %piece_offset,
                        ?key,
                        "Piece cache didn't return an entry for offset"
                    );

                    // Report the piece as missing so that it still gets an entry in the output
                    tx.unbounded_send((piece_index, None))
                        .expect("This future isn't polled after receiver is dropped; qed");
                }
            })
            .collect::<FuturesUnordered<_>>();
        // Drive all per-backend futures to completion, their output is `()`
        let reading_from_piece_cache_fut = async move {
            while let Some(()) = reading_from_piece_cache.next().await {
            }
        };

        let reading_from_plot_cache_fut = async {
            if pieces_to_get_from_plot_cache.is_empty() {
                return;
            }

            for cache in self.plot_caches.caches.read().await.iter() {
                // Offsets are visited in reverse, so `swap_remove()` below only ever moves an
                // element that was already visited for this cache
                for offset in (0..pieces_to_get_from_plot_cache.len()).rev() {
                    let (piece_index, key) = &pieces_to_get_from_plot_cache[offset];

                    if let Ok(Some(piece)) = cache.read_piece(key).await {
                        if let Some(metrics) = &self.metrics {
                            metrics.cache_get_hit.inc();
                        }
                        tx.unbounded_send((*piece_index, Some(piece)))
                            .expect("This future isn't polled after receiver is dropped; qed");

                        pieces_to_get_from_plot_cache.swap_remove(offset);
                    }
                }

                // Nothing left to look for in the remaining plot caches
                if pieces_to_get_from_plot_cache.is_empty() {
                    return;
                }
            }

            // Whatever is left was not read from any plot cache
            if let Some(metrics) = &self.metrics {
                metrics
                    .cache_get_miss
                    .inc_by(pieces_to_get_from_plot_cache.len() as u64);
            }

            for (piece_index, _key) in pieces_to_get_from_plot_cache {
                tx.unbounded_send((piece_index, None))
                    .expect("This future isn't polled after receiver is dropped; qed");
            }
        };

        // Read from piece caches and plot caches concurrently
        join!(reading_from_piece_cache_fut, reading_from_plot_cache_fut).await
    };
    let mut fut = Box::pin(fut.fuse());

    // Drive the future above and stream back any results that were produced so far
    stream::poll_fn(move |cx| {
        if !fut.is_terminated() {
            // Result of polling doesn't matter, the channel is polled below either way
            let _ = fut.poll_unpin(cx);
        }

        // `Poll::Ready(None)` from the channel also ends this stream
        if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) {
            return Poll::Ready(maybe_result);
        }

        Poll::Pending
    })
}
1490
1491 pub async fn has_pieces(&self, mut piece_indices: Vec<PieceIndex>) -> Vec<PieceIndex> {
1493 let mut pieces_to_find = HashMap::<PieceIndex, RecordKey>::from_iter(
1494 piece_indices
1495 .iter()
1496 .map(|piece_index| (*piece_index, RecordKey::from(piece_index.to_multihash()))),
1497 );
1498
1499 {
1501 let piece_caches = self.piece_caches.read().await;
1502 pieces_to_find.retain(|_piece_index, key| {
1503 let distance_key = KeyWithDistance::new(self.peer_id, key.clone());
1504 !piece_caches.contains_stored_piece(&distance_key)
1505 });
1506 }
1507
1508 if pieces_to_find.is_empty() {
1510 return piece_indices;
1511 }
1512
1513 if let Some(plot_caches) = self.plot_caches.caches.try_read() {
1515 let plot_caches = &plot_caches;
1516 let not_found = pieces_to_find
1517 .into_iter()
1518 .map(|(piece_index, key)| async move {
1519 let key = &key;
1520
1521 let found = plot_caches
1522 .iter()
1523 .map(|plot_cache| async {
1524 matches!(
1525 plot_cache.is_piece_maybe_stored(key).await,
1526 Ok(MaybePieceStoredResult::Yes)
1527 )
1528 })
1529 .collect::<FuturesUnordered<_>>()
1530 .any(|found| async move { found })
1531 .await;
1532
1533 if found { None } else { Some(piece_index) }
1534 })
1535 .collect::<FuturesUnordered<_>>()
1536 .filter_map(|maybe_piece_index| async move { maybe_piece_index })
1537 .collect::<HashSet<_>>()
1538 .await;
1539 piece_indices.retain(|piece_index| !not_found.contains(piece_index));
1540 }
1541 piece_indices
1542 }
1543
1544 pub async fn find_piece(
1546 &self,
1547 piece_index: PieceIndex,
1548 ) -> Option<(PieceCacheId, PieceCacheOffset)> {
1549 let caches = self.piece_caches.read().await;
1550
1551 self.find_piece_internal(&caches, piece_index)
1552 }
1553
1554 pub async fn find_pieces<PieceIndices>(
1556 &self,
1557 piece_indices: PieceIndices,
1558 ) -> Vec<(PieceIndex, PieceCacheId, PieceCacheOffset)>
1559 where
1560 PieceIndices: IntoIterator<Item = PieceIndex>,
1561 {
1562 let caches = self.piece_caches.read().await;
1563
1564 piece_indices
1565 .into_iter()
1566 .filter_map(|piece_index| {
1567 self.find_piece_internal(&caches, piece_index)
1568 .map(|(cache_id, piece_offset)| (piece_index, cache_id, piece_offset))
1569 })
1570 .collect()
1571 }
1572
1573 fn find_piece_internal(
1574 &self,
1575 caches: &PieceCachesState,
1576 piece_index: PieceIndex,
1577 ) -> Option<(PieceCacheId, PieceCacheOffset)> {
1578 let key = KeyWithDistance::new(self.peer_id, piece_index.to_multihash());
1579
1580 let Some(offset) = caches.get_stored_piece(&key) else {
1581 if let Some(metrics) = &self.metrics {
1582 metrics.cache_find_miss.inc();
1583 }
1584
1585 return None;
1586 };
1587 let piece_offset = offset.piece_offset;
1588
1589 if let Some(backend) = caches.get_backend(offset.cache_index) {
1590 if let Some(metrics) = &self.metrics {
1591 metrics.cache_find_hit.inc();
1592 }
1593 return Some((*backend.id(), piece_offset));
1594 }
1595
1596 if let Some(metrics) = &self.metrics {
1597 metrics.cache_find_miss.inc();
1598 }
1599 None
1600 }
1601
1602 pub async fn maybe_store_additional_piece(
1606 &self,
1607 piece_index: PieceIndex,
1608 piece: &Piece,
1609 ) -> bool {
1610 let key = RecordKey::from(piece_index.to_multihash());
1611
1612 let should_store = self.plot_caches.should_store(piece_index, &key).await;
1613
1614 if !should_store {
1615 return false;
1616 }
1617
1618 self.plot_caches
1619 .store_additional_piece(piece_index, piece)
1620 .await
1621 }
1622
1623 pub async fn replace_backing_caches(
1625 &self,
1626 new_piece_caches: Vec<Arc<dyn PieceCache>>,
1627 new_plot_caches: Vec<Arc<dyn PlotCache>>,
1628 ) {
1629 if let Err(error) = self
1630 .worker_sender
1631 .clone()
1632 .send(WorkerCommand::ReplaceBackingCaches { new_piece_caches })
1633 .await
1634 {
1635 warn!(%error, "Failed to replace backing caches, worker exited");
1636 }
1637
1638 *self.plot_caches.caches.write().await = new_plot_caches;
1639 }
1640
1641 pub fn on_sync_progress(&self, callback: HandlerFn<f32>) -> HandlerId {
1643 self.handlers.progress.add(callback)
1644 }
1645}
1646
/// Collection of [`FarmerCache`] instances behind a shared handle, cloning only clones the
/// `Arc`.
#[derive(Debug, Clone)]
pub struct FarmerCaches {
    /// Farmer caches that requests are dispatched to
    caches: Arc<[FarmerCache]>,
}
1652
1653impl From<Arc<[FarmerCache]>> for FarmerCaches {
1654 fn from(caches: Arc<[FarmerCache]>) -> Self {
1655 Self { caches }
1656 }
1657}
1658
1659impl From<FarmerCache> for FarmerCaches {
1660 fn from(cache: FarmerCache) -> Self {
1661 Self {
1662 caches: Arc::new([cache]),
1663 }
1664 }
1665}
1666
1667impl FarmerCaches {
1668 pub async fn get_piece<Key>(&self, key: Key) -> Option<Piece>
1670 where
1671 RecordKey: From<Key>,
1672 {
1673 let farmer_cache = self.caches.choose(&mut thread_rng())?;
1674 farmer_cache.get_piece(key).await
1675 }
1676
1677 pub async fn get_pieces<'a, PieceIndices>(
1681 &'a self,
1682 piece_indices: PieceIndices,
1683 ) -> impl Stream<Item = (PieceIndex, Option<Piece>)> + Send + Unpin + 'a
1684 where
1685 PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send + 'a> + Send + 'a,
1686 {
1687 let Some(farmer_cache) = self.caches.choose(&mut thread_rng()) else {
1688 return Either::Left(stream::iter(
1689 piece_indices
1690 .into_iter()
1691 .map(|piece_index| (piece_index, None)),
1692 ));
1693 };
1694
1695 Either::Right(farmer_cache.get_pieces(piece_indices).await)
1696 }
1697
1698 pub async fn has_pieces(&self, piece_indices: Vec<PieceIndex>) -> Vec<PieceIndex> {
1700 let Some(farmer_cache) = self.caches.choose(&mut thread_rng()) else {
1701 return Vec::new();
1702 };
1703
1704 farmer_cache.has_pieces(piece_indices).await
1705 }
1706
1707 pub async fn maybe_store_additional_piece(
1711 &self,
1712 piece_index: PieceIndex,
1713 piece: &Piece,
1714 ) -> bool {
1715 self.caches
1717 .iter()
1718 .map(|farmer_cache| farmer_cache.maybe_store_additional_piece(piece_index, piece))
1719 .collect::<FuturesUnordered<_>>()
1720 .fold::<bool, _, _>(false, |acc, stored| async move { acc || stored })
1721 .await
1722 }
1723}
1724
1725fn decode_piece_index_from_record_key(key: &RecordKey) -> PieceIndex {
1727 let len = key.as_ref().len();
1728 let s = len - PieceIndex::SIZE;
1729
1730 let mut piece_index_bytes = [0u8; PieceIndex::SIZE];
1731 piece_index_bytes.copy_from_slice(&key.as_ref()[s..]);
1732
1733 PieceIndex::from_bytes(piece_index_bytes)
1734}