subspace_farmer/single_disk_farm/direct_io_file.rs

//! Wrapper data structure for direct/unbuffered I/O
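//!
//! A minimal usage sketch (illustrative only: the import path assumes this module is publicly
//! exported and the file path below is hypothetical):
//!
//! ```ignore
//! use subspace_farmer::single_disk_farm::direct_io_file::DirectIoFile;
//! use subspace_farmer_components::file_ext::FileExt;
//!
//! let file = DirectIoFile::open("/path/to/plot.bin")?;
//! let mut buf = [0u8; 100];
//! // Unaligned offsets and lengths are handled internally via an aligned scratch buffer
//! file.read_exact_at(&mut buf, 3)?;
//! ```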

use parking_lot::Mutex;
use static_assertions::const_assert_eq;
use std::fs::{File, OpenOptions};
use std::path::Path;
use std::{io, mem};
use subspace_farmer_components::file_ext::{FileExt, OpenOptionsExt};
use subspace_farmer_components::ReadAtSync;

/// 4096 is a relatively safe size due to sector size on SSDs commonly being 512 or 4096 bytes
pub const DISK_SECTOR_SIZE: usize = 4096;
/// Restrict how much data to read from disk in a single call to avoid very large memory usage
const MAX_READ_SIZE: usize = 1024 * 1024;

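// Chunked reads and writes must cover a whole number of disk sectors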
const_assert_eq!(MAX_READ_SIZE % DISK_SECTOR_SIZE, 0);

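/// Sector-sized byte array with 4096-byte alignment.
///
/// Direct/unbuffered I/O typically requires memory buffers to be aligned to the disk sector size,
/// which this wrapper guarantees for the scratch buffer below.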
#[derive(Debug, Copy, Clone)]
#[repr(C, align(4096))]
struct AlignedSectorSize([u8; DISK_SECTOR_SIZE]);

const_assert_eq!(align_of::<AlignedSectorSize>(), DISK_SECTOR_SIZE);

impl Default for AlignedSectorSize {
    fn default() -> Self {
        Self([0; DISK_SECTOR_SIZE])
    }
}

impl AlignedSectorSize {
    fn slice_mut_to_repr(slice: &mut [Self]) -> &mut [[u8; DISK_SECTOR_SIZE]] {
        // SAFETY: `AlignedSectorSize` is `#[repr(C)]`, has the same layout as its inner array and
        // a stricter alignment requirement, so reinterpreting it as the inner array is sound
        unsafe { mem::transmute(slice) }
    }
}

/// Wrapper data structure for direct/unbuffered I/O
#[derive(Debug)]
pub struct DirectIoFile {
    file: File,
    /// Scratch buffer of aligned memory for reads and writes
    scratch_buffer: Mutex<Vec<AlignedSectorSize>>,
}

impl ReadAtSync for DirectIoFile {
    #[inline]
    fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> {
        self.read_exact_at(buf, offset)
    }
}

impl ReadAtSync for &DirectIoFile {
    #[inline]
    fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> {
        (*self).read_at(buf, offset)
    }
}

impl FileExt for DirectIoFile {
    fn size(&self) -> io::Result<u64> {
        Ok(self.file.metadata()?.len())
    }

    fn preallocate(&self, len: u64) -> io::Result<()> {
        self.file.preallocate(len)
    }

    fn advise_random_access(&self) -> io::Result<()> {
        // Ignore, already set
        Ok(())
    }

    fn advise_sequential_access(&self) -> io::Result<()> {
        // Ignore, not supported
        Ok(())
    }

    fn disable_cache(&self) -> io::Result<()> {
        // Ignore, not supported
        Ok(())
    }

    fn read_exact_at(&self, buf: &mut [u8], mut offset: u64) -> io::Result<()> {
        if buf.is_empty() {
            return Ok(());
        }

        let mut scratch_buffer = self.scratch_buffer.lock();

        // First read up to `MAX_READ_SIZE - padding`
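        // Limiting the first chunk to `MAX_READ_SIZE - padding` keeps `padding` plus the data
        // within `MAX_READ_SIZE` and makes every subsequent chunk start at a sector-aligned offset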
        let padding = (offset % DISK_SECTOR_SIZE as u64) as usize;
        let first_unaligned_chunk_size = (MAX_READ_SIZE - padding).min(buf.len());
        let (unaligned_start, buf) = buf.split_at_mut(first_unaligned_chunk_size);
        {
            let bytes_to_read = unaligned_start.len();
            unaligned_start.copy_from_slice(self.read_exact_at_internal(
                &mut scratch_buffer,
                bytes_to_read,
                offset,
            )?);
            offset += unaligned_start.len() as u64;
        }

        if buf.is_empty() {
            return Ok(());
        }

        // Process the rest of the chunks, up to `MAX_READ_SIZE` at a time
        for buf in buf.chunks_mut(MAX_READ_SIZE) {
            let bytes_to_read = buf.len();
            buf.copy_from_slice(self.read_exact_at_internal(
                &mut scratch_buffer,
                bytes_to_read,
                offset,
            )?);
            offset += buf.len() as u64;
        }

        Ok(())
    }

    fn write_all_at(&self, buf: &[u8], mut offset: u64) -> io::Result<()> {
        if buf.is_empty() {
            return Ok(());
        }

        let mut scratch_buffer = self.scratch_buffer.lock();

        // First write up to `MAX_READ_SIZE - padding`
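        // As with reads, capping the first chunk at `MAX_READ_SIZE - padding` makes all
        // subsequent chunks start at sector-aligned offsets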
        let padding = (offset % DISK_SECTOR_SIZE as u64) as usize;
        let first_unaligned_chunk_size = (MAX_READ_SIZE - padding).min(buf.len());
        let (unaligned_start, buf) = buf.split_at(first_unaligned_chunk_size);
        {
            self.write_all_at_internal(&mut scratch_buffer, unaligned_start, offset)?;
            offset += unaligned_start.len() as u64;
        }

        if buf.is_empty() {
            return Ok(());
        }

        // Process the rest of the chunks, up to `MAX_READ_SIZE` at a time
        for buf in buf.chunks(MAX_READ_SIZE) {
            self.write_all_at_internal(&mut scratch_buffer, buf, offset)?;
            offset += buf.len() as u64;
        }

        Ok(())
    }
}

impl DirectIoFile {
    /// Open a file at the specified path for direct/unbuffered I/O reads (the file will be
    /// created if it doesn't exist).
    ///
    /// This is especially important on Windows to prevent huge memory usage.
    pub fn open<P>(path: P) -> io::Result<Self>
    where
        P: AsRef<Path>,
    {
        let mut open_options = OpenOptions::new();
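        // `use_direct_io()` is provided by `OpenOptionsExt` and enables direct/unbuffered I/O;
        // the exact flags are platform-specific and up to that implementation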
        open_options.use_direct_io();
        let file = open_options
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(path)?;

        file.disable_cache()?;

        Ok(Self {
            file,
            // In many cases we'll want to read this much at once, so pre-allocate it right away
            scratch_buffer: Mutex::new(vec![
                AlignedSectorSize::default();
                MAX_READ_SIZE / DISK_SECTOR_SIZE
            ]),
        })
    }

    /// Truncates or extends the underlying file, updating the size of this file to become `size`.
    pub fn set_len(&self, size: u64) -> io::Result<()> {
        self.file.set_len(size)
    }

    fn read_exact_at_internal<'a>(
        &self,
        scratch_buffer: &'a mut Vec<AlignedSectorSize>,
        bytes_to_read: usize,
        offset: u64,
    ) -> io::Result<&'a [u8]> {
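        // Round the offset down to the nearest sector boundary; `padding` is the number of extra
        // bytes that will be read before the requested offset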
        let aligned_offset = offset / DISK_SECTOR_SIZE as u64 * DISK_SECTOR_SIZE as u64;
        let padding = (offset - aligned_offset) as usize;

        // Make scratch buffer of a size that is necessary to read aligned memory, accounting
        // for extra bytes at the beginning and the end that will be thrown away
        let desired_buffer_size = (padding + bytes_to_read).div_ceil(DISK_SECTOR_SIZE);
        if scratch_buffer.len() < desired_buffer_size {
            scratch_buffer.resize_with(desired_buffer_size, AlignedSectorSize::default);
        }
        let scratch_buffer = AlignedSectorSize::slice_mut_to_repr(scratch_buffer)
            [..desired_buffer_size]
            .as_flattened_mut();

        self.file.read_exact_at(scratch_buffer, aligned_offset)?;

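        // Skip `padding` bytes at the front and ignore anything read beyond `bytes_to_read`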
        Ok(&scratch_buffer[padding..][..bytes_to_read])
    }

    /// Panics on writes over `MAX_READ_SIZE` (including padding on both ends)
    fn write_all_at_internal(
        &self,
        scratch_buffer: &mut Vec<AlignedSectorSize>,
        bytes_to_write: &[u8],
        offset: u64,
    ) -> io::Result<()> {
        // This is guaranteed by constructor
        assert!(
            AlignedSectorSize::slice_mut_to_repr(scratch_buffer)
                .as_flattened()
                .len()
                <= MAX_READ_SIZE
        );

        let aligned_offset = offset / DISK_SECTOR_SIZE as u64 * DISK_SECTOR_SIZE as u64;
        let padding = (offset - aligned_offset) as usize;

        // Calculate the size of the read including padding on both ends
        let bytes_to_read =
            (padding + bytes_to_write.len()).div_ceil(DISK_SECTOR_SIZE) * DISK_SECTOR_SIZE;

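        // Fast path: the write is fully sector-aligned, so sectors can be overwritten without
        // reading them first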
        if padding == 0 && bytes_to_read == bytes_to_write.len() {
            let scratch_buffer =
                AlignedSectorSize::slice_mut_to_repr(scratch_buffer).as_flattened_mut();
            let scratch_buffer = &mut scratch_buffer[..bytes_to_read];
            scratch_buffer.copy_from_slice(bytes_to_write);
            self.file.write_all_at(scratch_buffer, offset)?;
        } else {
            // Read whole sectors where `bytes_to_write` will be written
            self.read_exact_at_internal(scratch_buffer, bytes_to_read, aligned_offset)?;
            let scratch_buffer =
                AlignedSectorSize::slice_mut_to_repr(scratch_buffer).as_flattened_mut();
            let scratch_buffer = &mut scratch_buffer[..bytes_to_read];
            // Update contents of the existing sectors and write them back into the file
            scratch_buffer[padding..][..bytes_to_write.len()].copy_from_slice(bytes_to_write);
            self.file.write_all_at(scratch_buffer, aligned_offset)?;
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use crate::single_disk_farm::direct_io_file::{DirectIoFile, MAX_READ_SIZE};
    use rand::prelude::*;
    use std::fs;
    use subspace_farmer_components::file_ext::FileExt;
    use tempfile::tempdir;

    #[test]
    fn basic() {
        let tempdir = tempdir().unwrap();
        let file_path = tempdir.as_ref().join("file.bin");
        let mut data = vec![0u8; MAX_READ_SIZE * 5];
        thread_rng().fill(data.as_mut_slice());
        fs::write(&file_path, &data).unwrap();

        let file = DirectIoFile::open(&file_path).unwrap();

        let mut buffer = Vec::new();
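        // Offsets and sizes exercise aligned and unaligned boundaries around both the sector size
        // and `MAX_READ_SIZE`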
        for (offset, size) in [
            (0_usize, 512_usize),
            (0_usize, 4096_usize),
            (0, 500),
            (0, 4000),
            (5, 50),
            (12, 500),
            (96, 4000),
            (4000, 96),
            (10000, 5),
            (0, MAX_READ_SIZE),
            (0, MAX_READ_SIZE * 2),
            (5, MAX_READ_SIZE - 5),
            (5, MAX_READ_SIZE * 2 - 5),
            (5, MAX_READ_SIZE),
            (5, MAX_READ_SIZE * 2),
            (MAX_READ_SIZE, MAX_READ_SIZE),
            (MAX_READ_SIZE, MAX_READ_SIZE * 2),
            (MAX_READ_SIZE + 5, MAX_READ_SIZE - 5),
            (MAX_READ_SIZE + 5, MAX_READ_SIZE * 2 - 5),
            (MAX_READ_SIZE + 5, MAX_READ_SIZE),
            (MAX_READ_SIZE + 5, MAX_READ_SIZE * 2),
        ] {
            let data = &mut data[offset..][..size];
            buffer.resize(size, 0);
            // Read contents
            file.read_exact_at(buffer.as_mut_slice(), offset as u64)
                .unwrap_or_else(|error| panic!("Offset {offset}, size {size}: {error}"));

            // Ensure it is correct
            assert_eq!(data, buffer.as_slice(), "Offset {offset}, size {size}");

            // Update data with random contents and write
            thread_rng().fill(data);
            file.write_all_at(data, offset as u64)
                .unwrap_or_else(|error| panic!("Offset {offset}, size {size}: {error}"));

            // Read contents again
            file.read_exact_at(buffer.as_mut_slice(), offset as u64)
                .unwrap_or_else(|error| panic!("Offset {offset}, size {size}: {error}"));

            // Ensure it is correct too
            assert_eq!(data, buffer.as_slice(), "Offset {offset}, size {size}");
        }
    }
}