#[cfg(test)]
mod tests;

use crate::farm::{FarmError, MaybePieceStoredResult, PlotCache};
use crate::single_disk_farm::direct_io_file::DirectIoFile;
use crate::utils::AsyncJoinOnDrop;
use async_lock::RwLock as AsyncRwLock;
use async_trait::async_trait;
use bytes::BytesMut;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::{Arc, Weak};
use std::{io, mem};
use subspace_core_primitives::hashes::{blake3_hash_list, Blake3Hash};
use subspace_core_primitives::pieces::{Piece, PieceIndex};
use subspace_core_primitives::sectors::SectorIndex;
use subspace_farmer_components::file_ext::FileExt;
use subspace_farmer_components::sector::SectorMetadataChecksummed;
use subspace_networking::libp2p::kad::RecordKey;
use subspace_networking::utils::multihash::ToMultihash;
use thiserror::Error;
use tokio::task;
use tracing::{debug, info, warn};

/// Errors that can occur while reading or writing plot-cache elements on disk.
#[derive(Debug, Error)]
pub enum DiskPlotCacheError {
    /// I/O error while accessing the plot file
    #[error("Plot cache I/O error: {0}")]
    Io(#[from] io::Error),
    /// Failed to spawn/join the blocking task used for disk access
    #[error("Failed to spawn task for blocking thread: {0}")]
    TokioJoinError(#[from] tokio::task::JoinError),
    /// Stored checksum does not match the recomputed blake3 hash of the element
    #[error("Checksum mismatch")]
    ChecksumMismatch,
}

/// In-memory bookkeeping of pieces currently cached in the plot file.
#[derive(Debug)]
struct CachedPieces {
    /// Map of piece record key to element offset (in units of `DiskPlotCache::element_size()`)
    map: HashMap<RecordKey, u32>,
    /// Next element offset available for storing a piece; `None` once no free space remains
    next_offset: Option<u32>,
}

/// Piece cache living in the not-yet-plotted region of a single disk farm's plot file.
///
/// Holds only weak references to the plot file and sector metadata, so it does not keep them
/// alive; operations become no-ops once the owning farm is dropped.
#[derive(Debug, Clone)]
pub struct DiskPlotCache {
    /// Weak handle to the plot file, upgraded on demand for reads and writes
    file: Weak<DirectIoFile>,
    /// Weak handle to metadata of already plotted sectors, used to detect cache space that
    /// plotting has since overwritten
    sectors_metadata: Weak<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    /// Shared index of cached pieces plus the next free offset
    cached_pieces: Arc<RwLock<CachedPieces>>,
    /// Number of sectors the plot will eventually contain
    target_sector_count: SectorIndex,
    /// Size of a single sector in bytes
    sector_size: u64,
}

58#[async_trait]
59impl PlotCache for DiskPlotCache {
60 async fn is_piece_maybe_stored(
61 &self,
62 key: &RecordKey,
63 ) -> Result<MaybePieceStoredResult, FarmError> {
64 Ok(self.is_piece_maybe_stored(key))
65 }
66
67 async fn try_store_piece(
68 &self,
69 piece_index: PieceIndex,
70 piece: &Piece,
71 ) -> Result<bool, FarmError> {
72 Ok(self.try_store_piece(piece_index, piece).await?)
73 }
74
75 async fn read_piece(&self, key: &RecordKey) -> Result<Option<Piece>, FarmError> {
76 Ok(self.read_piece(key).await)
77 }
78}
79
impl DiskPlotCache {
    /// Open the plot cache: scan the not-yet-plotted tail of the plot file for previously cached
    /// pieces and rebuild the in-memory offset map.
    ///
    /// Reads elements one at a time, hence the "can take a while" log message.
    pub(crate) fn new(
        file: &Arc<DirectIoFile>,
        sectors_metadata: &Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
        target_sector_count: SectorIndex,
        sector_size: u64,
    ) -> Self {
        info!("Checking plot cache contents, this can take a while");
        let cached_pieces = {
            let sectors_metadata = sectors_metadata.read_blocking();
            // Scratch buffer reused for every element read during the scan
            let mut element = vec![0; Self::element_size() as usize];
            // Clippy complains about `RecordKey` as a key type, but keys are not mutated here
            #[allow(clippy::mutable_key_type)]
            let mut map = HashMap::new();
            let mut next_offset = None;

            let file_size = sector_size * u64::from(target_sector_count);
            let plotted_size = sector_size * sectors_metadata.len() as u64;

            // Cacheable element offsets span the region between the end of already plotted
            // sectors and the end of the whole plot file
            let from_offset = (plotted_size / Self::element_size() as u64) as u32;
            let to_offset = (file_size / Self::element_size() as u64) as u32;
            // Pieces are written from the end of the file backwards (see `try_store_piece`
            // decrementing `next_offset`), so scan in reverse and stop at the first empty or
            // corrupted element
            for offset in (from_offset..to_offset).rev() {
                match Self::read_piece_internal(file, offset, &mut element) {
                    Ok(maybe_piece_index) => match maybe_piece_index {
                        Some(piece_index) => {
                            map.insert(RecordKey::from(piece_index.to_multihash()), offset);
                        }
                        None => {
                            // All-zero element: free space starts here
                            next_offset.replace(offset);
                            break;
                        }
                    },
                    Err(DiskPlotCacheError::ChecksumMismatch) => {
                        // Corrupted (or partially overwritten) element is treated as free space
                        next_offset.replace(offset);
                        break;
                    }
                    Err(error) => {
                        warn!(%error, %offset, "Failed to read plot cache element");
                        break;
                    }
                }
            }

            CachedPieces { map, next_offset }
        };

        info!("Finished checking plot cache contents");

        Self {
            // Weak references so the cache does not keep the farm's file/metadata alive
            file: Arc::downgrade(file),
            sectors_metadata: Arc::downgrade(sectors_metadata),
            cached_pieces: Arc::new(RwLock::new(cached_pieces)),
            target_sector_count,
            sector_size,
        }
    }

    /// On-disk size of a single cache element: piece index + piece + blake3 checksum
    pub(crate) const fn element_size() -> u32 {
        (PieceIndex::SIZE + Piece::SIZE + Blake3Hash::SIZE) as u32
    }

    /// Check if a piece is *possibly* stored in this cache.
    ///
    /// "Maybe" because plotting may have overwritten the element since it was cached; that is
    /// only detected (and the stale entry evicted) here or on an actual read.
    pub(crate) fn is_piece_maybe_stored(&self, key: &RecordKey) -> MaybePieceStoredResult {
        let offset = {
            let cached_pieces = self.cached_pieces.read();

            let Some(offset) = cached_pieces.map.get(key).copied() else {
                // Not cached; report `Vacant` if there is still free space to store it
                return if cached_pieces.next_offset.is_some() {
                    MaybePieceStoredResult::Vacant
                } else {
                    MaybePieceStoredResult::No
                };
            };

            offset
        };

        let Some(sectors_metadata) = self.sectors_metadata.upgrade() else {
            // Farm is gone, so is the cache
            return MaybePieceStoredResult::No;
        };

        let element_offset = u64::from(offset) * u64::from(Self::element_size());
        let plotted_bytes = self.sector_size * sectors_metadata.read_blocking().len() as u64;

        if element_offset < plotted_bytes {
            // Plotting has grown past this element in the meantime; evict the stale entry
            self.cached_pieces.write().map.remove(key);
            MaybePieceStoredResult::No
        } else {
            MaybePieceStoredResult::Yes
        }
    }

    /// Store a piece in the cache if there is free space.
    ///
    /// Returns `Ok(false)` when the piece cannot be stored: no free offsets left, the farm was
    /// dropped, or plotting has consumed the claimed offset.
    pub(crate) async fn try_store_piece(
        &self,
        piece_index: PieceIndex,
        piece: &Piece,
    ) -> Result<bool, DiskPlotCacheError> {
        let offset = {
            let mut cached_pieces = self.cached_pieces.write();
            let Some(next_offset) = cached_pieces.next_offset else {
                return Ok(false);
            };

            // Claim this offset and move the free pointer one element towards the file start
            // (`None` once offset 0 is used up)
            let offset = next_offset;
            cached_pieces.next_offset = offset.checked_sub(1);
            offset
        };

        let Some(sectors_metadata) = self.sectors_metadata.upgrade() else {
            return Ok(false);
        };

        let element_offset = u64::from(offset) * u64::from(Self::element_size());
        // NOTE(review): this read lock is intentionally held across the blocking write below and
        // dropped only afterwards — presumably so plotting (which would need the write lock)
        // cannot overwrite this element mid-write; confirm against the plotting code
        let sectors_metadata = sectors_metadata.read().await;
        let plotted_sectors_count = sectors_metadata.len() as SectorIndex;
        let plotted_bytes = self.sector_size * u64::from(plotted_sectors_count);

        if element_offset < plotted_bytes {
            // Plotting already consumed the claimed offset: no usable free space remains
            drop(sectors_metadata);
            let mut cached_pieces = self.cached_pieces.write();
            cached_pieces.next_offset.take();
            if plotted_sectors_count == self.target_sector_count {
                // Plot is complete, every cached element is overwritten; drop the whole map
                mem::take(&mut cached_pieces.map);
            }
            return Ok(false);
        }

        let Some(file) = self.file.upgrade() else {
            return Ok(false);
        };

        // Serialize the element (piece index + piece + checksum) and write it on a blocking
        // thread; the closure owns the buffer and the file handle
        let write_fut = tokio::task::spawn_blocking({
            let piece_index_bytes = piece_index.to_bytes();
            let mut bytes = Vec::with_capacity(PieceIndex::SIZE + piece.len() + Blake3Hash::SIZE);
            bytes.extend_from_slice(&piece_index_bytes);
            bytes.extend_from_slice(piece.as_ref());
            bytes.extend_from_slice(
                blake3_hash_list(&[&piece_index_bytes, piece.as_ref()]).as_ref(),
            );

            move || file.write_all_at(&bytes, element_offset)
        });

        // First `?` is the join error, second is the I/O error from `write_all_at`
        AsyncJoinOnDrop::new(write_fut, false).await??;

        // Write finished; release the metadata lock and only then publish the mapping
        drop(sectors_metadata);
        self.cached_pieces
            .write()
            .map
            .insert(RecordKey::from(piece_index.to_multihash()), offset);

        Ok(true)
    }

    /// Read a piece from the cache.
    ///
    /// Returns `None` when the piece is not cached, the farm was dropped, or the element is
    /// corrupted/overwritten by plotting (in which case bookkeeping is updated accordingly).
    pub(crate) async fn read_piece(&self, key: &RecordKey) -> Option<Piece> {
        let offset = self.cached_pieces.read().map.get(key).copied()?;

        let file = self.file.upgrade()?;

        let read_fn = move || {
            let mut element = BytesMut::zeroed(Self::element_size() as usize);
            if let Ok(Some(_piece_index)) = Self::read_piece_internal(&file, offset, &mut element) {
                let element = element.freeze();
                // Zero-copy slice of the piece bytes out of the verified element
                let piece =
                    Piece::try_from(element.slice_ref(&element[PieceIndex::SIZE..][..Piece::SIZE]))
                        .expect("Correct length; qed");
                Some(piece)
            } else {
                None
            }
        };
        // NOTE(review): on Windows the read runs via `block_in_place` on the current thread
        // instead of `spawn_blocking` — rationale not visible here; confirm (likely related to
        // the direct-I/O file handle) before changing
        let maybe_piece = if cfg!(windows) {
            task::block_in_place(read_fn)
        } else {
            let read_fut = task::spawn_blocking(read_fn);

            AsyncJoinOnDrop::new(read_fut, false)
                .await
                .unwrap_or_default()
        };

        if maybe_piece.is_none()
            && let Some(sectors_metadata) = self.sectors_metadata.upgrade()
        {
            // Read failed, most likely because plotting overwrote the element; update bookkeeping
            let plotted_sectors_count = sectors_metadata.read().await.len() as SectorIndex;

            let mut cached_pieces = self.cached_pieces.write();
            if plotted_sectors_count == self.target_sector_count {
                // Fully plotted: no cached element can have survived, clear the whole map
                mem::take(&mut cached_pieces.map);
            } else {
                cached_pieces.map.remove(key);
            }
        }

        maybe_piece
    }

    /// Read and verify a single element at `offset` into `element`.
    ///
    /// Returns `Ok(Some(piece_index))` on success, `Ok(None)` for a never-written (all-zero)
    /// element, `ChecksumMismatch` for corrupted/overwritten data, or an I/O error.
    fn read_piece_internal(
        file: &DirectIoFile,
        offset: u32,
        element: &mut [u8],
    ) -> Result<Option<PieceIndex>, DiskPlotCacheError> {
        file.read_exact_at(element, u64::from(offset) * u64::from(Self::element_size()))?;

        let (piece_index_bytes, remaining_bytes) = element.split_at(PieceIndex::SIZE);
        let (piece_bytes, expected_checksum) = remaining_bytes.split_at(Piece::SIZE);

        // Checksum covers piece index + piece contents
        let actual_checksum = blake3_hash_list(&[piece_index_bytes, piece_bytes]);
        if *actual_checksum != *expected_checksum {
            if element.iter().all(|&byte| byte == 0) {
                // Never-written element: not an error, just free space
                return Ok(None);
            }

            debug!(
                actual_checksum = %hex::encode(actual_checksum),
                expected_checksum = %hex::encode(expected_checksum),
                "Hash doesn't match, corrupted or overridden piece in cache"
            );

            return Err(DiskPlotCacheError::ChecksumMismatch);
        }

        let piece_index = PieceIndex::from_bytes(
            piece_index_bytes
                .try_into()
                .expect("Statically known to have correct size; qed"),
        );
        Ok(Some(piece_index))
    }
}