subspace_farmer/
single_disk_farm.rs

//! Primary [`Farm`] implementation that deals with hardware directly
//!
//! Single disk farm is an abstraction that contains an identity, associated plot with metadata and
//! a small piece cache. It fully manages farming and plotting process, including listening to node
//! notifications, producing solutions and signing rewards.
6
7pub mod direct_io_file;
8pub mod farming;
9pub mod identity;
10mod metrics;
11pub mod piece_cache;
12pub mod piece_reader;
13pub mod plot_cache;
14mod plotted_sectors;
15mod plotting;
16mod reward_signing;
17
18use crate::disk_piece_cache::{DiskPieceCache, DiskPieceCacheError};
19use crate::farm::{
20    Farm, FarmId, FarmingError, FarmingNotification, HandlerFn, PieceCacheId, PieceReader,
21    PlottedSectors, SectorUpdate,
22};
23use crate::node_client::NodeClient;
24use crate::plotter::Plotter;
25use crate::single_disk_farm::direct_io_file::{DirectIoFile, DISK_SECTOR_SIZE};
26use crate::single_disk_farm::farming::rayon_files::RayonFiles;
27use crate::single_disk_farm::farming::{
28    farming, slot_notification_forwarder, FarmingOptions, PlotAudit,
29};
30use crate::single_disk_farm::identity::{Identity, IdentityError};
31use crate::single_disk_farm::metrics::SingleDiskFarmMetrics;
32use crate::single_disk_farm::piece_cache::SingleDiskPieceCache;
33use crate::single_disk_farm::piece_reader::DiskPieceReader;
34use crate::single_disk_farm::plot_cache::DiskPlotCache;
35use crate::single_disk_farm::plotted_sectors::SingleDiskPlottedSectors;
36pub use crate::single_disk_farm::plotting::PlottingError;
37use crate::single_disk_farm::plotting::{
38    plotting, plotting_scheduler, PlottingOptions, PlottingSchedulerOptions, SectorPlottingOptions,
39};
40use crate::single_disk_farm::reward_signing::reward_signing;
41use crate::utils::{tokio_rayon_spawn_handler, AsyncJoinOnDrop};
42use crate::{farm, KNOWN_PEERS_CACHE_SIZE};
43use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
44use async_trait::async_trait;
45use event_listener_primitives::{Bag, HandlerId};
46use futures::channel::{mpsc, oneshot};
47use futures::stream::FuturesUnordered;
48use futures::{select, FutureExt, StreamExt};
49use parity_scale_codec::{Decode, Encode};
50use parking_lot::Mutex;
51use prometheus_client::registry::Registry;
52use rayon::prelude::*;
53use rayon::{ThreadPoolBuildError, ThreadPoolBuilder};
54use serde::{Deserialize, Serialize};
55use static_assertions::const_assert;
56use std::collections::HashSet;
57use std::fs::{File, OpenOptions};
58use std::future::Future;
59use std::io::Write;
60use std::num::{NonZeroU32, NonZeroUsize};
61use std::path::{Path, PathBuf};
62use std::pin::Pin;
63use std::str::FromStr;
64use std::sync::atomic::{AtomicUsize, Ordering};
65use std::sync::Arc;
66use std::time::Duration;
67use std::{fmt, fs, io, mem};
68use subspace_core_primitives::hashes::{blake3_hash, Blake3Hash};
69use subspace_core_primitives::pieces::Record;
70use subspace_core_primitives::sectors::SectorIndex;
71use subspace_core_primitives::segments::{HistorySize, SegmentIndex};
72use subspace_core_primitives::PublicKey;
73use subspace_erasure_coding::ErasureCoding;
74use subspace_farmer_components::file_ext::FileExt;
75use subspace_farmer_components::reading::ReadSectorRecordChunksMode;
76use subspace_farmer_components::sector::{sector_size, SectorMetadata, SectorMetadataChecksummed};
77use subspace_farmer_components::FarmerProtocolInfo;
78use subspace_kzg::Kzg;
79use subspace_networking::KnownPeersManager;
80use subspace_proof_of_space::Table;
81use subspace_rpc_primitives::{FarmerAppInfo, SolutionResponse};
82use thiserror::Error;
83use tokio::runtime::Handle;
84use tokio::sync::broadcast;
85use tokio::task;
86use tracing::{error, info, trace, warn, Instrument, Span};
87
// Refuse to compile on non-64-bit platforms, offsets may fail on those when converting from u64 to
// usize depending on chain parameters
const_assert!(mem::size_of::<usize>() >= mem::size_of::<u64>());

/// Reserve 1 MiB of space for plot metadata (for potential future expansion)
const RESERVED_PLOT_METADATA: u64 = 1024 * 1024;
/// Reserve 1 MiB of space for farm info (for potential future expansion)
const RESERVED_FARM_INFO: u64 = 1024 * 1024;
/// Delay of 10 minutes associated with processing of new segments.
///
/// NOTE(review): the use site is not part of this excerpt, the meaning is inferred from the name
/// only - confirm against the code that consumes it.
const NEW_SEGMENT_PROCESSING_DELAY: Duration = Duration::from_mins(10);
97
/// Exclusive lock for single disk farm info file, ensuring no concurrent edits by cooperating
/// processes are done.
///
/// The value only wraps the file handle the lock was acquired on, so it must be kept alive for as
/// long as the farm is in use.
#[derive(Debug)]
#[must_use = "Lock file must be kept around for as long as farm is used"]
pub struct SingleDiskFarmInfoLock {
    /// Handle of the info file the lock was taken on; never read, only kept open
    _file: File,
}
104
/// Important information about the contents of the `SingleDiskFarm`
///
/// The enum is versioned through its variants (only `V0` exists right now). Variant and field names
/// are renamed to camelCase for (de)serialization and `genesis_hash` goes through the `hex` serde
/// helper.
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum SingleDiskFarmInfo {
    /// V0 of the info
    #[serde(rename_all = "camelCase")]
    V0 {
        /// ID of the farm
        id: FarmId,
        /// Genesis hash of the chain used for farm creation
        #[serde(with = "hex")]
        genesis_hash: [u8; 32],
        /// Public key of identity used for farm creation
        public_key: PublicKey,
        /// How many pieces does one sector contain.
        pieces_in_sector: u16,
        /// How much space in bytes is allocated for this farm
        allocated_space: u64,
    },
}
125
126impl SingleDiskFarmInfo {
127    const FILE_NAME: &'static str = "single_disk_farm.json";
128
129    /// Create new instance
130    pub fn new(
131        id: FarmId,
132        genesis_hash: [u8; 32],
133        public_key: PublicKey,
134        pieces_in_sector: u16,
135        allocated_space: u64,
136    ) -> Self {
137        Self::V0 {
138            id,
139            genesis_hash,
140            public_key,
141            pieces_in_sector,
142            allocated_space,
143        }
144    }
145
146    /// Load `SingleDiskFarm` from path is supposed to be stored, `None` means no info file was
147    /// found, happens during first start.
148    pub fn load_from(directory: &Path) -> io::Result<Option<Self>> {
149        let bytes = match fs::read(directory.join(Self::FILE_NAME)) {
150            Ok(bytes) => bytes,
151            Err(error) => {
152                return if error.kind() == io::ErrorKind::NotFound {
153                    Ok(None)
154                } else {
155                    Err(error)
156                };
157            }
158        };
159
160        serde_json::from_slice(&bytes)
161            .map(Some)
162            .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))
163    }
164
165    /// Store `SingleDiskFarm` info to path, so it can be loaded again upon restart.
166    ///
167    /// Can optionally return a lock.
168    pub fn store_to(
169        &self,
170        directory: &Path,
171        lock: bool,
172    ) -> io::Result<Option<SingleDiskFarmInfoLock>> {
173        let mut file = OpenOptions::new()
174            .write(true)
175            .create(true)
176            .truncate(false)
177            .open(directory.join(Self::FILE_NAME))?;
178        if lock {
179            fs4::fs_std::FileExt::try_lock_exclusive(&file)?;
180        }
181        file.set_len(0)?;
182        file.write_all(&serde_json::to_vec(self).expect("Info serialization never fails; qed"))?;
183
184        Ok(lock.then_some(SingleDiskFarmInfoLock { _file: file }))
185    }
186
187    /// Try to acquire exclusive lock on the single disk farm info file, ensuring no concurrent edits by cooperating
188    /// processes is done
189    pub fn try_lock(directory: &Path) -> io::Result<SingleDiskFarmInfoLock> {
190        let file = File::open(directory.join(Self::FILE_NAME))?;
191        fs4::fs_std::FileExt::try_lock_exclusive(&file)?;
192
193        Ok(SingleDiskFarmInfoLock { _file: file })
194    }
195
196    /// ID of the farm
197    pub fn id(&self) -> &FarmId {
198        let Self::V0 { id, .. } = self;
199        id
200    }
201
202    /// Genesis hash of the chain used for farm creation
203    pub fn genesis_hash(&self) -> &[u8; 32] {
204        let Self::V0 { genesis_hash, .. } = self;
205        genesis_hash
206    }
207
208    /// Public key of identity used for farm creation
209    pub fn public_key(&self) -> &PublicKey {
210        let Self::V0 { public_key, .. } = self;
211        public_key
212    }
213
214    /// How many pieces does one sector contain.
215    pub fn pieces_in_sector(&self) -> u16 {
216        match self {
217            SingleDiskFarmInfo::V0 {
218                pieces_in_sector, ..
219            } => *pieces_in_sector,
220        }
221    }
222
223    /// How much space in bytes is allocated for this farm
224    pub fn allocated_space(&self) -> u64 {
225        match self {
226            SingleDiskFarmInfo::V0 {
227                allocated_space, ..
228            } => *allocated_space,
229        }
230    }
231}
232
/// Summary of single disk farm for presentational purposes
///
/// Every variant carries the directory it refers to, so a summary is self-describing.
#[derive(Debug)]
pub enum SingleDiskFarmSummary {
    /// Farm was found and read successfully
    Found {
        /// Farm info
        info: SingleDiskFarmInfo,
        /// Path to directory where farm is stored.
        directory: PathBuf,
    },
    /// Farm was not found
    NotFound {
        /// Path to directory where farm is stored.
        directory: PathBuf,
    },
    /// Failed to open farm
    Error {
        /// Path to directory where farm is stored.
        directory: PathBuf,
        /// Error itself
        error: io::Error,
    },
}
256
/// Header of the plot metadata, (de)serializable with `parity_scale_codec`
#[derive(Debug, Encode, Decode)]
struct PlotMetadataHeader {
    /// Version of the metadata format
    version: u8,
    /// Number of sectors plotted so far
    // NOTE(review): meaning taken from the field name, the code that updates it is not part of
    // this excerpt
    plotted_sector_count: SectorIndex,
}
262
263impl PlotMetadataHeader {
264    #[inline]
265    fn encoded_size() -> usize {
266        let default = PlotMetadataHeader {
267            version: 0,
268            plotted_sector_count: 0,
269        };
270
271        default.encoded_size()
272    }
273}
274
/// Options used to open single disk farm
///
/// Consumed by `SingleDiskFarm::new`, which destructures every field.
#[derive(Debug)]
pub struct SingleDiskFarmOptions<'a, NC>
where
    NC: Clone,
{
    /// Path to directory where farm is stored.
    pub directory: PathBuf,
    /// Information necessary for farmer application
    pub farmer_app_info: FarmerAppInfo,
    /// How much space in bytes was allocated
    pub allocated_space: u64,
    /// How many pieces one sector is supposed to contain (max)
    pub max_pieces_in_sector: u16,
    /// RPC client connected to Subspace node
    pub node_client: NC,
    /// Address where farming rewards should go
    pub reward_address: PublicKey,
    /// Plotter
    pub plotter: Arc<dyn Plotter + Send + Sync>,
    /// Kzg instance to use.
    pub kzg: Kzg,
    /// Erasure coding instance to use.
    pub erasure_coding: ErasureCoding,
    /// Percentage of allocated space dedicated for caching purposes
    pub cache_percentage: u8,
    /// Thread pool size used for farming (mostly for blocking I/O, but also for some
    /// compute-intensive operations during proving)
    pub farming_thread_pool_size: usize,
    /// Notification for plotter to start, can be used to delay plotting until some initialization
    /// has happened externally
    pub plotting_delay: Option<oneshot::Receiver<()>>,
    /// Global mutex that can restrict concurrency of resource-intensive operations and make sure
    /// that those operations that are very sensitive (like proving) have all the resources
    /// available to them for the highest probability of success
    pub global_mutex: Arc<AsyncMutex<()>>,
    /// How many sectors will be plotted concurrently per farm
    pub max_plotting_sectors_per_farm: NonZeroUsize,
    /// Disable farm locking, for example if file system doesn't support it
    pub disable_farm_locking: bool,
    /// Mode to use for reading of sector record chunks
    pub read_sector_record_chunks_mode: ReadSectorRecordChunksMode,
    /// Prometheus registry
    pub registry: Option<&'a Mutex<&'a mut Registry>>,
    /// Whether to create a farm if it doesn't yet exist
    pub create: bool,
}
322
/// Errors happening when trying to create/open single disk farm
///
/// Variants marked with `#[from]` are created implicitly by the `?` operator, the remaining ones
/// have to be constructed explicitly.
#[derive(Debug, Error)]
pub enum SingleDiskFarmError {
    /// Failed to open or create identity
    #[error("Failed to open or create identity: {0}")]
    FailedToOpenIdentity(#[from] IdentityError),
    /// Farm is likely already in use, make sure no other farmer is using it
    #[error("Farm is likely already in use, make sure no other farmer is using it: {0}")]
    LikelyAlreadyInUse(io::Error),
    /// I/O error occurred
    #[error("Single disk farm I/O error: {0}")]
    Io(#[from] io::Error),
    /// Failed to spawn task for blocking thread
    #[error("Failed to spawn task for blocking thread: {0}")]
    TokioJoinError(#[from] task::JoinError),
    /// Piece cache error
    #[error("Piece cache error: {0}")]
    PieceCacheError(#[from] DiskPieceCacheError),
    /// Can't preallocate metadata file, probably not enough space on disk
    #[error("Can't preallocate metadata file, probably not enough space on disk: {0}")]
    CantPreallocateMetadataFile(io::Error),
    /// Can't preallocate plot file, probably not enough space on disk
    #[error("Can't preallocate plot file, probably not enough space on disk: {0}")]
    CantPreallocatePlotFile(io::Error),
    /// Wrong chain (genesis hash)
    #[error(
        "Genesis hash of farm {id} {wrong_chain} is different from {correct_chain} when farm was \
        created, it is not possible to use farm on a different chain"
    )]
    WrongChain {
        /// Farm ID
        id: FarmId,
        /// Hex-encoded genesis hash during farm creation
        // TODO: Wrapper type with `Display` impl for genesis hash
        correct_chain: String,
        /// Hex-encoded current genesis hash
        wrong_chain: String,
    },
    /// Public key in identity doesn't match metadata
    #[error(
        "Public key of farm {id} {wrong_public_key} is different from {correct_public_key} when \
        farm was created, something went wrong, likely due to manual edits"
    )]
    IdentityMismatch {
        /// Farm ID
        id: FarmId,
        /// Public key used during farm creation
        correct_public_key: PublicKey,
        /// Current public key
        wrong_public_key: PublicKey,
    },
    /// Invalid number pieces in sector
    #[error(
        "Invalid number pieces in sector: max supported {max_supported}, farm initialized with \
        {initialized_with}"
    )]
    InvalidPiecesInSector {
        /// Farm ID (carried in the error value, but not part of the rendered message)
        id: FarmId,
        /// Max supported pieces in sector
        max_supported: u16,
        /// Number of pieces in sector farm is initialized with
        initialized_with: u16,
    },
    /// Failed to decode metadata header
    #[error("Failed to decode metadata header: {0}")]
    FailedToDecodeMetadataHeader(parity_scale_codec::Error),
    /// Unexpected metadata version
    #[error("Unexpected metadata version {0}")]
    UnexpectedMetadataVersion(u8),
    /// Allocated space is not enough for one sector
    #[error(
        "Allocated space is not enough for one sector. \
        The lowest acceptable value for allocated space is {min_space} bytes, \
        provided {allocated_space} bytes."
    )]
    InsufficientAllocatedSpace {
        /// Minimal allocated space
        min_space: u64,
        /// Current allocated space
        allocated_space: u64,
    },
    /// Farm is too large
    #[error(
        "Farm is too large: allocated {allocated_sectors} sectors ({allocated_space} bytes), max \
        supported is {max_sectors} ({max_space} bytes). Consider creating multiple smaller farms \
        instead."
    )]
    FarmTooLarge {
        /// Allocated space
        allocated_space: u64,
        /// Allocated space in sectors
        allocated_sectors: u64,
        /// Max supported allocated space
        max_space: u64,
        /// Max supported allocated space in sectors
        max_sectors: u16,
    },
    /// Failed to create thread pool
    #[error("Failed to create thread pool: {0}")]
    FailedToCreateThreadPool(ThreadPoolBuildError),
}
425
426/// Errors happening during scrubbing
427#[derive(Debug, Error)]
428pub enum SingleDiskFarmScrubError {
429    /// Farm is likely already in use, make sure no other farmer is using it
430    #[error("Farm is likely already in use, make sure no other farmer is using it: {0}")]
431    LikelyAlreadyInUse(io::Error),
432    /// Failed to determine file size
433    #[error("Failed to file size of {file}: {error}")]
434    FailedToDetermineFileSize {
435        /// Affected file
436        file: PathBuf,
437        /// Low-level error
438        error: io::Error,
439    },
440    /// Failed to read bytes from file
441    #[error("Failed to read {size} bytes from {file} at offset {offset}: {error}")]
442    FailedToReadBytes {
443        /// Affected file
444        file: PathBuf,
445        /// Number of bytes to read
446        size: u64,
447        /// Offset in the file
448        offset: u64,
449        /// Low-level error
450        error: io::Error,
451    },
452    /// Failed to write bytes from file
453    #[error("Failed to write {size} bytes from {file} at offset {offset}: {error}")]
454    FailedToWriteBytes {
455        /// Affected file
456        file: PathBuf,
457        /// Number of bytes to read
458        size: u64,
459        /// Offset in the file
460        offset: u64,
461        /// Low-level error
462        error: io::Error,
463    },
464    /// Farm info file does not exist
465    #[error("Farm info file does not exist at {file}")]
466    FarmInfoFileDoesNotExist {
467        /// Info file
468        file: PathBuf,
469    },
470    /// Farm info can't be opened
471    #[error("Farm info at {file} can't be opened: {error}")]
472    FarmInfoCantBeOpened {
473        /// Info file
474        file: PathBuf,
475        /// Low-level error
476        error: io::Error,
477    },
478    /// Identity file does not exist
479    #[error("Identity file does not exist at {file}")]
480    IdentityFileDoesNotExist {
481        /// Identity file
482        file: PathBuf,
483    },
484    /// Identity can't be opened
485    #[error("Identity at {file} can't be opened: {error}")]
486    IdentityCantBeOpened {
487        /// Identity file
488        file: PathBuf,
489        /// Low-level error
490        error: IdentityError,
491    },
492    /// Identity public key doesn't match public key in the disk farm info
493    #[error(
494        "Identity public key {identity} doesn't match public key in the disk farm info {info}"
495    )]
496    PublicKeyMismatch {
497        /// Identity public key
498        identity: PublicKey,
499        /// Disk farm info public key
500        info: PublicKey,
501    },
502    /// Metadata file does not exist
503    #[error("Metadata file does not exist at {file}")]
504    MetadataFileDoesNotExist {
505        /// Metadata file
506        file: PathBuf,
507    },
508    /// Metadata can't be opened
509    #[error("Metadata at {file} can't be opened: {error}")]
510    MetadataCantBeOpened {
511        /// Metadata file
512        file: PathBuf,
513        /// Low-level error
514        error: io::Error,
515    },
516    /// Metadata file too small
517    #[error(
518        "Metadata file at {file} is too small: reserved size is {reserved_size} bytes, file size \
519        is {size}"
520    )]
521    MetadataFileTooSmall {
522        /// Metadata file
523        file: PathBuf,
524        /// Reserved size
525        reserved_size: u64,
526        /// File size
527        size: u64,
528    },
529    /// Failed to decode metadata header
530    #[error("Failed to decode metadata header: {0}")]
531    FailedToDecodeMetadataHeader(parity_scale_codec::Error),
532    /// Unexpected metadata version
533    #[error("Unexpected metadata version {0}")]
534    UnexpectedMetadataVersion(u8),
535    /// Cache can't be opened
536    #[error("Cache at {file} can't be opened: {error}")]
537    CacheCantBeOpened {
538        /// Cache file
539        file: PathBuf,
540        /// Low-level error
541        error: io::Error,
542    },
543}
544
/// Errors that happen in background tasks
///
/// The first three variants are transparent wrappers: their message and source are those of the
/// wrapped error.
#[derive(Debug, Error)]
pub enum BackgroundTaskError {
    /// Plotting error
    #[error(transparent)]
    Plotting(#[from] PlottingError),
    /// Farming error
    #[error(transparent)]
    Farming(#[from] FarmingError),
    /// Reward signing
    // Any `anyhow::Error` converts into this variant through `?`, regardless of where it
    // originates
    #[error(transparent)]
    RewardSigning(#[from] anyhow::Error),
    /// Background task panicked
    #[error("Background task {task} panicked")]
    BackgroundTaskPanicked {
        /// Name of the task
        task: String,
    },
}
564
/// Boxed and pinned `Send` future of a background task, resolves to `Ok(())` or to the
/// [`BackgroundTaskError`] the task failed with
type BackgroundTask = Pin<Box<dyn Future<Output = Result<(), BackgroundTaskError>> + Send>>;
566
/// What part of the farm should be scrubbed
///
/// The textual form (`all`, `metadata`, `plot`, `cache`) is the same for [`fmt::Display`] and
/// [`FromStr`], so a formatted value can always be parsed back.
#[derive(Debug, Copy, Clone)]
pub enum ScrubTarget {
    /// Scrub everything
    All,
    /// Scrub just metadata
    Metadata,
    /// Scrub metadata and corresponding plot
    Plot,
    /// Only scrub cache
    Cache,
}

impl fmt::Display for ScrubTarget {
    /// Writes the lowercase name of the target
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::All => "all",
            Self::Metadata => "metadata",
            Self::Plot => "plot",
            Self::Cache => "cache",
        };

        f.write_str(label)
    }
}

impl FromStr for ScrubTarget {
    type Err = String;

    /// Parses the exact lowercase name of the target, anything else results in an error message
    /// that mentions the rejected input
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let target = match s {
            "all" => Self::All,
            "metadata" => Self::Metadata,
            "plot" => Self::Plot,
            "cache" => Self::Cache,
            s => return Err(format!("Can't parse {s} as `ScrubTarget`")),
        };

        Ok(target)
    }
}

impl ScrubTarget {
    /// Whether metadata is in scope: true for everything except cache-only scrubbing
    fn metadata(&self) -> bool {
        matches!(self, Self::All | Self::Metadata | Self::Plot)
    }

    /// Whether the plot is in scope
    fn plot(&self) -> bool {
        matches!(self, Self::All | Self::Plot)
    }

    /// Whether the cache is in scope
    fn cache(&self) -> bool {
        matches!(self, Self::All | Self::Cache)
    }
}
627
628struct AllocatedSpaceDistribution {
629    piece_cache_file_size: u64,
630    piece_cache_capacity: u32,
631    plot_file_size: u64,
632    target_sector_count: u16,
633    metadata_file_size: u64,
634}
635
636impl AllocatedSpaceDistribution {
637    fn new(
638        allocated_space: u64,
639        sector_size: u64,
640        cache_percentage: u8,
641        sector_metadata_size: u64,
642    ) -> Result<Self, SingleDiskFarmError> {
643        let single_sector_overhead = sector_size + sector_metadata_size;
644        // Fixed space usage regardless of plot size
645        let fixed_space_usage = RESERVED_PLOT_METADATA
646            + RESERVED_FARM_INFO
647            + Identity::file_size() as u64
648            + KnownPeersManager::file_size(KNOWN_PEERS_CACHE_SIZE) as u64;
649        // Calculate how many sectors can fit
650        let target_sector_count = {
651            let potentially_plottable_space = allocated_space.saturating_sub(fixed_space_usage)
652                / 100
653                * (100 - u64::from(cache_percentage));
654            // Do the rounding to make sure we have exactly as much space as fits whole number of
655            // sectors, account for disk sector size just in case
656            (potentially_plottable_space - DISK_SECTOR_SIZE as u64) / single_sector_overhead
657        };
658
659        if target_sector_count == 0 {
660            let mut single_plot_with_cache_space =
661                single_sector_overhead.div_ceil(100 - u64::from(cache_percentage)) * 100;
662            // Cache must not be empty, ensure it contains at least one element even if
663            // percentage-wise it will use more space
664            if single_plot_with_cache_space - single_sector_overhead
665                < DiskPieceCache::element_size() as u64
666            {
667                single_plot_with_cache_space =
668                    single_sector_overhead + DiskPieceCache::element_size() as u64;
669            }
670
671            return Err(SingleDiskFarmError::InsufficientAllocatedSpace {
672                min_space: fixed_space_usage + single_plot_with_cache_space,
673                allocated_space,
674            });
675        }
676        let plot_file_size = target_sector_count * sector_size;
677        // Align plot file size for disk sector size
678        let plot_file_size =
679            plot_file_size.div_ceil(DISK_SECTOR_SIZE as u64) * DISK_SECTOR_SIZE as u64;
680
681        // Remaining space will be used for caching purposes
682        let piece_cache_capacity = if cache_percentage > 0 {
683            let cache_space = allocated_space
684                - fixed_space_usage
685                - plot_file_size
686                - (sector_metadata_size * target_sector_count);
687            (cache_space / u64::from(DiskPieceCache::element_size())) as u32
688        } else {
689            0
690        };
691        let target_sector_count = match SectorIndex::try_from(target_sector_count) {
692            Ok(target_sector_count) if target_sector_count < SectorIndex::MAX => {
693                target_sector_count
694            }
695            _ => {
696                // We use this for both count and index, hence index must not reach actual `MAX`
697                // (consensus doesn't care about this, just farmer implementation detail)
698                let max_sectors = SectorIndex::MAX - 1;
699                return Err(SingleDiskFarmError::FarmTooLarge {
700                    allocated_space: target_sector_count * sector_size,
701                    allocated_sectors: target_sector_count,
702                    max_space: max_sectors as u64 * sector_size,
703                    max_sectors,
704                });
705            }
706        };
707
708        Ok(Self {
709            piece_cache_file_size: u64::from(piece_cache_capacity)
710                * u64::from(DiskPieceCache::element_size()),
711            piece_cache_capacity,
712            plot_file_size,
713            target_sector_count,
714            metadata_file_size: RESERVED_PLOT_METADATA
715                + sector_metadata_size * u64::from(target_sector_count),
716        })
717    }
718}
719
/// Bag of callbacks that accept an argument of type `A`
type Handler<A> = Bag<HandlerFn<A>, A>;

/// Callbacks registered on the farm, one bag per kind of event
// NOTE(review): presumably these are filled by the `on_sector_update`, `on_farming_notification`
// and `on_solution` methods of the farm, whose bodies are not part of this excerpt
#[derive(Default, Debug)]
struct Handlers {
    /// Callbacks that receive a sector index together with its update
    sector_update: Handler<(SectorIndex, SectorUpdate)>,
    /// Callbacks that receive farming notifications
    farming_notification: Handler<FarmingNotification>,
    /// Callbacks that receive solution responses
    solution: Handler<SolutionResponse>,
}
728
/// Result of `SingleDiskFarm::init`, which `SingleDiskFarm::new` runs on a blocking thread and
/// destructures afterwards
struct SingleDiskFarmInit {
    /// Identity of the farm
    identity: Identity,
    /// Farm info
    single_disk_farm_info: SingleDiskFarmInfo,
    /// Lock on the farm info file, if one was acquired
    // NOTE(review): presumably `None` when farm locking is disabled - confirm in `init`
    single_disk_farm_info_lock: Option<SingleDiskFarmInfoLock>,
    /// Opened plot file
    plot_file: Arc<DirectIoFile>,
    /// Opened metadata file
    metadata_file: DirectIoFile,
    /// Header of the plot metadata
    metadata_header: PlotMetadataHeader,
    /// Number of sectors the farm is supposed to contain
    target_sector_count: u16,
    /// Metadata of sectors
    sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    /// Number of elements the piece cache can hold, `0` means no piece cache is opened by
    /// `SingleDiskFarm::new`
    piece_cache_capacity: u32,
    /// Plot cache
    plot_cache: DiskPlotCache,
}
741
/// Single disk farm abstraction is a container for everything necessary to plot/farm with a single
/// disk.
///
/// Farm starts operating during creation and doesn't stop until dropped (or error happens).
// NOTE(review): the `must_use` message below says that `run()` has to be called for the farm to
// function, which reads differently from "starts operating during creation" - confirm which
// statement is accurate
#[derive(Debug)]
#[must_use = "Plot does not function properly unless run() method is called"]
pub struct SingleDiskFarm {
    /// Protocol information of the farmer
    farmer_protocol_info: FarmerProtocolInfo,
    /// Farm info (ID, genesis hash, public key, pieces in sector and allocated space)
    single_disk_farm_info: SingleDiskFarmInfo,
    /// Metadata of all sectors plotted so far
    sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    /// How many pieces one sector contains
    pieces_in_sector: u16,
    /// Total number of sectors, returned by `Farm::total_sectors_count()`
    total_sectors_count: SectorIndex,
    /// Tracing span
    span: Span,
    /// Background tasks of the farm
    tasks: FuturesUnordered<BackgroundTask>,
    /// Registered event callbacks
    handlers: Arc<Handlers>,
    /// Piece cache of the farm
    piece_cache: SingleDiskPieceCache,
    /// Plot cache of the farm
    plot_cache: DiskPlotCache,
    /// Reader of pieces, all of its readers are closed when the farm is dropped
    piece_reader: DiskPieceReader,
    /// Sender that will be used to signal to background threads that they should start
    start_sender: Option<broadcast::Sender<()>>,
    /// Sender that will be used to signal to background threads that they must stop
    stop_sender: Option<broadcast::Sender<()>>,
    /// Lock on the farm info file (if any), never read and only kept alive together with the farm
    _single_disk_farm_info_lock: Option<SingleDiskFarmInfoLock>,
}
767
768impl Drop for SingleDiskFarm {
769    #[inline]
770    fn drop(&mut self) {
771        self.piece_reader.close_all_readers();
772        // Make background threads that are waiting to do something exit immediately
773        self.start_sender.take();
774        // Notify background tasks that they must stop
775        self.stop_sender.take();
776    }
777}
778
779#[async_trait(?Send)]
780impl Farm for SingleDiskFarm {
781    fn id(&self) -> &FarmId {
782        self.id()
783    }
784
785    fn total_sectors_count(&self) -> SectorIndex {
786        self.total_sectors_count
787    }
788
789    fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static> {
790        Arc::new(self.plotted_sectors())
791    }
792
793    fn piece_reader(&self) -> Arc<dyn PieceReader + 'static> {
794        Arc::new(self.piece_reader())
795    }
796
797    fn on_sector_update(
798        &self,
799        callback: HandlerFn<(SectorIndex, SectorUpdate)>,
800    ) -> Box<dyn farm::HandlerId> {
801        Box::new(self.on_sector_update(callback))
802    }
803
804    fn on_farming_notification(
805        &self,
806        callback: HandlerFn<FarmingNotification>,
807    ) -> Box<dyn farm::HandlerId> {
808        Box::new(self.on_farming_notification(callback))
809    }
810
811    fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> Box<dyn farm::HandlerId> {
812        Box::new(self.on_solution(callback))
813    }
814
815    fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
816        Box::pin((*self).run())
817    }
818}
819
820impl SingleDiskFarm {
    /// Name of the plot file
    pub const PLOT_FILE: &'static str = "plot.bin";
    /// Name of the metadata file
    pub const METADATA_FILE: &'static str = "metadata.bin";
    /// The only plot version supported by this implementation.
    ///
    /// NOTE(review): presumably compared against `PlotMetadataHeader::version`, the use site is
    /// not part of this excerpt - confirm.
    const SUPPORTED_PLOT_VERSION: u8 = 0;
826
    /// Create new single disk farm instance
    ///
    /// Performs on-disk initialization on a blocking thread, opens the piece cache when non-zero
    /// capacity was allocated for it, and prepares the background tasks stored in the returned
    /// farm: plotting, plotting scheduler, slot notification forwarding, farming, piece reading
    /// and reward signing. Plotting and farming do not do any work until a message is received
    /// on the start channel whose sender is stored in the returned farm.
    ///
    /// `farm_index` is only used for naming: farming threads are called
    /// `farming-{farm_index}.{thread_index}` and panicked background tasks are reported as
    /// `plotting-{farm_index}`, `farming-{farm_index}` or `reading-{farm_index}`.
    ///
    /// # Errors
    ///
    /// Returns an error if on-disk initialization fails, the piece cache can't be opened, the
    /// farming thread pool can't be created or the plot file can't be opened for farming.
    pub async fn new<NC, PosTable>(
        options: SingleDiskFarmOptions<'_, NC>,
        farm_index: usize,
    ) -> Result<Self, SingleDiskFarmError>
    where
        NC: NodeClient + Clone,
        PosTable: Table,
    {
        let span = Span::current();

        let SingleDiskFarmOptions {
            directory,
            farmer_app_info,
            allocated_space,
            max_pieces_in_sector,
            node_client,
            reward_address,
            plotter,
            kzg,
            erasure_coding,
            cache_percentage,
            farming_thread_pool_size,
            plotting_delay,
            global_mutex,
            max_plotting_sectors_per_farm,
            disable_farm_locking,
            read_sector_record_chunks_mode,
            registry,
            create,
        } = options;

        // Initialization does blocking file I/O, so it is moved to a blocking thread with the
        // current span entered there
        let single_disk_farm_init_fut = task::spawn_blocking({
            let directory = directory.clone();
            let farmer_app_info = farmer_app_info.clone();
            let span = span.clone();

            move || {
                let _span_guard = span.enter();
                Self::init(
                    &directory,
                    &farmer_app_info,
                    allocated_space,
                    max_pieces_in_sector,
                    cache_percentage,
                    disable_farm_locking,
                    create,
                )
            }
        });

        // First `?` is for the join error of the blocking task, second is for the error of
        // initialization itself
        let single_disk_farm_init =
            AsyncJoinOnDrop::new(single_disk_farm_init_fut, false).await??;

        let SingleDiskFarmInit {
            identity,
            single_disk_farm_info,
            single_disk_farm_info_lock,
            plot_file,
            metadata_file,
            metadata_header,
            target_sector_count,
            sectors_metadata,
            piece_cache_capacity,
            plot_cache,
        } = single_disk_farm_init;

        let piece_cache = {
            // Convert farm ID into cache ID for single disk farm
            let FarmId::Ulid(id) = *single_disk_farm_info.id();
            let id = PieceCacheId::Ulid(id);

            SingleDiskPieceCache::new(
                id,
                // Disk piece cache is only opened when its capacity is non-zero
                if let Some(piece_cache_capacity) = NonZeroU32::new(piece_cache_capacity) {
                    Some(task::block_in_place(|| {
                        if let Some(registry) = registry {
                            DiskPieceCache::open(
                                &directory,
                                piece_cache_capacity,
                                Some(id),
                                Some(*registry.lock()),
                            )
                        } else {
                            DiskPieceCache::open(&directory, piece_cache_capacity, Some(id), None)
                        }
                    })?)
                } else {
                    None
                },
            )
        };

        let public_key = *single_disk_farm_info.public_key();
        let pieces_in_sector = single_disk_farm_info.pieces_in_sector();
        let sector_size = sector_size(pieces_in_sector);

        // Metrics are only created when a registry was provided
        let metrics = registry.map(|registry| {
            Arc::new(SingleDiskFarmMetrics::new(
                *registry.lock(),
                single_disk_farm_info.id(),
                target_sector_count,
                sectors_metadata.read_blocking().len() as SectorIndex,
            ))
        });

        // One-shot channel through which plotting or farming reports an error; the sender is
        // shared and taken by whichever of them fails first
        let (error_sender, error_receiver) = oneshot::channel();
        let error_sender = Arc::new(Mutex::new(Some(error_sender)));

        let tasks = FuturesUnordered::<BackgroundTask>::new();

        // Turns an error received over the channel into a failure of this background task; if
        // the channel is closed without an error being sent the task finishes successfully
        tasks.push(Box::pin(async move {
            if let Ok(error) = error_receiver.await {
                return Err(error);
            }

            Ok(())
        }));

        let handlers = Arc::<Handlers>::default();
        let (start_sender, mut start_receiver) = broadcast::channel::<()>(1);
        let (stop_sender, mut stop_receiver) = broadcast::channel::<()>(1);
        let sectors_being_modified = Arc::<AsyncRwLock<HashSet<SectorIndex>>>::default();
        let (sectors_to_plot_sender, sectors_to_plot_receiver) = mpsc::channel(1);
        // Some sectors may already be plotted, skip them
        let sectors_indices_left_to_plot =
            metadata_header.plotted_sector_count..target_sector_count;

        // Dedicated thread pool for farming with threads named after the farm index
        let farming_thread_pool = ThreadPoolBuilder::new()
            .thread_name(move |thread_index| format!("farming-{farm_index}.{thread_index}"))
            .num_threads(farming_thread_pool_size)
            .spawn_handler(tokio_rayon_spawn_handler())
            .build()
            .map_err(SingleDiskFarmError::FailedToCreateThreadPool)?;
        // Plot file is opened for farming from within the farming thread pool; the pool is
        // returned together with the opened plot so it can be used by farming later
        let farming_plot_fut = task::spawn_blocking(|| {
            farming_thread_pool
                .install(move || {
                    RayonFiles::open_with(directory.join(Self::PLOT_FILE), |path| {
                        DirectIoFile::open(path)
                    })
                })
                .map(|farming_plot| (farming_plot, farming_thread_pool))
        });

        let (farming_plot, farming_thread_pool) =
            AsyncJoinOnDrop::new(farming_plot_fut, false).await??;

        // Plotting runs on a blocking thread and drives its future with
        // `Handle::current().block_on()`
        let plotting_join_handle = task::spawn_blocking({
            let sectors_metadata = Arc::clone(&sectors_metadata);
            let handlers = Arc::clone(&handlers);
            let sectors_being_modified = Arc::clone(&sectors_being_modified);
            let node_client = node_client.clone();
            let plot_file = Arc::clone(&plot_file);
            let error_sender = Arc::clone(&error_sender);
            let span = span.clone();
            let global_mutex = Arc::clone(&global_mutex);
            let metrics = metrics.clone();

            move || {
                let _span_guard = span.enter();

                let plotting_options = PlottingOptions {
                    metadata_header,
                    sectors_metadata: &sectors_metadata,
                    sectors_being_modified: &sectors_being_modified,
                    sectors_to_plot_receiver,
                    sector_plotting_options: SectorPlottingOptions {
                        public_key,
                        node_client: &node_client,
                        pieces_in_sector,
                        sector_size,
                        plot_file,
                        metadata_file: Arc::new(metadata_file),
                        handlers: &handlers,
                        global_mutex: &global_mutex,
                        plotter,
                        metrics,
                    },
                    max_plotting_sectors_per_farm,
                };

                let plotting_fut = async {
                    if start_receiver.recv().await.is_err() {
                        // Dropped before starting
                        return Ok(());
                    }

                    if let Some(plotting_delay) = plotting_delay {
                        if plotting_delay.await.is_err() {
                            // Dropped before resolving
                            return Ok(());
                        }
                    }

                    plotting(plotting_options).await
                };

                // Run until plotting finishes or `stop_receiver.recv()` resolves, whichever
                // happens first
                Handle::current().block_on(async {
                    select! {
                        plotting_result = plotting_fut.fuse() => {
                            // Plotting error is forwarded over the error channel unless the
                            // sender was already taken; failure to send is only logged
                            if let Err(error) = plotting_result
                                && let Some(error_sender) = error_sender.lock().take()
                                && let Err(error) = error_sender.send(error.into())
                            {
                                error!(
                                    %error,
                                    "Plotting failed to send error to background task"
                                );
                            }
                        }
                        _ = stop_receiver.recv().fuse() => {
                            // Nothing, just exit
                        }
                    }
                });
            }
        });
        let plotting_join_handle = AsyncJoinOnDrop::new(plotting_join_handle, false);

        tasks.push(Box::pin(async move {
            // Panic will already be printed by now
            plotting_join_handle.await.map_err(|_error| {
                BackgroundTaskError::BackgroundTaskPanicked {
                    task: format!("plotting-{farm_index}"),
                }
            })
        }));

        // Scheduler sends sectors to plot over `sectors_to_plot_sender`, the receiving side of
        // which was given to plotting above
        let plotting_scheduler_options = PlottingSchedulerOptions {
            public_key_hash: public_key.hash(),
            sectors_indices_left_to_plot,
            target_sector_count,
            last_archived_segment_index: farmer_app_info.protocol_info.history_size.segment_index(),
            min_sector_lifetime: farmer_app_info.protocol_info.min_sector_lifetime,
            node_client: node_client.clone(),
            handlers: Arc::clone(&handlers),
            sectors_metadata: Arc::clone(&sectors_metadata),
            sectors_to_plot_sender,
            new_segment_processing_delay: NEW_SEGMENT_PROCESSING_DELAY,
            metrics: metrics.clone(),
        };
        tasks.push(Box::pin(plotting_scheduler(plotting_scheduler_options)));

        // Channel through which slot notifications are forwarded to farming
        let (slot_info_forwarder_sender, slot_info_forwarder_receiver) = mpsc::channel(0);

        tasks.push(Box::pin({
            let node_client = node_client.clone();
            let metrics = metrics.clone();

            async move {
                slot_notification_forwarder(&node_client, slot_info_forwarder_sender, metrics)
                    .await
                    .map_err(BackgroundTaskError::Farming)
            }
        }));

        // Farming runs on a blocking thread in the same way plotting does, with its own
        // subscriptions to start and stop channels
        let farming_join_handle = task::spawn_blocking({
            let erasure_coding = erasure_coding.clone();
            let handlers = Arc::clone(&handlers);
            let sectors_being_modified = Arc::clone(&sectors_being_modified);
            let sectors_metadata = Arc::clone(&sectors_metadata);
            let mut start_receiver = start_sender.subscribe();
            let mut stop_receiver = stop_sender.subscribe();
            let node_client = node_client.clone();
            let span = span.clone();
            let global_mutex = Arc::clone(&global_mutex);

            move || {
                let _span_guard = span.enter();

                let farming_fut = async move {
                    if start_receiver.recv().await.is_err() {
                        // Dropped before starting
                        return Ok(());
                    }

                    let plot_audit = PlotAudit::new(&farming_plot);

                    let farming_options = FarmingOptions {
                        public_key,
                        reward_address,
                        node_client,
                        plot_audit,
                        sectors_metadata,
                        kzg,
                        erasure_coding,
                        handlers,
                        sectors_being_modified,
                        slot_info_notifications: slot_info_forwarder_receiver,
                        thread_pool: farming_thread_pool,
                        read_sector_record_chunks_mode,
                        global_mutex,
                        metrics,
                    };
                    farming::<PosTable, _, _>(farming_options).await
                };

                // Run until farming finishes or `stop_receiver.recv()` resolves, whichever
                // happens first
                Handle::current().block_on(async {
                    select! {
                        farming_result = farming_fut.fuse() => {
                            // Farming error is forwarded over the error channel unless the
                            // sender was already taken; failure to send is only logged
                            if let Err(error) = farming_result
                                && let Some(error_sender) = error_sender.lock().take()
                                && let Err(error) = error_sender.send(error.into())
                            {
                                error!(
                                    %error,
                                    "Farming failed to send error to background task",
                                );
                            }
                        }
                        _ = stop_receiver.recv().fuse() => {
                            // Nothing, just exit
                        }
                    }
                });
            }
        });
        let farming_join_handle = AsyncJoinOnDrop::new(farming_join_handle, false);

        tasks.push(Box::pin(async move {
            // Panic will already be printed by now
            farming_join_handle.await.map_err(|_error| {
                BackgroundTaskError::BackgroundTaskPanicked {
                    task: format!("farming-{farm_index}"),
                }
            })
        }));

        // Piece reader handle is stored in the farm, while the accompanying future is driven on
        // a blocking thread below
        let (piece_reader, reading_fut) = DiskPieceReader::new::<PosTable>(
            public_key,
            pieces_in_sector,
            plot_file,
            Arc::clone(&sectors_metadata),
            erasure_coding,
            sectors_being_modified,
            read_sector_record_chunks_mode,
            global_mutex,
        );

        let reading_join_handle = task::spawn_blocking({
            let mut stop_receiver = stop_sender.subscribe();
            let reading_fut = reading_fut.instrument(span.clone());

            move || {
                Handle::current().block_on(async {
                    select! {
                        _ = reading_fut.fuse() => {
                            // Nothing, just exit
                        }
                        _ = stop_receiver.recv().fuse() => {
                            // Nothing, just exit
                        }
                    }
                });
            }
        });

        let reading_join_handle = AsyncJoinOnDrop::new(reading_join_handle, false);

        tasks.push(Box::pin(async move {
            // Panic will already be printed by now
            reading_join_handle.await.map_err(|_error| {
                BackgroundTaskError::BackgroundTaskPanicked {
                    task: format!("reading-{farm_index}"),
                }
            })
        }));

        // Reward signing: failure to subscribe is reported as a background task error,
        // otherwise the returned future is awaited to completion
        tasks.push(Box::pin(async move {
            match reward_signing(node_client, identity).await {
                Ok(reward_signing_fut) => {
                    reward_signing_fut.await;
                }
                Err(error) => {
                    return Err(BackgroundTaskError::RewardSigning(anyhow::anyhow!(
                        "Failed to subscribe to reward signing notifications: {error}"
                    )));
                }
            }

            Ok(())
        }));

        let farm = Self {
            farmer_protocol_info: farmer_app_info.protocol_info,
            single_disk_farm_info,
            sectors_metadata,
            pieces_in_sector,
            total_sectors_count: target_sector_count,
            span,
            tasks,
            handlers,
            piece_cache,
            plot_cache,
            piece_reader,
            start_sender: Some(start_sender),
            stop_sender: Some(stop_sender),
            _single_disk_farm_info_lock: single_disk_farm_info_lock,
        };
        Ok(farm)
    }
1228
1229    fn init(
1230        directory: &PathBuf,
1231        farmer_app_info: &FarmerAppInfo,
1232        allocated_space: u64,
1233        max_pieces_in_sector: u16,
1234        cache_percentage: u8,
1235        disable_farm_locking: bool,
1236        create: bool,
1237    ) -> Result<SingleDiskFarmInit, SingleDiskFarmError> {
1238        fs::create_dir_all(directory)?;
1239
1240        let identity = if create {
1241            Identity::open_or_create(directory)?
1242        } else {
1243            Identity::open(directory)?.ok_or_else(|| {
1244                IdentityError::Io(io::Error::new(
1245                    io::ErrorKind::NotFound,
1246                    "Farm does not exist and creation was explicitly disabled",
1247                ))
1248            })?
1249        };
1250        let public_key = identity.public_key().to_bytes().into();
1251
1252        let (single_disk_farm_info, single_disk_farm_info_lock) =
1253            match SingleDiskFarmInfo::load_from(directory)? {
1254                Some(mut single_disk_farm_info) => {
1255                    if &farmer_app_info.genesis_hash != single_disk_farm_info.genesis_hash() {
1256                        return Err(SingleDiskFarmError::WrongChain {
1257                            id: *single_disk_farm_info.id(),
1258                            correct_chain: hex::encode(single_disk_farm_info.genesis_hash()),
1259                            wrong_chain: hex::encode(farmer_app_info.genesis_hash),
1260                        });
1261                    }
1262
1263                    if &public_key != single_disk_farm_info.public_key() {
1264                        return Err(SingleDiskFarmError::IdentityMismatch {
1265                            id: *single_disk_farm_info.id(),
1266                            correct_public_key: *single_disk_farm_info.public_key(),
1267                            wrong_public_key: public_key,
1268                        });
1269                    }
1270
1271                    let pieces_in_sector = single_disk_farm_info.pieces_in_sector();
1272
1273                    if max_pieces_in_sector < pieces_in_sector {
1274                        return Err(SingleDiskFarmError::InvalidPiecesInSector {
1275                            id: *single_disk_farm_info.id(),
1276                            max_supported: max_pieces_in_sector,
1277                            initialized_with: pieces_in_sector,
1278                        });
1279                    }
1280
1281                    if max_pieces_in_sector > pieces_in_sector {
1282                        info!(
1283                            pieces_in_sector,
1284                            max_pieces_in_sector,
1285                            "Farm initialized with smaller number of pieces in sector, farm needs \
1286                            to be re-created for increase"
1287                        );
1288                    }
1289
1290                    let mut single_disk_farm_info_lock = None;
1291
1292                    if allocated_space != single_disk_farm_info.allocated_space() {
1293                        info!(
1294                            old_space = %bytesize::to_string(single_disk_farm_info.allocated_space(), true),
1295                            new_space = %bytesize::to_string(allocated_space, true),
1296                            "Farm size has changed"
1297                        );
1298
1299                        let new_allocated_space = allocated_space;
1300                        match &mut single_disk_farm_info {
1301                            SingleDiskFarmInfo::V0 {
1302                                allocated_space, ..
1303                            } => {
1304                                *allocated_space = new_allocated_space;
1305                            }
1306                        }
1307
1308                        single_disk_farm_info_lock =
1309                            single_disk_farm_info.store_to(directory, !disable_farm_locking)?;
1310                    } else if !disable_farm_locking {
1311                        single_disk_farm_info_lock = Some(
1312                            SingleDiskFarmInfo::try_lock(directory)
1313                                .map_err(SingleDiskFarmError::LikelyAlreadyInUse)?,
1314                        );
1315                    }
1316
1317                    (single_disk_farm_info, single_disk_farm_info_lock)
1318                }
1319                None => {
1320                    let single_disk_farm_info = SingleDiskFarmInfo::new(
1321                        FarmId::new(),
1322                        farmer_app_info.genesis_hash,
1323                        public_key,
1324                        max_pieces_in_sector,
1325                        allocated_space,
1326                    );
1327
1328                    let single_disk_farm_info_lock =
1329                        single_disk_farm_info.store_to(directory, !disable_farm_locking)?;
1330
1331                    (single_disk_farm_info, single_disk_farm_info_lock)
1332                }
1333            };
1334
1335        let pieces_in_sector = single_disk_farm_info.pieces_in_sector();
1336        let sector_size = sector_size(pieces_in_sector) as u64;
1337        let sector_metadata_size = SectorMetadataChecksummed::encoded_size();
1338        let allocated_space_distribution = AllocatedSpaceDistribution::new(
1339            allocated_space,
1340            sector_size,
1341            cache_percentage,
1342            sector_metadata_size as u64,
1343        )?;
1344        let target_sector_count = allocated_space_distribution.target_sector_count;
1345
1346        let metadata_file_path = directory.join(Self::METADATA_FILE);
1347        let metadata_file = DirectIoFile::open(&metadata_file_path)?;
1348
1349        let metadata_size = metadata_file.size()?;
1350        let expected_metadata_size = allocated_space_distribution.metadata_file_size;
1351        // Align plot file size for disk sector size
1352        let expected_metadata_size =
1353            expected_metadata_size.div_ceil(DISK_SECTOR_SIZE as u64) * DISK_SECTOR_SIZE as u64;
1354        let metadata_header = if metadata_size == 0 {
1355            let metadata_header = PlotMetadataHeader {
1356                version: SingleDiskFarm::SUPPORTED_PLOT_VERSION,
1357                plotted_sector_count: 0,
1358            };
1359
1360            metadata_file
1361                .preallocate(expected_metadata_size)
1362                .map_err(SingleDiskFarmError::CantPreallocateMetadataFile)?;
1363            metadata_file.write_all_at(metadata_header.encode().as_slice(), 0)?;
1364
1365            metadata_header
1366        } else {
1367            if metadata_size != expected_metadata_size {
1368                // Allocating the whole file (`set_len` below can create a sparse file, which will
1369                // cause writes to fail later)
1370                metadata_file
1371                    .preallocate(expected_metadata_size)
1372                    .map_err(SingleDiskFarmError::CantPreallocateMetadataFile)?;
1373                // Truncating file (if necessary)
1374                metadata_file.set_len(expected_metadata_size)?;
1375            }
1376
1377            let mut metadata_header_bytes = vec![0; PlotMetadataHeader::encoded_size()];
1378            metadata_file.read_exact_at(&mut metadata_header_bytes, 0)?;
1379
1380            let mut metadata_header =
1381                PlotMetadataHeader::decode(&mut metadata_header_bytes.as_ref())
1382                    .map_err(SingleDiskFarmError::FailedToDecodeMetadataHeader)?;
1383
1384            if metadata_header.version != SingleDiskFarm::SUPPORTED_PLOT_VERSION {
1385                return Err(SingleDiskFarmError::UnexpectedMetadataVersion(
1386                    metadata_header.version,
1387                ));
1388            }
1389
1390            if metadata_header.plotted_sector_count > target_sector_count {
1391                metadata_header.plotted_sector_count = target_sector_count;
1392                metadata_file.write_all_at(&metadata_header.encode(), 0)?;
1393            }
1394
1395            metadata_header
1396        };
1397
1398        let sectors_metadata = {
1399            let mut sectors_metadata =
1400                Vec::<SectorMetadataChecksummed>::with_capacity(usize::from(target_sector_count));
1401
1402            let mut sector_metadata_bytes = vec![0; sector_metadata_size];
1403            for sector_index in 0..metadata_header.plotted_sector_count {
1404                let sector_offset =
1405                    RESERVED_PLOT_METADATA + sector_metadata_size as u64 * u64::from(sector_index);
1406                metadata_file.read_exact_at(&mut sector_metadata_bytes, sector_offset)?;
1407
1408                let sector_metadata =
1409                    match SectorMetadataChecksummed::decode(&mut sector_metadata_bytes.as_ref()) {
1410                        Ok(sector_metadata) => sector_metadata,
1411                        Err(error) => {
1412                            warn!(
1413                                path = %metadata_file_path.display(),
1414                                %error,
1415                                %sector_index,
1416                                "Failed to decode sector metadata, replacing with dummy expired \
1417                                sector metadata"
1418                            );
1419
1420                            let dummy_sector = SectorMetadataChecksummed::from(SectorMetadata {
1421                                sector_index,
1422                                pieces_in_sector,
1423                                s_bucket_sizes: Box::new([0; Record::NUM_S_BUCKETS]),
1424                                history_size: HistorySize::from(SegmentIndex::ZERO),
1425                            });
1426                            metadata_file.write_all_at(&dummy_sector.encode(), sector_offset)?;
1427
1428                            dummy_sector
1429                        }
1430                    };
1431                sectors_metadata.push(sector_metadata);
1432            }
1433
1434            Arc::new(AsyncRwLock::new(sectors_metadata))
1435        };
1436
1437        let plot_file = DirectIoFile::open(directory.join(Self::PLOT_FILE))?;
1438
1439        if plot_file.size()? != allocated_space_distribution.plot_file_size {
1440            // Allocating the whole file (`set_len` below can create a sparse file, which will cause
1441            // writes to fail later)
1442            plot_file
1443                .preallocate(allocated_space_distribution.plot_file_size)
1444                .map_err(SingleDiskFarmError::CantPreallocatePlotFile)?;
1445            // Truncating file (if necessary)
1446            plot_file.set_len(allocated_space_distribution.plot_file_size)?;
1447        }
1448
1449        let plot_file = Arc::new(plot_file);
1450
1451        let plot_cache = DiskPlotCache::new(
1452            &plot_file,
1453            &sectors_metadata,
1454            target_sector_count,
1455            sector_size,
1456        );
1457
1458        Ok(SingleDiskFarmInit {
1459            identity,
1460            single_disk_farm_info,
1461            single_disk_farm_info_lock,
1462            plot_file,
1463            metadata_file,
1464            metadata_header,
1465            target_sector_count,
1466            sectors_metadata,
1467            piece_cache_capacity: allocated_space_distribution.piece_cache_capacity,
1468            plot_cache,
1469        })
1470    }
1471
1472    /// Collect summary of single disk farm for presentational purposes
1473    pub fn collect_summary(directory: PathBuf) -> SingleDiskFarmSummary {
1474        let single_disk_farm_info = match SingleDiskFarmInfo::load_from(&directory) {
1475            Ok(Some(single_disk_farm_info)) => single_disk_farm_info,
1476            Ok(None) => {
1477                return SingleDiskFarmSummary::NotFound { directory };
1478            }
1479            Err(error) => {
1480                return SingleDiskFarmSummary::Error { directory, error };
1481            }
1482        };
1483
1484        SingleDiskFarmSummary::Found {
1485            info: single_disk_farm_info,
1486            directory,
1487        }
1488    }
1489
1490    /// Effective on-disk allocation of the files related to the farm (takes some buffer space
1491    /// into consideration).
1492    ///
1493    /// This is a helpful number in case some files were not allocated properly or were removed and
1494    /// do not correspond to allocated space in the farm info accurately.
1495    pub fn effective_disk_usage(
1496        directory: &Path,
1497        cache_percentage: u8,
1498    ) -> Result<u64, SingleDiskFarmError> {
1499        let mut effective_disk_usage;
1500        match SingleDiskFarmInfo::load_from(directory)? {
1501            Some(single_disk_farm_info) => {
1502                let allocated_space_distribution = AllocatedSpaceDistribution::new(
1503                    single_disk_farm_info.allocated_space(),
1504                    sector_size(single_disk_farm_info.pieces_in_sector()) as u64,
1505                    cache_percentage,
1506                    SectorMetadataChecksummed::encoded_size() as u64,
1507                )?;
1508
1509                effective_disk_usage = single_disk_farm_info.allocated_space();
1510                effective_disk_usage -= Identity::file_size() as u64;
1511                effective_disk_usage -= allocated_space_distribution.metadata_file_size;
1512                effective_disk_usage -= allocated_space_distribution.plot_file_size;
1513                effective_disk_usage -= allocated_space_distribution.piece_cache_file_size;
1514            }
1515            None => {
1516                // No farm info, try to collect actual file sizes is any
1517                effective_disk_usage = 0;
1518            }
1519        };
1520
1521        if Identity::open(directory)?.is_some() {
1522            effective_disk_usage += Identity::file_size() as u64;
1523        }
1524
1525        match OpenOptions::new()
1526            .read(true)
1527            .open(directory.join(Self::METADATA_FILE))
1528        {
1529            Ok(metadata_file) => {
1530                effective_disk_usage += metadata_file.size()?;
1531            }
1532            Err(error) => {
1533                if error.kind() == io::ErrorKind::NotFound {
1534                    // File is not stored on disk
1535                } else {
1536                    return Err(error.into());
1537                }
1538            }
1539        };
1540
1541        match OpenOptions::new()
1542            .read(true)
1543            .open(directory.join(Self::PLOT_FILE))
1544        {
1545            Ok(plot_file) => {
1546                effective_disk_usage += plot_file.size()?;
1547            }
1548            Err(error) => {
1549                if error.kind() == io::ErrorKind::NotFound {
1550                    // File is not stored on disk
1551                } else {
1552                    return Err(error.into());
1553                }
1554            }
1555        };
1556
1557        match OpenOptions::new()
1558            .read(true)
1559            .open(directory.join(DiskPieceCache::FILE_NAME))
1560        {
1561            Ok(piece_cache) => {
1562                effective_disk_usage += piece_cache.size()?;
1563            }
1564            Err(error) => {
1565                if error.kind() == io::ErrorKind::NotFound {
1566                    // File is not stored on disk
1567                } else {
1568                    return Err(error.into());
1569                }
1570            }
1571        };
1572
1573        Ok(effective_disk_usage)
1574    }
1575
1576    /// Read all sectors metadata
1577    pub fn read_all_sectors_metadata(
1578        directory: &Path,
1579    ) -> io::Result<Vec<SectorMetadataChecksummed>> {
1580        let metadata_file = DirectIoFile::open(directory.join(Self::METADATA_FILE))?;
1581
1582        let metadata_size = metadata_file.size()?;
1583        let sector_metadata_size = SectorMetadataChecksummed::encoded_size();
1584
1585        let mut metadata_header_bytes = vec![0; PlotMetadataHeader::encoded_size()];
1586        metadata_file.read_exact_at(&mut metadata_header_bytes, 0)?;
1587
1588        let metadata_header = PlotMetadataHeader::decode(&mut metadata_header_bytes.as_ref())
1589            .map_err(|error| {
1590                io::Error::other(format!("Failed to decode metadata header: {}", error))
1591            })?;
1592
1593        if metadata_header.version != SingleDiskFarm::SUPPORTED_PLOT_VERSION {
1594            return Err(io::Error::other(format!(
1595                "Unsupported metadata version {}",
1596                metadata_header.version
1597            )));
1598        }
1599
1600        let mut sectors_metadata = Vec::<SectorMetadataChecksummed>::with_capacity(
1601            ((metadata_size - RESERVED_PLOT_METADATA) / sector_metadata_size as u64) as usize,
1602        );
1603
1604        let mut sector_metadata_bytes = vec![0; sector_metadata_size];
1605        for sector_index in 0..metadata_header.plotted_sector_count {
1606            metadata_file.read_exact_at(
1607                &mut sector_metadata_bytes,
1608                RESERVED_PLOT_METADATA + sector_metadata_size as u64 * u64::from(sector_index),
1609            )?;
1610            sectors_metadata.push(
1611                SectorMetadataChecksummed::decode(&mut sector_metadata_bytes.as_ref()).map_err(
1612                    |error| {
1613                        io::Error::other(format!("Failed to decode sector metadata: {}", error))
1614                    },
1615                )?,
1616            );
1617        }
1618
1619        Ok(sectors_metadata)
1620    }
1621
1622    /// ID of this farm
1623    pub fn id(&self) -> &FarmId {
1624        self.single_disk_farm_info.id()
1625    }
1626
1627    /// Info of this farm
1628    pub fn info(&self) -> &SingleDiskFarmInfo {
1629        &self.single_disk_farm_info
1630    }
1631
1632    /// Number of sectors in this farm
1633    pub fn total_sectors_count(&self) -> SectorIndex {
1634        self.total_sectors_count
1635    }
1636
1637    /// Read information about sectors plotted so far
1638    pub fn plotted_sectors(&self) -> SingleDiskPlottedSectors {
1639        SingleDiskPlottedSectors {
1640            public_key: *self.single_disk_farm_info.public_key(),
1641            pieces_in_sector: self.pieces_in_sector,
1642            farmer_protocol_info: self.farmer_protocol_info,
1643            sectors_metadata: Arc::clone(&self.sectors_metadata),
1644        }
1645    }
1646
1647    /// Get piece cache instance
1648    pub fn piece_cache(&self) -> SingleDiskPieceCache {
1649        self.piece_cache.clone()
1650    }
1651
1652    /// Get plot cache instance
1653    pub fn plot_cache(&self) -> DiskPlotCache {
1654        self.plot_cache.clone()
1655    }
1656
1657    /// Get piece reader to read plotted pieces later
1658    pub fn piece_reader(&self) -> DiskPieceReader {
1659        self.piece_reader.clone()
1660    }
1661
1662    /// Subscribe to sector updates
1663    pub fn on_sector_update(&self, callback: HandlerFn<(SectorIndex, SectorUpdate)>) -> HandlerId {
1664        self.handlers.sector_update.add(callback)
1665    }
1666
1667    /// Subscribe to farming notifications
1668    pub fn on_farming_notification(&self, callback: HandlerFn<FarmingNotification>) -> HandlerId {
1669        self.handlers.farming_notification.add(callback)
1670    }
1671
1672    /// Subscribe to new solution notification
1673    pub fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> HandlerId {
1674        self.handlers.solution.add(callback)
1675    }
1676
1677    /// Run and wait for background threads to exit or return an error
1678    pub async fn run(mut self) -> anyhow::Result<()> {
1679        if let Some(start_sender) = self.start_sender.take() {
1680            // Do not care if anyone is listening on the other side
1681            let _ = start_sender.send(());
1682        }
1683
1684        while let Some(result) = self.tasks.next().instrument(self.span.clone()).await {
1685            result?;
1686        }
1687
1688        Ok(())
1689    }
1690
1691    /// Wipe everything that belongs to this single disk farm
1692    pub fn wipe(directory: &Path) -> io::Result<()> {
1693        let single_disk_info_info_path = directory.join(SingleDiskFarmInfo::FILE_NAME);
1694        match SingleDiskFarmInfo::load_from(directory) {
1695            Ok(Some(single_disk_farm_info)) => {
1696                info!("Found single disk farm {}", single_disk_farm_info.id());
1697            }
1698            Ok(None) => {
1699                return Err(io::Error::new(
1700                    io::ErrorKind::NotFound,
1701                    format!(
1702                        "Single disk farm info not found at {}",
1703                        single_disk_info_info_path.display()
1704                    ),
1705                ));
1706            }
1707            Err(error) => {
1708                warn!("Found unknown single disk farm: {}", error);
1709            }
1710        }
1711
1712        {
1713            let plot = directory.join(Self::PLOT_FILE);
1714            if plot.exists() {
1715                info!("Deleting plot file at {}", plot.display());
1716                fs::remove_file(plot)?;
1717            }
1718        }
1719        {
1720            let metadata = directory.join(Self::METADATA_FILE);
1721            if metadata.exists() {
1722                info!("Deleting metadata file at {}", metadata.display());
1723                fs::remove_file(metadata)?;
1724            }
1725        }
1726        // TODO: Identity should be able to wipe itself instead of assuming a specific file name
1727        //  here
1728        {
1729            let identity = directory.join("identity.bin");
1730            if identity.exists() {
1731                info!("Deleting identity file at {}", identity.display());
1732                fs::remove_file(identity)?;
1733            }
1734        }
1735
1736        DiskPieceCache::wipe(directory)?;
1737
1738        info!(
1739            "Deleting info file at {}",
1740            single_disk_info_info_path.display()
1741        );
1742        fs::remove_file(single_disk_info_info_path)
1743    }
1744
1745    /// Check the farm for corruption and repair errors (caused by disk errors or something else),
1746    /// returns an error when irrecoverable errors occur.
1747    pub fn scrub(
1748        directory: &Path,
1749        disable_farm_locking: bool,
1750        target: ScrubTarget,
1751        dry_run: bool,
1752    ) -> Result<(), SingleDiskFarmScrubError> {
1753        let span = Span::current();
1754
1755        if dry_run {
1756            info!("Dry run is used, no changes will be written to disk");
1757        }
1758
1759        if target.metadata() || target.plot() {
1760            let info = {
1761                let file = directory.join(SingleDiskFarmInfo::FILE_NAME);
1762                info!(path = %file.display(), "Checking info file");
1763
1764                match SingleDiskFarmInfo::load_from(directory) {
1765                    Ok(Some(info)) => info,
1766                    Ok(None) => {
1767                        return Err(SingleDiskFarmScrubError::FarmInfoFileDoesNotExist { file });
1768                    }
1769                    Err(error) => {
1770                        return Err(SingleDiskFarmScrubError::FarmInfoCantBeOpened { file, error });
1771                    }
1772                }
1773            };
1774
1775            let _single_disk_farm_info_lock = if disable_farm_locking {
1776                None
1777            } else {
1778                Some(
1779                    SingleDiskFarmInfo::try_lock(directory)
1780                        .map_err(SingleDiskFarmScrubError::LikelyAlreadyInUse)?,
1781                )
1782            };
1783
1784            let identity = {
1785                let file = directory.join(Identity::FILE_NAME);
1786                info!(path = %file.display(), "Checking identity file");
1787
1788                match Identity::open(directory) {
1789                    Ok(Some(identity)) => identity,
1790                    Ok(None) => {
1791                        return Err(SingleDiskFarmScrubError::IdentityFileDoesNotExist { file });
1792                    }
1793                    Err(error) => {
1794                        return Err(SingleDiskFarmScrubError::IdentityCantBeOpened { file, error });
1795                    }
1796                }
1797            };
1798
1799            if PublicKey::from(identity.public.to_bytes()) != *info.public_key() {
1800                return Err(SingleDiskFarmScrubError::PublicKeyMismatch {
1801                    identity: PublicKey::from(identity.public.to_bytes()),
1802                    info: *info.public_key(),
1803                });
1804            }
1805
1806            let sector_metadata_size = SectorMetadataChecksummed::encoded_size();
1807
1808            let metadata_file_path = directory.join(Self::METADATA_FILE);
1809            let (metadata_file, mut metadata_header) = {
1810                info!(path = %metadata_file_path.display(), "Checking metadata file");
1811
1812                let metadata_file = match OpenOptions::new()
1813                    .read(true)
1814                    .write(!dry_run)
1815                    .open(&metadata_file_path)
1816                {
1817                    Ok(metadata_file) => metadata_file,
1818                    Err(error) => {
1819                        return Err(if error.kind() == io::ErrorKind::NotFound {
1820                            SingleDiskFarmScrubError::MetadataFileDoesNotExist {
1821                                file: metadata_file_path,
1822                            }
1823                        } else {
1824                            SingleDiskFarmScrubError::MetadataCantBeOpened {
1825                                file: metadata_file_path,
1826                                error,
1827                            }
1828                        });
1829                    }
1830                };
1831
1832                // Error doesn't matter here
1833                let _ = metadata_file.advise_sequential_access();
1834
1835                let metadata_size = match metadata_file.size() {
1836                    Ok(metadata_size) => metadata_size,
1837                    Err(error) => {
1838                        return Err(SingleDiskFarmScrubError::FailedToDetermineFileSize {
1839                            file: metadata_file_path,
1840                            error,
1841                        });
1842                    }
1843                };
1844
1845                if metadata_size < RESERVED_PLOT_METADATA {
1846                    return Err(SingleDiskFarmScrubError::MetadataFileTooSmall {
1847                        file: metadata_file_path,
1848                        reserved_size: RESERVED_PLOT_METADATA,
1849                        size: metadata_size,
1850                    });
1851                }
1852
1853                let mut metadata_header = {
1854                    let mut reserved_metadata = vec![0; RESERVED_PLOT_METADATA as usize];
1855
1856                    if let Err(error) = metadata_file.read_exact_at(&mut reserved_metadata, 0) {
1857                        return Err(SingleDiskFarmScrubError::FailedToReadBytes {
1858                            file: metadata_file_path,
1859                            size: RESERVED_PLOT_METADATA,
1860                            offset: 0,
1861                            error,
1862                        });
1863                    }
1864
1865                    PlotMetadataHeader::decode(&mut reserved_metadata.as_slice())
1866                        .map_err(SingleDiskFarmScrubError::FailedToDecodeMetadataHeader)?
1867                };
1868
1869                if metadata_header.version != SingleDiskFarm::SUPPORTED_PLOT_VERSION {
1870                    return Err(SingleDiskFarmScrubError::UnexpectedMetadataVersion(
1871                        metadata_header.version,
1872                    ));
1873                }
1874
1875                let plotted_sector_count = metadata_header.plotted_sector_count;
1876
1877                let expected_metadata_size = RESERVED_PLOT_METADATA
1878                    + sector_metadata_size as u64 * u64::from(plotted_sector_count);
1879
1880                if metadata_size < expected_metadata_size {
1881                    warn!(
1882                        %metadata_size,
1883                        %expected_metadata_size,
1884                        "Metadata file size is smaller than expected, shrinking number of plotted \
1885                        sectors to correct value"
1886                    );
1887
1888                    metadata_header.plotted_sector_count =
1889                        ((metadata_size - RESERVED_PLOT_METADATA) / sector_metadata_size as u64)
1890                            as SectorIndex;
1891                    let metadata_header_bytes = metadata_header.encode();
1892
1893                    if !dry_run {
1894                        if let Err(error) = metadata_file.write_all_at(&metadata_header_bytes, 0) {
1895                            return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
1896                                file: metadata_file_path,
1897                                size: metadata_header_bytes.len() as u64,
1898                                offset: 0,
1899                                error,
1900                            });
1901                        }
1902                    }
1903                }
1904
1905                (metadata_file, metadata_header)
1906            };
1907
1908            let pieces_in_sector = info.pieces_in_sector();
1909            let sector_size = sector_size(pieces_in_sector) as u64;
1910
1911            let plot_file_path = directory.join(Self::PLOT_FILE);
1912            let plot_file = {
1913                let plot_file_path = directory.join(Self::PLOT_FILE);
1914                info!(path = %plot_file_path.display(), "Checking plot file");
1915
1916                let plot_file = match OpenOptions::new()
1917                    .read(true)
1918                    .write(!dry_run)
1919                    .open(&plot_file_path)
1920                {
1921                    Ok(plot_file) => plot_file,
1922                    Err(error) => {
1923                        return Err(if error.kind() == io::ErrorKind::NotFound {
1924                            SingleDiskFarmScrubError::MetadataFileDoesNotExist {
1925                                file: plot_file_path,
1926                            }
1927                        } else {
1928                            SingleDiskFarmScrubError::MetadataCantBeOpened {
1929                                file: plot_file_path,
1930                                error,
1931                            }
1932                        });
1933                    }
1934                };
1935
1936                // Error doesn't matter here
1937                let _ = plot_file.advise_sequential_access();
1938
1939                let plot_size = match plot_file.size() {
1940                    Ok(metadata_size) => metadata_size,
1941                    Err(error) => {
1942                        return Err(SingleDiskFarmScrubError::FailedToDetermineFileSize {
1943                            file: plot_file_path,
1944                            error,
1945                        });
1946                    }
1947                };
1948
1949                let min_expected_plot_size =
1950                    u64::from(metadata_header.plotted_sector_count) * sector_size;
1951                if plot_size < min_expected_plot_size {
1952                    warn!(
1953                        %plot_size,
1954                        %min_expected_plot_size,
1955                        "Plot file size is smaller than expected, shrinking number of plotted \
1956                        sectors to correct value"
1957                    );
1958
1959                    metadata_header.plotted_sector_count = (plot_size / sector_size) as SectorIndex;
1960                    let metadata_header_bytes = metadata_header.encode();
1961
1962                    if !dry_run {
1963                        if let Err(error) = metadata_file.write_all_at(&metadata_header_bytes, 0) {
1964                            return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
1965                                file: plot_file_path,
1966                                size: metadata_header_bytes.len() as u64,
1967                                offset: 0,
1968                                error,
1969                            });
1970                        }
1971                    }
1972                }
1973
1974                plot_file
1975            };
1976
1977            let sector_bytes_range = 0..(sector_size as usize - Blake3Hash::SIZE);
1978
1979            info!("Checking sectors and corresponding metadata");
1980            (0..metadata_header.plotted_sector_count)
1981                .into_par_iter()
1982                .map_init(
1983                    || vec![0u8; Record::SIZE],
1984                    |scratch_buffer, sector_index| {
1985                        let _span_guard = span.enter();
1986
1987                        let offset = RESERVED_PLOT_METADATA
1988                            + u64::from(sector_index) * sector_metadata_size as u64;
1989                        if let Err(error) = metadata_file
1990                            .read_exact_at(&mut scratch_buffer[..sector_metadata_size], offset)
1991                        {
1992                            warn!(
1993                                path = %metadata_file_path.display(),
1994                                %error,
1995                                %offset,
1996                                size = %sector_metadata_size,
1997                                %sector_index,
1998                                "Failed to read sector metadata, replacing with dummy expired \
1999                                sector metadata"
2000                            );
2001
2002                            if !dry_run {
2003                                write_dummy_sector_metadata(
2004                                    &metadata_file,
2005                                    &metadata_file_path,
2006                                    sector_index,
2007                                    pieces_in_sector,
2008                                )?;
2009                            }
2010                            return Ok(());
2011                        }
2012
2013                        let sector_metadata = match SectorMetadataChecksummed::decode(
2014                            &mut &scratch_buffer[..sector_metadata_size],
2015                        ) {
2016                            Ok(sector_metadata) => sector_metadata,
2017                            Err(error) => {
2018                                warn!(
2019                                    path = %metadata_file_path.display(),
2020                                    %error,
2021                                    %sector_index,
2022                                    "Failed to decode sector metadata, replacing with dummy \
2023                                    expired sector metadata"
2024                                );
2025
2026                                if !dry_run {
2027                                    write_dummy_sector_metadata(
2028                                        &metadata_file,
2029                                        &metadata_file_path,
2030                                        sector_index,
2031                                        pieces_in_sector,
2032                                    )?;
2033                                }
2034                                return Ok(());
2035                            }
2036                        };
2037
2038                        if sector_metadata.sector_index != sector_index {
2039                            warn!(
2040                                path = %metadata_file_path.display(),
2041                                %sector_index,
2042                                found_sector_index = sector_metadata.sector_index,
2043                                "Sector index mismatch, replacing with dummy expired sector \
2044                                metadata"
2045                            );
2046
2047                            if !dry_run {
2048                                write_dummy_sector_metadata(
2049                                    &metadata_file,
2050                                    &metadata_file_path,
2051                                    sector_index,
2052                                    pieces_in_sector,
2053                                )?;
2054                            }
2055                            return Ok(());
2056                        }
2057
2058                        if sector_metadata.pieces_in_sector != pieces_in_sector {
2059                            warn!(
2060                                path = %metadata_file_path.display(),
2061                                %sector_index,
2062                                %pieces_in_sector,
2063                                found_pieces_in_sector = sector_metadata.pieces_in_sector,
2064                                "Pieces in sector mismatch, replacing with dummy expired sector \
2065                                metadata"
2066                            );
2067
2068                            if !dry_run {
2069                                write_dummy_sector_metadata(
2070                                    &metadata_file,
2071                                    &metadata_file_path,
2072                                    sector_index,
2073                                    pieces_in_sector,
2074                                )?;
2075                            }
2076                            return Ok(());
2077                        }
2078
2079                        if target.plot() {
2080                            let mut hasher = blake3::Hasher::new();
2081                            // Read sector bytes and compute checksum
2082                            for offset_in_sector in
2083                                sector_bytes_range.clone().step_by(scratch_buffer.len())
2084                            {
2085                                let offset =
2086                                    u64::from(sector_index) * sector_size + offset_in_sector as u64;
2087                                let bytes_to_read = (offset_in_sector + scratch_buffer.len())
2088                                    .min(sector_bytes_range.end)
2089                                    - offset_in_sector;
2090
2091                                let bytes = &mut scratch_buffer[..bytes_to_read];
2092
2093                                if let Err(error) = plot_file.read_exact_at(bytes, offset) {
2094                                    warn!(
2095                                        path = %plot_file_path.display(),
2096                                        %error,
2097                                        %sector_index,
2098                                        %offset,
2099                                        size = %bytes.len() as u64,
2100                                        "Failed to read sector bytes"
2101                                    );
2102
2103                                    continue;
2104                                }
2105
2106                                hasher.update(bytes);
2107                            }
2108
2109                            let actual_checksum = *hasher.finalize().as_bytes();
2110                            let mut expected_checksum = [0; Blake3Hash::SIZE];
2111                            {
2112                                let offset = u64::from(sector_index) * sector_size
2113                                    + sector_bytes_range.end as u64;
2114                                if let Err(error) =
2115                                    plot_file.read_exact_at(&mut expected_checksum, offset)
2116                                {
2117                                    warn!(
2118                                        path = %plot_file_path.display(),
2119                                        %error,
2120                                        %sector_index,
2121                                        %offset,
2122                                        size = %expected_checksum.len() as u64,
2123                                        "Failed to read sector checksum bytes"
2124                                    );
2125                                }
2126                            }
2127
2128                            // Verify checksum
2129                            if actual_checksum != expected_checksum {
2130                                warn!(
2131                                    path = %plot_file_path.display(),
2132                                    %sector_index,
2133                                    actual_checksum = %hex::encode(actual_checksum),
2134                                    expected_checksum = %hex::encode(expected_checksum),
2135                                    "Plotted sector checksum mismatch, replacing with dummy \
2136                                    expired sector"
2137                                );
2138
2139                                if !dry_run {
2140                                    write_dummy_sector_metadata(
2141                                        &metadata_file,
2142                                        &metadata_file_path,
2143                                        sector_index,
2144                                        pieces_in_sector,
2145                                    )?;
2146                                }
2147
2148                                scratch_buffer.fill(0);
2149
2150                                hasher.reset();
2151                                // Fill sector with zeroes and compute checksum
2152                                for offset_in_sector in
2153                                    sector_bytes_range.clone().step_by(scratch_buffer.len())
2154                                {
2155                                    let offset = u64::from(sector_index) * sector_size
2156                                        + offset_in_sector as u64;
2157                                    let bytes_to_write = (offset_in_sector + scratch_buffer.len())
2158                                        .min(sector_bytes_range.end)
2159                                        - offset_in_sector;
2160                                    let bytes = &mut scratch_buffer[..bytes_to_write];
2161
2162                                    if !dry_run {
2163                                        if let Err(error) = plot_file.write_all_at(bytes, offset) {
2164                                            return Err(
2165                                                SingleDiskFarmScrubError::FailedToWriteBytes {
2166                                                    file: plot_file_path.clone(),
2167                                                    size: scratch_buffer.len() as u64,
2168                                                    offset,
2169                                                    error,
2170                                                },
2171                                            );
2172                                        }
2173                                    }
2174
2175                                    hasher.update(bytes);
2176                                }
2177                                // Write checksum
2178                                {
2179                                    let checksum = *hasher.finalize().as_bytes();
2180                                    let offset = u64::from(sector_index) * sector_size
2181                                        + sector_bytes_range.end as u64;
2182                                    if !dry_run {
2183                                        if let Err(error) =
2184                                            plot_file.write_all_at(&checksum, offset)
2185                                        {
2186                                            return Err(
2187                                                SingleDiskFarmScrubError::FailedToWriteBytes {
2188                                                    file: plot_file_path.clone(),
2189                                                    size: checksum.len() as u64,
2190                                                    offset,
2191                                                    error,
2192                                                },
2193                                            );
2194                                        }
2195                                    }
2196                                }
2197
2198                                return Ok(());
2199                            }
2200                        }
2201
2202                        trace!(%sector_index, "Sector is in good shape");
2203
2204                        Ok(())
2205                    },
2206                )
2207                .try_for_each({
2208                    let span = &span;
2209                    let checked_sectors = AtomicUsize::new(0);
2210
2211                    move |result| {
2212                        let _span_guard = span.enter();
2213
2214                        let checked_sectors = checked_sectors.fetch_add(1, Ordering::Relaxed);
2215                        if checked_sectors > 1 && checked_sectors % 10 == 0 {
2216                            info!(
2217                                "Checked {}/{} sectors",
2218                                checked_sectors, metadata_header.plotted_sector_count
2219                            );
2220                        }
2221
2222                        result
2223                    }
2224                })?;
2225        }
2226
2227        if target.cache() {
2228            Self::scrub_cache(directory, dry_run)?;
2229        }
2230
2231        info!("Farm check completed");
2232
2233        Ok(())
2234    }
2235
2236    fn scrub_cache(directory: &Path, dry_run: bool) -> Result<(), SingleDiskFarmScrubError> {
2237        let span = Span::current();
2238
2239        let file = directory.join(DiskPieceCache::FILE_NAME);
2240        info!(path = %file.display(), "Checking cache file");
2241
2242        let cache_file = match OpenOptions::new().read(true).write(!dry_run).open(&file) {
2243            Ok(plot_file) => plot_file,
2244            Err(error) => {
2245                return if error.kind() == io::ErrorKind::NotFound {
2246                    warn!(
2247                        file = %file.display(),
2248                        "Cache file does not exist, this is expected in farming cluster"
2249                    );
2250                    Ok(())
2251                } else {
2252                    Err(SingleDiskFarmScrubError::CacheCantBeOpened { file, error })
2253                };
2254            }
2255        };
2256
2257        // Error doesn't matter here
2258        let _ = cache_file.advise_sequential_access();
2259
2260        let cache_size = match cache_file.size() {
2261            Ok(cache_size) => cache_size,
2262            Err(error) => {
2263                return Err(SingleDiskFarmScrubError::FailedToDetermineFileSize { file, error });
2264            }
2265        };
2266
2267        let element_size = DiskPieceCache::element_size();
2268        let number_of_cached_elements = cache_size / u64::from(element_size);
2269        let dummy_element = vec![0; element_size as usize];
2270        (0..number_of_cached_elements)
2271            .into_par_iter()
2272            .map_with(vec![0; element_size as usize], |element, cache_offset| {
2273                let _span_guard = span.enter();
2274
2275                let offset = cache_offset * u64::from(element_size);
2276                if let Err(error) = cache_file.read_exact_at(element, offset) {
2277                    warn!(
2278                        path = %file.display(),
2279                        %cache_offset,
2280                        size = %element.len() as u64,
2281                        %offset,
2282                        %error,
2283                        "Failed to read cached piece, replacing with dummy element"
2284                    );
2285
2286                    if !dry_run {
2287                        if let Err(error) = cache_file.write_all_at(&dummy_element, offset) {
2288                            return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
2289                                file: file.clone(),
2290                                size: u64::from(element_size),
2291                                offset,
2292                                error,
2293                            });
2294                        }
2295                    }
2296
2297                    return Ok(());
2298                }
2299
2300                let (index_and_piece_bytes, expected_checksum) =
2301                    element.split_at(element_size as usize - Blake3Hash::SIZE);
2302                let actual_checksum = blake3_hash(index_and_piece_bytes);
2303                if *actual_checksum != *expected_checksum && element != &dummy_element {
2304                    warn!(
2305                        %cache_offset,
2306                        actual_checksum = %hex::encode(actual_checksum),
2307                        expected_checksum = %hex::encode(expected_checksum),
2308                        "Cached piece checksum mismatch, replacing with dummy element"
2309                    );
2310
2311                    if !dry_run {
2312                        if let Err(error) = cache_file.write_all_at(&dummy_element, offset) {
2313                            return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
2314                                file: file.clone(),
2315                                size: u64::from(element_size),
2316                                offset,
2317                                error,
2318                            });
2319                        }
2320                    }
2321
2322                    return Ok(());
2323                }
2324
2325                Ok(())
2326            })
2327            .try_for_each({
2328                let span = &span;
2329                let checked_elements = AtomicUsize::new(0);
2330
2331                move |result| {
2332                    let _span_guard = span.enter();
2333
2334                    let checked_elements = checked_elements.fetch_add(1, Ordering::Relaxed);
2335                    if checked_elements > 1 && checked_elements % 1000 == 0 {
2336                        info!(
2337                            "Checked {}/{} cache elements",
2338                            checked_elements, number_of_cached_elements
2339                        );
2340                    }
2341
2342                    result
2343                }
2344            })?;
2345
2346        Ok(())
2347    }
2348}
2349
2350fn write_dummy_sector_metadata(
2351    metadata_file: &File,
2352    metadata_file_path: &Path,
2353    sector_index: SectorIndex,
2354    pieces_in_sector: u16,
2355) -> Result<(), SingleDiskFarmScrubError> {
2356    let dummy_sector_bytes = SectorMetadataChecksummed::from(SectorMetadata {
2357        sector_index,
2358        pieces_in_sector,
2359        s_bucket_sizes: Box::new([0; Record::NUM_S_BUCKETS]),
2360        history_size: HistorySize::from(SegmentIndex::ZERO),
2361    })
2362    .encode();
2363    let sector_offset = RESERVED_PLOT_METADATA
2364        + u64::from(sector_index) * SectorMetadataChecksummed::encoded_size() as u64;
2365    metadata_file
2366        .write_all_at(&dummy_sector_bytes, sector_offset)
2367        .map_err(|error| SingleDiskFarmScrubError::FailedToWriteBytes {
2368            file: metadata_file_path.to_path_buf(),
2369            size: dummy_sector_bytes.len() as u64,
2370            offset: sector_offset,
2371            error,
2372        })
2373}