subspace_farmer/
farm.rs

1//! Abstract farm API
2//!
3//! This module provides a bunch of traits and simple data structures that serve as a layer of
4//! abstraction that improves composition without having assumptions about implementation details.
5//!
6//! Implementations can be local (backed by local disk) and remote (connected via network in some
7//! way). This crate provides a few of such implementations, but more can be created externally as
8//! well if needed without modifying the library itself.
9
10use async_trait::async_trait;
11use derive_more::{Display, From};
12use futures::Stream;
13use parity_scale_codec::{Decode, Encode, EncodeLike, Input, Output};
14use serde::{Deserialize, Serialize};
15use std::future::Future;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::time::Duration;
19use std::{fmt, io};
20use subspace_core_primitives::pieces::{Piece, PieceIndex, PieceOffset};
21use subspace_core_primitives::sectors::SectorIndex;
22use subspace_core_primitives::segments::SegmentIndex;
23use subspace_farmer_components::auditing::AuditingError;
24use subspace_farmer_components::plotting::PlottedSector;
25use subspace_farmer_components::proving::ProvingError;
26use subspace_networking::libp2p::kad::RecordKey;
27use subspace_rpc_primitives::SolutionResponse;
28use thiserror::Error;
29use ulid::Ulid;
30
pub mod plotted_pieces;

/// Erased error type
///
/// Boxed trait object so farm implementations can surface arbitrary error types across the
/// abstraction boundary.
pub type FarmError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// Type alias used for event handlers
///
/// Handlers receive the event payload by reference and must be callable from any thread.
pub type HandlerFn<A> = Arc<dyn Fn(&A) + Send + Sync + 'static>;
37
/// Getter for plotted sectors
#[async_trait]
pub trait PlottedSectors: Send + Sync + fmt::Debug {
    /// Get already plotted sectors
    ///
    /// Returns a stream of plotted sectors (individual entries may be errors); the stream borrows
    /// `self` (`'_` lifetime) and must be dropped before `self` can be used again.
    async fn get(
        &self,
    ) -> Result<
        Box<dyn Stream<Item = Result<PlottedSector, FarmError>> + Unpin + Send + '_>,
        FarmError,
    >;
}
49
/// An identifier for a piece cache, can be used in logs, thread names, etc.
#[derive(
    Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize, Display, From,
)]
#[serde(untagged)]
pub enum PieceCacheId {
    /// Cache ID
    Ulid(Ulid),
}
59
60impl Encode for PieceCacheId {
61    #[inline]
62    fn size_hint(&self) -> usize {
63        1_usize
64            + match self {
65                PieceCacheId::Ulid(ulid) => 0_usize.saturating_add(Encode::size_hint(&ulid.0)),
66            }
67    }
68
69    #[inline]
70    fn encode_to<O: Output + ?Sized>(&self, output: &mut O) {
71        match self {
72            PieceCacheId::Ulid(ulid) => {
73                output.push_byte(0);
74                Encode::encode_to(&ulid.0, output);
75            }
76        }
77    }
78}
79
// Marker: the manual `Encode` impl for `PieceCacheId` produces a canonical SCALE encoding
impl EncodeLike for PieceCacheId {}
81
82impl Decode for PieceCacheId {
83    #[inline]
84    fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
85        match input
86            .read_byte()
87            .map_err(|e| e.chain("Could not decode `PieceCacheId`, failed to read variant byte"))?
88        {
89            0 => u128::decode(input)
90                .map(|ulid| PieceCacheId::Ulid(Ulid(ulid)))
91                .map_err(|e| e.chain("Could not decode `PieceCacheId::Ulid.0`")),
92            _ => Err("Could not decode `PieceCacheId`, variant doesn't exist".into()),
93        }
94    }
95}
96
97#[allow(clippy::new_without_default)]
98impl PieceCacheId {
99    /// Creates new ID
100    #[inline]
101    pub fn new() -> Self {
102        Self::Ulid(Ulid::new())
103    }
104}
105
/// Offset wrapper for pieces in [`PieceCache`]
///
/// Identifies a slot in the cache's flat storage; the piece stored at a given offset can change
/// over time.
#[derive(Debug, Display, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Encode, Decode)]
#[repr(transparent)]
pub struct PieceCacheOffset(pub(crate) u32);
110
/// Abstract piece cache implementation.
///
/// Piece cache is a simple container that stores concatenated pieces in a flat file at specific
/// offsets. Implementation doesn't have to be local though, cache can be remote somewhere on the
/// network, APIs are intentionally async to account for that.
#[async_trait]
pub trait PieceCache: Send + Sync + fmt::Debug {
    /// ID of this cache
    fn id(&self) -> &PieceCacheId;

    /// Max number of elements in this cache
    fn max_num_elements(&self) -> u32;

    /// Contents of this piece cache.
    ///
    /// Yields `(offset, piece index)` pairs; a `None` piece index presumably marks an unoccupied
    /// slot — confirm with the specific implementation.
    ///
    /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this
    /// doesn't happen for the same piece being accessed!
    async fn contents(
        &self,
    ) -> Result<
        Box<
            dyn Stream<Item = Result<(PieceCacheOffset, Option<PieceIndex>), FarmError>>
                + Unpin
                + Send
                + '_,
        >,
        FarmError,
    >;

    /// Store piece in cache at specified offset, replacing existing piece if there is any.
    ///
    /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this
    /// doesn't happen for the same piece being accessed!
    async fn write_piece(
        &self,
        offset: PieceCacheOffset,
        piece_index: PieceIndex,
        piece: &Piece,
    ) -> Result<(), FarmError>;

    /// Read piece index from cache at specified offset.
    ///
    /// Returns `None` if offset is out of range.
    ///
    /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this
    /// doesn't happen for the same piece being accessed!
    async fn read_piece_index(
        &self,
        offset: PieceCacheOffset,
    ) -> Result<Option<PieceIndex>, FarmError>;

    /// Read piece from cache at specified offset.
    ///
    /// Returns `None` if offset is out of range.
    ///
    /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this
    /// doesn't happen for the same piece being accessed!
    async fn read_piece(
        &self,
        offset: PieceCacheOffset,
    ) -> Result<Option<(PieceIndex, Piece)>, FarmError>;

    /// Read pieces from cache at specified offsets.
    ///
    /// Number of elements in returned stream is the same as number of unique `offsets`.
    /// Returns `None` for offsets that are out of range.
    ///
    /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this
    /// doesn't happen for the same pieces being accessed!
    async fn read_pieces(
        &self,
        offsets: Box<dyn Iterator<Item = PieceCacheOffset> + Send>,
    ) -> Result<
        Box<
            dyn Stream<Item = Result<(PieceCacheOffset, Option<(PieceIndex, Piece)>), FarmError>>
                + Send
                + Unpin
                + '_,
        >,
        FarmError,
    >;
}
193
/// Result of piece storing check
///
/// "Maybe" because plot cache contents can be overwritten by sector data at any time, so a
/// positive answer is only a hint, not a guarantee.
#[derive(Debug, Copy, Clone, Encode, Decode)]
pub enum MaybePieceStoredResult {
    /// Definitely not stored
    No,
    /// Maybe has vacant slot to store
    Vacant,
    /// Maybe still stored
    Yes,
}
204
/// Abstract plot cache implementation.
///
/// Plot cache is a cache that exploits space towards the end of the plot that is not yet occupied
/// by sectors in order to increase effective caching space, which helps with plotting speed for
/// small farmers since they don't need to retrieve the same pieces from the network over and over
/// again, which is slower and uses a lot of Internet bandwidth.
#[async_trait]
pub trait PlotCache: Send + Sync + fmt::Debug {
    /// Check if piece is potentially stored in this cache (not guaranteed to be because it might be
    /// overridden with sector any time)
    async fn is_piece_maybe_stored(
        &self,
        key: &RecordKey,
    ) -> Result<MaybePieceStoredResult, FarmError>;

    /// Store piece in cache if there is free space, otherwise `Ok(false)` is returned
    async fn try_store_piece(
        &self,
        piece_index: PieceIndex,
        piece: &Piece,
    ) -> Result<bool, FarmError>;

    /// Read piece from cache.
    ///
    /// Returns `None` if not cached.
    async fn read_piece(&self, key: &RecordKey) -> Result<Option<Piece>, FarmError>;
}
232
/// Auditing details
///
/// Emitted via [`FarmingNotification::Auditing`].
#[derive(Debug, Copy, Clone, Encode, Decode)]
pub struct AuditingDetails {
    /// Number of sectors that were audited
    pub sectors_count: SectorIndex,
    /// Audit duration
    pub time: Duration,
}
241
/// Result of the proving
///
/// Carried by [`ProvingDetails`] in farming notifications.
#[derive(Debug, Copy, Clone, Encode, Decode)]
pub enum ProvingResult {
    /// Proved successfully and accepted by the node
    Success,
    /// Proving took too long
    Timeout,
    /// Managed to prove within time limit, but node rejected solution, likely due to timeout on its
    /// end
    Rejected,
    /// Proving failed altogether
    Failed,
}
255
256impl fmt::Display for ProvingResult {
257    #[inline]
258    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
259        f.write_str(match self {
260            Self::Success => "Success",
261            Self::Timeout => "Timeout",
262            Self::Rejected => "Rejected",
263            Self::Failed => "Failed",
264        })
265    }
266}
267
/// Proving details
///
/// Emitted via [`FarmingNotification::Proving`].
#[derive(Debug, Copy, Clone, Encode, Decode)]
pub struct ProvingDetails {
    /// Whether proving ended up being successful
    pub result: ProvingResult,
    /// Proving duration
    pub time: Duration,
}
276
/// Special decoded farming error
///
/// Serializable stand-in for [`FarmingError`]: since the underlying error types cannot be
/// SCALE-encoded, only the rendered message and fatality flag survive an encode/decode round-trip.
#[derive(Debug, Encode, Decode)]
pub struct DecodedFarmingError {
    /// String representation of an error
    error: String,
    /// Whether error is fatal
    is_fatal: bool,
}
285
286impl fmt::Display for DecodedFarmingError {
287    #[inline]
288    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
289        self.error.fmt(f)
290    }
291}
292
293/// Errors that happen during farming
294#[derive(Debug, Error)]
295pub enum FarmingError {
296    /// Failed to subscribe to slot info notifications
297    #[error("Failed to subscribe to slot info notifications: {error}")]
298    FailedToSubscribeSlotInfo {
299        /// Lower-level error
300        error: anyhow::Error,
301    },
302    /// Failed to retrieve farmer info
303    #[error("Failed to retrieve farmer info: {error}")]
304    FailedToGetFarmerInfo {
305        /// Lower-level error
306        error: anyhow::Error,
307    },
308    /// Slot info notification stream ended
309    #[error("Slot info notification stream ended")]
310    SlotNotificationStreamEnded,
311    /// Low-level auditing error
312    #[error("Low-level auditing error: {0}")]
313    LowLevelAuditing(#[from] AuditingError),
314    /// Low-level proving error
315    #[error("Low-level proving error: {0}")]
316    LowLevelProving(#[from] ProvingError),
317    /// I/O error occurred
318    #[error("Farming I/O error: {0}")]
319    Io(#[from] io::Error),
320    /// Decoded farming error
321    #[error("Decoded farming error {0}")]
322    Decoded(DecodedFarmingError),
323}
324
325impl Encode for FarmingError {
326    #[inline]
327    fn encode_to<O: Output + ?Sized>(&self, dest: &mut O) {
328        let error = DecodedFarmingError {
329            error: self.to_string(),
330            is_fatal: self.is_fatal(),
331        };
332
333        error.encode_to(dest)
334    }
335}
336
337impl Decode for FarmingError {
338    #[inline]
339    fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
340        DecodedFarmingError::decode(input).map(FarmingError::Decoded)
341    }
342}
343
344impl FarmingError {
345    /// String variant of the error, primarily for monitoring purposes
346    #[inline]
347    pub fn str_variant(&self) -> &str {
348        match self {
349            FarmingError::FailedToSubscribeSlotInfo { .. } => "FailedToSubscribeSlotInfo",
350            FarmingError::FailedToGetFarmerInfo { .. } => "FailedToGetFarmerInfo",
351            FarmingError::LowLevelAuditing(_) => "LowLevelAuditing",
352            FarmingError::LowLevelProving(_) => "LowLevelProving",
353            FarmingError::Io(_) => "Io",
354            FarmingError::Decoded(_) => "Decoded",
355            FarmingError::SlotNotificationStreamEnded => "SlotNotificationStreamEnded",
356        }
357    }
358
359    /// Whether this error is fatal and makes farm unusable
360    pub fn is_fatal(&self) -> bool {
361        match self {
362            FarmingError::FailedToSubscribeSlotInfo { .. } => true,
363            FarmingError::FailedToGetFarmerInfo { .. } => true,
364            FarmingError::LowLevelAuditing(_) => true,
365            FarmingError::LowLevelProving(error) => error.is_fatal(),
366            FarmingError::Io(_) => true,
367            FarmingError::Decoded(error) => error.is_fatal,
368            FarmingError::SlotNotificationStreamEnded => true,
369        }
370    }
371}
372
/// Various farming notifications
#[derive(Debug, Clone, Encode, Decode)]
pub enum FarmingNotification {
    /// Auditing
    Auditing(AuditingDetails),
    /// Proving
    Proving(ProvingDetails),
    /// Non-fatal farming error
    ///
    /// Wrapped in `Arc` so the notification can be `Clone` even though [`FarmingError`] itself
    /// is not.
    NonFatalError(Arc<FarmingError>),
}
383
/// Details about sector currently being plotted
#[derive(Debug, Clone, Encode, Decode)]
pub enum SectorPlottingDetails {
    /// Starting plotting of a sector
    Starting {
        /// Progress so far in % (not including this sector)
        progress: f32,
        /// Whether sector is being replotted
        replotting: bool,
        /// Whether this is the last sector queued so far
        last_queued: bool,
    },
    /// Downloading sector pieces
    Downloading,
    /// Downloaded sector pieces (how long downloading took)
    Downloaded(Duration),
    /// Encoding sector pieces
    Encoding,
    /// Encoded sector pieces (how long encoding took)
    Encoded(Duration),
    /// Writing sector
    Writing,
    /// Written sector (how long writing took)
    Written(Duration),
    /// Finished plotting
    Finished {
        /// Information about plotted sector
        plotted_sector: PlottedSector,
        /// Information about old plotted sector that was replaced
        old_plotted_sector: Option<PlottedSector>,
        /// How much time it took to plot a sector
        time: Duration,
    },
    /// Plotting failed
    Error(String),
}
420
/// Details about sector expiration
#[derive(Debug, Clone, Encode, Decode)]
pub enum SectorExpirationDetails {
    /// Sector expiration became known
    Determined {
        /// Segment index at which sector expires
        expires_at: SegmentIndex,
    },
    /// Sector will expire at the next segment index and should be replotted
    AboutToExpire,
    /// Sector already expired
    Expired,
}
434
/// Various sector updates
///
/// Delivered through [`Farm::on_sector_update`] together with the sector's index.
#[derive(Debug, Clone, Encode, Decode)]
pub enum SectorUpdate {
    /// Sector is being plotted
    Plotting(SectorPlottingDetails),
    /// Sector expiration information updated
    Expiration(SectorExpirationDetails),
}
443
/// Abstract piece reader implementation
#[async_trait]
pub trait PieceReader: Send + Sync + fmt::Debug {
    /// Read piece from sector by offset, `None` means input parameters are incorrect or piece
    /// reader was shut down
    async fn read_piece(
        &self,
        sector_index: SectorIndex,
        piece_offset: PieceOffset,
    ) -> Result<Option<Piece>, FarmError>;
}
455
/// Opaque handler ID for event handlers, once dropped handler will be removed automatically
pub trait HandlerId: Send + Sync + fmt::Debug {
    /// Prevents the handler from being removed automatically when this [`HandlerId`] is dropped.
    fn detach(&self);
}
461
impl HandlerId for event_listener_primitives::HandlerId {
    #[inline]
    fn detach(&self) {
        // NOTE(review): this resolves to the *inherent* `detach` on
        // `event_listener_primitives::HandlerId` (inherent methods win over trait methods in
        // method resolution), not to a recursive call of this trait method
        self.detach();
    }
}
468
/// An identifier for a farm, can be used in logs, thread names, etc.
#[derive(
    Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize, Display, From,
)]
#[serde(untagged)]
pub enum FarmId {
    /// Farm ID
    Ulid(Ulid),
}
478
479impl Encode for FarmId {
480    #[inline]
481    fn size_hint(&self) -> usize {
482        1_usize
483            + match self {
484                FarmId::Ulid(ulid) => 0_usize.saturating_add(Encode::size_hint(&ulid.0)),
485            }
486    }
487
488    #[inline]
489    fn encode_to<O: Output + ?Sized>(&self, output: &mut O) {
490        match self {
491            FarmId::Ulid(ulid) => {
492                output.push_byte(0);
493                Encode::encode_to(&ulid.0, output);
494            }
495        }
496    }
497}
498
// Marker: the manual `Encode` impl for `FarmId` produces a canonical SCALE encoding
impl EncodeLike for FarmId {}
500
501impl Decode for FarmId {
502    #[inline]
503    fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
504        match input
505            .read_byte()
506            .map_err(|e| e.chain("Could not decode `FarmId`, failed to read variant byte"))?
507        {
508            0 => u128::decode(input)
509                .map(|ulid| FarmId::Ulid(Ulid(ulid)))
510                .map_err(|e| e.chain("Could not decode `FarmId::Ulid.0`")),
511            _ => Err("Could not decode `FarmId`, variant doesn't exist".into()),
512        }
513    }
514}
515
516#[allow(clippy::new_without_default)]
517impl FarmId {
518    /// Creates new ID
519    #[inline]
520    pub fn new() -> Self {
521        Self::Ulid(Ulid::new())
522    }
523}
524
/// Abstract farm implementation
#[async_trait(?Send)]
pub trait Farm {
    /// ID of this farm
    fn id(&self) -> &FarmId;

    /// Number of sectors in this farm
    fn total_sectors_count(&self) -> SectorIndex;

    /// Get plotted sectors instance
    fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static>;

    /// Get piece reader to read plotted pieces later
    fn piece_reader(&self) -> Arc<dyn PieceReader + 'static>;

    /// Subscribe to sector updates
    ///
    /// The subscription stays active while the returned [`HandlerId`] is alive (or detached).
    fn on_sector_update(
        &self,
        callback: HandlerFn<(SectorIndex, SectorUpdate)>,
    ) -> Box<dyn HandlerId>;

    /// Subscribe to farming notifications
    ///
    /// The subscription stays active while the returned [`HandlerId`] is alive (or detached).
    fn on_farming_notification(
        &self,
        callback: HandlerFn<FarmingNotification>,
    ) -> Box<dyn HandlerId>;

    /// Subscribe to new solution notification
    ///
    /// The subscription stays active while the returned [`HandlerId`] is alive (or detached).
    fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> Box<dyn HandlerId>;

    /// Run and wait for background threads to exit or return an error
    fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>;
}
558
/// Forwarding implementation so a boxed farm (including `Box<dyn Farm>`) is itself a [`Farm`]
#[async_trait]
impl<T> Farm for Box<T>
where
    T: Farm + ?Sized,
{
    #[inline]
    fn id(&self) -> &FarmId {
        self.as_ref().id()
    }

    #[inline]
    fn total_sectors_count(&self) -> SectorIndex {
        self.as_ref().total_sectors_count()
    }

    #[inline]
    fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static> {
        self.as_ref().plotted_sectors()
    }

    #[inline]
    fn piece_reader(&self) -> Arc<dyn PieceReader + 'static> {
        self.as_ref().piece_reader()
    }

    #[inline]
    fn on_sector_update(
        &self,
        callback: HandlerFn<(SectorIndex, SectorUpdate)>,
    ) -> Box<dyn HandlerId> {
        self.as_ref().on_sector_update(callback)
    }

    #[inline]
    fn on_farming_notification(
        &self,
        callback: HandlerFn<FarmingNotification>,
    ) -> Box<dyn HandlerId> {
        self.as_ref().on_farming_notification(callback)
    }

    #[inline]
    fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> Box<dyn HandlerId> {
        self.as_ref().on_solution(callback)
    }

    #[inline]
    fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
        // Unbox one level (`Box<Box<T>>` -> `Box<T>`) and delegate to the inner farm's `run`
        (*self).run()
    }
}