subspace_farmer/single_disk_farm/
piece_cache.rs

1//! Cache implementation specific to single disk farm
2
3use crate::disk_piece_cache::DiskPieceCache;
4use crate::farm;
5use crate::farm::{FarmError, PieceCacheId, PieceCacheOffset};
6use async_trait::async_trait;
7use futures::{stream, Stream};
8use subspace_core_primitives::pieces::{Piece, PieceIndex};
9
10/// Dedicated piece cache stored on one disk, is used both to accelerate DSN queries and to plot
11/// faster
12#[derive(Debug, Clone)]
13pub struct SingleDiskPieceCache {
14    id: PieceCacheId,
15    maybe_piece_cache: Option<DiskPieceCache>,
16}
17
18#[async_trait]
19impl farm::PieceCache for SingleDiskPieceCache {
20    fn id(&self) -> &PieceCacheId {
21        &self.id
22    }
23
24    fn max_num_elements(&self) -> u32 {
25        if let Some(piece_cache) = &self.maybe_piece_cache {
26            piece_cache.max_num_elements()
27        } else {
28            0
29        }
30    }
31
32    async fn contents(
33        &self,
34    ) -> Result<
35        Box<
36            dyn Stream<Item = Result<(PieceCacheOffset, Option<PieceIndex>), FarmError>>
37                + Unpin
38                + Send
39                + '_,
40        >,
41        FarmError,
42    > {
43        if let Some(piece_cache) = &self.maybe_piece_cache {
44            farm::PieceCache::contents(piece_cache).await
45        } else {
46            Ok(Box::new(stream::empty()))
47        }
48    }
49
50    async fn write_piece(
51        &self,
52        offset: PieceCacheOffset,
53        piece_index: PieceIndex,
54        piece: &Piece,
55    ) -> Result<(), FarmError> {
56        if let Some(piece_cache) = &self.maybe_piece_cache {
57            farm::PieceCache::write_piece(piece_cache, offset, piece_index, piece).await
58        } else {
59            Err("Can't write pieces into empty cache".into())
60        }
61    }
62
63    async fn read_piece_index(
64        &self,
65        offset: PieceCacheOffset,
66    ) -> Result<Option<PieceIndex>, FarmError> {
67        if let Some(piece_cache) = &self.maybe_piece_cache {
68            farm::PieceCache::read_piece_index(piece_cache, offset).await
69        } else {
70            Ok(None)
71        }
72    }
73
74    async fn read_piece(
75        &self,
76        offset: PieceCacheOffset,
77    ) -> Result<Option<(PieceIndex, Piece)>, FarmError> {
78        if let Some(piece_cache) = &self.maybe_piece_cache {
79            farm::PieceCache::read_piece(piece_cache, offset).await
80        } else {
81            Ok(None)
82        }
83    }
84
85    async fn read_pieces(
86        &self,
87        offsets: Box<dyn Iterator<Item = PieceCacheOffset> + Send>,
88    ) -> Result<
89        Box<
90            dyn Stream<Item = Result<(PieceCacheOffset, Option<(PieceIndex, Piece)>), FarmError>>
91                + Send
92                + Unpin
93                + '_,
94        >,
95        FarmError,
96    > {
97        if let Some(piece_cache) = &self.maybe_piece_cache {
98            farm::PieceCache::read_pieces(piece_cache, offsets).await
99        } else {
100            Ok(Box::new(stream::iter(
101                offsets.map(|offset| Ok((offset, None))),
102            )))
103        }
104    }
105}
106
107impl SingleDiskPieceCache {
108    pub(crate) fn new(id: PieceCacheId, maybe_piece_cache: Option<DiskPieceCache>) -> Self {
109        Self {
110            id,
111            maybe_piece_cache,
112        }
113    }
114}