// subspace_farmer/single_disk_farm/plot_cache.rs

#[cfg(test)]
mod tests;
5
6use crate::farm::{FarmError, MaybePieceStoredResult, PlotCache};
7use crate::single_disk_farm::direct_io_file::DirectIoFile;
8use async_lock::RwLock as AsyncRwLock;
9use async_trait::async_trait;
10use bytes::BytesMut;
11use parking_lot::RwLock;
12use std::collections::HashMap;
13use std::sync::{Arc, Weak};
14use std::{io, mem};
15use subspace_core_primitives::hashes::{Blake3Hash, blake3_hash_list};
16use subspace_core_primitives::pieces::{Piece, PieceIndex};
17use subspace_core_primitives::sectors::SectorIndex;
18use subspace_farmer_components::file_ext::FileExt;
19use subspace_farmer_components::sector::SectorMetadataChecksummed;
20use subspace_networking::libp2p::kad::RecordKey;
21use subspace_networking::utils::multihash::ToMultihash;
22use subspace_process::AsyncJoinOnDrop;
23use thiserror::Error;
24use tokio::task;
25use tracing::{debug, info, trace, warn};
26
/// Errors that can occur while reading from or writing to the disk plot cache
#[derive(Debug, Error)]
pub enum DiskPlotCacheError {
    /// Low-level I/O failure while accessing the plot file
    #[error("Plot cache I/O error: {0}")]
    Io(#[from] io::Error),
    /// A blocking task spawned on the Tokio runtime failed to join
    #[error("Failed to spawn task for blocking thread: {0}")]
    TokioJoinError(#[from] tokio::task::JoinError),
    /// Stored checksum of a cache element does not match its contents (corrupted or
    /// overwritten element)
    #[error("Checksum mismatch")]
    ChecksumMismatch,
}
40
/// In-memory index of pieces currently stored in the plot cache
#[derive(Debug)]
struct CachedPieces {
    /// Map of piece key to offset (measured in whole elements, not bytes) where the piece
    /// is stored in the plot file
    map: HashMap<RecordKey, u32>,
    /// Element offset at which the next piece will be written; `None` means there is no
    /// free space left for caching
    next_offset: Option<u32>,
}
47
/// Piece cache that lives in the not-yet-plotted tail of a single disk farm's plot file.
///
/// Holds only `Weak` references to the plot file and sectors metadata (see
/// `Arc::downgrade` in [`Self::new`]), so the cache does not keep farm resources alive;
/// operations return "not stored"/no-op results once those are dropped.
#[derive(Debug, Clone)]
pub struct DiskPlotCache {
    /// Plot file used for element reads/writes
    file: Weak<DirectIoFile>,
    /// Metadata of plotted sectors, used to detect when the growing plot has overwritten
    /// cached elements
    sectors_metadata: Weak<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    /// Shared index of cached pieces and the next free element offset
    cached_pieces: Arc<RwLock<CachedPieces>>,
    /// Total number of sectors the plot will eventually contain
    target_sector_count: SectorIndex,
    /// Size of a single sector in bytes
    sector_size: u64,
}
57
58#[async_trait]
59impl PlotCache for DiskPlotCache {
60 async fn is_piece_maybe_stored(
61 &self,
62 key: &RecordKey,
63 ) -> Result<MaybePieceStoredResult, FarmError> {
64 Ok(self.is_piece_maybe_stored(key))
65 }
66
67 async fn try_store_piece(
70 &self,
71 piece_index: PieceIndex,
72 piece: &Piece,
73 ) -> Result<bool, FarmError> {
74 Ok(self.try_store_piece(piece_index, piece).await?)
75 }
76
77 async fn read_piece(&self, key: &RecordKey) -> Result<Option<Piece>, FarmError> {
78 Ok(self.read_piece(key).await)
79 }
80}
81
impl DiskPlotCache {
    /// Creates a plot cache over the free (not yet plotted) tail of the plot file.
    ///
    /// Scans existing cache elements in the region between the plotted bytes and the end
    /// of the file to rebuild the in-memory piece index and determine the next free
    /// element offset. Stores only `Weak` references to `file` and `sectors_metadata` so
    /// the cache does not keep the farm alive.
    pub(crate) fn new(
        file: &Arc<DirectIoFile>,
        sectors_metadata: &Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
        target_sector_count: SectorIndex,
        sector_size: u64,
    ) -> Self {
        info!("Checking plot cache contents, this can take a while");
        let cached_pieces = {
            let sectors_metadata = sectors_metadata.read_blocking();
            // Reusable buffer for reading one cache element at a time
            let mut element = vec![0; Self::element_size() as usize];
            // `RecordKey` triggers clippy's mutable-key-type lint, but it is used as an
            // immutable key here
            #[allow(clippy::mutable_key_type)]
            let mut map = HashMap::new();
            let mut next_offset = None;

            let file_size = sector_size * u64::from(target_sector_count);
            let plotted_size = sector_size * sectors_metadata.len() as u64;

            // Element offsets covering the not-yet-plotted region of the file
            let from_offset = (plotted_size / Self::element_size() as u64) as u32;
            let to_offset = (file_size / Self::element_size() as u64) as u32;
            // Pieces are written starting from the highest offset downwards (see
            // `try_store_piece`), so scan in reverse: the first empty or corrupted element
            // marks where the next write should go
            for offset in (from_offset..to_offset).rev() {
                match Self::read_piece_internal(file, offset, &mut element) {
                    Ok(maybe_piece_index) => match maybe_piece_index {
                        Some(piece_index) => {
                            map.insert(RecordKey::from(piece_index.to_multihash()), offset);
                        }
                        None => {
                            // All-zero element: free slot, next write goes here
                            next_offset.replace(offset);
                            break;
                        }
                    },
                    Err(DiskPlotCacheError::ChecksumMismatch) => {
                        // Corrupted (or partially overwritten) element is safe to reuse
                        next_offset.replace(offset);
                        break;
                    }
                    Err(error) => {
                        // Unexpected I/O error: stop scanning, leave `next_offset` as-is
                        warn!(%error, %offset, "Failed to read plot cache element");
                        break;
                    }
                }
            }

            CachedPieces { map, next_offset }
        };

        info!("Finished checking plot cache contents");

        Self {
            file: Arc::downgrade(file),
            sectors_metadata: Arc::downgrade(sectors_metadata),
            cached_pieces: Arc::new(RwLock::new(cached_pieces)),
            target_sector_count,
            sector_size,
        }
    }

    /// Size in bytes of a single cache element: piece index + piece + checksum over both
    pub(crate) const fn element_size() -> u32 {
        (PieceIndex::SIZE + Piece::SIZE + Blake3Hash::SIZE) as u32
    }

    /// Checks whether the piece behind `key` may be stored in this cache.
    ///
    /// "Maybe" because the plot grows concurrently and can overwrite cached elements;
    /// the authoritative check happens during the actual read. Returns `Vacant` when the
    /// key is absent but free space remains, `No` when the key is absent and the cache is
    /// full (or its element was overwritten by the plot), `Yes` otherwise.
    pub(crate) fn is_piece_maybe_stored(&self, key: &RecordKey) -> MaybePieceStoredResult {
        let offset = {
            let cached_pieces = self.cached_pieces.read();

            let Some(offset) = cached_pieces.map.get(key).copied() else {
                return if cached_pieces.next_offset.is_some() {
                    MaybePieceStoredResult::Vacant
                } else {
                    MaybePieceStoredResult::No
                };
            };

            offset
        };

        // Farm is being dropped, cache contents are no longer reachable
        let Some(sectors_metadata) = self.sectors_metadata.upgrade() else {
            return MaybePieceStoredResult::No;
        };

        let element_offset = u64::from(offset) * u64::from(Self::element_size());
        let plotted_bytes = self.sector_size * sectors_metadata.read_blocking().len() as u64;

        // Element was overwritten by plotted sectors in the meantime: evict it from the map
        if element_offset < plotted_bytes {
            self.cached_pieces.write().map.remove(key);
            MaybePieceStoredResult::No
        } else {
            MaybePieceStoredResult::Yes
        }
    }

    /// Tries to store a piece in the cache, returning `Ok(false)` when there is no usable
    /// free space (cache full, element overwritten by the growing plot, or farm dropped).
    pub(crate) async fn try_store_piece(
        &self,
        piece_index: PieceIndex,
        piece: &Piece,
    ) -> Result<bool, DiskPlotCacheError> {
        let offset = {
            // Cheap check under the read lock first to avoid write-lock contention when
            // the cache is already full
            if self.cached_pieces.read().next_offset.is_none() {
                return Ok(false);
            };

            // Re-check under the write lock, since another writer may have consumed the
            // last slot between the two lock acquisitions
            let mut cached_pieces = self.cached_pieces.write();
            let Some(next_offset) = cached_pieces.next_offset else {
                return Ok(false);
            };

            // Claim this offset; offsets are consumed downwards, `None` once exhausted
            let offset = next_offset;
            cached_pieces.next_offset = offset.checked_sub(1);
            offset
        };

        // Farm is being dropped, nothing to write to
        let Some(sectors_metadata) = self.sectors_metadata.upgrade() else {
            return Ok(false);
        };

        let element_offset = u64::from(offset) * u64::from(Self::element_size());
        // Hold the sectors metadata read lock across the write below so plotting cannot
        // overwrite this element while the piece is being written
        let sectors_metadata = sectors_metadata.read().await;
        let plotted_sectors_count = sectors_metadata.len() as SectorIndex;
        let plotted_bytes = self.sector_size * u64::from(plotted_sectors_count);

        // Claimed offset has already been overtaken by plotted sectors: no free space left
        if element_offset < plotted_bytes {
            drop(sectors_metadata);
            let mut cached_pieces = self.cached_pieces.write();
            cached_pieces.next_offset.take();
            // Once the plot is complete, every cached element is gone for good
            if plotted_sectors_count == self.target_sector_count {
                mem::take(&mut cached_pieces.map);
            }
            return Ok(false);
        }

        // Farm is being dropped, nothing to write to
        let Some(file) = self.file.upgrade() else {
            return Ok(false);
        };

        trace!(
            %offset,
            ?piece_index,
            %plotted_sectors_count,
            "Found available piece cache free space offset, writing piece",
        );

        // Serialize the element (piece index + piece + checksum) up front, then do the
        // blocking file write on a dedicated thread
        let write_fut = tokio::task::spawn_blocking({
            let piece_index_bytes = piece_index.to_bytes();
            let mut bytes = Vec::with_capacity(PieceIndex::SIZE + piece.len() + Blake3Hash::SIZE);
            bytes.extend_from_slice(&piece_index_bytes);
            bytes.extend_from_slice(piece.as_ref());
            bytes.extend_from_slice(
                blake3_hash_list(&[&piece_index_bytes, piece.as_ref()]).as_ref(),
            );

            move || file.write_all_at(&bytes, element_offset)
        });

        // `AsyncJoinOnDrop` aborts the blocking task if this future is dropped mid-write
        AsyncJoinOnDrop::new(write_fut, false).await??;

        // Only release the sectors metadata lock after the write has finished
        drop(sectors_metadata);
        self.cached_pieces
            .write()
            .map
            .insert(RecordKey::from(piece_index.to_multihash()), offset);

        Ok(true)
    }

    /// Reads the piece behind `key` from the cache, returning `None` when it is not
    /// stored (or its element was corrupted/overwritten, in which case it is also evicted
    /// from the in-memory index).
    pub(crate) async fn read_piece(&self, key: &RecordKey) -> Option<Piece> {
        let offset = self.cached_pieces.read().map.get(key).copied()?;

        // Farm is being dropped, nothing to read from
        let file = self.file.upgrade()?;

        let read_fn = move || {
            let mut element = BytesMut::zeroed(Self::element_size() as usize);
            if let Ok(Some(_piece_index)) = Self::read_piece_internal(&file, offset, &mut element) {
                let element = element.freeze();
                // Zero-copy slice of the piece bytes out of the element buffer
                let piece =
                    Piece::try_from(element.slice_ref(&element[PieceIndex::SIZE..][..Piece::SIZE]))
                        .expect("Correct length; qed");
                Some(piece)
            } else {
                None
            }
        };
        // NOTE(review): on Windows the read runs in place via `block_in_place` instead of
        // `spawn_blocking` — presumably a platform-specific constraint of `DirectIoFile`;
        // confirm before relying on this distinction
        let maybe_piece = if cfg!(windows) {
            task::block_in_place(read_fn)
        } else {
            let read_fut = task::spawn_blocking(read_fn);

            AsyncJoinOnDrop::new(read_fut, false)
                .await
                .unwrap_or_default()
        };

        // Read failed: the element was likely overwritten by plotting, so evict stale
        // entries from the in-memory index
        if maybe_piece.is_none()
            && let Some(sectors_metadata) = self.sectors_metadata.upgrade()
        {
            let plotted_sectors_count = sectors_metadata.read().await.len() as SectorIndex;

            let mut cached_pieces = self.cached_pieces.write();
            if plotted_sectors_count == self.target_sector_count {
                // Plot is complete: the whole cache region is gone, drop the entire index
                mem::take(&mut cached_pieces.map);
            } else {
                cached_pieces.map.remove(key);
            }
        }

        maybe_piece
    }

    /// Reads and validates a single cache element at `offset` (in elements).
    ///
    /// Returns `Ok(Some(piece_index))` for a valid element, `Ok(None)` for an all-zero
    /// (never written) element, and `Err(ChecksumMismatch)` for a corrupted or partially
    /// overwritten one. `element` must be exactly [`Self::element_size()`] bytes long.
    fn read_piece_internal(
        file: &DirectIoFile,
        offset: u32,
        element: &mut [u8],
    ) -> Result<Option<PieceIndex>, DiskPlotCacheError> {
        file.read_exact_at(element, u64::from(offset) * u64::from(Self::element_size()))?;

        // Element layout: piece index | piece | blake3 checksum of the preceding two
        let (piece_index_bytes, remaining_bytes) = element.split_at(PieceIndex::SIZE);
        let (piece_bytes, expected_checksum) = remaining_bytes.split_at(Piece::SIZE);

        let actual_checksum = blake3_hash_list(&[piece_index_bytes, piece_bytes]);
        if *actual_checksum != *expected_checksum {
            // An all-zero element is an untouched slot, not corruption
            if element.iter().all(|&byte| byte == 0) {
                return Ok(None);
            }

            debug!(
                actual_checksum = %hex::encode(actual_checksum),
                expected_checksum = %hex::encode(expected_checksum),
                "Hash doesn't match, corrupted or overridden piece in cache"
            );

            return Err(DiskPlotCacheError::ChecksumMismatch);
        }

        let piece_index = PieceIndex::from_bytes(
            piece_index_bytes
                .try_into()
                .expect("Statically known to have correct size; qed"),
        );
        Ok(Some(piece_index))
    }
}