subspace_farmer/
farm.rs

1//! Abstract farm API
2//!
3//! This module provides a bunch of traits and simple data structures that serve as a layer of
4//! abstraction that improves composition without having assumptions about implementation details.
5//!
6//! Implementations can be local (backed by local disk) and remote (connected via network in some
7//! way). This crate provides a few of such implementations, but more can be created externally as
8//! well if needed without modifying the library itself.
9
10use async_trait::async_trait;
11use derive_more::{Display, From};
12use futures::Stream;
13use parity_scale_codec::{Decode, Encode, EncodeLike, Input, Output};
14use serde::{Deserialize, Serialize};
15use std::future::Future;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::time::Duration;
19use std::{fmt, io};
20use subspace_core_primitives::pieces::{Piece, PieceIndex, PieceOffset};
21use subspace_core_primitives::sectors::SectorIndex;
22use subspace_core_primitives::segments::SegmentIndex;
23use subspace_farmer_components::auditing::AuditingError;
24use subspace_farmer_components::plotting::PlottedSector;
25use subspace_farmer_components::proving::ProvingError;
26use subspace_networking::libp2p::kad::RecordKey;
27use subspace_rpc_primitives::SolutionResponse;
28use thiserror::Error;
29use ulid::Ulid;
30
31pub mod plotted_pieces;
32
/// Erased error type
///
/// Boxed so heterogeneous error types from different farm implementations can travel through a
/// single trait-object type.
pub type FarmError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// Type alias used for event handlers
///
/// Handlers are reference-counted (`Arc`) so a subscription can be shared and called from
/// multiple threads.
pub type HandlerFn<A> = Arc<dyn Fn(&A) + Send + Sync + 'static>;
37
/// Getter for plotted sectors
#[async_trait]
pub trait PlottedSectors: Send + Sync + fmt::Debug {
    /// Get already plotted sectors
    ///
    /// Returns a stream where each item is either a plotted sector or an error; the stream
    /// borrows `self` for its lifetime.
    async fn get(
        &self,
    ) -> Result<
        Box<dyn Stream<Item = Result<PlottedSector, FarmError>> + Unpin + Send + '_>,
        FarmError,
    >;
}
49
/// An identifier for a cache, can be used in logs, thread names, etc.
#[derive(
    Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize, Display, From,
)]
#[serde(untagged)]
pub enum PieceCacheId {
    /// Cache ID
    Ulid(Ulid),
}
59
60impl Encode for PieceCacheId {
61    #[inline]
62    fn size_hint(&self) -> usize {
63        1_usize
64            + match self {
65                PieceCacheId::Ulid(ulid) => 0_usize.saturating_add(Encode::size_hint(&ulid.0)),
66            }
67    }
68
69    #[inline]
70    fn encode_to<O: Output + ?Sized>(&self, output: &mut O) {
71        match self {
72            PieceCacheId::Ulid(ulid) => {
73                output.push_byte(0);
74                Encode::encode_to(&ulid.0, output);
75            }
76        }
77    }
78}
79
80impl EncodeLike for PieceCacheId {}
81
82impl Decode for PieceCacheId {
83    #[inline]
84    fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
85        match input
86            .read_byte()
87            .map_err(|e| e.chain("Could not decode `PieceCacheId`, failed to read variant byte"))?
88        {
89            0 => u128::decode(input)
90                .map(|ulid| PieceCacheId::Ulid(Ulid(ulid)))
91                .map_err(|e| e.chain("Could not decode `PieceCacheId::Ulid.0`")),
92            _ => Err("Could not decode `PieceCacheId`, variant doesn't exist".into()),
93        }
94    }
95}
96
97#[allow(clippy::new_without_default)]
98impl PieceCacheId {
99    /// Creates new ID
100    #[inline]
101    pub fn new() -> Self {
102        Self::Ulid(Ulid::new())
103    }
104}
105
/// Offset wrapper for pieces in [`PieceCache`]
///
/// NOTE(review): presumably an element index (cf. [`PieceCache::max_num_elements`]) rather than a
/// byte offset — confirm against implementations.
#[derive(Debug, Display, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Encode, Decode)]
#[repr(transparent)]
pub struct PieceCacheOffset(pub(crate) u32);
110
/// Abstract piece cache implementation.
///
/// Piece cache is a simple container that stores concatenated pieces in a flat file at specific
/// offsets. Implementation doesn't have to be local though, cache can be remote somewhere on the
/// network, APIs are intentionally async to account for that.
#[async_trait]
pub trait PieceCache: Send + Sync + fmt::Debug {
    /// ID of this cache
    fn id(&self) -> &PieceCacheId;

    /// Max number of elements in this cache
    ///
    /// Reads at offsets outside this range return `None` rather than an error (see the read
    /// methods below).
    fn max_num_elements(&self) -> u32;

    /// Contents of this piece cache.
    ///
    /// Yields one entry per offset; `None` for the piece index presumably marks an unoccupied
    /// slot — TODO(review): confirm against implementations.
    ///
    /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this
    /// doesn't happen for the same piece being accessed!
    async fn contents(
        &self,
    ) -> Result<
        Box<
            dyn Stream<Item = Result<(PieceCacheOffset, Option<PieceIndex>), FarmError>>
                + Unpin
                + Send
                + '_,
        >,
        FarmError,
    >;

    /// Store piece in cache at specified offset, replacing existing piece if there is one.
    ///
    /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this
    /// doesn't happen for the same piece being accessed!
    async fn write_piece(
        &self,
        offset: PieceCacheOffset,
        piece_index: PieceIndex,
        piece: &Piece,
    ) -> Result<(), FarmError>;

    /// Read piece index from cache at specified offset.
    ///
    /// Returns `None` if offset is out of range.
    ///
    /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this
    /// doesn't happen for the same piece being accessed!
    async fn read_piece_index(
        &self,
        offset: PieceCacheOffset,
    ) -> Result<Option<PieceIndex>, FarmError>;

    /// Read piece from cache at specified offset.
    ///
    /// Returns `None` if offset is out of range.
    ///
    /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this
    /// doesn't happen for the same piece being accessed!
    async fn read_piece(
        &self,
        offset: PieceCacheOffset,
    ) -> Result<Option<(PieceIndex, Piece)>, FarmError>;

    /// Read pieces from cache at specified offsets.
    ///
    /// Number of elements in returned stream is the same as number of unique `offsets`.
    /// Returns `None` for offsets that are out of range.
    ///
    /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this
    /// doesn't happen for the same pieces being accessed!
    async fn read_pieces(
        &self,
        offsets: Box<dyn Iterator<Item = PieceCacheOffset> + Send>,
    ) -> Result<
        Box<
            dyn Stream<Item = Result<(PieceCacheOffset, Option<(PieceIndex, Piece)>), FarmError>>
                + Send
                + Unpin
                + '_,
        >,
        FarmError,
    >;
}
193
/// Result of piece storing check
///
/// Returned by [`PlotCache::is_piece_maybe_stored`].
#[derive(Debug, Copy, Clone, Encode, Decode)]
pub enum MaybePieceStoredResult {
    /// Piece is not stored already, and can't be added because the cache/plot is full.
    No,
    /// Cache might have a vacant slot to store this piece.
    /// Vacant slots are not guaranteed, they can be overwritten by another piece or newly plotted
    /// sector at any time.
    Vacant,
    /// Piece is already stored in the cache.
    Yes,
}
206
/// Abstract plot cache implementation.
///
/// Plot cache is a cache that exploits space towards the end of the plot that is not yet occupied
/// by sectors in order to increase effective caching space, which helps with plotting speed for
/// small farmers since they don't need to retrieve the same pieces from the network over and over
/// again, which is slower and uses a lot of Internet bandwidth.
#[async_trait]
pub trait PlotCache: Send + Sync + fmt::Debug {
    /// Check if a piece is already stored in this cache, or it can be added to this cache.
    /// The piece is not guaranteed to be stored, because it might be overwritten with a new
    /// sector any time.
    ///
    /// See [`MaybePieceStoredResult`] for the meaning of each outcome.
    async fn is_piece_maybe_stored(
        &self,
        key: &RecordKey,
    ) -> Result<MaybePieceStoredResult, FarmError>;

    /// Store piece in cache if there is free space, and return `Ok(true)`.
    /// Returns `Ok(false)` if there is no free space, or the farm or process is shutting down.
    async fn try_store_piece(
        &self,
        piece_index: PieceIndex,
        piece: &Piece,
    ) -> Result<bool, FarmError>;

    /// Read piece from cache.
    ///
    /// Returns `None` if not cached.
    async fn read_piece(&self, key: &RecordKey) -> Result<Option<Piece>, FarmError>;
}
236
/// Auditing details
///
/// Emitted via [`FarmingNotification::Auditing`].
#[derive(Debug, Copy, Clone, Encode, Decode)]
pub struct AuditingDetails {
    /// Number of sectors that were audited
    pub sectors_count: SectorIndex,
    /// Audit duration
    pub time: Duration,
}
245
/// Result of the proving
///
/// Reported as part of [`ProvingDetails`].
#[derive(Debug, Copy, Clone, Encode, Decode)]
pub enum ProvingResult {
    /// Proved successfully and accepted by the node
    Success,
    /// Proving took too long
    Timeout,
    /// Managed to prove within time limit, but node rejected solution, likely due to timeout on its
    /// end
    Rejected,
    /// Proving failed altogether
    Failed,
}
259
260impl fmt::Display for ProvingResult {
261    #[inline]
262    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
263        f.write_str(match self {
264            Self::Success => "Success",
265            Self::Timeout => "Timeout",
266            Self::Rejected => "Rejected",
267            Self::Failed => "Failed",
268        })
269    }
270}
271
/// Proving details
///
/// Emitted via [`FarmingNotification::Proving`].
#[derive(Debug, Copy, Clone, Encode, Decode)]
pub struct ProvingDetails {
    /// Whether proving ended up being successful
    pub result: ProvingResult,
    /// Proving duration
    pub time: Duration,
}
280
/// Special decoded farming error
///
/// Produced when a [`FarmingError`] is SCALE-decoded: only the string representation and the
/// fatality flag survive the round-trip (see the `Encode`/`Decode` impls on [`FarmingError`]).
#[derive(Debug, Encode, Decode)]
pub struct DecodedFarmingError {
    /// String representation of an error
    error: String,
    /// Whether error is fatal
    is_fatal: bool,
}
289
290impl fmt::Display for DecodedFarmingError {
291    #[inline]
292    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
293        self.error.fmt(f)
294    }
295}
296
/// Errors that happen during farming
//
// `Encode`/`Decode` (below) round-trip this type through [`DecodedFarmingError`], so a decoded
// value always comes back as the `Decoded` variant.
#[derive(Debug, Error)]
pub enum FarmingError {
    /// Failed to subscribe to slot info notifications
    #[error("Failed to subscribe to slot info notifications: {error}")]
    FailedToSubscribeSlotInfo {
        /// Lower-level error
        error: anyhow::Error,
    },
    /// Failed to retrieve farmer info
    #[error("Failed to retrieve farmer info: {error}")]
    FailedToGetFarmerInfo {
        /// Lower-level error
        error: anyhow::Error,
    },
    /// Slot info notification stream ended
    #[error("Slot info notification stream ended")]
    SlotNotificationStreamEnded,
    /// Low-level auditing error
    #[error("Low-level auditing error: {0}")]
    LowLevelAuditing(#[from] AuditingError),
    /// Low-level proving error
    #[error("Low-level proving error: {0}")]
    LowLevelProving(#[from] ProvingError),
    /// I/O error occurred
    #[error("Farming I/O error: {0}")]
    Io(#[from] io::Error),
    /// Decoded farming error
    #[error("Decoded farming error {0}")]
    Decoded(DecodedFarmingError),
}
328
329impl Encode for FarmingError {
330    #[inline]
331    fn encode_to<O: Output + ?Sized>(&self, dest: &mut O) {
332        let error = DecodedFarmingError {
333            error: self.to_string(),
334            is_fatal: self.is_fatal(),
335        };
336
337        error.encode_to(dest)
338    }
339}
340
341impl Decode for FarmingError {
342    #[inline]
343    fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
344        DecodedFarmingError::decode(input).map(FarmingError::Decoded)
345    }
346}
347
348impl FarmingError {
349    /// String variant of the error, primarily for monitoring purposes
350    #[inline]
351    pub fn str_variant(&self) -> &str {
352        match self {
353            FarmingError::FailedToSubscribeSlotInfo { .. } => "FailedToSubscribeSlotInfo",
354            FarmingError::FailedToGetFarmerInfo { .. } => "FailedToGetFarmerInfo",
355            FarmingError::LowLevelAuditing(_) => "LowLevelAuditing",
356            FarmingError::LowLevelProving(_) => "LowLevelProving",
357            FarmingError::Io(_) => "Io",
358            FarmingError::Decoded(_) => "Decoded",
359            FarmingError::SlotNotificationStreamEnded => "SlotNotificationStreamEnded",
360        }
361    }
362
363    /// Whether this error is fatal and makes farm unusable
364    pub fn is_fatal(&self) -> bool {
365        match self {
366            FarmingError::FailedToSubscribeSlotInfo { .. } => true,
367            FarmingError::FailedToGetFarmerInfo { .. } => true,
368            FarmingError::LowLevelAuditing(_) => true,
369            FarmingError::LowLevelProving(error) => error.is_fatal(),
370            FarmingError::Io(_) => true,
371            FarmingError::Decoded(error) => error.is_fatal,
372            FarmingError::SlotNotificationStreamEnded => true,
373        }
374    }
375}
376
/// Various farming notifications
#[derive(Debug, Clone, Encode, Decode)]
pub enum FarmingNotification {
    /// Auditing
    Auditing(AuditingDetails),
    /// Proving
    Proving(ProvingDetails),
    /// Non-fatal farming error
    ///
    /// Wrapped in `Arc` so the (non-`Clone`) [`FarmingError`] can be shared across subscribers
    /// while this enum stays `Clone`.
    NonFatalError(Arc<FarmingError>),
}
387
/// Details about sector currently being plotted
///
/// NOTE(review): variants appear to follow the plotting lifecycle in order (`Starting` →
/// `Downloading` → `Encoding` → `Writing` → `Finished`), with `Error` possible at any point —
/// confirm against the plotting implementation.
#[derive(Debug, Clone, Encode, Decode)]
pub enum SectorPlottingDetails {
    /// Starting plotting of a sector
    Starting {
        /// Progress so far in % (not including this sector)
        progress: f32,
        /// Whether sector is being replotted
        replotting: bool,
        /// Whether this is the last sector queued so far
        last_queued: bool,
    },
    /// Downloading sector pieces
    Downloading,
    /// Downloaded sector pieces (duration of the download)
    Downloaded(Duration),
    /// Encoding sector pieces
    Encoding,
    /// Encoded sector pieces (duration of the encoding)
    Encoded(Duration),
    /// Writing sector
    Writing,
    /// Written sector (duration of the write)
    Written(Duration),
    /// Finished plotting
    Finished {
        /// Information about plotted sector
        plotted_sector: PlottedSector,
        /// Information about old plotted sector that was replaced
        old_plotted_sector: Option<PlottedSector>,
        /// How much time it took to plot a sector
        time: Duration,
    },
    /// Plotting failed
    Error(String),
}
424
/// Details about sector expiration
#[derive(Debug, Clone, Encode, Decode)]
pub enum SectorExpirationDetails {
    /// Sector expiration became known
    Determined {
        /// Segment index at which sector expires
        expires_at: SegmentIndex,
    },
    /// Sector will expire at the next segment index and should be replotted
    AboutToExpire,
    /// Sector already expired
    Expired,
}
438
/// Various sector updates
///
/// Delivered to subscribers of [`Farm::on_sector_update`] together with the sector index.
#[derive(Debug, Clone, Encode, Decode)]
pub enum SectorUpdate {
    /// Sector is being plotted
    Plotting(SectorPlottingDetails),
    /// Sector expiration information updated
    Expiration(SectorExpirationDetails),
}
447
/// Abstract piece reader implementation
///
/// Unlike [`PieceCache`], pieces here are addressed by sector index and offset within the sector.
#[async_trait]
pub trait PieceReader: Send + Sync + fmt::Debug {
    /// Read piece from sector by offset, `None` means input parameters are incorrect or piece
    /// reader was shut down
    async fn read_piece(
        &self,
        sector_index: SectorIndex,
        piece_offset: PieceOffset,
    ) -> Result<Option<Piece>, FarmError>;
}
459
/// Opaque handler ID for event handlers, once dropped handler will be removed automatically
pub trait HandlerId: Send + Sync + fmt::Debug {
    /// Consumes [`HandlerId`] and prevents handler from being removed automatically.
    ///
    /// NOTE(review): despite "consumes" in the description, the signature takes `&self`, so the
    /// value is not actually moved — the effect is simply that dropping it later is a no-op.
    fn detach(&self);
}
465
impl HandlerId for event_listener_primitives::HandlerId {
    #[inline]
    fn detach(&self) {
        // NOTE(review): this relies on the *inherent* `detach(&self)` of
        // `event_listener_primitives::HandlerId` winning method resolution over this trait method
        // (inherent methods take precedence); otherwise this call would recurse infinitely —
        // verify against the crate's API.
        self.detach();
    }
}
472
/// An identifier for a farm, can be used in logs, thread names, etc.
#[derive(
    Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize, Display, From,
)]
#[serde(untagged)]
pub enum FarmId {
    /// Farm ID
    Ulid(Ulid),
}
482
483impl Encode for FarmId {
484    #[inline]
485    fn size_hint(&self) -> usize {
486        1_usize
487            + match self {
488                FarmId::Ulid(ulid) => 0_usize.saturating_add(Encode::size_hint(&ulid.0)),
489            }
490    }
491
492    #[inline]
493    fn encode_to<O: Output + ?Sized>(&self, output: &mut O) {
494        match self {
495            FarmId::Ulid(ulid) => {
496                output.push_byte(0);
497                Encode::encode_to(&ulid.0, output);
498            }
499        }
500    }
501}
502
503impl EncodeLike for FarmId {}
504
505impl Decode for FarmId {
506    #[inline]
507    fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
508        match input
509            .read_byte()
510            .map_err(|e| e.chain("Could not decode `FarmId`, failed to read variant byte"))?
511        {
512            0 => u128::decode(input)
513                .map(|ulid| FarmId::Ulid(Ulid(ulid)))
514                .map_err(|e| e.chain("Could not decode `FarmId::Ulid.0`")),
515            _ => Err("Could not decode `FarmId`, variant doesn't exist".into()),
516        }
517    }
518}
519
520#[allow(clippy::new_without_default)]
521impl FarmId {
522    /// Creates new ID
523    #[inline]
524    pub fn new() -> Self {
525        Self::Ulid(Ulid::new())
526    }
527}
528
/// Abstract farm implementation
///
/// Combines plotted-sector access, piece reading and event subscriptions behind a single
/// object-safe interface.
#[async_trait(?Send)]
pub trait Farm {
    /// ID of this farm
    fn id(&self) -> &FarmId;

    /// Number of sectors in this farm
    fn total_sectors_count(&self) -> SectorIndex;

    /// Get plotted sectors instance
    fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static>;

    /// Get piece reader to read plotted pieces later
    fn piece_reader(&self) -> Arc<dyn PieceReader + 'static>;

    /// Subscribe to sector updates
    ///
    /// The returned [`HandlerId`] keeps the subscription alive: dropping it removes the handler
    /// unless [`HandlerId::detach`] was called first.
    fn on_sector_update(
        &self,
        callback: HandlerFn<(SectorIndex, SectorUpdate)>,
    ) -> Box<dyn HandlerId>;

    /// Subscribe to farming notifications
    fn on_farming_notification(
        &self,
        callback: HandlerFn<FarmingNotification>,
    ) -> Box<dyn HandlerId>;

    /// Subscribe to new solution notification
    fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> Box<dyn HandlerId>;

    /// Run and wait for background threads to exit or return an error
    ///
    /// Consumes the farm; the returned future resolves once the farm has shut down.
    fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>;
}
562
// Forwarding implementation: any `Box<T>` where `T: Farm + ?Sized` (notably `Box<dyn Farm>`) is
// itself a `Farm`, with every method delegating to the boxed value.
//
// NOTE(review): the trait is declared `#[async_trait(?Send)]` while this impl uses plain
// `#[async_trait]`; since the trait currently contains no `async fn`s this mismatch looks
// inert — confirm if async methods are ever added.
#[async_trait]
impl<T> Farm for Box<T>
where
    T: Farm + ?Sized,
{
    #[inline]
    fn id(&self) -> &FarmId {
        self.as_ref().id()
    }

    #[inline]
    fn total_sectors_count(&self) -> SectorIndex {
        self.as_ref().total_sectors_count()
    }

    #[inline]
    fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static> {
        self.as_ref().plotted_sectors()
    }

    #[inline]
    fn piece_reader(&self) -> Arc<dyn PieceReader + 'static> {
        self.as_ref().piece_reader()
    }

    #[inline]
    fn on_sector_update(
        &self,
        callback: HandlerFn<(SectorIndex, SectorUpdate)>,
    ) -> Box<dyn HandlerId> {
        self.as_ref().on_sector_update(callback)
    }

    #[inline]
    fn on_farming_notification(
        &self,
        callback: HandlerFn<FarmingNotification>,
    ) -> Box<dyn HandlerId> {
        self.as_ref().on_farming_notification(callback)
    }

    #[inline]
    fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> Box<dyn HandlerId> {
        self.as_ref().on_solution(callback)
    }

    #[inline]
    fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
        // `self` is `Box<Box<T>>` here; unboxing once yields `Box<T>`, on which the method call
        // resolves to `T::run` (matching its `Box<Self>` receiver) rather than recursing back
        // into this forwarding impl.
        (*self).run()
    }
}