mod metrics;

#[cfg(test)]
mod tests;

use crate::disk_piece_cache::metrics::DiskPieceCacheMetrics;
use crate::farm;
use crate::farm::{FarmError, PieceCacheId, PieceCacheOffset};
use crate::single_disk_farm::direct_io_file::{DirectIoFile, DISK_SECTOR_SIZE};
use crate::utils::AsyncJoinOnDrop;
use async_trait::async_trait;
use bytes::BytesMut;
use futures::channel::mpsc;
use futures::{stream, SinkExt, Stream, StreamExt};
use parking_lot::Mutex;
use prometheus_client::registry::Registry;
use std::num::NonZeroU32;
use std::path::Path;
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;
use std::task::Poll;
use std::{fs, io};
use subspace_core_primitives::hashes::{blake3_hash_list, Blake3Hash};
use subspace_core_primitives::pieces::{Piece, PieceIndex};
use subspace_farmer_components::file_ext::FileExt;
use thiserror::Error;
use tokio::runtime::Handle;
use tokio::task;
use tracing::{debug, info, warn, Span};

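/// After more than this many consecutive empty or unreadable elements, `contents()` stops reading
/// from disk and reports all remaining offsets as empty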
const CONTENTS_READ_SKIP_LIMIT: usize = 3;
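/// How many pieces are read from disk concurrently; also the number of file handles in
/// [`FilePool`]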
const PIECES_READING_CONCURRENCY: usize = 32;

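/// Disk piece cache error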
#[derive(Debug, Error)]
pub enum DiskPieceCacheError {
    #[error("Disk piece cache I/O error: {0}")]
    Io(#[from] io::Error),
    #[error("Can't preallocate cache file, probably not enough space on disk: {0}")]
    CantPreallocateCacheFile(io::Error),
    #[error("Offset outside of range: provided {provided}, max {max}")]
    OffsetOutsideOfRange {
        provided: u32,
        max: u32,
    },
    #[error("Checksum mismatch")]
    ChecksumMismatch,
}

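/// A fixed-size pool of file handles to the same cache file.
///
/// Reads rotate through the handles round-robin, allowing up to [`PIECES_READING_CONCURRENCY`]
/// concurrent reads, while all writes go through a single dedicated handle.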
#[derive(Debug)]
struct FilePool {
    files: Box<[DirectIoFile; PIECES_READING_CONCURRENCY]>,
    cursor: AtomicU8,
}

impl FilePool {
    fn open(path: &Path) -> io::Result<Self> {
        let files = (0..PIECES_READING_CONCURRENCY)
            .map(|_| DirectIoFile::open(path))
            .collect::<Result<Box<_>, _>>()?
            .try_into()
            .expect("Statically correct length; qed");
        Ok(Self {
            files,
            cursor: AtomicU8::new(0),
        })
    }

    fn read(&self) -> &DirectIoFile {
        let position = usize::from(self.cursor.fetch_add(1, Ordering::Relaxed));
        &self.files[position % PIECES_READING_CONCURRENCY]
    }

    fn write(&self) -> &DirectIoFile {
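        // Always write through the same handle; with direct I/O's aligned read-modify-write
        // behavior, interleaving writes across different handles could corrupt data (the
        // rationale is an assumption; the invariant enforced here is a single writer handle)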
        &self.files[0]
    }
}

#[derive(Debug)]
struct Inner {
    id: PieceCacheId,
    files: FilePool,
    max_num_elements: u32,
    metrics: Option<DiskPieceCacheMetrics>,
}

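/// Dedicated piece cache stored on disk as a single file of fixed-size elements.
///
/// Cloning is cheap: all clones share the same underlying cache through an [`Arc`].
///
/// A minimal opening sketch (hypothetical directory and capacity, marked `ignore` because it
/// needs a real directory on disk):
///
/// ```ignore
/// let cache = DiskPieceCache::open(
///     Path::new("/path/to/farm"),
///     NonZeroU32::new(1024).expect("Not zero; qed"),
///     None,
///     None,
/// )?;
/// ```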
#[derive(Debug, Clone)]
pub struct DiskPieceCache {
    inner: Arc<Inner>,
}

#[async_trait]
impl farm::PieceCache for DiskPieceCache {
    fn id(&self) -> &PieceCacheId {
        &self.inner.id
    }

    #[inline]
    fn max_num_elements(&self) -> u32 {
        self.inner.max_num_elements
    }

    async fn contents(
        &self,
    ) -> Result<
        Box<
            dyn Stream<Item = Result<(PieceCacheOffset, Option<PieceIndex>), FarmError>>
                + Unpin
                + Send
                + '_,
        >,
        FarmError,
    > {
        let this = self.clone();
        let (mut sender, receiver) = mpsc::channel(100_000);
        let span = Span::current();
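        // Scan the cache contents on a dedicated blocking thread, streaming results back to the
        // async caller through the bounded channel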
        let read_contents = task::spawn_blocking(move || {
            let _guard = span.enter();

            let contents = this.contents();
            for (piece_cache_offset, maybe_piece) in contents {
                if let Err(error) =
                    Handle::current().block_on(sender.send(Ok((piece_cache_offset, maybe_piece))))
                {
                    debug!(%error, "Aborting contents iteration due to receiver dropping");
                    break;
                }
            }
        });
        let read_contents = Mutex::new(Some(AsyncJoinOnDrop::new(read_contents, false)));
        let mut receiver = receiver;

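        // Once the stream is exhausted, drop the join handle so the background task is cleaned up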
        Ok(Box::new(stream::poll_fn(move |ctx| {
            let poll_result = receiver.poll_next_unpin(ctx);

            if matches!(poll_result, Poll::Ready(None)) {
                read_contents.lock().take();
            }

            poll_result
        })))
    }

    async fn write_piece(
        &self,
        offset: PieceCacheOffset,
        piece_index: PieceIndex,
        piece: &Piece,
    ) -> Result<(), FarmError> {
        let piece = piece.clone();
        let piece_cache = self.clone();
        Ok(AsyncJoinOnDrop::new(
            task::spawn_blocking(move || piece_cache.write_piece(offset, piece_index, &piece)),
            false,
        )
        .await??)
    }

    async fn read_piece_index(
        &self,
        offset: PieceCacheOffset,
    ) -> Result<Option<PieceIndex>, FarmError> {
        let piece_cache = self.clone();
        let span = Span::current();
        Ok(AsyncJoinOnDrop::new(
            task::spawn_blocking(move || {
                let _guard = span.enter();

                piece_cache.read_piece_index(offset)
            }),
            false,
        )
        .await??)
    }

    async fn read_piece(
        &self,
        offset: PieceCacheOffset,
    ) -> Result<Option<(PieceIndex, Piece)>, FarmError> {
        let span = Span::current();

        if cfg!(windows) {
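            // On Windows, read on the current thread via `block_in_place` instead of spawning a
            // separate blocking task (assumption: sharing these direct I/O handles with another
            // thread is problematic on Windows)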
            Ok(task::block_in_place(|| {
                let _guard = span.enter();

                self.read_piece(offset)
            })?)
        } else {
            let piece_cache = self.clone();
            Ok(AsyncJoinOnDrop::new(
                task::spawn_blocking(move || {
                    let _guard = span.enter();

                    piece_cache.read_piece(offset)
                }),
                false,
            )
            .await??)
        }
    }

    async fn read_pieces(
        &self,
        offsets: Box<dyn Iterator<Item = PieceCacheOffset> + Send>,
    ) -> Result<
        Box<
            dyn Stream<Item = Result<(PieceCacheOffset, Option<(PieceIndex, Piece)>), FarmError>>
                + Send
                + Unpin
                + '_,
        >,
        FarmError,
    > {
        let iter = offsets.map(move |offset| async move {
            Ok((offset, farm::PieceCache::read_piece(self, offset).await?))
        });
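        // Read up to PIECES_READING_CONCURRENCY pieces at a time, matching the number of file
        // handles in the pool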
        Ok(Box::new(
            stream::iter(iter).buffer_unordered(PIECES_READING_CONCURRENCY),
        ))
    }
}

impl DiskPieceCache {
    pub(crate) const FILE_NAME: &'static str = "piece_cache.bin";

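    /// Open a piece cache in the given directory, creating or resizing the backing file as
    /// necessary.
    ///
    /// `capacity` is measured in elements of [`DiskPieceCache::element_size()`] bytes. A fresh
    /// cache ID is generated unless one is provided, and metrics are registered if a registry is
    /// given.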
    pub fn open(
        directory: &Path,
        capacity: NonZeroU32,
        id: Option<PieceCacheId>,
        registry: Option<&mut Registry>,
    ) -> Result<Self, DiskPieceCacheError> {
        let capacity = capacity.get();
        let files = FilePool::open(&directory.join(Self::FILE_NAME))?;

        let expected_size = u64::from(Self::element_size()) * u64::from(capacity);
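        // Align the file size upwards to the disk sector size required for direct I/O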
        let expected_size =
            expected_size.div_ceil(DISK_SECTOR_SIZE as u64) * DISK_SECTOR_SIZE as u64;
        {
            let file = files.write();
            if file.size()? != expected_size {
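                // Reserve the full file size on disk up front; a plain `set_len` could leave a
                // sparse file (assumption), and a failing preallocation typically means there is
                // not enough disk space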
                file.preallocate(expected_size)
                    .map_err(DiskPieceCacheError::CantPreallocateCacheFile)?;
                file.set_len(expected_size)?;
            }
        }

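        // Use the provided cache ID or else generate a fresh one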
        let id = id.unwrap_or_else(PieceCacheId::new);
        let metrics = registry.map(|registry| DiskPieceCacheMetrics::new(registry, &id, capacity));

        Ok(Self {
            inner: Arc::new(Inner {
                id,
                files,
                max_num_elements: capacity,
                metrics,
            }),
        })
    }

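    /// Size of a single cache element on disk: the piece index, followed by the piece itself,
    /// followed by a BLAKE3 checksum of both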
    pub const fn element_size() -> u32 {
        (PieceIndex::SIZE + Piece::SIZE + Blake3Hash::SIZE) as u32
    }

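    /// Iterate over the contents of the cache, yielding the stored piece index (if any) for
    /// every offset.
    ///
    /// After more than [`CONTENTS_READ_SKIP_LIMIT`] consecutive empty or unreadable elements,
    /// the remaining offsets are reported as empty without reading from disk.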
    pub(crate) fn contents(
        &self,
    ) -> impl ExactSizeIterator<Item = (PieceCacheOffset, Option<PieceIndex>)> + '_ {
        let mut element = vec![0; Self::element_size() as usize];
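        // Populate the capacity-used metric during this scan if it hasn't been populated yet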
        let count_total = self
            .inner
            .metrics
            .as_ref()
            .map(|metrics| {
                metrics.contents.inc();
                metrics.capacity_used.get() == 0
            })
            .unwrap_or_default();
        let mut current_skip = 0;

        (0..self.inner.max_num_elements).map(move |offset| {
            if current_skip > CONTENTS_READ_SKIP_LIMIT {
                return (PieceCacheOffset(offset), None);
            }

            match self.read_piece_internal(offset, &mut element) {
                Ok(maybe_piece_index) => {
                    if maybe_piece_index.is_none() {
                        current_skip += 1;
                    } else {
                        if count_total && let Some(metrics) = &self.inner.metrics {
                            metrics.capacity_used.inc();
                        }
                        current_skip = 0;
                    }

                    (PieceCacheOffset(offset), maybe_piece_index)
                }
                Err(error) => {
                    warn!(%error, %offset, "Failed to read cache element");

                    current_skip += 1;

                    (PieceCacheOffset(offset), None)
                }
            }
        })
    }

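    /// Write a piece at the specified offset.
    ///
    /// The element is stored as the piece index, the piece bytes, and a BLAKE3 checksum of both,
    /// so that corruption can be detected on read.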
    pub(crate) fn write_piece(
        &self,
        offset: PieceCacheOffset,
        piece_index: PieceIndex,
        piece: &Piece,
    ) -> Result<(), DiskPieceCacheError> {
        let PieceCacheOffset(offset) = offset;
        if offset >= self.inner.max_num_elements {
            return Err(DiskPieceCacheError::OffsetOutsideOfRange {
                provided: offset,
                max: self.inner.max_num_elements - 1,
            });
        }

        if let Some(metrics) = &self.inner.metrics {
            metrics.write_piece.inc();
            let capacity_used = i64::from(offset + 1);
            if metrics.capacity_used.get() != capacity_used {
                metrics.capacity_used.set(capacity_used);
            }
        }
        let element_offset = u64::from(offset) * u64::from(Self::element_size());

        let piece_index_bytes = piece_index.to_bytes();
        let mut bytes = Vec::with_capacity(PieceIndex::SIZE + piece.len() + Blake3Hash::SIZE);
        bytes.extend_from_slice(&piece_index_bytes);
        bytes.extend_from_slice(piece.as_ref());
        bytes.extend_from_slice(blake3_hash_list(&[&piece_index_bytes, piece.as_ref()]).as_ref());
        self.inner
            .files
            .write()
            .write_all_at(&bytes, element_offset)?;

        Ok(())
    }

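    /// Read the piece index stored at the specified offset without reading the piece itself.
    ///
    /// Returns `Ok(None)` if the element at this offset is empty.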
    pub(crate) fn read_piece_index(
        &self,
        offset: PieceCacheOffset,
    ) -> Result<Option<PieceIndex>, DiskPieceCacheError> {
        let PieceCacheOffset(offset) = offset;
        if offset >= self.inner.max_num_elements {
            warn!(%offset, "Trying to read piece out of range, this must be an implementation bug");
            return Err(DiskPieceCacheError::OffsetOutsideOfRange {
                provided: offset,
                max: self.inner.max_num_elements - 1,
            });
        }

        if let Some(metrics) = &self.inner.metrics {
            metrics.read_piece_index.inc();
        }
        self.read_piece_internal(offset, &mut vec![0; Self::element_size() as usize])
    }

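    /// Read the piece stored at the specified offset together with its piece index.
    ///
    /// Returns `Ok(None)` if the element at this offset is empty.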
    pub(crate) fn read_piece(
        &self,
        offset: PieceCacheOffset,
    ) -> Result<Option<(PieceIndex, Piece)>, DiskPieceCacheError> {
        let PieceCacheOffset(offset) = offset;
        if offset >= self.inner.max_num_elements {
            warn!(%offset, "Trying to read piece out of range, this must be an implementation bug");
            return Err(DiskPieceCacheError::OffsetOutsideOfRange {
                provided: offset,
                max: self.inner.max_num_elements - 1,
            });
        }

        if let Some(metrics) = &self.inner.metrics {
            metrics.read_piece.inc();
        }
        let mut element = BytesMut::zeroed(Self::element_size() as usize);
        if let Some(piece_index) = self.read_piece_internal(offset, &mut element)? {
            let element = element.freeze();
            let piece =
                Piece::try_from(element.slice_ref(&element[PieceIndex::SIZE..][..Piece::SIZE]))
                    .expect("Correct length; qed");
            Ok(Some((piece_index, piece)))
        } else {
            Ok(None)
        }
    }

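    /// Read a single element into `element` and verify its checksum, returning the piece index
    /// if the element is occupied and intact, or `None` if it is empty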
    fn read_piece_internal(
        &self,
        offset: u32,
        element: &mut [u8],
    ) -> Result<Option<PieceIndex>, DiskPieceCacheError> {
        self.inner
            .files
            .read()
            .read_exact_at(element, u64::from(offset) * u64::from(Self::element_size()))?;

        let (piece_index_bytes, remaining_bytes) = element.split_at(PieceIndex::SIZE);
        let (piece_bytes, expected_checksum) = remaining_bytes.split_at(Piece::SIZE);

        let actual_checksum = blake3_hash_list(&[piece_index_bytes, piece_bytes]);
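        // A checksum mismatch on an all-zero element means the slot is empty rather than corrupted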
        if *actual_checksum != *expected_checksum {
            if element.iter().all(|&byte| byte == 0) {
                return Ok(None);
            }

            debug!(
                actual_checksum = %hex::encode(actual_checksum),
                expected_checksum = %hex::encode(expected_checksum),
                "Hash doesn't match, corrupted piece in cache"
            );

            return Err(DiskPieceCacheError::ChecksumMismatch);
        }

        let piece_index = PieceIndex::from_bytes(
            piece_index_bytes
                .try_into()
                .expect("Statically known to have correct size; qed"),
        );
        Ok(Some(piece_index))
    }

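    /// Delete the piece cache file in the given directory, if it exists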
    pub(crate) fn wipe(directory: &Path) -> io::Result<()> {
        let piece_cache = directory.join(Self::FILE_NAME);
        if !piece_cache.exists() {
            return Ok(());
        }
        info!("Deleting piece cache file at {}", piece_cache.display());
        fs::remove_file(piece_cache)
    }
}