1pub mod direct_io_file;
8pub mod farming;
9pub mod identity;
10mod metrics;
11pub mod piece_cache;
12pub mod piece_reader;
13pub mod plot_cache;
14mod plotted_sectors;
15mod plotting;
16mod reward_signing;
17
18use crate::disk_piece_cache::{DiskPieceCache, DiskPieceCacheError};
19use crate::farm::{
20 Farm, FarmId, FarmingError, FarmingNotification, HandlerFn, PieceCacheId, PieceReader,
21 PlottedSectors, SectorUpdate,
22};
23use crate::node_client::NodeClient;
24use crate::plotter::Plotter;
25use crate::single_disk_farm::direct_io_file::{DISK_SECTOR_SIZE, DirectIoFile};
26use crate::single_disk_farm::farming::rayon_files::RayonFiles;
27use crate::single_disk_farm::farming::{
28 FarmingOptions, PlotAudit, farming, slot_notification_forwarder,
29};
30use crate::single_disk_farm::identity::{Identity, IdentityError};
31use crate::single_disk_farm::metrics::SingleDiskFarmMetrics;
32use crate::single_disk_farm::piece_cache::SingleDiskPieceCache;
33use crate::single_disk_farm::piece_reader::DiskPieceReader;
34use crate::single_disk_farm::plot_cache::DiskPlotCache;
35use crate::single_disk_farm::plotted_sectors::SingleDiskPlottedSectors;
36pub use crate::single_disk_farm::plotting::PlottingError;
37use crate::single_disk_farm::plotting::{
38 PlottingOptions, PlottingSchedulerOptions, SectorPlottingOptions, plotting, plotting_scheduler,
39};
40use crate::single_disk_farm::reward_signing::reward_signing;
41use crate::utils::{AsyncJoinOnDrop, tokio_rayon_spawn_handler};
42use crate::{KNOWN_PEERS_CACHE_SIZE, farm};
43use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
44use async_trait::async_trait;
45use event_listener_primitives::{Bag, HandlerId};
46use futures::channel::{mpsc, oneshot};
47use futures::stream::FuturesUnordered;
48use futures::{FutureExt, StreamExt, select};
49use parity_scale_codec::{Decode, Encode};
50use parking_lot::Mutex;
51use prometheus_client::registry::Registry;
52use rayon::prelude::*;
53use rayon::{ThreadPoolBuildError, ThreadPoolBuilder};
54use serde::{Deserialize, Serialize};
55use static_assertions::const_assert;
56use std::collections::HashSet;
57use std::fs::{File, OpenOptions};
58use std::future::Future;
59use std::io::Write;
60use std::num::{NonZeroU32, NonZeroUsize};
61use std::path::{Path, PathBuf};
62use std::pin::Pin;
63use std::str::FromStr;
64use std::sync::Arc;
65use std::sync::atomic::{AtomicUsize, Ordering};
66use std::time::Duration;
67use std::{fmt, fs, io, mem};
68use subspace_core_primitives::PublicKey;
69use subspace_core_primitives::hashes::{Blake3Hash, blake3_hash};
70use subspace_core_primitives::pieces::Record;
71use subspace_core_primitives::sectors::SectorIndex;
72use subspace_core_primitives::segments::{HistorySize, SegmentIndex};
73use subspace_erasure_coding::ErasureCoding;
74use subspace_farmer_components::FarmerProtocolInfo;
75use subspace_farmer_components::file_ext::FileExt;
76use subspace_farmer_components::reading::ReadSectorRecordChunksMode;
77use subspace_farmer_components::sector::{SectorMetadata, SectorMetadataChecksummed, sector_size};
78use subspace_kzg::Kzg;
79use subspace_networking::KnownPeersManager;
80use subspace_proof_of_space::Table;
81use subspace_rpc_primitives::{FarmerAppInfo, SolutionResponse};
82use thiserror::Error;
83use tokio::runtime::Handle;
84use tokio::sync::broadcast;
85use tokio::task;
86use tracing::{Instrument, Span, error, info, trace, warn};
87
// Refuse to compile on targets where `usize` cannot hold a `u64`: file sizes/offsets are
// converted between the two in this module.
const_assert!(mem::size_of::<usize>() >= mem::size_of::<u64>());

/// Space reserved at the start of the metadata file for the plot metadata header (1 MiB).
const RESERVED_PLOT_METADATA: u64 = 1024 * 1024;
/// Space reserved for the farm info and related bookkeeping files (1 MiB).
const RESERVED_FARM_INFO: u64 = 1024 * 1024;
/// Delay before a newly archived segment is processed (passed to the plotting scheduler as
/// `new_segment_processing_delay`).
const NEW_SEGMENT_PROCESSING_DELAY: Duration = Duration::from_mins(10);
97
/// Exclusive lock on the farm's info file.
///
/// The lock is held through the open file handle (see `fs4` usage in
/// `SingleDiskFarmInfo::store_to`/`try_lock`) and is released when this value is dropped.
#[derive(Debug)]
#[must_use = "Lock file must be kept around or as long as farm is used"]
pub struct SingleDiskFarmInfoLock {
    // Keeping the handle alive keeps the exclusive lock; dropping it releases the lock.
    _file: File,
}
104
/// Important on-disk information about the farm, persisted as JSON in the farm directory.
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum SingleDiskFarmInfo {
    /// Initial version of the farm info format
    #[serde(rename_all = "camelCase")]
    V0 {
        /// Unique identifier of this farm
        id: FarmId,
        /// Genesis hash of the chain this farm was created on (hex-encoded in JSON)
        #[serde(with = "hex")]
        genesis_hash: [u8; 32],
        /// Public key of the identity associated with this farm
        public_key: PublicKey,
        /// Number of pieces stored in each sector
        pieces_in_sector: u16,
        /// Total space allocated to the farm, in bytes
        allocated_space: u64,
    },
}
125
impl SingleDiskFarmInfo {
    // File name under which the farm info is stored inside the farm directory
    const FILE_NAME: &'static str = "single_disk_farm.json";

    /// Creates a new `V0` farm info from its parts.
    pub fn new(
        id: FarmId,
        genesis_hash: [u8; 32],
        public_key: PublicKey,
        pieces_in_sector: u16,
        allocated_space: u64,
    ) -> Self {
        Self::V0 {
            id,
            genesis_hash,
            public_key,
            pieces_in_sector,
            allocated_space,
        }
    }

    /// Loads farm info from `directory`.
    ///
    /// Returns `Ok(None)` if the info file does not exist; any other I/O error is
    /// propagated, and a JSON parse failure is reported as `InvalidData`.
    pub fn load_from(directory: &Path) -> io::Result<Option<Self>> {
        let bytes = match fs::read(directory.join(Self::FILE_NAME)) {
            Ok(bytes) => bytes,
            Err(error) => {
                // A missing file simply means the farm was never initialized here
                return if error.kind() == io::ErrorKind::NotFound {
                    Ok(None)
                } else {
                    Err(error)
                };
            }
        };

        serde_json::from_slice(&bytes)
            .map(Some)
            .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))
    }

    /// Stores farm info into `directory`, optionally acquiring an exclusive lock on the
    /// file first.
    ///
    /// Returns the lock when `lock` is `true`; it must be kept alive for as long as the
    /// farm is in use.
    pub fn store_to(
        &self,
        directory: &Path,
        lock: bool,
    ) -> io::Result<Option<SingleDiskFarmInfoLock>> {
        // Open without truncating so the exclusive lock can be attempted before any
        // existing contents are destroyed
        let mut file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(false)
            .open(directory.join(Self::FILE_NAME))?;
        if lock {
            fs4::fs_std::FileExt::try_lock_exclusive(&file)?;
        }
        // Truncate only after the lock attempt succeeded
        file.set_len(0)?;
        file.write_all(&serde_json::to_vec(self).expect("Info serialization never fails; qed"))?;

        Ok(lock.then_some(SingleDiskFarmInfoLock { _file: file }))
    }

    /// Tries to acquire an exclusive lock on the existing farm info file without rewriting
    /// it; fails if the file is missing or already locked elsewhere.
    pub fn try_lock(directory: &Path) -> io::Result<SingleDiskFarmInfoLock> {
        let file = File::open(directory.join(Self::FILE_NAME))?;
        fs4::fs_std::FileExt::try_lock_exclusive(&file)?;

        Ok(SingleDiskFarmInfoLock { _file: file })
    }

    /// Unique farm identifier.
    pub fn id(&self) -> &FarmId {
        let Self::V0 { id, .. } = self;
        id
    }

    /// Genesis hash of the chain this farm was created on.
    pub fn genesis_hash(&self) -> &[u8; 32] {
        let Self::V0 { genesis_hash, .. } = self;
        genesis_hash
    }

    /// Public key of the identity associated with this farm.
    pub fn public_key(&self) -> &PublicKey {
        let Self::V0 { public_key, .. } = self;
        public_key
    }

    /// Number of pieces stored in each sector.
    pub fn pieces_in_sector(&self) -> u16 {
        match self {
            SingleDiskFarmInfo::V0 {
                pieces_in_sector, ..
            } => *pieces_in_sector,
        }
    }

    /// Total space allocated to the farm, in bytes.
    pub fn allocated_space(&self) -> u64 {
        match self {
            SingleDiskFarmInfo::V0 {
                allocated_space, ..
            } => *allocated_space,
        }
    }
}
232
/// Summary of a farm directory, as produced when scanning for farms.
#[derive(Debug)]
pub enum SingleDiskFarmSummary {
    /// Farm info was found and loaded successfully
    Found {
        /// Loaded farm info
        info: SingleDiskFarmInfo,
        /// Directory the farm resides in
        directory: PathBuf,
    },
    /// No farm info was found in the directory
    NotFound {
        /// Directory that was checked
        directory: PathBuf,
    },
    /// Farm info failed to load
    Error {
        /// Directory in which the error occurred
        directory: PathBuf,
        /// Underlying I/O error
        error: io::Error,
    },
}
256
/// SCALE-encoded header stored at the beginning of the metadata file.
#[derive(Debug, Encode, Decode)]
struct PlotMetadataHeader {
    // Metadata format version
    version: u8,
    // Number of sectors plotted so far
    plotted_sector_count: SectorIndex,
}
262
263impl PlotMetadataHeader {
264 #[inline]
265 fn encoded_size() -> usize {
266 let default = PlotMetadataHeader {
267 version: 0,
268 plotted_sector_count: 0,
269 };
270
271 default.encoded_size()
272 }
273}
274
/// Options used to open/create a single disk farm.
#[derive(Debug)]
pub struct SingleDiskFarmOptions<'a, NC>
where
    NC: Clone,
{
    /// Directory where the farm is stored
    pub directory: PathBuf,
    /// Information about the farmer application
    pub farmer_app_info: FarmerAppInfo,
    /// Space allocated to this farm, in bytes
    pub allocated_space: u64,
    /// Maximum number of pieces in a single sector
    pub max_pieces_in_sector: u16,
    /// Client used for communication with the node
    pub node_client: NC,
    /// Address to which farming rewards are attributed
    pub reward_address: PublicKey,
    /// Plotter implementation used to plot sectors
    pub plotter: Arc<dyn Plotter + Send + Sync>,
    /// KZG instance
    pub kzg: Kzg,
    /// Erasure coding instance
    pub erasure_coding: ErasureCoding,
    /// Percentage of allocated space dedicated to the piece cache
    pub cache_percentage: u8,
    /// Number of threads in the farming thread pool
    pub farming_thread_pool_size: usize,
    /// Optional receiver that delays the start of plotting until it resolves
    pub plotting_delay: Option<oneshot::Receiver<()>>,
    /// Global mutex shared across farms (passed through to plotting, farming and the piece
    /// reader; presumably serializes heavy operations — confirm with callers)
    pub global_mutex: Arc<AsyncMutex<()>>,
    /// Limit on sectors being plotted concurrently for this farm
    pub max_plotting_sectors_per_farm: NonZeroUsize,
    /// Disable exclusive locking of the farm info file
    pub disable_farm_locking: bool,
    /// Mode to use when reading sector record chunks
    pub read_sector_record_chunks_mode: ReadSectorRecordChunksMode,
    /// Optional Prometheus metrics registry
    pub registry: Option<&'a Mutex<&'a mut Registry>>,
    /// Whether to create the farm if it does not exist yet
    pub create: bool,
}
322
323#[derive(Debug, Error)]
325pub enum SingleDiskFarmError {
326 #[error("Failed to open or create identity: {0}")]
328 FailedToOpenIdentity(#[from] IdentityError),
329 #[error("Farm is likely already in use, make sure no other farmer is using it: {0}")]
331 LikelyAlreadyInUse(io::Error),
332 #[error("Single disk farm I/O error: {0}")]
334 Io(#[from] io::Error),
335 #[error("Failed to spawn task for blocking thread: {0}")]
337 TokioJoinError(#[from] task::JoinError),
338 #[error("Piece cache error: {0}")]
340 PieceCacheError(#[from] DiskPieceCacheError),
341 #[error("Can't preallocate metadata file, probably not enough space on disk: {0}")]
343 CantPreallocateMetadataFile(io::Error),
344 #[error("Can't preallocate plot file, probably not enough space on disk: {0}")]
346 CantPreallocatePlotFile(io::Error),
347 #[error(
349 "Genesis hash of farm {id} {wrong_chain} is different from {correct_chain} when farm was \
350 created, it is not possible to use farm on a different chain"
351 )]
352 WrongChain {
353 id: FarmId,
355 correct_chain: String,
358 wrong_chain: String,
360 },
361 #[error(
363 "Public key of farm {id} {wrong_public_key} is different from {correct_public_key} when \
364 farm was created, something went wrong, likely due to manual edits"
365 )]
366 IdentityMismatch {
367 id: FarmId,
369 correct_public_key: PublicKey,
371 wrong_public_key: PublicKey,
373 },
374 #[error(
376 "Invalid number pieces in sector: max supported {max_supported}, farm initialized with \
377 {initialized_with}"
378 )]
379 InvalidPiecesInSector {
380 id: FarmId,
382 max_supported: u16,
384 initialized_with: u16,
386 },
387 #[error("Failed to decode metadata header: {0}")]
389 FailedToDecodeMetadataHeader(parity_scale_codec::Error),
390 #[error("Unexpected metadata version {0}")]
392 UnexpectedMetadataVersion(u8),
393 #[error(
395 "Allocated space is not enough for one sector. \
396 The lowest acceptable value for allocated space is {min_space} bytes, \
397 provided {allocated_space} bytes."
398 )]
399 InsufficientAllocatedSpace {
400 min_space: u64,
402 allocated_space: u64,
404 },
405 #[error(
407 "Farm is too large: allocated {allocated_sectors} sectors ({allocated_space} bytes), max \
408 supported is {max_sectors} ({max_space} bytes). Consider creating multiple smaller farms \
409 instead."
410 )]
411 FarmTooLarge {
412 allocated_space: u64,
414 allocated_sectors: u64,
416 max_space: u64,
418 max_sectors: u16,
420 },
421 #[error("Failed to create thread pool: {0}")]
423 FailedToCreateThreadPool(ThreadPoolBuildError),
424}
425
426#[derive(Debug, Error)]
428pub enum SingleDiskFarmScrubError {
429 #[error("Farm is likely already in use, make sure no other farmer is using it: {0}")]
431 LikelyAlreadyInUse(io::Error),
432 #[error("Failed to file size of {file}: {error}")]
434 FailedToDetermineFileSize {
435 file: PathBuf,
437 error: io::Error,
439 },
440 #[error("Failed to read {size} bytes from {file} at offset {offset}: {error}")]
442 FailedToReadBytes {
443 file: PathBuf,
445 size: u64,
447 offset: u64,
449 error: io::Error,
451 },
452 #[error("Failed to write {size} bytes from {file} at offset {offset}: {error}")]
454 FailedToWriteBytes {
455 file: PathBuf,
457 size: u64,
459 offset: u64,
461 error: io::Error,
463 },
464 #[error("Farm info file does not exist at {file}")]
466 FarmInfoFileDoesNotExist {
467 file: PathBuf,
469 },
470 #[error("Farm info at {file} can't be opened: {error}")]
472 FarmInfoCantBeOpened {
473 file: PathBuf,
475 error: io::Error,
477 },
478 #[error("Identity file does not exist at {file}")]
480 IdentityFileDoesNotExist {
481 file: PathBuf,
483 },
484 #[error("Identity at {file} can't be opened: {error}")]
486 IdentityCantBeOpened {
487 file: PathBuf,
489 error: IdentityError,
491 },
492 #[error("Identity public key {identity} doesn't match public key in the disk farm info {info}")]
494 PublicKeyMismatch {
495 identity: PublicKey,
497 info: PublicKey,
499 },
500 #[error("Metadata file does not exist at {file}")]
502 MetadataFileDoesNotExist {
503 file: PathBuf,
505 },
506 #[error("Metadata at {file} can't be opened: {error}")]
508 MetadataCantBeOpened {
509 file: PathBuf,
511 error: io::Error,
513 },
514 #[error(
516 "Metadata file at {file} is too small: reserved size is {reserved_size} bytes, file size \
517 is {size}"
518 )]
519 MetadataFileTooSmall {
520 file: PathBuf,
522 reserved_size: u64,
524 size: u64,
526 },
527 #[error("Failed to decode metadata header: {0}")]
529 FailedToDecodeMetadataHeader(parity_scale_codec::Error),
530 #[error("Unexpected metadata version {0}")]
532 UnexpectedMetadataVersion(u8),
533 #[error("Cache at {file} can't be opened: {error}")]
535 CacheCantBeOpened {
536 file: PathBuf,
538 error: io::Error,
540 },
541}
542
/// Errors that terminate the farm's background processing.
#[derive(Debug, Error)]
pub enum BackgroundTaskError {
    /// Plotting failed
    #[error(transparent)]
    Plotting(#[from] PlottingError),
    /// Farming failed
    #[error(transparent)]
    Farming(#[from] FarmingError),
    /// Reward signing failed
    #[error(transparent)]
    RewardSigning(#[from] anyhow::Error),
    /// A background task panicked
    #[error("Background task {task} panicked")]
    BackgroundTaskPanicked {
        /// Human-readable task name (includes the farm index)
        task: String,
    },
}
562
/// Boxed future of a background task; the first `Err` terminates farm operation.
type BackgroundTask = Pin<Box<dyn Future<Output = Result<(), BackgroundTaskError>> + Send>>;
564
/// Selects which farm components a scrub operation checks.
#[derive(Debug, Copy, Clone)]
pub enum ScrubTarget {
    /// Scrub everything: metadata, plot and piece cache
    All,
    /// Scrub the metadata file only
    Metadata,
    /// Scrub the plot file (metadata is checked as well, see `ScrubTarget::metadata`)
    Plot,
    /// Scrub the piece cache only
    Cache,
}
577
578impl fmt::Display for ScrubTarget {
579 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
580 match self {
581 Self::All => f.write_str("all"),
582 Self::Metadata => f.write_str("metadata"),
583 Self::Plot => f.write_str("plot"),
584 Self::Cache => f.write_str("cache"),
585 }
586 }
587}
588
589impl FromStr for ScrubTarget {
590 type Err = String;
591
592 fn from_str(s: &str) -> Result<Self, Self::Err> {
593 match s {
594 "all" => Ok(Self::All),
595 "metadata" => Ok(Self::Metadata),
596 "plot" => Ok(Self::Plot),
597 "cache" => Ok(Self::Cache),
598 s => Err(format!("Can't parse {s} as `ScrubTarget`")),
599 }
600 }
601}
602
603impl ScrubTarget {
604 fn metadata(&self) -> bool {
605 match self {
606 Self::All | Self::Metadata | Self::Plot => true,
607 Self::Cache => false,
608 }
609 }
610
611 fn plot(&self) -> bool {
612 match self {
613 Self::All | Self::Plot => true,
614 Self::Metadata | Self::Cache => false,
615 }
616 }
617
618 fn cache(&self) -> bool {
619 match self {
620 Self::All | Self::Cache => true,
621 Self::Metadata | Self::Plot => false,
622 }
623 }
624}
625
/// How the farm's allocated space is split between the plot, metadata and piece cache
/// files.
struct AllocatedSpaceDistribution {
    // Size of the piece cache file, in bytes
    piece_cache_file_size: u64,
    // Piece cache capacity, in elements
    piece_cache_capacity: u32,
    // Size of the plot file, in bytes (rounded up to a multiple of `DISK_SECTOR_SIZE`)
    plot_file_size: u64,
    // Number of sectors the plot is sized for
    target_sector_count: u16,
    // Size of the metadata file: reserved header area plus per-sector metadata
    metadata_file_size: u64,
}
633
634impl AllocatedSpaceDistribution {
635 fn new(
636 allocated_space: u64,
637 sector_size: u64,
638 cache_percentage: u8,
639 sector_metadata_size: u64,
640 ) -> Result<Self, SingleDiskFarmError> {
641 let single_sector_overhead = sector_size + sector_metadata_size;
642 let fixed_space_usage = RESERVED_PLOT_METADATA
644 + RESERVED_FARM_INFO
645 + Identity::file_size() as u64
646 + KnownPeersManager::file_size(KNOWN_PEERS_CACHE_SIZE) as u64;
647 let target_sector_count = {
649 let potentially_plottable_space = allocated_space.saturating_sub(fixed_space_usage)
650 / 100
651 * (100 - u64::from(cache_percentage));
652 (potentially_plottable_space - DISK_SECTOR_SIZE as u64) / single_sector_overhead
655 };
656
657 if target_sector_count == 0 {
658 let mut single_plot_with_cache_space =
659 single_sector_overhead.div_ceil(100 - u64::from(cache_percentage)) * 100;
660 if single_plot_with_cache_space - single_sector_overhead
663 < DiskPieceCache::element_size() as u64
664 {
665 single_plot_with_cache_space =
666 single_sector_overhead + DiskPieceCache::element_size() as u64;
667 }
668
669 return Err(SingleDiskFarmError::InsufficientAllocatedSpace {
670 min_space: fixed_space_usage + single_plot_with_cache_space,
671 allocated_space,
672 });
673 }
674 let plot_file_size = target_sector_count * sector_size;
675 let plot_file_size =
677 plot_file_size.div_ceil(DISK_SECTOR_SIZE as u64) * DISK_SECTOR_SIZE as u64;
678
679 let piece_cache_capacity = if cache_percentage > 0 {
681 let cache_space = allocated_space
682 - fixed_space_usage
683 - plot_file_size
684 - (sector_metadata_size * target_sector_count);
685 (cache_space / u64::from(DiskPieceCache::element_size())) as u32
686 } else {
687 0
688 };
689 let target_sector_count = match SectorIndex::try_from(target_sector_count) {
690 Ok(target_sector_count) if target_sector_count < SectorIndex::MAX => {
691 target_sector_count
692 }
693 _ => {
694 let max_sectors = SectorIndex::MAX - 1;
697 return Err(SingleDiskFarmError::FarmTooLarge {
698 allocated_space: target_sector_count * sector_size,
699 allocated_sectors: target_sector_count,
700 max_space: max_sectors as u64 * sector_size,
701 max_sectors,
702 });
703 }
704 };
705
706 Ok(Self {
707 piece_cache_file_size: u64::from(piece_cache_capacity)
708 * u64::from(DiskPieceCache::element_size()),
709 piece_cache_capacity,
710 plot_file_size,
711 target_sector_count,
712 metadata_file_size: RESERVED_PLOT_METADATA
713 + sector_metadata_size * u64::from(target_sector_count),
714 })
715 }
716}
717
/// Bag of event handlers receiving notifications of type `A`.
type Handler<A> = Bag<HandlerFn<A>, A>;
719
/// Event handlers used to notify subscribers about farm events.
#[derive(Default, Debug)]
struct Handlers {
    // Fired on sector state changes
    sector_update: Handler<(SectorIndex, SectorUpdate)>,
    // Fired on farming notifications
    farming_notification: Handler<FarmingNotification>,
    // Fired when a solution response is produced
    solution: Handler<SolutionResponse>,
}
726
/// Result of the blocking initialization step (`SingleDiskFarm::init`).
struct SingleDiskFarmInit {
    // Farm identity (keypair) opened or created in the farm directory
    identity: Identity,
    // Farm info loaded from or stored to disk
    single_disk_farm_info: SingleDiskFarmInfo,
    // Exclusive lock on the farm info file, `None` when locking is disabled
    single_disk_farm_info_lock: Option<SingleDiskFarmInfoLock>,
    // Opened plot file (direct I/O)
    plot_file: Arc<DirectIoFile>,
    // Opened metadata file (direct I/O)
    metadata_file: DirectIoFile,
    // Header of the metadata file
    metadata_header: PlotMetadataHeader,
    // Total number of sectors the farm is sized for
    target_sector_count: u16,
    // Metadata of sectors plotted so far
    sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    // Piece cache capacity in elements, `0` when the cache is disabled
    piece_cache_capacity: u32,
    // Plot cache of this farm
    plot_cache: DiskPlotCache,
}
739
/// Single disk farm abstraction: plotting, farming, piece cache and piece reading for one
/// farm directory. Background tasks only make progress once `run()` is driven.
#[derive(Debug)]
#[must_use = "Plot does not function properly unless run() method is called"]
pub struct SingleDiskFarm {
    // Protocol parameters captured from `FarmerAppInfo` at creation time
    farmer_protocol_info: FarmerProtocolInfo,
    // Persisted farm info (ID, public key, allocated space, etc.)
    single_disk_farm_info: SingleDiskFarmInfo,
    // Metadata of sectors plotted so far
    sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    // Number of pieces in each sector
    pieces_in_sector: u16,
    // Total number of sectors the farm is sized for
    total_sectors_count: SectorIndex,
    // Tracing span background work is instrumented with
    span: Span,
    // Background tasks polled by `run()`
    tasks: FuturesUnordered<BackgroundTask>,
    // Event handlers for sector updates, farming notifications and solutions
    handlers: Arc<Handlers>,
    // Dedicated piece cache of this farm
    piece_cache: SingleDiskPieceCache,
    // Plot cache of this farm
    plot_cache: DiskPlotCache,
    // Reader for plotted pieces
    piece_reader: DiskPieceReader,
    // Start signal for plotting/farming tasks; taken (closed) on drop
    start_sender: Option<broadcast::Sender<()>>,
    // Stop signal for background tasks; taken (closed) on drop
    stop_sender: Option<broadcast::Sender<()>>,
    // Keeps the exclusive farm lock alive for the lifetime of the farm
    _single_disk_farm_info_lock: Option<SingleDiskFarmInfoLock>,
}
765
impl Drop for SingleDiskFarm {
    #[inline]
    fn drop(&mut self) {
        // Shut down all piece readers first
        self.piece_reader.close_all_readers();
        // Dropping the broadcast senders closes the channels; background tasks observe the
        // resulting `recv()` error on their subscribed receivers and exit
        self.start_sender.take();
        self.stop_sender.take();
    }
}
776
#[async_trait(?Send)]
impl Farm for SingleDiskFarm {
    // All trait methods delegate to the inherent implementations, boxing values so callers
    // can work with the object-safe `Farm` trait.
    fn id(&self) -> &FarmId {
        self.id()
    }

    fn total_sectors_count(&self) -> SectorIndex {
        self.total_sectors_count
    }

    fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static> {
        Arc::new(self.plotted_sectors())
    }

    fn piece_reader(&self) -> Arc<dyn PieceReader + 'static> {
        Arc::new(self.piece_reader())
    }

    fn on_sector_update(
        &self,
        callback: HandlerFn<(SectorIndex, SectorUpdate)>,
    ) -> Box<dyn farm::HandlerId> {
        Box::new(self.on_sector_update(callback))
    }

    fn on_farming_notification(
        &self,
        callback: HandlerFn<FarmingNotification>,
    ) -> Box<dyn farm::HandlerId> {
        Box::new(self.on_farming_notification(callback))
    }

    fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> Box<dyn farm::HandlerId> {
        Box::new(self.on_solution(callback))
    }

    fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
        Box::pin((*self).run())
    }
}
817
818impl SingleDiskFarm {
    /// Name of the plot file inside the farm directory.
    pub const PLOT_FILE: &'static str = "plot.bin";
    /// Name of the metadata file inside the farm directory.
    pub const METADATA_FILE: &'static str = "metadata.bin";
    // The only metadata format version this implementation understands
    const SUPPORTED_PLOT_VERSION: u8 = 0;
824
825 pub async fn new<NC, PosTable>(
827 options: SingleDiskFarmOptions<'_, NC>,
828 farm_index: usize,
829 ) -> Result<Self, SingleDiskFarmError>
830 where
831 NC: NodeClient + Clone,
832 PosTable: Table,
833 {
834 let span = Span::current();
835
836 let SingleDiskFarmOptions {
837 directory,
838 farmer_app_info,
839 allocated_space,
840 max_pieces_in_sector,
841 node_client,
842 reward_address,
843 plotter,
844 kzg,
845 erasure_coding,
846 cache_percentage,
847 farming_thread_pool_size,
848 plotting_delay,
849 global_mutex,
850 max_plotting_sectors_per_farm,
851 disable_farm_locking,
852 read_sector_record_chunks_mode,
853 registry,
854 create,
855 } = options;
856
857 let single_disk_farm_init_fut = task::spawn_blocking({
858 let directory = directory.clone();
859 let farmer_app_info = farmer_app_info.clone();
860 let span = span.clone();
861
862 move || {
863 let _span_guard = span.enter();
864 Self::init(
865 &directory,
866 &farmer_app_info,
867 allocated_space,
868 max_pieces_in_sector,
869 cache_percentage,
870 disable_farm_locking,
871 create,
872 )
873 }
874 });
875
876 let single_disk_farm_init =
877 AsyncJoinOnDrop::new(single_disk_farm_init_fut, false).await??;
878
879 let SingleDiskFarmInit {
880 identity,
881 single_disk_farm_info,
882 single_disk_farm_info_lock,
883 plot_file,
884 metadata_file,
885 metadata_header,
886 target_sector_count,
887 sectors_metadata,
888 piece_cache_capacity,
889 plot_cache,
890 } = single_disk_farm_init;
891
892 let piece_cache = {
893 let FarmId::Ulid(id) = *single_disk_farm_info.id();
895 let id = PieceCacheId::Ulid(id);
896
897 SingleDiskPieceCache::new(
898 id,
899 if let Some(piece_cache_capacity) = NonZeroU32::new(piece_cache_capacity) {
900 Some(task::block_in_place(|| {
901 if let Some(registry) = registry {
902 DiskPieceCache::open(
903 &directory,
904 piece_cache_capacity,
905 Some(id),
906 Some(*registry.lock()),
907 )
908 } else {
909 DiskPieceCache::open(&directory, piece_cache_capacity, Some(id), None)
910 }
911 })?)
912 } else {
913 None
914 },
915 )
916 };
917
918 let public_key = *single_disk_farm_info.public_key();
919 let pieces_in_sector = single_disk_farm_info.pieces_in_sector();
920 let sector_size = sector_size(pieces_in_sector);
921
922 let metrics = registry.map(|registry| {
923 Arc::new(SingleDiskFarmMetrics::new(
924 *registry.lock(),
925 single_disk_farm_info.id(),
926 target_sector_count,
927 sectors_metadata.read_blocking().len() as SectorIndex,
928 ))
929 });
930
931 let (error_sender, error_receiver) = oneshot::channel();
932 let error_sender = Arc::new(Mutex::new(Some(error_sender)));
933
934 let tasks = FuturesUnordered::<BackgroundTask>::new();
935
936 tasks.push(Box::pin(async move {
937 if let Ok(error) = error_receiver.await {
938 return Err(error);
939 }
940
941 Ok(())
942 }));
943
944 let handlers = Arc::<Handlers>::default();
945 let (start_sender, mut start_receiver) = broadcast::channel::<()>(1);
946 let (stop_sender, mut stop_receiver) = broadcast::channel::<()>(1);
947 let sectors_being_modified = Arc::<AsyncRwLock<HashSet<SectorIndex>>>::default();
948 let (sectors_to_plot_sender, sectors_to_plot_receiver) = mpsc::channel(1);
949 let sectors_indices_left_to_plot =
951 metadata_header.plotted_sector_count..target_sector_count;
952
953 let farming_thread_pool = ThreadPoolBuilder::new()
954 .thread_name(move |thread_index| format!("farming-{farm_index}.{thread_index}"))
955 .num_threads(farming_thread_pool_size)
956 .spawn_handler(tokio_rayon_spawn_handler())
957 .build()
958 .map_err(SingleDiskFarmError::FailedToCreateThreadPool)?;
959 let farming_plot_fut = task::spawn_blocking(|| {
960 farming_thread_pool
961 .install(move || {
962 RayonFiles::open_with(directory.join(Self::PLOT_FILE), |path| {
963 DirectIoFile::open(path)
964 })
965 })
966 .map(|farming_plot| (farming_plot, farming_thread_pool))
967 });
968
969 let (farming_plot, farming_thread_pool) =
970 AsyncJoinOnDrop::new(farming_plot_fut, false).await??;
971
972 let plotting_join_handle = task::spawn_blocking({
973 let sectors_metadata = Arc::clone(§ors_metadata);
974 let handlers = Arc::clone(&handlers);
975 let sectors_being_modified = Arc::clone(§ors_being_modified);
976 let node_client = node_client.clone();
977 let plot_file = Arc::clone(&plot_file);
978 let error_sender = Arc::clone(&error_sender);
979 let span = span.clone();
980 let global_mutex = Arc::clone(&global_mutex);
981 let metrics = metrics.clone();
982
983 move || {
984 let _span_guard = span.enter();
985
986 let plotting_options = PlottingOptions {
987 metadata_header,
988 sectors_metadata: §ors_metadata,
989 sectors_being_modified: §ors_being_modified,
990 sectors_to_plot_receiver,
991 sector_plotting_options: SectorPlottingOptions {
992 public_key,
993 node_client: &node_client,
994 pieces_in_sector,
995 sector_size,
996 plot_file,
997 metadata_file: Arc::new(metadata_file),
998 handlers: &handlers,
999 global_mutex: &global_mutex,
1000 plotter,
1001 metrics,
1002 },
1003 max_plotting_sectors_per_farm,
1004 };
1005
1006 let plotting_fut = async {
1007 if start_receiver.recv().await.is_err() {
1008 return Ok(());
1010 }
1011
1012 if let Some(plotting_delay) = plotting_delay
1013 && plotting_delay.await.is_err()
1014 {
1015 return Ok(());
1017 }
1018
1019 plotting(plotting_options).await
1020 };
1021
1022 Handle::current().block_on(async {
1023 select! {
1024 plotting_result = plotting_fut.fuse() => {
1025 if let Err(error) = plotting_result
1026 && let Some(error_sender) = error_sender.lock().take()
1027 && let Err(error) = error_sender.send(error.into())
1028 {
1029 error!(
1030 %error,
1031 "Plotting failed to send error to background task"
1032 );
1033 }
1034 }
1035 _ = stop_receiver.recv().fuse() => {
1036 }
1038 }
1039 });
1040 }
1041 });
1042 let plotting_join_handle = AsyncJoinOnDrop::new(plotting_join_handle, false);
1043
1044 tasks.push(Box::pin(async move {
1045 plotting_join_handle.await.map_err(|_error| {
1047 BackgroundTaskError::BackgroundTaskPanicked {
1048 task: format!("plotting-{farm_index}"),
1049 }
1050 })
1051 }));
1052
1053 let plotting_scheduler_options = PlottingSchedulerOptions {
1054 public_key_hash: public_key.hash(),
1055 sectors_indices_left_to_plot,
1056 target_sector_count,
1057 last_archived_segment_index: farmer_app_info.protocol_info.history_size.segment_index(),
1058 min_sector_lifetime: farmer_app_info.protocol_info.min_sector_lifetime,
1059 node_client: node_client.clone(),
1060 handlers: Arc::clone(&handlers),
1061 sectors_metadata: Arc::clone(§ors_metadata),
1062 sectors_to_plot_sender,
1063 new_segment_processing_delay: NEW_SEGMENT_PROCESSING_DELAY,
1064 metrics: metrics.clone(),
1065 };
1066 tasks.push(Box::pin(plotting_scheduler(plotting_scheduler_options)));
1067
1068 let (slot_info_forwarder_sender, slot_info_forwarder_receiver) = mpsc::channel(0);
1069
1070 tasks.push(Box::pin({
1071 let node_client = node_client.clone();
1072 let metrics = metrics.clone();
1073
1074 async move {
1075 slot_notification_forwarder(&node_client, slot_info_forwarder_sender, metrics)
1076 .await
1077 .map_err(BackgroundTaskError::Farming)
1078 }
1079 }));
1080
1081 let farming_join_handle = task::spawn_blocking({
1082 let erasure_coding = erasure_coding.clone();
1083 let handlers = Arc::clone(&handlers);
1084 let sectors_being_modified = Arc::clone(§ors_being_modified);
1085 let sectors_metadata = Arc::clone(§ors_metadata);
1086 let mut start_receiver = start_sender.subscribe();
1087 let mut stop_receiver = stop_sender.subscribe();
1088 let node_client = node_client.clone();
1089 let span = span.clone();
1090 let global_mutex = Arc::clone(&global_mutex);
1091
1092 move || {
1093 let _span_guard = span.enter();
1094
1095 let farming_fut = async move {
1096 if start_receiver.recv().await.is_err() {
1097 return Ok(());
1099 }
1100
1101 let plot_audit = PlotAudit::new(&farming_plot);
1102
1103 let farming_options = FarmingOptions {
1104 public_key,
1105 reward_address,
1106 node_client,
1107 plot_audit,
1108 sectors_metadata,
1109 kzg,
1110 erasure_coding,
1111 handlers,
1112 sectors_being_modified,
1113 slot_info_notifications: slot_info_forwarder_receiver,
1114 thread_pool: farming_thread_pool,
1115 read_sector_record_chunks_mode,
1116 global_mutex,
1117 metrics,
1118 };
1119 farming::<PosTable, _, _>(farming_options).await
1120 };
1121
1122 Handle::current().block_on(async {
1123 select! {
1124 farming_result = farming_fut.fuse() => {
1125 if let Err(error) = farming_result
1126 && let Some(error_sender) = error_sender.lock().take()
1127 && let Err(error) = error_sender.send(error.into())
1128 {
1129 error!(
1130 %error,
1131 "Farming failed to send error to background task",
1132 );
1133 }
1134 }
1135 _ = stop_receiver.recv().fuse() => {
1136 }
1138 }
1139 });
1140 }
1141 });
1142 let farming_join_handle = AsyncJoinOnDrop::new(farming_join_handle, false);
1143
1144 tasks.push(Box::pin(async move {
1145 farming_join_handle.await.map_err(|_error| {
1147 BackgroundTaskError::BackgroundTaskPanicked {
1148 task: format!("farming-{farm_index}"),
1149 }
1150 })
1151 }));
1152
1153 let (piece_reader, reading_fut) = DiskPieceReader::new::<PosTable>(
1154 public_key,
1155 pieces_in_sector,
1156 plot_file,
1157 Arc::clone(§ors_metadata),
1158 erasure_coding,
1159 sectors_being_modified,
1160 read_sector_record_chunks_mode,
1161 global_mutex,
1162 );
1163
1164 let reading_join_handle = task::spawn_blocking({
1165 let mut stop_receiver = stop_sender.subscribe();
1166 let reading_fut = reading_fut.instrument(span.clone());
1167
1168 move || {
1169 Handle::current().block_on(async {
1170 select! {
1171 _ = reading_fut.fuse() => {
1172 }
1174 _ = stop_receiver.recv().fuse() => {
1175 }
1177 }
1178 });
1179 }
1180 });
1181
1182 let reading_join_handle = AsyncJoinOnDrop::new(reading_join_handle, false);
1183
1184 tasks.push(Box::pin(async move {
1185 reading_join_handle.await.map_err(|_error| {
1187 BackgroundTaskError::BackgroundTaskPanicked {
1188 task: format!("reading-{farm_index}"),
1189 }
1190 })
1191 }));
1192
1193 tasks.push(Box::pin(async move {
1194 match reward_signing(node_client, identity).await {
1195 Ok(reward_signing_fut) => {
1196 reward_signing_fut.await;
1197 }
1198 Err(error) => {
1199 return Err(BackgroundTaskError::RewardSigning(anyhow::anyhow!(
1200 "Failed to subscribe to reward signing notifications: {error}"
1201 )));
1202 }
1203 }
1204
1205 Ok(())
1206 }));
1207
1208 let farm = Self {
1209 farmer_protocol_info: farmer_app_info.protocol_info,
1210 single_disk_farm_info,
1211 sectors_metadata,
1212 pieces_in_sector,
1213 total_sectors_count: target_sector_count,
1214 span,
1215 tasks,
1216 handlers,
1217 piece_cache,
1218 plot_cache,
1219 piece_reader,
1220 start_sender: Some(start_sender),
1221 stop_sender: Some(stop_sender),
1222 _single_disk_farm_info_lock: single_disk_farm_info_lock,
1223 };
1224 Ok(farm)
1225 }
1226
    /// Opens (or, when `create` is `true`, creates) all on-disk state of the farm in
    /// `directory` and validates it against `farmer_app_info`.
    ///
    /// Covers the identity file, the farm info file (optionally taking an exclusive lock),
    /// the metadata file and the plot file. File sizes are (re)allocated according to
    /// `allocated_space` and `cache_percentage`. Returns handles and parsed state bundled
    /// in `SingleDiskFarmInit`.
    fn init(
        directory: &PathBuf,
        farmer_app_info: &FarmerAppInfo,
        allocated_space: u64,
        max_pieces_in_sector: u16,
        cache_percentage: u8,
        disable_farm_locking: bool,
        create: bool,
    ) -> Result<SingleDiskFarmInit, SingleDiskFarmError> {
        fs::create_dir_all(directory)?;

        // Identity is only created on demand when `create` is allowed; otherwise a missing
        // identity means the farm does not exist
        let identity = if create {
            Identity::open_or_create(directory)?
        } else {
            Identity::open(directory)?.ok_or_else(|| {
                IdentityError::Io(io::Error::new(
                    io::ErrorKind::NotFound,
                    "Farm does not exist and creation was explicitly disabled",
                ))
            })?
        };
        let public_key = identity.public_key().to_bytes().into();

        let (single_disk_farm_info, single_disk_farm_info_lock) =
            match SingleDiskFarmInfo::load_from(directory)? {
                Some(mut single_disk_farm_info) => {
                    // Existing farm: genesis hash must match the target chain
                    if &farmer_app_info.genesis_hash != single_disk_farm_info.genesis_hash() {
                        return Err(SingleDiskFarmError::WrongChain {
                            id: *single_disk_farm_info.id(),
                            correct_chain: hex::encode(single_disk_farm_info.genesis_hash()),
                            wrong_chain: hex::encode(farmer_app_info.genesis_hash),
                        });
                    }

                    // Identity on disk must match the public key recorded in farm info
                    if &public_key != single_disk_farm_info.public_key() {
                        return Err(SingleDiskFarmError::IdentityMismatch {
                            id: *single_disk_farm_info.id(),
                            correct_public_key: *single_disk_farm_info.public_key(),
                            wrong_public_key: public_key,
                        });
                    }

                    let pieces_in_sector = single_disk_farm_info.pieces_in_sector();

                    // The farm was created with more pieces per sector than currently supported
                    if max_pieces_in_sector < pieces_in_sector {
                        return Err(SingleDiskFarmError::InvalidPiecesInSector {
                            id: *single_disk_farm_info.id(),
                            max_supported: max_pieces_in_sector,
                            initialized_with: pieces_in_sector,
                        });
                    }

                    if max_pieces_in_sector > pieces_in_sector {
                        info!(
                            pieces_in_sector,
                            max_pieces_in_sector,
                            "Farm initialized with smaller number of pieces in sector, farm needs \
                            to be re-created for increase"
                        );
                    }

                    let mut single_disk_farm_info_lock = None;

                    if allocated_space != single_disk_farm_info.allocated_space() {
                        info!(
                            old_space = %bytesize::to_string(single_disk_farm_info.allocated_space(), true),
                            new_space = %bytesize::to_string(allocated_space, true),
                            "Farm size has changed"
                        );

                        // Persist the new allocated space back into farm info (storing also
                        // takes the lock when locking is enabled)
                        let new_allocated_space = allocated_space;
                        match &mut single_disk_farm_info {
                            SingleDiskFarmInfo::V0 {
                                allocated_space, ..
                            } => {
                                *allocated_space = new_allocated_space;
                            }
                        }

                        single_disk_farm_info_lock =
                            single_disk_farm_info.store_to(directory, !disable_farm_locking)?;
                    } else if !disable_farm_locking {
                        single_disk_farm_info_lock = Some(
                            SingleDiskFarmInfo::try_lock(directory)
                                .map_err(SingleDiskFarmError::LikelyAlreadyInUse)?,
                        );
                    }

                    (single_disk_farm_info, single_disk_farm_info_lock)
                }
                None => {
                    // Fresh farm: create and persist new farm info
                    let single_disk_farm_info = SingleDiskFarmInfo::new(
                        FarmId::new(),
                        farmer_app_info.genesis_hash,
                        public_key,
                        max_pieces_in_sector,
                        allocated_space,
                    );

                    let single_disk_farm_info_lock =
                        single_disk_farm_info.store_to(directory, !disable_farm_locking)?;

                    (single_disk_farm_info, single_disk_farm_info_lock)
                }
            };

        let pieces_in_sector = single_disk_farm_info.pieces_in_sector();
        let sector_size = sector_size(pieces_in_sector) as u64;
        let sector_metadata_size = SectorMetadataChecksummed::encoded_size();
        let allocated_space_distribution = AllocatedSpaceDistribution::new(
            allocated_space,
            sector_size,
            cache_percentage,
            sector_metadata_size as u64,
        )?;
        let target_sector_count = allocated_space_distribution.target_sector_count;

        let metadata_file_path = directory.join(Self::METADATA_FILE);
        let metadata_file = DirectIoFile::open(&metadata_file_path)?;

        let metadata_size = metadata_file.size()?;
        let expected_metadata_size = allocated_space_distribution.metadata_file_size;
        // Round up to a multiple of the disk sector size, as required for direct I/O
        let expected_metadata_size =
            expected_metadata_size.div_ceil(DISK_SECTOR_SIZE as u64) * DISK_SECTOR_SIZE as u64;
        let metadata_header = if metadata_size == 0 {
            // Brand-new metadata file: preallocate and write an initial header
            let metadata_header = PlotMetadataHeader {
                version: SingleDiskFarm::SUPPORTED_PLOT_VERSION,
                plotted_sector_count: 0,
            };

            metadata_file
                .preallocate(expected_metadata_size)
                .map_err(SingleDiskFarmError::CantPreallocateMetadataFile)?;
            metadata_file.write_all_at(metadata_header.encode().as_slice(), 0)?;

            metadata_header
        } else {
            // Existing metadata file: resize if the expected size changed
            if metadata_size != expected_metadata_size {
                metadata_file
                    .preallocate(expected_metadata_size)
                    .map_err(SingleDiskFarmError::CantPreallocateMetadataFile)?;
                metadata_file.set_len(expected_metadata_size)?;
            }

            let mut metadata_header_bytes = vec![0; PlotMetadataHeader::encoded_size()];
            metadata_file.read_exact_at(&mut metadata_header_bytes, 0)?;

            let mut metadata_header =
                PlotMetadataHeader::decode(&mut metadata_header_bytes.as_ref())
                    .map_err(SingleDiskFarmError::FailedToDecodeMetadataHeader)?;

            if metadata_header.version != SingleDiskFarm::SUPPORTED_PLOT_VERSION {
                return Err(SingleDiskFarmError::UnexpectedMetadataVersion(
                    metadata_header.version,
                ));
            }

            // Farm could have been shrunk: clamp the recorded sector count to the new target
            if metadata_header.plotted_sector_count > target_sector_count {
                metadata_header.plotted_sector_count = target_sector_count;
                metadata_file.write_all_at(&metadata_header.encode(), 0)?;
            }

            metadata_header
        };

        let sectors_metadata = {
            let mut sectors_metadata =
                Vec::<SectorMetadataChecksummed>::with_capacity(usize::from(target_sector_count));

            // Load metadata of every already-plotted sector; undecodable entries are
            // replaced on disk with dummy expired sector metadata so they get re-plotted
            let mut sector_metadata_bytes = vec![0; sector_metadata_size];
            for sector_index in 0..metadata_header.plotted_sector_count {
                let sector_offset =
                    RESERVED_PLOT_METADATA + sector_metadata_size as u64 * u64::from(sector_index);
                metadata_file.read_exact_at(&mut sector_metadata_bytes, sector_offset)?;

                let sector_metadata =
                    match SectorMetadataChecksummed::decode(&mut sector_metadata_bytes.as_ref()) {
                        Ok(sector_metadata) => sector_metadata,
                        Err(error) => {
                            warn!(
                                path = %metadata_file_path.display(),
                                %error,
                                %sector_index,
                                "Failed to decode sector metadata, replacing with dummy expired \
                                sector metadata"
                            );

                            let dummy_sector = SectorMetadataChecksummed::from(SectorMetadata {
                                sector_index,
                                pieces_in_sector,
                                s_bucket_sizes: Box::new([0; Record::NUM_S_BUCKETS]),
                                history_size: HistorySize::from(SegmentIndex::ZERO),
                            });
                            metadata_file.write_all_at(&dummy_sector.encode(), sector_offset)?;

                            dummy_sector
                        }
                    };
                sectors_metadata.push(sector_metadata);
            }

            Arc::new(AsyncRwLock::new(sectors_metadata))
        };

        let plot_file = DirectIoFile::open(directory.join(Self::PLOT_FILE))?;

        // Resize the plot file if allocated space changed
        if plot_file.size()? != allocated_space_distribution.plot_file_size {
            plot_file
                .preallocate(allocated_space_distribution.plot_file_size)
                .map_err(SingleDiskFarmError::CantPreallocatePlotFile)?;
            plot_file.set_len(allocated_space_distribution.plot_file_size)?;
        }

        let plot_file = Arc::new(plot_file);

        let plot_cache = DiskPlotCache::new(
            &plot_file,
            &sectors_metadata,
            target_sector_count,
            sector_size,
        );

        Ok(SingleDiskFarmInit {
            identity,
            single_disk_farm_info,
            single_disk_farm_info_lock,
            plot_file,
            metadata_file,
            metadata_header,
            target_sector_count,
            sectors_metadata,
            piece_cache_capacity: allocated_space_distribution.piece_cache_capacity,
            plot_cache,
        })
    }
1469
1470 pub fn collect_summary(directory: PathBuf) -> SingleDiskFarmSummary {
1472 let single_disk_farm_info = match SingleDiskFarmInfo::load_from(&directory) {
1473 Ok(Some(single_disk_farm_info)) => single_disk_farm_info,
1474 Ok(None) => {
1475 return SingleDiskFarmSummary::NotFound { directory };
1476 }
1477 Err(error) => {
1478 return SingleDiskFarmSummary::Error { directory, error };
1479 }
1480 };
1481
1482 SingleDiskFarmSummary::Found {
1483 info: single_disk_farm_info,
1484 directory,
1485 }
1486 }
1487
1488 pub fn effective_disk_usage(
1494 directory: &Path,
1495 cache_percentage: u8,
1496 ) -> Result<u64, SingleDiskFarmError> {
1497 let mut effective_disk_usage;
1498 match SingleDiskFarmInfo::load_from(directory)? {
1499 Some(single_disk_farm_info) => {
1500 let allocated_space_distribution = AllocatedSpaceDistribution::new(
1501 single_disk_farm_info.allocated_space(),
1502 sector_size(single_disk_farm_info.pieces_in_sector()) as u64,
1503 cache_percentage,
1504 SectorMetadataChecksummed::encoded_size() as u64,
1505 )?;
1506
1507 effective_disk_usage = single_disk_farm_info.allocated_space();
1508 effective_disk_usage -= Identity::file_size() as u64;
1509 effective_disk_usage -= allocated_space_distribution.metadata_file_size;
1510 effective_disk_usage -= allocated_space_distribution.plot_file_size;
1511 effective_disk_usage -= allocated_space_distribution.piece_cache_file_size;
1512 }
1513 None => {
1514 effective_disk_usage = 0;
1516 }
1517 };
1518
1519 if Identity::open(directory)?.is_some() {
1520 effective_disk_usage += Identity::file_size() as u64;
1521 }
1522
1523 match OpenOptions::new()
1524 .read(true)
1525 .open(directory.join(Self::METADATA_FILE))
1526 {
1527 Ok(metadata_file) => {
1528 effective_disk_usage += metadata_file.size()?;
1529 }
1530 Err(error) => {
1531 if error.kind() == io::ErrorKind::NotFound {
1532 } else {
1534 return Err(error.into());
1535 }
1536 }
1537 };
1538
1539 match OpenOptions::new()
1540 .read(true)
1541 .open(directory.join(Self::PLOT_FILE))
1542 {
1543 Ok(plot_file) => {
1544 effective_disk_usage += plot_file.size()?;
1545 }
1546 Err(error) => {
1547 if error.kind() == io::ErrorKind::NotFound {
1548 } else {
1550 return Err(error.into());
1551 }
1552 }
1553 };
1554
1555 match OpenOptions::new()
1556 .read(true)
1557 .open(directory.join(DiskPieceCache::FILE_NAME))
1558 {
1559 Ok(piece_cache) => {
1560 effective_disk_usage += piece_cache.size()?;
1561 }
1562 Err(error) => {
1563 if error.kind() == io::ErrorKind::NotFound {
1564 } else {
1566 return Err(error.into());
1567 }
1568 }
1569 };
1570
1571 Ok(effective_disk_usage)
1572 }
1573
1574 pub fn read_all_sectors_metadata(
1576 directory: &Path,
1577 ) -> io::Result<Vec<SectorMetadataChecksummed>> {
1578 let metadata_file = DirectIoFile::open(directory.join(Self::METADATA_FILE))?;
1579
1580 let metadata_size = metadata_file.size()?;
1581 let sector_metadata_size = SectorMetadataChecksummed::encoded_size();
1582
1583 let mut metadata_header_bytes = vec![0; PlotMetadataHeader::encoded_size()];
1584 metadata_file.read_exact_at(&mut metadata_header_bytes, 0)?;
1585
1586 let metadata_header = PlotMetadataHeader::decode(&mut metadata_header_bytes.as_ref())
1587 .map_err(|error| {
1588 io::Error::other(format!("Failed to decode metadata header: {error}"))
1589 })?;
1590
1591 if metadata_header.version != SingleDiskFarm::SUPPORTED_PLOT_VERSION {
1592 return Err(io::Error::other(format!(
1593 "Unsupported metadata version {}",
1594 metadata_header.version
1595 )));
1596 }
1597
1598 let mut sectors_metadata = Vec::<SectorMetadataChecksummed>::with_capacity(
1599 ((metadata_size - RESERVED_PLOT_METADATA) / sector_metadata_size as u64) as usize,
1600 );
1601
1602 let mut sector_metadata_bytes = vec![0; sector_metadata_size];
1603 for sector_index in 0..metadata_header.plotted_sector_count {
1604 metadata_file.read_exact_at(
1605 &mut sector_metadata_bytes,
1606 RESERVED_PLOT_METADATA + sector_metadata_size as u64 * u64::from(sector_index),
1607 )?;
1608 sectors_metadata.push(
1609 SectorMetadataChecksummed::decode(&mut sector_metadata_bytes.as_ref()).map_err(
1610 |error| io::Error::other(format!("Failed to decode sector metadata: {error}")),
1611 )?,
1612 );
1613 }
1614
1615 Ok(sectors_metadata)
1616 }
1617
    /// ID of this farm, as recorded in the on-disk farm info.
    pub fn id(&self) -> &FarmId {
        self.single_disk_farm_info.id()
    }
1622
    /// On-disk information about this farm.
    pub fn info(&self) -> &SingleDiskFarmInfo {
        &self.single_disk_farm_info
    }
1627
    /// Total number of sectors allocated in this farm.
    pub fn total_sectors_count(&self) -> SectorIndex {
        self.total_sectors_count
    }
1632
1633 pub fn plotted_sectors(&self) -> SingleDiskPlottedSectors {
1635 SingleDiskPlottedSectors {
1636 public_key: *self.single_disk_farm_info.public_key(),
1637 pieces_in_sector: self.pieces_in_sector,
1638 farmer_protocol_info: self.farmer_protocol_info,
1639 sectors_metadata: Arc::clone(&self.sectors_metadata),
1640 }
1641 }
1642
    /// Returns a clone of the handle to this farm's piece cache.
    pub fn piece_cache(&self) -> SingleDiskPieceCache {
        self.piece_cache.clone()
    }
1647
    /// Returns a clone of the handle to this farm's plot cache.
    pub fn plot_cache(&self) -> DiskPlotCache {
        self.plot_cache.clone()
    }
1652
    /// Returns a clone of the handle for reading plotted pieces from this farm.
    pub fn piece_reader(&self) -> DiskPieceReader {
        self.piece_reader.clone()
    }
1657
    /// Registers `callback` to be invoked on sector updates; returns an ID for the
    /// registered handler.
    pub fn on_sector_update(&self, callback: HandlerFn<(SectorIndex, SectorUpdate)>) -> HandlerId {
        self.handlers.sector_update.add(callback)
    }
1662
    /// Registers `callback` to be invoked on farming notifications; returns an ID for the
    /// registered handler.
    pub fn on_farming_notification(&self, callback: HandlerFn<FarmingNotification>) -> HandlerId {
        self.handlers.farming_notification.add(callback)
    }
1667
    /// Registers `callback` to be invoked on solution responses; returns an ID for the
    /// registered handler.
    pub fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> HandlerId {
        self.handlers.solution.add(callback)
    }
1672
1673 pub async fn run(mut self) -> anyhow::Result<()> {
1675 if let Some(start_sender) = self.start_sender.take() {
1676 let _ = start_sender.send(());
1678 }
1679
1680 while let Some(result) = self.tasks.next().instrument(self.span.clone()).await {
1681 result?;
1682 }
1683
1684 Ok(())
1685 }
1686
1687 pub fn wipe(directory: &Path) -> io::Result<()> {
1689 let single_disk_info_info_path = directory.join(SingleDiskFarmInfo::FILE_NAME);
1690 match SingleDiskFarmInfo::load_from(directory) {
1691 Ok(Some(single_disk_farm_info)) => {
1692 info!("Found single disk farm {}", single_disk_farm_info.id());
1693 }
1694 Ok(None) => {
1695 return Err(io::Error::new(
1696 io::ErrorKind::NotFound,
1697 format!(
1698 "Single disk farm info not found at {}",
1699 single_disk_info_info_path.display()
1700 ),
1701 ));
1702 }
1703 Err(error) => {
1704 warn!("Found unknown single disk farm: {}", error);
1705 }
1706 }
1707
1708 {
1709 let plot = directory.join(Self::PLOT_FILE);
1710 if plot.exists() {
1711 info!("Deleting plot file at {}", plot.display());
1712 fs::remove_file(plot)?;
1713 }
1714 }
1715 {
1716 let metadata = directory.join(Self::METADATA_FILE);
1717 if metadata.exists() {
1718 info!("Deleting metadata file at {}", metadata.display());
1719 fs::remove_file(metadata)?;
1720 }
1721 }
1722 {
1725 let identity = directory.join("identity.bin");
1726 if identity.exists() {
1727 info!("Deleting identity file at {}", identity.display());
1728 fs::remove_file(identity)?;
1729 }
1730 }
1731
1732 DiskPieceCache::wipe(directory)?;
1733
1734 info!(
1735 "Deleting info file at {}",
1736 single_disk_info_info_path.display()
1737 );
1738 fs::remove_file(single_disk_info_info_path)
1739 }
1740
    /// Checks the farm's on-disk components for consistency and, unless `dry_run`, repairs
    /// what it can.
    ///
    /// Depending on `target` this verifies the farm info/identity, per-sector metadata,
    /// plotted sector checksums and/or the piece cache. Irrecoverably corrupted sectors
    /// are replaced with dummy expired sector metadata so they get re-plotted later.
    pub fn scrub(
        directory: &Path,
        disable_farm_locking: bool,
        target: ScrubTarget,
        dry_run: bool,
    ) -> Result<(), SingleDiskFarmScrubError> {
        let span = Span::current();

        if dry_run {
            info!("Dry run is used, no changes will be written to disk");
        }

        if target.metadata() || target.plot() {
            // Farm info file must exist and parse before any metadata/plot checks
            let info = {
                let file = directory.join(SingleDiskFarmInfo::FILE_NAME);
                info!(path = %file.display(), "Checking info file");

                match SingleDiskFarmInfo::load_from(directory) {
                    Ok(Some(info)) => info,
                    Ok(None) => {
                        return Err(SingleDiskFarmScrubError::FarmInfoFileDoesNotExist { file });
                    }
                    Err(error) => {
                        return Err(SingleDiskFarmScrubError::FarmInfoCantBeOpened { file, error });
                    }
                }
            };

            // Prevent concurrent use of the farm while scrubbing (unless explicitly disabled)
            let _single_disk_farm_info_lock = if disable_farm_locking {
                None
            } else {
                Some(
                    SingleDiskFarmInfo::try_lock(directory)
                        .map_err(SingleDiskFarmScrubError::LikelyAlreadyInUse)?,
                )
            };

            let identity = {
                let file = directory.join(Identity::FILE_NAME);
                info!(path = %file.display(), "Checking identity file");

                match Identity::open(directory) {
                    Ok(Some(identity)) => identity,
                    Ok(None) => {
                        return Err(SingleDiskFarmScrubError::IdentityFileDoesNotExist { file });
                    }
                    Err(error) => {
                        return Err(SingleDiskFarmScrubError::IdentityCantBeOpened { file, error });
                    }
                }
            };

            // Identity on disk must match the public key recorded in the farm info
            if PublicKey::from(identity.public.to_bytes()) != *info.public_key() {
                return Err(SingleDiskFarmScrubError::PublicKeyMismatch {
                    identity: PublicKey::from(identity.public.to_bytes()),
                    info: *info.public_key(),
                });
            }

            let sector_metadata_size = SectorMetadataChecksummed::encoded_size();

            let metadata_file_path = directory.join(Self::METADATA_FILE);
            let (metadata_file, mut metadata_header) = {
                info!(path = %metadata_file_path.display(), "Checking metadata file");

                // Opened read-write (unless dry run) so corrupted entries can be repaired
                let metadata_file = match OpenOptions::new()
                    .read(true)
                    .write(!dry_run)
                    .open(&metadata_file_path)
                {
                    Ok(metadata_file) => metadata_file,
                    Err(error) => {
                        return Err(if error.kind() == io::ErrorKind::NotFound {
                            SingleDiskFarmScrubError::MetadataFileDoesNotExist {
                                file: metadata_file_path,
                            }
                        } else {
                            SingleDiskFarmScrubError::MetadataCantBeOpened {
                                file: metadata_file_path,
                                error,
                            }
                        });
                    }
                };

                // Advisory only; failure is intentionally ignored
                let _ = metadata_file.advise_sequential_access();

                let metadata_size = match metadata_file.size() {
                    Ok(metadata_size) => metadata_size,
                    Err(error) => {
                        return Err(SingleDiskFarmScrubError::FailedToDetermineFileSize {
                            file: metadata_file_path,
                            error,
                        });
                    }
                };

                if metadata_size < RESERVED_PLOT_METADATA {
                    return Err(SingleDiskFarmScrubError::MetadataFileTooSmall {
                        file: metadata_file_path,
                        reserved_size: RESERVED_PLOT_METADATA,
                        size: metadata_size,
                    });
                }

                let mut metadata_header = {
                    let mut reserved_metadata = vec![0; RESERVED_PLOT_METADATA as usize];

                    if let Err(error) = metadata_file.read_exact_at(&mut reserved_metadata, 0) {
                        return Err(SingleDiskFarmScrubError::FailedToReadBytes {
                            file: metadata_file_path,
                            size: RESERVED_PLOT_METADATA,
                            offset: 0,
                            error,
                        });
                    }

                    PlotMetadataHeader::decode(&mut reserved_metadata.as_slice())
                        .map_err(SingleDiskFarmScrubError::FailedToDecodeMetadataHeader)?
                };

                if metadata_header.version != SingleDiskFarm::SUPPORTED_PLOT_VERSION {
                    return Err(SingleDiskFarmScrubError::UnexpectedMetadataVersion(
                        metadata_header.version,
                    ));
                }

                let plotted_sector_count = metadata_header.plotted_sector_count;

                let expected_metadata_size = RESERVED_PLOT_METADATA
                    + sector_metadata_size as u64 * u64::from(plotted_sector_count);

                // Truncated metadata file: trust the file size over the header
                if metadata_size < expected_metadata_size {
                    warn!(
                        %metadata_size,
                        %expected_metadata_size,
                        "Metadata file size is smaller than expected, shrinking number of plotted \
                        sectors to correct value"
                    );

                    metadata_header.plotted_sector_count =
                        ((metadata_size - RESERVED_PLOT_METADATA) / sector_metadata_size as u64)
                            as SectorIndex;
                    let metadata_header_bytes = metadata_header.encode();

                    if !dry_run
                        && let Err(error) = metadata_file.write_all_at(&metadata_header_bytes, 0)
                    {
                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
                            file: metadata_file_path,
                            size: metadata_header_bytes.len() as u64,
                            offset: 0,
                            error,
                        });
                    }
                }

                (metadata_file, metadata_header)
            };

            let pieces_in_sector = info.pieces_in_sector();
            let sector_size = sector_size(pieces_in_sector) as u64;

            let plot_file_path = directory.join(Self::PLOT_FILE);
            let plot_file = {
                // NOTE(review): shadows the identical `plot_file_path` binding right above;
                // one of the two is redundant
                let plot_file_path = directory.join(Self::PLOT_FILE);
                info!(path = %plot_file_path.display(), "Checking plot file");

                let plot_file = match OpenOptions::new()
                    .read(true)
                    .write(!dry_run)
                    .open(&plot_file_path)
                {
                    Ok(plot_file) => plot_file,
                    Err(error) => {
                        // NOTE(review): these error variants are named `Metadata*` even
                        // though this is the plot file — looks copy-pasted from the metadata
                        // branch; confirm whether dedicated plot-file variants exist
                        return Err(if error.kind() == io::ErrorKind::NotFound {
                            SingleDiskFarmScrubError::MetadataFileDoesNotExist {
                                file: plot_file_path,
                            }
                        } else {
                            SingleDiskFarmScrubError::MetadataCantBeOpened {
                                file: plot_file_path,
                                error,
                            }
                        });
                    }
                };

                // Advisory only; failure is intentionally ignored
                let _ = plot_file.advise_sequential_access();

                let plot_size = match plot_file.size() {
                    Ok(metadata_size) => metadata_size,
                    Err(error) => {
                        return Err(SingleDiskFarmScrubError::FailedToDetermineFileSize {
                            file: plot_file_path,
                            error,
                        });
                    }
                };

                // Plot smaller than the header claims: shrink the recorded sector count
                let min_expected_plot_size =
                    u64::from(metadata_header.plotted_sector_count) * sector_size;
                if plot_size < min_expected_plot_size {
                    warn!(
                        %plot_size,
                        %min_expected_plot_size,
                        "Plot file size is smaller than expected, shrinking number of plotted \
                        sectors to correct value"
                    );

                    metadata_header.plotted_sector_count = (plot_size / sector_size) as SectorIndex;
                    let metadata_header_bytes = metadata_header.encode();

                    if !dry_run
                        && let Err(error) = metadata_file.write_all_at(&metadata_header_bytes, 0)
                    {
                        // NOTE(review): the write targets the metadata file, but the error
                        // reports `plot_file_path` — confirm this is intended
                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
                            file: plot_file_path,
                            size: metadata_header_bytes.len() as u64,
                            offset: 0,
                            error,
                        });
                    }
                }

                plot_file
            };

            // Everything in a sector before the trailing checksum is hashed content
            let sector_bytes_range = 0..(sector_size as usize - Blake3Hash::SIZE);

            info!("Checking sectors and corresponding metadata");
            (0..metadata_header.plotted_sector_count)
                .into_par_iter()
                .map_init(
                    // Per-thread scratch buffer, reused for metadata and sector content reads
                    || vec![0u8; Record::SIZE],
                    |scratch_buffer, sector_index| {
                        let _span_guard = span.enter();

                        let offset = RESERVED_PLOT_METADATA
                            + u64::from(sector_index) * sector_metadata_size as u64;
                        if let Err(error) = metadata_file
                            .read_exact_at(&mut scratch_buffer[..sector_metadata_size], offset)
                        {
                            warn!(
                                path = %metadata_file_path.display(),
                                %error,
                                %offset,
                                size = %sector_metadata_size,
                                %sector_index,
                                "Failed to read sector metadata, replacing with dummy expired \
                                sector metadata"
                            );

                            if !dry_run {
                                write_dummy_sector_metadata(
                                    &metadata_file,
                                    &metadata_file_path,
                                    sector_index,
                                    pieces_in_sector,
                                )?;
                            }
                            return Ok(());
                        }

                        let sector_metadata = match SectorMetadataChecksummed::decode(
                            &mut &scratch_buffer[..sector_metadata_size],
                        ) {
                            Ok(sector_metadata) => sector_metadata,
                            Err(error) => {
                                warn!(
                                    path = %metadata_file_path.display(),
                                    %error,
                                    %sector_index,
                                    "Failed to decode sector metadata, replacing with dummy \
                                    expired sector metadata"
                                );

                                if !dry_run {
                                    write_dummy_sector_metadata(
                                        &metadata_file,
                                        &metadata_file_path,
                                        sector_index,
                                        pieces_in_sector,
                                    )?;
                                }
                                return Ok(());
                            }
                        };

                        // Stored metadata must describe exactly this slot
                        if sector_metadata.sector_index != sector_index {
                            warn!(
                                path = %metadata_file_path.display(),
                                %sector_index,
                                found_sector_index = sector_metadata.sector_index,
                                "Sector index mismatch, replacing with dummy expired sector \
                                metadata"
                            );

                            if !dry_run {
                                write_dummy_sector_metadata(
                                    &metadata_file,
                                    &metadata_file_path,
                                    sector_index,
                                    pieces_in_sector,
                                )?;
                            }
                            return Ok(());
                        }

                        if sector_metadata.pieces_in_sector != pieces_in_sector {
                            warn!(
                                path = %metadata_file_path.display(),
                                %sector_index,
                                %pieces_in_sector,
                                found_pieces_in_sector = sector_metadata.pieces_in_sector,
                                "Pieces in sector mismatch, replacing with dummy expired sector \
                                metadata"
                            );

                            if !dry_run {
                                write_dummy_sector_metadata(
                                    &metadata_file,
                                    &metadata_file_path,
                                    sector_index,
                                    pieces_in_sector,
                                )?;
                            }
                            return Ok(());
                        }

                        if target.plot() {
                            // Re-hash the sector contents chunk by chunk and compare with the
                            // checksum stored at the end of the sector
                            let mut hasher = blake3::Hasher::new();
                            for offset_in_sector in
                                sector_bytes_range.clone().step_by(scratch_buffer.len())
                            {
                                let offset =
                                    u64::from(sector_index) * sector_size + offset_in_sector as u64;
                                let bytes_to_read = (offset_in_sector + scratch_buffer.len())
                                    .min(sector_bytes_range.end)
                                    - offset_in_sector;

                                let bytes = &mut scratch_buffer[..bytes_to_read];

                                if let Err(error) = plot_file.read_exact_at(bytes, offset) {
                                    warn!(
                                        path = %plot_file_path.display(),
                                        %error,
                                        %sector_index,
                                        %offset,
                                        size = %bytes.len() as u64,
                                        "Failed to read sector bytes"
                                    );

                                    continue;
                                }

                                hasher.update(bytes);
                            }

                            let actual_checksum = *hasher.finalize().as_bytes();
                            let mut expected_checksum = [0; Blake3Hash::SIZE];
                            {
                                let offset = u64::from(sector_index) * sector_size
                                    + sector_bytes_range.end as u64;
                                if let Err(error) =
                                    plot_file.read_exact_at(&mut expected_checksum, offset)
                                {
                                    warn!(
                                        path = %plot_file_path.display(),
                                        %error,
                                        %sector_index,
                                        %offset,
                                        size = %expected_checksum.len() as u64,
                                        "Failed to read sector checksum bytes"
                                    );
                                }
                            }

                            if actual_checksum != expected_checksum {
                                warn!(
                                    path = %plot_file_path.display(),
                                    %sector_index,
                                    actual_checksum = %hex::encode(actual_checksum),
                                    expected_checksum = %hex::encode(expected_checksum),
                                    "Plotted sector checksum mismatch, replacing with dummy \
                                    expired sector"
                                );

                                if !dry_run {
                                    write_dummy_sector_metadata(
                                        &metadata_file,
                                        &metadata_file_path,
                                        sector_index,
                                        pieces_in_sector,
                                    )?;
                                }

                                // Zero out the sector contents and store the checksum of the
                                // zeroed contents so the sector is internally consistent again
                                scratch_buffer.fill(0);

                                hasher.reset();
                                for offset_in_sector in
                                    sector_bytes_range.clone().step_by(scratch_buffer.len())
                                {
                                    let offset = u64::from(sector_index) * sector_size
                                        + offset_in_sector as u64;
                                    let bytes_to_write = (offset_in_sector + scratch_buffer.len())
                                        .min(sector_bytes_range.end)
                                        - offset_in_sector;
                                    let bytes = &mut scratch_buffer[..bytes_to_write];

                                    if !dry_run
                                        && let Err(error) = plot_file.write_all_at(bytes, offset)
                                    {
                                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
                                            file: plot_file_path.clone(),
                                            size: scratch_buffer.len() as u64,
                                            offset,
                                            error,
                                        });
                                    }

                                    hasher.update(bytes);
                                }
                                {
                                    let checksum = *hasher.finalize().as_bytes();
                                    let offset = u64::from(sector_index) * sector_size
                                        + sector_bytes_range.end as u64;
                                    if !dry_run
                                        && let Err(error) =
                                            plot_file.write_all_at(&checksum, offset)
                                    {
                                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
                                            file: plot_file_path.clone(),
                                            size: checksum.len() as u64,
                                            offset,
                                            error,
                                        });
                                    }
                                }

                                return Ok(());
                            }
                        }

                        trace!(%sector_index, "Sector is in good shape");

                        Ok(())
                    },
                )
                .try_for_each({
                    let span = &span;
                    let checked_sectors = AtomicUsize::new(0);

                    move |result| {
                        let _span_guard = span.enter();

                        // Periodic progress reporting across rayon worker threads
                        let checked_sectors = checked_sectors.fetch_add(1, Ordering::Relaxed);
                        if checked_sectors > 1 && checked_sectors % 10 == 0 {
                            info!(
                                "Checked {}/{} sectors",
                                checked_sectors, metadata_header.plotted_sector_count
                            );
                        }

                        result
                    }
                })?;
        }

        if target.cache() {
            Self::scrub_cache(directory, dry_run)?;
        }

        info!("Farm check completed");

        Ok(())
    }
2226
    /// Checks the piece cache file element by element, replacing unreadable or
    /// checksum-mismatched elements with zeroed dummy elements (unless `dry_run`).
    ///
    /// A missing cache file is not an error (expected in farming cluster setups).
    fn scrub_cache(directory: &Path, dry_run: bool) -> Result<(), SingleDiskFarmScrubError> {
        let span = Span::current();

        let file = directory.join(DiskPieceCache::FILE_NAME);
        info!(path = %file.display(), "Checking cache file");

        // Opened read-write (unless dry run) so corrupted elements can be replaced
        let cache_file = match OpenOptions::new().read(true).write(!dry_run).open(&file) {
            Ok(plot_file) => plot_file,
            Err(error) => {
                return if error.kind() == io::ErrorKind::NotFound {
                    warn!(
                        file = %file.display(),
                        "Cache file does not exist, this is expected in farming cluster"
                    );
                    Ok(())
                } else {
                    Err(SingleDiskFarmScrubError::CacheCantBeOpened { file, error })
                };
            }
        };

        // Advisory only; failure is intentionally ignored
        let _ = cache_file.advise_sequential_access();

        let cache_size = match cache_file.size() {
            Ok(cache_size) => cache_size,
            Err(error) => {
                return Err(SingleDiskFarmScrubError::FailedToDetermineFileSize { file, error });
            }
        };

        let element_size = DiskPieceCache::element_size();
        let number_of_cached_elements = cache_size / u64::from(element_size);
        // An all-zero element represents a vacant cache slot
        let dummy_element = vec![0; element_size as usize];
        (0..number_of_cached_elements)
            .into_par_iter()
            .map_with(vec![0; element_size as usize], |element, cache_offset| {
                let _span_guard = span.enter();

                let offset = cache_offset * u64::from(element_size);
                if let Err(error) = cache_file.read_exact_at(element, offset) {
                    warn!(
                        path = %file.display(),
                        %cache_offset,
                        size = %element.len() as u64,
                        %offset,
                        %error,
                        "Failed to read cached piece, replacing with dummy element"
                    );

                    if !dry_run && let Err(error) = cache_file.write_all_at(&dummy_element, offset)
                    {
                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
                            file: file.clone(),
                            size: u64::from(element_size),
                            offset,
                            error,
                        });
                    }

                    return Ok(());
                }

                // Element layout: payload bytes followed by a trailing Blake3 checksum
                let (index_and_piece_bytes, expected_checksum) =
                    element.split_at(element_size as usize - Blake3Hash::SIZE);
                let actual_checksum = blake3_hash(index_and_piece_bytes);
                // Vacant (all-zero) elements legitimately fail the checksum and are skipped
                if *actual_checksum != *expected_checksum && element != &dummy_element {
                    warn!(
                        %cache_offset,
                        actual_checksum = %hex::encode(actual_checksum),
                        expected_checksum = %hex::encode(expected_checksum),
                        "Cached piece checksum mismatch, replacing with dummy element"
                    );

                    if !dry_run && let Err(error) = cache_file.write_all_at(&dummy_element, offset)
                    {
                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
                            file: file.clone(),
                            size: u64::from(element_size),
                            offset,
                            error,
                        });
                    }

                    return Ok(());
                }

                Ok(())
            })
            .try_for_each({
                let span = &span;
                let checked_elements = AtomicUsize::new(0);

                move |result| {
                    let _span_guard = span.enter();

                    // Periodic progress reporting across rayon worker threads
                    let checked_elements = checked_elements.fetch_add(1, Ordering::Relaxed);
                    if checked_elements > 1 && checked_elements % 1000 == 0 {
                        info!(
                            "Checked {}/{} cache elements",
                            checked_elements, number_of_cached_elements
                        );
                    }

                    result
                }
            })?;

        Ok(())
    }
2337}
2338
2339fn write_dummy_sector_metadata(
2340 metadata_file: &File,
2341 metadata_file_path: &Path,
2342 sector_index: SectorIndex,
2343 pieces_in_sector: u16,
2344) -> Result<(), SingleDiskFarmScrubError> {
2345 let dummy_sector_bytes = SectorMetadataChecksummed::from(SectorMetadata {
2346 sector_index,
2347 pieces_in_sector,
2348 s_bucket_sizes: Box::new([0; Record::NUM_S_BUCKETS]),
2349 history_size: HistorySize::from(SegmentIndex::ZERO),
2350 })
2351 .encode();
2352 let sector_offset = RESERVED_PLOT_METADATA
2353 + u64::from(sector_index) * SectorMetadataChecksummed::encoded_size() as u64;
2354 metadata_file
2355 .write_all_at(&dummy_sector_bytes, sector_offset)
2356 .map_err(|error| SingleDiskFarmScrubError::FailedToWriteBytes {
2357 file: metadata_file_path.to_path_buf(),
2358 size: dummy_sector_bytes.len() as u64,
2359 offset: sector_offset,
2360 error,
2361 })
2362}