1pub mod direct_io_file;
8pub mod farming;
9pub mod identity;
10mod metrics;
11pub mod piece_cache;
12pub mod piece_reader;
13pub mod plot_cache;
14mod plotted_sectors;
15mod plotting;
16mod reward_signing;
17
18use crate::disk_piece_cache::{DiskPieceCache, DiskPieceCacheError};
19use crate::farm::{
20 Farm, FarmId, FarmingError, FarmingNotification, HandlerFn, PieceCacheId, PieceReader,
21 PlottedSectors, SectorUpdate,
22};
23use crate::node_client::NodeClient;
24use crate::plotter::Plotter;
25use crate::single_disk_farm::direct_io_file::{DISK_SECTOR_SIZE, DirectIoFile};
26use crate::single_disk_farm::farming::rayon_files::RayonFiles;
27use crate::single_disk_farm::farming::{
28 FarmingOptions, PlotAudit, farming, slot_notification_forwarder,
29};
30use crate::single_disk_farm::identity::{Identity, IdentityError};
31use crate::single_disk_farm::metrics::SingleDiskFarmMetrics;
32use crate::single_disk_farm::piece_cache::SingleDiskPieceCache;
33use crate::single_disk_farm::piece_reader::DiskPieceReader;
34use crate::single_disk_farm::plot_cache::DiskPlotCache;
35use crate::single_disk_farm::plotted_sectors::SingleDiskPlottedSectors;
36pub use crate::single_disk_farm::plotting::PlottingError;
37use crate::single_disk_farm::plotting::{
38 PlottingOptions, PlottingSchedulerOptions, SectorPlottingOptions, plotting, plotting_scheduler,
39};
40use crate::single_disk_farm::reward_signing::reward_signing;
41use crate::utils::tokio_rayon_spawn_handler;
42use crate::{KNOWN_PEERS_CACHE_SIZE, farm};
43use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
44use async_trait::async_trait;
45use event_listener_primitives::{Bag, HandlerId};
46use futures::channel::{mpsc, oneshot};
47use futures::stream::FuturesUnordered;
48use futures::{FutureExt, StreamExt, select};
49use parity_scale_codec::{Decode, Encode};
50use parking_lot::Mutex;
51use prometheus_client::registry::Registry;
52use rayon::prelude::*;
53use rayon::{ThreadPoolBuildError, ThreadPoolBuilder};
54use serde::{Deserialize, Serialize};
55use static_assertions::const_assert;
56use std::collections::HashSet;
57use std::fs::{File, OpenOptions};
58use std::future::Future;
59use std::io::Write;
60use std::num::{NonZeroU32, NonZeroUsize};
61use std::path::{Path, PathBuf};
62use std::pin::Pin;
63use std::str::FromStr;
64use std::sync::Arc;
65use std::sync::atomic::{AtomicUsize, Ordering};
66use std::time::Duration;
67use std::{fmt, fs, io, mem};
68use subspace_core_primitives::PublicKey;
69use subspace_core_primitives::hashes::{Blake3Hash, blake3_hash};
70use subspace_core_primitives::pieces::Record;
71use subspace_core_primitives::sectors::SectorIndex;
72use subspace_core_primitives::segments::{HistorySize, SegmentIndex};
73use subspace_erasure_coding::ErasureCoding;
74use subspace_farmer_components::FarmerProtocolInfo;
75use subspace_farmer_components::file_ext::FileExt;
76use subspace_farmer_components::reading::ReadSectorRecordChunksMode;
77use subspace_farmer_components::sector::{SectorMetadata, SectorMetadataChecksummed, sector_size};
78use subspace_kzg::Kzg;
79use subspace_networking::KnownPeersManager;
80use subspace_process::AsyncJoinOnDrop;
81use subspace_proof_of_space::Table;
82use subspace_rpc_primitives::{FarmerAppInfo, SolutionResponse};
83use thiserror::Error;
84use tokio::runtime::Handle;
85use tokio::sync::broadcast;
86use tokio::task;
87use tracing::{Instrument, Span, error, info, trace, warn};
88
// Sector counts and byte offsets are computed in `u64`; make sure they always fit into
// `usize` arithmetic on the platforms this compiles for
const_assert!(mem::size_of::<usize>() >= mem::size_of::<u64>());

/// Space (in bytes) reserved at the beginning of the metadata file for the plot metadata header
const RESERVED_PLOT_METADATA: u64 = 1024 * 1024;
/// Space (in bytes) reserved for the farm info file
const RESERVED_FARM_INFO: u64 = 1024 * 1024;
/// Delay applied before processing a newly archived segment
// NOTE(review): the reason for the 10 min value isn't visible here — presumably to give the
// network time to distribute the segment first; confirm against `plotting_scheduler`
const NEW_SEGMENT_PROCESSING_DELAY: Duration = Duration::from_mins(10);
98
/// Exclusive lock on the single disk farm info file, ensuring no other farmer instance uses the
/// same farm directory concurrently.
///
/// The OS-level advisory lock is released when this value is dropped.
#[derive(Debug)]
#[must_use = "Lock file must be kept around or as long as farm is used"]
pub struct SingleDiskFarmInfoLock {
    // Keeps the locked file handle alive; the lock is tied to the handle's lifetime
    _file: File,
}
105
/// Important information about the contents of the single disk farm, persisted as JSON in the
/// farm directory (see [`SingleDiskFarmInfo::FILE_NAME`])
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum SingleDiskFarmInfo {
    /// V0 of the farm info format
    #[serde(rename_all = "camelCase")]
    V0 {
        /// Unique ID of the farm
        id: FarmId,
        /// Genesis hash of the chain the farm was created on (hex-encoded on disk)
        #[serde(with = "hex")]
        genesis_hash: [u8; 32],
        /// Public key of the identity the farm was created with
        public_key: PublicKey,
        /// Number of pieces in each sector the farm was initialized with
        pieces_in_sector: u16,
        /// Space in bytes allocated for this farm
        allocated_space: u64,
    },
}
126
127impl SingleDiskFarmInfo {
128 const FILE_NAME: &'static str = "single_disk_farm.json";
129
130 pub fn new(
132 id: FarmId,
133 genesis_hash: [u8; 32],
134 public_key: PublicKey,
135 pieces_in_sector: u16,
136 allocated_space: u64,
137 ) -> Self {
138 Self::V0 {
139 id,
140 genesis_hash,
141 public_key,
142 pieces_in_sector,
143 allocated_space,
144 }
145 }
146
147 pub fn load_from(directory: &Path) -> io::Result<Option<Self>> {
150 let bytes = match fs::read(directory.join(Self::FILE_NAME)) {
151 Ok(bytes) => bytes,
152 Err(error) => {
153 return if error.kind() == io::ErrorKind::NotFound {
154 Ok(None)
155 } else {
156 Err(error)
157 };
158 }
159 };
160
161 serde_json::from_slice(&bytes)
162 .map(Some)
163 .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))
164 }
165
166 pub fn store_to(
170 &self,
171 directory: &Path,
172 lock: bool,
173 ) -> io::Result<Option<SingleDiskFarmInfoLock>> {
174 let mut file = OpenOptions::new()
175 .write(true)
176 .create(true)
177 .truncate(false)
178 .open(directory.join(Self::FILE_NAME))?;
179 if lock {
180 fs4::fs_std::FileExt::try_lock_exclusive(&file)?;
181 }
182 file.set_len(0)?;
183 file.write_all(&serde_json::to_vec(self).expect("Info serialization never fails; qed"))?;
184
185 Ok(lock.then_some(SingleDiskFarmInfoLock { _file: file }))
186 }
187
188 pub fn try_lock(directory: &Path) -> io::Result<SingleDiskFarmInfoLock> {
191 let file = File::open(directory.join(Self::FILE_NAME))?;
192 fs4::fs_std::FileExt::try_lock_exclusive(&file)?;
193
194 Ok(SingleDiskFarmInfoLock { _file: file })
195 }
196
197 pub fn id(&self) -> &FarmId {
199 let Self::V0 { id, .. } = self;
200 id
201 }
202
203 pub fn genesis_hash(&self) -> &[u8; 32] {
205 let Self::V0 { genesis_hash, .. } = self;
206 genesis_hash
207 }
208
209 pub fn public_key(&self) -> &PublicKey {
211 let Self::V0 { public_key, .. } = self;
212 public_key
213 }
214
215 pub fn pieces_in_sector(&self) -> u16 {
217 match self {
218 SingleDiskFarmInfo::V0 {
219 pieces_in_sector, ..
220 } => *pieces_in_sector,
221 }
222 }
223
224 pub fn allocated_space(&self) -> u64 {
226 match self {
227 SingleDiskFarmInfo::V0 {
228 allocated_space, ..
229 } => *allocated_space,
230 }
231 }
232}
233
/// Summary of a single disk farm found (or not found) in a directory
#[derive(Debug)]
pub enum SingleDiskFarmSummary {
    /// Farm info was found and read successfully
    Found {
        /// Farm info
        info: SingleDiskFarmInfo,
        /// Directory the farm is stored in
        directory: PathBuf,
    },
    /// No farm info was found in the directory
    NotFound {
        /// Directory that was inspected
        directory: PathBuf,
    },
    /// Reading farm info failed
    Error {
        /// Directory that was inspected
        directory: PathBuf,
        /// I/O error that occurred while reading the farm info
        error: io::Error,
    },
}
257
/// SCALE-encoded header stored at the beginning of the metadata file
#[derive(Debug, Encode, Decode)]
struct PlotMetadataHeader {
    // Metadata format version; only `SUPPORTED_PLOT_VERSION` (0) is accepted by this file
    version: u8,
    // Number of sectors plotted so far; plotting resumes from this index
    plotted_sector_count: SectorIndex,
}
263
264impl PlotMetadataHeader {
265 #[inline]
266 fn encoded_size() -> usize {
267 let default = PlotMetadataHeader {
268 version: 0,
269 plotted_sector_count: 0,
270 };
271
272 default.encoded_size()
273 }
274}
275
/// Options used to open a single disk farm
#[derive(Debug)]
pub struct SingleDiskFarmOptions<'a, NC>
where
    NC: Clone,
{
    /// Directory where the farm is stored
    pub directory: PathBuf,
    /// Information about the farmer application received from the node
    pub farmer_app_info: FarmerAppInfo,
    /// Space in bytes allocated for this farm
    pub allocated_space: u64,
    /// Max number of pieces in a sector supported by this build
    pub max_pieces_in_sector: u16,
    /// Client used to communicate with the node
    pub node_client: NC,
    /// Address to which farming rewards are attributed
    pub reward_address: PublicKey,
    /// Plotter implementation used to create sectors
    pub plotter: Arc<dyn Plotter + Send + Sync>,
    /// KZG instance
    pub kzg: Kzg,
    /// Erasure coding instance
    pub erasure_coding: ErasureCoding,
    /// Percentage of allocated space dedicated to the piece cache (0 disables the cache)
    pub cache_percentage: u8,
    /// Number of threads in the farming thread pool
    pub farming_thread_pool_size: usize,
    /// Optional receiver that delays the start of plotting until it resolves
    pub plotting_delay: Option<oneshot::Receiver<()>>,
    /// Global mutex shared across farms
    // NOTE(review): exact scope of what this serializes isn't visible here — it is handed to
    // plotting, farming and the piece reader; confirm in those modules
    pub global_mutex: Arc<AsyncMutex<()>>,
    /// Max number of sectors plotted concurrently per farm
    pub max_plotting_sectors_per_farm: NonZeroUsize,
    /// Disable locking of the farm info file (allows concurrent access to the directory)
    pub disable_farm_locking: bool,
    /// Mode used for reading sector record chunks
    pub read_sector_record_chunks_mode: ReadSectorRecordChunksMode,
    /// Optional Prometheus metrics registry
    pub registry: Option<&'a Mutex<&'a mut Registry>>,
    /// Whether to create the farm if it doesn't exist yet
    pub create: bool,
}
323
324#[derive(Debug, Error)]
326pub enum SingleDiskFarmError {
327 #[error("Failed to open or create identity: {0}")]
329 FailedToOpenIdentity(#[from] IdentityError),
330 #[error("Farm is likely already in use, make sure no other farmer is using it: {0}")]
332 LikelyAlreadyInUse(io::Error),
333 #[error("Single disk farm I/O error: {0}")]
335 Io(#[from] io::Error),
336 #[error("Failed to spawn task for blocking thread: {0}")]
338 TokioJoinError(#[from] task::JoinError),
339 #[error("Piece cache error: {0}")]
341 PieceCacheError(#[from] DiskPieceCacheError),
342 #[error("Can't preallocate metadata file, probably not enough space on disk: {0}")]
344 CantPreallocateMetadataFile(io::Error),
345 #[error("Can't preallocate plot file, probably not enough space on disk: {0}")]
347 CantPreallocatePlotFile(io::Error),
348 #[error(
350 "Genesis hash of farm {id} {wrong_chain} is different from {correct_chain} when farm was \
351 created, it is not possible to use farm on a different chain"
352 )]
353 WrongChain {
354 id: FarmId,
356 correct_chain: String,
359 wrong_chain: String,
361 },
362 #[error(
364 "Public key of farm {id} {wrong_public_key} is different from {correct_public_key} when \
365 farm was created, something went wrong, likely due to manual edits"
366 )]
367 IdentityMismatch {
368 id: FarmId,
370 correct_public_key: PublicKey,
372 wrong_public_key: PublicKey,
374 },
375 #[error(
377 "Invalid number pieces in sector: max supported {max_supported}, farm initialized with \
378 {initialized_with}"
379 )]
380 InvalidPiecesInSector {
381 id: FarmId,
383 max_supported: u16,
385 initialized_with: u16,
387 },
388 #[error("Failed to decode metadata header: {0}")]
390 FailedToDecodeMetadataHeader(parity_scale_codec::Error),
391 #[error("Unexpected metadata version {0}")]
393 UnexpectedMetadataVersion(u8),
394 #[error(
396 "Allocated space is not enough for one sector. \
397 The lowest acceptable value for allocated space is {min_space} bytes, \
398 provided {allocated_space} bytes."
399 )]
400 InsufficientAllocatedSpace {
401 min_space: u64,
403 allocated_space: u64,
405 },
406 #[error(
408 "Farm is too large: allocated {allocated_sectors} sectors ({allocated_space} bytes), max \
409 supported is {max_sectors} ({max_space} bytes). Consider creating multiple smaller farms \
410 instead."
411 )]
412 FarmTooLarge {
413 allocated_space: u64,
415 allocated_sectors: u64,
417 max_space: u64,
419 max_sectors: u16,
421 },
422 #[error("Failed to create thread pool: {0}")]
424 FailedToCreateThreadPool(ThreadPoolBuildError),
425}
426
427#[derive(Debug, Error)]
429pub enum SingleDiskFarmScrubError {
430 #[error("Farm is likely already in use, make sure no other farmer is using it: {0}")]
432 LikelyAlreadyInUse(io::Error),
433 #[error("Failed to file size of {file}: {error}")]
435 FailedToDetermineFileSize {
436 file: PathBuf,
438 error: io::Error,
440 },
441 #[error("Failed to read {size} bytes from {file} at offset {offset}: {error}")]
443 FailedToReadBytes {
444 file: PathBuf,
446 size: u64,
448 offset: u64,
450 error: io::Error,
452 },
453 #[error("Failed to write {size} bytes from {file} at offset {offset}: {error}")]
455 FailedToWriteBytes {
456 file: PathBuf,
458 size: u64,
460 offset: u64,
462 error: io::Error,
464 },
465 #[error("Farm info file does not exist at {file}")]
467 FarmInfoFileDoesNotExist {
468 file: PathBuf,
470 },
471 #[error("Farm info at {file} can't be opened: {error}")]
473 FarmInfoCantBeOpened {
474 file: PathBuf,
476 error: io::Error,
478 },
479 #[error("Identity file does not exist at {file}")]
481 IdentityFileDoesNotExist {
482 file: PathBuf,
484 },
485 #[error("Identity at {file} can't be opened: {error}")]
487 IdentityCantBeOpened {
488 file: PathBuf,
490 error: IdentityError,
492 },
493 #[error("Identity public key {identity} doesn't match public key in the disk farm info {info}")]
495 PublicKeyMismatch {
496 identity: PublicKey,
498 info: PublicKey,
500 },
501 #[error("Metadata file does not exist at {file}")]
503 MetadataFileDoesNotExist {
504 file: PathBuf,
506 },
507 #[error("Metadata at {file} can't be opened: {error}")]
509 MetadataCantBeOpened {
510 file: PathBuf,
512 error: io::Error,
514 },
515 #[error(
517 "Metadata file at {file} is too small: reserved size is {reserved_size} bytes, file size \
518 is {size}"
519 )]
520 MetadataFileTooSmall {
521 file: PathBuf,
523 reserved_size: u64,
525 size: u64,
527 },
528 #[error("Failed to decode metadata header: {0}")]
530 FailedToDecodeMetadataHeader(parity_scale_codec::Error),
531 #[error("Unexpected metadata version {0}")]
533 UnexpectedMetadataVersion(u8),
534 #[error("Cache at {file} can't be opened: {error}")]
536 CacheCantBeOpened {
537 file: PathBuf,
539 error: io::Error,
541 },
542}
543
/// Errors happening in the farm's background tasks
#[derive(Debug, Error)]
pub enum BackgroundTaskError {
    /// Plotting task failed
    #[error(transparent)]
    Plotting(#[from] PlottingError),
    /// Farming task failed
    #[error(transparent)]
    Farming(#[from] FarmingError),
    /// Reward signing task failed
    #[error(transparent)]
    RewardSigning(#[from] anyhow::Error),
    /// One of the background tasks panicked
    #[error("Background task {task} panicked")]
    BackgroundTaskPanicked {
        /// Name of the task that panicked
        task: String,
    },
}
563
/// Boxed future of a single background task of the farm
type BackgroundTask = Pin<Box<dyn Future<Output = Result<(), BackgroundTaskError>> + Send>>;
565
/// Targets for scrubbing a single disk farm
#[derive(Debug, Copy, Clone)]
pub enum ScrubTarget {
    /// Scrub everything
    All,
    /// Scrub the metadata file
    Metadata,
    /// Scrub the plot file
    Plot,
    /// Scrub the cache file
    Cache,
}

impl fmt::Display for ScrubTarget {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Names mirror the strings accepted by `FromStr` below
        let name = match self {
            Self::All => "all",
            Self::Metadata => "metadata",
            Self::Plot => "plot",
            Self::Cache => "cache",
        };
        f.write_str(name)
    }
}

impl FromStr for ScrubTarget {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(match s {
            "all" => Self::All,
            "metadata" => Self::Metadata,
            "plot" => Self::Plot,
            "cache" => Self::Cache,
            _ => return Err(format!("Can't parse {s} as `ScrubTarget`")),
        })
    }
}

impl ScrubTarget {
    /// Whether the metadata file should be scrubbed
    fn metadata(&self) -> bool {
        // Scrubbing the plot implies scrubbing its metadata too
        matches!(self, Self::All | Self::Metadata | Self::Plot)
    }

    /// Whether the plot file should be scrubbed
    fn plot(&self) -> bool {
        matches!(self, Self::All | Self::Plot)
    }

    /// Whether the cache file should be scrubbed
    fn cache(&self) -> bool {
        matches!(self, Self::All | Self::Cache)
    }
}
626
/// How the farm's allocated space is split between the plot, metadata and piece cache files
struct AllocatedSpaceDistribution {
    // Size of the piece cache file in bytes
    piece_cache_file_size: u64,
    // Number of elements the piece cache can hold (0 disables the cache)
    piece_cache_capacity: u32,
    // Size of the plot file in bytes (aligned to the disk sector size)
    plot_file_size: u64,
    // Number of sectors the farm is expected to contain
    target_sector_count: u16,
    // Size of the metadata file in bytes (reserved header + per-sector metadata)
    metadata_file_size: u64,
}
634
635impl AllocatedSpaceDistribution {
636 fn new(
637 allocated_space: u64,
638 sector_size: u64,
639 cache_percentage: u8,
640 sector_metadata_size: u64,
641 ) -> Result<Self, SingleDiskFarmError> {
642 let single_sector_overhead = sector_size + sector_metadata_size;
643 let fixed_space_usage = RESERVED_PLOT_METADATA
645 + RESERVED_FARM_INFO
646 + Identity::file_size() as u64
647 + KnownPeersManager::file_size(KNOWN_PEERS_CACHE_SIZE) as u64;
648 let target_sector_count = {
650 let potentially_plottable_space = allocated_space.saturating_sub(fixed_space_usage)
651 / 100
652 * (100 - u64::from(cache_percentage));
653 (potentially_plottable_space - DISK_SECTOR_SIZE as u64) / single_sector_overhead
656 };
657
658 if target_sector_count == 0 {
659 let mut single_plot_with_cache_space =
660 single_sector_overhead.div_ceil(100 - u64::from(cache_percentage)) * 100;
661 if single_plot_with_cache_space - single_sector_overhead
664 < DiskPieceCache::element_size() as u64
665 {
666 single_plot_with_cache_space =
667 single_sector_overhead + DiskPieceCache::element_size() as u64;
668 }
669
670 return Err(SingleDiskFarmError::InsufficientAllocatedSpace {
671 min_space: fixed_space_usage + single_plot_with_cache_space,
672 allocated_space,
673 });
674 }
675 let plot_file_size = target_sector_count * sector_size;
676 let plot_file_size =
678 plot_file_size.div_ceil(DISK_SECTOR_SIZE as u64) * DISK_SECTOR_SIZE as u64;
679
680 let piece_cache_capacity = if cache_percentage > 0 {
682 let cache_space = allocated_space
683 - fixed_space_usage
684 - plot_file_size
685 - (sector_metadata_size * target_sector_count);
686 (cache_space / u64::from(DiskPieceCache::element_size())) as u32
687 } else {
688 0
689 };
690 let target_sector_count = match SectorIndex::try_from(target_sector_count) {
691 Ok(target_sector_count) if target_sector_count < SectorIndex::MAX => {
692 target_sector_count
693 }
694 _ => {
695 let max_sectors = SectorIndex::MAX - 1;
698 return Err(SingleDiskFarmError::FarmTooLarge {
699 allocated_space: target_sector_count * sector_size,
700 allocated_sectors: target_sector_count,
701 max_space: max_sectors as u64 * sector_size,
702 max_sectors,
703 });
704 }
705 };
706
707 Ok(Self {
708 piece_cache_file_size: u64::from(piece_cache_capacity)
709 * u64::from(DiskPieceCache::element_size()),
710 piece_cache_capacity,
711 plot_file_size,
712 target_sector_count,
713 metadata_file_size: RESERVED_PLOT_METADATA
714 + sector_metadata_size * u64::from(target_sector_count),
715 })
716 }
717}
718
/// Bag of event handlers invoked with an argument of type `A`
type Handler<A> = Bag<HandlerFn<A>, A>;
720
/// Collection of handler bags for events emitted by this farm
#[derive(Default, Debug)]
struct Handlers {
    // Fired on every sector state change
    sector_update: Handler<(SectorIndex, SectorUpdate)>,
    // Fired on farming notifications
    farming_notification: Handler<FarmingNotification>,
    // Fired on every solution response
    solution: Handler<SolutionResponse>,
}
727
/// Intermediate state produced by `SingleDiskFarm::init()` on a blocking thread and consumed by
/// `SingleDiskFarm::new()`
struct SingleDiskFarmInit {
    // Identity of this farm (source of its public key)
    identity: Identity,
    // Farm info loaded from, or freshly written to, the farm directory
    single_disk_farm_info: SingleDiskFarmInfo,
    // Lock on the farm info file, present unless farm locking is disabled
    single_disk_farm_info_lock: Option<SingleDiskFarmInfoLock>,
    // Plot file opened for direct I/O, shared between plotting and piece reading
    plot_file: Arc<DirectIoFile>,
    // Metadata file opened for direct I/O
    metadata_file: DirectIoFile,
    // Header read from (or initialized into) the metadata file
    metadata_header: PlotMetadataHeader,
    // Total number of sectors this farm is expected to contain
    target_sector_count: u16,
    // Metadata of sectors plotted so far
    sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    // Number of elements the disk piece cache can hold (0 disables the cache)
    piece_cache_capacity: u32,
    // Plot cache backed by the plot file
    plot_cache: DiskPlotCache,
}
740
/// Single disk farm abstraction: container for everything necessary to plot and farm with a
/// single disk.
///
/// Background tasks are created in `new()`, but do not make progress until `run()` is called
/// (hence the `must_use` below).
#[derive(Debug)]
#[must_use = "Plot does not function properly unless run() method is called"]
pub struct SingleDiskFarm {
    // Protocol constants received from the node at creation time
    farmer_protocol_info: FarmerProtocolInfo,
    // Persisted farm info
    single_disk_farm_info: SingleDiskFarmInfo,
    // Metadata of sectors plotted so far
    sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    // Number of pieces in each sector
    pieces_in_sector: u16,
    // Total number of sectors this farm will contain when fully plotted
    total_sectors_count: SectorIndex,
    // Tracing span the farm was created under
    span: Span,
    // Background tasks driven by `run()`
    tasks: FuturesUnordered<BackgroundTask>,
    // Event handler bags shared with background tasks
    handlers: Arc<Handlers>,
    // Disk piece cache of this farm
    piece_cache: SingleDiskPieceCache,
    // Plot cache of this farm
    plot_cache: DiskPlotCache,
    // Piece reader serving read requests from the plot
    piece_reader: DiskPieceReader,
    // Signals background tasks to start; taken (and thus closed) on drop
    start_sender: Option<broadcast::Sender<()>>,
    // Signals background tasks to stop; taken (and thus closed) on drop
    stop_sender: Option<broadcast::Sender<()>>,
    // Keeps the farm info file lock alive for the lifetime of the farm
    _single_disk_farm_info_lock: Option<SingleDiskFarmInfoLock>,
}
766
767impl Drop for SingleDiskFarm {
768 #[inline]
769 fn drop(&mut self) {
770 self.piece_reader.close_all_readers();
771 self.start_sender.take();
773 self.stop_sender.take();
775 }
776}
777
#[async_trait(?Send)]
impl Farm for SingleDiskFarm {
    // All methods below delegate to inherent methods/fields of `SingleDiskFarm`; inherent
    // methods take precedence over trait methods, so e.g. `self.id()` is not recursive
    fn id(&self) -> &FarmId {
        self.id()
    }

    fn total_sectors_count(&self) -> SectorIndex {
        self.total_sectors_count
    }

    fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static> {
        Arc::new(self.plotted_sectors())
    }

    fn piece_reader(&self) -> Arc<dyn PieceReader + 'static> {
        Arc::new(self.piece_reader())
    }

    fn on_sector_update(
        &self,
        callback: HandlerFn<(SectorIndex, SectorUpdate)>,
    ) -> Box<dyn farm::HandlerId> {
        Box::new(self.on_sector_update(callback))
    }

    fn on_farming_notification(
        &self,
        callback: HandlerFn<FarmingNotification>,
    ) -> Box<dyn farm::HandlerId> {
        Box::new(self.on_farming_notification(callback))
    }

    fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> Box<dyn farm::HandlerId> {
        Box::new(self.on_solution(callback))
    }

    fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
        Box::pin((*self).run())
    }
}
818
819impl SingleDiskFarm {
    /// Name of the plot file within the farm directory
    pub const PLOT_FILE: &'static str = "plot.bin";
    /// Name of the metadata file within the farm directory
    pub const METADATA_FILE: &'static str = "metadata.bin";
    // Only metadata version 0 is currently supported
    const SUPPORTED_PLOT_VERSION: u8 = 0;
825
    /// Create a single disk farm in the directory from `options` and set up all of its
    /// background tasks (error forwarding, plotting, plotting scheduler, slot forwarding,
    /// farming, piece reading and reward signing).
    ///
    /// On-disk initialization runs on a blocking thread. The returned farm does no work until
    /// `run()` is called, which signals `start_sender`.
    pub async fn new<NC, PosTable>(
        options: SingleDiskFarmOptions<'_, NC>,
        farm_index: usize,
    ) -> Result<Self, SingleDiskFarmError>
    where
        NC: NodeClient + Clone,
        PosTable: Table,
    {
        let span = Span::current();

        let SingleDiskFarmOptions {
            directory,
            farmer_app_info,
            allocated_space,
            max_pieces_in_sector,
            node_client,
            reward_address,
            plotter,
            kzg,
            erasure_coding,
            cache_percentage,
            farming_thread_pool_size,
            plotting_delay,
            global_mutex,
            max_plotting_sectors_per_farm,
            disable_farm_locking,
            read_sector_record_chunks_mode,
            registry,
            create,
        } = options;

        // On-disk initialization is blocking I/O, run it on the blocking thread pool
        let single_disk_farm_init_fut = task::spawn_blocking({
            let directory = directory.clone();
            let farmer_app_info = farmer_app_info.clone();
            let span = span.clone();

            move || {
                let _span_guard = span.enter();
                Self::init(
                    &directory,
                    &farmer_app_info,
                    allocated_space,
                    max_pieces_in_sector,
                    cache_percentage,
                    disable_farm_locking,
                    create,
                )
            }
        });

        // `AsyncJoinOnDrop` joins the blocking task even if this future is dropped early
        let single_disk_farm_init =
            AsyncJoinOnDrop::new(single_disk_farm_init_fut, false).await??;

        let SingleDiskFarmInit {
            identity,
            single_disk_farm_info,
            single_disk_farm_info_lock,
            plot_file,
            metadata_file,
            metadata_header,
            target_sector_count,
            sectors_metadata,
            piece_cache_capacity,
            plot_cache,
        } = single_disk_farm_init;

        // Open the disk piece cache (when any capacity was allocated to it), reusing the
        // farm's ULID as the cache ID
        let piece_cache = {
            let FarmId::Ulid(id) = *single_disk_farm_info.id();
            let id = PieceCacheId::Ulid(id);

            SingleDiskPieceCache::new(
                id,
                if let Some(piece_cache_capacity) = NonZeroU32::new(piece_cache_capacity) {
                    // Cache opening does blocking I/O
                    Some(task::block_in_place(|| {
                        if let Some(registry) = registry {
                            DiskPieceCache::open(
                                &directory,
                                piece_cache_capacity,
                                Some(id),
                                Some(*registry.lock()),
                            )
                        } else {
                            DiskPieceCache::open(&directory, piece_cache_capacity, Some(id), None)
                        }
                    })?)
                } else {
                    // Zero capacity means this farm has no disk piece cache
                    None
                },
            )
        };

        let public_key = *single_disk_farm_info.public_key();
        let pieces_in_sector = single_disk_farm_info.pieces_in_sector();
        let sector_size = sector_size(pieces_in_sector);

        let metrics = registry.map(|registry| {
            Arc::new(SingleDiskFarmMetrics::new(
                *registry.lock(),
                single_disk_farm_info.id(),
                target_sector_count,
                sectors_metadata.read_blocking().len() as SectorIndex,
            ))
        });

        // Shared one-shot error slot: the first background task to fail sends its error here,
        // and the dedicated task below surfaces it as the farm's failure
        let (error_sender, error_receiver) = oneshot::channel();
        let error_sender = Arc::new(Mutex::new(Some(error_sender)));

        let tasks = FuturesUnordered::<BackgroundTask>::new();

        tasks.push(Box::pin(async move {
            if let Ok(error) = error_receiver.await {
                return Err(error);
            }

            Ok(())
        }));

        let handlers = Arc::<Handlers>::default();
        let (start_sender, mut start_receiver) = broadcast::channel::<()>(1);
        let (stop_sender, mut stop_receiver) = broadcast::channel::<()>(1);
        let sectors_being_modified = Arc::<AsyncRwLock<HashSet<SectorIndex>>>::default();
        let (sectors_to_plot_sender, sectors_to_plot_receiver) = mpsc::channel(1);
        // Resume plotting from where the metadata header left off
        let sectors_indices_left_to_plot =
            metadata_header.plotted_sector_count..target_sector_count;

        let farming_thread_pool = ThreadPoolBuilder::new()
            .thread_name(move |thread_index| format!("farming-{farm_index:02}.{thread_index:02}"))
            .num_threads(farming_thread_pool_size)
            .spawn_handler(tokio_rayon_spawn_handler())
            .build()
            .map_err(SingleDiskFarmError::FailedToCreateThreadPool)?;
        // Open the plot for farming inside the farming thread pool, so files are opened from
        // the threads that will use them
        let farming_plot_fut = task::spawn_blocking(|| {
            farming_thread_pool
                .install(move || {
                    RayonFiles::open_with(directory.join(Self::PLOT_FILE), |path| {
                        DirectIoFile::open(path)
                    })
                })
                .map(|farming_plot| (farming_plot, farming_thread_pool))
        });

        let (farming_plot, farming_thread_pool) =
            AsyncJoinOnDrop::new(farming_plot_fut, false).await??;

        // Plotting task: runs on a blocking thread and is driven by `sectors_to_plot_receiver`
        let plotting_join_handle = task::spawn_blocking({
            let sectors_metadata = Arc::clone(&sectors_metadata);
            let handlers = Arc::clone(&handlers);
            let sectors_being_modified = Arc::clone(&sectors_being_modified);
            let node_client = node_client.clone();
            let plot_file = Arc::clone(&plot_file);
            let error_sender = Arc::clone(&error_sender);
            let span = span.clone();
            let global_mutex = Arc::clone(&global_mutex);
            let metrics = metrics.clone();

            move || {
                let _span_guard = span.enter();

                let plotting_options = PlottingOptions {
                    metadata_header,
                    sectors_metadata: &sectors_metadata,
                    sectors_being_modified: &sectors_being_modified,
                    sectors_to_plot_receiver,
                    sector_plotting_options: SectorPlottingOptions {
                        public_key,
                        node_client: &node_client,
                        pieces_in_sector,
                        sector_size,
                        plot_file,
                        metadata_file: Arc::new(metadata_file),
                        handlers: &handlers,
                        global_mutex: &global_mutex,
                        plotter,
                        metrics,
                    },
                    max_plotting_sectors_per_farm,
                };

                let plotting_fut = async {
                    // Wait for `run()`; a closed channel means the farm was dropped before it
                    // was ever started
                    if start_receiver.recv().await.is_err() {
                        return Ok(());
                    }

                    // Optional externally-controlled delay before plotting starts; a dropped
                    // sender also means there is nothing to wait for
                    if let Some(plotting_delay) = plotting_delay
                        && plotting_delay.await.is_err()
                    {
                        return Ok(());
                    }

                    plotting(plotting_options).await
                };

                Handle::current().block_on(async {
                    select! {
                        plotting_result = plotting_fut.fuse() => {
                            // Forward the plotting error to the error task, unless another
                            // task already claimed the error slot
                            if let Err(error) = plotting_result
                                && let Some(error_sender) = error_sender.lock().take()
                                && let Err(error) = error_sender.send(error.into())
                            {
                                error!(
                                    %error,
                                    "Plotting failed to send error to background task"
                                );
                            }
                        }
                        _ = stop_receiver.recv().fuse() => {
                            // Explicit stop signal, exit quietly
                        }
                    }
                });
            }
        });
        let plotting_join_handle = AsyncJoinOnDrop::new(plotting_join_handle, false);

        tasks.push(Box::pin(async move {
            plotting_join_handle.await.map_err(|_error| {
                BackgroundTaskError::BackgroundTaskPanicked {
                    task: format!("plotting-{farm_index}"),
                }
            })
        }));

        // Scheduler deciding which sectors to plot/replot and when
        let plotting_scheduler_options = PlottingSchedulerOptions {
            public_key_hash: public_key.hash(),
            sectors_indices_left_to_plot,
            target_sector_count,
            last_archived_segment_index: farmer_app_info.protocol_info.history_size.segment_index(),
            min_sector_lifetime: farmer_app_info.protocol_info.min_sector_lifetime,
            node_client: node_client.clone(),
            handlers: Arc::clone(&handlers),
            sectors_metadata: Arc::clone(&sectors_metadata),
            sectors_to_plot_sender,
            new_segment_processing_delay: NEW_SEGMENT_PROCESSING_DELAY,
            metrics: metrics.clone(),
        };
        tasks.push(Box::pin(plotting_scheduler(plotting_scheduler_options)));

        // Forward slot notifications from the node to the farming task
        let (slot_info_forwarder_sender, slot_info_forwarder_receiver) = mpsc::channel(0);

        tasks.push(Box::pin({
            let node_client = node_client.clone();
            let metrics = metrics.clone();

            async move {
                slot_notification_forwarder(&node_client, slot_info_forwarder_sender, metrics)
                    .await
                    .map_err(BackgroundTaskError::Farming)
            }
        }));

        // Farming task: drives auditing/proving using slot notifications and the farming plot
        let farming_join_handle = task::spawn_blocking({
            let erasure_coding = erasure_coding.clone();
            let handlers = Arc::clone(&handlers);
            let sectors_being_modified = Arc::clone(&sectors_being_modified);
            let sectors_metadata = Arc::clone(&sectors_metadata);
            let mut start_receiver = start_sender.subscribe();
            let mut stop_receiver = stop_sender.subscribe();
            let node_client = node_client.clone();
            let span = span.clone();
            let global_mutex = Arc::clone(&global_mutex);

            move || {
                let _span_guard = span.enter();

                let farming_fut = async move {
                    // Wait for `run()`; a closed channel means the farm was dropped before it
                    // was ever started
                    if start_receiver.recv().await.is_err() {
                        return Ok(());
                    }

                    let plot_audit = PlotAudit::new(&farming_plot);

                    let farming_options = FarmingOptions {
                        public_key,
                        reward_address,
                        node_client,
                        plot_audit,
                        sectors_metadata,
                        kzg,
                        erasure_coding,
                        handlers,
                        sectors_being_modified,
                        slot_info_notifications: slot_info_forwarder_receiver,
                        thread_pool: farming_thread_pool,
                        read_sector_record_chunks_mode,
                        global_mutex,
                        metrics,
                    };
                    farming::<PosTable, _, _>(farming_options).await
                };

                Handle::current().block_on(async {
                    select! {
                        farming_result = farming_fut.fuse() => {
                            // Forward the farming error to the error task, unless another task
                            // already claimed the error slot
                            if let Err(error) = farming_result
                                && let Some(error_sender) = error_sender.lock().take()
                                && let Err(error) = error_sender.send(error.into())
                            {
                                error!(
                                    %error,
                                    "Farming failed to send error to background task",
                                );
                            }
                        }
                        _ = stop_receiver.recv().fuse() => {
                            // Explicit stop signal, exit quietly
                        }
                    }
                });
            }
        });
        let farming_join_handle = AsyncJoinOnDrop::new(farming_join_handle, false);

        tasks.push(Box::pin(async move {
            farming_join_handle.await.map_err(|_error| {
                BackgroundTaskError::BackgroundTaskPanicked {
                    task: format!("farming-{farm_index}"),
                }
            })
        }));

        // Piece reader serving piece read requests from the plot
        let (piece_reader, reading_fut) = DiskPieceReader::new::<PosTable>(
            public_key,
            pieces_in_sector,
            plot_file,
            Arc::clone(&sectors_metadata),
            erasure_coding,
            sectors_being_modified,
            read_sector_record_chunks_mode,
            global_mutex,
        );

        let reading_join_handle = task::spawn_blocking({
            let mut stop_receiver = stop_sender.subscribe();
            let reading_fut = reading_fut.instrument(span.clone());

            move || {
                Handle::current().block_on(async {
                    select! {
                        _ = reading_fut.fuse() => {
                            // Reading future finished on its own
                        }
                        _ = stop_receiver.recv().fuse() => {
                            // Explicit stop signal
                        }
                    }
                });
            }
        });

        let reading_join_handle = AsyncJoinOnDrop::new(reading_join_handle, false);

        tasks.push(Box::pin(async move {
            reading_join_handle.await.map_err(|_error| {
                BackgroundTaskError::BackgroundTaskPanicked {
                    task: format!("reading-{farm_index}"),
                }
            })
        }));

        // Reward signing task for rewards won by this farm's identity
        tasks.push(Box::pin(async move {
            match reward_signing(node_client, identity).await {
                Ok(reward_signing_fut) => {
                    reward_signing_fut.await;
                }
                Err(error) => {
                    return Err(BackgroundTaskError::RewardSigning(anyhow::anyhow!(
                        "Failed to subscribe to reward signing notifications: {error}"
                    )));
                }
            }

            Ok(())
        }));

        let farm = Self {
            farmer_protocol_info: farmer_app_info.protocol_info,
            single_disk_farm_info,
            sectors_metadata,
            pieces_in_sector,
            total_sectors_count: target_sector_count,
            span,
            tasks,
            handlers,
            piece_cache,
            plot_cache,
            piece_reader,
            start_sender: Some(start_sender),
            stop_sender: Some(stop_sender),
            _single_disk_farm_info_lock: single_disk_farm_info_lock,
        };
        Ok(farm)
    }
1227
1228 fn init(
1229 directory: &PathBuf,
1230 farmer_app_info: &FarmerAppInfo,
1231 allocated_space: u64,
1232 max_pieces_in_sector: u16,
1233 cache_percentage: u8,
1234 disable_farm_locking: bool,
1235 create: bool,
1236 ) -> Result<SingleDiskFarmInit, SingleDiskFarmError> {
1237 fs::create_dir_all(directory)?;
1238
1239 let identity = if create {
1240 Identity::open_or_create(directory)?
1241 } else {
1242 Identity::open(directory)?.ok_or_else(|| {
1243 IdentityError::Io(io::Error::new(
1244 io::ErrorKind::NotFound,
1245 "Farm does not exist and creation was explicitly disabled",
1246 ))
1247 })?
1248 };
1249 let public_key = identity.public_key().to_bytes().into();
1250
1251 let (single_disk_farm_info, single_disk_farm_info_lock) =
1252 match SingleDiskFarmInfo::load_from(directory)? {
1253 Some(mut single_disk_farm_info) => {
1254 if &farmer_app_info.genesis_hash != single_disk_farm_info.genesis_hash() {
1255 return Err(SingleDiskFarmError::WrongChain {
1256 id: *single_disk_farm_info.id(),
1257 correct_chain: hex::encode(single_disk_farm_info.genesis_hash()),
1258 wrong_chain: hex::encode(farmer_app_info.genesis_hash),
1259 });
1260 }
1261
1262 if &public_key != single_disk_farm_info.public_key() {
1263 return Err(SingleDiskFarmError::IdentityMismatch {
1264 id: *single_disk_farm_info.id(),
1265 correct_public_key: *single_disk_farm_info.public_key(),
1266 wrong_public_key: public_key,
1267 });
1268 }
1269
1270 let pieces_in_sector = single_disk_farm_info.pieces_in_sector();
1271
1272 if max_pieces_in_sector < pieces_in_sector {
1273 return Err(SingleDiskFarmError::InvalidPiecesInSector {
1274 id: *single_disk_farm_info.id(),
1275 max_supported: max_pieces_in_sector,
1276 initialized_with: pieces_in_sector,
1277 });
1278 }
1279
1280 if max_pieces_in_sector > pieces_in_sector {
1281 info!(
1282 pieces_in_sector,
1283 max_pieces_in_sector,
1284 "Farm initialized with smaller number of pieces in sector, farm needs \
1285 to be re-created for increase"
1286 );
1287 }
1288
1289 let mut single_disk_farm_info_lock = None;
1290
1291 if allocated_space != single_disk_farm_info.allocated_space() {
1292 info!(
1293 old_space = %bytesize::to_string(single_disk_farm_info.allocated_space(), true),
1294 new_space = %bytesize::to_string(allocated_space, true),
1295 "Farm size has changed"
1296 );
1297
1298 let new_allocated_space = allocated_space;
1299 match &mut single_disk_farm_info {
1300 SingleDiskFarmInfo::V0 {
1301 allocated_space, ..
1302 } => {
1303 *allocated_space = new_allocated_space;
1304 }
1305 }
1306
1307 single_disk_farm_info_lock =
1308 single_disk_farm_info.store_to(directory, !disable_farm_locking)?;
1309 } else if !disable_farm_locking {
1310 single_disk_farm_info_lock = Some(
1311 SingleDiskFarmInfo::try_lock(directory)
1312 .map_err(SingleDiskFarmError::LikelyAlreadyInUse)?,
1313 );
1314 }
1315
1316 (single_disk_farm_info, single_disk_farm_info_lock)
1317 }
1318 None => {
1319 let single_disk_farm_info = SingleDiskFarmInfo::new(
1320 FarmId::new(),
1321 farmer_app_info.genesis_hash,
1322 public_key,
1323 max_pieces_in_sector,
1324 allocated_space,
1325 );
1326
1327 let single_disk_farm_info_lock =
1328 single_disk_farm_info.store_to(directory, !disable_farm_locking)?;
1329
1330 (single_disk_farm_info, single_disk_farm_info_lock)
1331 }
1332 };
1333
1334 let pieces_in_sector = single_disk_farm_info.pieces_in_sector();
1335 let sector_size = sector_size(pieces_in_sector) as u64;
1336 let sector_metadata_size = SectorMetadataChecksummed::encoded_size();
1337 let allocated_space_distribution = AllocatedSpaceDistribution::new(
1338 allocated_space,
1339 sector_size,
1340 cache_percentage,
1341 sector_metadata_size as u64,
1342 )?;
1343 let target_sector_count = allocated_space_distribution.target_sector_count;
1344
1345 let metadata_file_path = directory.join(Self::METADATA_FILE);
1346 let metadata_file = DirectIoFile::open(&metadata_file_path)?;
1347
1348 let metadata_size = metadata_file.size()?;
1349 let expected_metadata_size = allocated_space_distribution.metadata_file_size;
1350 let expected_metadata_size =
1352 expected_metadata_size.div_ceil(DISK_SECTOR_SIZE as u64) * DISK_SECTOR_SIZE as u64;
1353 let metadata_header = if metadata_size == 0 {
1354 let metadata_header = PlotMetadataHeader {
1355 version: SingleDiskFarm::SUPPORTED_PLOT_VERSION,
1356 plotted_sector_count: 0,
1357 };
1358
1359 metadata_file
1360 .preallocate(expected_metadata_size)
1361 .map_err(SingleDiskFarmError::CantPreallocateMetadataFile)?;
1362 metadata_file.write_all_at(metadata_header.encode().as_slice(), 0)?;
1363
1364 metadata_header
1365 } else {
1366 if metadata_size != expected_metadata_size {
1367 metadata_file
1370 .preallocate(expected_metadata_size)
1371 .map_err(SingleDiskFarmError::CantPreallocateMetadataFile)?;
1372 metadata_file.set_len(expected_metadata_size)?;
1374 }
1375
1376 let mut metadata_header_bytes = vec![0; PlotMetadataHeader::encoded_size()];
1377 metadata_file.read_exact_at(&mut metadata_header_bytes, 0)?;
1378
1379 let mut metadata_header =
1380 PlotMetadataHeader::decode(&mut metadata_header_bytes.as_ref())
1381 .map_err(SingleDiskFarmError::FailedToDecodeMetadataHeader)?;
1382
1383 if metadata_header.version != SingleDiskFarm::SUPPORTED_PLOT_VERSION {
1384 return Err(SingleDiskFarmError::UnexpectedMetadataVersion(
1385 metadata_header.version,
1386 ));
1387 }
1388
1389 if metadata_header.plotted_sector_count > target_sector_count {
1390 metadata_header.plotted_sector_count = target_sector_count;
1391 metadata_file.write_all_at(&metadata_header.encode(), 0)?;
1392 }
1393
1394 metadata_header
1395 };
1396
1397 let sectors_metadata = {
1398 let mut sectors_metadata =
1399 Vec::<SectorMetadataChecksummed>::with_capacity(usize::from(target_sector_count));
1400
1401 let mut sector_metadata_bytes = vec![0; sector_metadata_size];
1402 for sector_index in 0..metadata_header.plotted_sector_count {
1403 let sector_offset =
1404 RESERVED_PLOT_METADATA + sector_metadata_size as u64 * u64::from(sector_index);
1405 metadata_file.read_exact_at(&mut sector_metadata_bytes, sector_offset)?;
1406
1407 let sector_metadata =
1408 match SectorMetadataChecksummed::decode(&mut sector_metadata_bytes.as_ref()) {
1409 Ok(sector_metadata) => sector_metadata,
1410 Err(error) => {
1411 warn!(
1412 path = %metadata_file_path.display(),
1413 %error,
1414 %sector_index,
1415 "Failed to decode sector metadata, replacing with dummy expired \
1416 sector metadata"
1417 );
1418
1419 let dummy_sector = SectorMetadataChecksummed::from(SectorMetadata {
1420 sector_index,
1421 pieces_in_sector,
1422 s_bucket_sizes: Box::new([0; Record::NUM_S_BUCKETS]),
1423 history_size: HistorySize::from(SegmentIndex::ZERO),
1424 });
1425 metadata_file.write_all_at(&dummy_sector.encode(), sector_offset)?;
1426
1427 dummy_sector
1428 }
1429 };
1430 sectors_metadata.push(sector_metadata);
1431 }
1432
1433 Arc::new(AsyncRwLock::new(sectors_metadata))
1434 };
1435
1436 let plot_file = DirectIoFile::open(directory.join(Self::PLOT_FILE))?;
1437
1438 if plot_file.size()? != allocated_space_distribution.plot_file_size {
1439 plot_file
1442 .preallocate(allocated_space_distribution.plot_file_size)
1443 .map_err(SingleDiskFarmError::CantPreallocatePlotFile)?;
1444 plot_file.set_len(allocated_space_distribution.plot_file_size)?;
1446 }
1447
1448 let plot_file = Arc::new(plot_file);
1449
1450 let plot_cache = DiskPlotCache::new(
1451 &plot_file,
1452 §ors_metadata,
1453 target_sector_count,
1454 sector_size,
1455 );
1456
1457 Ok(SingleDiskFarmInit {
1458 identity,
1459 single_disk_farm_info,
1460 single_disk_farm_info_lock,
1461 plot_file,
1462 metadata_file,
1463 metadata_header,
1464 target_sector_count,
1465 sectors_metadata,
1466 piece_cache_capacity: allocated_space_distribution.piece_cache_capacity,
1467 plot_cache,
1468 })
1469 }
1470
1471 pub fn collect_summary(directory: PathBuf) -> SingleDiskFarmSummary {
1473 let single_disk_farm_info = match SingleDiskFarmInfo::load_from(&directory) {
1474 Ok(Some(single_disk_farm_info)) => single_disk_farm_info,
1475 Ok(None) => {
1476 return SingleDiskFarmSummary::NotFound { directory };
1477 }
1478 Err(error) => {
1479 return SingleDiskFarmSummary::Error { directory, error };
1480 }
1481 };
1482
1483 SingleDiskFarmSummary::Found {
1484 info: single_disk_farm_info,
1485 directory,
1486 }
1487 }
1488
1489 pub fn effective_disk_usage(
1495 directory: &Path,
1496 cache_percentage: u8,
1497 ) -> Result<u64, SingleDiskFarmError> {
1498 let mut effective_disk_usage;
1499 match SingleDiskFarmInfo::load_from(directory)? {
1500 Some(single_disk_farm_info) => {
1501 let allocated_space_distribution = AllocatedSpaceDistribution::new(
1502 single_disk_farm_info.allocated_space(),
1503 sector_size(single_disk_farm_info.pieces_in_sector()) as u64,
1504 cache_percentage,
1505 SectorMetadataChecksummed::encoded_size() as u64,
1506 )?;
1507
1508 effective_disk_usage = single_disk_farm_info.allocated_space();
1509 effective_disk_usage -= Identity::file_size() as u64;
1510 effective_disk_usage -= allocated_space_distribution.metadata_file_size;
1511 effective_disk_usage -= allocated_space_distribution.plot_file_size;
1512 effective_disk_usage -= allocated_space_distribution.piece_cache_file_size;
1513 }
1514 None => {
1515 effective_disk_usage = 0;
1517 }
1518 };
1519
1520 if Identity::open(directory)?.is_some() {
1521 effective_disk_usage += Identity::file_size() as u64;
1522 }
1523
1524 match OpenOptions::new()
1525 .read(true)
1526 .open(directory.join(Self::METADATA_FILE))
1527 {
1528 Ok(metadata_file) => {
1529 effective_disk_usage += metadata_file.size()?;
1530 }
1531 Err(error) => {
1532 if error.kind() == io::ErrorKind::NotFound {
1533 } else {
1535 return Err(error.into());
1536 }
1537 }
1538 };
1539
1540 match OpenOptions::new()
1541 .read(true)
1542 .open(directory.join(Self::PLOT_FILE))
1543 {
1544 Ok(plot_file) => {
1545 effective_disk_usage += plot_file.size()?;
1546 }
1547 Err(error) => {
1548 if error.kind() == io::ErrorKind::NotFound {
1549 } else {
1551 return Err(error.into());
1552 }
1553 }
1554 };
1555
1556 match OpenOptions::new()
1557 .read(true)
1558 .open(directory.join(DiskPieceCache::FILE_NAME))
1559 {
1560 Ok(piece_cache) => {
1561 effective_disk_usage += piece_cache.size()?;
1562 }
1563 Err(error) => {
1564 if error.kind() == io::ErrorKind::NotFound {
1565 } else {
1567 return Err(error.into());
1568 }
1569 }
1570 };
1571
1572 Ok(effective_disk_usage)
1573 }
1574
1575 pub fn read_all_sectors_metadata(
1577 directory: &Path,
1578 ) -> io::Result<Vec<SectorMetadataChecksummed>> {
1579 let metadata_file = DirectIoFile::open(directory.join(Self::METADATA_FILE))?;
1580
1581 let metadata_size = metadata_file.size()?;
1582 let sector_metadata_size = SectorMetadataChecksummed::encoded_size();
1583
1584 let mut metadata_header_bytes = vec![0; PlotMetadataHeader::encoded_size()];
1585 metadata_file.read_exact_at(&mut metadata_header_bytes, 0)?;
1586
1587 let metadata_header = PlotMetadataHeader::decode(&mut metadata_header_bytes.as_ref())
1588 .map_err(|error| {
1589 io::Error::other(format!("Failed to decode metadata header: {error}"))
1590 })?;
1591
1592 if metadata_header.version != SingleDiskFarm::SUPPORTED_PLOT_VERSION {
1593 return Err(io::Error::other(format!(
1594 "Unsupported metadata version {}",
1595 metadata_header.version
1596 )));
1597 }
1598
1599 let mut sectors_metadata = Vec::<SectorMetadataChecksummed>::with_capacity(
1600 ((metadata_size - RESERVED_PLOT_METADATA) / sector_metadata_size as u64) as usize,
1601 );
1602
1603 let mut sector_metadata_bytes = vec![0; sector_metadata_size];
1604 for sector_index in 0..metadata_header.plotted_sector_count {
1605 metadata_file.read_exact_at(
1606 &mut sector_metadata_bytes,
1607 RESERVED_PLOT_METADATA + sector_metadata_size as u64 * u64::from(sector_index),
1608 )?;
1609 sectors_metadata.push(
1610 SectorMetadataChecksummed::decode(&mut sector_metadata_bytes.as_ref()).map_err(
1611 |error| io::Error::other(format!("Failed to decode sector metadata: {error}")),
1612 )?,
1613 );
1614 }
1615
1616 Ok(sectors_metadata)
1617 }
1618
    /// ID of this farm (taken from the on-disk farm info)
    pub fn id(&self) -> &FarmId {
        self.single_disk_farm_info.id()
    }
1623
    /// Info of this farm as loaded from/stored to disk
    pub fn info(&self) -> &SingleDiskFarmInfo {
        &self.single_disk_farm_info
    }
1628
    /// Total number of sectors this farm is sized for (target sector count, not how many are
    /// plotted so far)
    pub fn total_sectors_count(&self) -> SectorIndex {
        self.total_sectors_count
    }
1633
    /// Handle for iterating over currently plotted sectors; shares the live sectors metadata via
    /// `Arc`, so it observes sectors plotted after this call too
    pub fn plotted_sectors(&self) -> SingleDiskPlottedSectors {
        SingleDiskPlottedSectors {
            public_key: *self.single_disk_farm_info.public_key(),
            pieces_in_sector: self.pieces_in_sector,
            farmer_protocol_info: self.farmer_protocol_info,
            sectors_metadata: Arc::clone(&self.sectors_metadata),
        }
    }
1643
    /// Cheap clone of the handle to this farm's piece cache
    pub fn piece_cache(&self) -> SingleDiskPieceCache {
        self.piece_cache.clone()
    }
1648
    /// Cheap clone of the handle to this farm's plot cache (pieces stored in not-yet-plotted
    /// space)
    pub fn plot_cache(&self) -> DiskPlotCache {
        self.plot_cache.clone()
    }
1653
    /// Cheap clone of the handle for reading plotted pieces back from disk
    pub fn piece_reader(&self) -> DiskPieceReader {
        self.piece_reader.clone()
    }
1658
    /// Subscribe to sector update notifications; dropping the returned `HandlerId` unsubscribes
    pub fn on_sector_update(&self, callback: HandlerFn<(SectorIndex, SectorUpdate)>) -> HandlerId {
        self.handlers.sector_update.add(callback)
    }
1663
    /// Subscribe to farming notifications; dropping the returned `HandlerId` unsubscribes
    pub fn on_farming_notification(&self, callback: HandlerFn<FarmingNotification>) -> HandlerId {
        self.handlers.farming_notification.add(callback)
    }
1668
    /// Subscribe to solution responses found by this farm; dropping the returned `HandlerId`
    /// unsubscribes
    pub fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> HandlerId {
        self.handlers.solution.add(callback)
    }
1673
    /// Run the farm: release background tasks and drive them to completion, returning the first
    /// error any of them produces (or `Ok(())` once all finish).
    pub async fn run(mut self) -> anyhow::Result<()> {
        if let Some(start_sender) = self.start_sender.take() {
            // Do not care if anyone is listening on the other side
            let _ = start_sender.send(());
        }

        // Poll background tasks within this farm's tracing span; `?` aborts on first failure
        while let Some(result) = self.tasks.next().instrument(self.span.clone()).await {
            result?;
        }

        Ok(())
    }
1687
1688 pub fn wipe(directory: &Path) -> io::Result<()> {
1690 let single_disk_info_info_path = directory.join(SingleDiskFarmInfo::FILE_NAME);
1691 match SingleDiskFarmInfo::load_from(directory) {
1692 Ok(Some(single_disk_farm_info)) => {
1693 info!("Found single disk farm {}", single_disk_farm_info.id());
1694 }
1695 Ok(None) => {
1696 return Err(io::Error::new(
1697 io::ErrorKind::NotFound,
1698 format!(
1699 "Single disk farm info not found at {}",
1700 single_disk_info_info_path.display()
1701 ),
1702 ));
1703 }
1704 Err(error) => {
1705 warn!("Found unknown single disk farm: {}", error);
1706 }
1707 }
1708
1709 {
1710 let plot = directory.join(Self::PLOT_FILE);
1711 if plot.exists() {
1712 info!("Deleting plot file at {}", plot.display());
1713 fs::remove_file(plot)?;
1714 }
1715 }
1716 {
1717 let metadata = directory.join(Self::METADATA_FILE);
1718 if metadata.exists() {
1719 info!("Deleting metadata file at {}", metadata.display());
1720 fs::remove_file(metadata)?;
1721 }
1722 }
1723 {
1726 let identity = directory.join("identity.bin");
1727 if identity.exists() {
1728 info!("Deleting identity file at {}", identity.display());
1729 fs::remove_file(identity)?;
1730 }
1731 }
1732
1733 DiskPieceCache::wipe(directory)?;
1734
1735 info!(
1736 "Deleting info file at {}",
1737 single_disk_info_info_path.display()
1738 );
1739 fs::remove_file(single_disk_info_info_path)
1740 }
1741
    /// Check the farm for corruption and optionally repair it in place.
    ///
    /// `target` selects which parts to check (metadata, plot, cache); `dry_run` reports problems
    /// without writing anything. Corrupted sectors are replaced with dummy expired sector
    /// metadata so they get replotted later rather than aborting the farm.
    pub fn scrub(
        directory: &Path,
        disable_farm_locking: bool,
        target: ScrubTarget,
        dry_run: bool,
    ) -> Result<(), SingleDiskFarmScrubError> {
        // Captured so rayon worker threads below can enter the same tracing span
        let span = Span::current();

        if dry_run {
            info!("Dry run is used, no changes will be written to disk");
        }

        if target.metadata() || target.plot() {
            // Farm info must exist and be readable before anything else is checked
            let info = {
                let file = directory.join(SingleDiskFarmInfo::FILE_NAME);
                info!(path = %file.display(), "Checking info file");

                match SingleDiskFarmInfo::load_from(directory) {
                    Ok(Some(info)) => info,
                    Ok(None) => {
                        return Err(SingleDiskFarmScrubError::FarmInfoFileDoesNotExist { file });
                    }
                    Err(error) => {
                        return Err(SingleDiskFarmScrubError::FarmInfoCantBeOpened { file, error });
                    }
                }
            };

            // Held for the duration of the scrub so a running farmer can't use the farm
            // concurrently (unless locking is explicitly disabled)
            let _single_disk_farm_info_lock = if disable_farm_locking {
                None
            } else {
                Some(
                    SingleDiskFarmInfo::try_lock(directory)
                        .map_err(SingleDiskFarmScrubError::LikelyAlreadyInUse)?,
                )
            };

            let identity = {
                let file = directory.join(Identity::FILE_NAME);
                info!(path = %file.display(), "Checking identity file");

                match Identity::open(directory) {
                    Ok(Some(identity)) => identity,
                    Ok(None) => {
                        return Err(SingleDiskFarmScrubError::IdentityFileDoesNotExist { file });
                    }
                    Err(error) => {
                        return Err(SingleDiskFarmScrubError::IdentityCantBeOpened { file, error });
                    }
                }
            };

            // Identity on disk must correspond to the public key recorded in farm info
            if PublicKey::from(identity.public.to_bytes()) != *info.public_key() {
                return Err(SingleDiskFarmScrubError::PublicKeyMismatch {
                    identity: PublicKey::from(identity.public.to_bytes()),
                    info: *info.public_key(),
                });
            }

            let sector_metadata_size = SectorMetadataChecksummed::encoded_size();

            let metadata_file_path = directory.join(Self::METADATA_FILE);
            let (metadata_file, mut metadata_header) = {
                info!(path = %metadata_file_path.display(), "Checking metadata file");

                // Opened read-write unless dry run, repairs are written through this handle
                let metadata_file = match OpenOptions::new()
                    .read(true)
                    .write(!dry_run)
                    .open(&metadata_file_path)
                {
                    Ok(metadata_file) => metadata_file,
                    Err(error) => {
                        return Err(if error.kind() == io::ErrorKind::NotFound {
                            SingleDiskFarmScrubError::MetadataFileDoesNotExist {
                                file: metadata_file_path,
                            }
                        } else {
                            SingleDiskFarmScrubError::MetadataCantBeOpened {
                                file: metadata_file_path,
                                error,
                            }
                        });
                    }
                };

                // Best-effort read-ahead hint, failure is irrelevant
                let _ = metadata_file.advise_sequential_access();

                let metadata_size = match metadata_file.size() {
                    Ok(metadata_size) => metadata_size,
                    Err(error) => {
                        return Err(SingleDiskFarmScrubError::FailedToDetermineFileSize {
                            file: metadata_file_path,
                            error,
                        });
                    }
                };

                // File must at least contain the reserved header region
                if metadata_size < RESERVED_PLOT_METADATA {
                    return Err(SingleDiskFarmScrubError::MetadataFileTooSmall {
                        file: metadata_file_path,
                        reserved_size: RESERVED_PLOT_METADATA,
                        size: metadata_size,
                    });
                }

                let mut metadata_header = {
                    let mut reserved_metadata = vec![0; RESERVED_PLOT_METADATA as usize];

                    if let Err(error) = metadata_file.read_exact_at(&mut reserved_metadata, 0) {
                        return Err(SingleDiskFarmScrubError::FailedToReadBytes {
                            file: metadata_file_path,
                            size: RESERVED_PLOT_METADATA,
                            offset: 0,
                            error,
                        });
                    }

                    PlotMetadataHeader::decode(&mut reserved_metadata.as_slice())
                        .map_err(SingleDiskFarmScrubError::FailedToDecodeMetadataHeader)?
                };

                if metadata_header.version != SingleDiskFarm::SUPPORTED_PLOT_VERSION {
                    return Err(SingleDiskFarmScrubError::UnexpectedMetadataVersion(
                        metadata_header.version,
                    ));
                }

                let plotted_sector_count = metadata_header.plotted_sector_count;

                let expected_metadata_size = RESERVED_PLOT_METADATA
                    + sector_metadata_size as u64 * u64::from(plotted_sector_count);

                // If the file is shorter than the header claims, trust the file and shrink the
                // recorded plotted sector count instead of failing
                if metadata_size < expected_metadata_size {
                    warn!(
                        %metadata_size,
                        %expected_metadata_size,
                        "Metadata file size is smaller than expected, shrinking number of plotted \
                        sectors to correct value"
                    );

                    metadata_header.plotted_sector_count =
                        ((metadata_size - RESERVED_PLOT_METADATA) / sector_metadata_size as u64)
                            as SectorIndex;
                    let metadata_header_bytes = metadata_header.encode();

                    if !dry_run
                        && let Err(error) = metadata_file.write_all_at(&metadata_header_bytes, 0)
                    {
                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
                            file: metadata_file_path,
                            size: metadata_header_bytes.len() as u64,
                            offset: 0,
                            error,
                        });
                    }
                }

                (metadata_file, metadata_header)
            };

            let pieces_in_sector = info.pieces_in_sector();
            let sector_size = sector_size(pieces_in_sector) as u64;

            let plot_file_path = directory.join(Self::PLOT_FILE);
            let plot_file = {
                // NOTE(review): shadows the identical `plot_file_path` computed just above;
                // harmless duplication
                let plot_file_path = directory.join(Self::PLOT_FILE);
                info!(path = %plot_file_path.display(), "Checking plot file");

                let plot_file = match OpenOptions::new()
                    .read(true)
                    .write(!dry_run)
                    .open(&plot_file_path)
                {
                    Ok(plot_file) => plot_file,
                    Err(error) => {
                        // NOTE(review): these are the *Metadata* error variants although this is
                        // the plot file (the payload path is correct) — looks like a copy-paste
                        // slip; confirm whether plot-specific variants exist before changing
                        return Err(if error.kind() == io::ErrorKind::NotFound {
                            SingleDiskFarmScrubError::MetadataFileDoesNotExist {
                                file: plot_file_path,
                            }
                        } else {
                            SingleDiskFarmScrubError::MetadataCantBeOpened {
                                file: plot_file_path,
                                error,
                            }
                        });
                    }
                };

                // Best-effort read-ahead hint, failure is irrelevant
                let _ = plot_file.advise_sequential_access();

                let plot_size = match plot_file.size() {
                    Ok(metadata_size) => metadata_size,
                    Err(error) => {
                        return Err(SingleDiskFarmScrubError::FailedToDetermineFileSize {
                            file: plot_file_path,
                            error,
                        });
                    }
                };

                // Plot file must hold at least all sectors the metadata header claims exist;
                // if not, shrink the recorded plotted sector count to what actually fits
                let min_expected_plot_size =
                    u64::from(metadata_header.plotted_sector_count) * sector_size;
                if plot_size < min_expected_plot_size {
                    warn!(
                        %plot_size,
                        %min_expected_plot_size,
                        "Plot file size is smaller than expected, shrinking number of plotted \
                        sectors to correct value"
                    );

                    metadata_header.plotted_sector_count = (plot_size / sector_size) as SectorIndex;
                    let metadata_header_bytes = metadata_header.encode();

                    // NOTE(review): the write goes to `metadata_file`, but the error reports
                    // `plot_file_path` — likely should be the metadata file path; confirm
                    if !dry_run
                        && let Err(error) = metadata_file.write_all_at(&metadata_header_bytes, 0)
                    {
                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
                            file: plot_file_path,
                            size: metadata_header_bytes.len() as u64,
                            offset: 0,
                            error,
                        });
                    }
                }

                plot_file
            };

            // Each plotted sector stores its payload followed by a Blake3 checksum; this range
            // covers the payload bytes only
            let sector_bytes_range = 0..(sector_size as usize - Blake3Hash::SIZE);

            info!("Checking sectors and corresponding metadata");
            // Check all sectors in parallel; each rayon worker gets its own scratch buffer
            (0..metadata_header.plotted_sector_count)
                .into_par_iter()
                .map_init(
                    || vec![0u8; Record::SIZE],
                    |scratch_buffer, sector_index| {
                        let _span_guard = span.enter();

                        // First check this sector's metadata entry: readable, decodable, and
                        // internally consistent; on any failure replace it with dummy expired
                        // metadata (unless dry run) so the sector gets replotted
                        let offset = RESERVED_PLOT_METADATA
                            + u64::from(sector_index) * sector_metadata_size as u64;
                        if let Err(error) = metadata_file
                            .read_exact_at(&mut scratch_buffer[..sector_metadata_size], offset)
                        {
                            warn!(
                                path = %metadata_file_path.display(),
                                %error,
                                %offset,
                                size = %sector_metadata_size,
                                %sector_index,
                                "Failed to read sector metadata, replacing with dummy expired \
                                sector metadata"
                            );

                            if !dry_run {
                                write_dummy_sector_metadata(
                                    &metadata_file,
                                    &metadata_file_path,
                                    sector_index,
                                    pieces_in_sector,
                                )?;
                            }
                            return Ok(());
                        }

                        let sector_metadata = match SectorMetadataChecksummed::decode(
                            &mut &scratch_buffer[..sector_metadata_size],
                        ) {
                            Ok(sector_metadata) => sector_metadata,
                            Err(error) => {
                                warn!(
                                    path = %metadata_file_path.display(),
                                    %error,
                                    %sector_index,
                                    "Failed to decode sector metadata, replacing with dummy \
                                    expired sector metadata"
                                );

                                if !dry_run {
                                    write_dummy_sector_metadata(
                                        &metadata_file,
                                        &metadata_file_path,
                                        sector_index,
                                        pieces_in_sector,
                                    )?;
                                }
                                return Ok(());
                            }
                        };

                        if sector_metadata.sector_index != sector_index {
                            warn!(
                                path = %metadata_file_path.display(),
                                %sector_index,
                                found_sector_index = sector_metadata.sector_index,
                                "Sector index mismatch, replacing with dummy expired sector \
                                metadata"
                            );

                            if !dry_run {
                                write_dummy_sector_metadata(
                                    &metadata_file,
                                    &metadata_file_path,
                                    sector_index,
                                    pieces_in_sector,
                                )?;
                            }
                            return Ok(());
                        }

                        if sector_metadata.pieces_in_sector != pieces_in_sector {
                            warn!(
                                path = %metadata_file_path.display(),
                                %sector_index,
                                %pieces_in_sector,
                                found_pieces_in_sector = sector_metadata.pieces_in_sector,
                                "Pieces in sector mismatch, replacing with dummy expired sector \
                                metadata"
                            );

                            if !dry_run {
                                write_dummy_sector_metadata(
                                    &metadata_file,
                                    &metadata_file_path,
                                    sector_index,
                                    pieces_in_sector,
                                )?;
                            }
                            return Ok(());
                        }

                        if target.plot() {
                            // Hash sector payload in scratch-buffer-sized chunks and compare
                            // against the checksum stored at the end of the sector
                            let mut hasher = blake3::Hasher::new();
                            for offset_in_sector in
                                sector_bytes_range.clone().step_by(scratch_buffer.len())
                            {
                                let offset =
                                    u64::from(sector_index) * sector_size + offset_in_sector as u64;
                                // Last chunk may be shorter than the scratch buffer
                                let bytes_to_read = (offset_in_sector + scratch_buffer.len())
                                    .min(sector_bytes_range.end)
                                    - offset_in_sector;

                                let bytes = &mut scratch_buffer[..bytes_to_read];

                                if let Err(error) = plot_file.read_exact_at(bytes, offset) {
                                    warn!(
                                        path = %plot_file_path.display(),
                                        %error,
                                        %sector_index,
                                        %offset,
                                        size = %bytes.len() as u64,
                                        "Failed to read sector bytes"
                                    );

                                    continue;
                                }

                                hasher.update(bytes);
                            }

                            let actual_checksum = *hasher.finalize().as_bytes();
                            let mut expected_checksum = [0; Blake3Hash::SIZE];
                            {
                                let offset = u64::from(sector_index) * sector_size
                                    + sector_bytes_range.end as u64;
                                if let Err(error) =
                                    plot_file.read_exact_at(&mut expected_checksum, offset)
                                {
                                    warn!(
                                        path = %plot_file_path.display(),
                                        %error,
                                        %sector_index,
                                        %offset,
                                        size = %expected_checksum.len() as u64,
                                        "Failed to read sector checksum bytes"
                                    );
                                }
                            }

                            if actual_checksum != expected_checksum {
                                warn!(
                                    path = %plot_file_path.display(),
                                    %sector_index,
                                    actual_checksum = %hex::encode(actual_checksum),
                                    expected_checksum = %hex::encode(expected_checksum),
                                    "Plotted sector checksum mismatch, replacing with dummy \
                                    expired sector"
                                );

                                if !dry_run {
                                    write_dummy_sector_metadata(
                                        &metadata_file,
                                        &metadata_file_path,
                                        sector_index,
                                        pieces_in_sector,
                                    )?;
                                }

                                // Overwrite the corrupted sector with zeroes and a matching
                                // checksum so it reads back as a valid (dummy) sector
                                scratch_buffer.fill(0);

                                hasher.reset();
                                for offset_in_sector in
                                    sector_bytes_range.clone().step_by(scratch_buffer.len())
                                {
                                    let offset = u64::from(sector_index) * sector_size
                                        + offset_in_sector as u64;
                                    let bytes_to_write = (offset_in_sector + scratch_buffer.len())
                                        .min(sector_bytes_range.end)
                                        - offset_in_sector;
                                    let bytes = &mut scratch_buffer[..bytes_to_write];

                                    if !dry_run
                                        && let Err(error) = plot_file.write_all_at(bytes, offset)
                                    {
                                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
                                            file: plot_file_path.clone(),
                                            size: scratch_buffer.len() as u64,
                                            offset,
                                            error,
                                        });
                                    }

                                    hasher.update(bytes);
                                }
                                {
                                    let checksum = *hasher.finalize().as_bytes();
                                    let offset = u64::from(sector_index) * sector_size
                                        + sector_bytes_range.end as u64;
                                    if !dry_run
                                        && let Err(error) =
                                            plot_file.write_all_at(&checksum, offset)
                                    {
                                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
                                            file: plot_file_path.clone(),
                                            size: checksum.len() as u64,
                                            offset,
                                            error,
                                        });
                                    }
                                }

                                return Ok(());
                            }
                        }

                        trace!(%sector_index, "Sector is in good shape");

                        Ok(())
                    },
                )
                .try_for_each({
                    // Progress reporting: count completed sectors and stop on first error
                    let span = &span;
                    let checked_sectors = AtomicUsize::new(0);

                    move |result| {
                        let _span_guard = span.enter();

                        let checked_sectors = checked_sectors.fetch_add(1, Ordering::Relaxed);
                        if checked_sectors > 1 && checked_sectors % 10 == 0 {
                            info!(
                                "Checked {}/{} sectors",
                                checked_sectors, metadata_header.plotted_sector_count
                            );
                        }

                        result
                    }
                })?;
        }

        if target.cache() {
            Self::scrub_cache(directory, dry_run)?;
        }

        info!("Farm check completed");

        Ok(())
    }
2227
    /// Check the piece cache file for corruption, replacing unreadable or checksum-mismatched
    /// elements with zeroed dummy elements (unless `dry_run`).
    ///
    /// A missing cache file is not an error (expected when cache lives elsewhere, e.g. in a
    /// farming cluster).
    fn scrub_cache(directory: &Path, dry_run: bool) -> Result<(), SingleDiskFarmScrubError> {
        // Captured so rayon worker threads below can enter the same tracing span
        let span = Span::current();

        let file = directory.join(DiskPieceCache::FILE_NAME);
        info!(path = %file.display(), "Checking cache file");

        // Opened read-write unless dry run, repairs are written through this handle
        let cache_file = match OpenOptions::new().read(true).write(!dry_run).open(&file) {
            Ok(plot_file) => plot_file,
            Err(error) => {
                return if error.kind() == io::ErrorKind::NotFound {
                    warn!(
                        file = %file.display(),
                        "Cache file does not exist, this is expected in farming cluster"
                    );
                    Ok(())
                } else {
                    Err(SingleDiskFarmScrubError::CacheCantBeOpened { file, error })
                };
            }
        };

        // Best-effort read-ahead hint, failure is irrelevant
        let _ = cache_file.advise_sequential_access();

        let cache_size = match cache_file.size() {
            Ok(cache_size) => cache_size,
            Err(error) => {
                return Err(SingleDiskFarmScrubError::FailedToDetermineFileSize { file, error });
            }
        };

        // Cache file is an array of fixed-size elements: piece index + piece + Blake3 checksum
        let element_size = DiskPieceCache::element_size();
        let number_of_cached_elements = cache_size / u64::from(element_size);
        // All-zero element used both as the replacement payload and to recognize empty slots
        let dummy_element = vec![0; element_size as usize];
        (0..number_of_cached_elements)
            .into_par_iter()
            .map_with(vec![0; element_size as usize], |element, cache_offset| {
                let _span_guard = span.enter();

                let offset = cache_offset * u64::from(element_size);
                if let Err(error) = cache_file.read_exact_at(element, offset) {
                    warn!(
                        path = %file.display(),
                        %cache_offset,
                        size = %element.len() as u64,
                        %offset,
                        %error,
                        "Failed to read cached piece, replacing with dummy element"
                    );

                    if !dry_run && let Err(error) = cache_file.write_all_at(&dummy_element, offset)
                    {
                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
                            file: file.clone(),
                            size: u64::from(element_size),
                            offset,
                            error,
                        });
                    }

                    return Ok(());
                }

                // Verify stored checksum; all-zero (never written) elements are exempt
                let (index_and_piece_bytes, expected_checksum) =
                    element.split_at(element_size as usize - Blake3Hash::SIZE);
                let actual_checksum = blake3_hash(index_and_piece_bytes);
                if *actual_checksum != *expected_checksum && element != &dummy_element {
                    warn!(
                        %cache_offset,
                        actual_checksum = %hex::encode(actual_checksum),
                        expected_checksum = %hex::encode(expected_checksum),
                        "Cached piece checksum mismatch, replacing with dummy element"
                    );

                    if !dry_run && let Err(error) = cache_file.write_all_at(&dummy_element, offset)
                    {
                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
                            file: file.clone(),
                            size: u64::from(element_size),
                            offset,
                            error,
                        });
                    }

                    return Ok(());
                }

                Ok(())
            })
            .try_for_each({
                // Progress reporting: count completed elements and stop on first error
                let span = &span;
                let checked_elements = AtomicUsize::new(0);

                move |result| {
                    let _span_guard = span.enter();

                    let checked_elements = checked_elements.fetch_add(1, Ordering::Relaxed);
                    if checked_elements > 1 && checked_elements % 1000 == 0 {
                        info!(
                            "Checked {}/{} cache elements",
                            checked_elements, number_of_cached_elements
                        );
                    }

                    result
                }
            })?;

        Ok(())
    }
2338}
2339
2340fn write_dummy_sector_metadata(
2341 metadata_file: &File,
2342 metadata_file_path: &Path,
2343 sector_index: SectorIndex,
2344 pieces_in_sector: u16,
2345) -> Result<(), SingleDiskFarmScrubError> {
2346 let dummy_sector_bytes = SectorMetadataChecksummed::from(SectorMetadata {
2347 sector_index,
2348 pieces_in_sector,
2349 s_bucket_sizes: Box::new([0; Record::NUM_S_BUCKETS]),
2350 history_size: HistorySize::from(SegmentIndex::ZERO),
2351 })
2352 .encode();
2353 let sector_offset = RESERVED_PLOT_METADATA
2354 + u64::from(sector_index) * SectorMetadataChecksummed::encoded_size() as u64;
2355 metadata_file
2356 .write_all_at(&dummy_sector_bytes, sector_offset)
2357 .map_err(|error| SingleDiskFarmScrubError::FailedToWriteBytes {
2358 file: metadata_file_path.to_path_buf(),
2359 size: dummy_sector_bytes.len() as u64,
2360 offset: sector_offset,
2361 error,
2362 })
2363}