1pub mod direct_io_file;
8pub mod farming;
9pub mod identity;
10mod metrics;
11pub mod piece_cache;
12pub mod piece_reader;
13pub mod plot_cache;
14mod plotted_sectors;
15mod plotting;
16mod reward_signing;
17
18use crate::disk_piece_cache::{DiskPieceCache, DiskPieceCacheError};
19use crate::farm::{
20 Farm, FarmId, FarmingError, FarmingNotification, HandlerFn, PieceCacheId, PieceReader,
21 PlottedSectors, SectorUpdate,
22};
23use crate::node_client::NodeClient;
24use crate::plotter::Plotter;
25use crate::single_disk_farm::direct_io_file::{DirectIoFile, DISK_SECTOR_SIZE};
26use crate::single_disk_farm::farming::rayon_files::RayonFiles;
27use crate::single_disk_farm::farming::{
28 farming, slot_notification_forwarder, FarmingOptions, PlotAudit,
29};
30use crate::single_disk_farm::identity::{Identity, IdentityError};
31use crate::single_disk_farm::metrics::SingleDiskFarmMetrics;
32use crate::single_disk_farm::piece_cache::SingleDiskPieceCache;
33use crate::single_disk_farm::piece_reader::DiskPieceReader;
34use crate::single_disk_farm::plot_cache::DiskPlotCache;
35use crate::single_disk_farm::plotted_sectors::SingleDiskPlottedSectors;
36pub use crate::single_disk_farm::plotting::PlottingError;
37use crate::single_disk_farm::plotting::{
38 plotting, plotting_scheduler, PlottingOptions, PlottingSchedulerOptions, SectorPlottingOptions,
39};
40use crate::single_disk_farm::reward_signing::reward_signing;
41use crate::utils::{tokio_rayon_spawn_handler, AsyncJoinOnDrop};
42use crate::{farm, KNOWN_PEERS_CACHE_SIZE};
43use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
44use async_trait::async_trait;
45use event_listener_primitives::{Bag, HandlerId};
46use futures::channel::{mpsc, oneshot};
47use futures::stream::FuturesUnordered;
48use futures::{select, FutureExt, StreamExt};
49use parity_scale_codec::{Decode, Encode};
50use parking_lot::Mutex;
51use prometheus_client::registry::Registry;
52use rayon::prelude::*;
53use rayon::{ThreadPoolBuildError, ThreadPoolBuilder};
54use serde::{Deserialize, Serialize};
55use static_assertions::const_assert;
56use std::collections::HashSet;
57use std::fs::{File, OpenOptions};
58use std::future::Future;
59use std::io::Write;
60use std::num::{NonZeroU32, NonZeroUsize};
61use std::path::{Path, PathBuf};
62use std::pin::Pin;
63use std::str::FromStr;
64use std::sync::atomic::{AtomicUsize, Ordering};
65use std::sync::Arc;
66use std::time::Duration;
67use std::{fmt, fs, io, mem};
68use subspace_core_primitives::hashes::{blake3_hash, Blake3Hash};
69use subspace_core_primitives::pieces::Record;
70use subspace_core_primitives::sectors::SectorIndex;
71use subspace_core_primitives::segments::{HistorySize, SegmentIndex};
72use subspace_core_primitives::PublicKey;
73use subspace_erasure_coding::ErasureCoding;
74use subspace_farmer_components::file_ext::FileExt;
75use subspace_farmer_components::reading::ReadSectorRecordChunksMode;
76use subspace_farmer_components::sector::{sector_size, SectorMetadata, SectorMetadataChecksummed};
77use subspace_farmer_components::FarmerProtocolInfo;
78use subspace_kzg::Kzg;
79use subspace_networking::KnownPeersManager;
80use subspace_proof_of_space::Table;
81use subspace_rpc_primitives::{FarmerAppInfo, SolutionResponse};
82use thiserror::Error;
83use tokio::runtime::Handle;
84use tokio::sync::broadcast;
85use tokio::task;
86use tracing::{error, info, trace, warn, Instrument, Span};
87
// Compile-time guarantee that any `u64` value fits into `usize`, i.e. this code only
// builds on targets with at least 64-bit pointers.
const_assert!(mem::size_of::<usize>() >= mem::size_of::<u64>());
91
/// Space reserved at the beginning of the metadata file ahead of per-sector metadata (1 MiB).
const RESERVED_PLOT_METADATA: u64 = 1024 * 1024;
/// Space accounted for the farm info file in allocated-space calculations (1 MiB).
const RESERVED_FARM_INFO: u64 = 1024 * 1024;
/// Delay applied before processing a newly archived segment.
// `Duration::from_secs(10 * 60)` rather than `Duration::from_mins(10)`: the latter is
// gated behind the unstable `duration_constructors` feature, while this form compiles on
// stable Rust with identical value.
const NEW_SEGMENT_PROCESSING_DELAY: Duration = Duration::from_secs(10 * 60);
97
/// Exclusive lock on the farm info file, returned by [`SingleDiskFarmInfo::store_to`] and
/// [`SingleDiskFarmInfo::try_lock`].
///
/// The lock is held for as long as this value is alive; it is presumably released when the
/// underlying file handle is dropped (fs4 exclusive lock semantics — TODO confirm).
#[derive(Debug)]
#[must_use = "Lock file must be kept around or as long as farm is used"]
pub struct SingleDiskFarmInfoLock {
    // Keeping the handle open is what keeps the exclusive lock held
    _file: File,
}
104
/// Important information about the contents of the farm, persisted as camelCase JSON
/// (see `load_from`/`store_to`).
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum SingleDiskFarmInfo {
    /// Initial (and currently the only) version of the farm info format.
    #[serde(rename_all = "camelCase")]
    V0 {
        /// ID of the farm.
        id: FarmId,
        /// Genesis hash of the chain the farm was created on (hex-encoded in JSON).
        #[serde(with = "hex")]
        genesis_hash: [u8; 32],
        /// Public key of the identity associated with the farm.
        public_key: PublicKey,
        /// How many pieces each sector of this farm contains.
        pieces_in_sector: u16,
        /// How much space in bytes is allocated for this farm.
        allocated_space: u64,
    },
}
125
126impl SingleDiskFarmInfo {
127 const FILE_NAME: &'static str = "single_disk_farm.json";
128
129 pub fn new(
131 id: FarmId,
132 genesis_hash: [u8; 32],
133 public_key: PublicKey,
134 pieces_in_sector: u16,
135 allocated_space: u64,
136 ) -> Self {
137 Self::V0 {
138 id,
139 genesis_hash,
140 public_key,
141 pieces_in_sector,
142 allocated_space,
143 }
144 }
145
146 pub fn load_from(directory: &Path) -> io::Result<Option<Self>> {
149 let bytes = match fs::read(directory.join(Self::FILE_NAME)) {
150 Ok(bytes) => bytes,
151 Err(error) => {
152 return if error.kind() == io::ErrorKind::NotFound {
153 Ok(None)
154 } else {
155 Err(error)
156 };
157 }
158 };
159
160 serde_json::from_slice(&bytes)
161 .map(Some)
162 .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))
163 }
164
165 pub fn store_to(
169 &self,
170 directory: &Path,
171 lock: bool,
172 ) -> io::Result<Option<SingleDiskFarmInfoLock>> {
173 let mut file = OpenOptions::new()
174 .write(true)
175 .create(true)
176 .truncate(false)
177 .open(directory.join(Self::FILE_NAME))?;
178 if lock {
179 fs4::fs_std::FileExt::try_lock_exclusive(&file)?;
180 }
181 file.set_len(0)?;
182 file.write_all(&serde_json::to_vec(self).expect("Info serialization never fails; qed"))?;
183
184 Ok(lock.then_some(SingleDiskFarmInfoLock { _file: file }))
185 }
186
187 pub fn try_lock(directory: &Path) -> io::Result<SingleDiskFarmInfoLock> {
190 let file = File::open(directory.join(Self::FILE_NAME))?;
191 fs4::fs_std::FileExt::try_lock_exclusive(&file)?;
192
193 Ok(SingleDiskFarmInfoLock { _file: file })
194 }
195
196 pub fn id(&self) -> &FarmId {
198 let Self::V0 { id, .. } = self;
199 id
200 }
201
202 pub fn genesis_hash(&self) -> &[u8; 32] {
204 let Self::V0 { genesis_hash, .. } = self;
205 genesis_hash
206 }
207
208 pub fn public_key(&self) -> &PublicKey {
210 let Self::V0 { public_key, .. } = self;
211 public_key
212 }
213
214 pub fn pieces_in_sector(&self) -> u16 {
216 match self {
217 SingleDiskFarmInfo::V0 {
218 pieces_in_sector, ..
219 } => *pieces_in_sector,
220 }
221 }
222
223 pub fn allocated_space(&self) -> u64 {
225 match self {
226 SingleDiskFarmInfo::V0 {
227 allocated_space, ..
228 } => *allocated_space,
229 }
230 }
231}
232
/// Summary of the farm contents found in a directory.
#[derive(Debug)]
pub enum SingleDiskFarmSummary {
    /// Farm info was found and successfully deserialized.
    Found {
        /// Farm info itself.
        info: SingleDiskFarmInfo,
        /// Directory in which the farm info was found.
        directory: PathBuf,
    },
    /// Farm info was not found in the directory.
    NotFound {
        /// Directory that was checked.
        directory: PathBuf,
    },
    /// Reading the farm info failed with an I/O error.
    Error {
        /// Directory that was checked.
        directory: PathBuf,
        /// Error that happened while reading.
        error: io::Error,
    },
}
256
/// SCALE-encoded header stored at the beginning of the plot metadata file.
#[derive(Debug, Encode, Decode)]
struct PlotMetadataHeader {
    // Metadata format version — NOTE(review): presumably validated against
    // `SUPPORTED_PLOT_VERSION` / `UnexpectedMetadataVersion`; confirm in `init`
    version: u8,
    // Number of sectors plotted so far; plotting resumes from this count (see `new()`)
    plotted_sector_count: SectorIndex,
}
262
263impl PlotMetadataHeader {
264 #[inline]
265 fn encoded_size() -> usize {
266 let default = PlotMetadataHeader {
267 version: 0,
268 plotted_sector_count: 0,
269 };
270
271 default.encoded_size()
272 }
273}
274
/// Options used to open a single disk farm.
#[derive(Debug)]
pub struct SingleDiskFarmOptions<'a, NC>
where
    NC: Clone,
{
    /// Directory where the farm is stored.
    pub directory: PathBuf,
    /// Farmer application info obtained from the node.
    pub farmer_app_info: FarmerAppInfo,
    /// How much space in bytes is allocated for this farm.
    pub allocated_space: u64,
    /// Maximum number of pieces in a single sector.
    pub max_pieces_in_sector: u16,
    /// Client used for communication with the node.
    pub node_client: NC,
    /// Address to which farming rewards should go.
    pub reward_address: PublicKey,
    /// Plotter implementation used for creating sectors.
    pub plotter: Arc<dyn Plotter + Send + Sync>,
    /// KZG instance to use.
    pub kzg: Kzg,
    /// Erasure coding instance to use.
    pub erasure_coding: ErasureCoding,
    /// Percentage of the allocated space dedicated to the piece cache.
    pub cache_percentage: u8,
    /// Size of the thread pool used for farming.
    pub farming_thread_pool_size: usize,
    /// If provided, plotting does not start until this receiver resolves (allows delaying
    /// plotting until external initialization is done).
    pub plotting_delay: Option<oneshot::Receiver<()>>,
    /// Global mutex shared across farms, used to serialize heavy operations.
    pub global_mutex: Arc<AsyncMutex<()>>,
    /// Limit on the number of sectors plotted concurrently on this farm.
    pub max_plotting_sectors_per_farm: NonZeroUsize,
    /// Disable exclusive locking of farm files (e.g. for file systems without lock support
    /// — TODO confirm intended use).
    pub disable_farm_locking: bool,
    /// Mode used for reading sector record chunks.
    pub read_sector_record_chunks_mode: ReadSectorRecordChunksMode,
    /// Prometheus registry to register metrics in, if metrics are desired.
    pub registry: Option<&'a Mutex<&'a mut Registry>>,
    /// Whether the farm should be created if it does not exist yet.
    pub create: bool,
}
322
323#[derive(Debug, Error)]
325pub enum SingleDiskFarmError {
326 #[error("Failed to open or create identity: {0}")]
328 FailedToOpenIdentity(#[from] IdentityError),
329 #[error("Farm is likely already in use, make sure no other farmer is using it: {0}")]
331 LikelyAlreadyInUse(io::Error),
332 #[error("Single disk farm I/O error: {0}")]
334 Io(#[from] io::Error),
335 #[error("Failed to spawn task for blocking thread: {0}")]
337 TokioJoinError(#[from] task::JoinError),
338 #[error("Piece cache error: {0}")]
340 PieceCacheError(#[from] DiskPieceCacheError),
341 #[error("Can't preallocate metadata file, probably not enough space on disk: {0}")]
343 CantPreallocateMetadataFile(io::Error),
344 #[error("Can't preallocate plot file, probably not enough space on disk: {0}")]
346 CantPreallocatePlotFile(io::Error),
347 #[error(
349 "Genesis hash of farm {id} {wrong_chain} is different from {correct_chain} when farm was \
350 created, it is not possible to use farm on a different chain"
351 )]
352 WrongChain {
353 id: FarmId,
355 correct_chain: String,
358 wrong_chain: String,
360 },
361 #[error(
363 "Public key of farm {id} {wrong_public_key} is different from {correct_public_key} when \
364 farm was created, something went wrong, likely due to manual edits"
365 )]
366 IdentityMismatch {
367 id: FarmId,
369 correct_public_key: PublicKey,
371 wrong_public_key: PublicKey,
373 },
374 #[error(
376 "Invalid number pieces in sector: max supported {max_supported}, farm initialized with \
377 {initialized_with}"
378 )]
379 InvalidPiecesInSector {
380 id: FarmId,
382 max_supported: u16,
384 initialized_with: u16,
386 },
387 #[error("Failed to decode metadata header: {0}")]
389 FailedToDecodeMetadataHeader(parity_scale_codec::Error),
390 #[error("Unexpected metadata version {0}")]
392 UnexpectedMetadataVersion(u8),
393 #[error(
395 "Allocated space is not enough for one sector. \
396 The lowest acceptable value for allocated space is {min_space} bytes, \
397 provided {allocated_space} bytes."
398 )]
399 InsufficientAllocatedSpace {
400 min_space: u64,
402 allocated_space: u64,
404 },
405 #[error(
407 "Farm is too large: allocated {allocated_sectors} sectors ({allocated_space} bytes), max \
408 supported is {max_sectors} ({max_space} bytes). Consider creating multiple smaller farms \
409 instead."
410 )]
411 FarmTooLarge {
412 allocated_space: u64,
414 allocated_sectors: u64,
416 max_space: u64,
418 max_sectors: u16,
420 },
421 #[error("Failed to create thread pool: {0}")]
423 FailedToCreateThreadPool(ThreadPoolBuildError),
424}
425
426#[derive(Debug, Error)]
428pub enum SingleDiskFarmScrubError {
429 #[error("Farm is likely already in use, make sure no other farmer is using it: {0}")]
431 LikelyAlreadyInUse(io::Error),
432 #[error("Failed to file size of {file}: {error}")]
434 FailedToDetermineFileSize {
435 file: PathBuf,
437 error: io::Error,
439 },
440 #[error("Failed to read {size} bytes from {file} at offset {offset}: {error}")]
442 FailedToReadBytes {
443 file: PathBuf,
445 size: u64,
447 offset: u64,
449 error: io::Error,
451 },
452 #[error("Failed to write {size} bytes from {file} at offset {offset}: {error}")]
454 FailedToWriteBytes {
455 file: PathBuf,
457 size: u64,
459 offset: u64,
461 error: io::Error,
463 },
464 #[error("Farm info file does not exist at {file}")]
466 FarmInfoFileDoesNotExist {
467 file: PathBuf,
469 },
470 #[error("Farm info at {file} can't be opened: {error}")]
472 FarmInfoCantBeOpened {
473 file: PathBuf,
475 error: io::Error,
477 },
478 #[error("Identity file does not exist at {file}")]
480 IdentityFileDoesNotExist {
481 file: PathBuf,
483 },
484 #[error("Identity at {file} can't be opened: {error}")]
486 IdentityCantBeOpened {
487 file: PathBuf,
489 error: IdentityError,
491 },
492 #[error(
494 "Identity public key {identity} doesn't match public key in the disk farm info {info}"
495 )]
496 PublicKeyMismatch {
497 identity: PublicKey,
499 info: PublicKey,
501 },
502 #[error("Metadata file does not exist at {file}")]
504 MetadataFileDoesNotExist {
505 file: PathBuf,
507 },
508 #[error("Metadata at {file} can't be opened: {error}")]
510 MetadataCantBeOpened {
511 file: PathBuf,
513 error: io::Error,
515 },
516 #[error(
518 "Metadata file at {file} is too small: reserved size is {reserved_size} bytes, file size \
519 is {size}"
520 )]
521 MetadataFileTooSmall {
522 file: PathBuf,
524 reserved_size: u64,
526 size: u64,
528 },
529 #[error("Failed to decode metadata header: {0}")]
531 FailedToDecodeMetadataHeader(parity_scale_codec::Error),
532 #[error("Unexpected metadata version {0}")]
534 UnexpectedMetadataVersion(u8),
535 #[error("Cache at {file} can't be opened: {error}")]
537 CacheCantBeOpened {
538 file: PathBuf,
540 error: io::Error,
542 },
543}
544
/// Errors that can happen in the farm's background tasks.
#[derive(Debug, Error)]
pub enum BackgroundTaskError {
    /// Plotting error.
    #[error(transparent)]
    Plotting(#[from] PlottingError),
    /// Farming error.
    #[error(transparent)]
    Farming(#[from] FarmingError),
    /// Reward signing error.
    #[error(transparent)]
    RewardSigning(#[from] anyhow::Error),
    /// A background task panicked.
    #[error("Background task {task} panicked")]
    BackgroundTaskPanicked {
        /// Name of the task that panicked (e.g. "plotting-0", see `new()`).
        task: String,
    },
}
564
/// Boxed future of a farm background task; resolves to an error only on fatal failure.
type BackgroundTask = Pin<Box<dyn Future<Output = Result<(), BackgroundTaskError>> + Send>>;
566
/// Which parts of a single disk farm to scrub.
#[derive(Debug, Copy, Clone)]
pub enum ScrubTarget {
    /// Scrub everything: metadata, plot and cache.
    All,
    /// Scrub only the metadata file.
    Metadata,
    /// Scrub the plot file (requires checking metadata as well).
    Plot,
    /// Scrub only the piece cache.
    Cache,
}

impl fmt::Display for ScrubTarget {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::All => "all",
            Self::Metadata => "metadata",
            Self::Plot => "plot",
            Self::Cache => "cache",
        })
    }
}

impl FromStr for ScrubTarget {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let target = match s {
            "all" => Self::All,
            "metadata" => Self::Metadata,
            "plot" => Self::Plot,
            "cache" => Self::Cache,
            s => return Err(format!("Can't parse {s} as `ScrubTarget`")),
        };

        Ok(target)
    }
}

impl ScrubTarget {
    /// Whether metadata should be checked; true for every target except `Cache` (scrubbing
    /// the plot needs metadata too).
    fn metadata(&self) -> bool {
        !matches!(self, Self::Cache)
    }

    /// Whether the plot file should be checked.
    fn plot(&self) -> bool {
        matches!(self, Self::All | Self::Plot)
    }

    /// Whether the piece cache should be checked.
    fn cache(&self) -> bool {
        matches!(self, Self::All | Self::Cache)
    }
}
627
/// How the total allocated space is split between the plot, metadata and piece cache files.
struct AllocatedSpaceDistribution {
    /// Piece cache file size in bytes (capacity × element size).
    piece_cache_file_size: u64,
    /// Piece cache capacity in number of elements; 0 means no cache.
    piece_cache_capacity: u32,
    /// Plot file size in bytes, rounded up to disk sector size.
    plot_file_size: u64,
    /// Number of sectors the farm is expected to hold.
    target_sector_count: u16,
    /// Metadata file size in bytes (reserved header + per-sector metadata).
    metadata_file_size: u64,
}
635
636impl AllocatedSpaceDistribution {
637 fn new(
638 allocated_space: u64,
639 sector_size: u64,
640 cache_percentage: u8,
641 sector_metadata_size: u64,
642 ) -> Result<Self, SingleDiskFarmError> {
643 let single_sector_overhead = sector_size + sector_metadata_size;
644 let fixed_space_usage = RESERVED_PLOT_METADATA
646 + RESERVED_FARM_INFO
647 + Identity::file_size() as u64
648 + KnownPeersManager::file_size(KNOWN_PEERS_CACHE_SIZE) as u64;
649 let target_sector_count = {
651 let potentially_plottable_space = allocated_space.saturating_sub(fixed_space_usage)
652 / 100
653 * (100 - u64::from(cache_percentage));
654 (potentially_plottable_space - DISK_SECTOR_SIZE as u64) / single_sector_overhead
657 };
658
659 if target_sector_count == 0 {
660 let mut single_plot_with_cache_space =
661 single_sector_overhead.div_ceil(100 - u64::from(cache_percentage)) * 100;
662 if single_plot_with_cache_space - single_sector_overhead
665 < DiskPieceCache::element_size() as u64
666 {
667 single_plot_with_cache_space =
668 single_sector_overhead + DiskPieceCache::element_size() as u64;
669 }
670
671 return Err(SingleDiskFarmError::InsufficientAllocatedSpace {
672 min_space: fixed_space_usage + single_plot_with_cache_space,
673 allocated_space,
674 });
675 }
676 let plot_file_size = target_sector_count * sector_size;
677 let plot_file_size =
679 plot_file_size.div_ceil(DISK_SECTOR_SIZE as u64) * DISK_SECTOR_SIZE as u64;
680
681 let piece_cache_capacity = if cache_percentage > 0 {
683 let cache_space = allocated_space
684 - fixed_space_usage
685 - plot_file_size
686 - (sector_metadata_size * target_sector_count);
687 (cache_space / u64::from(DiskPieceCache::element_size())) as u32
688 } else {
689 0
690 };
691 let target_sector_count = match SectorIndex::try_from(target_sector_count) {
692 Ok(target_sector_count) if target_sector_count < SectorIndex::MAX => {
693 target_sector_count
694 }
695 _ => {
696 let max_sectors = SectorIndex::MAX - 1;
699 return Err(SingleDiskFarmError::FarmTooLarge {
700 allocated_space: target_sector_count * sector_size,
701 allocated_sectors: target_sector_count,
702 max_space: max_sectors as u64 * sector_size,
703 max_sectors,
704 });
705 }
706 };
707
708 Ok(Self {
709 piece_cache_file_size: u64::from(piece_cache_capacity)
710 * u64::from(DiskPieceCache::element_size()),
711 piece_cache_capacity,
712 plot_file_size,
713 target_sector_count,
714 metadata_file_size: RESERVED_PLOT_METADATA
715 + sector_metadata_size * u64::from(target_sector_count),
716 })
717 }
718}
719
/// Shorthand for an event-listener bag holding callbacks that take an argument of type `A`.
type Handler<A> = Bag<HandlerFn<A>, A>;

/// Registered callbacks for events emitted by `SingleDiskFarm`.
#[derive(Default, Debug)]
struct Handlers {
    // Called with a sector index and its update
    sector_update: Handler<(SectorIndex, SectorUpdate)>,
    // Called on farming notifications
    farming_notification: Handler<FarmingNotification>,
    // Called when a solution response is produced
    solution: Handler<SolutionResponse>,
}
728
/// Everything produced by `SingleDiskFarm::init` during on-disk initialization.
struct SingleDiskFarmInit {
    // Identity loaded from (or created in) the farm directory
    identity: Identity,
    // Farm info loaded from disk or newly created
    single_disk_farm_info: SingleDiskFarmInfo,
    // Exclusive lock on the farm info file, `None` when farm locking is disabled
    single_disk_farm_info_lock: Option<SingleDiskFarmInfoLock>,
    // Plot file opened for direct I/O, shared between plotting and reading
    plot_file: Arc<DirectIoFile>,
    // Metadata file opened for direct I/O
    metadata_file: DirectIoFile,
    // Header read from the beginning of the metadata file
    metadata_header: PlotMetadataHeader,
    // Total number of sectors this farm is expected to hold
    target_sector_count: u16,
    // Metadata of already plotted sectors, shared with background tasks
    sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    // Piece cache capacity in elements; 0 disables the piece cache (see `new()`)
    piece_cache_capacity: u32,
    // Plot cache instance backed by this farm's plot
    plot_cache: DiskPlotCache,
}
741
/// Single disk farm abstraction: container for everything necessary to plot and farm with a
/// single disk.
///
/// Background tasks are created by `new()` but wait on an internal start channel, so nothing
/// actually runs until the `run()` method is called.
#[derive(Debug)]
#[must_use = "Plot does not function properly unless run() method is called"]
pub struct SingleDiskFarm {
    // Protocol info snapshot taken from the node at creation time
    farmer_protocol_info: FarmerProtocolInfo,
    // Persisted farm info (ID, public key, genesis hash, allocated space, ...)
    single_disk_farm_info: SingleDiskFarmInfo,
    // Metadata of plotted sectors, shared with the background tasks
    sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    // Number of pieces in each sector of this farm
    pieces_in_sector: u16,
    // Total number of sectors once the farm is fully plotted
    total_sectors_count: SectorIndex,
    // Tracing span the farm was created under
    span: Span,
    // Background tasks: error forwarding, plotting, scheduling, farming, reading, reward signing
    tasks: FuturesUnordered<BackgroundTask>,
    // Event handlers for sector updates, farming notifications and solutions
    handlers: Arc<Handlers>,
    // Piece cache backed by this farm's disk
    piece_cache: SingleDiskPieceCache,
    // Plot cache instance backed by this farm's plot
    plot_cache: DiskPlotCache,
    // Reader for pieces stored in plotted sectors
    piece_reader: DiskPieceReader,
    // Start signal sender; taken in `Drop` so tasks waiting on it exit
    // NOTE(review): presumably `run()` sends on it to kick tasks off — confirm
    start_sender: Option<broadcast::Sender<()>>,
    // Stop signal sender; dropping it makes background `stop_receiver.recv()` calls return
    stop_sender: Option<broadcast::Sender<()>>,
    // Keeps the exclusive farm info file lock alive for the farm's lifetime
    _single_disk_farm_info_lock: Option<SingleDiskFarmInfoLock>,
}
767
impl Drop for SingleDiskFarm {
    #[inline]
    fn drop(&mut self) {
        // Close all piece readers so reads don't outlive the farm
        self.piece_reader.close_all_readers();
        // Dropping the senders makes the background tasks' `recv()` calls return an error,
        // which they treat as a signal to shut down (see `new()`)
        self.start_sender.take();
        self.stop_sender.take();
    }
}
778
#[async_trait(?Send)]
impl Farm for SingleDiskFarm {
    // Thin adapter: every method below forwards to the corresponding inherent method/field
    // of `SingleDiskFarm` (defined elsewhere in this file), boxing values where the
    // object-safe `Farm` trait requires it.

    fn id(&self) -> &FarmId {
        self.id()
    }

    fn total_sectors_count(&self) -> SectorIndex {
        self.total_sectors_count
    }

    fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static> {
        Arc::new(self.plotted_sectors())
    }

    fn piece_reader(&self) -> Arc<dyn PieceReader + 'static> {
        Arc::new(self.piece_reader())
    }

    fn on_sector_update(
        &self,
        callback: HandlerFn<(SectorIndex, SectorUpdate)>,
    ) -> Box<dyn farm::HandlerId> {
        Box::new(self.on_sector_update(callback))
    }

    fn on_farming_notification(
        &self,
        callback: HandlerFn<FarmingNotification>,
    ) -> Box<dyn farm::HandlerId> {
        Box::new(self.on_farming_notification(callback))
    }

    fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> Box<dyn farm::HandlerId> {
        Box::new(self.on_solution(callback))
    }

    fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
        // Unbox and pin the inherent `run()` future
        Box::pin((*self).run())
    }
}
819
820impl SingleDiskFarm {
    /// Name of the plot file within the farm directory.
    pub const PLOT_FILE: &'static str = "plot.bin";
    /// Name of the metadata file within the farm directory.
    pub const METADATA_FILE: &'static str = "metadata.bin";
    // Only plot metadata format version 0 is currently supported
    const SUPPORTED_PLOT_VERSION: u8 = 0;
826
    /// Creates a new single disk farm from the provided options.
    ///
    /// Performs on-disk initialization in a blocking task, opens the piece cache, then wires
    /// up the background tasks (plotting, plotting scheduler, slot forwarding, farming,
    /// piece reading, reward signing). Tasks are created but wait on an internal start
    /// channel, so they do not do work until `run()` is invoked.
    ///
    /// `farm_index` is only used for naming threads and background tasks.
    ///
    /// NOTE(review): uses `task::block_in_place`, so this must be called from a
    /// multi-threaded Tokio runtime — confirm with callers.
    pub async fn new<NC, PosTable>(
        options: SingleDiskFarmOptions<'_, NC>,
        farm_index: usize,
    ) -> Result<Self, SingleDiskFarmError>
    where
        NC: NodeClient + Clone,
        PosTable: Table,
    {
        let span = Span::current();

        let SingleDiskFarmOptions {
            directory,
            farmer_app_info,
            allocated_space,
            max_pieces_in_sector,
            node_client,
            reward_address,
            plotter,
            kzg,
            erasure_coding,
            cache_percentage,
            farming_thread_pool_size,
            plotting_delay,
            global_mutex,
            max_plotting_sectors_per_farm,
            disable_farm_locking,
            read_sector_record_chunks_mode,
            registry,
            create,
        } = options;

        // Disk initialization (identity, farm info, plot/metadata files) is blocking I/O,
        // so it runs on the blocking thread pool with the current span re-entered
        let single_disk_farm_init_fut = task::spawn_blocking({
            let directory = directory.clone();
            let farmer_app_info = farmer_app_info.clone();
            let span = span.clone();

            move || {
                let _span_guard = span.enter();
                Self::init(
                    &directory,
                    &farmer_app_info,
                    allocated_space,
                    max_pieces_in_sector,
                    cache_percentage,
                    disable_farm_locking,
                    create,
                )
            }
        });

        // `false`: the blocking task is not aborted if this future is dropped mid-flight
        let single_disk_farm_init =
            AsyncJoinOnDrop::new(single_disk_farm_init_fut, false).await??;

        let SingleDiskFarmInit {
            identity,
            single_disk_farm_info,
            single_disk_farm_info_lock,
            plot_file,
            metadata_file,
            metadata_header,
            target_sector_count,
            sectors_metadata,
            piece_cache_capacity,
            plot_cache,
        } = single_disk_farm_init;

        let piece_cache = {
            // Piece cache shares the farm's ULID
            let FarmId::Ulid(id) = *single_disk_farm_info.id();
            let id = PieceCacheId::Ulid(id);

            SingleDiskPieceCache::new(
                id,
                // Cache is only opened when capacity is non-zero; opening does blocking
                // I/O, hence `block_in_place`
                if let Some(piece_cache_capacity) = NonZeroU32::new(piece_cache_capacity) {
                    Some(task::block_in_place(|| {
                        if let Some(registry) = registry {
                            DiskPieceCache::open(
                                &directory,
                                piece_cache_capacity,
                                Some(id),
                                Some(*registry.lock()),
                            )
                        } else {
                            DiskPieceCache::open(&directory, piece_cache_capacity, Some(id), None)
                        }
                    })?)
                } else {
                    None
                },
            )
        };

        let public_key = *single_disk_farm_info.public_key();
        let pieces_in_sector = single_disk_farm_info.pieces_in_sector();
        let sector_size = sector_size(pieces_in_sector);

        // Metrics are only collected when a Prometheus registry was provided
        let metrics = registry.map(|registry| {
            Arc::new(SingleDiskFarmMetrics::new(
                *registry.lock(),
                single_disk_farm_info.id(),
                target_sector_count,
                sectors_metadata.read_blocking().len() as SectorIndex,
            ))
        });

        // Background tasks report their first fatal error through this one-shot channel;
        // the sender is shared and taken by whichever task errors first
        let (error_sender, error_receiver) = oneshot::channel();
        let error_sender = Arc::new(Mutex::new(Some(error_sender)));

        let tasks = FuturesUnordered::<BackgroundTask>::new();

        // First background task surfaces whatever error the other tasks report
        tasks.push(Box::pin(async move {
            if let Ok(error) = error_receiver.await {
                return Err(error);
            }

            Ok(())
        }));

        let handlers = Arc::<Handlers>::default();
        // Broadcast channels used to start and stop the background tasks
        let (start_sender, mut start_receiver) = broadcast::channel::<()>(1);
        let (stop_sender, mut stop_receiver) = broadcast::channel::<()>(1);
        let sectors_being_modified = Arc::<AsyncRwLock<HashSet<SectorIndex>>>::default();
        let (sectors_to_plot_sender, sectors_to_plot_receiver) = mpsc::channel(1);
        // Plotting resumes from the count recorded in the metadata header
        let sectors_indices_left_to_plot =
            metadata_header.plotted_sector_count..target_sector_count;

        let farming_thread_pool = ThreadPoolBuilder::new()
            .thread_name(move |thread_index| format!("farming-{farm_index}.{thread_index}"))
            .num_threads(farming_thread_pool_size)
            .spawn_handler(tokio_rayon_spawn_handler())
            .build()
            .map_err(SingleDiskFarmError::FailedToCreateThreadPool)?;
        // Plot is opened for farming inside the farming thread pool, so files are opened by
        // the same threads that will later read from them
        let farming_plot_fut = task::spawn_blocking(|| {
            farming_thread_pool
                .install(move || {
                    RayonFiles::open_with(directory.join(Self::PLOT_FILE), |path| {
                        DirectIoFile::open(path)
                    })
                })
                .map(|farming_plot| (farming_plot, farming_thread_pool))
        });

        let (farming_plot, farming_thread_pool) =
            AsyncJoinOnDrop::new(farming_plot_fut, false).await??;

        // Plotting background task: waits for the start signal (and the optional external
        // plotting delay), then drives sector plotting until done or stopped
        let plotting_join_handle = task::spawn_blocking({
            let sectors_metadata = Arc::clone(&sectors_metadata);
            let handlers = Arc::clone(&handlers);
            let sectors_being_modified = Arc::clone(&sectors_being_modified);
            let node_client = node_client.clone();
            let plot_file = Arc::clone(&plot_file);
            let error_sender = Arc::clone(&error_sender);
            let span = span.clone();
            let global_mutex = Arc::clone(&global_mutex);
            let metrics = metrics.clone();

            move || {
                let _span_guard = span.enter();

                let plotting_options = PlottingOptions {
                    metadata_header,
                    sectors_metadata: &sectors_metadata,
                    sectors_being_modified: &sectors_being_modified,
                    sectors_to_plot_receiver,
                    sector_plotting_options: SectorPlottingOptions {
                        public_key,
                        node_client: &node_client,
                        pieces_in_sector,
                        sector_size,
                        plot_file,
                        metadata_file: Arc::new(metadata_file),
                        handlers: &handlers,
                        global_mutex: &global_mutex,
                        plotter,
                        metrics,
                    },
                    max_plotting_sectors_per_farm,
                };

                let plotting_fut = async {
                    // A closed start channel means the farm was dropped before starting
                    if start_receiver.recv().await.is_err() {
                        return Ok(());
                    }

                    if let Some(plotting_delay) = plotting_delay {
                        // A dropped delay sender means external initialization was aborted
                        if plotting_delay.await.is_err() {
                            return Ok(());
                        }
                    }

                    plotting(plotting_options).await
                };

                Handle::current().block_on(async {
                    select! {
                        plotting_result = plotting_fut.fuse() => {
                            // Only the first task error is reported; a failure to report it
                            // (receiver already gone) is just logged
                            if let Err(error) = plotting_result
                                && let Some(error_sender) = error_sender.lock().take()
                                && let Err(error) = error_sender.send(error.into())
                            {
                                error!(
                                    %error,
                                    "Plotting failed to send error to background task"
                                );
                            }
                        }
                        _ = stop_receiver.recv().fuse() => {
                        }
                    }
                });
            }
        });
        let plotting_join_handle = AsyncJoinOnDrop::new(plotting_join_handle, false);

        // A join error here means the blocking plotting task panicked
        tasks.push(Box::pin(async move {
            plotting_join_handle.await.map_err(|_error| {
                BackgroundTaskError::BackgroundTaskPanicked {
                    task: format!("plotting-{farm_index}"),
                }
            })
        }));

        let plotting_scheduler_options = PlottingSchedulerOptions {
            public_key_hash: public_key.hash(),
            sectors_indices_left_to_plot,
            target_sector_count,
            last_archived_segment_index: farmer_app_info.protocol_info.history_size.segment_index(),
            min_sector_lifetime: farmer_app_info.protocol_info.min_sector_lifetime,
            node_client: node_client.clone(),
            handlers: Arc::clone(&handlers),
            sectors_metadata: Arc::clone(&sectors_metadata),
            sectors_to_plot_sender,
            new_segment_processing_delay: NEW_SEGMENT_PROCESSING_DELAY,
            metrics: metrics.clone(),
        };
        tasks.push(Box::pin(plotting_scheduler(plotting_scheduler_options)));

        let (slot_info_forwarder_sender, slot_info_forwarder_receiver) = mpsc::channel(0);

        // Forwards slot notifications from the node to the farming task below
        tasks.push(Box::pin({
            let node_client = node_client.clone();
            let metrics = metrics.clone();

            async move {
                slot_notification_forwarder(&node_client, slot_info_forwarder_sender, metrics)
                    .await
                    .map_err(BackgroundTaskError::Farming)
            }
        }));

        // Farming background task: waits for the start signal, then audits the plot on
        // forwarded slot notifications
        let farming_join_handle = task::spawn_blocking({
            let erasure_coding = erasure_coding.clone();
            let handlers = Arc::clone(&handlers);
            let sectors_being_modified = Arc::clone(&sectors_being_modified);
            let sectors_metadata = Arc::clone(&sectors_metadata);
            let mut start_receiver = start_sender.subscribe();
            let mut stop_receiver = stop_sender.subscribe();
            let node_client = node_client.clone();
            let span = span.clone();
            let global_mutex = Arc::clone(&global_mutex);

            move || {
                let _span_guard = span.enter();

                let farming_fut = async move {
                    // A closed start channel means the farm was dropped before starting
                    if start_receiver.recv().await.is_err() {
                        return Ok(());
                    }

                    let plot_audit = PlotAudit::new(&farming_plot);

                    let farming_options = FarmingOptions {
                        public_key,
                        reward_address,
                        node_client,
                        plot_audit,
                        sectors_metadata,
                        kzg,
                        erasure_coding,
                        handlers,
                        sectors_being_modified,
                        slot_info_notifications: slot_info_forwarder_receiver,
                        thread_pool: farming_thread_pool,
                        read_sector_record_chunks_mode,
                        global_mutex,
                        metrics,
                    };
                    farming::<PosTable, _, _>(farming_options).await
                };

                Handle::current().block_on(async {
                    select! {
                        farming_result = farming_fut.fuse() => {
                            // Same first-error-wins reporting as the plotting task
                            if let Err(error) = farming_result
                                && let Some(error_sender) = error_sender.lock().take()
                                && let Err(error) = error_sender.send(error.into())
                            {
                                error!(
                                    %error,
                                    "Farming failed to send error to background task",
                                );
                            }
                        }
                        _ = stop_receiver.recv().fuse() => {
                        }
                    }
                });
            }
        });
        let farming_join_handle = AsyncJoinOnDrop::new(farming_join_handle, false);

        // A join error here means the blocking farming task panicked
        tasks.push(Box::pin(async move {
            farming_join_handle.await.map_err(|_error| {
                BackgroundTaskError::BackgroundTaskPanicked {
                    task: format!("farming-{farm_index}"),
                }
            })
        }));

        // Piece reader handle plus the background future that serves its requests
        let (piece_reader, reading_fut) = DiskPieceReader::new::<PosTable>(
            public_key,
            pieces_in_sector,
            plot_file,
            Arc::clone(&sectors_metadata),
            erasure_coding,
            sectors_being_modified,
            read_sector_record_chunks_mode,
            global_mutex,
        );

        let reading_join_handle = task::spawn_blocking({
            let mut stop_receiver = stop_sender.subscribe();
            let reading_fut = reading_fut.instrument(span.clone());

            move || {
                Handle::current().block_on(async {
                    select! {
                        _ = reading_fut.fuse() => {
                        }
                        _ = stop_receiver.recv().fuse() => {
                        }
                    }
                });
            }
        });

        let reading_join_handle = AsyncJoinOnDrop::new(reading_join_handle, false);

        // A join error here means the blocking reading task panicked
        tasks.push(Box::pin(async move {
            reading_join_handle.await.map_err(|_error| {
                BackgroundTaskError::BackgroundTaskPanicked {
                    task: format!("reading-{farm_index}"),
                }
            })
        }));

        // Reward signing: subscribing can fail (reported as a fatal error), the returned
        // future then runs until completion
        tasks.push(Box::pin(async move {
            match reward_signing(node_client, identity).await {
                Ok(reward_signing_fut) => {
                    reward_signing_fut.await;
                }
                Err(error) => {
                    return Err(BackgroundTaskError::RewardSigning(anyhow::anyhow!(
                        "Failed to subscribe to reward signing notifications: {error}"
                    )));
                }
            }

            Ok(())
        }));

        let farm = Self {
            farmer_protocol_info: farmer_app_info.protocol_info,
            single_disk_farm_info,
            sectors_metadata,
            pieces_in_sector,
            total_sectors_count: target_sector_count,
            span,
            tasks,
            handlers,
            piece_cache,
            plot_cache,
            piece_reader,
            start_sender: Some(start_sender),
            stop_sender: Some(stop_sender),
            _single_disk_farm_info_lock: single_disk_farm_info_lock,
        };
        Ok(farm)
    }
1228
    /// Initialize on-disk state of the farm: identity, farm info (with optional locking),
    /// metadata file and plot file.
    ///
    /// Creates the directory if necessary, verifies pre-existing farm info against
    /// `farmer_app_info` (genesis hash, public key, pieces in sector), applies allocated-space
    /// changes, preallocates/resizes files and loads already plotted sector metadata, replacing
    /// undecodable entries with dummy expired sector metadata.
    fn init(
        directory: &PathBuf,
        farmer_app_info: &FarmerAppInfo,
        allocated_space: u64,
        max_pieces_in_sector: u16,
        cache_percentage: u8,
        disable_farm_locking: bool,
        create: bool,
    ) -> Result<SingleDiskFarmInit, SingleDiskFarmError> {
        fs::create_dir_all(directory)?;

        // Identity is created only when `create` is allowed; otherwise a missing identity
        // means the farm doesn't exist and is reported as such
        let identity = if create {
            Identity::open_or_create(directory)?
        } else {
            Identity::open(directory)?.ok_or_else(|| {
                IdentityError::Io(io::Error::new(
                    io::ErrorKind::NotFound,
                    "Farm does not exist and creation was explicitly disabled",
                ))
            })?
        };
        let public_key = identity.public_key().to_bytes().into();

        let (single_disk_farm_info, single_disk_farm_info_lock) =
            match SingleDiskFarmInfo::load_from(directory)? {
                Some(mut single_disk_farm_info) => {
                    // Farm was initialized before, check invariants against current inputs
                    if &farmer_app_info.genesis_hash != single_disk_farm_info.genesis_hash() {
                        return Err(SingleDiskFarmError::WrongChain {
                            id: *single_disk_farm_info.id(),
                            correct_chain: hex::encode(single_disk_farm_info.genesis_hash()),
                            wrong_chain: hex::encode(farmer_app_info.genesis_hash),
                        });
                    }

                    if &public_key != single_disk_farm_info.public_key() {
                        return Err(SingleDiskFarmError::IdentityMismatch {
                            id: *single_disk_farm_info.id(),
                            correct_public_key: *single_disk_farm_info.public_key(),
                            wrong_public_key: public_key,
                        });
                    }

                    let pieces_in_sector = single_disk_farm_info.pieces_in_sector();

                    // Protocol must support at least the sector size this farm was created with
                    if max_pieces_in_sector < pieces_in_sector {
                        return Err(SingleDiskFarmError::InvalidPiecesInSector {
                            id: *single_disk_farm_info.id(),
                            max_supported: max_pieces_in_sector,
                            initialized_with: pieces_in_sector,
                        });
                    }

                    if max_pieces_in_sector > pieces_in_sector {
                        info!(
                            pieces_in_sector,
                            max_pieces_in_sector,
                            "Farm initialized with smaller number of pieces in sector, farm needs \
                            to be re-created for increase"
                        );
                    }

                    let mut single_disk_farm_info_lock = None;

                    if allocated_space != single_disk_farm_info.allocated_space() {
                        info!(
                            old_space = %bytesize::to_string(single_disk_farm_info.allocated_space(), true),
                            new_space = %bytesize::to_string(allocated_space, true),
                            "Farm size has changed"
                        );

                        let new_allocated_space = allocated_space;
                        // Exhaustive match on info versions, so adding a new version forces
                        // this code to be revisited
                        match &mut single_disk_farm_info {
                            SingleDiskFarmInfo::V0 {
                                allocated_space, ..
                            } => {
                                *allocated_space = new_allocated_space;
                            }
                        }

                        // Storing the updated info also acquires the lock (unless disabled)
                        single_disk_farm_info_lock =
                            single_disk_farm_info.store_to(directory, !disable_farm_locking)?;
                    } else if !disable_farm_locking {
                        single_disk_farm_info_lock = Some(
                            SingleDiskFarmInfo::try_lock(directory)
                                .map_err(SingleDiskFarmError::LikelyAlreadyInUse)?,
                        );
                    }

                    (single_disk_farm_info, single_disk_farm_info_lock)
                }
                None => {
                    // Fresh farm: create and persist new info
                    let single_disk_farm_info = SingleDiskFarmInfo::new(
                        FarmId::new(),
                        farmer_app_info.genesis_hash,
                        public_key,
                        max_pieces_in_sector,
                        allocated_space,
                    );

                    let single_disk_farm_info_lock =
                        single_disk_farm_info.store_to(directory, !disable_farm_locking)?;

                    (single_disk_farm_info, single_disk_farm_info_lock)
                }
            };

        let pieces_in_sector = single_disk_farm_info.pieces_in_sector();
        let sector_size = sector_size(pieces_in_sector) as u64;
        let sector_metadata_size = SectorMetadataChecksummed::encoded_size();
        let allocated_space_distribution = AllocatedSpaceDistribution::new(
            allocated_space,
            sector_size,
            cache_percentage,
            sector_metadata_size as u64,
        )?;
        let target_sector_count = allocated_space_distribution.target_sector_count;

        let metadata_file_path = directory.join(Self::METADATA_FILE);
        let metadata_file = DirectIoFile::open(&metadata_file_path)?;

        let metadata_size = metadata_file.size()?;
        let expected_metadata_size = allocated_space_distribution.metadata_file_size;
        // Align to disk sector size, which is required by the direct I/O file
        let expected_metadata_size =
            expected_metadata_size.div_ceil(DISK_SECTOR_SIZE as u64) * DISK_SECTOR_SIZE as u64;
        let metadata_header = if metadata_size == 0 {
            // Brand new metadata file: preallocate and write a fresh header
            let metadata_header = PlotMetadataHeader {
                version: SingleDiskFarm::SUPPORTED_PLOT_VERSION,
                plotted_sector_count: 0,
            };

            metadata_file
                .preallocate(expected_metadata_size)
                .map_err(SingleDiskFarmError::CantPreallocateMetadataFile)?;
            metadata_file.write_all_at(metadata_header.encode().as_slice(), 0)?;

            metadata_header
        } else {
            if metadata_size != expected_metadata_size {
                // Farm was resized: grow or shrink the metadata file to the expected size
                metadata_file
                    .preallocate(expected_metadata_size)
                    .map_err(SingleDiskFarmError::CantPreallocateMetadataFile)?;
                metadata_file.set_len(expected_metadata_size)?;
            }

            let mut metadata_header_bytes = vec![0; PlotMetadataHeader::encoded_size()];
            metadata_file.read_exact_at(&mut metadata_header_bytes, 0)?;

            let mut metadata_header =
                PlotMetadataHeader::decode(&mut metadata_header_bytes.as_ref())
                    .map_err(SingleDiskFarmError::FailedToDecodeMetadataHeader)?;

            if metadata_header.version != SingleDiskFarm::SUPPORTED_PLOT_VERSION {
                return Err(SingleDiskFarmError::UnexpectedMetadataVersion(
                    metadata_header.version,
                ));
            }

            // Farm may have been shrunk: clamp plotted sector count and persist the header
            if metadata_header.plotted_sector_count > target_sector_count {
                metadata_header.plotted_sector_count = target_sector_count;
                metadata_file.write_all_at(&metadata_header.encode(), 0)?;
            }

            metadata_header
        };

        let sectors_metadata = {
            let mut sectors_metadata =
                Vec::<SectorMetadataChecksummed>::with_capacity(usize::from(target_sector_count));

            let mut sector_metadata_bytes = vec![0; sector_metadata_size];
            for sector_index in 0..metadata_header.plotted_sector_count {
                let sector_offset =
                    RESERVED_PLOT_METADATA + sector_metadata_size as u64 * u64::from(sector_index);
                metadata_file.read_exact_at(&mut sector_metadata_bytes, sector_offset)?;

                let sector_metadata =
                    match SectorMetadataChecksummed::decode(&mut sector_metadata_bytes.as_ref()) {
                        Ok(sector_metadata) => sector_metadata,
                        Err(error) => {
                            warn!(
                                path = %metadata_file_path.display(),
                                %error,
                                %sector_index,
                                "Failed to decode sector metadata, replacing with dummy expired \
                                sector metadata"
                            );

                            // Dummy metadata (zero history size) stands in for the broken
                            // entry, as the warning above explains
                            let dummy_sector = SectorMetadataChecksummed::from(SectorMetadata {
                                sector_index,
                                pieces_in_sector,
                                s_bucket_sizes: Box::new([0; Record::NUM_S_BUCKETS]),
                                history_size: HistorySize::from(SegmentIndex::ZERO),
                            });
                            metadata_file.write_all_at(&dummy_sector.encode(), sector_offset)?;

                            dummy_sector
                        }
                    };
                sectors_metadata.push(sector_metadata);
            }

            Arc::new(AsyncRwLock::new(sectors_metadata))
        };

        let plot_file = DirectIoFile::open(directory.join(Self::PLOT_FILE))?;

        if plot_file.size()? != allocated_space_distribution.plot_file_size {
            // Preallocate before `set_len` so the file isn't left sparse
            plot_file
                .preallocate(allocated_space_distribution.plot_file_size)
                .map_err(SingleDiskFarmError::CantPreallocatePlotFile)?;
            plot_file.set_len(allocated_space_distribution.plot_file_size)?;
        }

        let plot_file = Arc::new(plot_file);

        let plot_cache = DiskPlotCache::new(
            &plot_file,
            &sectors_metadata,
            target_sector_count,
            sector_size,
        );

        Ok(SingleDiskFarmInit {
            identity,
            single_disk_farm_info,
            single_disk_farm_info_lock,
            plot_file,
            metadata_file,
            metadata_header,
            target_sector_count,
            sectors_metadata,
            piece_cache_capacity: allocated_space_distribution.piece_cache_capacity,
            plot_cache,
        })
    }
1471
1472 pub fn collect_summary(directory: PathBuf) -> SingleDiskFarmSummary {
1474 let single_disk_farm_info = match SingleDiskFarmInfo::load_from(&directory) {
1475 Ok(Some(single_disk_farm_info)) => single_disk_farm_info,
1476 Ok(None) => {
1477 return SingleDiskFarmSummary::NotFound { directory };
1478 }
1479 Err(error) => {
1480 return SingleDiskFarmSummary::Error { directory, error };
1481 }
1482 };
1483
1484 SingleDiskFarmSummary::Found {
1485 info: single_disk_farm_info,
1486 directory,
1487 }
1488 }
1489
1490 pub fn effective_disk_usage(
1496 directory: &Path,
1497 cache_percentage: u8,
1498 ) -> Result<u64, SingleDiskFarmError> {
1499 let mut effective_disk_usage;
1500 match SingleDiskFarmInfo::load_from(directory)? {
1501 Some(single_disk_farm_info) => {
1502 let allocated_space_distribution = AllocatedSpaceDistribution::new(
1503 single_disk_farm_info.allocated_space(),
1504 sector_size(single_disk_farm_info.pieces_in_sector()) as u64,
1505 cache_percentage,
1506 SectorMetadataChecksummed::encoded_size() as u64,
1507 )?;
1508
1509 effective_disk_usage = single_disk_farm_info.allocated_space();
1510 effective_disk_usage -= Identity::file_size() as u64;
1511 effective_disk_usage -= allocated_space_distribution.metadata_file_size;
1512 effective_disk_usage -= allocated_space_distribution.plot_file_size;
1513 effective_disk_usage -= allocated_space_distribution.piece_cache_file_size;
1514 }
1515 None => {
1516 effective_disk_usage = 0;
1518 }
1519 };
1520
1521 if Identity::open(directory)?.is_some() {
1522 effective_disk_usage += Identity::file_size() as u64;
1523 }
1524
1525 match OpenOptions::new()
1526 .read(true)
1527 .open(directory.join(Self::METADATA_FILE))
1528 {
1529 Ok(metadata_file) => {
1530 effective_disk_usage += metadata_file.size()?;
1531 }
1532 Err(error) => {
1533 if error.kind() == io::ErrorKind::NotFound {
1534 } else {
1536 return Err(error.into());
1537 }
1538 }
1539 };
1540
1541 match OpenOptions::new()
1542 .read(true)
1543 .open(directory.join(Self::PLOT_FILE))
1544 {
1545 Ok(plot_file) => {
1546 effective_disk_usage += plot_file.size()?;
1547 }
1548 Err(error) => {
1549 if error.kind() == io::ErrorKind::NotFound {
1550 } else {
1552 return Err(error.into());
1553 }
1554 }
1555 };
1556
1557 match OpenOptions::new()
1558 .read(true)
1559 .open(directory.join(DiskPieceCache::FILE_NAME))
1560 {
1561 Ok(piece_cache) => {
1562 effective_disk_usage += piece_cache.size()?;
1563 }
1564 Err(error) => {
1565 if error.kind() == io::ErrorKind::NotFound {
1566 } else {
1568 return Err(error.into());
1569 }
1570 }
1571 };
1572
1573 Ok(effective_disk_usage)
1574 }
1575
1576 pub fn read_all_sectors_metadata(
1578 directory: &Path,
1579 ) -> io::Result<Vec<SectorMetadataChecksummed>> {
1580 let metadata_file = DirectIoFile::open(directory.join(Self::METADATA_FILE))?;
1581
1582 let metadata_size = metadata_file.size()?;
1583 let sector_metadata_size = SectorMetadataChecksummed::encoded_size();
1584
1585 let mut metadata_header_bytes = vec![0; PlotMetadataHeader::encoded_size()];
1586 metadata_file.read_exact_at(&mut metadata_header_bytes, 0)?;
1587
1588 let metadata_header = PlotMetadataHeader::decode(&mut metadata_header_bytes.as_ref())
1589 .map_err(|error| {
1590 io::Error::other(format!("Failed to decode metadata header: {}", error))
1591 })?;
1592
1593 if metadata_header.version != SingleDiskFarm::SUPPORTED_PLOT_VERSION {
1594 return Err(io::Error::other(format!(
1595 "Unsupported metadata version {}",
1596 metadata_header.version
1597 )));
1598 }
1599
1600 let mut sectors_metadata = Vec::<SectorMetadataChecksummed>::with_capacity(
1601 ((metadata_size - RESERVED_PLOT_METADATA) / sector_metadata_size as u64) as usize,
1602 );
1603
1604 let mut sector_metadata_bytes = vec![0; sector_metadata_size];
1605 for sector_index in 0..metadata_header.plotted_sector_count {
1606 metadata_file.read_exact_at(
1607 &mut sector_metadata_bytes,
1608 RESERVED_PLOT_METADATA + sector_metadata_size as u64 * u64::from(sector_index),
1609 )?;
1610 sectors_metadata.push(
1611 SectorMetadataChecksummed::decode(&mut sector_metadata_bytes.as_ref()).map_err(
1612 |error| {
1613 io::Error::other(format!("Failed to decode sector metadata: {}", error))
1614 },
1615 )?,
1616 );
1617 }
1618
1619 Ok(sectors_metadata)
1620 }
1621
    /// ID of this farm (stored in the farm info on disk).
    pub fn id(&self) -> &FarmId {
        self.single_disk_farm_info.id()
    }
1626
    /// Info of this farm as persisted on disk.
    pub fn info(&self) -> &SingleDiskFarmInfo {
        &self.single_disk_farm_info
    }
1631
    /// Number of sectors in this farm once fully plotted (target sector count).
    pub fn total_sectors_count(&self) -> SectorIndex {
        self.total_sectors_count
    }
1636
    /// Accessor over the sectors plotted so far (shares the live sector metadata via `Arc`).
    pub fn plotted_sectors(&self) -> SingleDiskPlottedSectors {
        SingleDiskPlottedSectors {
            public_key: *self.single_disk_farm_info.public_key(),
            pieces_in_sector: self.pieces_in_sector,
            farmer_protocol_info: self.farmer_protocol_info,
            sectors_metadata: Arc::clone(&self.sectors_metadata),
        }
    }
1646
    /// Handle to this farm's piece cache (cheap clone).
    pub fn piece_cache(&self) -> SingleDiskPieceCache {
        self.piece_cache.clone()
    }
1651
    /// Handle to this farm's plot cache (cheap clone).
    pub fn plot_cache(&self) -> DiskPlotCache {
        self.plot_cache.clone()
    }
1656
    /// Handle that can be used to read plotted pieces from this farm (cheap clone).
    pub fn piece_reader(&self) -> DiskPieceReader {
        self.piece_reader.clone()
    }
1661
    /// Subscribe to sector update notifications; returned handler ID keeps the subscription
    /// alive.
    pub fn on_sector_update(&self, callback: HandlerFn<(SectorIndex, SectorUpdate)>) -> HandlerId {
        self.handlers.sector_update.add(callback)
    }
1666
    /// Subscribe to farming notifications; returned handler ID keeps the subscription alive.
    pub fn on_farming_notification(&self, callback: HandlerFn<FarmingNotification>) -> HandlerId {
        self.handlers.farming_notification.add(callback)
    }
1671
    /// Subscribe to solution response notifications; returned handler ID keeps the
    /// subscription alive.
    pub fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> HandlerId {
        self.handlers.solution.add(callback)
    }
1676
    /// Run the farm to completion: releases the start signal (if still held) and drives all
    /// background tasks, returning the first error any of them produces.
    pub async fn run(mut self) -> anyhow::Result<()> {
        if let Some(start_sender) = self.start_sender.take() {
            // Result ignored on purpose: nobody may be listening on the receiving side
            let _ = start_sender.send(());
        }

        // Tasks run until the first failure or until all of them finish
        while let Some(result) = self.tasks.next().instrument(self.span.clone()).await {
            result?;
        }

        Ok(())
    }
1690
1691 pub fn wipe(directory: &Path) -> io::Result<()> {
1693 let single_disk_info_info_path = directory.join(SingleDiskFarmInfo::FILE_NAME);
1694 match SingleDiskFarmInfo::load_from(directory) {
1695 Ok(Some(single_disk_farm_info)) => {
1696 info!("Found single disk farm {}", single_disk_farm_info.id());
1697 }
1698 Ok(None) => {
1699 return Err(io::Error::new(
1700 io::ErrorKind::NotFound,
1701 format!(
1702 "Single disk farm info not found at {}",
1703 single_disk_info_info_path.display()
1704 ),
1705 ));
1706 }
1707 Err(error) => {
1708 warn!("Found unknown single disk farm: {}", error);
1709 }
1710 }
1711
1712 {
1713 let plot = directory.join(Self::PLOT_FILE);
1714 if plot.exists() {
1715 info!("Deleting plot file at {}", plot.display());
1716 fs::remove_file(plot)?;
1717 }
1718 }
1719 {
1720 let metadata = directory.join(Self::METADATA_FILE);
1721 if metadata.exists() {
1722 info!("Deleting metadata file at {}", metadata.display());
1723 fs::remove_file(metadata)?;
1724 }
1725 }
1726 {
1729 let identity = directory.join("identity.bin");
1730 if identity.exists() {
1731 info!("Deleting identity file at {}", identity.display());
1732 fs::remove_file(identity)?;
1733 }
1734 }
1735
1736 DiskPieceCache::wipe(directory)?;
1737
1738 info!(
1739 "Deleting info file at {}",
1740 single_disk_info_info_path.display()
1741 );
1742 fs::remove_file(single_disk_info_info_path)
1743 }
1744
1745 pub fn scrub(
1748 directory: &Path,
1749 disable_farm_locking: bool,
1750 target: ScrubTarget,
1751 dry_run: bool,
1752 ) -> Result<(), SingleDiskFarmScrubError> {
1753 let span = Span::current();
1754
1755 if dry_run {
1756 info!("Dry run is used, no changes will be written to disk");
1757 }
1758
1759 if target.metadata() || target.plot() {
1760 let info = {
1761 let file = directory.join(SingleDiskFarmInfo::FILE_NAME);
1762 info!(path = %file.display(), "Checking info file");
1763
1764 match SingleDiskFarmInfo::load_from(directory) {
1765 Ok(Some(info)) => info,
1766 Ok(None) => {
1767 return Err(SingleDiskFarmScrubError::FarmInfoFileDoesNotExist { file });
1768 }
1769 Err(error) => {
1770 return Err(SingleDiskFarmScrubError::FarmInfoCantBeOpened { file, error });
1771 }
1772 }
1773 };
1774
1775 let _single_disk_farm_info_lock = if disable_farm_locking {
1776 None
1777 } else {
1778 Some(
1779 SingleDiskFarmInfo::try_lock(directory)
1780 .map_err(SingleDiskFarmScrubError::LikelyAlreadyInUse)?,
1781 )
1782 };
1783
1784 let identity = {
1785 let file = directory.join(Identity::FILE_NAME);
1786 info!(path = %file.display(), "Checking identity file");
1787
1788 match Identity::open(directory) {
1789 Ok(Some(identity)) => identity,
1790 Ok(None) => {
1791 return Err(SingleDiskFarmScrubError::IdentityFileDoesNotExist { file });
1792 }
1793 Err(error) => {
1794 return Err(SingleDiskFarmScrubError::IdentityCantBeOpened { file, error });
1795 }
1796 }
1797 };
1798
1799 if PublicKey::from(identity.public.to_bytes()) != *info.public_key() {
1800 return Err(SingleDiskFarmScrubError::PublicKeyMismatch {
1801 identity: PublicKey::from(identity.public.to_bytes()),
1802 info: *info.public_key(),
1803 });
1804 }
1805
1806 let sector_metadata_size = SectorMetadataChecksummed::encoded_size();
1807
1808 let metadata_file_path = directory.join(Self::METADATA_FILE);
1809 let (metadata_file, mut metadata_header) = {
1810 info!(path = %metadata_file_path.display(), "Checking metadata file");
1811
1812 let metadata_file = match OpenOptions::new()
1813 .read(true)
1814 .write(!dry_run)
1815 .open(&metadata_file_path)
1816 {
1817 Ok(metadata_file) => metadata_file,
1818 Err(error) => {
1819 return Err(if error.kind() == io::ErrorKind::NotFound {
1820 SingleDiskFarmScrubError::MetadataFileDoesNotExist {
1821 file: metadata_file_path,
1822 }
1823 } else {
1824 SingleDiskFarmScrubError::MetadataCantBeOpened {
1825 file: metadata_file_path,
1826 error,
1827 }
1828 });
1829 }
1830 };
1831
1832 let _ = metadata_file.advise_sequential_access();
1834
1835 let metadata_size = match metadata_file.size() {
1836 Ok(metadata_size) => metadata_size,
1837 Err(error) => {
1838 return Err(SingleDiskFarmScrubError::FailedToDetermineFileSize {
1839 file: metadata_file_path,
1840 error,
1841 });
1842 }
1843 };
1844
1845 if metadata_size < RESERVED_PLOT_METADATA {
1846 return Err(SingleDiskFarmScrubError::MetadataFileTooSmall {
1847 file: metadata_file_path,
1848 reserved_size: RESERVED_PLOT_METADATA,
1849 size: metadata_size,
1850 });
1851 }
1852
1853 let mut metadata_header = {
1854 let mut reserved_metadata = vec![0; RESERVED_PLOT_METADATA as usize];
1855
1856 if let Err(error) = metadata_file.read_exact_at(&mut reserved_metadata, 0) {
1857 return Err(SingleDiskFarmScrubError::FailedToReadBytes {
1858 file: metadata_file_path,
1859 size: RESERVED_PLOT_METADATA,
1860 offset: 0,
1861 error,
1862 });
1863 }
1864
1865 PlotMetadataHeader::decode(&mut reserved_metadata.as_slice())
1866 .map_err(SingleDiskFarmScrubError::FailedToDecodeMetadataHeader)?
1867 };
1868
1869 if metadata_header.version != SingleDiskFarm::SUPPORTED_PLOT_VERSION {
1870 return Err(SingleDiskFarmScrubError::UnexpectedMetadataVersion(
1871 metadata_header.version,
1872 ));
1873 }
1874
1875 let plotted_sector_count = metadata_header.plotted_sector_count;
1876
1877 let expected_metadata_size = RESERVED_PLOT_METADATA
1878 + sector_metadata_size as u64 * u64::from(plotted_sector_count);
1879
1880 if metadata_size < expected_metadata_size {
1881 warn!(
1882 %metadata_size,
1883 %expected_metadata_size,
1884 "Metadata file size is smaller than expected, shrinking number of plotted \
1885 sectors to correct value"
1886 );
1887
1888 metadata_header.plotted_sector_count =
1889 ((metadata_size - RESERVED_PLOT_METADATA) / sector_metadata_size as u64)
1890 as SectorIndex;
1891 let metadata_header_bytes = metadata_header.encode();
1892
1893 if !dry_run {
1894 if let Err(error) = metadata_file.write_all_at(&metadata_header_bytes, 0) {
1895 return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
1896 file: metadata_file_path,
1897 size: metadata_header_bytes.len() as u64,
1898 offset: 0,
1899 error,
1900 });
1901 }
1902 }
1903 }
1904
1905 (metadata_file, metadata_header)
1906 };
1907
1908 let pieces_in_sector = info.pieces_in_sector();
1909 let sector_size = sector_size(pieces_in_sector) as u64;
1910
1911 let plot_file_path = directory.join(Self::PLOT_FILE);
1912 let plot_file = {
1913 let plot_file_path = directory.join(Self::PLOT_FILE);
1914 info!(path = %plot_file_path.display(), "Checking plot file");
1915
1916 let plot_file = match OpenOptions::new()
1917 .read(true)
1918 .write(!dry_run)
1919 .open(&plot_file_path)
1920 {
1921 Ok(plot_file) => plot_file,
1922 Err(error) => {
1923 return Err(if error.kind() == io::ErrorKind::NotFound {
1924 SingleDiskFarmScrubError::MetadataFileDoesNotExist {
1925 file: plot_file_path,
1926 }
1927 } else {
1928 SingleDiskFarmScrubError::MetadataCantBeOpened {
1929 file: plot_file_path,
1930 error,
1931 }
1932 });
1933 }
1934 };
1935
1936 let _ = plot_file.advise_sequential_access();
1938
1939 let plot_size = match plot_file.size() {
1940 Ok(metadata_size) => metadata_size,
1941 Err(error) => {
1942 return Err(SingleDiskFarmScrubError::FailedToDetermineFileSize {
1943 file: plot_file_path,
1944 error,
1945 });
1946 }
1947 };
1948
1949 let min_expected_plot_size =
1950 u64::from(metadata_header.plotted_sector_count) * sector_size;
1951 if plot_size < min_expected_plot_size {
1952 warn!(
1953 %plot_size,
1954 %min_expected_plot_size,
1955 "Plot file size is smaller than expected, shrinking number of plotted \
1956 sectors to correct value"
1957 );
1958
1959 metadata_header.plotted_sector_count = (plot_size / sector_size) as SectorIndex;
1960 let metadata_header_bytes = metadata_header.encode();
1961
1962 if !dry_run {
1963 if let Err(error) = metadata_file.write_all_at(&metadata_header_bytes, 0) {
1964 return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
1965 file: plot_file_path,
1966 size: metadata_header_bytes.len() as u64,
1967 offset: 0,
1968 error,
1969 });
1970 }
1971 }
1972 }
1973
1974 plot_file
1975 };
1976
1977 let sector_bytes_range = 0..(sector_size as usize - Blake3Hash::SIZE);
1978
1979 info!("Checking sectors and corresponding metadata");
1980 (0..metadata_header.plotted_sector_count)
1981 .into_par_iter()
1982 .map_init(
1983 || vec![0u8; Record::SIZE],
1984 |scratch_buffer, sector_index| {
1985 let _span_guard = span.enter();
1986
1987 let offset = RESERVED_PLOT_METADATA
1988 + u64::from(sector_index) * sector_metadata_size as u64;
1989 if let Err(error) = metadata_file
1990 .read_exact_at(&mut scratch_buffer[..sector_metadata_size], offset)
1991 {
1992 warn!(
1993 path = %metadata_file_path.display(),
1994 %error,
1995 %offset,
1996 size = %sector_metadata_size,
1997 %sector_index,
1998 "Failed to read sector metadata, replacing with dummy expired \
1999 sector metadata"
2000 );
2001
2002 if !dry_run {
2003 write_dummy_sector_metadata(
2004 &metadata_file,
2005 &metadata_file_path,
2006 sector_index,
2007 pieces_in_sector,
2008 )?;
2009 }
2010 return Ok(());
2011 }
2012
2013 let sector_metadata = match SectorMetadataChecksummed::decode(
2014 &mut &scratch_buffer[..sector_metadata_size],
2015 ) {
2016 Ok(sector_metadata) => sector_metadata,
2017 Err(error) => {
2018 warn!(
2019 path = %metadata_file_path.display(),
2020 %error,
2021 %sector_index,
2022 "Failed to decode sector metadata, replacing with dummy \
2023 expired sector metadata"
2024 );
2025
2026 if !dry_run {
2027 write_dummy_sector_metadata(
2028 &metadata_file,
2029 &metadata_file_path,
2030 sector_index,
2031 pieces_in_sector,
2032 )?;
2033 }
2034 return Ok(());
2035 }
2036 };
2037
2038 if sector_metadata.sector_index != sector_index {
2039 warn!(
2040 path = %metadata_file_path.display(),
2041 %sector_index,
2042 found_sector_index = sector_metadata.sector_index,
2043 "Sector index mismatch, replacing with dummy expired sector \
2044 metadata"
2045 );
2046
2047 if !dry_run {
2048 write_dummy_sector_metadata(
2049 &metadata_file,
2050 &metadata_file_path,
2051 sector_index,
2052 pieces_in_sector,
2053 )?;
2054 }
2055 return Ok(());
2056 }
2057
2058 if sector_metadata.pieces_in_sector != pieces_in_sector {
2059 warn!(
2060 path = %metadata_file_path.display(),
2061 %sector_index,
2062 %pieces_in_sector,
2063 found_pieces_in_sector = sector_metadata.pieces_in_sector,
2064 "Pieces in sector mismatch, replacing with dummy expired sector \
2065 metadata"
2066 );
2067
2068 if !dry_run {
2069 write_dummy_sector_metadata(
2070 &metadata_file,
2071 &metadata_file_path,
2072 sector_index,
2073 pieces_in_sector,
2074 )?;
2075 }
2076 return Ok(());
2077 }
2078
2079 if target.plot() {
2080 let mut hasher = blake3::Hasher::new();
2081 for offset_in_sector in
2083 sector_bytes_range.clone().step_by(scratch_buffer.len())
2084 {
2085 let offset =
2086 u64::from(sector_index) * sector_size + offset_in_sector as u64;
2087 let bytes_to_read = (offset_in_sector + scratch_buffer.len())
2088 .min(sector_bytes_range.end)
2089 - offset_in_sector;
2090
2091 let bytes = &mut scratch_buffer[..bytes_to_read];
2092
2093 if let Err(error) = plot_file.read_exact_at(bytes, offset) {
2094 warn!(
2095 path = %plot_file_path.display(),
2096 %error,
2097 %sector_index,
2098 %offset,
2099 size = %bytes.len() as u64,
2100 "Failed to read sector bytes"
2101 );
2102
2103 continue;
2104 }
2105
2106 hasher.update(bytes);
2107 }
2108
2109 let actual_checksum = *hasher.finalize().as_bytes();
2110 let mut expected_checksum = [0; Blake3Hash::SIZE];
2111 {
2112 let offset = u64::from(sector_index) * sector_size
2113 + sector_bytes_range.end as u64;
2114 if let Err(error) =
2115 plot_file.read_exact_at(&mut expected_checksum, offset)
2116 {
2117 warn!(
2118 path = %plot_file_path.display(),
2119 %error,
2120 %sector_index,
2121 %offset,
2122 size = %expected_checksum.len() as u64,
2123 "Failed to read sector checksum bytes"
2124 );
2125 }
2126 }
2127
2128 if actual_checksum != expected_checksum {
2130 warn!(
2131 path = %plot_file_path.display(),
2132 %sector_index,
2133 actual_checksum = %hex::encode(actual_checksum),
2134 expected_checksum = %hex::encode(expected_checksum),
2135 "Plotted sector checksum mismatch, replacing with dummy \
2136 expired sector"
2137 );
2138
2139 if !dry_run {
2140 write_dummy_sector_metadata(
2141 &metadata_file,
2142 &metadata_file_path,
2143 sector_index,
2144 pieces_in_sector,
2145 )?;
2146 }
2147
2148 scratch_buffer.fill(0);
2149
2150 hasher.reset();
2151 for offset_in_sector in
2153 sector_bytes_range.clone().step_by(scratch_buffer.len())
2154 {
2155 let offset = u64::from(sector_index) * sector_size
2156 + offset_in_sector as u64;
2157 let bytes_to_write = (offset_in_sector + scratch_buffer.len())
2158 .min(sector_bytes_range.end)
2159 - offset_in_sector;
2160 let bytes = &mut scratch_buffer[..bytes_to_write];
2161
2162 if !dry_run {
2163 if let Err(error) = plot_file.write_all_at(bytes, offset) {
2164 return Err(
2165 SingleDiskFarmScrubError::FailedToWriteBytes {
2166 file: plot_file_path.clone(),
2167 size: scratch_buffer.len() as u64,
2168 offset,
2169 error,
2170 },
2171 );
2172 }
2173 }
2174
2175 hasher.update(bytes);
2176 }
2177 {
2179 let checksum = *hasher.finalize().as_bytes();
2180 let offset = u64::from(sector_index) * sector_size
2181 + sector_bytes_range.end as u64;
2182 if !dry_run {
2183 if let Err(error) =
2184 plot_file.write_all_at(&checksum, offset)
2185 {
2186 return Err(
2187 SingleDiskFarmScrubError::FailedToWriteBytes {
2188 file: plot_file_path.clone(),
2189 size: checksum.len() as u64,
2190 offset,
2191 error,
2192 },
2193 );
2194 }
2195 }
2196 }
2197
2198 return Ok(());
2199 }
2200 }
2201
2202 trace!(%sector_index, "Sector is in good shape");
2203
2204 Ok(())
2205 },
2206 )
2207 .try_for_each({
2208 let span = &span;
2209 let checked_sectors = AtomicUsize::new(0);
2210
2211 move |result| {
2212 let _span_guard = span.enter();
2213
2214 let checked_sectors = checked_sectors.fetch_add(1, Ordering::Relaxed);
2215 if checked_sectors > 1 && checked_sectors % 10 == 0 {
2216 info!(
2217 "Checked {}/{} sectors",
2218 checked_sectors, metadata_header.plotted_sector_count
2219 );
2220 }
2221
2222 result
2223 }
2224 })?;
2225 }
2226
2227 if target.cache() {
2228 Self::scrub_cache(directory, dry_run)?;
2229 }
2230
2231 info!("Farm check completed");
2232
2233 Ok(())
2234 }
2235
    /// Check the piece cache file for corruption, replacing unreadable or checksum-mismatched
    /// elements with zeroed dummy elements (unless `dry_run` is set).
    fn scrub_cache(directory: &Path, dry_run: bool) -> Result<(), SingleDiskFarmScrubError> {
        let span = Span::current();

        let file = directory.join(DiskPieceCache::FILE_NAME);
        info!(path = %file.display(), "Checking cache file");

        let cache_file = match OpenOptions::new().read(true).write(!dry_run).open(&file) {
            Ok(plot_file) => plot_file,
            Err(error) => {
                return if error.kind() == io::ErrorKind::NotFound {
                    // A missing cache file is not an error: in farming cluster setups the
                    // cache lives elsewhere
                    warn!(
                        file = %file.display(),
                        "Cache file does not exist, this is expected in farming cluster"
                    );
                    Ok(())
                } else {
                    Err(SingleDiskFarmScrubError::CacheCantBeOpened { file, error })
                };
            }
        };

        // Best-effort hint, failure is not critical
        let _ = cache_file.advise_sequential_access();

        let cache_size = match cache_file.size() {
            Ok(cache_size) => cache_size,
            Err(error) => {
                return Err(SingleDiskFarmScrubError::FailedToDetermineFileSize { file, error });
            }
        };

        let element_size = DiskPieceCache::element_size();
        let number_of_cached_elements = cache_size / u64::from(element_size);
        // All-zero element, used both as the replacement written for broken entries and to
        // recognize intentionally blank entries below
        let dummy_element = vec![0; element_size as usize];
        (0..number_of_cached_elements)
            .into_par_iter()
            // Each rayon worker reuses one element-sized read buffer
            .map_with(vec![0; element_size as usize], |element, cache_offset| {
                let _span_guard = span.enter();

                let offset = cache_offset * u64::from(element_size);
                if let Err(error) = cache_file.read_exact_at(element, offset) {
                    warn!(
                        path = %file.display(),
                        %cache_offset,
                        size = %element.len() as u64,
                        %offset,
                        %error,
                        "Failed to read cached piece, replacing with dummy element"
                    );

                    if !dry_run {
                        if let Err(error) = cache_file.write_all_at(&dummy_element, offset) {
                            return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
                                file: file.clone(),
                                size: u64::from(element_size),
                                offset,
                                error,
                            });
                        }
                    }

                    return Ok(());
                }

                // Each element ends with a checksum over the preceding bytes
                let (index_and_piece_bytes, expected_checksum) =
                    element.split_at(element_size as usize - Blake3Hash::SIZE);
                let actual_checksum = blake3_hash(index_and_piece_bytes);
                // All-zero (dummy) elements intentionally carry no valid checksum and are not
                // treated as corruption
                if *actual_checksum != *expected_checksum && element != &dummy_element {
                    warn!(
                        %cache_offset,
                        actual_checksum = %hex::encode(actual_checksum),
                        expected_checksum = %hex::encode(expected_checksum),
                        "Cached piece checksum mismatch, replacing with dummy element"
                    );

                    if !dry_run {
                        if let Err(error) = cache_file.write_all_at(&dummy_element, offset) {
                            return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
                                file: file.clone(),
                                size: u64::from(element_size),
                                offset,
                                error,
                            });
                        }
                    }

                    return Ok(());
                }

                Ok(())
            })
            .try_for_each({
                let span = &span;
                let checked_elements = AtomicUsize::new(0);

                move |result| {
                    let _span_guard = span.enter();

                    // Periodic progress reporting
                    let checked_elements = checked_elements.fetch_add(1, Ordering::Relaxed);
                    if checked_elements > 1 && checked_elements % 1000 == 0 {
                        info!(
                            "Checked {}/{} cache elements",
                            checked_elements, number_of_cached_elements
                        );
                    }

                    result
                }
            })?;

        Ok(())
    }
2348}
2349
2350fn write_dummy_sector_metadata(
2351 metadata_file: &File,
2352 metadata_file_path: &Path,
2353 sector_index: SectorIndex,
2354 pieces_in_sector: u16,
2355) -> Result<(), SingleDiskFarmScrubError> {
2356 let dummy_sector_bytes = SectorMetadataChecksummed::from(SectorMetadata {
2357 sector_index,
2358 pieces_in_sector,
2359 s_bucket_sizes: Box::new([0; Record::NUM_S_BUCKETS]),
2360 history_size: HistorySize::from(SegmentIndex::ZERO),
2361 })
2362 .encode();
2363 let sector_offset = RESERVED_PLOT_METADATA
2364 + u64::from(sector_index) * SectorMetadataChecksummed::encoded_size() as u64;
2365 metadata_file
2366 .write_all_at(&dummy_sector_bytes, sector_offset)
2367 .map_err(|error| SingleDiskFarmScrubError::FailedToWriteBytes {
2368 file: metadata_file_path.to_path_buf(),
2369 size: dummy_sector_bytes.len() as u64,
2370 offset: sector_offset,
2371 error,
2372 })
2373}