subspace_farmer/single_disk_farm/farming/
rayon_files.rs

1//! Files abstraction that allows reading concurrently using thread pool
2
3use std::fs::{File, OpenOptions};
4use std::io;
5use std::path::Path;
6use subspace_farmer_components::file_ext::{FileExt, OpenOptionsExt};
7use subspace_farmer_components::ReadAtSync;
8
9/// Wrapper data structure for multiple files to be used with [`rayon`] thread pool, where the same
10/// file is opened multiple times, once for each thread for faster concurrent reads
11#[derive(Debug)]
12pub struct RayonFiles<File> {
13    files: Vec<File>,
14}
15
16impl<File> ReadAtSync for RayonFiles<File>
17where
18    File: ReadAtSync,
19{
20    fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> {
21        let thread_index = rayon::current_thread_index().unwrap_or_default();
22        let file = self
23            .files
24            .get(thread_index)
25            .ok_or_else(|| io::Error::other("No files entry for this rayon thread"))?;
26
27        file.read_at(buf, offset)
28    }
29}
30
31impl<File> ReadAtSync for &RayonFiles<File>
32where
33    File: ReadAtSync,
34{
35    #[inline]
36    fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> {
37        (*self).read_at(buf, offset)
38    }
39}
40
41impl RayonFiles<File> {
42    /// Open file at specified path as many times as there is number of threads in current [`rayon`]
43    /// thread pool.
44    pub fn open<P>(path: P) -> io::Result<Self>
45    where
46        P: AsRef<Path>,
47    {
48        let files = (0..rayon::current_num_threads())
49            .map(|_| {
50                let file = OpenOptions::new()
51                    .read(true)
52                    .advise_random_access()
53                    .open(path.as_ref())?;
54                file.advise_random_access()?;
55
56                Ok::<_, io::Error>(file)
57            })
58            .collect::<Result<Vec<_>, _>>()?;
59
60        Ok(Self { files })
61    }
62}
63
64impl<File> RayonFiles<File>
65where
66    File: ReadAtSync,
67{
68    /// Open file at specified path as many times as there is number of threads in current [`rayon`]
69    /// thread pool with a provided function
70    pub fn open_with<P>(path: P, open: fn(&Path) -> io::Result<File>) -> io::Result<Self>
71    where
72        P: AsRef<Path>,
73    {
74        let files = (0..rayon::current_num_threads())
75            .map(|_| open(path.as_ref()))
76            .collect::<Result<Vec<_>, _>>()?;
77
78        Ok(Self { files })
79    }
80}