subspace_farmer/
single_disk_farm.rs

1//! Primary [`Farm`] implementation that deals with hardware directly
2//!
3//! Single disk farm is an abstraction that contains an identity, associated plot with metadata and
4//! a small piece cache. It fully manages farming and plotting process, including listening to node
5//! notifications, producing solutions and signing rewards.
6
7pub mod direct_io_file;
8pub mod farming;
9pub mod identity;
10mod metrics;
11pub mod piece_cache;
12pub mod piece_reader;
13pub mod plot_cache;
14mod plotted_sectors;
15mod plotting;
16mod reward_signing;
17
18use crate::disk_piece_cache::{DiskPieceCache, DiskPieceCacheError};
19use crate::farm::{
20    Farm, FarmId, FarmingError, FarmingNotification, HandlerFn, PieceCacheId, PieceReader,
21    PlottedSectors, SectorUpdate,
22};
23use crate::node_client::NodeClient;
24use crate::plotter::Plotter;
25use crate::single_disk_farm::direct_io_file::{DISK_SECTOR_SIZE, DirectIoFile};
26use crate::single_disk_farm::farming::rayon_files::RayonFiles;
27use crate::single_disk_farm::farming::{
28    FarmingOptions, PlotAudit, farming, slot_notification_forwarder,
29};
30use crate::single_disk_farm::identity::{Identity, IdentityError};
31use crate::single_disk_farm::metrics::SingleDiskFarmMetrics;
32use crate::single_disk_farm::piece_cache::SingleDiskPieceCache;
33use crate::single_disk_farm::piece_reader::DiskPieceReader;
34use crate::single_disk_farm::plot_cache::DiskPlotCache;
35use crate::single_disk_farm::plotted_sectors::SingleDiskPlottedSectors;
36pub use crate::single_disk_farm::plotting::PlottingError;
37use crate::single_disk_farm::plotting::{
38    PlottingOptions, PlottingSchedulerOptions, SectorPlottingOptions, plotting, plotting_scheduler,
39};
40use crate::single_disk_farm::reward_signing::reward_signing;
41use crate::utils::tokio_rayon_spawn_handler;
42use crate::{KNOWN_PEERS_CACHE_SIZE, farm};
43use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
44use async_trait::async_trait;
45use event_listener_primitives::{Bag, HandlerId};
46use futures::channel::{mpsc, oneshot};
47use futures::stream::FuturesUnordered;
48use futures::{FutureExt, StreamExt, select};
49use parity_scale_codec::{Decode, Encode};
50use parking_lot::Mutex;
51use prometheus_client::registry::Registry;
52use rayon::prelude::*;
53use rayon::{ThreadPoolBuildError, ThreadPoolBuilder};
54use serde::{Deserialize, Serialize};
55use static_assertions::const_assert;
56use std::collections::HashSet;
57use std::fs::{File, OpenOptions};
58use std::future::Future;
59use std::io::Write;
60use std::num::{NonZeroU32, NonZeroUsize};
61use std::path::{Path, PathBuf};
62use std::pin::Pin;
63use std::str::FromStr;
64use std::sync::Arc;
65use std::sync::atomic::{AtomicUsize, Ordering};
66use std::time::Duration;
67use std::{fmt, fs, io, mem};
68use subspace_core_primitives::PublicKey;
69use subspace_core_primitives::hashes::{Blake3Hash, blake3_hash};
70use subspace_core_primitives::pieces::Record;
71use subspace_core_primitives::sectors::SectorIndex;
72use subspace_core_primitives::segments::{HistorySize, SegmentIndex};
73use subspace_erasure_coding::ErasureCoding;
74use subspace_farmer_components::FarmerProtocolInfo;
75use subspace_farmer_components::file_ext::FileExt;
76use subspace_farmer_components::reading::ReadSectorRecordChunksMode;
77use subspace_farmer_components::sector::{SectorMetadata, SectorMetadataChecksummed, sector_size};
78use subspace_kzg::Kzg;
79use subspace_networking::KnownPeersManager;
80use subspace_process::AsyncJoinOnDrop;
81use subspace_proof_of_space::Table;
82use subspace_rpc_primitives::{FarmerAppInfo, SolutionResponse};
83use thiserror::Error;
84use tokio::runtime::Handle;
85use tokio::sync::broadcast;
86use tokio::task;
87use tracing::{Instrument, Span, error, info, trace, warn};
88
// Refuse to compile on non-64-bit platforms, offsets may fail on those when converting from u64 to
// usize depending on chain parameters
const_assert!(mem::size_of::<usize>() >= mem::size_of::<u64>());

/// Reserve 1M of space for plot metadata (for potential future expansion)
const RESERVED_PLOT_METADATA: u64 = 1024 * 1024;
/// Reserve 1M of space for farm info (for potential future expansion)
const RESERVED_FARM_INFO: u64 = 1024 * 1024;
/// Ten minute delay.
///
/// NOTE(review): judging by the name this postpones processing of newly archived segments; the
/// place where it is used is not visible in this part of the file, confirm before relying on it.
const NEW_SEGMENT_PROCESSING_DELAY: Duration = Duration::from_mins(10);
98
/// Exclusive lock for the single disk farm info file, ensuring that cooperating processes do not
/// edit it concurrently.
///
/// Holds the locked file handle; keep this value around for as long as the farm is in use.
#[derive(Debug)]
#[must_use = "Lock file must be kept around for as long as farm is used"]
pub struct SingleDiskFarmInfoLock {
    /// Handle of the locked info file, never read, only kept alive
    _file: File,
}
105
/// Important information about the contents of the `SingleDiskFarm`
///
/// Versioned through enum variants (only `V0` exists right now) and (de)serialized with serde
/// using camelCase names.
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum SingleDiskFarmInfo {
    /// V0 of the info
    #[serde(rename_all = "camelCase")]
    V0 {
        /// ID of the farm
        id: FarmId,
        /// Genesis hash of the chain used for farm creation (hex-encoded when serialized)
        #[serde(with = "hex")]
        genesis_hash: [u8; 32],
        /// Public key of identity used for farm creation
        public_key: PublicKey,
        /// How many pieces does one sector contain.
        pieces_in_sector: u16,
        /// How much space in bytes is allocated for this farm
        allocated_space: u64,
    },
}
126
127impl SingleDiskFarmInfo {
128    const FILE_NAME: &'static str = "single_disk_farm.json";
129
130    /// Create new instance
131    pub fn new(
132        id: FarmId,
133        genesis_hash: [u8; 32],
134        public_key: PublicKey,
135        pieces_in_sector: u16,
136        allocated_space: u64,
137    ) -> Self {
138        Self::V0 {
139            id,
140            genesis_hash,
141            public_key,
142            pieces_in_sector,
143            allocated_space,
144        }
145    }
146
147    /// Load `SingleDiskFarm` from path is supposed to be stored, `None` means no info file was
148    /// found, happens during first start.
149    pub fn load_from(directory: &Path) -> io::Result<Option<Self>> {
150        let bytes = match fs::read(directory.join(Self::FILE_NAME)) {
151            Ok(bytes) => bytes,
152            Err(error) => {
153                return if error.kind() == io::ErrorKind::NotFound {
154                    Ok(None)
155                } else {
156                    Err(error)
157                };
158            }
159        };
160
161        serde_json::from_slice(&bytes)
162            .map(Some)
163            .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))
164    }
165
166    /// Store `SingleDiskFarm` info to path, so it can be loaded again upon restart.
167    ///
168    /// Can optionally return a lock.
169    pub fn store_to(
170        &self,
171        directory: &Path,
172        lock: bool,
173    ) -> io::Result<Option<SingleDiskFarmInfoLock>> {
174        let mut file = OpenOptions::new()
175            .write(true)
176            .create(true)
177            .truncate(false)
178            .open(directory.join(Self::FILE_NAME))?;
179        if lock {
180            fs4::fs_std::FileExt::try_lock_exclusive(&file)?;
181        }
182        file.set_len(0)?;
183        file.write_all(&serde_json::to_vec(self).expect("Info serialization never fails; qed"))?;
184
185        Ok(lock.then_some(SingleDiskFarmInfoLock { _file: file }))
186    }
187
188    /// Try to acquire exclusive lock on the single disk farm info file, ensuring no concurrent edits by cooperating
189    /// processes is done
190    pub fn try_lock(directory: &Path) -> io::Result<SingleDiskFarmInfoLock> {
191        let file = File::open(directory.join(Self::FILE_NAME))?;
192        fs4::fs_std::FileExt::try_lock_exclusive(&file)?;
193
194        Ok(SingleDiskFarmInfoLock { _file: file })
195    }
196
197    /// ID of the farm
198    pub fn id(&self) -> &FarmId {
199        let Self::V0 { id, .. } = self;
200        id
201    }
202
203    /// Genesis hash of the chain used for farm creation
204    pub fn genesis_hash(&self) -> &[u8; 32] {
205        let Self::V0 { genesis_hash, .. } = self;
206        genesis_hash
207    }
208
209    /// Public key of identity used for farm creation
210    pub fn public_key(&self) -> &PublicKey {
211        let Self::V0 { public_key, .. } = self;
212        public_key
213    }
214
215    /// How many pieces does one sector contain.
216    pub fn pieces_in_sector(&self) -> u16 {
217        match self {
218            SingleDiskFarmInfo::V0 {
219                pieces_in_sector, ..
220            } => *pieces_in_sector,
221        }
222    }
223
224    /// How much space in bytes is allocated for this farm
225    pub fn allocated_space(&self) -> u64 {
226        match self {
227            SingleDiskFarmInfo::V0 {
228                allocated_space, ..
229            } => *allocated_space,
230        }
231    }
232}
233
/// Summary of single disk farm for presentational purposes
///
/// Every variant carries the directory it describes.
#[derive(Debug)]
pub enum SingleDiskFarmSummary {
    /// Farm was found and read successfully
    Found {
        /// Farm info
        info: SingleDiskFarmInfo,
        /// Path to directory where farm is stored.
        directory: PathBuf,
    },
    /// Farm was not found
    NotFound {
        /// Path to directory where farm is stored.
        directory: PathBuf,
    },
    /// Failed to open farm
    Error {
        /// Path to directory where farm is stored.
        directory: PathBuf,
        /// Error itself
        error: io::Error,
    },
}
257
/// Header of the plot metadata, encoded and decoded with SCALE codec (see derives).
#[derive(Debug, Encode, Decode)]
struct PlotMetadataHeader {
    /// Version of the metadata format
    // NOTE(review): presumably compared against `SingleDiskFarm::SUPPORTED_PLOT_VERSION`, the
    // check itself is not visible in this part of the file
    version: u8,
    /// Number of sectors plotted so far
    plotted_sector_count: SectorIndex,
}
263
264impl PlotMetadataHeader {
265    #[inline]
266    fn encoded_size() -> usize {
267        let default = PlotMetadataHeader {
268            version: 0,
269            plotted_sector_count: 0,
270        };
271
272        default.encoded_size()
273    }
274}
275
/// Options used to open single disk farm
#[derive(Debug)]
pub struct SingleDiskFarmOptions<'a, NC>
where
    NC: Clone,
{
    /// Path to directory where farm is stored.
    pub directory: PathBuf,
    /// Information necessary for farmer application
    pub farmer_app_info: FarmerAppInfo,
    /// How much space in bytes was allocated
    pub allocated_space: u64,
    /// How many pieces one sector is supposed to contain (max)
    pub max_pieces_in_sector: u16,
    /// RPC client connected to Subspace node
    pub node_client: NC,
    /// Address where farming rewards should go
    pub reward_address: PublicKey,
    /// Plotter
    pub plotter: Arc<dyn Plotter + Send + Sync>,
    /// Kzg instance to use.
    pub kzg: Kzg,
    /// Erasure coding instance to use.
    pub erasure_coding: ErasureCoding,
    /// Percentage of allocated space dedicated for caching purposes
    pub cache_percentage: u8,
    /// Thread pool size used for farming (mostly for blocking I/O, but also for some
    /// compute-intensive operations during proving)
    pub farming_thread_pool_size: usize,
    /// Notification for plotter to start, can be used to delay plotting until some initialization
    /// has happened externally
    pub plotting_delay: Option<oneshot::Receiver<()>>,
    /// Global mutex that can restrict concurrency of resource-intensive operations and make sure
    /// that those operations that are very sensitive (like proving) have all the resources
    /// available to them for the highest probability of success
    pub global_mutex: Arc<AsyncMutex<()>>,
    /// How many sectors will be plotted concurrently per farm
    pub max_plotting_sectors_per_farm: NonZeroUsize,
    /// Disable farm locking, for example if file system doesn't support it
    pub disable_farm_locking: bool,
    /// Mode to use for reading of sector record chunks
    pub read_sector_record_chunks_mode: ReadSectorRecordChunksMode,
    /// Prometheus registry (optional, metrics are only registered when it is provided)
    // NOTE(review): the "only registered when provided" part is inferred from the `Option`,
    // confirm against the code that consumes this field
    pub registry: Option<&'a Mutex<&'a mut Registry>>,
    /// Whether to create a farm if it doesn't yet exist
    pub create: bool,
}
323
/// Errors happening when trying to create/open single disk farm
#[derive(Debug, Error)]
pub enum SingleDiskFarmError {
    /// Failed to open or create identity
    #[error("Failed to open or create identity: {0}")]
    FailedToOpenIdentity(#[from] IdentityError),
    /// Farm is likely already in use, make sure no other farmer is using it
    #[error("Farm is likely already in use, make sure no other farmer is using it: {0}")]
    LikelyAlreadyInUse(io::Error),
    /// I/O error occurred
    #[error("Single disk farm I/O error: {0}")]
    Io(#[from] io::Error),
    /// Failed to spawn task for blocking thread
    #[error("Failed to spawn task for blocking thread: {0}")]
    TokioJoinError(#[from] task::JoinError),
    /// Piece cache error
    #[error("Piece cache error: {0}")]
    PieceCacheError(#[from] DiskPieceCacheError),
    /// Can't preallocate metadata file, probably not enough space on disk
    #[error("Can't preallocate metadata file, probably not enough space on disk: {0}")]
    CantPreallocateMetadataFile(io::Error),
    /// Can't preallocate plot file, probably not enough space on disk
    #[error("Can't preallocate plot file, probably not enough space on disk: {0}")]
    CantPreallocatePlotFile(io::Error),
    /// Wrong chain (genesis hash)
    #[error(
        "Genesis hash of farm {id} {wrong_chain} is different from {correct_chain} when farm was \
        created, it is not possible to use farm on a different chain"
    )]
    WrongChain {
        /// Farm ID
        id: FarmId,
        /// Hex-encoded genesis hash during farm creation
        // TODO: Wrapper type with `Display` impl for genesis hash
        correct_chain: String,
        /// Hex-encoded current genesis hash
        wrong_chain: String,
    },
    /// Public key in identity doesn't match metadata
    #[error(
        "Public key of farm {id} {wrong_public_key} is different from {correct_public_key} when \
        farm was created, something went wrong, likely due to manual edits"
    )]
    IdentityMismatch {
        /// Farm ID
        id: FarmId,
        /// Public key used during farm creation
        correct_public_key: PublicKey,
        /// Current public key
        wrong_public_key: PublicKey,
    },
    /// Invalid number of pieces in sector
    #[error(
        "Invalid number pieces in sector: max supported {max_supported}, farm initialized with \
        {initialized_with}"
    )]
    InvalidPiecesInSector {
        /// Farm ID (not part of the error message)
        id: FarmId,
        /// Max supported pieces in sector
        max_supported: u16,
        /// Number of pieces in sector farm is initialized with
        initialized_with: u16,
    },
    /// Failed to decode metadata header
    #[error("Failed to decode metadata header: {0}")]
    FailedToDecodeMetadataHeader(parity_scale_codec::Error),
    /// Unexpected metadata version
    #[error("Unexpected metadata version {0}")]
    UnexpectedMetadataVersion(u8),
    /// Allocated space is not enough for one sector
    #[error(
        "Allocated space is not enough for one sector. \
        The lowest acceptable value for allocated space is {min_space} bytes, \
        provided {allocated_space} bytes."
    )]
    InsufficientAllocatedSpace {
        /// Minimal allocated space
        min_space: u64,
        /// Current allocated space
        allocated_space: u64,
    },
    /// Farm is too large
    #[error(
        "Farm is too large: allocated {allocated_sectors} sectors ({allocated_space} bytes), max \
        supported is {max_sectors} ({max_space} bytes). Consider creating multiple smaller farms \
        instead."
    )]
    FarmTooLarge {
        /// Allocated space
        allocated_space: u64,
        /// Allocated space in sectors
        allocated_sectors: u64,
        /// Max supported allocated space
        max_space: u64,
        /// Max supported allocated space in sectors
        max_sectors: u16,
    },
    /// Failed to create thread pool
    #[error("Failed to create thread pool: {0}")]
    FailedToCreateThreadPool(ThreadPoolBuildError),
}
426
427/// Errors happening during scrubbing
428#[derive(Debug, Error)]
429pub enum SingleDiskFarmScrubError {
430    /// Farm is likely already in use, make sure no other farmer is using it
431    #[error("Farm is likely already in use, make sure no other farmer is using it: {0}")]
432    LikelyAlreadyInUse(io::Error),
433    /// Failed to determine file size
434    #[error("Failed to file size of {file}: {error}")]
435    FailedToDetermineFileSize {
436        /// Affected file
437        file: PathBuf,
438        /// Low-level error
439        error: io::Error,
440    },
441    /// Failed to read bytes from file
442    #[error("Failed to read {size} bytes from {file} at offset {offset}: {error}")]
443    FailedToReadBytes {
444        /// Affected file
445        file: PathBuf,
446        /// Number of bytes to read
447        size: u64,
448        /// Offset in the file
449        offset: u64,
450        /// Low-level error
451        error: io::Error,
452    },
453    /// Failed to write bytes from file
454    #[error("Failed to write {size} bytes from {file} at offset {offset}: {error}")]
455    FailedToWriteBytes {
456        /// Affected file
457        file: PathBuf,
458        /// Number of bytes to read
459        size: u64,
460        /// Offset in the file
461        offset: u64,
462        /// Low-level error
463        error: io::Error,
464    },
465    /// Farm info file does not exist
466    #[error("Farm info file does not exist at {file}")]
467    FarmInfoFileDoesNotExist {
468        /// Info file
469        file: PathBuf,
470    },
471    /// Farm info can't be opened
472    #[error("Farm info at {file} can't be opened: {error}")]
473    FarmInfoCantBeOpened {
474        /// Info file
475        file: PathBuf,
476        /// Low-level error
477        error: io::Error,
478    },
479    /// Identity file does not exist
480    #[error("Identity file does not exist at {file}")]
481    IdentityFileDoesNotExist {
482        /// Identity file
483        file: PathBuf,
484    },
485    /// Identity can't be opened
486    #[error("Identity at {file} can't be opened: {error}")]
487    IdentityCantBeOpened {
488        /// Identity file
489        file: PathBuf,
490        /// Low-level error
491        error: IdentityError,
492    },
493    /// Identity public key doesn't match public key in the disk farm info
494    #[error("Identity public key {identity} doesn't match public key in the disk farm info {info}")]
495    PublicKeyMismatch {
496        /// Identity public key
497        identity: PublicKey,
498        /// Disk farm info public key
499        info: PublicKey,
500    },
501    /// Metadata file does not exist
502    #[error("Metadata file does not exist at {file}")]
503    MetadataFileDoesNotExist {
504        /// Metadata file
505        file: PathBuf,
506    },
507    /// Metadata can't be opened
508    #[error("Metadata at {file} can't be opened: {error}")]
509    MetadataCantBeOpened {
510        /// Metadata file
511        file: PathBuf,
512        /// Low-level error
513        error: io::Error,
514    },
515    /// Metadata file too small
516    #[error(
517        "Metadata file at {file} is too small: reserved size is {reserved_size} bytes, file size \
518        is {size}"
519    )]
520    MetadataFileTooSmall {
521        /// Metadata file
522        file: PathBuf,
523        /// Reserved size
524        reserved_size: u64,
525        /// File size
526        size: u64,
527    },
528    /// Failed to decode metadata header
529    #[error("Failed to decode metadata header: {0}")]
530    FailedToDecodeMetadataHeader(parity_scale_codec::Error),
531    /// Unexpected metadata version
532    #[error("Unexpected metadata version {0}")]
533    UnexpectedMetadataVersion(u8),
534    /// Cache can't be opened
535    #[error("Cache at {file} can't be opened: {error}")]
536    CacheCantBeOpened {
537        /// Cache file
538        file: PathBuf,
539        /// Low-level error
540        error: io::Error,
541    },
542}
543
/// Errors that happen in background tasks
///
/// The first three variants are transparent wrappers: their message and source come straight from
/// the wrapped error.
#[derive(Debug, Error)]
pub enum BackgroundTaskError {
    /// Plotting error
    #[error(transparent)]
    Plotting(#[from] PlottingError),
    /// Farming error
    #[error(transparent)]
    Farming(#[from] FarmingError),
    /// Reward signing error
    #[error(transparent)]
    RewardSigning(#[from] anyhow::Error),
    /// Background task panicked
    #[error("Background task {task} panicked")]
    BackgroundTaskPanicked {
        /// Name of the task
        task: String,
    },
}
563
/// Boxed and pinned `Send` future of a background task, resolves to `Ok(())` or to a
/// [`BackgroundTaskError`]
type BackgroundTask = Pin<Box<dyn Future<Output = Result<(), BackgroundTaskError>> + Send>>;
565
/// What exactly should be scrubbed
///
/// Textual form (used by both [`fmt::Display`] and [`FromStr`]) is the lowercase variant name.
#[derive(Debug, Copy, Clone)]
pub enum ScrubTarget {
    /// Scrub everything
    All,
    /// Scrub just metadata
    Metadata,
    /// Scrub metadata and corresponding plot
    Plot,
    /// Only scrub cache
    Cache,
}

impl fmt::Display for ScrubTarget {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            Self::All => "all",
            Self::Metadata => "metadata",
            Self::Plot => "plot",
            Self::Cache => "cache",
        };

        f.write_str(name)
    }
}

impl FromStr for ScrubTarget {
    type Err = String;

    /// Parses exactly the strings produced by the [`fmt::Display`] implementation, anything else
    /// (including different casing) is rejected with a descriptive message
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let target = match s {
            "all" => Self::All,
            "metadata" => Self::Metadata,
            "plot" => Self::Plot,
            "cache" => Self::Cache,
            unknown => {
                return Err(format!("Can't parse {unknown} as `ScrubTarget`"));
            }
        };

        Ok(target)
    }
}

impl ScrubTarget {
    /// Whether metadata is scrubbed, true for every target except cache-only scrubbing
    fn metadata(&self) -> bool {
        !matches!(self, Self::Cache)
    }

    /// Whether plot is scrubbed
    fn plot(&self) -> bool {
        matches!(self, Self::All | Self::Plot)
    }

    /// Whether cache is scrubbed
    fn cache(&self) -> bool {
        matches!(self, Self::All | Self::Cache)
    }
}
626
627struct AllocatedSpaceDistribution {
628    piece_cache_file_size: u64,
629    piece_cache_capacity: u32,
630    plot_file_size: u64,
631    target_sector_count: u16,
632    metadata_file_size: u64,
633}
634
635impl AllocatedSpaceDistribution {
636    fn new(
637        allocated_space: u64,
638        sector_size: u64,
639        cache_percentage: u8,
640        sector_metadata_size: u64,
641    ) -> Result<Self, SingleDiskFarmError> {
642        let single_sector_overhead = sector_size + sector_metadata_size;
643        // Fixed space usage regardless of plot size
644        let fixed_space_usage = RESERVED_PLOT_METADATA
645            + RESERVED_FARM_INFO
646            + Identity::file_size() as u64
647            + KnownPeersManager::file_size(KNOWN_PEERS_CACHE_SIZE) as u64;
648        // Calculate how many sectors can fit
649        let target_sector_count = {
650            let potentially_plottable_space = allocated_space.saturating_sub(fixed_space_usage)
651                / 100
652                * (100 - u64::from(cache_percentage));
653            // Do the rounding to make sure we have exactly as much space as fits whole number of
654            // sectors, account for disk sector size just in case
655            (potentially_plottable_space - DISK_SECTOR_SIZE as u64) / single_sector_overhead
656        };
657
658        if target_sector_count == 0 {
659            let mut single_plot_with_cache_space =
660                single_sector_overhead.div_ceil(100 - u64::from(cache_percentage)) * 100;
661            // Cache must not be empty, ensure it contains at least one element even if
662            // percentage-wise it will use more space
663            if single_plot_with_cache_space - single_sector_overhead
664                < DiskPieceCache::element_size() as u64
665            {
666                single_plot_with_cache_space =
667                    single_sector_overhead + DiskPieceCache::element_size() as u64;
668            }
669
670            return Err(SingleDiskFarmError::InsufficientAllocatedSpace {
671                min_space: fixed_space_usage + single_plot_with_cache_space,
672                allocated_space,
673            });
674        }
675        let plot_file_size = target_sector_count * sector_size;
676        // Align plot file size for disk sector size
677        let plot_file_size =
678            plot_file_size.div_ceil(DISK_SECTOR_SIZE as u64) * DISK_SECTOR_SIZE as u64;
679
680        // Remaining space will be used for caching purposes
681        let piece_cache_capacity = if cache_percentage > 0 {
682            let cache_space = allocated_space
683                - fixed_space_usage
684                - plot_file_size
685                - (sector_metadata_size * target_sector_count);
686            (cache_space / u64::from(DiskPieceCache::element_size())) as u32
687        } else {
688            0
689        };
690        let target_sector_count = match SectorIndex::try_from(target_sector_count) {
691            Ok(target_sector_count) if target_sector_count < SectorIndex::MAX => {
692                target_sector_count
693            }
694            _ => {
695                // We use this for both count and index, hence index must not reach actual `MAX`
696                // (consensus doesn't care about this, just farmer implementation detail)
697                let max_sectors = SectorIndex::MAX - 1;
698                return Err(SingleDiskFarmError::FarmTooLarge {
699                    allocated_space: target_sector_count * sector_size,
700                    allocated_sectors: target_sector_count,
701                    max_space: max_sectors as u64 * sector_size,
702                    max_sectors,
703                });
704            }
705        };
706
707        Ok(Self {
708            piece_cache_file_size: u64::from(piece_cache_capacity)
709                * u64::from(DiskPieceCache::element_size()),
710            piece_cache_capacity,
711            plot_file_size,
712            target_sector_count,
713            metadata_file_size: RESERVED_PLOT_METADATA
714                + sector_metadata_size * u64::from(target_sector_count),
715        })
716    }
717}
718
/// Collection of callbacks of type `HandlerFn<A>` that accept argument of type `A`
type Handler<A> = Bag<HandlerFn<A>, A>;

/// Callbacks registered on the farm, one collection per kind of event
#[derive(Default, Debug)]
struct Handlers {
    /// Callbacks receiving a sector index together with its update
    sector_update: Handler<(SectorIndex, SectorUpdate)>,
    /// Callbacks receiving farming notifications
    farming_notification: Handler<FarmingNotification>,
    /// Callbacks receiving solution responses
    solution: Handler<SolutionResponse>,
}
727
/// Everything produced by the blocking initialization step (`SingleDiskFarm::init`) that
/// `SingleDiskFarm::new` destructures and continues with
struct SingleDiskFarmInit {
    identity: Identity,
    single_disk_farm_info: SingleDiskFarmInfo,
    /// Optional lock on the farm info file
    // NOTE(review): presumably `None` when farm locking is disabled in the options, the code
    // that decides this is not visible in this part of the file
    single_disk_farm_info_lock: Option<SingleDiskFarmInfoLock>,
    plot_file: Arc<DirectIoFile>,
    metadata_file: DirectIoFile,
    metadata_header: PlotMetadataHeader,
    target_sector_count: u16,
    sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    piece_cache_capacity: u32,
    plot_cache: DiskPlotCache,
}
740
/// Single disk farm abstraction is a container for everything necessary to plot/farm with a single
/// disk.
///
/// Farm starts operating during creation and doesn't stop until dropped (or error happens).
// NOTE(review): the sentence above and the `must_use` message below (which asks for `run()` to be
// called) do not quite agree with each other, confirm which one describes the current behavior
#[derive(Debug)]
#[must_use = "Plot does not function properly unless run() method is called"]
pub struct SingleDiskFarm {
    farmer_protocol_info: FarmerProtocolInfo,
    single_disk_farm_info: SingleDiskFarmInfo,
    /// Metadata of all sectors plotted so far
    sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    pieces_in_sector: u16,
    /// Returned as is from `Farm::total_sectors_count()`
    total_sectors_count: SectorIndex,
    span: Span,
    /// Background tasks of the farm
    tasks: FuturesUnordered<BackgroundTask>,
    /// Registered event callbacks
    handlers: Arc<Handlers>,
    piece_cache: SingleDiskPieceCache,
    plot_cache: DiskPlotCache,
    piece_reader: DiskPieceReader,
    /// Sender that will be used to signal to background threads that they should start
    start_sender: Option<broadcast::Sender<()>>,
    /// Sender that will be used to signal to background threads that they must stop
    stop_sender: Option<broadcast::Sender<()>>,
    /// Optional lock on the farm info file, stored here so that it lives as long as the farm does
    _single_disk_farm_info_lock: Option<SingleDiskFarmInfoLock>,
}
766
767impl Drop for SingleDiskFarm {
768    #[inline]
769    fn drop(&mut self) {
770        self.piece_reader.close_all_readers();
771        // Make background threads that are waiting to do something exit immediately
772        self.start_sender.take();
773        // Notify background tasks that they must stop
774        self.stop_sender.take();
775    }
776}
777
// `Farm` trait implementation, every method forwards to a method of the same name called on
// `self`.
//
// NOTE(review): these calls presumably resolve to inherent methods of `SingleDiskFarm` (inherent
// methods take precedence over trait methods), which are defined outside of this part of the
// file; without them the calls would recurse into the trait methods themselves.
#[async_trait(?Send)]
impl Farm for SingleDiskFarm {
    fn id(&self) -> &FarmId {
        self.id()
    }

    fn total_sectors_count(&self) -> SectorIndex {
        self.total_sectors_count
    }

    fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static> {
        // Wrap the concrete value into a trait object
        Arc::new(self.plotted_sectors())
    }

    fn piece_reader(&self) -> Arc<dyn PieceReader + 'static> {
        // Wrap the concrete value into a trait object
        Arc::new(self.piece_reader())
    }

    fn on_sector_update(
        &self,
        callback: HandlerFn<(SectorIndex, SectorUpdate)>,
    ) -> Box<dyn farm::HandlerId> {
        // Box the returned handler ID to erase its concrete type
        Box::new(self.on_sector_update(callback))
    }

    fn on_farming_notification(
        &self,
        callback: HandlerFn<FarmingNotification>,
    ) -> Box<dyn farm::HandlerId> {
        // Box the returned handler ID to erase its concrete type
        Box::new(self.on_farming_notification(callback))
    }

    fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> Box<dyn farm::HandlerId> {
        // Box the returned handler ID to erase its concrete type
        Box::new(self.on_solution(callback))
    }

    fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
        // Move the farm out of the box and pin the resulting future on the heap
        Box::pin((*self).run())
    }
}
818
819impl SingleDiskFarm {
820    /// Name of the plot file
821    pub const PLOT_FILE: &'static str = "plot.bin";
822    /// Name of the metadata file
823    pub const METADATA_FILE: &'static str = "metadata.bin";
824    const SUPPORTED_PLOT_VERSION: u8 = 0;
825
    /// Create new single disk farm instance
    ///
    /// Runs [`Self::init`] on a blocking thread to open (or, depending on `options.create`,
    /// create) the on-disk state in `options.directory`, opens the piece cache when its computed
    /// capacity is non-zero and prepares background tasks: plotting, plotting scheduler, slot
    /// notification forwarder, farming, piece reading and reward signing.
    ///
    /// Plotting and farming are spawned on blocking threads right away, but wait for a message on
    /// the start channel before doing any work; plotting, farming and piece reading exit once a
    /// message arrives on the stop channel. Both senders are stored in the returned instance.
    ///
    /// `farm_index` is only used for naming farming threads and background tasks.
    ///
    /// # Errors
    ///
    /// Returns an error if initialization fails, the piece cache can't be opened, the farming
    /// thread pool can't be created or the plot file can't be opened for farming.
    pub async fn new<NC, PosTable>(
        options: SingleDiskFarmOptions<'_, NC>,
        farm_index: usize,
    ) -> Result<Self, SingleDiskFarmError>
    where
        NC: NodeClient + Clone,
        PosTable: Table,
    {
        let span = Span::current();

        let SingleDiskFarmOptions {
            directory,
            farmer_app_info,
            allocated_space,
            max_pieces_in_sector,
            node_client,
            reward_address,
            plotter,
            kzg,
            erasure_coding,
            cache_percentage,
            farming_thread_pool_size,
            plotting_delay,
            global_mutex,
            max_plotting_sectors_per_farm,
            disable_farm_locking,
            read_sector_record_chunks_mode,
            registry,
            create,
        } = options;

        // `Self::init` does synchronous filesystem I/O, so it is executed on a blocking thread
        let single_disk_farm_init_fut = task::spawn_blocking({
            let directory = directory.clone();
            let farmer_app_info = farmer_app_info.clone();
            let span = span.clone();

            move || {
                let _span_guard = span.enter();
                Self::init(
                    &directory,
                    &farmer_app_info,
                    allocated_space,
                    max_pieces_in_sector,
                    cache_percentage,
                    disable_farm_locking,
                    create,
                )
            }
        });

        // Outer result comes from joining the blocking task, inner one from `init` itself
        let single_disk_farm_init =
            AsyncJoinOnDrop::new(single_disk_farm_init_fut, false).await??;

        let SingleDiskFarmInit {
            identity,
            single_disk_farm_info,
            single_disk_farm_info_lock,
            plot_file,
            metadata_file,
            metadata_header,
            target_sector_count,
            sectors_metadata,
            piece_cache_capacity,
            plot_cache,
        } = single_disk_farm_init;

        let piece_cache = {
            // Convert farm ID into cache ID for single disk farm
            let FarmId::Ulid(id) = *single_disk_farm_info.id();
            let id = PieceCacheId::Ulid(id);

            SingleDiskPieceCache::new(
                id,
                // Disk piece cache is only opened when non-zero capacity was allocated for it
                if let Some(piece_cache_capacity) = NonZeroU32::new(piece_cache_capacity) {
                    Some(task::block_in_place(|| {
                        if let Some(registry) = registry {
                            DiskPieceCache::open(
                                &directory,
                                piece_cache_capacity,
                                Some(id),
                                Some(*registry.lock()),
                            )
                        } else {
                            DiskPieceCache::open(&directory, piece_cache_capacity, Some(id), None)
                        }
                    })?)
                } else {
                    None
                },
            )
        };

        let public_key = *single_disk_farm_info.public_key();
        let pieces_in_sector = single_disk_farm_info.pieces_in_sector();
        let sector_size = sector_size(pieces_in_sector);

        // Metrics are only created when a registry was provided
        let metrics = registry.map(|registry| {
            Arc::new(SingleDiskFarmMetrics::new(
                *registry.lock(),
                single_disk_farm_info.id(),
                target_sector_count,
                sectors_metadata.read_blocking().len() as SectorIndex,
            ))
        });

        // Plotting and farming share this one-shot sender; whichever takes it out of the `Option`
        // first gets to report its error
        let (error_sender, error_receiver) = oneshot::channel();
        let error_sender = Arc::new(Mutex::new(Some(error_sender)));

        let tasks = FuturesUnordered::<BackgroundTask>::new();

        // Resolves with the reported error, or with `Ok(())` if receiving fails (no error was
        // sent)
        tasks.push(Box::pin(async move {
            if let Ok(error) = error_receiver.await {
                return Err(error);
            }

            Ok(())
        }));

        let handlers = Arc::<Handlers>::default();
        let (start_sender, mut start_receiver) = broadcast::channel::<()>(1);
        let (stop_sender, mut stop_receiver) = broadcast::channel::<()>(1);
        let sectors_being_modified = Arc::<AsyncRwLock<HashSet<SectorIndex>>>::default();
        let (sectors_to_plot_sender, sectors_to_plot_receiver) = mpsc::channel(1);
        // Some sectors may already be plotted, skip them
        let sectors_indices_left_to_plot =
            metadata_header.plotted_sector_count..target_sector_count;

        let farming_thread_pool = ThreadPoolBuilder::new()
            .thread_name(move |thread_index| format!("farming-{farm_index:02}.{thread_index:02}"))
            .num_threads(farming_thread_pool_size)
            .spawn_handler(tokio_rayon_spawn_handler())
            .build()
            .map_err(SingleDiskFarmError::FailedToCreateThreadPool)?;
        // Plot file for farming is opened from within the farming thread pool; the pool is moved
        // into the blocking task and handed back together with the opened plot
        let farming_plot_fut = task::spawn_blocking(|| {
            farming_thread_pool
                .install(move || {
                    RayonFiles::open_with(directory.join(Self::PLOT_FILE), |path| {
                        DirectIoFile::open(path)
                    })
                })
                .map(|farming_plot| (farming_plot, farming_thread_pool))
        });

        let (farming_plot, farming_thread_pool) =
            AsyncJoinOnDrop::new(farming_plot_fut, false).await??;

        // Plotting runs on a blocking thread that drives the async plotting future with
        // `Handle::current().block_on()`
        let plotting_join_handle = task::spawn_blocking({
            let sectors_metadata = Arc::clone(&sectors_metadata);
            let handlers = Arc::clone(&handlers);
            let sectors_being_modified = Arc::clone(&sectors_being_modified);
            let node_client = node_client.clone();
            let plot_file = Arc::clone(&plot_file);
            let error_sender = Arc::clone(&error_sender);
            let span = span.clone();
            let global_mutex = Arc::clone(&global_mutex);
            let metrics = metrics.clone();

            move || {
                let _span_guard = span.enter();

                let plotting_options = PlottingOptions {
                    metadata_header,
                    sectors_metadata: &sectors_metadata,
                    sectors_being_modified: &sectors_being_modified,
                    sectors_to_plot_receiver,
                    sector_plotting_options: SectorPlottingOptions {
                        public_key,
                        node_client: &node_client,
                        pieces_in_sector,
                        sector_size,
                        plot_file,
                        metadata_file: Arc::new(metadata_file),
                        handlers: &handlers,
                        global_mutex: &global_mutex,
                        plotter,
                        metrics,
                    },
                    max_plotting_sectors_per_farm,
                };

                let plotting_fut = async {
                    if start_receiver.recv().await.is_err() {
                        // Dropped before starting
                        return Ok(());
                    }

                    if let Some(plotting_delay) = plotting_delay
                        && plotting_delay.await.is_err()
                    {
                        // Dropped before resolving
                        return Ok(());
                    }

                    plotting(plotting_options).await
                };

                Handle::current().block_on(async {
                    // Whichever happens first: plotting finishes (possibly with an error that is
                    // forwarded through the shared error sender) or stop signal arrives
                    select! {
                        plotting_result = plotting_fut.fuse() => {
                            if let Err(error) = plotting_result
                                && let Some(error_sender) = error_sender.lock().take()
                                && let Err(error) = error_sender.send(error.into())
                            {
                                error!(
                                    %error,
                                    "Plotting failed to send error to background task"
                                );
                            }
                        }
                        _ = stop_receiver.recv().fuse() => {
                            // Nothing, just exit
                        }
                    }
                });
            }
        });
        let plotting_join_handle = AsyncJoinOnDrop::new(plotting_join_handle, false);

        tasks.push(Box::pin(async move {
            // Panic will already be printed by now
            plotting_join_handle.await.map_err(|_error| {
                BackgroundTaskError::BackgroundTaskPanicked {
                    task: format!("plotting-{farm_index}"),
                }
            })
        }));

        let plotting_scheduler_options = PlottingSchedulerOptions {
            public_key_hash: public_key.hash(),
            sectors_indices_left_to_plot,
            target_sector_count,
            last_archived_segment_index: farmer_app_info.protocol_info.history_size.segment_index(),
            min_sector_lifetime: farmer_app_info.protocol_info.min_sector_lifetime,
            node_client: node_client.clone(),
            handlers: Arc::clone(&handlers),
            sectors_metadata: Arc::clone(&sectors_metadata),
            sectors_to_plot_sender,
            new_segment_processing_delay: NEW_SEGMENT_PROCESSING_DELAY,
            metrics: metrics.clone(),
        };
        tasks.push(Box::pin(plotting_scheduler(plotting_scheduler_options)));

        // Slot notifications are passed from the forwarder task to farming through this channel
        let (slot_info_forwarder_sender, slot_info_forwarder_receiver) = mpsc::channel(0);

        tasks.push(Box::pin({
            let node_client = node_client.clone();
            let metrics = metrics.clone();

            async move {
                slot_notification_forwarder(&node_client, slot_info_forwarder_sender, metrics)
                    .await
                    .map_err(BackgroundTaskError::Farming)
            }
        }));

        // Farming follows the same pattern as plotting: blocking thread driving an async future
        // that waits for the start signal and is interrupted by the stop signal
        let farming_join_handle = task::spawn_blocking({
            let erasure_coding = erasure_coding.clone();
            let handlers = Arc::clone(&handlers);
            let sectors_being_modified = Arc::clone(&sectors_being_modified);
            let sectors_metadata = Arc::clone(&sectors_metadata);
            let mut start_receiver = start_sender.subscribe();
            let mut stop_receiver = stop_sender.subscribe();
            let node_client = node_client.clone();
            let span = span.clone();
            let global_mutex = Arc::clone(&global_mutex);

            move || {
                let _span_guard = span.enter();

                let farming_fut = async move {
                    if start_receiver.recv().await.is_err() {
                        // Dropped before starting
                        return Ok(());
                    }

                    let plot_audit = PlotAudit::new(&farming_plot);

                    let farming_options = FarmingOptions {
                        public_key,
                        reward_address,
                        node_client,
                        plot_audit,
                        sectors_metadata,
                        kzg,
                        erasure_coding,
                        handlers,
                        sectors_being_modified,
                        slot_info_notifications: slot_info_forwarder_receiver,
                        thread_pool: farming_thread_pool,
                        read_sector_record_chunks_mode,
                        global_mutex,
                        metrics,
                    };
                    farming::<PosTable, _, _>(farming_options).await
                };

                Handle::current().block_on(async {
                    select! {
                        farming_result = farming_fut.fuse() => {
                            if let Err(error) = farming_result
                                && let Some(error_sender) = error_sender.lock().take()
                                && let Err(error) = error_sender.send(error.into())
                            {
                                error!(
                                    %error,
                                    "Farming failed to send error to background task",
                                );
                            }
                        }
                        _ = stop_receiver.recv().fuse() => {
                            // Nothing, just exit
                        }
                    }
                });
            }
        });
        let farming_join_handle = AsyncJoinOnDrop::new(farming_join_handle, false);

        tasks.push(Box::pin(async move {
            // Panic will already be printed by now
            farming_join_handle.await.map_err(|_error| {
                BackgroundTaskError::BackgroundTaskPanicked {
                    task: format!("farming-{farm_index}"),
                }
            })
        }));

        let (piece_reader, reading_fut) = DiskPieceReader::new::<PosTable>(
            public_key,
            pieces_in_sector,
            plot_file,
            Arc::clone(&sectors_metadata),
            erasure_coding,
            sectors_being_modified,
            read_sector_record_chunks_mode,
            global_mutex,
        );

        // Unlike plotting and farming, reading doesn't wait for the start signal
        let reading_join_handle = task::spawn_blocking({
            let mut stop_receiver = stop_sender.subscribe();
            let reading_fut = reading_fut.instrument(span.clone());

            move || {
                Handle::current().block_on(async {
                    select! {
                        _ = reading_fut.fuse() => {
                            // Nothing, just exit
                        }
                        _ = stop_receiver.recv().fuse() => {
                            // Nothing, just exit
                        }
                    }
                });
            }
        });

        let reading_join_handle = AsyncJoinOnDrop::new(reading_join_handle, false);

        tasks.push(Box::pin(async move {
            // Panic will already be printed by now
            reading_join_handle.await.map_err(|_error| {
                BackgroundTaskError::BackgroundTaskPanicked {
                    task: format!("reading-{farm_index}"),
                }
            })
        }));

        // Reward signing runs as a plain async background task; failure to subscribe is reported
        // as a background task error
        tasks.push(Box::pin(async move {
            match reward_signing(node_client, identity).await {
                Ok(reward_signing_fut) => {
                    reward_signing_fut.await;
                }
                Err(error) => {
                    return Err(BackgroundTaskError::RewardSigning(anyhow::anyhow!(
                        "Failed to subscribe to reward signing notifications: {error}"
                    )));
                }
            }

            Ok(())
        }));

        let farm = Self {
            farmer_protocol_info: farmer_app_info.protocol_info,
            single_disk_farm_info,
            sectors_metadata,
            pieces_in_sector,
            total_sectors_count: target_sector_count,
            span,
            tasks,
            handlers,
            piece_cache,
            plot_cache,
            piece_reader,
            start_sender: Some(start_sender),
            stop_sender: Some(stop_sender),
            _single_disk_farm_info_lock: single_disk_farm_info_lock,
        };
        Ok(farm)
    }
1227
1228    fn init(
1229        directory: &PathBuf,
1230        farmer_app_info: &FarmerAppInfo,
1231        allocated_space: u64,
1232        max_pieces_in_sector: u16,
1233        cache_percentage: u8,
1234        disable_farm_locking: bool,
1235        create: bool,
1236    ) -> Result<SingleDiskFarmInit, SingleDiskFarmError> {
1237        fs::create_dir_all(directory)?;
1238
1239        let identity = if create {
1240            Identity::open_or_create(directory)?
1241        } else {
1242            Identity::open(directory)?.ok_or_else(|| {
1243                IdentityError::Io(io::Error::new(
1244                    io::ErrorKind::NotFound,
1245                    "Farm does not exist and creation was explicitly disabled",
1246                ))
1247            })?
1248        };
1249        let public_key = identity.public_key().to_bytes().into();
1250
1251        let (single_disk_farm_info, single_disk_farm_info_lock) =
1252            match SingleDiskFarmInfo::load_from(directory)? {
1253                Some(mut single_disk_farm_info) => {
1254                    if &farmer_app_info.genesis_hash != single_disk_farm_info.genesis_hash() {
1255                        return Err(SingleDiskFarmError::WrongChain {
1256                            id: *single_disk_farm_info.id(),
1257                            correct_chain: hex::encode(single_disk_farm_info.genesis_hash()),
1258                            wrong_chain: hex::encode(farmer_app_info.genesis_hash),
1259                        });
1260                    }
1261
1262                    if &public_key != single_disk_farm_info.public_key() {
1263                        return Err(SingleDiskFarmError::IdentityMismatch {
1264                            id: *single_disk_farm_info.id(),
1265                            correct_public_key: *single_disk_farm_info.public_key(),
1266                            wrong_public_key: public_key,
1267                        });
1268                    }
1269
1270                    let pieces_in_sector = single_disk_farm_info.pieces_in_sector();
1271
1272                    if max_pieces_in_sector < pieces_in_sector {
1273                        return Err(SingleDiskFarmError::InvalidPiecesInSector {
1274                            id: *single_disk_farm_info.id(),
1275                            max_supported: max_pieces_in_sector,
1276                            initialized_with: pieces_in_sector,
1277                        });
1278                    }
1279
1280                    if max_pieces_in_sector > pieces_in_sector {
1281                        info!(
1282                            pieces_in_sector,
1283                            max_pieces_in_sector,
1284                            "Farm initialized with smaller number of pieces in sector, farm needs \
1285                            to be re-created for increase"
1286                        );
1287                    }
1288
1289                    let mut single_disk_farm_info_lock = None;
1290
1291                    if allocated_space != single_disk_farm_info.allocated_space() {
1292                        info!(
1293                            old_space = %bytesize::to_string(single_disk_farm_info.allocated_space(), true),
1294                            new_space = %bytesize::to_string(allocated_space, true),
1295                            "Farm size has changed"
1296                        );
1297
1298                        let new_allocated_space = allocated_space;
1299                        match &mut single_disk_farm_info {
1300                            SingleDiskFarmInfo::V0 {
1301                                allocated_space, ..
1302                            } => {
1303                                *allocated_space = new_allocated_space;
1304                            }
1305                        }
1306
1307                        single_disk_farm_info_lock =
1308                            single_disk_farm_info.store_to(directory, !disable_farm_locking)?;
1309                    } else if !disable_farm_locking {
1310                        single_disk_farm_info_lock = Some(
1311                            SingleDiskFarmInfo::try_lock(directory)
1312                                .map_err(SingleDiskFarmError::LikelyAlreadyInUse)?,
1313                        );
1314                    }
1315
1316                    (single_disk_farm_info, single_disk_farm_info_lock)
1317                }
1318                None => {
1319                    let single_disk_farm_info = SingleDiskFarmInfo::new(
1320                        FarmId::new(),
1321                        farmer_app_info.genesis_hash,
1322                        public_key,
1323                        max_pieces_in_sector,
1324                        allocated_space,
1325                    );
1326
1327                    let single_disk_farm_info_lock =
1328                        single_disk_farm_info.store_to(directory, !disable_farm_locking)?;
1329
1330                    (single_disk_farm_info, single_disk_farm_info_lock)
1331                }
1332            };
1333
1334        let pieces_in_sector = single_disk_farm_info.pieces_in_sector();
1335        let sector_size = sector_size(pieces_in_sector) as u64;
1336        let sector_metadata_size = SectorMetadataChecksummed::encoded_size();
1337        let allocated_space_distribution = AllocatedSpaceDistribution::new(
1338            allocated_space,
1339            sector_size,
1340            cache_percentage,
1341            sector_metadata_size as u64,
1342        )?;
1343        let target_sector_count = allocated_space_distribution.target_sector_count;
1344
1345        let metadata_file_path = directory.join(Self::METADATA_FILE);
1346        let metadata_file = DirectIoFile::open(&metadata_file_path)?;
1347
1348        let metadata_size = metadata_file.size()?;
1349        let expected_metadata_size = allocated_space_distribution.metadata_file_size;
1350        // Align plot file size for disk sector size
1351        let expected_metadata_size =
1352            expected_metadata_size.div_ceil(DISK_SECTOR_SIZE as u64) * DISK_SECTOR_SIZE as u64;
1353        let metadata_header = if metadata_size == 0 {
1354            let metadata_header = PlotMetadataHeader {
1355                version: SingleDiskFarm::SUPPORTED_PLOT_VERSION,
1356                plotted_sector_count: 0,
1357            };
1358
1359            metadata_file
1360                .preallocate(expected_metadata_size)
1361                .map_err(SingleDiskFarmError::CantPreallocateMetadataFile)?;
1362            metadata_file.write_all_at(metadata_header.encode().as_slice(), 0)?;
1363
1364            metadata_header
1365        } else {
1366            if metadata_size != expected_metadata_size {
1367                // Allocating the whole file (`set_len` below can create a sparse file, which will
1368                // cause writes to fail later)
1369                metadata_file
1370                    .preallocate(expected_metadata_size)
1371                    .map_err(SingleDiskFarmError::CantPreallocateMetadataFile)?;
1372                // Truncating file (if necessary)
1373                metadata_file.set_len(expected_metadata_size)?;
1374            }
1375
1376            let mut metadata_header_bytes = vec![0; PlotMetadataHeader::encoded_size()];
1377            metadata_file.read_exact_at(&mut metadata_header_bytes, 0)?;
1378
1379            let mut metadata_header =
1380                PlotMetadataHeader::decode(&mut metadata_header_bytes.as_ref())
1381                    .map_err(SingleDiskFarmError::FailedToDecodeMetadataHeader)?;
1382
1383            if metadata_header.version != SingleDiskFarm::SUPPORTED_PLOT_VERSION {
1384                return Err(SingleDiskFarmError::UnexpectedMetadataVersion(
1385                    metadata_header.version,
1386                ));
1387            }
1388
1389            if metadata_header.plotted_sector_count > target_sector_count {
1390                metadata_header.plotted_sector_count = target_sector_count;
1391                metadata_file.write_all_at(&metadata_header.encode(), 0)?;
1392            }
1393
1394            metadata_header
1395        };
1396
1397        let sectors_metadata = {
1398            let mut sectors_metadata =
1399                Vec::<SectorMetadataChecksummed>::with_capacity(usize::from(target_sector_count));
1400
1401            let mut sector_metadata_bytes = vec![0; sector_metadata_size];
1402            for sector_index in 0..metadata_header.plotted_sector_count {
1403                let sector_offset =
1404                    RESERVED_PLOT_METADATA + sector_metadata_size as u64 * u64::from(sector_index);
1405                metadata_file.read_exact_at(&mut sector_metadata_bytes, sector_offset)?;
1406
1407                let sector_metadata =
1408                    match SectorMetadataChecksummed::decode(&mut sector_metadata_bytes.as_ref()) {
1409                        Ok(sector_metadata) => sector_metadata,
1410                        Err(error) => {
1411                            warn!(
1412                                path = %metadata_file_path.display(),
1413                                %error,
1414                                %sector_index,
1415                                "Failed to decode sector metadata, replacing with dummy expired \
1416                                sector metadata"
1417                            );
1418
1419                            let dummy_sector = SectorMetadataChecksummed::from(SectorMetadata {
1420                                sector_index,
1421                                pieces_in_sector,
1422                                s_bucket_sizes: Box::new([0; Record::NUM_S_BUCKETS]),
1423                                history_size: HistorySize::from(SegmentIndex::ZERO),
1424                            });
1425                            metadata_file.write_all_at(&dummy_sector.encode(), sector_offset)?;
1426
1427                            dummy_sector
1428                        }
1429                    };
1430                sectors_metadata.push(sector_metadata);
1431            }
1432
1433            Arc::new(AsyncRwLock::new(sectors_metadata))
1434        };
1435
1436        let plot_file = DirectIoFile::open(directory.join(Self::PLOT_FILE))?;
1437
1438        if plot_file.size()? != allocated_space_distribution.plot_file_size {
1439            // Allocating the whole file (`set_len` below can create a sparse file, which will cause
1440            // writes to fail later)
1441            plot_file
1442                .preallocate(allocated_space_distribution.plot_file_size)
1443                .map_err(SingleDiskFarmError::CantPreallocatePlotFile)?;
1444            // Truncating file (if necessary)
1445            plot_file.set_len(allocated_space_distribution.plot_file_size)?;
1446        }
1447
1448        let plot_file = Arc::new(plot_file);
1449
1450        let plot_cache = DiskPlotCache::new(
1451            &plot_file,
1452            &sectors_metadata,
1453            target_sector_count,
1454            sector_size,
1455        );
1456
1457        Ok(SingleDiskFarmInit {
1458            identity,
1459            single_disk_farm_info,
1460            single_disk_farm_info_lock,
1461            plot_file,
1462            metadata_file,
1463            metadata_header,
1464            target_sector_count,
1465            sectors_metadata,
1466            piece_cache_capacity: allocated_space_distribution.piece_cache_capacity,
1467            plot_cache,
1468        })
1469    }
1470
1471    /// Collect summary of single disk farm for presentational purposes
1472    pub fn collect_summary(directory: PathBuf) -> SingleDiskFarmSummary {
1473        let single_disk_farm_info = match SingleDiskFarmInfo::load_from(&directory) {
1474            Ok(Some(single_disk_farm_info)) => single_disk_farm_info,
1475            Ok(None) => {
1476                return SingleDiskFarmSummary::NotFound { directory };
1477            }
1478            Err(error) => {
1479                return SingleDiskFarmSummary::Error { directory, error };
1480            }
1481        };
1482
1483        SingleDiskFarmSummary::Found {
1484            info: single_disk_farm_info,
1485            directory,
1486        }
1487    }
1488
1489    /// Effective on-disk allocation of the files related to the farm (takes some buffer space
1490    /// into consideration).
1491    ///
1492    /// This is a helpful number in case some files were not allocated properly or were removed and
1493    /// do not correspond to allocated space in the farm info accurately.
1494    pub fn effective_disk_usage(
1495        directory: &Path,
1496        cache_percentage: u8,
1497    ) -> Result<u64, SingleDiskFarmError> {
1498        let mut effective_disk_usage;
1499        match SingleDiskFarmInfo::load_from(directory)? {
1500            Some(single_disk_farm_info) => {
1501                let allocated_space_distribution = AllocatedSpaceDistribution::new(
1502                    single_disk_farm_info.allocated_space(),
1503                    sector_size(single_disk_farm_info.pieces_in_sector()) as u64,
1504                    cache_percentage,
1505                    SectorMetadataChecksummed::encoded_size() as u64,
1506                )?;
1507
1508                effective_disk_usage = single_disk_farm_info.allocated_space();
1509                effective_disk_usage -= Identity::file_size() as u64;
1510                effective_disk_usage -= allocated_space_distribution.metadata_file_size;
1511                effective_disk_usage -= allocated_space_distribution.plot_file_size;
1512                effective_disk_usage -= allocated_space_distribution.piece_cache_file_size;
1513            }
1514            None => {
1515                // No farm info, try to collect actual file sizes is any
1516                effective_disk_usage = 0;
1517            }
1518        };
1519
1520        if Identity::open(directory)?.is_some() {
1521            effective_disk_usage += Identity::file_size() as u64;
1522        }
1523
1524        match OpenOptions::new()
1525            .read(true)
1526            .open(directory.join(Self::METADATA_FILE))
1527        {
1528            Ok(metadata_file) => {
1529                effective_disk_usage += metadata_file.size()?;
1530            }
1531            Err(error) => {
1532                if error.kind() == io::ErrorKind::NotFound {
1533                    // File is not stored on disk
1534                } else {
1535                    return Err(error.into());
1536                }
1537            }
1538        };
1539
1540        match OpenOptions::new()
1541            .read(true)
1542            .open(directory.join(Self::PLOT_FILE))
1543        {
1544            Ok(plot_file) => {
1545                effective_disk_usage += plot_file.size()?;
1546            }
1547            Err(error) => {
1548                if error.kind() == io::ErrorKind::NotFound {
1549                    // File is not stored on disk
1550                } else {
1551                    return Err(error.into());
1552                }
1553            }
1554        };
1555
1556        match OpenOptions::new()
1557            .read(true)
1558            .open(directory.join(DiskPieceCache::FILE_NAME))
1559        {
1560            Ok(piece_cache) => {
1561                effective_disk_usage += piece_cache.size()?;
1562            }
1563            Err(error) => {
1564                if error.kind() == io::ErrorKind::NotFound {
1565                    // File is not stored on disk
1566                } else {
1567                    return Err(error.into());
1568                }
1569            }
1570        };
1571
1572        Ok(effective_disk_usage)
1573    }
1574
1575    /// Read all sectors metadata
1576    pub fn read_all_sectors_metadata(
1577        directory: &Path,
1578    ) -> io::Result<Vec<SectorMetadataChecksummed>> {
1579        let metadata_file = DirectIoFile::open(directory.join(Self::METADATA_FILE))?;
1580
1581        let metadata_size = metadata_file.size()?;
1582        let sector_metadata_size = SectorMetadataChecksummed::encoded_size();
1583
1584        let mut metadata_header_bytes = vec![0; PlotMetadataHeader::encoded_size()];
1585        metadata_file.read_exact_at(&mut metadata_header_bytes, 0)?;
1586
1587        let metadata_header = PlotMetadataHeader::decode(&mut metadata_header_bytes.as_ref())
1588            .map_err(|error| {
1589                io::Error::other(format!("Failed to decode metadata header: {error}"))
1590            })?;
1591
1592        if metadata_header.version != SingleDiskFarm::SUPPORTED_PLOT_VERSION {
1593            return Err(io::Error::other(format!(
1594                "Unsupported metadata version {}",
1595                metadata_header.version
1596            )));
1597        }
1598
1599        let mut sectors_metadata = Vec::<SectorMetadataChecksummed>::with_capacity(
1600            ((metadata_size - RESERVED_PLOT_METADATA) / sector_metadata_size as u64) as usize,
1601        );
1602
1603        let mut sector_metadata_bytes = vec![0; sector_metadata_size];
1604        for sector_index in 0..metadata_header.plotted_sector_count {
1605            metadata_file.read_exact_at(
1606                &mut sector_metadata_bytes,
1607                RESERVED_PLOT_METADATA + sector_metadata_size as u64 * u64::from(sector_index),
1608            )?;
1609            sectors_metadata.push(
1610                SectorMetadataChecksummed::decode(&mut sector_metadata_bytes.as_ref()).map_err(
1611                    |error| io::Error::other(format!("Failed to decode sector metadata: {error}")),
1612                )?,
1613            );
1614        }
1615
1616        Ok(sectors_metadata)
1617    }
1618
1619    /// ID of this farm
1620    pub fn id(&self) -> &FarmId {
1621        self.single_disk_farm_info.id()
1622    }
1623
1624    /// Info of this farm
1625    pub fn info(&self) -> &SingleDiskFarmInfo {
1626        &self.single_disk_farm_info
1627    }
1628
1629    /// Number of sectors in this farm
1630    pub fn total_sectors_count(&self) -> SectorIndex {
1631        self.total_sectors_count
1632    }
1633
1634    /// Read information about sectors plotted so far
1635    pub fn plotted_sectors(&self) -> SingleDiskPlottedSectors {
1636        SingleDiskPlottedSectors {
1637            public_key: *self.single_disk_farm_info.public_key(),
1638            pieces_in_sector: self.pieces_in_sector,
1639            farmer_protocol_info: self.farmer_protocol_info,
1640            sectors_metadata: Arc::clone(&self.sectors_metadata),
1641        }
1642    }
1643
1644    /// Get piece cache instance
1645    pub fn piece_cache(&self) -> SingleDiskPieceCache {
1646        self.piece_cache.clone()
1647    }
1648
1649    /// Get plot cache instance
1650    pub fn plot_cache(&self) -> DiskPlotCache {
1651        self.plot_cache.clone()
1652    }
1653
1654    /// Get piece reader to read plotted pieces later
1655    pub fn piece_reader(&self) -> DiskPieceReader {
1656        self.piece_reader.clone()
1657    }
1658
1659    /// Subscribe to sector updates
1660    pub fn on_sector_update(&self, callback: HandlerFn<(SectorIndex, SectorUpdate)>) -> HandlerId {
1661        self.handlers.sector_update.add(callback)
1662    }
1663
1664    /// Subscribe to farming notifications
1665    pub fn on_farming_notification(&self, callback: HandlerFn<FarmingNotification>) -> HandlerId {
1666        self.handlers.farming_notification.add(callback)
1667    }
1668
1669    /// Subscribe to new solution notification
1670    pub fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> HandlerId {
1671        self.handlers.solution.add(callback)
1672    }
1673
1674    /// Run and wait for background threads to exit or return an error
1675    pub async fn run(mut self) -> anyhow::Result<()> {
1676        if let Some(start_sender) = self.start_sender.take() {
1677            // Do not care if anyone is listening on the other side
1678            let _ = start_sender.send(());
1679        }
1680
1681        while let Some(result) = self.tasks.next().instrument(self.span.clone()).await {
1682            result?;
1683        }
1684
1685        Ok(())
1686    }
1687
1688    /// Wipe everything that belongs to this single disk farm
1689    pub fn wipe(directory: &Path) -> io::Result<()> {
1690        let single_disk_info_info_path = directory.join(SingleDiskFarmInfo::FILE_NAME);
1691        match SingleDiskFarmInfo::load_from(directory) {
1692            Ok(Some(single_disk_farm_info)) => {
1693                info!("Found single disk farm {}", single_disk_farm_info.id());
1694            }
1695            Ok(None) => {
1696                return Err(io::Error::new(
1697                    io::ErrorKind::NotFound,
1698                    format!(
1699                        "Single disk farm info not found at {}",
1700                        single_disk_info_info_path.display()
1701                    ),
1702                ));
1703            }
1704            Err(error) => {
1705                warn!("Found unknown single disk farm: {}", error);
1706            }
1707        }
1708
1709        {
1710            let plot = directory.join(Self::PLOT_FILE);
1711            if plot.exists() {
1712                info!("Deleting plot file at {}", plot.display());
1713                fs::remove_file(plot)?;
1714            }
1715        }
1716        {
1717            let metadata = directory.join(Self::METADATA_FILE);
1718            if metadata.exists() {
1719                info!("Deleting metadata file at {}", metadata.display());
1720                fs::remove_file(metadata)?;
1721            }
1722        }
1723        // TODO: Identity should be able to wipe itself instead of assuming a specific file name
1724        //  here
1725        {
1726            let identity = directory.join("identity.bin");
1727            if identity.exists() {
1728                info!("Deleting identity file at {}", identity.display());
1729                fs::remove_file(identity)?;
1730            }
1731        }
1732
1733        DiskPieceCache::wipe(directory)?;
1734
1735        info!(
1736            "Deleting info file at {}",
1737            single_disk_info_info_path.display()
1738        );
1739        fs::remove_file(single_disk_info_info_path)
1740    }
1741
1742    /// Check the farm for corruption and repair errors (caused by disk errors or something else),
1743    /// returns an error when irrecoverable errors occur.
1744    pub fn scrub(
1745        directory: &Path,
1746        disable_farm_locking: bool,
1747        target: ScrubTarget,
1748        dry_run: bool,
1749    ) -> Result<(), SingleDiskFarmScrubError> {
1750        let span = Span::current();
1751
1752        if dry_run {
1753            info!("Dry run is used, no changes will be written to disk");
1754        }
1755
1756        if target.metadata() || target.plot() {
1757            let info = {
1758                let file = directory.join(SingleDiskFarmInfo::FILE_NAME);
1759                info!(path = %file.display(), "Checking info file");
1760
1761                match SingleDiskFarmInfo::load_from(directory) {
1762                    Ok(Some(info)) => info,
1763                    Ok(None) => {
1764                        return Err(SingleDiskFarmScrubError::FarmInfoFileDoesNotExist { file });
1765                    }
1766                    Err(error) => {
1767                        return Err(SingleDiskFarmScrubError::FarmInfoCantBeOpened { file, error });
1768                    }
1769                }
1770            };
1771
1772            let _single_disk_farm_info_lock = if disable_farm_locking {
1773                None
1774            } else {
1775                Some(
1776                    SingleDiskFarmInfo::try_lock(directory)
1777                        .map_err(SingleDiskFarmScrubError::LikelyAlreadyInUse)?,
1778                )
1779            };
1780
1781            let identity = {
1782                let file = directory.join(Identity::FILE_NAME);
1783                info!(path = %file.display(), "Checking identity file");
1784
1785                match Identity::open(directory) {
1786                    Ok(Some(identity)) => identity,
1787                    Ok(None) => {
1788                        return Err(SingleDiskFarmScrubError::IdentityFileDoesNotExist { file });
1789                    }
1790                    Err(error) => {
1791                        return Err(SingleDiskFarmScrubError::IdentityCantBeOpened { file, error });
1792                    }
1793                }
1794            };
1795
1796            if PublicKey::from(identity.public.to_bytes()) != *info.public_key() {
1797                return Err(SingleDiskFarmScrubError::PublicKeyMismatch {
1798                    identity: PublicKey::from(identity.public.to_bytes()),
1799                    info: *info.public_key(),
1800                });
1801            }
1802
1803            let sector_metadata_size = SectorMetadataChecksummed::encoded_size();
1804
1805            let metadata_file_path = directory.join(Self::METADATA_FILE);
1806            let (metadata_file, mut metadata_header) = {
1807                info!(path = %metadata_file_path.display(), "Checking metadata file");
1808
1809                let metadata_file = match OpenOptions::new()
1810                    .read(true)
1811                    .write(!dry_run)
1812                    .open(&metadata_file_path)
1813                {
1814                    Ok(metadata_file) => metadata_file,
1815                    Err(error) => {
1816                        return Err(if error.kind() == io::ErrorKind::NotFound {
1817                            SingleDiskFarmScrubError::MetadataFileDoesNotExist {
1818                                file: metadata_file_path,
1819                            }
1820                        } else {
1821                            SingleDiskFarmScrubError::MetadataCantBeOpened {
1822                                file: metadata_file_path,
1823                                error,
1824                            }
1825                        });
1826                    }
1827                };
1828
1829                // Error doesn't matter here
1830                let _ = metadata_file.advise_sequential_access();
1831
1832                let metadata_size = match metadata_file.size() {
1833                    Ok(metadata_size) => metadata_size,
1834                    Err(error) => {
1835                        return Err(SingleDiskFarmScrubError::FailedToDetermineFileSize {
1836                            file: metadata_file_path,
1837                            error,
1838                        });
1839                    }
1840                };
1841
1842                if metadata_size < RESERVED_PLOT_METADATA {
1843                    return Err(SingleDiskFarmScrubError::MetadataFileTooSmall {
1844                        file: metadata_file_path,
1845                        reserved_size: RESERVED_PLOT_METADATA,
1846                        size: metadata_size,
1847                    });
1848                }
1849
1850                let mut metadata_header = {
1851                    let mut reserved_metadata = vec![0; RESERVED_PLOT_METADATA as usize];
1852
1853                    if let Err(error) = metadata_file.read_exact_at(&mut reserved_metadata, 0) {
1854                        return Err(SingleDiskFarmScrubError::FailedToReadBytes {
1855                            file: metadata_file_path,
1856                            size: RESERVED_PLOT_METADATA,
1857                            offset: 0,
1858                            error,
1859                        });
1860                    }
1861
1862                    PlotMetadataHeader::decode(&mut reserved_metadata.as_slice())
1863                        .map_err(SingleDiskFarmScrubError::FailedToDecodeMetadataHeader)?
1864                };
1865
1866                if metadata_header.version != SingleDiskFarm::SUPPORTED_PLOT_VERSION {
1867                    return Err(SingleDiskFarmScrubError::UnexpectedMetadataVersion(
1868                        metadata_header.version,
1869                    ));
1870                }
1871
1872                let plotted_sector_count = metadata_header.plotted_sector_count;
1873
1874                let expected_metadata_size = RESERVED_PLOT_METADATA
1875                    + sector_metadata_size as u64 * u64::from(plotted_sector_count);
1876
1877                if metadata_size < expected_metadata_size {
1878                    warn!(
1879                        %metadata_size,
1880                        %expected_metadata_size,
1881                        "Metadata file size is smaller than expected, shrinking number of plotted \
1882                        sectors to correct value"
1883                    );
1884
1885                    metadata_header.plotted_sector_count =
1886                        ((metadata_size - RESERVED_PLOT_METADATA) / sector_metadata_size as u64)
1887                            as SectorIndex;
1888                    let metadata_header_bytes = metadata_header.encode();
1889
1890                    if !dry_run
1891                        && let Err(error) = metadata_file.write_all_at(&metadata_header_bytes, 0)
1892                    {
1893                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
1894                            file: metadata_file_path,
1895                            size: metadata_header_bytes.len() as u64,
1896                            offset: 0,
1897                            error,
1898                        });
1899                    }
1900                }
1901
1902                (metadata_file, metadata_header)
1903            };
1904
1905            let pieces_in_sector = info.pieces_in_sector();
1906            let sector_size = sector_size(pieces_in_sector) as u64;
1907
1908            let plot_file_path = directory.join(Self::PLOT_FILE);
1909            let plot_file = {
1910                let plot_file_path = directory.join(Self::PLOT_FILE);
1911                info!(path = %plot_file_path.display(), "Checking plot file");
1912
1913                let plot_file = match OpenOptions::new()
1914                    .read(true)
1915                    .write(!dry_run)
1916                    .open(&plot_file_path)
1917                {
1918                    Ok(plot_file) => plot_file,
1919                    Err(error) => {
1920                        return Err(if error.kind() == io::ErrorKind::NotFound {
1921                            SingleDiskFarmScrubError::MetadataFileDoesNotExist {
1922                                file: plot_file_path,
1923                            }
1924                        } else {
1925                            SingleDiskFarmScrubError::MetadataCantBeOpened {
1926                                file: plot_file_path,
1927                                error,
1928                            }
1929                        });
1930                    }
1931                };
1932
1933                // Error doesn't matter here
1934                let _ = plot_file.advise_sequential_access();
1935
1936                let plot_size = match plot_file.size() {
1937                    Ok(metadata_size) => metadata_size,
1938                    Err(error) => {
1939                        return Err(SingleDiskFarmScrubError::FailedToDetermineFileSize {
1940                            file: plot_file_path,
1941                            error,
1942                        });
1943                    }
1944                };
1945
1946                let min_expected_plot_size =
1947                    u64::from(metadata_header.plotted_sector_count) * sector_size;
1948                if plot_size < min_expected_plot_size {
1949                    warn!(
1950                        %plot_size,
1951                        %min_expected_plot_size,
1952                        "Plot file size is smaller than expected, shrinking number of plotted \
1953                        sectors to correct value"
1954                    );
1955
1956                    metadata_header.plotted_sector_count = (plot_size / sector_size) as SectorIndex;
1957                    let metadata_header_bytes = metadata_header.encode();
1958
1959                    if !dry_run
1960                        && let Err(error) = metadata_file.write_all_at(&metadata_header_bytes, 0)
1961                    {
1962                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
1963                            file: plot_file_path,
1964                            size: metadata_header_bytes.len() as u64,
1965                            offset: 0,
1966                            error,
1967                        });
1968                    }
1969                }
1970
1971                plot_file
1972            };
1973
1974            let sector_bytes_range = 0..(sector_size as usize - Blake3Hash::SIZE);
1975
1976            info!("Checking sectors and corresponding metadata");
1977            (0..metadata_header.plotted_sector_count)
1978                .into_par_iter()
1979                .map_init(
1980                    || vec![0u8; Record::SIZE],
1981                    |scratch_buffer, sector_index| {
1982                        let _span_guard = span.enter();
1983
1984                        let offset = RESERVED_PLOT_METADATA
1985                            + u64::from(sector_index) * sector_metadata_size as u64;
1986                        if let Err(error) = metadata_file
1987                            .read_exact_at(&mut scratch_buffer[..sector_metadata_size], offset)
1988                        {
1989                            warn!(
1990                                path = %metadata_file_path.display(),
1991                                %error,
1992                                %offset,
1993                                size = %sector_metadata_size,
1994                                %sector_index,
1995                                "Failed to read sector metadata, replacing with dummy expired \
1996                                sector metadata"
1997                            );
1998
1999                            if !dry_run {
2000                                write_dummy_sector_metadata(
2001                                    &metadata_file,
2002                                    &metadata_file_path,
2003                                    sector_index,
2004                                    pieces_in_sector,
2005                                )?;
2006                            }
2007                            return Ok(());
2008                        }
2009
2010                        let sector_metadata = match SectorMetadataChecksummed::decode(
2011                            &mut &scratch_buffer[..sector_metadata_size],
2012                        ) {
2013                            Ok(sector_metadata) => sector_metadata,
2014                            Err(error) => {
2015                                warn!(
2016                                    path = %metadata_file_path.display(),
2017                                    %error,
2018                                    %sector_index,
2019                                    "Failed to decode sector metadata, replacing with dummy \
2020                                    expired sector metadata"
2021                                );
2022
2023                                if !dry_run {
2024                                    write_dummy_sector_metadata(
2025                                        &metadata_file,
2026                                        &metadata_file_path,
2027                                        sector_index,
2028                                        pieces_in_sector,
2029                                    )?;
2030                                }
2031                                return Ok(());
2032                            }
2033                        };
2034
2035                        if sector_metadata.sector_index != sector_index {
2036                            warn!(
2037                                path = %metadata_file_path.display(),
2038                                %sector_index,
2039                                found_sector_index = sector_metadata.sector_index,
2040                                "Sector index mismatch, replacing with dummy expired sector \
2041                                metadata"
2042                            );
2043
2044                            if !dry_run {
2045                                write_dummy_sector_metadata(
2046                                    &metadata_file,
2047                                    &metadata_file_path,
2048                                    sector_index,
2049                                    pieces_in_sector,
2050                                )?;
2051                            }
2052                            return Ok(());
2053                        }
2054
2055                        if sector_metadata.pieces_in_sector != pieces_in_sector {
2056                            warn!(
2057                                path = %metadata_file_path.display(),
2058                                %sector_index,
2059                                %pieces_in_sector,
2060                                found_pieces_in_sector = sector_metadata.pieces_in_sector,
2061                                "Pieces in sector mismatch, replacing with dummy expired sector \
2062                                metadata"
2063                            );
2064
2065                            if !dry_run {
2066                                write_dummy_sector_metadata(
2067                                    &metadata_file,
2068                                    &metadata_file_path,
2069                                    sector_index,
2070                                    pieces_in_sector,
2071                                )?;
2072                            }
2073                            return Ok(());
2074                        }
2075
2076                        if target.plot() {
2077                            let mut hasher = blake3::Hasher::new();
2078                            // Read sector bytes and compute checksum
2079                            for offset_in_sector in
2080                                sector_bytes_range.clone().step_by(scratch_buffer.len())
2081                            {
2082                                let offset =
2083                                    u64::from(sector_index) * sector_size + offset_in_sector as u64;
2084                                let bytes_to_read = (offset_in_sector + scratch_buffer.len())
2085                                    .min(sector_bytes_range.end)
2086                                    - offset_in_sector;
2087
2088                                let bytes = &mut scratch_buffer[..bytes_to_read];
2089
2090                                if let Err(error) = plot_file.read_exact_at(bytes, offset) {
2091                                    warn!(
2092                                        path = %plot_file_path.display(),
2093                                        %error,
2094                                        %sector_index,
2095                                        %offset,
2096                                        size = %bytes.len() as u64,
2097                                        "Failed to read sector bytes"
2098                                    );
2099
2100                                    continue;
2101                                }
2102
2103                                hasher.update(bytes);
2104                            }
2105
2106                            let actual_checksum = *hasher.finalize().as_bytes();
2107                            let mut expected_checksum = [0; Blake3Hash::SIZE];
2108                            {
2109                                let offset = u64::from(sector_index) * sector_size
2110                                    + sector_bytes_range.end as u64;
2111                                if let Err(error) =
2112                                    plot_file.read_exact_at(&mut expected_checksum, offset)
2113                                {
2114                                    warn!(
2115                                        path = %plot_file_path.display(),
2116                                        %error,
2117                                        %sector_index,
2118                                        %offset,
2119                                        size = %expected_checksum.len() as u64,
2120                                        "Failed to read sector checksum bytes"
2121                                    );
2122                                }
2123                            }
2124
2125                            // Verify checksum
2126                            if actual_checksum != expected_checksum {
2127                                warn!(
2128                                    path = %plot_file_path.display(),
2129                                    %sector_index,
2130                                    actual_checksum = %hex::encode(actual_checksum),
2131                                    expected_checksum = %hex::encode(expected_checksum),
2132                                    "Plotted sector checksum mismatch, replacing with dummy \
2133                                    expired sector"
2134                                );
2135
2136                                if !dry_run {
2137                                    write_dummy_sector_metadata(
2138                                        &metadata_file,
2139                                        &metadata_file_path,
2140                                        sector_index,
2141                                        pieces_in_sector,
2142                                    )?;
2143                                }
2144
2145                                scratch_buffer.fill(0);
2146
2147                                hasher.reset();
2148                                // Fill sector with zeroes and compute checksum
2149                                for offset_in_sector in
2150                                    sector_bytes_range.clone().step_by(scratch_buffer.len())
2151                                {
2152                                    let offset = u64::from(sector_index) * sector_size
2153                                        + offset_in_sector as u64;
2154                                    let bytes_to_write = (offset_in_sector + scratch_buffer.len())
2155                                        .min(sector_bytes_range.end)
2156                                        - offset_in_sector;
2157                                    let bytes = &mut scratch_buffer[..bytes_to_write];
2158
2159                                    if !dry_run
2160                                        && let Err(error) = plot_file.write_all_at(bytes, offset)
2161                                    {
2162                                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
2163                                            file: plot_file_path.clone(),
2164                                            size: scratch_buffer.len() as u64,
2165                                            offset,
2166                                            error,
2167                                        });
2168                                    }
2169
2170                                    hasher.update(bytes);
2171                                }
2172                                // Write checksum
2173                                {
2174                                    let checksum = *hasher.finalize().as_bytes();
2175                                    let offset = u64::from(sector_index) * sector_size
2176                                        + sector_bytes_range.end as u64;
2177                                    if !dry_run
2178                                        && let Err(error) =
2179                                            plot_file.write_all_at(&checksum, offset)
2180                                    {
2181                                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
2182                                            file: plot_file_path.clone(),
2183                                            size: checksum.len() as u64,
2184                                            offset,
2185                                            error,
2186                                        });
2187                                    }
2188                                }
2189
2190                                return Ok(());
2191                            }
2192                        }
2193
2194                        trace!(%sector_index, "Sector is in good shape");
2195
2196                        Ok(())
2197                    },
2198                )
2199                .try_for_each({
2200                    let span = &span;
2201                    let checked_sectors = AtomicUsize::new(0);
2202
2203                    move |result| {
2204                        let _span_guard = span.enter();
2205
2206                        let checked_sectors = checked_sectors.fetch_add(1, Ordering::Relaxed);
2207                        if checked_sectors > 1 && checked_sectors % 10 == 0 {
2208                            info!(
2209                                "Checked {}/{} sectors",
2210                                checked_sectors, metadata_header.plotted_sector_count
2211                            );
2212                        }
2213
2214                        result
2215                    }
2216                })?;
2217        }
2218
2219        if target.cache() {
2220            Self::scrub_cache(directory, dry_run)?;
2221        }
2222
2223        info!("Farm check completed");
2224
2225        Ok(())
2226    }
2227
2228    fn scrub_cache(directory: &Path, dry_run: bool) -> Result<(), SingleDiskFarmScrubError> {
2229        let span = Span::current();
2230
2231        let file = directory.join(DiskPieceCache::FILE_NAME);
2232        info!(path = %file.display(), "Checking cache file");
2233
2234        let cache_file = match OpenOptions::new().read(true).write(!dry_run).open(&file) {
2235            Ok(plot_file) => plot_file,
2236            Err(error) => {
2237                return if error.kind() == io::ErrorKind::NotFound {
2238                    warn!(
2239                        file = %file.display(),
2240                        "Cache file does not exist, this is expected in farming cluster"
2241                    );
2242                    Ok(())
2243                } else {
2244                    Err(SingleDiskFarmScrubError::CacheCantBeOpened { file, error })
2245                };
2246            }
2247        };
2248
2249        // Error doesn't matter here
2250        let _ = cache_file.advise_sequential_access();
2251
2252        let cache_size = match cache_file.size() {
2253            Ok(cache_size) => cache_size,
2254            Err(error) => {
2255                return Err(SingleDiskFarmScrubError::FailedToDetermineFileSize { file, error });
2256            }
2257        };
2258
2259        let element_size = DiskPieceCache::element_size();
2260        let number_of_cached_elements = cache_size / u64::from(element_size);
2261        let dummy_element = vec![0; element_size as usize];
2262        (0..number_of_cached_elements)
2263            .into_par_iter()
2264            .map_with(vec![0; element_size as usize], |element, cache_offset| {
2265                let _span_guard = span.enter();
2266
2267                let offset = cache_offset * u64::from(element_size);
2268                if let Err(error) = cache_file.read_exact_at(element, offset) {
2269                    warn!(
2270                        path = %file.display(),
2271                        %cache_offset,
2272                        size = %element.len() as u64,
2273                        %offset,
2274                        %error,
2275                        "Failed to read cached piece, replacing with dummy element"
2276                    );
2277
2278                    if !dry_run && let Err(error) = cache_file.write_all_at(&dummy_element, offset)
2279                    {
2280                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
2281                            file: file.clone(),
2282                            size: u64::from(element_size),
2283                            offset,
2284                            error,
2285                        });
2286                    }
2287
2288                    return Ok(());
2289                }
2290
2291                let (index_and_piece_bytes, expected_checksum) =
2292                    element.split_at(element_size as usize - Blake3Hash::SIZE);
2293                let actual_checksum = blake3_hash(index_and_piece_bytes);
2294                if *actual_checksum != *expected_checksum && element != &dummy_element {
2295                    warn!(
2296                        %cache_offset,
2297                        actual_checksum = %hex::encode(actual_checksum),
2298                        expected_checksum = %hex::encode(expected_checksum),
2299                        "Cached piece checksum mismatch, replacing with dummy element"
2300                    );
2301
2302                    if !dry_run && let Err(error) = cache_file.write_all_at(&dummy_element, offset)
2303                    {
2304                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
2305                            file: file.clone(),
2306                            size: u64::from(element_size),
2307                            offset,
2308                            error,
2309                        });
2310                    }
2311
2312                    return Ok(());
2313                }
2314
2315                Ok(())
2316            })
2317            .try_for_each({
2318                let span = &span;
2319                let checked_elements = AtomicUsize::new(0);
2320
2321                move |result| {
2322                    let _span_guard = span.enter();
2323
2324                    let checked_elements = checked_elements.fetch_add(1, Ordering::Relaxed);
2325                    if checked_elements > 1 && checked_elements % 1000 == 0 {
2326                        info!(
2327                            "Checked {}/{} cache elements",
2328                            checked_elements, number_of_cached_elements
2329                        );
2330                    }
2331
2332                    result
2333                }
2334            })?;
2335
2336        Ok(())
2337    }
2338}
2339
2340fn write_dummy_sector_metadata(
2341    metadata_file: &File,
2342    metadata_file_path: &Path,
2343    sector_index: SectorIndex,
2344    pieces_in_sector: u16,
2345) -> Result<(), SingleDiskFarmScrubError> {
2346    let dummy_sector_bytes = SectorMetadataChecksummed::from(SectorMetadata {
2347        sector_index,
2348        pieces_in_sector,
2349        s_bucket_sizes: Box::new([0; Record::NUM_S_BUCKETS]),
2350        history_size: HistorySize::from(SegmentIndex::ZERO),
2351    })
2352    .encode();
2353    let sector_offset = RESERVED_PLOT_METADATA
2354        + u64::from(sector_index) * SectorMetadataChecksummed::encoded_size() as u64;
2355    metadata_file
2356        .write_all_at(&dummy_sector_bytes, sector_offset)
2357        .map_err(|error| SingleDiskFarmScrubError::FailedToWriteBytes {
2358            file: metadata_file_path.to_path_buf(),
2359            size: dummy_sector_bytes.len() as u64,
2360            offset: sector_offset,
2361            error,
2362        })
2363}