subspace_farmer/single_disk_farm/
plot_cache.rs

1//! Plot cache for single disk farm
2
3#[cfg(test)]
4mod tests;
5
6use crate::farm::{FarmError, MaybePieceStoredResult, PlotCache};
7use crate::single_disk_farm::direct_io_file::DirectIoFile;
8use async_lock::RwLock as AsyncRwLock;
9use async_trait::async_trait;
10use bytes::BytesMut;
11use parking_lot::RwLock;
12use std::collections::HashMap;
13use std::sync::{Arc, Weak};
14use std::{io, mem};
15use subspace_core_primitives::hashes::{Blake3Hash, blake3_hash_list};
16use subspace_core_primitives::pieces::{Piece, PieceIndex};
17use subspace_core_primitives::sectors::SectorIndex;
18use subspace_farmer_components::file_ext::FileExt;
19use subspace_farmer_components::sector::SectorMetadataChecksummed;
20use subspace_networking::libp2p::kad::RecordKey;
21use subspace_networking::utils::multihash::ToMultihash;
22use subspace_process::AsyncJoinOnDrop;
23use thiserror::Error;
24use tokio::task;
25use tracing::{debug, info, trace, warn};
26
/// Disk plot cache open error
#[derive(Debug, Error)]
pub enum DiskPlotCacheError {
    /// I/O error occurred
    #[error("Plot cache I/O error: {0}")]
    Io(#[from] io::Error),
    /// Failed to spawn task for blocking thread
    #[error("Failed to spawn task for blocking thread: {0}")]
    TokioJoinError(#[from] tokio::task::JoinError),
    /// Checksum mismatch
    ///
    /// Indicates a corrupted or sector-overridden cache element on disk
    #[error("Checksum mismatch")]
    ChecksumMismatch,
}
40
#[derive(Debug)]
struct CachedPieces {
    /// Map of piece index into offset
    map: HashMap<RecordKey, u32>,
    /// Next free element offset for storing a piece, `None` once the cache has no free space left
    next_offset: Option<u32>,
}
47
/// Additional piece cache that exploit part of the plot that does not contain sectors yet
#[derive(Debug, Clone)]
pub struct DiskPlotCache {
    // Weak references below: when the file or metadata `Arc` is dropped (farm or process
    // shutting down), cache operations degrade into no-ops instead of keeping resources alive
    file: Weak<DirectIoFile>,
    sectors_metadata: Weak<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    /// In-memory index of cached pieces, shared between clones of this cache
    cached_pieces: Arc<RwLock<CachedPieces>>,
    /// Total number of sectors the plot will eventually contain
    target_sector_count: SectorIndex,
    /// Size of a single sector in bytes, used to compute how much of the file is plotted
    sector_size: u64,
}
57
#[async_trait]
impl PlotCache for DiskPlotCache {
    /// Check if piece is potentially stored in this cache (delegates to the inherent method)
    async fn is_piece_maybe_stored(
        &self,
        key: &RecordKey,
    ) -> Result<MaybePieceStoredResult, FarmError> {
        Ok(self.is_piece_maybe_stored(key))
    }

    /// Store piece in cache if there is free space, and return `Ok(true)`.
    /// Returns `Ok(false)` if there is no free space, or the farm or process is shutting down.
    async fn try_store_piece(
        &self,
        piece_index: PieceIndex,
        piece: &Piece,
    ) -> Result<bool, FarmError> {
        Ok(self.try_store_piece(piece_index, piece).await?)
    }

    /// Read piece from cache, `None` if it is not cached (delegates to the inherent method)
    async fn read_piece(&self, key: &RecordKey) -> Result<Option<Piece>, FarmError> {
        Ok(self.read_piece(key).await)
    }
}
81
82impl DiskPlotCache {
    /// Create a plot cache view over `file`, rebuilding the in-memory index of cached pieces.
    ///
    /// Scans element-sized slots between the end of the already plotted sectors and the end of
    /// the file (in reverse), recording every valid piece and the next free offset. This reads
    /// from disk synchronously and can take a while, hence the log messages.
    pub(crate) fn new(
        file: &Arc<DirectIoFile>,
        sectors_metadata: &Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
        target_sector_count: SectorIndex,
        sector_size: u64,
    ) -> Self {
        info!("Checking plot cache contents, this can take a while");
        let cached_pieces = {
            let sectors_metadata = sectors_metadata.read_blocking();
            // Reusable buffer for a single cache element read from disk
            let mut element = vec![0; Self::element_size() as usize];
            // Clippy complains about `RecordKey`, but it is not changing here, so it is fine
            #[allow(clippy::mutable_key_type)]
            let mut map = HashMap::new();
            let mut next_offset = None;

            let file_size = sector_size * u64::from(target_sector_count);
            let plotted_size = sector_size * sectors_metadata.len() as u64;

            // Step over all free potential offsets for pieces that could have been cached
            let from_offset = (plotted_size / Self::element_size() as u64) as u32;
            let to_offset = (file_size / Self::element_size() as u64) as u32;
            // TODO: Parallelize or read in larger batches
            for offset in (from_offset..to_offset).rev() {
                match Self::read_piece_internal(file, offset, &mut element) {
                    Ok(maybe_piece_index) => match maybe_piece_index {
                        Some(piece_index) => {
                            map.insert(RecordKey::from(piece_index.to_multihash()), offset);
                        }
                        None => {
                            // All-zero element: first unwritten slot going down; pieces are
                            // stored contiguously from the end, so the scan can stop here
                            next_offset.replace(offset);
                            break;
                        }
                    },
                    Err(DiskPlotCacheError::ChecksumMismatch) => {
                        // A corrupted (or sector-overridden) element is treated as free space
                        next_offset.replace(offset);
                        break;
                    }
                    Err(error) => {
                        // Unexpected I/O failure: stop scanning, leaving `next_offset` as
                        // `None`, which disables further piece stores
                        warn!(%error, %offset, "Failed to read plot cache element");
                        break;
                    }
                }
            }

            CachedPieces { map, next_offset }
        };

        info!("Finished checking plot cache contents");

        Self {
            file: Arc::downgrade(file),
            sectors_metadata: Arc::downgrade(sectors_metadata),
            cached_pieces: Arc::new(RwLock::new(cached_pieces)),
            target_sector_count,
            sector_size,
        }
    }
140
141    /// Size of a single plot cache element
142    pub(crate) const fn element_size() -> u32 {
143        (PieceIndex::SIZE + Piece::SIZE + Blake3Hash::SIZE) as u32
144    }
145
146    /// Check if piece is potentially stored in this cache (not guaranteed to be because it might be
147    /// overridden with sector any time)
148    pub(crate) fn is_piece_maybe_stored(&self, key: &RecordKey) -> MaybePieceStoredResult {
149        let offset = {
150            let cached_pieces = self.cached_pieces.read();
151
152            let Some(offset) = cached_pieces.map.get(key).copied() else {
153                return if cached_pieces.next_offset.is_some() {
154                    MaybePieceStoredResult::Vacant
155                } else {
156                    MaybePieceStoredResult::No
157                };
158            };
159
160            offset
161        };
162
163        let Some(sectors_metadata) = self.sectors_metadata.upgrade() else {
164            return MaybePieceStoredResult::No;
165        };
166
167        let element_offset = u64::from(offset) * u64::from(Self::element_size());
168        // Blocking read is fine because writes in farmer are very rare and very brief
169        let plotted_bytes = self.sector_size * sectors_metadata.read_blocking().len() as u64;
170
171        // Make sure offset is after anything that is already plotted
172        if element_offset < plotted_bytes {
173            // Remove entry since it was overwritten with a sector already
174            self.cached_pieces.write().map.remove(key);
175            MaybePieceStoredResult::No
176        } else {
177            MaybePieceStoredResult::Yes
178        }
179    }
180
    /// Store piece in cache if there is free space, and return `Ok(true)`.
    /// Returns `Ok(false)` if there is no free space, or the farm or process is shutting down.
    ///
    /// # Errors
    /// Returns an error if the background write task fails to join or the underlying file write
    /// fails.
    pub(crate) async fn try_store_piece(
        &self,
        piece_index: PieceIndex,
        piece: &Piece,
    ) -> Result<bool, DiskPlotCacheError> {
        let offset = {
            // First, do a quick concurrent check for free space with a read lock, dropping it
            // immediately.
            if self.cached_pieces.read().next_offset.is_none() {
                return Ok(false);
            };

            // Then, if there was free space, acquire a write lock, and check for intervening
            // writes.
            let mut cached_pieces = self.cached_pieces.write();
            let Some(next_offset) = cached_pieces.next_offset else {
                return Ok(false);
            };

            // Reserve this offset; the cache grows towards lower offsets (towards the plotted
            // sectors), exhausting free space when the offset reaches zero
            let offset = next_offset;
            cached_pieces.next_offset = offset.checked_sub(1);
            offset
        };

        let Some(sectors_metadata) = self.sectors_metadata.upgrade() else {
            // Metadata has been dropped, farm or process is shutting down
            return Ok(false);
        };

        let element_offset = u64::from(offset) * u64::from(Self::element_size());
        // NOTE(review): this read guard is intentionally held across the blocking write below —
        // presumably to keep plotting from overwriting the element mid-write; confirm before
        // restructuring
        let sectors_metadata = sectors_metadata.read().await;
        let plotted_sectors_count = sectors_metadata.len() as SectorIndex;
        let plotted_bytes = self.sector_size * u64::from(plotted_sectors_count);

        // Make sure offset is after anything that is already plotted
        if element_offset < plotted_bytes {
            // Just to be safe, avoid any overlap of read and write locks
            drop(sectors_metadata);
            let mut cached_pieces = self.cached_pieces.write();
            // No space to store more pieces anymore
            cached_pieces.next_offset.take();
            if plotted_sectors_count == self.target_sector_count {
                // Free allocated memory once fully plotted
                mem::take(&mut cached_pieces.map);
            }
            return Ok(false);
        }

        let Some(file) = self.file.upgrade() else {
            // File has been dropped, farm or process is shutting down
            return Ok(false);
        };

        trace!(
            %offset,
            ?piece_index,
            %plotted_sectors_count,
            "Found available piece cache free space offset, writing piece",
        );

        let write_fut = tokio::task::spawn_blocking({
            let piece_index_bytes = piece_index.to_bytes();
            // File writes are read/write/modify internally, so combine all data here for more
            // efficient write
            let mut bytes = Vec::with_capacity(PieceIndex::SIZE + piece.len() + Blake3Hash::SIZE);
            bytes.extend_from_slice(&piece_index_bytes);
            bytes.extend_from_slice(piece.as_ref());
            bytes.extend_from_slice(
                blake3_hash_list(&[&piece_index_bytes, piece.as_ref()]).as_ref(),
            );

            move || file.write_all_at(&bytes, element_offset)
        });

        // NOTE(review): second argument presumably controls abort-on-drop behavior of the write
        // task — verify against `AsyncJoinOnDrop` docs
        AsyncJoinOnDrop::new(write_fut, false).await??;

        // Just to be safe, avoid any overlap of read and write locks
        drop(sectors_metadata);
        // Store newly written piece in the map
        self.cached_pieces
            .write()
            .map
            .insert(RecordKey::from(piece_index.to_multihash()), offset);

        Ok(true)
    }
269
    /// Read piece from cache.
    ///
    /// Returns `None` if not cached.
    ///
    /// A failed read also invalidates the stale map entry for `key` (or frees the whole map once
    /// the plot is fully plotted), since the element was likely overridden with a sector.
    pub(crate) async fn read_piece(&self, key: &RecordKey) -> Option<Piece> {
        let offset = self.cached_pieces.read().map.get(key).copied()?;

        // File has been dropped, farm or process is shutting down
        let file = self.file.upgrade()?;

        let read_fn = move || {
            let mut element = BytesMut::zeroed(Self::element_size() as usize);
            if let Ok(Some(_piece_index)) = Self::read_piece_internal(&file, offset, &mut element) {
                let element = element.freeze();
                // Slice out the piece bytes (skipping the piece index prefix) via `slice_ref`,
                // reusing the frozen buffer rather than copying
                let piece =
                    Piece::try_from(element.slice_ref(&element[PieceIndex::SIZE..][..Piece::SIZE]))
                        .expect("Correct length; qed");
                Some(piece)
            } else {
                None
            }
        };
        // TODO: On Windows spawning blocking task that allows concurrent reads causes huge memory
        //  usage. No idea why it happens, but not spawning anything at all helps for some reason.
        //  Someone at some point should figure it out and fix, but it will probably be not me
        //  (Nazar).
        //  See https://github.com/autonomys/subspace/issues/2813 and linked forum post for details.
        //  This TODO exists in multiple files
        let maybe_piece = if cfg!(windows) {
            task::block_in_place(read_fn)
        } else {
            let read_fut = task::spawn_blocking(read_fn);

            // A join error is treated the same as a failed read, i.e. `None`
            AsyncJoinOnDrop::new(read_fut, false)
                .await
                .unwrap_or_default()
        };

        if maybe_piece.is_none()
            && let Some(sectors_metadata) = self.sectors_metadata.upgrade()
        {
            let plotted_sectors_count = sectors_metadata.read().await.len() as SectorIndex;

            let mut cached_pieces = self.cached_pieces.write();
            if plotted_sectors_count == self.target_sector_count {
                // Free allocated memory once fully plotted
                mem::take(&mut cached_pieces.map);
            } else {
                // Remove entry just in case it was overridden with a sector already
                cached_pieces.map.remove(key);
            }
        }

        maybe_piece
    }
323
324    fn read_piece_internal(
325        file: &DirectIoFile,
326        offset: u32,
327        element: &mut [u8],
328    ) -> Result<Option<PieceIndex>, DiskPlotCacheError> {
329        file.read_exact_at(element, u64::from(offset) * u64::from(Self::element_size()))?;
330
331        let (piece_index_bytes, remaining_bytes) = element.split_at(PieceIndex::SIZE);
332        let (piece_bytes, expected_checksum) = remaining_bytes.split_at(Piece::SIZE);
333
334        // Verify checksum
335        let actual_checksum = blake3_hash_list(&[piece_index_bytes, piece_bytes]);
336        if *actual_checksum != *expected_checksum {
337            if element.iter().all(|&byte| byte == 0) {
338                return Ok(None);
339            }
340
341            debug!(
342                actual_checksum = %hex::encode(actual_checksum),
343                expected_checksum = %hex::encode(expected_checksum),
344                "Hash doesn't match, corrupted or overridden piece in cache"
345            );
346
347            return Err(DiskPlotCacheError::ChecksumMismatch);
348        }
349
350        let piece_index = PieceIndex::from_bytes(
351            piece_index_bytes
352                .try_into()
353                .expect("Statically known to have correct size; qed"),
354        );
355        Ok(Some(piece_index))
356    }
357}