1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
//! Abstract farm API
//!
//! This module provides a set of traits and simple data structures that serve as an abstraction
//! layer, improving composition without making assumptions about implementation details.
//!
//! Implementations can be local (backed by local disk) and remote (connected via network in some
//! way). This crate provides a few of such implementations, but more can be created externally as
//! well if needed without modifying the library itself.

use crate::node_client;
use async_trait::async_trait;
use derive_more::{Display, From};
use futures::Stream;
use parity_scale_codec::{Decode, Encode, EncodeLike, Input, Output};
use serde::{Deserialize, Serialize};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use std::{fmt, io};
use subspace_core_primitives::{Piece, PieceIndex, PieceOffset, SectorIndex, SegmentIndex};
use subspace_farmer_components::auditing::AuditingError;
use subspace_farmer_components::plotting::PlottedSector;
use subspace_farmer_components::proving::ProvingError;
use subspace_networking::libp2p::kad::RecordKey;
use subspace_rpc_primitives::SolutionResponse;
use thiserror::Error;
use ulid::Ulid;

pub mod plotted_pieces;

/// Type-erased error used by the abstract farm API: any boxed [`std::error::Error`] that is
/// `Send + Sync + 'static`
pub type FarmError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// Type alias used for event handlers: a shared (`Arc`) callback that receives the event argument
/// by reference and returns nothing
pub type HandlerFn<A> = Arc<dyn Fn(&A) + Send + Sync + 'static>;

/// Getter for plotted sectors.
///
/// Implementations must be `Send + Sync` and implement [`fmt::Debug`].
#[async_trait]
pub trait PlottedSectors: Send + Sync + fmt::Debug {
    /// Get already plotted sectors.
    ///
    /// On success returns a boxed stream that borrows from `self` and yields one
    /// `Result<PlottedSector, FarmError>` per item. The outer `Result` reports a failure that
    /// happened before any stream could be returned.
    async fn get(
        &self,
    ) -> Result<
        Box<dyn Stream<Item = Result<PlottedSector, FarmError>> + Unpin + Send + '_>,
        FarmError,
    >;
}

/// An identifier for a cache, can be used in logs, thread names, etc.
///
/// `#[serde(untagged)]` makes serde (de)serialize the inner value directly, without a variant
/// tag. SCALE encoding is implemented by hand for this type elsewhere in this file.
#[derive(
    Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize, Display, From,
)]
#[serde(untagged)]
pub enum PieceCacheId {
    /// Cache ID
    Ulid(Ulid),
}

/// Manual SCALE encoding: a one-byte variant index followed by the ULID's inner `u128`.
impl Encode for PieceCacheId {
    /// One byte for the variant index plus the size hint of the inner `u128`.
    #[inline]
    fn size_hint(&self) -> usize {
        let Self::Ulid(ulid) = self;
        1_usize + Encode::size_hint(&ulid.0)
    }

    /// Writes variant index `0` and then the inner `u128` of the ULID into `output`.
    #[inline]
    fn encode_to<O: Output + ?Sized>(&self, output: &mut O) {
        let Self::Ulid(ulid) = self;
        // Variant index goes first so that `Decode` can dispatch on it
        output.push_byte(0);
        ulid.0.encode_to(output);
    }
}

// Marker impl: `PieceCacheId` encodes to the same representation as itself
impl EncodeLike for PieceCacheId {}

/// Manual SCALE decoding, the inverse of the `Encode` implementation for this type: a one-byte
/// variant index followed by the ULID's inner `u128`.
impl Decode for PieceCacheId {
    /// Decodes a [`PieceCacheId`] from `input`.
    ///
    /// Returns an error if the variant byte can't be read, if the variant index is anything other
    /// than `0`, or if the inner `u128` fails to decode.
    #[inline]
    fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
        let variant = input.read_byte().map_err(|e| {
            e.chain("Could not decode `PieceCacheId`, failed to read variant byte")
        })?;

        match variant {
            0 => {
                let raw = u128::decode(input)
                    .map_err(|e| e.chain("Could not decode `PieceCacheId::Ulid.0`"))?;
                Ok(Self::Ulid(Ulid(raw)))
            }
            // Only variant index `0` is ever produced by the encoder
            _ => Err("Could not decode `PieceCacheId`, variant doesn't exist".into()),
        }
    }
}

// `new()` wraps `Ulid::new()`, so the `Default` impl suggested by the lint is not provided
#[allow(clippy::new_without_default)]
impl PieceCacheId {
    /// Creates a new ID wrapping a freshly created [`Ulid`]
    #[inline]
    pub fn new() -> Self {
        let ulid = Ulid::new();
        Self::Ulid(ulid)
    }
}

/// Offset wrapper for pieces in [`PieceCache`]
///
/// `#[repr(transparent)]` newtype around `u32`; the inner value is only accessible from within
/// this crate.
#[derive(Debug, Display, Copy, Clone, Encode, Decode)]
#[repr(transparent)]
pub struct PieceCacheOffset(pub(crate) u32);

/// Abstract piece cache implementation.
///
/// Piece cache is a simple container that stores concatenated pieces in a flat file at specific
/// offsets. Implementation doesn't have to be local though, cache can be remote somewhere on the
/// network, APIs are intentionally async to account for that.
#[async_trait]
pub trait PieceCache: Send + Sync + fmt::Debug {
    /// ID of this cache
    fn id(&self) -> &PieceCacheId;

    /// Max number of elements in this cache
    fn max_num_elements(&self) -> u32;

    /// Contents of this piece cache.
    ///
    /// On success returns a stream of `(offset, piece index)` pairs, where the piece index is
    /// optional (presumably `None` marks an offset with nothing stored, confirm against
    /// implementations). Each stream item can itself be an error.
    ///
    /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this
    /// doesn't happen for the same piece being accessed!
    async fn contents(
        &self,
    ) -> Result<
        Box<
            dyn Stream<Item = Result<(PieceCacheOffset, Option<PieceIndex>), FarmError>>
                + Unpin
                + Send
                + '_,
        >,
        FarmError,
    >;

    /// Store piece in cache at specified offset, replacing existing piece if there is any.
    ///
    /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this
    /// doesn't happen for the same piece being accessed!
    async fn write_piece(
        &self,
        offset: PieceCacheOffset,
        piece_index: PieceIndex,
        piece: &Piece,
    ) -> Result<(), FarmError>;

    /// Read piece index from cache at specified offset.
    ///
    /// Returns `None` if offset is out of range.
    ///
    /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this
    /// doesn't happen for the same piece being accessed!
    async fn read_piece_index(
        &self,
        offset: PieceCacheOffset,
    ) -> Result<Option<PieceIndex>, FarmError>;

    /// Read piece (together with its piece index) from cache at specified offset.
    ///
    /// Returns `None` if offset is out of range.
    ///
    /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this
    /// doesn't happen for the same piece being accessed!
    async fn read_piece(
        &self,
        offset: PieceCacheOffset,
    ) -> Result<Option<(PieceIndex, Piece)>, FarmError>;
}

/// Result of piece storing check
///
/// NOTE: `Encode`/`Decode` are derived, so the order of variants is part of the encoded format.
#[derive(Debug, Copy, Clone, Encode, Decode)]
pub enum MaybePieceStoredResult {
    /// Definitely not stored
    No,
    /// Maybe has vacant slot to store
    Vacant,
    /// Maybe still stored
    Yes,
}

/// Abstract plot cache implementation.
///
/// Plot cache is a cache that exploits space towards the end of the plot that is not yet occupied
/// by sectors in order to increase effective caching space, which helps with plotting speed for
/// small farmers since they don't need to retrieve the same pieces from the network over and over
/// again, which is slower and uses a lot of Internet bandwidth.
#[async_trait]
pub trait PlotCache: Send + Sync + fmt::Debug {
    /// Check if piece is potentially stored in this cache (not guaranteed to be because it might be
    /// overridden with sector any time).
    ///
    /// See [`MaybePieceStoredResult`] for the meaning of possible answers.
    async fn is_piece_maybe_stored(
        &self,
        key: &RecordKey,
    ) -> Result<MaybePieceStoredResult, FarmError>;

    /// Store piece in cache if there is free space, otherwise `Ok(false)` is returned
    async fn try_store_piece(
        &self,
        piece_index: PieceIndex,
        piece: &Piece,
    ) -> Result<bool, FarmError>;

    /// Read piece from cache by its record key.
    ///
    /// Returns `None` if not cached.
    async fn read_piece(&self, key: &RecordKey) -> Result<Option<Piece>, FarmError>;
}

/// Auditing details, carried by [`FarmingNotification::Auditing`]
#[derive(Debug, Copy, Clone, Encode, Decode)]
pub struct AuditingDetails {
    /// Number of sectors that were audited
    pub sectors_count: SectorIndex,
    /// Audit duration
    pub time: Duration,
}

/// Result of the proving
///
/// NOTE: `Encode`/`Decode` are derived, so the order of variants is part of the encoded format.
#[derive(Debug, Copy, Clone, Encode, Decode)]
pub enum ProvingResult {
    /// Proved successfully and accepted by the node
    Success,
    /// Proving took too long
    Timeout,
    /// Managed to prove within time limit, but node rejected solution, likely due to timeout on its
    /// end
    Rejected,
    /// Proving failed altogether
    Failed,
}

/// Displays the variant name verbatim (`Success`, `Timeout`, `Rejected` or `Failed`).
impl fmt::Display for ProvingResult {
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Success => "Success",
            Self::Timeout => "Timeout",
            Self::Rejected => "Rejected",
            Self::Failed => "Failed",
        };

        f.write_str(label)
    }
}

/// Proving details, carried by [`FarmingNotification::Proving`]
#[derive(Debug, Copy, Clone, Encode, Decode)]
pub struct ProvingDetails {
    /// Whether proving ended up being successful
    pub result: ProvingResult,
    /// Proving duration
    pub time: Duration,
}

/// Special decoded farming error
///
/// This is the encoded form of [`FarmingError`]: encoding any [`FarmingError`] writes one of
/// these, and decoding yields [`FarmingError::Decoded`] wrapping it.
#[derive(Debug, Encode, Decode)]
pub struct DecodedFarmingError {
    /// String representation of an error
    error: String,
    /// Whether error is fatal
    is_fatal: bool,
}

/// Displays the stored string representation of the original error.
impl fmt::Display for DecodedFarmingError {
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Delegate to `String`'s `Display` so formatter flags (width, padding) are still honored
        fmt::Display::fmt(&self.error, f)
    }
}

/// Errors that happen during farming
///
/// NOTE: `Encode`/`Decode` are implemented by hand for this type in this file: encoding keeps
/// only the string representation and the fatality flag, so decoding always produces
/// [`FarmingError::Decoded`].
#[derive(Debug, Error)]
pub enum FarmingError {
    /// Failed to subscribe to slot info notifications
    #[error("Failed to subscribe to slot info notifications: {error}")]
    FailedToSubscribeSlotInfo {
        /// Lower-level error
        error: node_client::Error,
    },
    /// Failed to retrieve farmer info
    #[error("Failed to retrieve farmer info: {error}")]
    FailedToGetFarmerInfo {
        /// Lower-level error
        error: node_client::Error,
    },
    /// Slot info notification stream ended
    #[error("Slot info notification stream ended")]
    SlotNotificationStreamEnded,
    /// Low-level auditing error
    #[error("Low-level auditing error: {0}")]
    LowLevelAuditing(#[from] AuditingError),
    /// Low-level proving error
    #[error("Low-level proving error: {0}")]
    LowLevelProving(#[from] ProvingError),
    /// I/O error occurred
    #[error("Farming I/O error: {0}")]
    Io(#[from] io::Error),
    /// Decoded farming error, the only variant produced by `Decode`
    #[error("Decoded farming error {0}")]
    Decoded(DecodedFarmingError),
}

/// Encodes any farming error as a [`DecodedFarmingError`]: its `Display` string plus the result
/// of [`FarmingError::is_fatal`].
impl Encode for FarmingError {
    #[inline]
    fn encode_to<O: Output + ?Sized>(&self, dest: &mut O) {
        // The original variant is not preserved, only its message and fatality
        DecodedFarmingError {
            error: self.to_string(),
            is_fatal: self.is_fatal(),
        }
        .encode_to(dest)
    }
}

/// Decodes a [`DecodedFarmingError`] and wraps it into [`FarmingError::Decoded`].
impl Decode for FarmingError {
    #[inline]
    fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
        let decoded = DecodedFarmingError::decode(input)?;
        Ok(Self::Decoded(decoded))
    }
}

impl FarmingError {
    /// String variant of the error, primarily for monitoring purposes
    #[inline]
    pub fn str_variant(&self) -> &str {
        match self {
            FarmingError::FailedToSubscribeSlotInfo { .. } => "FailedToSubscribeSlotInfo",
            FarmingError::FailedToGetFarmerInfo { .. } => "FailedToGetFarmerInfo",
            FarmingError::LowLevelAuditing(_) => "LowLevelAuditing",
            FarmingError::LowLevelProving(_) => "LowLevelProving",
            FarmingError::Io(_) => "Io",
            FarmingError::Decoded(_) => "Decoded",
            FarmingError::SlotNotificationStreamEnded => "SlotNotificationStreamEnded",
        }
    }

    /// Whether this error is fatal and makes farm unusable
    pub fn is_fatal(&self) -> bool {
        match self {
            FarmingError::FailedToSubscribeSlotInfo { .. } => true,
            FarmingError::FailedToGetFarmerInfo { .. } => true,
            FarmingError::LowLevelAuditing(_) => true,
            FarmingError::LowLevelProving(error) => error.is_fatal(),
            FarmingError::Io(_) => true,
            FarmingError::Decoded(error) => error.is_fatal,
            FarmingError::SlotNotificationStreamEnded => true,
        }
    }
}

/// Various farming notifications
///
/// NOTE: `Encode`/`Decode` are derived, so the order of variants is part of the encoded format.
#[derive(Debug, Clone, Encode, Decode)]
pub enum FarmingNotification {
    /// Auditing
    Auditing(AuditingDetails),
    /// Proving
    Proving(ProvingDetails),
    /// Non-fatal farming error (kept behind `Arc`, which keeps cloning the notification cheap)
    NonFatalError(Arc<FarmingError>),
}

/// Details about sector currently being plotted
///
/// NOTE: `Encode`/`Decode` are derived, so the order of variants is part of the encoded format.
#[derive(Debug, Clone, Encode, Decode)]
pub enum SectorPlottingDetails {
    /// Starting plotting of a sector
    Starting {
        /// Progress so far in % (not including this sector)
        progress: f32,
        /// Whether sector is being replotted
        replotting: bool,
        /// Whether this is the last sector queued so far
        last_queued: bool,
    },
    /// Downloading sector pieces
    Downloading,
    /// Downloaded sector pieces (the duration is presumably the download time, confirm with
    /// the emitter)
    Downloaded(Duration),
    /// Encoding sector pieces
    Encoding,
    /// Encoded sector pieces (the duration is presumably the encoding time, confirm with the
    /// emitter)
    Encoded(Duration),
    /// Writing sector
    Writing,
    /// Written sector (the duration is presumably the writing time, confirm with the emitter)
    Written(Duration),
    /// Finished plotting
    Finished {
        /// Information about plotted sector
        plotted_sector: PlottedSector,
        /// Information about old plotted sector that was replaced
        old_plotted_sector: Option<PlottedSector>,
        /// How much time it took to plot a sector
        time: Duration,
    },
    /// Plotting failed, carries a string description of the failure
    Error(String),
}

/// Details about sector expiration
///
/// NOTE: `Encode`/`Decode` are derived, so the order of variants is part of the encoded format.
#[derive(Debug, Clone, Encode, Decode)]
pub enum SectorExpirationDetails {
    /// Sector expiration became known
    Determined {
        /// Segment index at which sector expires
        expires_at: SegmentIndex,
    },
    /// Sector will expire at the next segment index and should be replotted
    AboutToExpire,
    /// Sector already expired
    Expired,
}

/// Various sector updates, delivered to sector update handlers together with the sector index
#[derive(Debug, Clone, Encode, Decode)]
pub enum SectorUpdate {
    /// Sector is being plotted
    Plotting(SectorPlottingDetails),
    /// Sector expiration information updated
    Expiration(SectorExpirationDetails),
}

/// Abstract piece reader implementation
///
/// Implementations must be `Send + Sync` and implement [`fmt::Debug`].
#[async_trait]
pub trait PieceReader: Send + Sync + fmt::Debug {
    /// Read piece from sector by offset, `None` means input parameters are incorrect or piece
    /// reader was shut down
    async fn read_piece(
        &self,
        sector_index: SectorIndex,
        piece_offset: PieceOffset,
    ) -> Result<Option<Piece>, FarmError>;
}

/// Opaque handler ID for event handlers, once dropped handler will be removed automatically
pub trait HandlerId: Send + Sync + fmt::Debug {
    /// Prevents handler from being removed automatically when this [`HandlerId`] is dropped.
    ///
    /// Takes `&self`, so the ID itself is not consumed by this call.
    fn detach(&self);
}

/// Lets `event_listener_primitives::HandlerId` be used as a [`HandlerId`] trait object.
impl HandlerId for event_listener_primitives::HandlerId {
    #[inline]
    fn detach(&self) {
        // NOTE(review): method-call syntax prefers an inherent method over a trait method of the
        // same name, so this presumably forwards to the inherent
        // `event_listener_primitives::HandlerId::detach` rather than recursing into this trait
        // method; confirm against the `event_listener_primitives` crate.
        self.detach();
    }
}

/// An identifier for a farm, can be used in logs, thread names, etc.
///
/// `#[serde(untagged)]` makes serde (de)serialize the inner value directly, without a variant
/// tag. SCALE encoding is implemented by hand for this type elsewhere in this file.
#[derive(
    Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize, Display, From,
)]
#[serde(untagged)]
pub enum FarmId {
    /// Farm ID
    Ulid(Ulid),
}

/// Manual SCALE encoding: a one-byte variant index followed by the ULID's inner `u128`.
impl Encode for FarmId {
    /// One byte for the variant index plus the size hint of the inner `u128`.
    #[inline]
    fn size_hint(&self) -> usize {
        let Self::Ulid(ulid) = self;
        1_usize + Encode::size_hint(&ulid.0)
    }

    /// Writes variant index `0` and then the inner `u128` of the ULID into `output`.
    #[inline]
    fn encode_to<O: Output + ?Sized>(&self, output: &mut O) {
        let Self::Ulid(ulid) = self;
        // Variant index goes first so that `Decode` can dispatch on it
        output.push_byte(0);
        ulid.0.encode_to(output);
    }
}

// Marker impl: `FarmId` encodes to the same representation as itself
impl EncodeLike for FarmId {}

/// Manual SCALE decoding, the inverse of the `Encode` implementation for this type: a one-byte
/// variant index followed by the ULID's inner `u128`.
impl Decode for FarmId {
    /// Decodes a [`FarmId`] from `input`.
    ///
    /// Returns an error if the variant byte can't be read, if the variant index is anything other
    /// than `0`, or if the inner `u128` fails to decode.
    #[inline]
    fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
        let variant = input
            .read_byte()
            .map_err(|e| e.chain("Could not decode `FarmId`, failed to read variant byte"))?;

        match variant {
            0 => {
                let raw = u128::decode(input)
                    .map_err(|e| e.chain("Could not decode `FarmId::Ulid.0`"))?;
                Ok(Self::Ulid(Ulid(raw)))
            }
            // Only variant index `0` is ever produced by the encoder
            _ => Err("Could not decode `FarmId`, variant doesn't exist".into()),
        }
    }
}

// `new()` wraps `Ulid::new()`, so the `Default` impl suggested by the lint is not provided
#[allow(clippy::new_without_default)]
impl FarmId {
    /// Creates a new ID wrapping a freshly created [`Ulid`]
    #[inline]
    pub fn new() -> Self {
        let ulid = Ulid::new();
        Self::Ulid(ulid)
    }
}

/// Abstract farm implementation
///
/// NOTE: the trait currently declares no `async fn` methods; [`Farm::run`] returns a boxed `Send`
/// future explicitly instead.
#[async_trait(?Send)]
pub trait Farm {
    /// ID of this farm
    fn id(&self) -> &FarmId;

    /// Number of sectors in this farm
    fn total_sectors_count(&self) -> SectorIndex;

    /// Get plotted sectors instance
    fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static>;

    /// Get piece reader to read plotted pieces later
    fn piece_reader(&self) -> Arc<dyn PieceReader + 'static>;

    /// Subscribe to sector updates.
    ///
    /// The callback receives a `(sector index, update)` pair; the returned [`HandlerId`] removes
    /// the handler once dropped unless it is detached.
    fn on_sector_update(
        &self,
        callback: HandlerFn<(SectorIndex, SectorUpdate)>,
    ) -> Box<dyn HandlerId>;

    /// Subscribe to farming notifications.
    ///
    /// The returned [`HandlerId`] removes the handler once dropped unless it is detached.
    fn on_farming_notification(
        &self,
        callback: HandlerFn<FarmingNotification>,
    ) -> Box<dyn HandlerId>;

    /// Subscribe to new solution notification.
    ///
    /// The returned [`HandlerId`] removes the handler once dropped unless it is detached.
    fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> Box<dyn HandlerId>;

    /// Run and wait for background threads to exit or return an error.
    ///
    /// Consumes the boxed farm and returns a pinned, boxed, `Send` future.
    fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>;
}

/// Forwarding implementation so that a boxed farm (including unsized `T`, e.g. a trait object)
/// can be used wherever a [`Farm`] is expected. Every method delegates to the inner `T`.
// The attribute arguments match the `Farm` trait declaration (`?Send`) so that the trait and this
// impl keep identical future bounds if an `async fn` is ever added to the trait.
#[async_trait(?Send)]
impl<T> Farm for Box<T>
where
    T: Farm + ?Sized,
{
    #[inline]
    fn id(&self) -> &FarmId {
        (**self).id()
    }

    #[inline]
    fn total_sectors_count(&self) -> SectorIndex {
        (**self).total_sectors_count()
    }

    #[inline]
    fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static> {
        (**self).plotted_sectors()
    }

    #[inline]
    fn piece_reader(&self) -> Arc<dyn PieceReader + 'static> {
        (**self).piece_reader()
    }

    #[inline]
    fn on_sector_update(
        &self,
        callback: HandlerFn<(SectorIndex, SectorUpdate)>,
    ) -> Box<dyn HandlerId> {
        (**self).on_sector_update(callback)
    }

    #[inline]
    fn on_farming_notification(
        &self,
        callback: HandlerFn<FarmingNotification>,
    ) -> Box<dyn HandlerId> {
        (**self).on_farming_notification(callback)
    }

    #[inline]
    fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> Box<dyn HandlerId> {
        (**self).on_solution(callback)
    }

    #[inline]
    fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
        // `self` is `Box<Box<T>>` here: unwrap the outer box and hand the inner `Box<T>` over to
        // `T::run`, which expects exactly that receiver
        (*self).run()
    }
}