subspace_farmer/single_disk_farm/plot_cache.rs

//! Plot cache for single disk farm

#[cfg(test)]
mod tests;

use crate::farm::{FarmError, MaybePieceStoredResult, PlotCache};
use crate::single_disk_farm::direct_io_file::DirectIoFile;
use crate::utils::AsyncJoinOnDrop;
use async_lock::RwLock as AsyncRwLock;
use async_trait::async_trait;
use bytes::BytesMut;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::{Arc, Weak};
use std::{io, mem};
use subspace_core_primitives::hashes::{blake3_hash_list, Blake3Hash};
use subspace_core_primitives::pieces::{Piece, PieceIndex};
use subspace_core_primitives::sectors::SectorIndex;
use subspace_farmer_components::file_ext::FileExt;
use subspace_farmer_components::sector::SectorMetadataChecksummed;
use subspace_networking::libp2p::kad::RecordKey;
use subspace_networking::utils::multihash::ToMultihash;
use thiserror::Error;
use tokio::task;
use tracing::{debug, info, warn};
/// Disk plot cache open error
#[derive(Debug, Error)]
pub enum DiskPlotCacheError {
    /// I/O error occurred
    #[error("Plot cache I/O error: {0}")]
    Io(#[from] io::Error),
    /// Failed to spawn task for blocking thread
    #[error("Failed to spawn task for blocking thread: {0}")]
    TokioJoinError(#[from] tokio::task::JoinError),
    /// Checksum mismatch
    #[error("Checksum mismatch")]
    ChecksumMismatch,
}

#[derive(Debug)]
struct CachedPieces {
    /// Map from piece record key to element offset within the unplotted part of the file
    map: HashMap<RecordKey, u32>,
    /// Next free offset where a piece can be written, `None` when the cache has no space left
    next_offset: Option<u32>,
}

/// Additional piece cache that exploits the part of the plot that does not contain sectors yet
#[derive(Debug, Clone)]
pub struct DiskPlotCache {
    file: Weak<DirectIoFile>,
    sectors_metadata: Weak<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    cached_pieces: Arc<RwLock<CachedPieces>>,
    target_sector_count: SectorIndex,
    sector_size: u64,
}
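
// Design note (inferred from the fields above, not an authoritative spec): `file` and
// `sectors_metadata` are held as `Weak` references so the cache never keeps the farm's
// resources alive on its own; every method upgrades them first and degrades to a
// "not stored" style answer once the farm has been dropped.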

#[async_trait]
impl PlotCache for DiskPlotCache {
    async fn is_piece_maybe_stored(
        &self,
        key: &RecordKey,
    ) -> Result<MaybePieceStoredResult, FarmError> {
        Ok(self.is_piece_maybe_stored(key))
    }

    async fn try_store_piece(
        &self,
        piece_index: PieceIndex,
        piece: &Piece,
    ) -> Result<bool, FarmError> {
        Ok(self.try_store_piece(piece_index, piece).await?)
    }

    async fn read_piece(&self, key: &RecordKey) -> Result<Option<Piece>, FarmError> {
        Ok(self.read_piece(key).await)
    }
}

impl DiskPlotCache {
    pub(crate) fn new(
        file: &Arc<DirectIoFile>,
        sectors_metadata: &Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
        target_sector_count: SectorIndex,
        sector_size: u64,
    ) -> Self {
        info!("Checking plot cache contents, this can take a while");
        let cached_pieces = {
            let sectors_metadata = sectors_metadata.read_blocking();
            let mut element = vec![0; Self::element_size() as usize];
            // Clippy complains about `RecordKey` being a mutable key type, but it does not
            // change here, so it is fine
            #[allow(clippy::mutable_key_type)]
            let mut map = HashMap::new();
            let mut next_offset = None;

            let file_size = sector_size * u64::from(target_sector_count);
            let plotted_size = sector_size * sectors_metadata.len() as u64;

            // Step over all free potential offsets for pieces that could have been cached
            let from_offset = (plotted_size / Self::element_size() as u64) as u32;
            let to_offset = (file_size / Self::element_size() as u64) as u32;
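            // A worked sketch of the scan range (illustrative numbers, not the real
            // constants): if element_size() were 1 MiB, sector_size 1 GiB (1024 elements)
            // and target_sector_count 4, then to_offset = 4096; with one sector already
            // plotted, from_offset = 1024. Cached pieces are written from the end of the
            // file downward (see `try_store_piece()`), so the reverse scan below visits
            // offsets 4095, 4094, ... and stops at the first empty or corrupted element,
            // which becomes `next_offset`.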
            // TODO: Parallelize or read in larger batches
            for offset in (from_offset..to_offset).rev() {
                match Self::read_piece_internal(file, offset, &mut element) {
                    Ok(maybe_piece_index) => match maybe_piece_index {
                        Some(piece_index) => {
                            map.insert(RecordKey::from(piece_index.to_multihash()), offset);
                        }
                        None => {
                            next_offset.replace(offset);
                            break;
                        }
                    },
                    Err(DiskPlotCacheError::ChecksumMismatch) => {
                        next_offset.replace(offset);
                        break;
                    }
                    Err(error) => {
                        warn!(%error, %offset, "Failed to read plot cache element");
                        break;
                    }
                }
            }

            CachedPieces { map, next_offset }
        };

        info!("Finished checking plot cache contents");

        Self {
            file: Arc::downgrade(file),
            sectors_metadata: Arc::downgrade(sectors_metadata),
            cached_pieces: Arc::new(RwLock::new(cached_pieces)),
            target_sector_count,
            sector_size,
        }
    }

    /// Size of a single plot cache element
    pub(crate) const fn element_size() -> u32 {
        (PieceIndex::SIZE + Piece::SIZE + Blake3Hash::SIZE) as u32
    }
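
    // On-disk layout of a single element, as implied by `element_size()` above and the
    // field split in `read_piece_internal()` below:
    //
    //   [ piece index: PieceIndex::SIZE ][ piece: Piece::SIZE ][ checksum: Blake3Hash::SIZE ]
    //
    // where the checksum is `blake3_hash_list(&[piece_index_bytes, piece_bytes])`; an
    // all-zero element is treated as an empty slot rather than as corruption.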

    /// Check if piece is potentially stored in this cache (not guaranteed, because it might be
    /// overridden with a sector at any time)
    pub(crate) fn is_piece_maybe_stored(&self, key: &RecordKey) -> MaybePieceStoredResult {
        let offset = {
            let cached_pieces = self.cached_pieces.read();

            let Some(offset) = cached_pieces.map.get(key).copied() else {
                return if cached_pieces.next_offset.is_some() {
                    MaybePieceStoredResult::Vacant
                } else {
                    MaybePieceStoredResult::No
                };
            };

            offset
        };

        let Some(sectors_metadata) = self.sectors_metadata.upgrade() else {
            return MaybePieceStoredResult::No;
        };

        let element_offset = u64::from(offset) * u64::from(Self::element_size());
        // Blocking read is fine because writes in farmer are very rare and very brief
        let plotted_bytes = self.sector_size * sectors_metadata.read_blocking().len() as u64;

        // Make sure offset is after anything that is already plotted
        if element_offset < plotted_bytes {
            // Remove entry since it was overridden with a sector already
            self.cached_pieces.write().map.remove(key);
            MaybePieceStoredResult::No
        } else {
            MaybePieceStoredResult::Yes
        }
    }
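
    // A caller-side sketch of how the three results are typically interpreted
    // (hypothetical caller, not part of this module):
    //
    //     match plot_cache.is_piece_maybe_stored(&key) {
    //         MaybePieceStoredResult::No => { /* not here, look elsewhere */ }
    //         MaybePieceStoredResult::Vacant => { /* free space, `try_store_piece()` may accept it */ }
    //         MaybePieceStoredResult::Yes => { /* `read_piece()` will likely return it */ }
    //     }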

    /// Store piece in cache if there is free space; otherwise `Ok(false)` is returned
    pub(crate) async fn try_store_piece(
        &self,
        piece_index: PieceIndex,
        piece: &Piece,
    ) -> Result<bool, DiskPlotCacheError> {
        let offset = {
            let mut cached_pieces = self.cached_pieces.write();
            let Some(next_offset) = cached_pieces.next_offset else {
                return Ok(false);
            };

            let offset = next_offset;
            cached_pieces.next_offset = offset.checked_sub(1);
            offset
        };

        let Some(sectors_metadata) = self.sectors_metadata.upgrade() else {
            return Ok(false);
        };

        let element_offset = u64::from(offset) * u64::from(Self::element_size());
        let sectors_metadata = sectors_metadata.read().await;
        let plotted_sectors_count = sectors_metadata.len() as SectorIndex;
        let plotted_bytes = self.sector_size * u64::from(plotted_sectors_count);

        // Make sure offset is after anything that is already plotted
        if element_offset < plotted_bytes {
            // Just to be safe, avoid any overlap of write locks
            drop(sectors_metadata);
            let mut cached_pieces = self.cached_pieces.write();
            // No space left to store more pieces
            cached_pieces.next_offset.take();
            if plotted_sectors_count == self.target_sector_count {
                // Free allocated memory once fully plotted
                mem::take(&mut cached_pieces.map);
            }
            return Ok(false);
        }

        let Some(file) = self.file.upgrade() else {
            return Ok(false);
        };

        let write_fut = tokio::task::spawn_blocking({
            let piece_index_bytes = piece_index.to_bytes();
            // File writes are read/modify/write internally, so combine all data here for a more
            // efficient write
            let mut bytes = Vec::with_capacity(PieceIndex::SIZE + piece.len() + Blake3Hash::SIZE);
            bytes.extend_from_slice(&piece_index_bytes);
            bytes.extend_from_slice(piece.as_ref());
            bytes.extend_from_slice(
                blake3_hash_list(&[&piece_index_bytes, piece.as_ref()]).as_ref(),
            );

            move || file.write_all_at(&bytes, element_offset)
        });

        AsyncJoinOnDrop::new(write_fut, false).await??;

        // Just to be safe, avoid any overlap of write locks
        drop(sectors_metadata);
        // Store newly written piece in the map
        self.cached_pieces
            .write()
            .map
            .insert(RecordKey::from(piece_index.to_multihash()), offset);

        Ok(true)
    }
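
    // Concurrency note (a reading of the code above, not a spec): `cached_pieces` sits
    // behind a synchronous `parking_lot::RwLock`, so each critical section is kept short
    // and is not held across an `.await`; the blocking file write is moved to a separate
    // thread via `spawn_blocking` and joined through `AsyncJoinOnDrop` so it is not left
    // dangling if this future is dropped.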

    /// Read piece from cache.
    ///
    /// Returns `None` if not cached.
    pub(crate) async fn read_piece(&self, key: &RecordKey) -> Option<Piece> {
        let offset = self.cached_pieces.read().map.get(key).copied()?;

        let file = self.file.upgrade()?;

        let read_fn = move || {
            let mut element = BytesMut::zeroed(Self::element_size() as usize);
            if let Ok(Some(_piece_index)) = Self::read_piece_internal(&file, offset, &mut element) {
                let element = element.freeze();
                let piece =
                    Piece::try_from(element.slice_ref(&element[PieceIndex::SIZE..][..Piece::SIZE]))
                        .expect("Correct length; qed");
                Some(piece)
            } else {
                None
            }
        };
        // TODO: On Windows spawning blocking task that allows concurrent reads causes huge memory
        //  usage. No idea why it happens, but not spawning anything at all helps for some reason.
        //  Someone at some point should figure it out and fix, but it will probably be not me
        //  (Nazar).
        //  See https://github.com/autonomys/subspace/issues/2813 and linked forum post for details.
        //  This TODO exists in multiple files
        let maybe_piece = if cfg!(windows) {
            task::block_in_place(read_fn)
        } else {
            let read_fut = task::spawn_blocking(read_fn);

            AsyncJoinOnDrop::new(read_fut, false)
                .await
                .unwrap_or_default()
        };

        if maybe_piece.is_none()
            && let Some(sectors_metadata) = self.sectors_metadata.upgrade()
        {
            let plotted_sectors_count = sectors_metadata.read().await.len() as SectorIndex;

            let mut cached_pieces = self.cached_pieces.write();
            if plotted_sectors_count == self.target_sector_count {
                // Free allocated memory once fully plotted
                mem::take(&mut cached_pieces.map);
            } else {
                // Remove entry just in case it was overridden with a sector already
                cached_pieces.map.remove(key);
            }
        }

        maybe_piece
    }

    fn read_piece_internal(
        file: &DirectIoFile,
        offset: u32,
        element: &mut [u8],
    ) -> Result<Option<PieceIndex>, DiskPlotCacheError> {
        file.read_exact_at(element, u64::from(offset) * u64::from(Self::element_size()))?;

        let (piece_index_bytes, remaining_bytes) = element.split_at(PieceIndex::SIZE);
        let (piece_bytes, expected_checksum) = remaining_bytes.split_at(Piece::SIZE);

        // Verify checksum
        let actual_checksum = blake3_hash_list(&[piece_index_bytes, piece_bytes]);
        if *actual_checksum != *expected_checksum {
            if element.iter().all(|&byte| byte == 0) {
                return Ok(None);
            }

            debug!(
                actual_checksum = %hex::encode(actual_checksum),
                expected_checksum = %hex::encode(expected_checksum),
                "Hash doesn't match, corrupted or overridden piece in cache"
            );

            return Err(DiskPlotCacheError::ChecksumMismatch);
        }

        let piece_index = PieceIndex::from_bytes(
            piece_index_bytes
                .try_into()
                .expect("Statically known to have correct size; qed"),
        );
        Ok(Some(piece_index))
    }
}
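
// A minimal end-to-end usage sketch (hypothetical; `file`, `sectors_metadata`,
// `target_sector_count`, `sector_size`, `piece_index` and `piece` are assumed to come from
// the surrounding farm code and are not defined here):
//
//     let plot_cache = DiskPlotCache::new(&file, &sectors_metadata, target_sector_count, sector_size);
//     if plot_cache.try_store_piece(piece_index, &piece).await? {
//         // Readable until plotting overwrites this part of the file
//         let key = RecordKey::from(piece_index.to_multihash());
//         let cached = plot_cache.read_piece(&key).await;
//     }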