subspace_farmer/
disk_piece_cache.rs

1//! Disk piece cache implementation
2
3mod metrics;
4#[cfg(test)]
5mod tests;
6
7use crate::disk_piece_cache::metrics::DiskPieceCacheMetrics;
8use crate::farm;
9use crate::farm::{FarmError, PieceCacheId, PieceCacheOffset};
10use crate::single_disk_farm::direct_io_file::{DirectIoFile, DISK_SECTOR_SIZE};
11use crate::utils::AsyncJoinOnDrop;
12use async_trait::async_trait;
13use bytes::BytesMut;
14use futures::channel::mpsc;
15use futures::{stream, SinkExt, Stream, StreamExt};
16use parking_lot::Mutex;
17use prometheus_client::registry::Registry;
18use std::num::NonZeroU32;
19use std::path::Path;
20use std::sync::atomic::{AtomicU8, Ordering};
21use std::sync::Arc;
22use std::task::Poll;
23use std::{fs, io};
24use subspace_core_primitives::hashes::{blake3_hash_list, Blake3Hash};
25use subspace_core_primitives::pieces::{Piece, PieceIndex};
26use subspace_farmer_components::file_ext::FileExt;
27use thiserror::Error;
28use tokio::runtime::Handle;
29use tokio::task;
30use tracing::{debug, info, warn, Span};
31
32/// How many pieces should be skipped before stopping to check the rest of contents, this allows to
33/// not miss most of the pieces after one or two corrupted pieces
34const CONTENTS_READ_SKIP_LIMIT: usize = 3;
35/// How many piece to read from disk at the same time (using tokio thread pool)
36const PIECES_READING_CONCURRENCY: usize = 32;
37
38/// Disk piece cache open error
39#[derive(Debug, Error)]
40pub enum DiskPieceCacheError {
41    /// I/O error occurred
42    #[error("Disk piece cache I/O error: {0}")]
43    Io(#[from] io::Error),
44    /// Can't preallocate cache file, probably not enough space on disk
45    #[error("Can't preallocate cache file, probably not enough space on disk: {0}")]
46    CantPreallocateCacheFile(io::Error),
47    /// Offset outsize of range
48    #[error("Offset outsize of range: provided {provided}, max {max}")]
49    OffsetOutsideOfRange {
50        /// Provided offset
51        provided: u32,
52        /// Max offset
53        max: u32,
54    },
55    /// Checksum mismatch
56    #[error("Checksum mismatch")]
57    ChecksumMismatch,
58}
59
60#[derive(Debug)]
61struct FilePool {
62    files: Box<[DirectIoFile; PIECES_READING_CONCURRENCY]>,
63    cursor: AtomicU8,
64}
65
66impl FilePool {
67    fn open(path: &Path) -> io::Result<Self> {
68        let files = (0..PIECES_READING_CONCURRENCY)
69            .map(|_| DirectIoFile::open(path))
70            .collect::<Result<Box<_>, _>>()?
71            .try_into()
72            .expect("Statically correct length; qed");
73        Ok(Self {
74            files,
75            cursor: AtomicU8::new(0),
76        })
77    }
78
79    fn read(&self) -> &DirectIoFile {
80        let position = usize::from(self.cursor.fetch_add(1, Ordering::Relaxed));
81        &self.files[position % PIECES_READING_CONCURRENCY]
82    }
83
84    fn write(&self) -> &DirectIoFile {
85        // Always the same file or else overlapping writes will be corrupted due to
86        // read/modify/write internals, which are in turn caused by alignment requirements
87        &self.files[0]
88    }
89}
90
91#[derive(Debug)]
92struct Inner {
93    id: PieceCacheId,
94    files: FilePool,
95    max_num_elements: u32,
96    metrics: Option<DiskPieceCacheMetrics>,
97}
98
99/// Dedicated piece cache stored on one disk, is used both to accelerate DSN queries and to plot
100/// faster.
101///
102/// Implementation is backed by a file on disk.
103#[derive(Debug, Clone)]
104pub struct DiskPieceCache {
105    inner: Arc<Inner>,
106}
107
108#[async_trait]
109impl farm::PieceCache for DiskPieceCache {
110    fn id(&self) -> &PieceCacheId {
111        &self.inner.id
112    }
113
114    #[inline]
115    fn max_num_elements(&self) -> u32 {
116        self.inner.max_num_elements
117    }
118
119    async fn contents(
120        &self,
121    ) -> Result<
122        Box<
123            dyn Stream<Item = Result<(PieceCacheOffset, Option<PieceIndex>), FarmError>>
124                + Unpin
125                + Send
126                + '_,
127        >,
128        FarmError,
129    > {
130        let this = self.clone();
131        let (mut sender, receiver) = mpsc::channel(100_000);
132        let span = Span::current();
133        let read_contents = task::spawn_blocking(move || {
134            let _guard = span.enter();
135
136            let contents = this.contents();
137            for (piece_cache_offset, maybe_piece) in contents {
138                if let Err(error) =
139                    Handle::current().block_on(sender.send(Ok((piece_cache_offset, maybe_piece))))
140                {
141                    debug!(%error, "Aborting contents iteration due to receiver dropping");
142                    break;
143                }
144            }
145        });
146        let read_contents = Mutex::new(Some(AsyncJoinOnDrop::new(read_contents, false)));
147        // Change order such that in closure below `receiver` is dropped before `read_contents`
148        let mut receiver = receiver;
149
150        Ok(Box::new(stream::poll_fn(move |ctx| {
151            let poll_result = receiver.poll_next_unpin(ctx);
152
153            if matches!(poll_result, Poll::Ready(None)) {
154                read_contents.lock().take();
155            }
156
157            poll_result
158        })))
159    }
160
161    async fn write_piece(
162        &self,
163        offset: PieceCacheOffset,
164        piece_index: PieceIndex,
165        piece: &Piece,
166    ) -> Result<(), FarmError> {
167        let piece = piece.clone();
168        let piece_cache = self.clone();
169        Ok(AsyncJoinOnDrop::new(
170            task::spawn_blocking(move || piece_cache.write_piece(offset, piece_index, &piece)),
171            false,
172        )
173        .await??)
174    }
175
176    async fn read_piece_index(
177        &self,
178        offset: PieceCacheOffset,
179    ) -> Result<Option<PieceIndex>, FarmError> {
180        let piece_cache = self.clone();
181        let span = Span::current();
182        Ok(AsyncJoinOnDrop::new(
183            task::spawn_blocking(move || {
184                let _guard = span.enter();
185
186                piece_cache.read_piece_index(offset)
187            }),
188            false,
189        )
190        .await??)
191    }
192
193    async fn read_piece(
194        &self,
195        offset: PieceCacheOffset,
196    ) -> Result<Option<(PieceIndex, Piece)>, FarmError> {
197        let span = Span::current();
198
199        // TODO: On Windows spawning blocking task that allows concurrent reads causes huge memory
200        //  usage. No idea why it happens, but not spawning anything at all helps for some reason.
201        //  Someone at some point should figure it out and fix, but it will probably be not me
202        //  (Nazar).
203        //  See https://github.com/autonomys/subspace/issues/2813 and linked forum post for details.
204        //  This TODO exists in multiple files
205        if cfg!(windows) {
206            Ok(task::block_in_place(|| {
207                let _guard = span.enter();
208
209                self.read_piece(offset)
210            })?)
211        } else {
212            let piece_cache = self.clone();
213            Ok(AsyncJoinOnDrop::new(
214                task::spawn_blocking(move || {
215                    let _guard = span.enter();
216
217                    piece_cache.read_piece(offset)
218                }),
219                false,
220            )
221            .await??)
222        }
223    }
224
225    async fn read_pieces(
226        &self,
227        offsets: Box<dyn Iterator<Item = PieceCacheOffset> + Send>,
228    ) -> Result<
229        Box<
230            dyn Stream<Item = Result<(PieceCacheOffset, Option<(PieceIndex, Piece)>), FarmError>>
231                + Send
232                + Unpin
233                + '_,
234        >,
235        FarmError,
236    > {
237        let iter = offsets.map(move |offset| async move {
238            Ok((offset, farm::PieceCache::read_piece(self, offset).await?))
239        });
240        Ok(Box::new(
241            // Constrain concurrency to avoid excessive memory usage, while still getting
242            // performance of concurrent reads
243            stream::iter(iter).buffer_unordered(PIECES_READING_CONCURRENCY),
244        ))
245    }
246}
247
248impl DiskPieceCache {
249    pub(crate) const FILE_NAME: &'static str = "piece_cache.bin";
250
251    /// Open cache, capacity is measured in elements of [`DiskPieceCache::element_size()`] size
252    pub fn open(
253        directory: &Path,
254        capacity: NonZeroU32,
255        id: Option<PieceCacheId>,
256        registry: Option<&mut Registry>,
257    ) -> Result<Self, DiskPieceCacheError> {
258        let capacity = capacity.get();
259        let files = FilePool::open(&directory.join(Self::FILE_NAME))?;
260
261        let expected_size = u64::from(Self::element_size()) * u64::from(capacity);
262        // Align plot file size for disk sector size
263        let expected_size =
264            expected_size.div_ceil(DISK_SECTOR_SIZE as u64) * DISK_SECTOR_SIZE as u64;
265        {
266            let file = files.write();
267            if file.size()? != expected_size {
268                // Allocating the whole file (`set_len` below can create a sparse file, which will cause
269                // writes to fail later)
270                file.preallocate(expected_size)
271                    .map_err(DiskPieceCacheError::CantPreallocateCacheFile)?;
272                // Truncating file (if necessary)
273                file.set_len(expected_size)?;
274            }
275        }
276
277        // ID for cache is ephemeral unless provided explicitly
278        let id = id.unwrap_or_else(PieceCacheId::new);
279        let metrics = registry.map(|registry| DiskPieceCacheMetrics::new(registry, &id, capacity));
280
281        Ok(Self {
282            inner: Arc::new(Inner {
283                id,
284                files,
285                max_num_elements: capacity,
286                metrics,
287            }),
288        })
289    }
290
291    /// Size of a single piece cache element
292    pub const fn element_size() -> u32 {
293        (PieceIndex::SIZE + Piece::SIZE + Blake3Hash::SIZE) as u32
294    }
295
296    /// Contents of this piece cache
297    ///
298    /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this
299    /// doesn't happen for the same piece being accessed!
300    pub(crate) fn contents(
301        &self,
302    ) -> impl ExactSizeIterator<Item = (PieceCacheOffset, Option<PieceIndex>)> + '_ {
303        let mut element = vec![0; Self::element_size() as usize];
304        let count_total = self
305            .inner
306            .metrics
307            .as_ref()
308            .map(|metrics| {
309                metrics.contents.inc();
310                metrics.capacity_used.get() == 0
311            })
312            .unwrap_or_default();
313        let mut current_skip = 0;
314
315        // TODO: Parallelize or read in larger batches
316        (0..self.inner.max_num_elements).map(move |offset| {
317            if current_skip > CONTENTS_READ_SKIP_LIMIT {
318                return (PieceCacheOffset(offset), None);
319            }
320
321            match self.read_piece_internal(offset, &mut element) {
322                Ok(maybe_piece_index) => {
323                    if maybe_piece_index.is_none() {
324                        current_skip += 1;
325                    } else {
326                        if count_total && let Some(metrics) = &self.inner.metrics {
327                            metrics.capacity_used.inc();
328                        }
329                        current_skip = 0;
330                    }
331
332                    (PieceCacheOffset(offset), maybe_piece_index)
333                }
334                Err(error) => {
335                    warn!(%error, %offset, "Failed to read cache element");
336
337                    current_skip += 1;
338
339                    (PieceCacheOffset(offset), None)
340                }
341            }
342        })
343    }
344
345    /// Store piece in cache at specified offset, replacing existing piece if there is any
346    ///
347    /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this
348    /// doesn't happen for the same piece being accessed!
349    pub(crate) fn write_piece(
350        &self,
351        offset: PieceCacheOffset,
352        piece_index: PieceIndex,
353        piece: &Piece,
354    ) -> Result<(), DiskPieceCacheError> {
355        let PieceCacheOffset(offset) = offset;
356        if offset >= self.inner.max_num_elements {
357            return Err(DiskPieceCacheError::OffsetOutsideOfRange {
358                provided: offset,
359                max: self.inner.max_num_elements - 1,
360            });
361        }
362
363        if let Some(metrics) = &self.inner.metrics {
364            metrics.write_piece.inc();
365            let capacity_used = i64::from(offset + 1);
366            if metrics.capacity_used.get() != capacity_used {
367                metrics.capacity_used.set(capacity_used);
368            }
369        }
370        let element_offset = u64::from(offset) * u64::from(Self::element_size());
371
372        let piece_index_bytes = piece_index.to_bytes();
373        // File writes are read/write/modify internally, so combine all data here for more efficient
374        // write
375        let mut bytes = Vec::with_capacity(PieceIndex::SIZE + piece.len() + Blake3Hash::SIZE);
376        bytes.extend_from_slice(&piece_index_bytes);
377        bytes.extend_from_slice(piece.as_ref());
378        bytes.extend_from_slice(blake3_hash_list(&[&piece_index_bytes, piece.as_ref()]).as_ref());
379        self.inner
380            .files
381            .write()
382            .write_all_at(&bytes, element_offset)?;
383
384        Ok(())
385    }
386
387    /// Read piece index from cache at specified offset.
388    ///
389    /// Returns `None` if offset is out of range.
390    ///
391    /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this
392    /// doesn't happen for the same piece being accessed!
393    pub(crate) fn read_piece_index(
394        &self,
395        offset: PieceCacheOffset,
396    ) -> Result<Option<PieceIndex>, DiskPieceCacheError> {
397        let PieceCacheOffset(offset) = offset;
398        if offset >= self.inner.max_num_elements {
399            warn!(%offset, "Trying to read piece out of range, this must be an implementation bug");
400            return Err(DiskPieceCacheError::OffsetOutsideOfRange {
401                provided: offset,
402                max: self.inner.max_num_elements - 1,
403            });
404        }
405
406        if let Some(metrics) = &self.inner.metrics {
407            metrics.read_piece_index.inc();
408        }
409        self.read_piece_internal(offset, &mut vec![0; Self::element_size() as usize])
410    }
411
412    /// Read piece from cache at specified offset.
413    ///
414    /// Returns `None` if offset is out of range.
415    ///
416    /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this
417    /// doesn't happen for the same piece being accessed!
418    pub(crate) fn read_piece(
419        &self,
420        offset: PieceCacheOffset,
421    ) -> Result<Option<(PieceIndex, Piece)>, DiskPieceCacheError> {
422        let PieceCacheOffset(offset) = offset;
423        if offset >= self.inner.max_num_elements {
424            warn!(%offset, "Trying to read piece out of range, this must be an implementation bug");
425            return Err(DiskPieceCacheError::OffsetOutsideOfRange {
426                provided: offset,
427                max: self.inner.max_num_elements - 1,
428            });
429        }
430
431        if let Some(metrics) = &self.inner.metrics {
432            metrics.read_piece.inc();
433        }
434        let mut element = BytesMut::zeroed(Self::element_size() as usize);
435        if let Some(piece_index) = self.read_piece_internal(offset, &mut element)? {
436            let element = element.freeze();
437            let piece =
438                Piece::try_from(element.slice_ref(&element[PieceIndex::SIZE..][..Piece::SIZE]))
439                    .expect("Correct length; qed");
440            Ok(Some((piece_index, piece)))
441        } else {
442            Ok(None)
443        }
444    }
445
446    fn read_piece_internal(
447        &self,
448        offset: u32,
449        element: &mut [u8],
450    ) -> Result<Option<PieceIndex>, DiskPieceCacheError> {
451        self.inner
452            .files
453            .read()
454            .read_exact_at(element, u64::from(offset) * u64::from(Self::element_size()))?;
455
456        let (piece_index_bytes, remaining_bytes) = element.split_at(PieceIndex::SIZE);
457        let (piece_bytes, expected_checksum) = remaining_bytes.split_at(Piece::SIZE);
458
459        // Verify checksum
460        let actual_checksum = blake3_hash_list(&[piece_index_bytes, piece_bytes]);
461        if *actual_checksum != *expected_checksum {
462            if element.iter().all(|&byte| byte == 0) {
463                return Ok(None);
464            }
465
466            debug!(
467                actual_checksum = %hex::encode(actual_checksum),
468                expected_checksum = %hex::encode(expected_checksum),
469                "Hash doesn't match, corrupted piece in cache"
470            );
471
472            return Err(DiskPieceCacheError::ChecksumMismatch);
473        }
474
475        let piece_index = PieceIndex::from_bytes(
476            piece_index_bytes
477                .try_into()
478                .expect("Statically known to have correct size; qed"),
479        );
480        Ok(Some(piece_index))
481    }
482
483    pub(crate) fn wipe(directory: &Path) -> io::Result<()> {
484        let piece_cache = directory.join(Self::FILE_NAME);
485        if !piece_cache.exists() {
486            return Ok(());
487        }
488        info!("Deleting piece cache file at {}", piece_cache.display());
489        fs::remove_file(piece_cache)
490    }
491}