mod metrics;
mod piece_cache_state;
#[cfg(test)]
mod tests;

use crate::farm::{MaybePieceStoredResult, PieceCache, PieceCacheId, PieceCacheOffset, PlotCache};
use crate::farmer_cache::metrics::FarmerCacheMetrics;
use crate::farmer_cache::piece_cache_state::PieceCachesState;
use crate::node_client::NodeClient;
use crate::utils::run_future_in_dedicated_thread;
use async_lock::RwLock as AsyncRwLock;
use event_listener_primitives::{Bag, HandlerId};
use futures::channel::mpsc;
use futures::future::{Either, FusedFuture};
use futures::stream::{FuturesOrdered, FuturesUnordered};
use futures::{select, stream, FutureExt, SinkExt, Stream, StreamExt};
use parking_lot::{Mutex, RwLock};
use prometheus_client::registry::Registry;
use rand::prelude::*;
use rayon::prelude::*;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::future::join;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;
use std::{fmt, mem};
use subspace_core_primitives::pieces::{Piece, PieceIndex};
use subspace_core_primitives::segments::{SegmentHeader, SegmentIndex};
use subspace_data_retrieval::piece_getter::PieceGetter;
use subspace_networking::libp2p::kad::RecordKey;
use subspace_networking::libp2p::PeerId;
use subspace_networking::utils::multihash::ToMultihash;
use subspace_networking::KeyWithDistance;
use tokio::sync::Semaphore;
use tokio::task::yield_now;
use tracing::{debug, error, info, info_span, trace, warn, Instrument};

/// Capacity of the channel for commands sent from [`FarmerCache`] to [`FarmerCacheWorker`]
const WORKER_CHANNEL_CAPACITY: usize = 100;
/// Number of pieces downloaded in one batch during initial cache synchronization
const SYNC_BATCH_SIZE: usize = 256;
/// Number of batches downloaded concurrently during initial cache synchronization
const SYNC_CONCURRENT_BATCHES: usize = 4;
/// How many pieces to download between intermediate cache state updates and progress
/// notifications
const INTERMEDIATE_CACHE_UPDATE_INTERVAL: usize = 100;
/// How often to re-check farmer app info while waiting for the node during initial
/// synchronization
const INITIAL_SYNC_FARM_INFO_CHECK_INTERVAL: Duration = Duration::from_secs(1);

type HandlerFn<A> = Arc<dyn Fn(&A) + Send + Sync + 'static>;
type Handler<A> = Bag<HandlerFn<A>, A>;
/// Index identifying one of the piece cache backends
type CacheIndex = u8;

#[derive(Default, Debug)]
struct Handlers {
    progress: Handler<f32>,
}

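/// Location of a piece within the aggregated farmer cache: which cache backend it is in and the
/// offset within that backend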
#[derive(Debug, Clone, Copy)]
struct FarmerCacheOffset {
    cache_index: CacheIndex,
    piece_offset: PieceCacheOffset,
}

impl FarmerCacheOffset {
    fn new(cache_index: CacheIndex, piece_offset: PieceCacheOffset) -> Self {
        Self {
            cache_index,
            piece_offset,
        }
    }
}

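/// A single piece cache backend together with its capacity bookkeeping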
#[derive(Debug, Clone)]
struct CacheBackend {
    backend: Arc<dyn PieceCache>,
    used_capacity: u32,
    total_capacity: u32,
}

impl std::ops::Deref for CacheBackend {
    type Target = Arc<dyn PieceCache>;

    fn deref(&self) -> &Self::Target {
        &self.backend
    }
}

impl CacheBackend {
    fn new(backend: Arc<dyn PieceCache>, total_capacity: u32) -> Self {
        Self {
            backend,
            used_capacity: 0,
            total_capacity,
        }
    }

    fn next_free(&mut self) -> Option<PieceCacheOffset> {
        let offset = self.used_capacity;
        if offset < self.total_capacity {
            self.used_capacity += 1;
            Some(PieceCacheOffset(offset))
        } else {
            debug!(?offset, total_capacity = ?self.total_capacity, "No free space in cache backend");
            None
        }
    }

    fn free_size(&self) -> u32 {
        self.total_capacity - self.used_capacity
    }
}

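/// Cache state collected from a single backend during initialization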
#[derive(Debug)]
struct CacheState {
    cache_stored_pieces: HashMap<KeyWithDistance, FarmerCacheOffset>,
    cache_free_offsets: Vec<FarmerCacheOffset>,
    backend: CacheBackend,
}

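/// Commands sent from [`FarmerCache`] to [`FarmerCacheWorker`]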
#[derive(Debug)]
enum WorkerCommand {
    ReplaceBackingCaches {
        new_piece_caches: Vec<Arc<dyn PieceCache>>,
    },
    ForgetKey {
        key: RecordKey,
    },
}

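/// Farmer cache worker that drives the farmer cache backend: it initializes and synchronizes the
/// cache and keeps it up to date as new segments are archived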
#[derive(Debug)]
#[must_use = "Farmer cache will not work unless its worker is running"]
pub struct FarmerCacheWorker<NC>
where
    NC: fmt::Debug,
{
    peer_id: PeerId,
    node_client: NC,
    piece_caches: Arc<AsyncRwLock<PieceCachesState>>,
    plot_caches: Arc<PlotCaches>,
    handlers: Arc<Handlers>,
    worker_receiver: Option<mpsc::Receiver<WorkerCommand>>,
    metrics: Option<Arc<FarmerCacheMetrics>>,
}

impl<NC> FarmerCacheWorker<NC>
where
    NC: NodeClient,
{
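    /// Run the cache worker with the provided piece getter until either the command channel or
    /// the segment headers subscription is closed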
    pub async fn run<PG>(mut self, piece_getter: PG)
    where
        PG: PieceGetter,
    {
        // Tracks the last segment index processed by this worker
        let mut last_segment_index_internal = SegmentIndex::ZERO;

        let mut worker_receiver = self
            .worker_receiver
            .take()
            .expect("Always set during worker instantiation");

        // The first command must be `ReplaceBackingCaches`, which provides the initial set of
        // backing caches
        if let Some(WorkerCommand::ReplaceBackingCaches { new_piece_caches }) =
            worker_receiver.next().await
        {
            self.initialize(
                &piece_getter,
                &mut last_segment_index_internal,
                new_piece_caches,
            )
            .await;
        } else {
            // Farmer cache was dropped before backing caches were provided
            return;
        }

        let mut segment_headers_notifications =
            match self.node_client.subscribe_archived_segment_headers().await {
                Ok(segment_headers_notifications) => segment_headers_notifications,
                Err(error) => {
                    error!(%error, "Failed to subscribe to archived segments notifications");
                    return;
                }
            };

        // Catch up with segments that may have been archived while the cache was initializing
        self.keep_up_after_initial_sync(&piece_getter, &mut last_segment_index_internal)
            .await;

        loop {
            select! {
                maybe_command = worker_receiver.next() => {
                    let Some(command) = maybe_command else {
                        // Nothing else left to do, farmer cache was dropped
                        return;
                    };

                    self.handle_command(command, &piece_getter, &mut last_segment_index_internal).await;
                }
                maybe_segment_header = segment_headers_notifications.next().fuse() => {
                    if let Some(segment_header) = maybe_segment_header {
                        self.process_segment_header(&piece_getter, segment_header, &mut last_segment_index_internal).await;
                    } else {
                        // Subscription ended, nothing left to keep up with
                        return;
                    }
                }
            }
        }
    }

    async fn handle_command<PG>(
        &self,
        command: WorkerCommand,
        piece_getter: &PG,
        last_segment_index_internal: &mut SegmentIndex,
    ) where
        PG: PieceGetter,
    {
        match command {
            WorkerCommand::ReplaceBackingCaches { new_piece_caches } => {
                self.initialize(piece_getter, last_segment_index_internal, new_piece_caches)
                    .await;
            }
            WorkerCommand::ForgetKey { key } => {
                let mut caches = self.piece_caches.write().await;
                let key = KeyWithDistance::new_with_record_key(self.peer_id, key);
                let Some(offset) = caches.remove_stored_piece(&key) else {
                    // Key was not stored, nothing to forget
                    return;
                };

                let cache_index = offset.cache_index;
                let piece_offset = offset.piece_offset;
                let Some(backend) = caches.get_backend(cache_index).cloned() else {
                    // Backend does not exist anymore, nothing to read from
                    return;
                };

                caches.push_dangling_free_offset(offset);
                match backend.read_piece_index(piece_offset).await {
                    Ok(Some(piece_index)) => {
                        trace!(%piece_index, %cache_index, %piece_offset, "Forget piece");
                    }
                    Ok(None) => {
                        warn!(
                            %cache_index,
                            %piece_offset,
                            "Piece index out of range, this is likely an implementation bug, \
                            not freeing heap element"
                        );
                    }
                    Err(error) => {
                        error!(
                            %error,
                            %cache_index,
                            ?key,
                            %piece_offset,
                            "Error while reading piece from cache"
                        );
                    }
                }
            }
        }
    }

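    /// Initialize the cache from a new set of backing caches: collect existing contents in
    /// dedicated threads, determine which pieces should be cached for this peer ID and download
    /// the missing ones in concurrent batches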
    async fn initialize<PG>(
        &self,
        piece_getter: &PG,
        last_segment_index_internal: &mut SegmentIndex,
        new_piece_caches: Vec<Arc<dyn PieceCache>>,
    ) where
        PG: PieceGetter,
    {
        info!("Initializing piece cache");

        // Take the old cache state so its allocations can be reused for the new one
        let (mut stored_pieces, mut dangling_free_offsets) =
            mem::take(&mut *self.piece_caches.write().await).reuse();

        debug!("Collecting pieces that were in the cache before");

        if let Some(metrics) = &self.metrics {
            metrics.piece_cache_capacity_total.set(0);
            metrics.piece_cache_capacity_used.set(0);
        }

        let peer_id = self.peer_id;

        // Read the contents of each cache backend in a dedicated thread so that slow disk I/O
        // doesn't block the async executor
        let piece_caches_number = new_piece_caches.len();
        let maybe_caches_futures = new_piece_caches
            .into_iter()
            .enumerate()
            .filter_map(|(cache_index, new_cache)| {
                let total_capacity = new_cache.max_num_elements();
                let mut backend = CacheBackend::new(new_cache, total_capacity);
                let Ok(cache_index) = CacheIndex::try_from(cache_index) else {
                    warn!(
                        ?piece_caches_number,
                        "Too many piece caches provided, {cache_index} cache will be ignored",
                    );
                    return None;
                };

                if let Some(metrics) = &self.metrics {
                    metrics
                        .piece_cache_capacity_total
                        .inc_by(total_capacity as i64);
                }

                let init_fut = async move {
                    let used_capacity = &mut backend.used_capacity;

                    // Keep contents in an `Option` so the stream can be taken out and dropped
                    // explicitly below
                    let mut maybe_contents = match backend.backend.contents().await {
                        Ok(contents) => Some(contents),
                        Err(error) => {
                            warn!(%error, "Failed to get cache contents");

                            None
                        }
                    };

                    #[allow(clippy::mutable_key_type)]
                    let mut cache_stored_pieces = HashMap::new();
                    let mut cache_free_offsets = Vec::new();

                    let Some(mut contents) = maybe_contents.take() else {
                        drop(maybe_contents);

                        return CacheState {
                            cache_stored_pieces,
                            cache_free_offsets,
                            backend,
                        };
                    };

                    while let Some(maybe_element_details) = contents.next().await {
                        let (piece_offset, maybe_piece_index) = match maybe_element_details {
                            Ok(element_details) => element_details,
                            Err(error) => {
                                warn!(%error, "Failed to get cache contents element details");
                                break;
                            }
                        };
                        let offset = FarmerCacheOffset::new(cache_index, piece_offset);
                        match maybe_piece_index {
                            Some(piece_index) => {
                                *used_capacity = piece_offset.0 + 1;
                                let record_key = RecordKey::from(piece_index.to_multihash());
                                let key = KeyWithDistance::new_with_record_key(peer_id, record_key);
                                cache_stored_pieces.insert(key, offset);
                            }
                            None => {
                                cache_free_offsets.push(offset);
                            }
                        }

                        // Allow the task to be aborted at this point
                        yield_now().await;
                    }

                    drop(maybe_contents);
                    drop(contents);

                    CacheState {
                        cache_stored_pieces,
                        cache_free_offsets,
                        backend,
                    }
                };

                Some(run_future_in_dedicated_thread(
                    move || init_fut.instrument(info_span!("", %cache_index)),
                    format!("piece-cache.{cache_index}"),
                ))
            })
            .collect::<Result<Vec<_>, _>>();

        let caches_futures = match maybe_caches_futures {
            Ok(caches_futures) => caches_futures,
            Err(error) => {
                error!(%error, "Failed to spawn piece cache reading thread");

                return;
            }
        };

        let mut backends = Vec::with_capacity(caches_futures.len());
        let mut caches_futures = caches_futures.into_iter().collect::<FuturesOrdered<_>>();

        while let Some(maybe_cache) = caches_futures.next().await {
            match maybe_cache {
                Ok(cache) => {
                    let backend = cache.backend;
                    for (key, cache_offset) in cache.cache_stored_pieces {
                        // If the same key is stored in more than one backend, free the older
                        // offset as dangling
                        if let Some(old_cache_offset) = stored_pieces.insert(key, cache_offset) {
                            dangling_free_offsets.push_front(old_cache_offset);
                        }
                    }
                    // Only offsets below `used_capacity` are dangling, the rest will be handed
                    // out by `next_free()`
                    dangling_free_offsets.extend(
                        cache.cache_free_offsets.into_iter().filter(|free_offset| {
                            free_offset.piece_offset.0 < backend.used_capacity
                        }),
                    );
                    backends.push(backend);
                }
                Err(_cancelled) => {
                    error!("Piece cache reading thread panicked");

                    return;
                }
            };
        }

        let mut caches = PieceCachesState::new(stored_pieces, dangling_free_offsets, backends);

        info!("Synchronizing piece cache");

        let last_segment_index = loop {
            match self.node_client.farmer_app_info().await {
                Ok(farmer_app_info) => {
                    let last_segment_index =
                        farmer_app_info.protocol_info.history_size.segment_index();
                    // Wait for the node to be either fully synced or at least aware of a
                    // non-zero segment index, indicating it knows about some archived history
                    if !farmer_app_info.syncing || last_segment_index > SegmentIndex::ZERO {
                        break last_segment_index;
                    }
                }
                Err(error) => {
                    error!(
                        %error,
                        "Failed to get farmer app info from node, keeping old cache state without \
                        updates"
                    );

                    // Not the latest state, but better than nothing
                    *self.piece_caches.write().await = caches;
                    return;
                }
            }

            tokio::time::sleep(INITIAL_SYNC_FARM_INFO_CHECK_INTERVAL).await;
        };

        debug!(%last_segment_index, "Identified last segment index");

        // Collect all piece indices of the archived history together with their keyed distance
        // from this peer ID
        let segment_indices = Vec::from_iter(SegmentIndex::ZERO..=last_segment_index);
        let mut piece_indices_to_store = segment_indices
            .into_par_iter()
            .flat_map(|segment_index| {
                segment_index
                    .segment_piece_indexes()
                    .into_par_iter()
                    .map(|piece_index| {
                        (
                            KeyWithDistance::new(self.peer_id, piece_index.to_multihash()),
                            piece_index,
                        )
                    })
            })
            .collect::<Vec<_>>();

        // Sort pieces by distance from this peer ID so that the closest ones are cached
        piece_indices_to_store.par_sort_unstable_by(|(a_key, _), (b_key, _)| a_key.cmp(b_key));

        // Take only as many of the closest pieces as fit into the total cache capacity
        let mut piece_indices_to_store = piece_indices_to_store
            .into_iter()
            .take(caches.total_capacity())
            .collect::<HashMap<_, _>>();

        let mut piece_caches_capacity_used = vec![0u32; caches.backends().len()];
        // Free stored pieces that should no longer be cached and remove pieces that are already
        // stored from the set that still needs to be downloaded
        caches.free_unneeded_stored_pieces(&mut piece_indices_to_store);

        if let Some(metrics) = &self.metrics {
            for offset in caches.stored_pieces_offsets() {
                piece_caches_capacity_used[usize::from(offset.cache_index)] += 1;
            }

            for cache_used in piece_caches_capacity_used {
                metrics
                    .piece_cache_capacity_used
                    .inc_by(i64::from(cache_used));
            }
        }

        // Store whatever valid pieces are immediately available after restart
        self.piece_caches.write().await.clone_from(&caches);

        debug!(
            count = %piece_indices_to_store.len(),
            "Identified piece indices that should be cached",
        );

        let pieces_to_download_total = piece_indices_to_store.len();
        // Split the pieces to download into batches that will be downloaded concurrently
        let piece_indices_to_store = piece_indices_to_store
            .into_values()
            .collect::<Vec<_>>()
            .chunks(SYNC_BATCH_SIZE)
            .map(|chunk| chunk.to_vec())
            .collect::<Vec<_>>();

        let downloaded_pieces_count = AtomicUsize::new(0);
        let caches = Mutex::new(caches);
        self.handlers.progress.call_simple(&0.0);
        let piece_indices_to_store = piece_indices_to_store.into_iter().enumerate();

        let downloading_semaphore = &Semaphore::new(SYNC_BATCH_SIZE * SYNC_CONCURRENT_BATCHES);
        let ignored_cache_indices = &RwLock::new(HashSet::new());

        let downloading_pieces_stream =
            stream::iter(piece_indices_to_store.map(|(batch, piece_indices)| {
                let downloaded_pieces_count = &downloaded_pieces_count;
                let caches = &caches;

                async move {
                    let mut permit = downloading_semaphore
                        .acquire_many(SYNC_BATCH_SIZE as u32)
                        .await
                        .expect("Semaphore is never closed; qed");
                    debug!(%batch, num_pieces = %piece_indices.len(), "Downloading pieces");

                    let pieces_stream = match piece_getter.get_pieces(piece_indices).await {
                        Ok(pieces_stream) => pieces_stream,
                        Err(error) => {
                            error!(
                                %error,
                                "Failed to get pieces from piece getter"
                            );
                            return;
                        }
                    };
                    let mut pieces_stream = pieces_stream.enumerate();

                    while let Some((index, (piece_index, result))) = pieces_stream.next().await {
                        debug!(%batch, %index, %piece_index, "Downloaded piece");

                        let piece = match result {
                            Ok(Some(piece)) => {
                                trace!(%batch, %piece_index, "Downloaded piece successfully");
                                piece
                            }
                            Ok(None) => {
                                debug!(%batch, %piece_index, "Couldn't find piece");
                                continue;
                            }
                            Err(error) => {
                                debug!(
                                    %batch,
                                    %error,
                                    %piece_index,
                                    "Failed to get piece for piece cache"
                                );
                                continue;
                            }
                        };
                        // Release one permit so the next batch can start downloading while this
                        // piece is being written to the cache
                        permit.split(1);

                        let (offset, maybe_backend) = {
                            let mut caches = caches.lock();

                            let Some(offset) = caches.pop_free_offset() else {
                                error!(
                                    %batch,
                                    %piece_index,
                                    "Failed to store piece in cache, there was no space"
                                );
                                break;
                            };

                            (offset, caches.get_backend(offset.cache_index).cloned())
                        };

                        let cache_index = offset.cache_index;
                        let piece_offset = offset.piece_offset;

                        let skip_write = ignored_cache_indices.read().contains(&cache_index);
                        if skip_write {
                            trace!(
                                %batch,
                                %cache_index,
                                %piece_index,
                                %piece_offset,
                                "Skipping known problematic cache index"
                            );
                        } else {
                            if let Some(backend) = maybe_backend
                                && let Err(error) =
                                    backend.write_piece(piece_offset, piece_index, &piece).await
                            {
                                error!(
                                    %error,
                                    %batch,
                                    %cache_index,
                                    %piece_index,
                                    %piece_offset,
                                    "Failed to write piece into cache, ignoring this cache going \
                                    forward"
                                );
                                ignored_cache_indices.write().insert(cache_index);
                                continue;
                            }

                            let key =
                                KeyWithDistance::new(self.peer_id, piece_index.to_multihash());
                            caches.lock().push_stored_piece(key, offset);
                        }

                        let prev_downloaded_pieces_count =
                            downloaded_pieces_count.fetch_add(1, Ordering::Relaxed);
                        // Do not report completion until the piece cache state is fully written
                        // below
                        if prev_downloaded_pieces_count != pieces_to_download_total {
                            let progress = prev_downloaded_pieces_count as f32
                                / pieces_to_download_total as f32
                                * 100.0;
                            if prev_downloaded_pieces_count % INTERMEDIATE_CACHE_UPDATE_INTERVAL
                                == 0
                            {
                                let mut piece_caches = self.piece_caches.write().await;
                                piece_caches.clone_from(&caches.lock());

                                info!("Piece cache sync {progress:.2}% complete");
                            }

                            self.handlers.progress.call_simple(&progress);
                        }
                    }
                }
            }));

        // Download several batches concurrently to make sure the slow tail of one batch is
        // compensated by the others
        downloading_pieces_stream
            // This allows scheduling a new batch while previous batches are partially completed,
            // but avoids the excessive memory usage of creating all futures upfront
            .buffer_unordered(SYNC_CONCURRENT_BATCHES * 10)
            // Simply drain everything
            .for_each(|()| async {})
            .await;

        *self.piece_caches.write().await = caches.into_inner();
        self.handlers.progress.call_simple(&100.0);
        *last_segment_index_internal = last_segment_index;

        info!("Finished piece cache synchronization");
    }

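    /// Process a newly archived segment: download potentially useful pieces, acknowledge the
    /// segment header and store the pieces in plot and piece caches as appropriate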
    async fn process_segment_header<PG>(
        &self,
        piece_getter: &PG,
        segment_header: SegmentHeader,
        last_segment_index_internal: &mut SegmentIndex,
    ) where
        PG: PieceGetter,
    {
        let segment_index = segment_header.segment_index();
        debug!(%segment_index, "Starting to process newly archived segment");

        if *last_segment_index_internal < segment_index {
            debug!(%segment_index, "Downloading potentially useful pieces");

            // Download potentially useful pieces first and only acknowledge the segment header
            // afterwards, while the pieces are still readily available
            let pieces_to_maybe_include = segment_index
                .segment_piece_indexes()
                .into_iter()
                .map(|piece_index| async move {
                    let should_store_in_piece_cache = self
                        .piece_caches
                        .read()
                        .await
                        .should_include_key(self.peer_id, piece_index);

                    let key = RecordKey::from(piece_index.to_multihash());
                    let should_store_in_plot_cache =
                        self.plot_caches.should_store(piece_index, &key).await;

                    if !(should_store_in_piece_cache || should_store_in_plot_cache) {
                        trace!(%piece_index, "Piece doesn't need to be cached #1");

                        return None;
                    }

                    let maybe_piece_result =
                        self.node_client
                            .piece(piece_index)
                            .await
                            .inspect_err(|error| {
                                debug!(
                                    %error,
                                    %segment_index,
                                    %piece_index,
                                    "Failed to retrieve piece from node right after archiving"
                                );
                            });

                    if let Ok(Some(piece)) = maybe_piece_result {
                        return Some((piece_index, piece));
                    }

                    // Fall back to the piece getter if the node didn't have the piece
                    match piece_getter.get_piece(piece_index).await {
                        Ok(Some(piece)) => Some((piece_index, piece)),
                        Ok(None) => {
                            warn!(
                                %segment_index,
                                %piece_index,
                                "Failed to retrieve piece right after archiving"
                            );

                            None
                        }
                        Err(error) => {
                            warn!(
                                %error,
                                %segment_index,
                                %piece_index,
                                "Failed to retrieve piece right after archiving"
                            );

                            None
                        }
                    }
                })
                .collect::<FuturesUnordered<_>>()
                .filter_map(|maybe_piece| async move { maybe_piece })
                .collect::<Vec<_>>()
                .await;

            debug!(%segment_index, "Downloaded potentially useful pieces");

            self.acknowledge_archived_segment_processing(segment_index)
                .await;

            for (piece_index, piece) in pieces_to_maybe_include {
                if !self
                    .plot_caches
                    .store_additional_piece(piece_index, &piece)
                    .await
                {
                    trace!(%piece_index, "Piece doesn't need to be cached in plot cache");
                }

                if !self
                    .piece_caches
                    .read()
                    .await
                    .should_include_key(self.peer_id, piece_index)
                {
                    trace!(%piece_index, "Piece doesn't need to be cached #2");

                    continue;
                }

                trace!(%piece_index, "Piece needs to be cached #1");

                self.persist_piece_in_cache(piece_index, piece).await;
            }

            *last_segment_index_internal = segment_index;
        } else {
            self.acknowledge_archived_segment_processing(segment_index)
                .await;
        }

        debug!(%segment_index, "Finished processing newly archived segment");
    }

    async fn acknowledge_archived_segment_processing(&self, segment_index: SegmentIndex) {
        match self
            .node_client
            .acknowledge_archived_segment_header(segment_index)
            .await
        {
            Ok(()) => {
                debug!(%segment_index, "Acknowledged archived segment");
            }
            Err(error) => {
                error!(%segment_index, ?error, "Failed to acknowledge archived segment");
            }
        };
    }

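    /// Catch up with segments that were archived while the cache was initializing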
    async fn keep_up_after_initial_sync<PG>(
        &self,
        piece_getter: &PG,
        last_segment_index_internal: &mut SegmentIndex,
    ) where
        PG: PieceGetter,
    {
        let last_segment_index = match self.node_client.farmer_app_info().await {
            Ok(farmer_app_info) => farmer_app_info.protocol_info.history_size.segment_index(),
            Err(error) => {
                error!(
                    %error,
                    "Failed to get farmer app info from node, keeping old cache state without \
                    updates"
                );
                return;
            }
        };

        if last_segment_index <= *last_segment_index_internal {
            return;
        }

        info!(
            "Syncing piece cache to the latest history size, this may pause block production if \
            it takes too long"
        );

        // Keep up with segment indices that were potentially created since initialization
        let piece_indices = (*last_segment_index_internal..=last_segment_index)
            .flat_map(|segment_index| segment_index.segment_piece_indexes());

        for piece_index in piece_indices {
            if !self
                .piece_caches
                .read()
                .await
                .should_include_key(self.peer_id, piece_index)
            {
                trace!(%piece_index, "Piece doesn't need to be cached #3");

                continue;
            }

            trace!(%piece_index, "Piece needs to be cached #2");

            let result = piece_getter.get_piece(piece_index).await;

            let piece = match result {
                Ok(Some(piece)) => piece,
                Ok(None) => {
                    debug!(%piece_index, "Couldn't find piece");
                    continue;
                }
                Err(error) => {
                    debug!(
                        %error,
                        %piece_index,
                        "Failed to get piece for piece cache"
                    );
                    continue;
                }
            };

            self.persist_piece_in_cache(piece_index, piece).await;
        }

        info!("Finished syncing piece cache to the latest history size");

        *last_segment_index_internal = last_segment_index;
    }

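    /// Persist a single piece in the piece cache, either in a free offset or by replacing a
    /// stored piece whose key is further from this peer ID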
    async fn persist_piece_in_cache(&self, piece_index: PieceIndex, piece: Piece) {
        let key = KeyWithDistance::new(self.peer_id, piece_index.to_multihash());
        let mut caches = self.piece_caches.write().await;
        match caches.should_replace(&key) {
            // A stored piece should be replaced with this one
            Some((old_key, offset)) => {
                let cache_index = offset.cache_index;
                let piece_offset = offset.piece_offset;
                let Some(backend) = caches.get_backend(cache_index) else {
                    warn!(
                        %cache_index,
                        %piece_index,
                        "Should have a cached backend, but it didn't exist, this is an \
                        implementation bug"
                    );
                    return;
                };
                if let Err(error) = backend.write_piece(piece_offset, piece_index, &piece).await {
                    error!(
                        %error,
                        %cache_index,
                        %piece_index,
                        %piece_offset,
                        "Failed to write piece into cache"
                    );
                } else {
                    let old_piece_index = decode_piece_index_from_record_key(old_key.record_key());
                    trace!(
                        %cache_index,
                        %old_piece_index,
                        %piece_index,
                        %piece_offset,
                        "Successfully replaced old cached piece"
                    );
                    caches.push_stored_piece(key, offset);
                }
            }
            // No piece needs to be replaced, store the piece in a free offset
            None => {
                let Some(offset) = caches.pop_free_offset() else {
                    warn!(
                        %piece_index,
                        "Should have inserted piece into cache, but it didn't happen, this is an \
                        implementation bug"
                    );
                    return;
                };
                let cache_index = offset.cache_index;
                let piece_offset = offset.piece_offset;
                let Some(backend) = caches.get_backend(cache_index) else {
                    warn!(
                        %cache_index,
                        %piece_index,
                        "Should have a cached backend, but it didn't exist, this is an \
                        implementation bug"
                    );
                    return;
                };

                if let Err(error) = backend.write_piece(piece_offset, piece_index, &piece).await {
                    error!(
                        %error,
                        %cache_index,
                        %piece_index,
                        %piece_offset,
                        "Failed to write piece into cache"
                    );
                } else {
                    trace!(
                        %cache_index,
                        %piece_index,
                        %piece_offset,
                        "Successfully stored piece in cache"
                    );
                    if let Some(metrics) = &self.metrics {
                        metrics.piece_cache_capacity_used.inc();
                    }
                    caches.push_stored_piece(key, offset);
                }
            }
        };
    }
}

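/// Collection of plot caches that store additional pieces in otherwise unoccupied plot space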
#[derive(Debug)]
struct PlotCaches {
    /// Additional piece caches
    caches: AsyncRwLock<Vec<Arc<dyn PlotCache>>>,
    /// Next plot cache to use for storing pieces
    next_plot_cache: AtomicUsize,
}

impl PlotCaches {
    async fn should_store(&self, piece_index: PieceIndex, key: &RecordKey) -> bool {
        for (cache_index, cache) in self.caches.read().await.iter().enumerate() {
            match cache.is_piece_maybe_stored(key).await {
                Ok(MaybePieceStoredResult::No) => {
                    // Try another cache if there is any
                }
                Ok(MaybePieceStoredResult::Vacant) => {
                    return true;
                }
                Ok(MaybePieceStoredResult::Yes) => {
                    // Already stored, nothing else left to do
                    return false;
                }
                Err(error) => {
                    warn!(
                        %cache_index,
                        %piece_index,
                        %error,
                        "Failed to check piece stored in cache"
                    );
                }
            }
        }

        false
    }

    /// Store a piece in one of the plot caches, if there is space for it; returns whether the
    /// piece was stored
    async fn store_additional_piece(&self, piece_index: PieceIndex, piece: &Piece) -> bool {
        let plot_caches = self.caches.read().await;
        let plot_caches_len = plot_caches.len();

        // Round-robin over plot caches for load balancing purposes
        for _ in 0..plot_caches_len {
            let plot_cache_index =
                self.next_plot_cache.fetch_add(1, Ordering::Relaxed) % plot_caches_len;

            match plot_caches[plot_cache_index]
                .try_store_piece(piece_index, piece)
                .await
            {
                Ok(true) => {
                    return true;
                }
                Ok(false) => {
                    continue;
                }
                Err(error) => {
                    error!(
                        %error,
                        %piece_index,
                        %plot_cache_index,
                        "Failed to store additional piece in cache"
                    );
                    continue;
                }
            }
        }

        false
    }
}

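/// Farmer cache that aggregates multiple piece caches and plot caches behind a single unified
/// piece retrieval and storage interface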
#[derive(Debug, Clone)]
pub struct FarmerCache {
    peer_id: PeerId,
    /// Individual dedicated piece caches
    piece_caches: Arc<AsyncRwLock<PieceCachesState>>,
    /// Additional piece caches within plots
    plot_caches: Arc<PlotCaches>,
    handlers: Arc<Handlers>,
    /// Sender of commands to the worker
    worker_sender: mpsc::Sender<WorkerCommand>,
    metrics: Option<Arc<FarmerCacheMetrics>>,
}

impl FarmerCache {
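    /// Create a new farmer cache instance together with the worker that drives it.
    ///
    /// The worker must be polled (e.g. spawned as a task) for the cache to make progress. A
    /// minimal usage sketch, assuming a `node_client` implementing [`NodeClient`], a libp2p
    /// `peer_id`, a `piece_getter` and cache collections are already available (these names are
    /// hypothetical placeholders, not part of this API):
    ///
    /// ```ignore
    /// let (farmer_cache, farmer_cache_worker) = FarmerCache::new(node_client, peer_id, None);
    /// // Drive the worker in the background
    /// tokio::spawn(farmer_cache_worker.run(piece_getter));
    /// // Hand the actual backing caches over to the worker
    /// farmer_cache
    ///     .replace_backing_caches(piece_caches, plot_caches)
    ///     .await;
    /// ```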
    pub fn new<NC>(
        node_client: NC,
        peer_id: PeerId,
        registry: Option<&mut Registry>,
    ) -> (Self, FarmerCacheWorker<NC>)
    where
        NC: NodeClient,
    {
        let caches = Arc::default();
        let (worker_sender, worker_receiver) = mpsc::channel(WORKER_CHANNEL_CAPACITY);
        let handlers = Arc::new(Handlers::default());

        let plot_caches = Arc::new(PlotCaches {
            caches: AsyncRwLock::default(),
            next_plot_cache: AtomicUsize::new(0),
        });
        let metrics = registry.map(|registry| Arc::new(FarmerCacheMetrics::new(registry)));

        let instance = Self {
            peer_id,
            piece_caches: Arc::clone(&caches),
            plot_caches: Arc::clone(&plot_caches),
            handlers: Arc::clone(&handlers),
            worker_sender,
            metrics: metrics.clone(),
        };
        let worker = FarmerCacheWorker {
            peer_id,
            node_client,
            piece_caches: caches,
            plot_caches,
            handlers,
            worker_receiver: Some(worker_receiver),
            metrics,
        };

        (instance, worker)
    }

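    /// Get a piece from the cache by its record key, falling back to plot caches when it is not
    /// present in any piece cache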
    pub async fn get_piece<Key>(&self, key: Key) -> Option<Piece>
    where
        RecordKey: From<Key>,
    {
        let key = RecordKey::from(key);
        let maybe_piece_found = {
            let key = KeyWithDistance::new_with_record_key(self.peer_id, key.clone());
            let caches = self.piece_caches.read().await;

            caches.get_stored_piece(&key).and_then(|offset| {
                let cache_index = offset.cache_index;
                let piece_offset = offset.piece_offset;
                Some((
                    piece_offset,
                    cache_index,
                    caches.get_backend(cache_index)?.clone(),
                ))
            })
        };

        if let Some((piece_offset, cache_index, backend)) = maybe_piece_found {
            match backend.read_piece(piece_offset).await {
                Ok(maybe_piece) => {
                    return match maybe_piece {
                        Some((_piece_index, piece)) => {
                            if let Some(metrics) = &self.metrics {
                                metrics.cache_get_hit.inc();
                            }
                            Some(piece)
                        }
                        None => {
                            error!(
                                %cache_index,
                                %piece_offset,
                                ?key,
                                "Piece was expected to be in cache, but wasn't found there"
                            );
                            if let Some(metrics) = &self.metrics {
                                metrics.cache_get_error.inc();
                            }
                            None
                        }
                    };
                }
                Err(error) => {
                    error!(
                        %error,
                        %cache_index,
                        ?key,
                        %piece_offset,
                        "Error while reading piece from cache"
                    );

                    // Ask the worker to forget the key so the stale entry is cleaned up
                    if let Err(error) = self
                        .worker_sender
                        .clone()
                        .send(WorkerCommand::ForgetKey { key })
                        .await
                    {
                        trace!(%error, "Failed to send ForgetKey command to worker");
                    }

                    if let Some(metrics) = &self.metrics {
                        metrics.cache_get_error.inc();
                    }
                    return None;
                }
            }
        }

        // Fall back to plot caches if the piece was not found in any piece cache
        for cache in self.plot_caches.caches.read().await.iter() {
            if let Ok(Some(piece)) = cache.read_piece(&key).await {
                if let Some(metrics) = &self.metrics {
                    metrics.cache_get_hit.inc();
                }
                return Some(piece);
            }
        }

        if let Some(metrics) = &self.metrics {
            metrics.cache_get_miss.inc();
        }
        None
    }

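    /// Get pieces from the cache as a stream.
    ///
    /// The stream yields one element per unique requested piece index, in no guaranteed order.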
    pub async fn get_pieces<'a, PieceIndices>(
        &'a self,
        piece_indices: PieceIndices,
    ) -> impl Stream<Item = (PieceIndex, Option<Piece>)> + Send + Unpin + 'a
    where
        PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send + 'a> + Send + 'a,
    {
        let mut pieces_to_get_from_plot_cache = Vec::new();

        let pieces_to_read_from_piece_cache = {
            let caches = self.piece_caches.read().await;
            // Group pieces by cache backend so each backend is read from exactly once
            let mut pieces_to_read_from_piece_cache =
                HashMap::<CacheIndex, (CacheBackend, HashMap<_, _>)>::new();

            for piece_index in piece_indices {
                let key = RecordKey::from(piece_index.to_multihash());

                let offset = match caches.get_stored_piece(&KeyWithDistance::new_with_record_key(
                    self.peer_id,
                    key.clone(),
                )) {
                    Some(offset) => offset,
                    None => {
                        pieces_to_get_from_plot_cache.push((piece_index, key));
                        continue;
                    }
                };

                let cache_index = offset.cache_index;
                let piece_offset = offset.piece_offset;

                match pieces_to_read_from_piece_cache.entry(cache_index) {
                    Entry::Occupied(mut entry) => {
                        let (_backend, pieces) = entry.get_mut();
                        pieces.insert(piece_offset, (piece_index, key));
                    }
                    Entry::Vacant(entry) => {
                        let backend = match caches.get_backend(cache_index) {
                            Some(backend) => backend.clone(),
                            None => {
                                pieces_to_get_from_plot_cache.push((piece_index, key));
                                continue;
                            }
                        };
                        entry
                            .insert((backend, HashMap::from([(piece_offset, (piece_index, key))])));
                    }
                }
            }

            pieces_to_read_from_piece_cache
        };

        let (tx, mut rx) = mpsc::unbounded();

        let fut = async move {
            let tx = &tx;

            let mut reading_from_piece_cache = pieces_to_read_from_piece_cache
                .into_iter()
                .map(|(cache_index, (backend, mut pieces_to_get))| async move {
                    let mut pieces_stream = match backend
                        .read_pieces(Box::new(
                            pieces_to_get
                                .keys()
                                .copied()
                                .collect::<Vec<_>>()
                                .into_iter(),
                        ))
                        .await
                    {
                        Ok(pieces_stream) => pieces_stream,
                        Err(error) => {
                            error!(
                                %error,
                                %cache_index,
                                "Error while reading pieces from cache"
                            );

                            if let Some(metrics) = &self.metrics {
                                metrics.cache_get_error.inc_by(pieces_to_get.len() as u64);
                            }
                            for (piece_index, _key) in pieces_to_get.into_values() {
                                tx.unbounded_send((piece_index, None)).expect(
                                    "This future isn't polled after receiver is dropped; qed",
                                );
                            }
                            return;
                        }
                    };

                    while let Some(maybe_piece) = pieces_stream.next().await {
                        let result = match maybe_piece {
                            Ok((piece_offset, Some((piece_index, piece)))) => {
                                pieces_to_get.remove(&piece_offset);

                                if let Some(metrics) = &self.metrics {
                                    metrics.cache_get_hit.inc();
                                }
                                (piece_index, Some(piece))
                            }
                            Ok((piece_offset, None)) => {
                                let Some((piece_index, key)) = pieces_to_get.remove(&piece_offset)
                                else {
                                    debug!(
                                        %cache_index,
                                        %piece_offset,
                                        "Received piece offset that was not expected"
                                    );
                                    continue;
                                };

                                error!(
                                    %cache_index,
                                    %piece_index,
                                    %piece_offset,
                                    ?key,
                                    "Piece was expected to be in cache, but wasn't found there"
                                );
                                if let Some(metrics) = &self.metrics {
                                    metrics.cache_get_error.inc();
                                }
                                (piece_index, None)
                            }
                            Err(error) => {
                                error!(
                                    %error,
                                    %cache_index,
                                    "Error while reading piece from cache"
                                );

                                if let Some(metrics) = &self.metrics {
                                    metrics.cache_get_error.inc();
                                }
                                continue;
                            }
                        };

                        tx.unbounded_send(result)
                            .expect("This future isn't polled after receiver is dropped; qed");
                    }

                    if pieces_to_get.is_empty() {
                        return;
                    }

                    // Anything left here was promised by the cache state but never returned by
                    // the backend
                    if let Some(metrics) = &self.metrics {
                        metrics.cache_get_error.inc_by(pieces_to_get.len() as u64);
                    }
                    for (piece_offset, (piece_index, key)) in pieces_to_get {
                        error!(
                            %cache_index,
                            %piece_index,
                            %piece_offset,
                            ?key,
                            "Piece cache didn't return an entry for offset"
                        );

                        tx.unbounded_send((piece_index, None))
                            .expect("This future isn't polled after receiver is dropped; qed");
                    }
                })
                .collect::<FuturesUnordered<_>>();
            let reading_from_piece_cache_fut = async move {
                while let Some(()) = reading_from_piece_cache.next().await {
                    // Simply drain everything
                }
            };

            let reading_from_plot_cache_fut = async {
                if pieces_to_get_from_plot_cache.is_empty() {
                    return;
                }

                for cache in self.plot_caches.caches.read().await.iter() {
                    // Iterate over offsets in reverse order so `swap_remove` doesn't affect
                    // offsets that are not yet processed
                    for offset in (0..pieces_to_get_from_plot_cache.len()).rev() {
                        let (piece_index, key) = &pieces_to_get_from_plot_cache[offset];

                        if let Ok(Some(piece)) = cache.read_piece(key).await {
                            if let Some(metrics) = &self.metrics {
                                metrics.cache_get_hit.inc();
                            }
                            tx.unbounded_send((*piece_index, Some(piece)))
                                .expect("This future isn't polled after receiver is dropped; qed");

                            pieces_to_get_from_plot_cache.swap_remove(offset);
                        }
                    }

                    if pieces_to_get_from_plot_cache.is_empty() {
                        return;
                    }
                }

                if let Some(metrics) = &self.metrics {
                    metrics
                        .cache_get_miss
                        .inc_by(pieces_to_get_from_plot_cache.len() as u64);
                }

                for (piece_index, _key) in pieces_to_get_from_plot_cache {
                    tx.unbounded_send((piece_index, None))
                        .expect("This future isn't polled after receiver is dropped; qed");
                }
            };

            join!(reading_from_piece_cache_fut, reading_from_plot_cache_fut).await
        };
        let mut fut = Box::pin(fut.fuse());

        // Drive the reading future and stream back any pieces that were read so far
        stream::poll_fn(move |cx| {
            if !fut.is_terminated() {
                // Result doesn't matter, the receiver below is polled either way
                let _ = fut.poll_unpin(cx);
            }

            if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) {
                return Poll::Ready(maybe_result);
            }

            // Once `fut` completes it drops `tx`, at which point `rx` yields `None` and the
            // stream ends
            Poll::Pending
        })
    }

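    /// Returns the subset of the provided piece indices that are present in one of the caches.
    /// Plot caches are only consulted when their lock can be taken without waiting, so the
    /// result is best effort.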
    pub async fn has_pieces(&self, mut piece_indices: Vec<PieceIndex>) -> Vec<PieceIndex> {
        let mut pieces_to_find = HashMap::<PieceIndex, RecordKey>::from_iter(
            piece_indices
                .iter()
                .map(|piece_index| (*piece_index, RecordKey::from(piece_index.to_multihash()))),
        );

        // Quick check in piece caches
        {
            let piece_caches = self.piece_caches.read().await;
            pieces_to_find.retain(|_piece_index, key| {
                let distance_key = KeyWithDistance::new(self.peer_id, key.clone());
                !piece_caches.contains_stored_piece(&distance_key)
            });
        }

        // All pieces are cached, no need to check plot caches
        if pieces_to_find.is_empty() {
            return piece_indices;
        }

        // Check plot caches if they are not busy
        if let Some(plot_caches) = self.plot_caches.caches.try_read() {
            let plot_caches = &plot_caches;
            let not_found = pieces_to_find
                .into_iter()
                .map(|(piece_index, key)| async move {
                    let key = &key;

                    let found = plot_caches
                        .iter()
                        .map(|plot_cache| async {
                            matches!(
                                plot_cache.is_piece_maybe_stored(key).await,
                                Ok(MaybePieceStoredResult::Yes)
                            )
                        })
                        .collect::<FuturesUnordered<_>>()
                        .any(|found| async move { found })
                        .await;

                    if found {
                        None
                    } else {
                        Some(piece_index)
                    }
                })
                .collect::<FuturesUnordered<_>>()
                .filter_map(|maybe_piece_index| async move { maybe_piece_index })
                .collect::<HashSet<_>>()
                .await;
            piece_indices.retain(|piece_index| !not_found.contains(piece_index));
        }
        piece_indices
    }

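    /// Find a piece in the piece caches and return the cache ID and offset it is stored at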
    pub async fn find_piece(
        &self,
        piece_index: PieceIndex,
    ) -> Option<(PieceCacheId, PieceCacheOffset)> {
        let caches = self.piece_caches.read().await;

        self.find_piece_internal(&caches, piece_index)
    }

    /// Find multiple pieces in the piece caches and return their retrieval details
    pub async fn find_pieces<PieceIndices>(
        &self,
        piece_indices: PieceIndices,
    ) -> Vec<(PieceIndex, PieceCacheId, PieceCacheOffset)>
    where
        PieceIndices: IntoIterator<Item = PieceIndex>,
    {
        let caches = self.piece_caches.read().await;

        piece_indices
            .into_iter()
            .filter_map(|piece_index| {
                self.find_piece_internal(&caches, piece_index)
                    .map(|(cache_id, piece_offset)| (piece_index, cache_id, piece_offset))
            })
            .collect()
    }

    fn find_piece_internal(
        &self,
        caches: &PieceCachesState,
        piece_index: PieceIndex,
    ) -> Option<(PieceCacheId, PieceCacheOffset)> {
        let key = KeyWithDistance::new(self.peer_id, piece_index.to_multihash());

        let Some(offset) = caches.get_stored_piece(&key) else {
            if let Some(metrics) = &self.metrics {
                metrics.cache_find_miss.inc();
            }

            return None;
        };
        let piece_offset = offset.piece_offset;

        if let Some(backend) = caches.get_backend(offset.cache_index) {
            if let Some(metrics) = &self.metrics {
                metrics.cache_find_hit.inc();
            }
            return Some((*backend.id(), piece_offset));
        }

        if let Some(metrics) = &self.metrics {
            metrics.cache_find_miss.inc();
        }
        None
    }

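    /// Try to store a piece in the plot caches, if there is space for additional pieces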
    pub async fn maybe_store_additional_piece(&self, piece_index: PieceIndex, piece: &Piece) {
        let key = RecordKey::from(piece_index.to_multihash());

        let should_store = self.plot_caches.should_store(piece_index, &key).await;

        if !should_store {
            return;
        }

        self.plot_caches
            .store_additional_piece(piece_index, piece)
            .await;
    }

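    /// Initiate replacement of the backing caches: piece caches are handed to the worker, plot
    /// caches are swapped in place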
    pub async fn replace_backing_caches(
        &self,
        new_piece_caches: Vec<Arc<dyn PieceCache>>,
        new_plot_caches: Vec<Arc<dyn PlotCache>>,
    ) {
        if let Err(error) = self
            .worker_sender
            .clone()
            .send(WorkerCommand::ReplaceBackingCaches { new_piece_caches })
            .await
        {
            warn!(%error, "Failed to replace backing caches, worker exited");
        }

        *self.plot_caches.caches.write().await = new_plot_caches;
    }

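    /// Subscribe to cache synchronization progress notifications (in percent)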
    pub fn on_sync_progress(&self, callback: HandlerFn<f32>) -> HandlerId {
        self.handlers.progress.add(callback)
    }
}

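/// Collection of [`FarmerCache`] instances: reads are load-balanced across the caches with a
/// random choice, while writes are broadcast to all of them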
#[derive(Debug, Clone)]
pub struct FarmerCaches {
    caches: Arc<[FarmerCache]>,
}

impl From<Arc<[FarmerCache]>> for FarmerCaches {
    fn from(caches: Arc<[FarmerCache]>) -> Self {
        Self { caches }
    }
}

impl From<FarmerCache> for FarmerCaches {
    fn from(cache: FarmerCache) -> Self {
        Self {
            caches: Arc::new([cache]),
        }
    }
}

impl FarmerCaches {
    /// Get a piece from a randomly chosen cache by its record key
    pub async fn get_piece<Key>(&self, key: Key) -> Option<Piece>
    where
        RecordKey: From<Key>,
    {
        let farmer_cache = self.caches.choose(&mut thread_rng())?;
        farmer_cache.get_piece(key).await
    }

    /// Get pieces from a randomly chosen cache as a stream
    pub async fn get_pieces<'a, PieceIndices>(
        &'a self,
        piece_indices: PieceIndices,
    ) -> impl Stream<Item = (PieceIndex, Option<Piece>)> + Send + Unpin + 'a
    where
        PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send + 'a> + Send + 'a,
    {
        let Some(farmer_cache) = self.caches.choose(&mut thread_rng()) else {
            return Either::Left(stream::iter(
                piece_indices
                    .into_iter()
                    .map(|piece_index| (piece_index, None)),
            ));
        };

        Either::Right(farmer_cache.get_pieces(piece_indices).await)
    }

    /// Returns the subset of the provided piece indices that are cached
    pub async fn has_pieces(&self, piece_indices: Vec<PieceIndex>) -> Vec<PieceIndex> {
        let Some(farmer_cache) = self.caches.choose(&mut thread_rng()) else {
            return Vec::new();
        };

        farmer_cache.has_pieces(piece_indices).await
    }

    /// Try to store a piece in the plot caches of all farmer caches
    pub async fn maybe_store_additional_piece(&self, piece_index: PieceIndex, piece: &Piece) {
        self.caches
            .iter()
            .map(|farmer_cache| farmer_cache.maybe_store_additional_piece(piece_index, piece))
            .collect::<FuturesUnordered<_>>()
            .for_each(|()| async {})
            .await;
    }
}

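/// Extract the piece index from a record key.
///
/// This assumes the key was built from `piece_index.to_multihash()`, with the piece index stored
/// in the trailing `PieceIndex::SIZE` bytes; it will produce a meaningless result for other keys.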
fn decode_piece_index_from_record_key(key: &RecordKey) -> PieceIndex {
    let len = key.as_ref().len();
    // Piece index bytes occupy the tail of the key
    let start = len - PieceIndex::SIZE;

    let mut piece_index_bytes = [0u8; PieceIndex::SIZE];
    piece_index_bytes.copy_from_slice(&key.as_ref()[start..]);

    PieceIndex::from_bytes(piece_index_bytes)
}