subspace_farmer/
single_disk_farm.rs

1//! Primary [`Farm`] implementation that deals with hardware directly
2//!
3//! Single disk farm is an abstraction that contains an identity, associated plot with metadata and
4//! a small piece cache. It fully manages farming and plotting process, including listening to node
//! notifications, producing solutions and signing rewards.
6
7pub mod direct_io_file;
8pub mod farming;
9pub mod identity;
10mod metrics;
11pub mod piece_cache;
12pub mod piece_reader;
13pub mod plot_cache;
14mod plotted_sectors;
15mod plotting;
16mod reward_signing;
17
18use crate::disk_piece_cache::{DiskPieceCache, DiskPieceCacheError};
19use crate::farm::{
20    Farm, FarmId, FarmingError, FarmingNotification, HandlerFn, PieceCacheId, PieceReader,
21    PlottedSectors, SectorUpdate,
22};
23use crate::node_client::NodeClient;
24use crate::plotter::Plotter;
25use crate::single_disk_farm::direct_io_file::{DISK_SECTOR_SIZE, DirectIoFile};
26use crate::single_disk_farm::farming::rayon_files::RayonFiles;
27use crate::single_disk_farm::farming::{
28    FarmingOptions, PlotAudit, farming, slot_notification_forwarder,
29};
30use crate::single_disk_farm::identity::{Identity, IdentityError};
31use crate::single_disk_farm::metrics::SingleDiskFarmMetrics;
32use crate::single_disk_farm::piece_cache::SingleDiskPieceCache;
33use crate::single_disk_farm::piece_reader::DiskPieceReader;
34use crate::single_disk_farm::plot_cache::DiskPlotCache;
35use crate::single_disk_farm::plotted_sectors::SingleDiskPlottedSectors;
36pub use crate::single_disk_farm::plotting::PlottingError;
37use crate::single_disk_farm::plotting::{
38    PlottingOptions, PlottingSchedulerOptions, SectorPlottingOptions, plotting, plotting_scheduler,
39};
40use crate::single_disk_farm::reward_signing::reward_signing;
41use crate::utils::{AsyncJoinOnDrop, tokio_rayon_spawn_handler};
42use crate::{KNOWN_PEERS_CACHE_SIZE, farm};
43use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
44use async_trait::async_trait;
45use event_listener_primitives::{Bag, HandlerId};
46use futures::channel::{mpsc, oneshot};
47use futures::stream::FuturesUnordered;
48use futures::{FutureExt, StreamExt, select};
49use parity_scale_codec::{Decode, Encode};
50use parking_lot::Mutex;
51use prometheus_client::registry::Registry;
52use rayon::prelude::*;
53use rayon::{ThreadPoolBuildError, ThreadPoolBuilder};
54use serde::{Deserialize, Serialize};
55use static_assertions::const_assert;
56use std::collections::HashSet;
57use std::fs::{File, OpenOptions};
58use std::future::Future;
59use std::io::Write;
60use std::num::{NonZeroU32, NonZeroUsize};
61use std::path::{Path, PathBuf};
62use std::pin::Pin;
63use std::str::FromStr;
64use std::sync::Arc;
65use std::sync::atomic::{AtomicUsize, Ordering};
66use std::time::Duration;
67use std::{fmt, fs, io, mem};
68use subspace_core_primitives::PublicKey;
69use subspace_core_primitives::hashes::{Blake3Hash, blake3_hash};
70use subspace_core_primitives::pieces::Record;
71use subspace_core_primitives::sectors::SectorIndex;
72use subspace_core_primitives::segments::{HistorySize, SegmentIndex};
73use subspace_erasure_coding::ErasureCoding;
74use subspace_farmer_components::FarmerProtocolInfo;
75use subspace_farmer_components::file_ext::FileExt;
76use subspace_farmer_components::reading::ReadSectorRecordChunksMode;
77use subspace_farmer_components::sector::{SectorMetadata, SectorMetadataChecksummed, sector_size};
78use subspace_kzg::Kzg;
79use subspace_networking::KnownPeersManager;
80use subspace_proof_of_space::Table;
81use subspace_rpc_primitives::{FarmerAppInfo, SolutionResponse};
82use thiserror::Error;
83use tokio::runtime::Handle;
84use tokio::sync::broadcast;
85use tokio::task;
86use tracing::{Instrument, Span, error, info, trace, warn};
87
// Refuse to compile on non-64-bit platforms, offsets may fail on those when converting from u64 to
// usize depending on chain parameters
const_assert!(mem::size_of::<usize>() >= mem::size_of::<u64>());

/// Reserve 1 MiB (`1024 * 1024` bytes) of space for plot metadata (for potential future expansion)
const RESERVED_PLOT_METADATA: u64 = 1024 * 1024;
/// Reserve 1 MiB (`1024 * 1024` bytes) of space for farm info (for potential future expansion)
const RESERVED_FARM_INFO: u64 = 1024 * 1024;
/// Ten-minute delay.
///
/// NOTE(review): judging by the name this is waited out before a new segment is processed; the
/// usage is not visible in this part of the file, confirm against the consumer.
const NEW_SEGMENT_PROCESSING_DELAY: Duration = Duration::from_mins(10);
97
/// Exclusive lock for the single disk farm info file, ensuring that cooperating processes do not
/// edit it concurrently.
///
/// The struct only owns the file handle on which the lock was taken, so that handle stays open for
/// as long as this value is alive. The handle itself is never read through this type.
#[derive(Debug)]
#[must_use = "Lock file must be kept around for as long as farm is used"]
pub struct SingleDiskFarmInfoLock {
    _file: File,
}
104
/// Important information about the contents of the `SingleDiskFarm`
///
/// Serialized with serde using camelCase names for both the variant and its fields. The enum is
/// versioned by variant; `V0` is the only layout defined here.
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum SingleDiskFarmInfo {
    /// V0 of the info
    #[serde(rename_all = "camelCase")]
    V0 {
        /// ID of the farm
        id: FarmId,
        /// Genesis hash of the chain used for farm creation (serialized as a hex string)
        #[serde(with = "hex")]
        genesis_hash: [u8; 32],
        /// Public key of identity used for farm creation
        public_key: PublicKey,
        /// How many pieces does one sector contain.
        pieces_in_sector: u16,
        /// How much space in bytes is allocated for this farm
        allocated_space: u64,
    },
}
125
126impl SingleDiskFarmInfo {
127    const FILE_NAME: &'static str = "single_disk_farm.json";
128
129    /// Create new instance
130    pub fn new(
131        id: FarmId,
132        genesis_hash: [u8; 32],
133        public_key: PublicKey,
134        pieces_in_sector: u16,
135        allocated_space: u64,
136    ) -> Self {
137        Self::V0 {
138            id,
139            genesis_hash,
140            public_key,
141            pieces_in_sector,
142            allocated_space,
143        }
144    }
145
146    /// Load `SingleDiskFarm` from path is supposed to be stored, `None` means no info file was
147    /// found, happens during first start.
148    pub fn load_from(directory: &Path) -> io::Result<Option<Self>> {
149        let bytes = match fs::read(directory.join(Self::FILE_NAME)) {
150            Ok(bytes) => bytes,
151            Err(error) => {
152                return if error.kind() == io::ErrorKind::NotFound {
153                    Ok(None)
154                } else {
155                    Err(error)
156                };
157            }
158        };
159
160        serde_json::from_slice(&bytes)
161            .map(Some)
162            .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))
163    }
164
165    /// Store `SingleDiskFarm` info to path, so it can be loaded again upon restart.
166    ///
167    /// Can optionally return a lock.
168    pub fn store_to(
169        &self,
170        directory: &Path,
171        lock: bool,
172    ) -> io::Result<Option<SingleDiskFarmInfoLock>> {
173        let mut file = OpenOptions::new()
174            .write(true)
175            .create(true)
176            .truncate(false)
177            .open(directory.join(Self::FILE_NAME))?;
178        if lock {
179            fs4::fs_std::FileExt::try_lock_exclusive(&file)?;
180        }
181        file.set_len(0)?;
182        file.write_all(&serde_json::to_vec(self).expect("Info serialization never fails; qed"))?;
183
184        Ok(lock.then_some(SingleDiskFarmInfoLock { _file: file }))
185    }
186
187    /// Try to acquire exclusive lock on the single disk farm info file, ensuring no concurrent edits by cooperating
188    /// processes is done
189    pub fn try_lock(directory: &Path) -> io::Result<SingleDiskFarmInfoLock> {
190        let file = File::open(directory.join(Self::FILE_NAME))?;
191        fs4::fs_std::FileExt::try_lock_exclusive(&file)?;
192
193        Ok(SingleDiskFarmInfoLock { _file: file })
194    }
195
196    /// ID of the farm
197    pub fn id(&self) -> &FarmId {
198        let Self::V0 { id, .. } = self;
199        id
200    }
201
202    /// Genesis hash of the chain used for farm creation
203    pub fn genesis_hash(&self) -> &[u8; 32] {
204        let Self::V0 { genesis_hash, .. } = self;
205        genesis_hash
206    }
207
208    /// Public key of identity used for farm creation
209    pub fn public_key(&self) -> &PublicKey {
210        let Self::V0 { public_key, .. } = self;
211        public_key
212    }
213
214    /// How many pieces does one sector contain.
215    pub fn pieces_in_sector(&self) -> u16 {
216        match self {
217            SingleDiskFarmInfo::V0 {
218                pieces_in_sector, ..
219            } => *pieces_in_sector,
220        }
221    }
222
223    /// How much space in bytes is allocated for this farm
224    pub fn allocated_space(&self) -> u64 {
225        match self {
226            SingleDiskFarmInfo::V0 {
227                allocated_space, ..
228            } => *allocated_space,
229        }
230    }
231}
232
/// Summary of single disk farm for presentational purposes
///
/// Every variant carries the directory it refers to, so a summary can be shown without any extra
/// context.
#[derive(Debug)]
pub enum SingleDiskFarmSummary {
    /// Farm was found and read successfully
    Found {
        /// Farm info
        info: SingleDiskFarmInfo,
        /// Path to directory where farm is stored.
        directory: PathBuf,
    },
    /// Farm was not found
    NotFound {
        /// Path to directory where farm is stored.
        directory: PathBuf,
    },
    /// Failed to open farm
    Error {
        /// Path to directory where farm is stored.
        directory: PathBuf,
        /// Error itself
        error: io::Error,
    },
}
256
/// Header of the plot metadata, encodable and decodable with SCALE codec (`Encode`/`Decode`).
#[derive(Debug, Encode, Decode)]
struct PlotMetadataHeader {
    /// Version of the metadata format.
    // NOTE(review): presumably checked against `SingleDiskFarm::SUPPORTED_PLOT_VERSION`; the check
    // is not visible in this part of the file
    version: u8,
    /// Number of plotted sectors, as the name suggests (TODO confirm against the writer)
    plotted_sector_count: SectorIndex,
}
262
263impl PlotMetadataHeader {
264    #[inline]
265    fn encoded_size() -> usize {
266        let default = PlotMetadataHeader {
267            version: 0,
268            plotted_sector_count: 0,
269        };
270
271        default.encoded_size()
272    }
273}
274
/// Options used to open single disk farm
///
/// Consumed by value by `SingleDiskFarm::new`, which destructures it into its individual fields.
#[derive(Debug)]
pub struct SingleDiskFarmOptions<'a, NC>
where
    NC: Clone,
{
    /// Path to directory where farm is stored.
    pub directory: PathBuf,
    /// Information necessary for farmer application
    pub farmer_app_info: FarmerAppInfo,
    /// How much space in bytes was allocated
    pub allocated_space: u64,
    /// How many pieces one sector is supposed to contain (max)
    pub max_pieces_in_sector: u16,
    /// RPC client connected to Subspace node
    pub node_client: NC,
    /// Address where farming rewards should go
    pub reward_address: PublicKey,
    /// Plotter
    pub plotter: Arc<dyn Plotter + Send + Sync>,
    /// Kzg instance to use.
    pub kzg: Kzg,
    /// Erasure coding instance to use.
    pub erasure_coding: ErasureCoding,
    /// Percentage of allocated space dedicated for caching purposes
    pub cache_percentage: u8,
    /// Thread pool size used for farming (mostly for blocking I/O, but also for some
    /// compute-intensive operations during proving)
    pub farming_thread_pool_size: usize,
    /// Notification for plotter to start, can be used to delay plotting until some initialization
    /// has happened externally
    pub plotting_delay: Option<oneshot::Receiver<()>>,
    /// Global mutex that can restrict concurrency of resource-intensive operations and make sure
    /// that those operations that are very sensitive (like proving) have all the resources
    /// available to them for the highest probability of success
    pub global_mutex: Arc<AsyncMutex<()>>,
    /// How many sectors will be plotted concurrently per farm
    pub max_plotting_sectors_per_farm: NonZeroUsize,
    /// Disable farm locking, for example if file system doesn't support it
    pub disable_farm_locking: bool,
    /// Mode to use for reading of sector record chunks
    pub read_sector_record_chunks_mode: ReadSectorRecordChunksMode,
    /// Prometheus registry, `None` if no registry is provided
    pub registry: Option<&'a Mutex<&'a mut Registry>>,
    /// Whether to create a farm if it doesn't yet exist
    pub create: bool,
}
322
/// Errors happening when trying to create/open single disk farm
///
/// `Display` messages are generated by `thiserror` from the `#[error(...)]` attributes; variants
/// marked with `#[from]` can be produced from the wrapped error with `?`.
#[derive(Debug, Error)]
pub enum SingleDiskFarmError {
    /// Failed to open or create identity
    #[error("Failed to open or create identity: {0}")]
    FailedToOpenIdentity(#[from] IdentityError),
    /// Farm is likely already in use, make sure no other farmer is using it
    #[error("Farm is likely already in use, make sure no other farmer is using it: {0}")]
    LikelyAlreadyInUse(io::Error),
    /// I/O error occurred
    #[error("Single disk farm I/O error: {0}")]
    Io(#[from] io::Error),
    /// Failed to spawn task for blocking thread
    #[error("Failed to spawn task for blocking thread: {0}")]
    TokioJoinError(#[from] task::JoinError),
    /// Piece cache error
    #[error("Piece cache error: {0}")]
    PieceCacheError(#[from] DiskPieceCacheError),
    /// Can't preallocate metadata file, probably not enough space on disk
    #[error("Can't preallocate metadata file, probably not enough space on disk: {0}")]
    CantPreallocateMetadataFile(io::Error),
    /// Can't preallocate plot file, probably not enough space on disk
    #[error("Can't preallocate plot file, probably not enough space on disk: {0}")]
    CantPreallocatePlotFile(io::Error),
    /// Wrong chain (genesis hash)
    #[error(
        "Genesis hash of farm {id} {wrong_chain} is different from {correct_chain} when farm was \
        created, it is not possible to use farm on a different chain"
    )]
    WrongChain {
        /// Farm ID
        id: FarmId,
        /// Hex-encoded genesis hash during farm creation
        // TODO: Wrapper type with `Display` impl for genesis hash
        correct_chain: String,
        /// Hex-encoded current genesis hash
        wrong_chain: String,
    },
    /// Public key in identity doesn't match metadata
    #[error(
        "Public key of farm {id} {wrong_public_key} is different from {correct_public_key} when \
        farm was created, something went wrong, likely due to manual edits"
    )]
    IdentityMismatch {
        /// Farm ID
        id: FarmId,
        /// Public key used during farm creation
        correct_public_key: PublicKey,
        /// Current public key
        wrong_public_key: PublicKey,
    },
    /// Invalid number of pieces in sector
    #[error(
        "Invalid number pieces in sector: max supported {max_supported}, farm initialized with \
        {initialized_with}"
    )]
    InvalidPiecesInSector {
        /// Farm ID (carried for callers, not part of the message)
        id: FarmId,
        /// Max supported pieces in sector
        max_supported: u16,
        /// Number of pieces in sector farm is initialized with
        initialized_with: u16,
    },
    /// Failed to decode metadata header
    #[error("Failed to decode metadata header: {0}")]
    FailedToDecodeMetadataHeader(parity_scale_codec::Error),
    /// Unexpected metadata version
    #[error("Unexpected metadata version {0}")]
    UnexpectedMetadataVersion(u8),
    /// Allocated space is not enough for one sector
    #[error(
        "Allocated space is not enough for one sector. \
        The lowest acceptable value for allocated space is {min_space} bytes, \
        provided {allocated_space} bytes."
    )]
    InsufficientAllocatedSpace {
        /// Minimal allocated space
        min_space: u64,
        /// Current allocated space
        allocated_space: u64,
    },
    /// Farm is too large
    #[error(
        "Farm is too large: allocated {allocated_sectors} sectors ({allocated_space} bytes), max \
        supported is {max_sectors} ({max_space} bytes). Consider creating multiple smaller farms \
        instead."
    )]
    FarmTooLarge {
        /// Allocated space
        allocated_space: u64,
        /// Allocated space in sectors
        allocated_sectors: u64,
        /// Max supported allocated space
        max_space: u64,
        /// Max supported allocated space in sectors
        max_sectors: u16,
    },
    /// Failed to create thread pool
    #[error("Failed to create thread pool: {0}")]
    FailedToCreateThreadPool(ThreadPoolBuildError),
}
425
426/// Errors happening during scrubbing
427#[derive(Debug, Error)]
428pub enum SingleDiskFarmScrubError {
429    /// Farm is likely already in use, make sure no other farmer is using it
430    #[error("Farm is likely already in use, make sure no other farmer is using it: {0}")]
431    LikelyAlreadyInUse(io::Error),
432    /// Failed to determine file size
433    #[error("Failed to file size of {file}: {error}")]
434    FailedToDetermineFileSize {
435        /// Affected file
436        file: PathBuf,
437        /// Low-level error
438        error: io::Error,
439    },
440    /// Failed to read bytes from file
441    #[error("Failed to read {size} bytes from {file} at offset {offset}: {error}")]
442    FailedToReadBytes {
443        /// Affected file
444        file: PathBuf,
445        /// Number of bytes to read
446        size: u64,
447        /// Offset in the file
448        offset: u64,
449        /// Low-level error
450        error: io::Error,
451    },
452    /// Failed to write bytes from file
453    #[error("Failed to write {size} bytes from {file} at offset {offset}: {error}")]
454    FailedToWriteBytes {
455        /// Affected file
456        file: PathBuf,
457        /// Number of bytes to read
458        size: u64,
459        /// Offset in the file
460        offset: u64,
461        /// Low-level error
462        error: io::Error,
463    },
464    /// Farm info file does not exist
465    #[error("Farm info file does not exist at {file}")]
466    FarmInfoFileDoesNotExist {
467        /// Info file
468        file: PathBuf,
469    },
470    /// Farm info can't be opened
471    #[error("Farm info at {file} can't be opened: {error}")]
472    FarmInfoCantBeOpened {
473        /// Info file
474        file: PathBuf,
475        /// Low-level error
476        error: io::Error,
477    },
478    /// Identity file does not exist
479    #[error("Identity file does not exist at {file}")]
480    IdentityFileDoesNotExist {
481        /// Identity file
482        file: PathBuf,
483    },
484    /// Identity can't be opened
485    #[error("Identity at {file} can't be opened: {error}")]
486    IdentityCantBeOpened {
487        /// Identity file
488        file: PathBuf,
489        /// Low-level error
490        error: IdentityError,
491    },
492    /// Identity public key doesn't match public key in the disk farm info
493    #[error("Identity public key {identity} doesn't match public key in the disk farm info {info}")]
494    PublicKeyMismatch {
495        /// Identity public key
496        identity: PublicKey,
497        /// Disk farm info public key
498        info: PublicKey,
499    },
500    /// Metadata file does not exist
501    #[error("Metadata file does not exist at {file}")]
502    MetadataFileDoesNotExist {
503        /// Metadata file
504        file: PathBuf,
505    },
506    /// Metadata can't be opened
507    #[error("Metadata at {file} can't be opened: {error}")]
508    MetadataCantBeOpened {
509        /// Metadata file
510        file: PathBuf,
511        /// Low-level error
512        error: io::Error,
513    },
514    /// Metadata file too small
515    #[error(
516        "Metadata file at {file} is too small: reserved size is {reserved_size} bytes, file size \
517        is {size}"
518    )]
519    MetadataFileTooSmall {
520        /// Metadata file
521        file: PathBuf,
522        /// Reserved size
523        reserved_size: u64,
524        /// File size
525        size: u64,
526    },
527    /// Failed to decode metadata header
528    #[error("Failed to decode metadata header: {0}")]
529    FailedToDecodeMetadataHeader(parity_scale_codec::Error),
530    /// Unexpected metadata version
531    #[error("Unexpected metadata version {0}")]
532    UnexpectedMetadataVersion(u8),
533    /// Cache can't be opened
534    #[error("Cache at {file} can't be opened: {error}")]
535    CacheCantBeOpened {
536        /// Cache file
537        file: PathBuf,
538        /// Low-level error
539        error: io::Error,
540    },
541}
542
/// Errors that happen in background tasks
///
/// The wrapping variants are `#[error(transparent)]`, so their `Display` output is that of the
/// wrapped error, and each can be produced from the wrapped error with `?` thanks to `#[from]`.
#[derive(Debug, Error)]
pub enum BackgroundTaskError {
    /// Plotting error
    #[error(transparent)]
    Plotting(#[from] PlottingError),
    /// Farming error
    #[error(transparent)]
    Farming(#[from] FarmingError),
    /// Reward signing
    #[error(transparent)]
    RewardSigning(#[from] anyhow::Error),
    /// Background task panicked
    #[error("Background task {task} panicked")]
    BackgroundTaskPanicked {
        /// Name of the task
        task: String,
    },
}
562
/// Boxed, pinned and `Send` future of a background task that resolves to `Ok(())` or a
/// [`BackgroundTaskError`]
type BackgroundTask = Pin<Box<dyn Future<Output = Result<(), BackgroundTaskError>> + Send>>;
564
/// What part of the farm should be scrubbed
///
/// Converts to and from the lowercase strings `all`, `metadata`, `plot` and `cache`.
#[derive(Debug, Copy, Clone)]
pub enum ScrubTarget {
    /// Scrub everything
    All,
    /// Scrub just metadata
    Metadata,
    /// Scrub metadata and corresponding plot
    Plot,
    /// Only scrub cache
    Cache,
}

impl fmt::Display for ScrubTarget {
    /// Writes the lowercase name of the target, the same string [`FromStr`] accepts
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::All => "all",
            Self::Metadata => "metadata",
            Self::Plot => "plot",
            Self::Cache => "cache",
        };

        f.write_str(label)
    }
}

impl FromStr for ScrubTarget {
    type Err = String;

    /// Parses one of `all`, `metadata`, `plot` or `cache` (exact match); anything else results in
    /// an error message that mentions the rejected input
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let target = match s {
            "all" => Self::All,
            "metadata" => Self::Metadata,
            "plot" => Self::Plot,
            "cache" => Self::Cache,
            _ => {
                return Err(format!("Can't parse {s} as `ScrubTarget`"));
            }
        };

        Ok(target)
    }
}

impl ScrubTarget {
    /// Whether metadata is covered by this target (everything except cache-only)
    fn metadata(&self) -> bool {
        matches!(self, Self::All | Self::Metadata | Self::Plot)
    }

    /// Whether the plot is covered by this target
    fn plot(&self) -> bool {
        matches!(self, Self::All | Self::Plot)
    }

    /// Whether the cache is covered by this target
    fn cache(&self) -> bool {
        matches!(self, Self::All | Self::Cache)
    }
}
625
/// How the allocated space of a farm is split between piece cache, plot and metadata, as computed
/// by [`AllocatedSpaceDistribution::new`]
struct AllocatedSpaceDistribution {
    /// Size of the piece cache file in bytes: `piece_cache_capacity` multiplied by
    /// `DiskPieceCache::element_size()`
    piece_cache_file_size: u64,
    /// Number of elements the piece cache can hold, `0` when cache percentage is `0`
    piece_cache_capacity: u32,
    /// Size of the plot file in bytes: `target_sector_count` sectors, rounded up to a multiple of
    /// `DISK_SECTOR_SIZE`
    plot_file_size: u64,
    /// How many sectors fit into the allocated space
    target_sector_count: u16,
    /// Size of the metadata file in bytes: `RESERVED_PLOT_METADATA` plus the metadata of
    /// `target_sector_count` sectors
    metadata_file_size: u64,
}
633
634impl AllocatedSpaceDistribution {
635    fn new(
636        allocated_space: u64,
637        sector_size: u64,
638        cache_percentage: u8,
639        sector_metadata_size: u64,
640    ) -> Result<Self, SingleDiskFarmError> {
641        let single_sector_overhead = sector_size + sector_metadata_size;
642        // Fixed space usage regardless of plot size
643        let fixed_space_usage = RESERVED_PLOT_METADATA
644            + RESERVED_FARM_INFO
645            + Identity::file_size() as u64
646            + KnownPeersManager::file_size(KNOWN_PEERS_CACHE_SIZE) as u64;
647        // Calculate how many sectors can fit
648        let target_sector_count = {
649            let potentially_plottable_space = allocated_space.saturating_sub(fixed_space_usage)
650                / 100
651                * (100 - u64::from(cache_percentage));
652            // Do the rounding to make sure we have exactly as much space as fits whole number of
653            // sectors, account for disk sector size just in case
654            (potentially_plottable_space - DISK_SECTOR_SIZE as u64) / single_sector_overhead
655        };
656
657        if target_sector_count == 0 {
658            let mut single_plot_with_cache_space =
659                single_sector_overhead.div_ceil(100 - u64::from(cache_percentage)) * 100;
660            // Cache must not be empty, ensure it contains at least one element even if
661            // percentage-wise it will use more space
662            if single_plot_with_cache_space - single_sector_overhead
663                < DiskPieceCache::element_size() as u64
664            {
665                single_plot_with_cache_space =
666                    single_sector_overhead + DiskPieceCache::element_size() as u64;
667            }
668
669            return Err(SingleDiskFarmError::InsufficientAllocatedSpace {
670                min_space: fixed_space_usage + single_plot_with_cache_space,
671                allocated_space,
672            });
673        }
674        let plot_file_size = target_sector_count * sector_size;
675        // Align plot file size for disk sector size
676        let plot_file_size =
677            plot_file_size.div_ceil(DISK_SECTOR_SIZE as u64) * DISK_SECTOR_SIZE as u64;
678
679        // Remaining space will be used for caching purposes
680        let piece_cache_capacity = if cache_percentage > 0 {
681            let cache_space = allocated_space
682                - fixed_space_usage
683                - plot_file_size
684                - (sector_metadata_size * target_sector_count);
685            (cache_space / u64::from(DiskPieceCache::element_size())) as u32
686        } else {
687            0
688        };
689        let target_sector_count = match SectorIndex::try_from(target_sector_count) {
690            Ok(target_sector_count) if target_sector_count < SectorIndex::MAX => {
691                target_sector_count
692            }
693            _ => {
694                // We use this for both count and index, hence index must not reach actual `MAX`
695                // (consensus doesn't care about this, just farmer implementation detail)
696                let max_sectors = SectorIndex::MAX - 1;
697                return Err(SingleDiskFarmError::FarmTooLarge {
698                    allocated_space: target_sector_count * sector_size,
699                    allocated_sectors: target_sector_count,
700                    max_space: max_sectors as u64 * sector_size,
701                    max_sectors,
702                });
703            }
704        };
705
706        Ok(Self {
707            piece_cache_file_size: u64::from(piece_cache_capacity)
708                * u64::from(DiskPieceCache::element_size()),
709            piece_cache_capacity,
710            plot_file_size,
711            target_sector_count,
712            metadata_file_size: RESERVED_PLOT_METADATA
713                + sector_metadata_size * u64::from(target_sector_count),
714        })
715    }
716}
717
/// Collection ([`Bag`]) of [`HandlerFn`] callbacks that accept an argument of type `A`
type Handler<A> = Bag<HandlerFn<A>, A>;
719
/// Callback collections of the farm, one per kind of event
#[derive(Default, Debug)]
struct Handlers {
    /// Callbacks that receive a sector index together with its [`SectorUpdate`]
    sector_update: Handler<(SectorIndex, SectorUpdate)>,
    /// Callbacks that receive a [`FarmingNotification`]
    farming_notification: Handler<FarmingNotification>,
    /// Callbacks that receive a [`SolutionResponse`]
    solution: Handler<SolutionResponse>,
}
726
/// Everything produced by the blocking initialization step (`SingleDiskFarm::init`), which
/// `SingleDiskFarm::new` runs on a blocking thread and then destructures
struct SingleDiskFarmInit {
    /// Identity of the farm
    identity: Identity,
    /// Farm info
    single_disk_farm_info: SingleDiskFarmInfo,
    /// Lock on the farm info file, if one was taken
    // NOTE(review): presumably `None` when farm locking is disabled; confirm in `init`
    single_disk_farm_info_lock: Option<SingleDiskFarmInfoLock>,
    /// Plot file, shared via `Arc`
    plot_file: Arc<DirectIoFile>,
    /// Metadata file
    metadata_file: DirectIoFile,
    /// Header of the metadata file
    metadata_header: PlotMetadataHeader,
    /// How many sectors the farm is supposed to contain
    target_sector_count: u16,
    /// Metadata of sectors, shared behind an async read-write lock
    sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    /// Capacity of the piece cache in elements, `0` results in no disk piece cache being opened
    piece_cache_capacity: u32,
    /// Plot cache
    plot_cache: DiskPlotCache,
}
739
/// Single disk farm abstraction is a container for everything necessary to plot/farm with a single
/// disk.
///
/// Farm starts operating during creation and doesn't stop until dropped (or error happens).
// NOTE(review): the `must_use` message below says `run()` has to be called for the farm to
// function, which reads differently from "starts operating during creation" above; confirm which
// one describes the current behavior
#[derive(Debug)]
#[must_use = "Plot does not function properly unless run() method is called"]
pub struct SingleDiskFarm {
    farmer_protocol_info: FarmerProtocolInfo,
    single_disk_farm_info: SingleDiskFarmInfo,
    /// Metadata of all sectors plotted so far
    sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    pieces_in_sector: u16,
    /// Value returned by `Farm::total_sectors_count()`
    total_sectors_count: SectorIndex,
    span: Span,
    /// Background tasks of the farm
    tasks: FuturesUnordered<BackgroundTask>,
    /// Callbacks registered on this farm
    handlers: Arc<Handlers>,
    piece_cache: SingleDiskPieceCache,
    plot_cache: DiskPlotCache,
    /// Piece reader, all of its readers are closed when the farm is dropped
    piece_reader: DiskPieceReader,
    /// Sender that will be used to signal to background threads that they should start
    start_sender: Option<broadcast::Sender<()>>,
    /// Sender that will be used to signal to background threads that they must stop
    stop_sender: Option<broadcast::Sender<()>>,
    /// Lock on the farm info file (if any), kept for as long as the farm exists
    _single_disk_farm_info_lock: Option<SingleDiskFarmInfoLock>,
}
765
766impl Drop for SingleDiskFarm {
767    #[inline]
768    fn drop(&mut self) {
769        self.piece_reader.close_all_readers();
770        // Make background threads that are waiting to do something exit immediately
771        self.start_sender.take();
772        // Notify background tasks that they must stop
773        self.stop_sender.take();
774    }
775}
776
777#[async_trait(?Send)]
778impl Farm for SingleDiskFarm {
779    fn id(&self) -> &FarmId {
780        self.id()
781    }
782
783    fn total_sectors_count(&self) -> SectorIndex {
784        self.total_sectors_count
785    }
786
787    fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static> {
788        Arc::new(self.plotted_sectors())
789    }
790
791    fn piece_reader(&self) -> Arc<dyn PieceReader + 'static> {
792        Arc::new(self.piece_reader())
793    }
794
795    fn on_sector_update(
796        &self,
797        callback: HandlerFn<(SectorIndex, SectorUpdate)>,
798    ) -> Box<dyn farm::HandlerId> {
799        Box::new(self.on_sector_update(callback))
800    }
801
802    fn on_farming_notification(
803        &self,
804        callback: HandlerFn<FarmingNotification>,
805    ) -> Box<dyn farm::HandlerId> {
806        Box::new(self.on_farming_notification(callback))
807    }
808
809    fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> Box<dyn farm::HandlerId> {
810        Box::new(self.on_solution(callback))
811    }
812
813    fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
814        Box::pin((*self).run())
815    }
816}
817
818impl SingleDiskFarm {
819    /// Name of the plot file
820    pub const PLOT_FILE: &'static str = "plot.bin";
821    /// Name of the metadata file
822    pub const METADATA_FILE: &'static str = "metadata.bin";
823    const SUPPORTED_PLOT_VERSION: u8 = 0;
824
    /// Create new single disk farm instance
    ///
    /// Initializes on-disk state through [`Self::init`] on a blocking thread, optionally opens the
    /// piece cache, and then wires up all background tasks of the farm: plotting, plotting
    /// scheduler, slot notification forwarder, farming, piece reading and reward signing.
    ///
    /// Plotting and farming tasks are spawned here, but both wait for a message on the start
    /// channel before doing any work; every blocking task also exits once a message arrives on
    /// the stop channel. Senders for both channels are stored in the returned instance.
    ///
    /// `farm_index` is only used for naming: it ends up in farming thread names and in the task
    /// name reported by [`BackgroundTaskError::BackgroundTaskPanicked`].
    ///
    /// # Errors
    ///
    /// Returns an error if initialization of on-disk state fails, if the piece cache can't be
    /// opened, if the farming thread pool can't be created or if the plot file can't be opened for
    /// farming.
    pub async fn new<NC, PosTable>(
        options: SingleDiskFarmOptions<'_, NC>,
        farm_index: usize,
    ) -> Result<Self, SingleDiskFarmError>
    where
        NC: NodeClient + Clone,
        PosTable: Table,
    {
        // Captured once so that blocking tasks spawned below can enter the caller's span
        let span = Span::current();

        let SingleDiskFarmOptions {
            directory,
            farmer_app_info,
            allocated_space,
            max_pieces_in_sector,
            node_client,
            reward_address,
            plotter,
            kzg,
            erasure_coding,
            cache_percentage,
            farming_thread_pool_size,
            plotting_delay,
            global_mutex,
            max_plotting_sectors_per_farm,
            disable_farm_locking,
            read_sector_record_chunks_mode,
            registry,
            create,
        } = options;

        // `init` does synchronous file system work, so it is moved off the async executor
        let single_disk_farm_init_fut = task::spawn_blocking({
            let directory = directory.clone();
            let farmer_app_info = farmer_app_info.clone();
            let span = span.clone();

            move || {
                let _span_guard = span.enter();
                Self::init(
                    &directory,
                    &farmer_app_info,
                    allocated_space,
                    max_pieces_in_sector,
                    cache_percentage,
                    disable_farm_locking,
                    create,
                )
            }
        });

        // Two levels of `Result` are unwrapped here: joining the blocking task and the outcome of
        // `init` itself
        let single_disk_farm_init =
            AsyncJoinOnDrop::new(single_disk_farm_init_fut, false).await??;

        let SingleDiskFarmInit {
            identity,
            single_disk_farm_info,
            single_disk_farm_info_lock,
            plot_file,
            metadata_file,
            metadata_header,
            target_sector_count,
            sectors_metadata,
            piece_cache_capacity,
            plot_cache,
        } = single_disk_farm_init;

        // Piece cache is only opened when non-zero capacity was allocated to it, otherwise the
        // wrapper is created without an underlying disk cache
        let piece_cache = {
            // Convert farm ID into cache ID for single disk farm
            let FarmId::Ulid(id) = *single_disk_farm_info.id();
            let id = PieceCacheId::Ulid(id);

            SingleDiskPieceCache::new(
                id,
                if let Some(piece_cache_capacity) = NonZeroU32::new(piece_cache_capacity) {
                    Some(task::block_in_place(|| {
                        if let Some(registry) = registry {
                            DiskPieceCache::open(
                                &directory,
                                piece_cache_capacity,
                                Some(id),
                                Some(*registry.lock()),
                            )
                        } else {
                            DiskPieceCache::open(&directory, piece_cache_capacity, Some(id), None)
                        }
                    })?)
                } else {
                    None
                },
            )
        };

        let public_key = *single_disk_farm_info.public_key();
        let pieces_in_sector = single_disk_farm_info.pieces_in_sector();
        let sector_size = sector_size(pieces_in_sector);

        // Metrics only exist when a registry was provided; the last argument is the number of
        // sectors whose metadata was loaded from disk by `init`
        let metrics = registry.map(|registry| {
            Arc::new(SingleDiskFarmMetrics::new(
                *registry.lock(),
                single_disk_farm_info.id(),
                target_sector_count,
                sectors_metadata.read_blocking().len() as SectorIndex,
            ))
        });

        // One-shot error channel shared by plotting and farming tasks: whichever of them takes the
        // sender out of the mutex first gets to report its error
        let (error_sender, error_receiver) = oneshot::channel();
        let error_sender = Arc::new(Mutex::new(Some(error_sender)));

        let tasks = FuturesUnordered::<BackgroundTask>::new();

        // Background task that turns an error received over the channel above into its own
        // result; if the sender is dropped without sending anything it completes successfully
        tasks.push(Box::pin(async move {
            if let Ok(error) = error_receiver.await {
                return Err(error);
            }

            Ok(())
        }));

        let handlers = Arc::<Handlers>::default();
        let (start_sender, mut start_receiver) = broadcast::channel::<()>(1);
        let (stop_sender, mut stop_receiver) = broadcast::channel::<()>(1);
        let sectors_being_modified = Arc::<AsyncRwLock<HashSet<SectorIndex>>>::default();
        let (sectors_to_plot_sender, sectors_to_plot_receiver) = mpsc::channel(1);
        // Some sectors may already be plotted, skip them
        let sectors_indices_left_to_plot =
            metadata_header.plotted_sector_count..target_sector_count;

        // Dedicated thread pool for farming, with threads named after the farm index
        let farming_thread_pool = ThreadPoolBuilder::new()
            .thread_name(move |thread_index| format!("farming-{farm_index}.{thread_index}"))
            .num_threads(farming_thread_pool_size)
            .spawn_handler(tokio_rayon_spawn_handler())
            .build()
            .map_err(SingleDiskFarmError::FailedToCreateThreadPool)?;
        // Plot file for farming is opened from within the farming thread pool (on a blocking
        // thread); the pool is moved into the closure and handed back together with the opened
        // files on success
        let farming_plot_fut = task::spawn_blocking(|| {
            farming_thread_pool
                .install(move || {
                    RayonFiles::open_with(directory.join(Self::PLOT_FILE), |path| {
                        DirectIoFile::open(path)
                    })
                })
                .map(|farming_plot| (farming_plot, farming_thread_pool))
        });

        let (farming_plot, farming_thread_pool) =
            AsyncJoinOnDrop::new(farming_plot_fut, false).await??;

        // Plotting runs on a blocking thread that drives its future with `Handle::block_on`
        let plotting_join_handle = task::spawn_blocking({
            let sectors_metadata = Arc::clone(&sectors_metadata);
            let handlers = Arc::clone(&handlers);
            let sectors_being_modified = Arc::clone(&sectors_being_modified);
            let node_client = node_client.clone();
            let plot_file = Arc::clone(&plot_file);
            let error_sender = Arc::clone(&error_sender);
            let span = span.clone();
            let global_mutex = Arc::clone(&global_mutex);
            let metrics = metrics.clone();

            move || {
                let _span_guard = span.enter();

                let plotting_options = PlottingOptions {
                    metadata_header,
                    sectors_metadata: &sectors_metadata,
                    sectors_being_modified: &sectors_being_modified,
                    sectors_to_plot_receiver,
                    sector_plotting_options: SectorPlottingOptions {
                        public_key,
                        node_client: &node_client,
                        pieces_in_sector,
                        sector_size,
                        plot_file,
                        metadata_file: Arc::new(metadata_file),
                        handlers: &handlers,
                        global_mutex: &global_mutex,
                        plotter,
                        metrics,
                    },
                    max_plotting_sectors_per_farm,
                };

                let plotting_fut = async {
                    if start_receiver.recv().await.is_err() {
                        // Dropped before starting
                        return Ok(());
                    }

                    // Optional delay that must resolve before plotting is allowed to begin
                    if let Some(plotting_delay) = plotting_delay
                        && plotting_delay.await.is_err()
                    {
                        // Dropped before resolving
                        return Ok(());
                    }

                    plotting(plotting_options).await
                };

                Handle::current().block_on(async {
                    select! {
                        plotting_result = plotting_fut.fuse() => {
                            // Report plotting error through the shared error channel unless
                            // another task has already taken the sender; log if sending fails
                            if let Err(error) = plotting_result
                                && let Some(error_sender) = error_sender.lock().take()
                                && let Err(error) = error_sender.send(error.into())
                            {
                                error!(
                                    %error,
                                    "Plotting failed to send error to background task"
                                );
                            }
                        }
                        _ = stop_receiver.recv().fuse() => {
                            // Nothing, just exit
                        }
                    }
                });
            }
        });
        let plotting_join_handle = AsyncJoinOnDrop::new(plotting_join_handle, false);

        // Failure to join the plotting thread is reported as a panicked background task
        tasks.push(Box::pin(async move {
            // Panic will already be printed by now
            plotting_join_handle.await.map_err(|_error| {
                BackgroundTaskError::BackgroundTaskPanicked {
                    task: format!("plotting-{farm_index}"),
                }
            })
        }));

        // Scheduler owns the sending half of the channel whose receiving half was given to
        // plotting above
        let plotting_scheduler_options = PlottingSchedulerOptions {
            public_key_hash: public_key.hash(),
            sectors_indices_left_to_plot,
            target_sector_count,
            last_archived_segment_index: farmer_app_info.protocol_info.history_size.segment_index(),
            min_sector_lifetime: farmer_app_info.protocol_info.min_sector_lifetime,
            node_client: node_client.clone(),
            handlers: Arc::clone(&handlers),
            sectors_metadata: Arc::clone(&sectors_metadata),
            sectors_to_plot_sender,
            new_segment_processing_delay: NEW_SEGMENT_PROCESSING_DELAY,
            metrics: metrics.clone(),
        };
        tasks.push(Box::pin(plotting_scheduler(plotting_scheduler_options)));

        // Slot notifications are forwarded over this channel: the forwarder task below holds the
        // sender, farming receives them as `slot_info_notifications`
        let (slot_info_forwarder_sender, slot_info_forwarder_receiver) = mpsc::channel(0);

        tasks.push(Box::pin({
            let node_client = node_client.clone();
            let metrics = metrics.clone();

            async move {
                slot_notification_forwarder(&node_client, slot_info_forwarder_sender, metrics)
                    .await
                    .map_err(BackgroundTaskError::Farming)
            }
        }));

        // Farming follows the same pattern as plotting: blocking thread, own subscriptions to
        // start/stop channels, errors reported through the shared error channel
        let farming_join_handle = task::spawn_blocking({
            let erasure_coding = erasure_coding.clone();
            let handlers = Arc::clone(&handlers);
            let sectors_being_modified = Arc::clone(&sectors_being_modified);
            let sectors_metadata = Arc::clone(&sectors_metadata);
            let mut start_receiver = start_sender.subscribe();
            let mut stop_receiver = stop_sender.subscribe();
            let node_client = node_client.clone();
            let span = span.clone();
            let global_mutex = Arc::clone(&global_mutex);

            move || {
                let _span_guard = span.enter();

                let farming_fut = async move {
                    if start_receiver.recv().await.is_err() {
                        // Dropped before starting
                        return Ok(());
                    }

                    let plot_audit = PlotAudit::new(&farming_plot);

                    let farming_options = FarmingOptions {
                        public_key,
                        reward_address,
                        node_client,
                        plot_audit,
                        sectors_metadata,
                        kzg,
                        erasure_coding,
                        handlers,
                        sectors_being_modified,
                        slot_info_notifications: slot_info_forwarder_receiver,
                        thread_pool: farming_thread_pool,
                        read_sector_record_chunks_mode,
                        global_mutex,
                        metrics,
                    };
                    farming::<PosTable, _, _>(farming_options).await
                };

                Handle::current().block_on(async {
                    select! {
                        farming_result = farming_fut.fuse() => {
                            // Report farming error through the shared error channel unless
                            // another task has already taken the sender; log if sending fails
                            if let Err(error) = farming_result
                                && let Some(error_sender) = error_sender.lock().take()
                                && let Err(error) = error_sender.send(error.into())
                            {
                                error!(
                                    %error,
                                    "Farming failed to send error to background task",
                                );
                            }
                        }
                        _ = stop_receiver.recv().fuse() => {
                            // Nothing, just exit
                        }
                    }
                });
            }
        });
        let farming_join_handle = AsyncJoinOnDrop::new(farming_join_handle, false);

        // Failure to join the farming thread is reported as a panicked background task
        tasks.push(Box::pin(async move {
            // Panic will already be printed by now
            farming_join_handle.await.map_err(|_error| {
                BackgroundTaskError::BackgroundTaskPanicked {
                    task: format!("farming-{farm_index}"),
                }
            })
        }));

        // Piece reader comes with a future that has to be driven for the reader to function
        // NOTE(review): presumably that future serves read requests made through `piece_reader`;
        // confirm in `DiskPieceReader`
        let (piece_reader, reading_fut) = DiskPieceReader::new::<PosTable>(
            public_key,
            pieces_in_sector,
            plot_file,
            Arc::clone(&sectors_metadata),
            erasure_coding,
            sectors_being_modified,
            read_sector_record_chunks_mode,
            global_mutex,
        );

        // Unlike plotting and farming, reading does not wait for the start signal, it only stops
        // on the stop signal or when its future completes
        let reading_join_handle = task::spawn_blocking({
            let mut stop_receiver = stop_sender.subscribe();
            let reading_fut = reading_fut.instrument(span.clone());

            move || {
                Handle::current().block_on(async {
                    select! {
                        _ = reading_fut.fuse() => {
                            // Nothing, just exit
                        }
                        _ = stop_receiver.recv().fuse() => {
                            // Nothing, just exit
                        }
                    }
                });
            }
        });

        let reading_join_handle = AsyncJoinOnDrop::new(reading_join_handle, false);

        // Failure to join the reading thread is reported as a panicked background task
        tasks.push(Box::pin(async move {
            // Panic will already be printed by now
            reading_join_handle.await.map_err(|_error| {
                BackgroundTaskError::BackgroundTaskPanicked {
                    task: format!("reading-{farm_index}"),
                }
            })
        }));

        // Reward signing takes ownership of the identity and the node client; failure to set it
        // up is surfaced as a background task error
        tasks.push(Box::pin(async move {
            match reward_signing(node_client, identity).await {
                Ok(reward_signing_fut) => {
                    reward_signing_fut.await;
                }
                Err(error) => {
                    return Err(BackgroundTaskError::RewardSigning(anyhow::anyhow!(
                        "Failed to subscribe to reward signing notifications: {error}"
                    )));
                }
            }

            Ok(())
        }));

        let farm = Self {
            farmer_protocol_info: farmer_app_info.protocol_info,
            single_disk_farm_info,
            sectors_metadata,
            pieces_in_sector,
            total_sectors_count: target_sector_count,
            span,
            tasks,
            handlers,
            piece_cache,
            plot_cache,
            piece_reader,
            start_sender: Some(start_sender),
            stop_sender: Some(stop_sender),
            // Kept only so that the lock (if any) lives as long as the farm instance
            _single_disk_farm_info_lock: single_disk_farm_info_lock,
        };
        Ok(farm)
    }
1226
    /// Synchronous part of farm initialization, performs blocking file system operations.
    ///
    /// In order, this:
    /// * creates `directory` if necessary and opens the identity (creating it only if `create` is
    ///   `true`)
    /// * loads existing farm info and validates it against `farmer_app_info`, the identity and
    ///   `max_pieces_in_sector`, or creates and stores new farm info if none exists
    /// * acquires the farm info lock unless `disable_farm_locking` is set
    /// * opens the metadata file, sizing it and initializing or validating its header
    /// * reads metadata of all sectors plotted so far, replacing entries that fail to decode with
    ///   dummy sector metadata
    /// * opens the plot file and sizes it
    /// * creates the plot cache
    ///
    /// # Errors
    ///
    /// Returns an error if any file system operation fails, if `create` is `false` and identity
    /// doesn't exist, if existing farm info belongs to a different chain or identity, if the farm
    /// was initialized with more pieces in sector than `max_pieces_in_sector`, if the farm is
    /// likely already in use, if allocated space can't be distributed or if the metadata header
    /// can't be decoded or has unexpected version.
    fn init(
        directory: &PathBuf,
        farmer_app_info: &FarmerAppInfo,
        allocated_space: u64,
        max_pieces_in_sector: u16,
        cache_percentage: u8,
        disable_farm_locking: bool,
        create: bool,
    ) -> Result<SingleDiskFarmInit, SingleDiskFarmError> {
        fs::create_dir_all(directory)?;

        // Without `create` a missing identity is an error rather than a reason to make a new farm
        let identity = if create {
            Identity::open_or_create(directory)?
        } else {
            Identity::open(directory)?.ok_or_else(|| {
                IdentityError::Io(io::Error::new(
                    io::ErrorKind::NotFound,
                    "Farm does not exist and creation was explicitly disabled",
                ))
            })?
        };
        let public_key = identity.public_key().to_bytes().into();

        let (single_disk_farm_info, single_disk_farm_info_lock) =
            match SingleDiskFarmInfo::load_from(directory)? {
                // Existing farm: validate it before reusing
                Some(mut single_disk_farm_info) => {
                    if &farmer_app_info.genesis_hash != single_disk_farm_info.genesis_hash() {
                        return Err(SingleDiskFarmError::WrongChain {
                            id: *single_disk_farm_info.id(),
                            correct_chain: hex::encode(single_disk_farm_info.genesis_hash()),
                            wrong_chain: hex::encode(farmer_app_info.genesis_hash),
                        });
                    }

                    if &public_key != single_disk_farm_info.public_key() {
                        return Err(SingleDiskFarmError::IdentityMismatch {
                            id: *single_disk_farm_info.id(),
                            correct_public_key: *single_disk_farm_info.public_key(),
                            wrong_public_key: public_key,
                        });
                    }

                    let pieces_in_sector = single_disk_farm_info.pieces_in_sector();

                    // Farm created with more pieces in sector than currently supported is
                    // rejected
                    if max_pieces_in_sector < pieces_in_sector {
                        return Err(SingleDiskFarmError::InvalidPiecesInSector {
                            id: *single_disk_farm_info.id(),
                            max_supported: max_pieces_in_sector,
                            initialized_with: pieces_in_sector,
                        });
                    }

                    // The opposite is fine, the farm keeps the value it was initialized with
                    if max_pieces_in_sector > pieces_in_sector {
                        info!(
                            pieces_in_sector,
                            max_pieces_in_sector,
                            "Farm initialized with smaller number of pieces in sector, farm needs \
                            to be re-created for increase"
                        );
                    }

                    let mut single_disk_farm_info_lock = None;

                    if allocated_space != single_disk_farm_info.allocated_space() {
                        info!(
                            old_space = %bytesize::to_string(single_disk_farm_info.allocated_space(), true),
                            new_space = %bytesize::to_string(allocated_space, true),
                            "Farm size has changed"
                        );

                        // Renamed because the pattern below shadows `allocated_space` with a
                        // mutable reference to the field of the same name
                        let new_allocated_space = allocated_space;
                        match &mut single_disk_farm_info {
                            SingleDiskFarmInfo::V0 {
                                allocated_space, ..
                            } => {
                                *allocated_space = new_allocated_space;
                            }
                        }

                        // Persist updated farm info; the returned value is used as the farm lock
                        single_disk_farm_info_lock =
                            single_disk_farm_info.store_to(directory, !disable_farm_locking)?;
                    } else if !disable_farm_locking {
                        // Nothing to store, only the lock needs to be acquired
                        single_disk_farm_info_lock = Some(
                            SingleDiskFarmInfo::try_lock(directory)
                                .map_err(SingleDiskFarmError::LikelyAlreadyInUse)?,
                        );
                    }

                    (single_disk_farm_info, single_disk_farm_info_lock)
                }
                // No farm info on disk: create new farm info with a fresh farm ID and store it
                None => {
                    let single_disk_farm_info = SingleDiskFarmInfo::new(
                        FarmId::new(),
                        farmer_app_info.genesis_hash,
                        public_key,
                        max_pieces_in_sector,
                        allocated_space,
                    );

                    let single_disk_farm_info_lock =
                        single_disk_farm_info.store_to(directory, !disable_farm_locking)?;

                    (single_disk_farm_info, single_disk_farm_info_lock)
                }
            };

        // Sizes of all farm files are derived from the allocated space requested by the caller
        let pieces_in_sector = single_disk_farm_info.pieces_in_sector();
        let sector_size = sector_size(pieces_in_sector) as u64;
        let sector_metadata_size = SectorMetadataChecksummed::encoded_size();
        let allocated_space_distribution = AllocatedSpaceDistribution::new(
            allocated_space,
            sector_size,
            cache_percentage,
            sector_metadata_size as u64,
        )?;
        let target_sector_count = allocated_space_distribution.target_sector_count;

        let metadata_file_path = directory.join(Self::METADATA_FILE);
        let metadata_file = DirectIoFile::open(&metadata_file_path)?;

        let metadata_size = metadata_file.size()?;
        let expected_metadata_size = allocated_space_distribution.metadata_file_size;
        // Align metadata file size for disk sector size (round up to a multiple of it)
        let expected_metadata_size =
            expected_metadata_size.div_ceil(DISK_SECTOR_SIZE as u64) * DISK_SECTOR_SIZE as u64;
        // Empty metadata file is treated as a new one: allocate it and write a fresh header
        let metadata_header = if metadata_size == 0 {
            let metadata_header = PlotMetadataHeader {
                version: SingleDiskFarm::SUPPORTED_PLOT_VERSION,
                plotted_sector_count: 0,
            };

            metadata_file
                .preallocate(expected_metadata_size)
                .map_err(SingleDiskFarmError::CantPreallocateMetadataFile)?;
            metadata_file.write_all_at(metadata_header.encode().as_slice(), 0)?;

            metadata_header
        } else {
            if metadata_size != expected_metadata_size {
                // Allocating the whole file (`set_len` below can create a sparse file, which will
                // cause writes to fail later)
                metadata_file
                    .preallocate(expected_metadata_size)
                    .map_err(SingleDiskFarmError::CantPreallocateMetadataFile)?;
                // Truncating file (if necessary)
                metadata_file.set_len(expected_metadata_size)?;
            }

            let mut metadata_header_bytes = vec![0; PlotMetadataHeader::encoded_size()];
            metadata_file.read_exact_at(&mut metadata_header_bytes, 0)?;

            let mut metadata_header =
                PlotMetadataHeader::decode(&mut metadata_header_bytes.as_ref())
                    .map_err(SingleDiskFarmError::FailedToDecodeMetadataHeader)?;

            if metadata_header.version != SingleDiskFarm::SUPPORTED_PLOT_VERSION {
                return Err(SingleDiskFarmError::UnexpectedMetadataVersion(
                    metadata_header.version,
                ));
            }

            // More sectors recorded as plotted than the current target allows: clamp the count
            // and persist the corrected header
            if metadata_header.plotted_sector_count > target_sector_count {
                metadata_header.plotted_sector_count = target_sector_count;
                metadata_file.write_all_at(&metadata_header.encode(), 0)?;
            }

            metadata_header
        };

        let sectors_metadata = {
            let mut sectors_metadata =
                Vec::<SectorMetadataChecksummed>::with_capacity(usize::from(target_sector_count));

            // Buffer is reused for every sector, entries are stored back to back starting at
            // `RESERVED_PLOT_METADATA` offset
            let mut sector_metadata_bytes = vec![0; sector_metadata_size];
            for sector_index in 0..metadata_header.plotted_sector_count {
                let sector_offset =
                    RESERVED_PLOT_METADATA + sector_metadata_size as u64 * u64::from(sector_index);
                metadata_file.read_exact_at(&mut sector_metadata_bytes, sector_offset)?;

                let sector_metadata =
                    match SectorMetadataChecksummed::decode(&mut sector_metadata_bytes.as_ref()) {
                        Ok(sector_metadata) => sector_metadata,
                        Err(error) => {
                            warn!(
                                path = %metadata_file_path.display(),
                                %error,
                                %sector_index,
                                "Failed to decode sector metadata, replacing with dummy expired \
                                sector metadata"
                            );

                            // Dummy entry keeps sector indices contiguous and is written back so
                            // that the next read of this entry decodes successfully
                            let dummy_sector = SectorMetadataChecksummed::from(SectorMetadata {
                                sector_index,
                                pieces_in_sector,
                                s_bucket_sizes: Box::new([0; Record::NUM_S_BUCKETS]),
                                history_size: HistorySize::from(SegmentIndex::ZERO),
                            });
                            metadata_file.write_all_at(&dummy_sector.encode(), sector_offset)?;

                            dummy_sector
                        }
                    };
                sectors_metadata.push(sector_metadata);
            }

            Arc::new(AsyncRwLock::new(sectors_metadata))
        };

        let plot_file = DirectIoFile::open(directory.join(Self::PLOT_FILE))?;

        if plot_file.size()? != allocated_space_distribution.plot_file_size {
            // Allocating the whole file (`set_len` below can create a sparse file, which will cause
            // writes to fail later)
            plot_file
                .preallocate(allocated_space_distribution.plot_file_size)
                .map_err(SingleDiskFarmError::CantPreallocatePlotFile)?;
            // Truncating file (if necessary)
            plot_file.set_len(allocated_space_distribution.plot_file_size)?;
        }

        let plot_file = Arc::new(plot_file);

        let plot_cache = DiskPlotCache::new(
            &plot_file,
            &sectors_metadata,
            target_sector_count,
            sector_size,
        );

        Ok(SingleDiskFarmInit {
            identity,
            single_disk_farm_info,
            single_disk_farm_info_lock,
            plot_file,
            metadata_file,
            metadata_header,
            target_sector_count,
            sectors_metadata,
            piece_cache_capacity: allocated_space_distribution.piece_cache_capacity,
            plot_cache,
        })
    }
1469
1470    /// Collect summary of single disk farm for presentational purposes
1471    pub fn collect_summary(directory: PathBuf) -> SingleDiskFarmSummary {
1472        let single_disk_farm_info = match SingleDiskFarmInfo::load_from(&directory) {
1473            Ok(Some(single_disk_farm_info)) => single_disk_farm_info,
1474            Ok(None) => {
1475                return SingleDiskFarmSummary::NotFound { directory };
1476            }
1477            Err(error) => {
1478                return SingleDiskFarmSummary::Error { directory, error };
1479            }
1480        };
1481
1482        SingleDiskFarmSummary::Found {
1483            info: single_disk_farm_info,
1484            directory,
1485        }
1486    }
1487
1488    /// Effective on-disk allocation of the files related to the farm (takes some buffer space
1489    /// into consideration).
1490    ///
1491    /// This is a helpful number in case some files were not allocated properly or were removed and
1492    /// do not correspond to allocated space in the farm info accurately.
1493    pub fn effective_disk_usage(
1494        directory: &Path,
1495        cache_percentage: u8,
1496    ) -> Result<u64, SingleDiskFarmError> {
1497        let mut effective_disk_usage;
1498        match SingleDiskFarmInfo::load_from(directory)? {
1499            Some(single_disk_farm_info) => {
1500                let allocated_space_distribution = AllocatedSpaceDistribution::new(
1501                    single_disk_farm_info.allocated_space(),
1502                    sector_size(single_disk_farm_info.pieces_in_sector()) as u64,
1503                    cache_percentage,
1504                    SectorMetadataChecksummed::encoded_size() as u64,
1505                )?;
1506
1507                effective_disk_usage = single_disk_farm_info.allocated_space();
1508                effective_disk_usage -= Identity::file_size() as u64;
1509                effective_disk_usage -= allocated_space_distribution.metadata_file_size;
1510                effective_disk_usage -= allocated_space_distribution.plot_file_size;
1511                effective_disk_usage -= allocated_space_distribution.piece_cache_file_size;
1512            }
1513            None => {
1514                // No farm info, try to collect actual file sizes is any
1515                effective_disk_usage = 0;
1516            }
1517        };
1518
1519        if Identity::open(directory)?.is_some() {
1520            effective_disk_usage += Identity::file_size() as u64;
1521        }
1522
1523        match OpenOptions::new()
1524            .read(true)
1525            .open(directory.join(Self::METADATA_FILE))
1526        {
1527            Ok(metadata_file) => {
1528                effective_disk_usage += metadata_file.size()?;
1529            }
1530            Err(error) => {
1531                if error.kind() == io::ErrorKind::NotFound {
1532                    // File is not stored on disk
1533                } else {
1534                    return Err(error.into());
1535                }
1536            }
1537        };
1538
1539        match OpenOptions::new()
1540            .read(true)
1541            .open(directory.join(Self::PLOT_FILE))
1542        {
1543            Ok(plot_file) => {
1544                effective_disk_usage += plot_file.size()?;
1545            }
1546            Err(error) => {
1547                if error.kind() == io::ErrorKind::NotFound {
1548                    // File is not stored on disk
1549                } else {
1550                    return Err(error.into());
1551                }
1552            }
1553        };
1554
1555        match OpenOptions::new()
1556            .read(true)
1557            .open(directory.join(DiskPieceCache::FILE_NAME))
1558        {
1559            Ok(piece_cache) => {
1560                effective_disk_usage += piece_cache.size()?;
1561            }
1562            Err(error) => {
1563                if error.kind() == io::ErrorKind::NotFound {
1564                    // File is not stored on disk
1565                } else {
1566                    return Err(error.into());
1567                }
1568            }
1569        };
1570
1571        Ok(effective_disk_usage)
1572    }
1573
1574    /// Read all sectors metadata
1575    pub fn read_all_sectors_metadata(
1576        directory: &Path,
1577    ) -> io::Result<Vec<SectorMetadataChecksummed>> {
1578        let metadata_file = DirectIoFile::open(directory.join(Self::METADATA_FILE))?;
1579
1580        let metadata_size = metadata_file.size()?;
1581        let sector_metadata_size = SectorMetadataChecksummed::encoded_size();
1582
1583        let mut metadata_header_bytes = vec![0; PlotMetadataHeader::encoded_size()];
1584        metadata_file.read_exact_at(&mut metadata_header_bytes, 0)?;
1585
1586        let metadata_header = PlotMetadataHeader::decode(&mut metadata_header_bytes.as_ref())
1587            .map_err(|error| {
1588                io::Error::other(format!("Failed to decode metadata header: {error}"))
1589            })?;
1590
1591        if metadata_header.version != SingleDiskFarm::SUPPORTED_PLOT_VERSION {
1592            return Err(io::Error::other(format!(
1593                "Unsupported metadata version {}",
1594                metadata_header.version
1595            )));
1596        }
1597
1598        let mut sectors_metadata = Vec::<SectorMetadataChecksummed>::with_capacity(
1599            ((metadata_size - RESERVED_PLOT_METADATA) / sector_metadata_size as u64) as usize,
1600        );
1601
1602        let mut sector_metadata_bytes = vec![0; sector_metadata_size];
1603        for sector_index in 0..metadata_header.plotted_sector_count {
1604            metadata_file.read_exact_at(
1605                &mut sector_metadata_bytes,
1606                RESERVED_PLOT_METADATA + sector_metadata_size as u64 * u64::from(sector_index),
1607            )?;
1608            sectors_metadata.push(
1609                SectorMetadataChecksummed::decode(&mut sector_metadata_bytes.as_ref()).map_err(
1610                    |error| io::Error::other(format!("Failed to decode sector metadata: {error}")),
1611                )?,
1612            );
1613        }
1614
1615        Ok(sectors_metadata)
1616    }
1617
1618    /// ID of this farm
1619    pub fn id(&self) -> &FarmId {
1620        self.single_disk_farm_info.id()
1621    }
1622
1623    /// Info of this farm
1624    pub fn info(&self) -> &SingleDiskFarmInfo {
1625        &self.single_disk_farm_info
1626    }
1627
1628    /// Number of sectors in this farm
1629    pub fn total_sectors_count(&self) -> SectorIndex {
1630        self.total_sectors_count
1631    }
1632
1633    /// Read information about sectors plotted so far
1634    pub fn plotted_sectors(&self) -> SingleDiskPlottedSectors {
1635        SingleDiskPlottedSectors {
1636            public_key: *self.single_disk_farm_info.public_key(),
1637            pieces_in_sector: self.pieces_in_sector,
1638            farmer_protocol_info: self.farmer_protocol_info,
1639            sectors_metadata: Arc::clone(&self.sectors_metadata),
1640        }
1641    }
1642
1643    /// Get piece cache instance
1644    pub fn piece_cache(&self) -> SingleDiskPieceCache {
1645        self.piece_cache.clone()
1646    }
1647
1648    /// Get plot cache instance
1649    pub fn plot_cache(&self) -> DiskPlotCache {
1650        self.plot_cache.clone()
1651    }
1652
1653    /// Get piece reader to read plotted pieces later
1654    pub fn piece_reader(&self) -> DiskPieceReader {
1655        self.piece_reader.clone()
1656    }
1657
1658    /// Subscribe to sector updates
1659    pub fn on_sector_update(&self, callback: HandlerFn<(SectorIndex, SectorUpdate)>) -> HandlerId {
1660        self.handlers.sector_update.add(callback)
1661    }
1662
1663    /// Subscribe to farming notifications
1664    pub fn on_farming_notification(&self, callback: HandlerFn<FarmingNotification>) -> HandlerId {
1665        self.handlers.farming_notification.add(callback)
1666    }
1667
1668    /// Subscribe to new solution notification
1669    pub fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> HandlerId {
1670        self.handlers.solution.add(callback)
1671    }
1672
1673    /// Run and wait for background threads to exit or return an error
1674    pub async fn run(mut self) -> anyhow::Result<()> {
1675        if let Some(start_sender) = self.start_sender.take() {
1676            // Do not care if anyone is listening on the other side
1677            let _ = start_sender.send(());
1678        }
1679
1680        while let Some(result) = self.tasks.next().instrument(self.span.clone()).await {
1681            result?;
1682        }
1683
1684        Ok(())
1685    }
1686
1687    /// Wipe everything that belongs to this single disk farm
1688    pub fn wipe(directory: &Path) -> io::Result<()> {
1689        let single_disk_info_info_path = directory.join(SingleDiskFarmInfo::FILE_NAME);
1690        match SingleDiskFarmInfo::load_from(directory) {
1691            Ok(Some(single_disk_farm_info)) => {
1692                info!("Found single disk farm {}", single_disk_farm_info.id());
1693            }
1694            Ok(None) => {
1695                return Err(io::Error::new(
1696                    io::ErrorKind::NotFound,
1697                    format!(
1698                        "Single disk farm info not found at {}",
1699                        single_disk_info_info_path.display()
1700                    ),
1701                ));
1702            }
1703            Err(error) => {
1704                warn!("Found unknown single disk farm: {}", error);
1705            }
1706        }
1707
1708        {
1709            let plot = directory.join(Self::PLOT_FILE);
1710            if plot.exists() {
1711                info!("Deleting plot file at {}", plot.display());
1712                fs::remove_file(plot)?;
1713            }
1714        }
1715        {
1716            let metadata = directory.join(Self::METADATA_FILE);
1717            if metadata.exists() {
1718                info!("Deleting metadata file at {}", metadata.display());
1719                fs::remove_file(metadata)?;
1720            }
1721        }
1722        // TODO: Identity should be able to wipe itself instead of assuming a specific file name
1723        //  here
1724        {
1725            let identity = directory.join("identity.bin");
1726            if identity.exists() {
1727                info!("Deleting identity file at {}", identity.display());
1728                fs::remove_file(identity)?;
1729            }
1730        }
1731
1732        DiskPieceCache::wipe(directory)?;
1733
1734        info!(
1735            "Deleting info file at {}",
1736            single_disk_info_info_path.display()
1737        );
1738        fs::remove_file(single_disk_info_info_path)
1739    }
1740
1741    /// Check the farm for corruption and repair errors (caused by disk errors or something else),
1742    /// returns an error when irrecoverable errors occur.
1743    pub fn scrub(
1744        directory: &Path,
1745        disable_farm_locking: bool,
1746        target: ScrubTarget,
1747        dry_run: bool,
1748    ) -> Result<(), SingleDiskFarmScrubError> {
1749        let span = Span::current();
1750
1751        if dry_run {
1752            info!("Dry run is used, no changes will be written to disk");
1753        }
1754
1755        if target.metadata() || target.plot() {
1756            let info = {
1757                let file = directory.join(SingleDiskFarmInfo::FILE_NAME);
1758                info!(path = %file.display(), "Checking info file");
1759
1760                match SingleDiskFarmInfo::load_from(directory) {
1761                    Ok(Some(info)) => info,
1762                    Ok(None) => {
1763                        return Err(SingleDiskFarmScrubError::FarmInfoFileDoesNotExist { file });
1764                    }
1765                    Err(error) => {
1766                        return Err(SingleDiskFarmScrubError::FarmInfoCantBeOpened { file, error });
1767                    }
1768                }
1769            };
1770
1771            let _single_disk_farm_info_lock = if disable_farm_locking {
1772                None
1773            } else {
1774                Some(
1775                    SingleDiskFarmInfo::try_lock(directory)
1776                        .map_err(SingleDiskFarmScrubError::LikelyAlreadyInUse)?,
1777                )
1778            };
1779
1780            let identity = {
1781                let file = directory.join(Identity::FILE_NAME);
1782                info!(path = %file.display(), "Checking identity file");
1783
1784                match Identity::open(directory) {
1785                    Ok(Some(identity)) => identity,
1786                    Ok(None) => {
1787                        return Err(SingleDiskFarmScrubError::IdentityFileDoesNotExist { file });
1788                    }
1789                    Err(error) => {
1790                        return Err(SingleDiskFarmScrubError::IdentityCantBeOpened { file, error });
1791                    }
1792                }
1793            };
1794
1795            if PublicKey::from(identity.public.to_bytes()) != *info.public_key() {
1796                return Err(SingleDiskFarmScrubError::PublicKeyMismatch {
1797                    identity: PublicKey::from(identity.public.to_bytes()),
1798                    info: *info.public_key(),
1799                });
1800            }
1801
1802            let sector_metadata_size = SectorMetadataChecksummed::encoded_size();
1803
1804            let metadata_file_path = directory.join(Self::METADATA_FILE);
1805            let (metadata_file, mut metadata_header) = {
1806                info!(path = %metadata_file_path.display(), "Checking metadata file");
1807
1808                let metadata_file = match OpenOptions::new()
1809                    .read(true)
1810                    .write(!dry_run)
1811                    .open(&metadata_file_path)
1812                {
1813                    Ok(metadata_file) => metadata_file,
1814                    Err(error) => {
1815                        return Err(if error.kind() == io::ErrorKind::NotFound {
1816                            SingleDiskFarmScrubError::MetadataFileDoesNotExist {
1817                                file: metadata_file_path,
1818                            }
1819                        } else {
1820                            SingleDiskFarmScrubError::MetadataCantBeOpened {
1821                                file: metadata_file_path,
1822                                error,
1823                            }
1824                        });
1825                    }
1826                };
1827
1828                // Error doesn't matter here
1829                let _ = metadata_file.advise_sequential_access();
1830
1831                let metadata_size = match metadata_file.size() {
1832                    Ok(metadata_size) => metadata_size,
1833                    Err(error) => {
1834                        return Err(SingleDiskFarmScrubError::FailedToDetermineFileSize {
1835                            file: metadata_file_path,
1836                            error,
1837                        });
1838                    }
1839                };
1840
1841                if metadata_size < RESERVED_PLOT_METADATA {
1842                    return Err(SingleDiskFarmScrubError::MetadataFileTooSmall {
1843                        file: metadata_file_path,
1844                        reserved_size: RESERVED_PLOT_METADATA,
1845                        size: metadata_size,
1846                    });
1847                }
1848
1849                let mut metadata_header = {
1850                    let mut reserved_metadata = vec![0; RESERVED_PLOT_METADATA as usize];
1851
1852                    if let Err(error) = metadata_file.read_exact_at(&mut reserved_metadata, 0) {
1853                        return Err(SingleDiskFarmScrubError::FailedToReadBytes {
1854                            file: metadata_file_path,
1855                            size: RESERVED_PLOT_METADATA,
1856                            offset: 0,
1857                            error,
1858                        });
1859                    }
1860
1861                    PlotMetadataHeader::decode(&mut reserved_metadata.as_slice())
1862                        .map_err(SingleDiskFarmScrubError::FailedToDecodeMetadataHeader)?
1863                };
1864
1865                if metadata_header.version != SingleDiskFarm::SUPPORTED_PLOT_VERSION {
1866                    return Err(SingleDiskFarmScrubError::UnexpectedMetadataVersion(
1867                        metadata_header.version,
1868                    ));
1869                }
1870
1871                let plotted_sector_count = metadata_header.plotted_sector_count;
1872
1873                let expected_metadata_size = RESERVED_PLOT_METADATA
1874                    + sector_metadata_size as u64 * u64::from(plotted_sector_count);
1875
1876                if metadata_size < expected_metadata_size {
1877                    warn!(
1878                        %metadata_size,
1879                        %expected_metadata_size,
1880                        "Metadata file size is smaller than expected, shrinking number of plotted \
1881                        sectors to correct value"
1882                    );
1883
1884                    metadata_header.plotted_sector_count =
1885                        ((metadata_size - RESERVED_PLOT_METADATA) / sector_metadata_size as u64)
1886                            as SectorIndex;
1887                    let metadata_header_bytes = metadata_header.encode();
1888
1889                    if !dry_run
1890                        && let Err(error) = metadata_file.write_all_at(&metadata_header_bytes, 0)
1891                    {
1892                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
1893                            file: metadata_file_path,
1894                            size: metadata_header_bytes.len() as u64,
1895                            offset: 0,
1896                            error,
1897                        });
1898                    }
1899                }
1900
1901                (metadata_file, metadata_header)
1902            };
1903
1904            let pieces_in_sector = info.pieces_in_sector();
1905            let sector_size = sector_size(pieces_in_sector) as u64;
1906
1907            let plot_file_path = directory.join(Self::PLOT_FILE);
1908            let plot_file = {
1909                let plot_file_path = directory.join(Self::PLOT_FILE);
1910                info!(path = %plot_file_path.display(), "Checking plot file");
1911
1912                let plot_file = match OpenOptions::new()
1913                    .read(true)
1914                    .write(!dry_run)
1915                    .open(&plot_file_path)
1916                {
1917                    Ok(plot_file) => plot_file,
1918                    Err(error) => {
1919                        return Err(if error.kind() == io::ErrorKind::NotFound {
1920                            SingleDiskFarmScrubError::MetadataFileDoesNotExist {
1921                                file: plot_file_path,
1922                            }
1923                        } else {
1924                            SingleDiskFarmScrubError::MetadataCantBeOpened {
1925                                file: plot_file_path,
1926                                error,
1927                            }
1928                        });
1929                    }
1930                };
1931
1932                // Error doesn't matter here
1933                let _ = plot_file.advise_sequential_access();
1934
1935                let plot_size = match plot_file.size() {
1936                    Ok(metadata_size) => metadata_size,
1937                    Err(error) => {
1938                        return Err(SingleDiskFarmScrubError::FailedToDetermineFileSize {
1939                            file: plot_file_path,
1940                            error,
1941                        });
1942                    }
1943                };
1944
1945                let min_expected_plot_size =
1946                    u64::from(metadata_header.plotted_sector_count) * sector_size;
1947                if plot_size < min_expected_plot_size {
1948                    warn!(
1949                        %plot_size,
1950                        %min_expected_plot_size,
1951                        "Plot file size is smaller than expected, shrinking number of plotted \
1952                        sectors to correct value"
1953                    );
1954
1955                    metadata_header.plotted_sector_count = (plot_size / sector_size) as SectorIndex;
1956                    let metadata_header_bytes = metadata_header.encode();
1957
1958                    if !dry_run
1959                        && let Err(error) = metadata_file.write_all_at(&metadata_header_bytes, 0)
1960                    {
1961                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
1962                            file: plot_file_path,
1963                            size: metadata_header_bytes.len() as u64,
1964                            offset: 0,
1965                            error,
1966                        });
1967                    }
1968                }
1969
1970                plot_file
1971            };
1972
1973            let sector_bytes_range = 0..(sector_size as usize - Blake3Hash::SIZE);
1974
1975            info!("Checking sectors and corresponding metadata");
1976            (0..metadata_header.plotted_sector_count)
1977                .into_par_iter()
1978                .map_init(
1979                    || vec![0u8; Record::SIZE],
1980                    |scratch_buffer, sector_index| {
1981                        let _span_guard = span.enter();
1982
1983                        let offset = RESERVED_PLOT_METADATA
1984                            + u64::from(sector_index) * sector_metadata_size as u64;
1985                        if let Err(error) = metadata_file
1986                            .read_exact_at(&mut scratch_buffer[..sector_metadata_size], offset)
1987                        {
1988                            warn!(
1989                                path = %metadata_file_path.display(),
1990                                %error,
1991                                %offset,
1992                                size = %sector_metadata_size,
1993                                %sector_index,
1994                                "Failed to read sector metadata, replacing with dummy expired \
1995                                sector metadata"
1996                            );
1997
1998                            if !dry_run {
1999                                write_dummy_sector_metadata(
2000                                    &metadata_file,
2001                                    &metadata_file_path,
2002                                    sector_index,
2003                                    pieces_in_sector,
2004                                )?;
2005                            }
2006                            return Ok(());
2007                        }
2008
2009                        let sector_metadata = match SectorMetadataChecksummed::decode(
2010                            &mut &scratch_buffer[..sector_metadata_size],
2011                        ) {
2012                            Ok(sector_metadata) => sector_metadata,
2013                            Err(error) => {
2014                                warn!(
2015                                    path = %metadata_file_path.display(),
2016                                    %error,
2017                                    %sector_index,
2018                                    "Failed to decode sector metadata, replacing with dummy \
2019                                    expired sector metadata"
2020                                );
2021
2022                                if !dry_run {
2023                                    write_dummy_sector_metadata(
2024                                        &metadata_file,
2025                                        &metadata_file_path,
2026                                        sector_index,
2027                                        pieces_in_sector,
2028                                    )?;
2029                                }
2030                                return Ok(());
2031                            }
2032                        };
2033
2034                        if sector_metadata.sector_index != sector_index {
2035                            warn!(
2036                                path = %metadata_file_path.display(),
2037                                %sector_index,
2038                                found_sector_index = sector_metadata.sector_index,
2039                                "Sector index mismatch, replacing with dummy expired sector \
2040                                metadata"
2041                            );
2042
2043                            if !dry_run {
2044                                write_dummy_sector_metadata(
2045                                    &metadata_file,
2046                                    &metadata_file_path,
2047                                    sector_index,
2048                                    pieces_in_sector,
2049                                )?;
2050                            }
2051                            return Ok(());
2052                        }
2053
2054                        if sector_metadata.pieces_in_sector != pieces_in_sector {
2055                            warn!(
2056                                path = %metadata_file_path.display(),
2057                                %sector_index,
2058                                %pieces_in_sector,
2059                                found_pieces_in_sector = sector_metadata.pieces_in_sector,
2060                                "Pieces in sector mismatch, replacing with dummy expired sector \
2061                                metadata"
2062                            );
2063
2064                            if !dry_run {
2065                                write_dummy_sector_metadata(
2066                                    &metadata_file,
2067                                    &metadata_file_path,
2068                                    sector_index,
2069                                    pieces_in_sector,
2070                                )?;
2071                            }
2072                            return Ok(());
2073                        }
2074
2075                        if target.plot() {
2076                            let mut hasher = blake3::Hasher::new();
2077                            // Read sector bytes and compute checksum
2078                            for offset_in_sector in
2079                                sector_bytes_range.clone().step_by(scratch_buffer.len())
2080                            {
2081                                let offset =
2082                                    u64::from(sector_index) * sector_size + offset_in_sector as u64;
2083                                let bytes_to_read = (offset_in_sector + scratch_buffer.len())
2084                                    .min(sector_bytes_range.end)
2085                                    - offset_in_sector;
2086
2087                                let bytes = &mut scratch_buffer[..bytes_to_read];
2088
2089                                if let Err(error) = plot_file.read_exact_at(bytes, offset) {
2090                                    warn!(
2091                                        path = %plot_file_path.display(),
2092                                        %error,
2093                                        %sector_index,
2094                                        %offset,
2095                                        size = %bytes.len() as u64,
2096                                        "Failed to read sector bytes"
2097                                    );
2098
2099                                    continue;
2100                                }
2101
2102                                hasher.update(bytes);
2103                            }
2104
2105                            let actual_checksum = *hasher.finalize().as_bytes();
2106                            let mut expected_checksum = [0; Blake3Hash::SIZE];
2107                            {
2108                                let offset = u64::from(sector_index) * sector_size
2109                                    + sector_bytes_range.end as u64;
2110                                if let Err(error) =
2111                                    plot_file.read_exact_at(&mut expected_checksum, offset)
2112                                {
2113                                    warn!(
2114                                        path = %plot_file_path.display(),
2115                                        %error,
2116                                        %sector_index,
2117                                        %offset,
2118                                        size = %expected_checksum.len() as u64,
2119                                        "Failed to read sector checksum bytes"
2120                                    );
2121                                }
2122                            }
2123
2124                            // Verify checksum
2125                            if actual_checksum != expected_checksum {
2126                                warn!(
2127                                    path = %plot_file_path.display(),
2128                                    %sector_index,
2129                                    actual_checksum = %hex::encode(actual_checksum),
2130                                    expected_checksum = %hex::encode(expected_checksum),
2131                                    "Plotted sector checksum mismatch, replacing with dummy \
2132                                    expired sector"
2133                                );
2134
2135                                if !dry_run {
2136                                    write_dummy_sector_metadata(
2137                                        &metadata_file,
2138                                        &metadata_file_path,
2139                                        sector_index,
2140                                        pieces_in_sector,
2141                                    )?;
2142                                }
2143
2144                                scratch_buffer.fill(0);
2145
2146                                hasher.reset();
2147                                // Fill sector with zeroes and compute checksum
2148                                for offset_in_sector in
2149                                    sector_bytes_range.clone().step_by(scratch_buffer.len())
2150                                {
2151                                    let offset = u64::from(sector_index) * sector_size
2152                                        + offset_in_sector as u64;
2153                                    let bytes_to_write = (offset_in_sector + scratch_buffer.len())
2154                                        .min(sector_bytes_range.end)
2155                                        - offset_in_sector;
2156                                    let bytes = &mut scratch_buffer[..bytes_to_write];
2157
2158                                    if !dry_run
2159                                        && let Err(error) = plot_file.write_all_at(bytes, offset)
2160                                    {
2161                                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
2162                                            file: plot_file_path.clone(),
2163                                            size: scratch_buffer.len() as u64,
2164                                            offset,
2165                                            error,
2166                                        });
2167                                    }
2168
2169                                    hasher.update(bytes);
2170                                }
2171                                // Write checksum
2172                                {
2173                                    let checksum = *hasher.finalize().as_bytes();
2174                                    let offset = u64::from(sector_index) * sector_size
2175                                        + sector_bytes_range.end as u64;
2176                                    if !dry_run
2177                                        && let Err(error) =
2178                                            plot_file.write_all_at(&checksum, offset)
2179                                    {
2180                                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
2181                                            file: plot_file_path.clone(),
2182                                            size: checksum.len() as u64,
2183                                            offset,
2184                                            error,
2185                                        });
2186                                    }
2187                                }
2188
2189                                return Ok(());
2190                            }
2191                        }
2192
2193                        trace!(%sector_index, "Sector is in good shape");
2194
2195                        Ok(())
2196                    },
2197                )
2198                .try_for_each({
2199                    let span = &span;
2200                    let checked_sectors = AtomicUsize::new(0);
2201
2202                    move |result| {
2203                        let _span_guard = span.enter();
2204
2205                        let checked_sectors = checked_sectors.fetch_add(1, Ordering::Relaxed);
2206                        if checked_sectors > 1 && checked_sectors % 10 == 0 {
2207                            info!(
2208                                "Checked {}/{} sectors",
2209                                checked_sectors, metadata_header.plotted_sector_count
2210                            );
2211                        }
2212
2213                        result
2214                    }
2215                })?;
2216        }
2217
2218        if target.cache() {
2219            Self::scrub_cache(directory, dry_run)?;
2220        }
2221
2222        info!("Farm check completed");
2223
2224        Ok(())
2225    }
2226
2227    fn scrub_cache(directory: &Path, dry_run: bool) -> Result<(), SingleDiskFarmScrubError> {
2228        let span = Span::current();
2229
2230        let file = directory.join(DiskPieceCache::FILE_NAME);
2231        info!(path = %file.display(), "Checking cache file");
2232
2233        let cache_file = match OpenOptions::new().read(true).write(!dry_run).open(&file) {
2234            Ok(plot_file) => plot_file,
2235            Err(error) => {
2236                return if error.kind() == io::ErrorKind::NotFound {
2237                    warn!(
2238                        file = %file.display(),
2239                        "Cache file does not exist, this is expected in farming cluster"
2240                    );
2241                    Ok(())
2242                } else {
2243                    Err(SingleDiskFarmScrubError::CacheCantBeOpened { file, error })
2244                };
2245            }
2246        };
2247
2248        // Error doesn't matter here
2249        let _ = cache_file.advise_sequential_access();
2250
2251        let cache_size = match cache_file.size() {
2252            Ok(cache_size) => cache_size,
2253            Err(error) => {
2254                return Err(SingleDiskFarmScrubError::FailedToDetermineFileSize { file, error });
2255            }
2256        };
2257
2258        let element_size = DiskPieceCache::element_size();
2259        let number_of_cached_elements = cache_size / u64::from(element_size);
2260        let dummy_element = vec![0; element_size as usize];
2261        (0..number_of_cached_elements)
2262            .into_par_iter()
2263            .map_with(vec![0; element_size as usize], |element, cache_offset| {
2264                let _span_guard = span.enter();
2265
2266                let offset = cache_offset * u64::from(element_size);
2267                if let Err(error) = cache_file.read_exact_at(element, offset) {
2268                    warn!(
2269                        path = %file.display(),
2270                        %cache_offset,
2271                        size = %element.len() as u64,
2272                        %offset,
2273                        %error,
2274                        "Failed to read cached piece, replacing with dummy element"
2275                    );
2276
2277                    if !dry_run && let Err(error) = cache_file.write_all_at(&dummy_element, offset)
2278                    {
2279                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
2280                            file: file.clone(),
2281                            size: u64::from(element_size),
2282                            offset,
2283                            error,
2284                        });
2285                    }
2286
2287                    return Ok(());
2288                }
2289
2290                let (index_and_piece_bytes, expected_checksum) =
2291                    element.split_at(element_size as usize - Blake3Hash::SIZE);
2292                let actual_checksum = blake3_hash(index_and_piece_bytes);
2293                if *actual_checksum != *expected_checksum && element != &dummy_element {
2294                    warn!(
2295                        %cache_offset,
2296                        actual_checksum = %hex::encode(actual_checksum),
2297                        expected_checksum = %hex::encode(expected_checksum),
2298                        "Cached piece checksum mismatch, replacing with dummy element"
2299                    );
2300
2301                    if !dry_run && let Err(error) = cache_file.write_all_at(&dummy_element, offset)
2302                    {
2303                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
2304                            file: file.clone(),
2305                            size: u64::from(element_size),
2306                            offset,
2307                            error,
2308                        });
2309                    }
2310
2311                    return Ok(());
2312                }
2313
2314                Ok(())
2315            })
2316            .try_for_each({
2317                let span = &span;
2318                let checked_elements = AtomicUsize::new(0);
2319
2320                move |result| {
2321                    let _span_guard = span.enter();
2322
2323                    let checked_elements = checked_elements.fetch_add(1, Ordering::Relaxed);
2324                    if checked_elements > 1 && checked_elements % 1000 == 0 {
2325                        info!(
2326                            "Checked {}/{} cache elements",
2327                            checked_elements, number_of_cached_elements
2328                        );
2329                    }
2330
2331                    result
2332                }
2333            })?;
2334
2335        Ok(())
2336    }
2337}
2338
2339fn write_dummy_sector_metadata(
2340    metadata_file: &File,
2341    metadata_file_path: &Path,
2342    sector_index: SectorIndex,
2343    pieces_in_sector: u16,
2344) -> Result<(), SingleDiskFarmScrubError> {
2345    let dummy_sector_bytes = SectorMetadataChecksummed::from(SectorMetadata {
2346        sector_index,
2347        pieces_in_sector,
2348        s_bucket_sizes: Box::new([0; Record::NUM_S_BUCKETS]),
2349        history_size: HistorySize::from(SegmentIndex::ZERO),
2350    })
2351    .encode();
2352    let sector_offset = RESERVED_PLOT_METADATA
2353        + u64::from(sector_index) * SectorMetadataChecksummed::encoded_size() as u64;
2354    metadata_file
2355        .write_all_at(&dummy_sector_bytes, sector_offset)
2356        .map_err(|error| SingleDiskFarmScrubError::FailedToWriteBytes {
2357            file: metadata_file_path.to_path_buf(),
2358            size: dummy_sector_bytes.len() as u64,
2359            offset: sector_offset,
2360            error,
2361        })
2362}