//! Wrapper for a file opened with direct I/O, reading and writing through a
//! sector-aligned scratch buffer.

use parking_lot::Mutex;
use static_assertions::const_assert_eq;
use std::fs::{File, OpenOptions};
use std::path::Path;
use std::{io, mem};
use subspace_farmer_components::file_ext::{FileExt, OpenOptionsExt};
use subspace_farmer_components::ReadAtSync;

/// Disk sector size to which all direct I/O reads and writes are aligned.
pub const DISK_SECTOR_SIZE: usize = 4096;
/// Upper bound on how many bytes are moved through the scratch buffer in a single
/// read/write call; larger requests are processed in chunks of this size.
const MAX_READ_SIZE: usize = 1024 * 1024;

// The scratch buffer is a `Vec` of whole sectors, so the chunk size must be a
// multiple of the sector size.
const_assert_eq!(MAX_READ_SIZE % DISK_SECTOR_SIZE, 0);
17
/// One disk sector worth of bytes, aligned in memory to the sector size as required
/// for direct I/O transfers.
#[derive(Debug, Copy, Clone)]
#[repr(C, align(4096))]
struct AlignedSectorSize([u8; DISK_SECTOR_SIZE]);

// The literal `align(4096)` above must stay in sync with `DISK_SECTOR_SIZE`.
const_assert_eq!(align_of::<AlignedSectorSize>(), DISK_SECTOR_SIZE);
23
24impl Default for AlignedSectorSize {
25 fn default() -> Self {
26 Self([0; DISK_SECTOR_SIZE])
27 }
28}
29
impl AlignedSectorSize {
    /// Reinterprets a slice of aligned sectors as a slice of plain byte arrays so the
    /// contents can be flattened and addressed byte-wise.
    fn slice_mut_to_repr(slice: &mut [Self]) -> &mut [[u8; DISK_SECTOR_SIZE]] {
        // SAFETY: `AlignedSectorSize` is `repr(C)` with a single `[u8; DISK_SECTOR_SIZE]`
        // field, so both slice element types have identical size and layout; the target
        // type only has a weaker alignment requirement, which is always satisfied.
        unsafe { mem::transmute(slice) }
    }
}
36
/// A file opened for direct I/O; all reads and writes go through a sector-aligned
/// scratch buffer to satisfy direct I/O alignment requirements.
#[derive(Debug)]
pub struct DirectIoFile {
    file: File,
    /// Sector-aligned scratch buffer shared by reads and writes; sized to hold
    /// `MAX_READ_SIZE` bytes at `open()` and grown on demand.
    scratch_buffer: Mutex<Vec<AlignedSectorSize>>,
}
44
45impl ReadAtSync for DirectIoFile {
46 #[inline]
47 fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> {
48 self.read_exact_at(buf, offset)
49 }
50}
51
52impl ReadAtSync for &DirectIoFile {
53 #[inline]
54 fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> {
55 (*self).read_at(buf, offset)
56 }
57}
58
impl FileExt for DirectIoFile {
    fn size(&self) -> io::Result<u64> {
        Ok(self.file.metadata()?.len())
    }

    fn preallocate(&self, len: u64) -> io::Result<()> {
        self.file.preallocate(len)
    }

    fn advise_random_access(&self) -> io::Result<()> {
        // No-op: nothing to do for a file opened with direct I/O.
        Ok(())
    }

    fn advise_sequential_access(&self) -> io::Result<()> {
        // No-op: nothing to do for a file opened with direct I/O.
        Ok(())
    }

    fn disable_cache(&self) -> io::Result<()> {
        // No-op: the cache is already disabled in `open()`.
        Ok(())
    }

    /// Reads exactly `buf.len()` bytes at `offset`, splitting the request into chunks
    /// that fit the `MAX_READ_SIZE` scratch buffer.
    fn read_exact_at(&self, buf: &mut [u8], mut offset: u64) -> io::Result<()> {
        if buf.is_empty() {
            return Ok(());
        }

        // Hold the scratch buffer lock across the whole multi-chunk operation.
        let mut scratch_buffer = self.scratch_buffer.lock();

        // Number of bytes by which `offset` precedes its sector boundary.
        let padding = (offset % DISK_SECTOR_SIZE as u64) as usize;
        // First chunk is sized so that `padding + chunk` fits in the scratch buffer; this
        // also makes every subsequent chunk start at a sector-aligned offset.
        let first_unaligned_chunk_size = (MAX_READ_SIZE - padding).min(buf.len());
        let (unaligned_start, buf) = buf.split_at_mut(first_unaligned_chunk_size);
        {
            let bytes_to_read = unaligned_start.len();
            unaligned_start.copy_from_slice(self.read_exact_at_internal(
                &mut scratch_buffer,
                bytes_to_read,
                offset,
            )?);
            offset += unaligned_start.len() as u64;
        }

        if buf.is_empty() {
            return Ok(());
        }

        // Remaining chunks start sector-aligned and are at most `MAX_READ_SIZE` each.
        for buf in buf.chunks_mut(MAX_READ_SIZE) {
            let bytes_to_read = buf.len();
            buf.copy_from_slice(self.read_exact_at_internal(
                &mut scratch_buffer,
                bytes_to_read,
                offset,
            )?);
            offset += buf.len() as u64;
        }

        Ok(())
    }

    /// Writes all of `buf` at `offset`, splitting the request into chunks that fit the
    /// `MAX_READ_SIZE` scratch buffer; mirrors the chunking in `read_exact_at`.
    fn write_all_at(&self, buf: &[u8], mut offset: u64) -> io::Result<()> {
        if buf.is_empty() {
            return Ok(());
        }

        let mut scratch_buffer = self.scratch_buffer.lock();

        // Number of bytes by which `offset` precedes its sector boundary.
        let padding = (offset % DISK_SECTOR_SIZE as u64) as usize;
        // First chunk sized so `padding + chunk` fits the scratch buffer; subsequent
        // chunks then start sector-aligned.
        let first_unaligned_chunk_size = (MAX_READ_SIZE - padding).min(buf.len());
        let (unaligned_start, buf) = buf.split_at(first_unaligned_chunk_size);
        {
            self.write_all_at_internal(&mut scratch_buffer, unaligned_start, offset)?;
            offset += unaligned_start.len() as u64;
        }

        if buf.is_empty() {
            return Ok(());
        }

        // Remaining chunks start sector-aligned and are at most `MAX_READ_SIZE` each.
        for buf in buf.chunks(MAX_READ_SIZE) {
            self.write_all_at_internal(&mut scratch_buffer, buf, offset)?;
            offset += buf.len() as u64;
        }

        Ok(())
    }
}
151
impl DirectIoFile {
    /// Opens (creating if missing, never truncating) a file for reading and writing with
    /// direct I/O enabled and OS caching disabled.
    pub fn open<P>(path: P) -> io::Result<Self>
    where
        P: AsRef<Path>,
    {
        let mut open_options = OpenOptions::new();
        open_options.use_direct_io();
        let file = open_options
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(path)?;

        file.disable_cache()?;

        Ok(Self {
            file,
            // Pre-size the aligned scratch buffer to hold `MAX_READ_SIZE` bytes.
            scratch_buffer: Mutex::new(vec![
                AlignedSectorSize::default();
                MAX_READ_SIZE / DISK_SECTOR_SIZE
            ]),
        })
    }

    /// Truncates or extends the underlying file to exactly `size` bytes.
    pub fn set_len(&self, size: u64) -> io::Result<()> {
        self.file.set_len(size)
    }

    /// Reads `bytes_to_read` bytes at (possibly unaligned) `offset` by reading whole
    /// sectors into the scratch buffer; returns a slice into that buffer containing
    /// exactly the requested bytes.
    fn read_exact_at_internal<'a>(
        &self,
        scratch_buffer: &'a mut Vec<AlignedSectorSize>,
        bytes_to_read: usize,
        offset: u64,
    ) -> io::Result<&'a [u8]> {
        // Round `offset` down to its sector boundary; `padding` is the distance back.
        let aligned_offset = offset / DISK_SECTOR_SIZE as u64 * DISK_SECTOR_SIZE as u64;
        let padding = (offset - aligned_offset) as usize;

        // Number of whole sectors needed to cover `padding + bytes_to_read`; grow the
        // scratch buffer if it is currently smaller.
        let desired_buffer_size = (padding + bytes_to_read).div_ceil(DISK_SECTOR_SIZE);
        if scratch_buffer.len() < desired_buffer_size {
            scratch_buffer.resize_with(desired_buffer_size, AlignedSectorSize::default);
        }
        // View exactly the sectors involved as one contiguous byte slice.
        let scratch_buffer = AlignedSectorSize::slice_mut_to_repr(scratch_buffer)
            [..desired_buffer_size]
            .as_flattened_mut();

        // Sector-aligned read of whole sectors, as direct I/O requires.
        self.file.read_exact_at(scratch_buffer, aligned_offset)?;

        // Skip the leading padding and hand back only the requested bytes.
        Ok(&scratch_buffer[padding..][..bytes_to_read])
    }

    /// Writes `bytes_to_write` at (possibly unaligned) `offset` using sector-aligned
    /// writes; when the span is not an exact whole number of sectors, the affected
    /// sectors are first read, the new bytes overlaid, and the sectors written back.
    fn write_all_at_internal(
        &self,
        scratch_buffer: &mut Vec<AlignedSectorSize>,
        bytes_to_write: &[u8],
        offset: u64,
    ) -> io::Result<()> {
        // Invariant: callers never pass more than `MAX_READ_SIZE` bytes, so the scratch
        // buffer (grown only by `read_exact_at_internal`) must not exceed that size.
        assert!(
            AlignedSectorSize::slice_mut_to_repr(scratch_buffer)
                .as_flattened()
                .len()
                <= MAX_READ_SIZE
        );

        // Round `offset` down to its sector boundary; `padding` is the distance back.
        let aligned_offset = offset / DISK_SECTOR_SIZE as u64 * DISK_SECTOR_SIZE as u64;
        let padding = (offset - aligned_offset) as usize;

        // Total bytes covered by the affected sectors, rounded up to whole sectors.
        let bytes_to_read =
            (padding + bytes_to_write.len()).div_ceil(DISK_SECTOR_SIZE) * DISK_SECTOR_SIZE;

        if padding == 0 && bytes_to_read == bytes_to_write.len() {
            // Fully sector-aligned write: copy into the aligned buffer and write directly,
            // no read needed.
            let scratch_buffer =
                AlignedSectorSize::slice_mut_to_repr(scratch_buffer).as_flattened_mut();
            let scratch_buffer = &mut scratch_buffer[..bytes_to_read];
            scratch_buffer.copy_from_slice(bytes_to_write);
            self.file.write_all_at(scratch_buffer, offset)?;
        } else {
            // Read-modify-write: fetch the whole affected sectors, overlay the new bytes
            // at `padding`, then write the sectors back at the aligned offset.
            self.read_exact_at_internal(scratch_buffer, bytes_to_read, aligned_offset)?;
            let scratch_buffer =
                AlignedSectorSize::slice_mut_to_repr(scratch_buffer).as_flattened_mut();
            let scratch_buffer = &mut scratch_buffer[..bytes_to_read];
            scratch_buffer[padding..][..bytes_to_write.len()].copy_from_slice(bytes_to_write);
            self.file.write_all_at(scratch_buffer, aligned_offset)?;
        }

        Ok(())
    }
}
253
#[cfg(test)]
mod tests {
    use crate::single_disk_farm::direct_io_file::{DirectIoFile, MAX_READ_SIZE};
    use rand::prelude::*;
    use std::fs;
    use subspace_farmer_components::file_ext::FileExt;
    use tempfile::tempdir;

    /// Round-trips reads and writes at a matrix of aligned/unaligned offsets and sizes,
    /// below, at, and above `MAX_READ_SIZE`.
    #[test]
    fn basic() {
        let tempdir = tempdir().unwrap();
        let file_path = tempdir.as_ref().join("file.bin");
        // Seed the file with random contents, large enough for the biggest (offset, size)
        // pair below.
        let mut data = vec![0u8; MAX_READ_SIZE * 5];
        thread_rng().fill(data.as_mut_slice());
        fs::write(&file_path, &data).unwrap();

        let file = DirectIoFile::open(&file_path).unwrap();

        let mut buffer = Vec::new();
        for (offset, size) in [
            (0_usize, 512_usize),
            (0_usize, 4096_usize),
            (0, 500),
            (0, 4000),
            (5, 50),
            (12, 500),
            (96, 4000),
            (4000, 96),
            (10000, 5),
            (0, MAX_READ_SIZE),
            (0, MAX_READ_SIZE * 2),
            (5, MAX_READ_SIZE - 5),
            (5, MAX_READ_SIZE * 2 - 5),
            (5, MAX_READ_SIZE),
            (5, MAX_READ_SIZE * 2),
            (MAX_READ_SIZE, MAX_READ_SIZE),
            (MAX_READ_SIZE, MAX_READ_SIZE * 2),
            (MAX_READ_SIZE + 5, MAX_READ_SIZE - 5),
            (MAX_READ_SIZE + 5, MAX_READ_SIZE * 2 - 5),
            (MAX_READ_SIZE + 5, MAX_READ_SIZE),
            (MAX_READ_SIZE + 5, MAX_READ_SIZE * 2),
        ] {
            let data = &mut data[offset..][..size];
            buffer.resize(size, 0);
            // A direct-I/O read must match the contents written via buffered `fs::write`.
            file.read_exact_at(buffer.as_mut_slice(), offset as u64)
                .unwrap_or_else(|error| panic!("Offset {offset}, size {size}: {error}"));

            assert_eq!(data, buffer.as_slice(), "Offset {offset}, size {size}");

            // Overwrite the same span with fresh random bytes (updating the in-memory
            // reference copy too) and verify the write/read round-trip.
            thread_rng().fill(data);
            file.write_all_at(data, offset as u64)
                .unwrap_or_else(|error| panic!("Offset {offset}, size {size}: {error}"));

            file.read_exact_at(buffer.as_mut_slice(), offset as u64)
                .unwrap_or_else(|error| panic!("Offset {offset}, size {size}: {error}"));

            assert_eq!(data, buffer.as_slice(), "Offset {offset}, size {size}");
        }
    }
}