subspace_farmer/single_disk_farm/
piece_cache.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
//! Cache implementation specific to single disk farm

use crate::disk_piece_cache::DiskPieceCache;
use crate::farm;
use crate::farm::{FarmError, PieceCacheId, PieceCacheOffset};
use async_trait::async_trait;
use futures::{stream, Stream};
use subspace_core_primitives::pieces::{Piece, PieceIndex};

/// Dedicated piece cache stored on one disk, is used both to accelerate DSN queries and to plot
/// faster
#[derive(Debug, Clone)]
pub struct SingleDiskPieceCache {
    id: PieceCacheId,
    maybe_piece_cache: Option<DiskPieceCache>,
}

#[async_trait]
impl farm::PieceCache for SingleDiskPieceCache {
    fn id(&self) -> &PieceCacheId {
        &self.id
    }

    fn max_num_elements(&self) -> u32 {
        if let Some(piece_cache) = &self.maybe_piece_cache {
            piece_cache.max_num_elements()
        } else {
            0
        }
    }

    async fn contents(
        &self,
    ) -> Result<
        Box<
            dyn Stream<Item = Result<(PieceCacheOffset, Option<PieceIndex>), FarmError>>
                + Unpin
                + Send
                + '_,
        >,
        FarmError,
    > {
        if let Some(piece_cache) = &self.maybe_piece_cache {
            farm::PieceCache::contents(piece_cache).await
        } else {
            Ok(Box::new(stream::empty()))
        }
    }

    async fn write_piece(
        &self,
        offset: PieceCacheOffset,
        piece_index: PieceIndex,
        piece: &Piece,
    ) -> Result<(), FarmError> {
        if let Some(piece_cache) = &self.maybe_piece_cache {
            farm::PieceCache::write_piece(piece_cache, offset, piece_index, piece).await
        } else {
            Err("Can't write pieces into empty cache".into())
        }
    }

    async fn read_piece_index(
        &self,
        offset: PieceCacheOffset,
    ) -> Result<Option<PieceIndex>, FarmError> {
        if let Some(piece_cache) = &self.maybe_piece_cache {
            farm::PieceCache::read_piece_index(piece_cache, offset).await
        } else {
            Ok(None)
        }
    }

    async fn read_piece(
        &self,
        offset: PieceCacheOffset,
    ) -> Result<Option<(PieceIndex, Piece)>, FarmError> {
        if let Some(piece_cache) = &self.maybe_piece_cache {
            farm::PieceCache::read_piece(piece_cache, offset).await
        } else {
            Ok(None)
        }
    }

    async fn read_pieces(
        &self,
        offsets: Box<dyn Iterator<Item = PieceCacheOffset> + Send>,
    ) -> Result<
        Box<
            dyn Stream<Item = Result<(PieceCacheOffset, Option<(PieceIndex, Piece)>), FarmError>>
                + Send
                + Unpin
                + '_,
        >,
        FarmError,
    > {
        if let Some(piece_cache) = &self.maybe_piece_cache {
            farm::PieceCache::read_pieces(piece_cache, offsets).await
        } else {
            Ok(Box::new(stream::iter(
                offsets.map(|offset| Ok((offset, None))),
            )))
        }
    }
}

impl SingleDiskPieceCache {
    pub(crate) fn new(id: PieceCacheId, maybe_piece_cache: Option<DiskPieceCache>) -> Self {
        Self {
            id,
            maybe_piece_cache,
        }
    }
}