1use async_trait::async_trait;
11use derive_more::{Display, From};
12use futures::Stream;
13use parity_scale_codec::{Decode, Encode, EncodeLike, Input, Output};
14use serde::{Deserialize, Serialize};
15use std::future::Future;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::time::Duration;
19use std::{fmt, io};
20use subspace_core_primitives::pieces::{Piece, PieceIndex, PieceOffset};
21use subspace_core_primitives::sectors::SectorIndex;
22use subspace_core_primitives::segments::SegmentIndex;
23use subspace_farmer_components::auditing::AuditingError;
24use subspace_farmer_components::plotting::PlottedSector;
25use subspace_farmer_components::proving::ProvingError;
26use subspace_networking::libp2p::kad::RecordKey;
27use subspace_rpc_primitives::SolutionResponse;
28use thiserror::Error;
29use ulid::Ulid;
30
31pub mod plotted_pieces;
32
33pub type FarmError = Box<dyn std::error::Error + Send + Sync + 'static>;
35pub type HandlerFn<A> = Arc<dyn Fn(&A) + Send + Sync + 'static>;
37
38#[async_trait]
40pub trait PlottedSectors: Send + Sync + fmt::Debug {
41 async fn get(
43 &self,
44 ) -> Result<
45 Box<dyn Stream<Item = Result<PlottedSector, FarmError>> + Unpin + Send + '_>,
46 FarmError,
47 >;
48}
49
50#[derive(
52 Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize, Display, From,
53)]
54#[serde(untagged)]
55pub enum PieceCacheId {
56 Ulid(Ulid),
58}
59
60impl Encode for PieceCacheId {
61 #[inline]
62 fn size_hint(&self) -> usize {
63 1_usize
64 + match self {
65 PieceCacheId::Ulid(ulid) => 0_usize.saturating_add(Encode::size_hint(&ulid.0)),
66 }
67 }
68
69 #[inline]
70 fn encode_to<O: Output + ?Sized>(&self, output: &mut O) {
71 match self {
72 PieceCacheId::Ulid(ulid) => {
73 output.push_byte(0);
74 Encode::encode_to(&ulid.0, output);
75 }
76 }
77 }
78}
79
80impl EncodeLike for PieceCacheId {}
81
82impl Decode for PieceCacheId {
83 #[inline]
84 fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
85 match input
86 .read_byte()
87 .map_err(|e| e.chain("Could not decode `PieceCacheId`, failed to read variant byte"))?
88 {
89 0 => u128::decode(input)
90 .map(|ulid| PieceCacheId::Ulid(Ulid(ulid)))
91 .map_err(|e| e.chain("Could not decode `PieceCacheId::Ulid.0`")),
92 _ => Err("Could not decode `PieceCacheId`, variant doesn't exist".into()),
93 }
94 }
95}
96
97#[allow(clippy::new_without_default)]
98impl PieceCacheId {
99 #[inline]
101 pub fn new() -> Self {
102 Self::Ulid(Ulid::new())
103 }
104}
105
106#[derive(Debug, Display, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Encode, Decode)]
108#[repr(transparent)]
109pub struct PieceCacheOffset(pub(crate) u32);
110
111#[async_trait]
117pub trait PieceCache: Send + Sync + fmt::Debug {
118 fn id(&self) -> &PieceCacheId;
120
121 fn max_num_elements(&self) -> u32;
123
124 async fn contents(
129 &self,
130 ) -> Result<
131 Box<
132 dyn Stream<Item = Result<(PieceCacheOffset, Option<PieceIndex>), FarmError>>
133 + Unpin
134 + Send
135 + '_,
136 >,
137 FarmError,
138 >;
139
140 async fn write_piece(
145 &self,
146 offset: PieceCacheOffset,
147 piece_index: PieceIndex,
148 piece: &Piece,
149 ) -> Result<(), FarmError>;
150
151 async fn read_piece_index(
158 &self,
159 offset: PieceCacheOffset,
160 ) -> Result<Option<PieceIndex>, FarmError>;
161
162 async fn read_piece(
169 &self,
170 offset: PieceCacheOffset,
171 ) -> Result<Option<(PieceIndex, Piece)>, FarmError>;
172
173 async fn read_pieces(
181 &self,
182 offsets: Box<dyn Iterator<Item = PieceCacheOffset> + Send>,
183 ) -> Result<
184 Box<
185 dyn Stream<Item = Result<(PieceCacheOffset, Option<(PieceIndex, Piece)>), FarmError>>
186 + Send
187 + Unpin
188 + '_,
189 >,
190 FarmError,
191 >;
192}
193
194#[derive(Debug, Copy, Clone, Encode, Decode)]
196pub enum MaybePieceStoredResult {
197 No,
199 Vacant,
201 Yes,
203}
204
205#[async_trait]
212pub trait PlotCache: Send + Sync + fmt::Debug {
213 async fn is_piece_maybe_stored(
216 &self,
217 key: &RecordKey,
218 ) -> Result<MaybePieceStoredResult, FarmError>;
219
220 async fn try_store_piece(
222 &self,
223 piece_index: PieceIndex,
224 piece: &Piece,
225 ) -> Result<bool, FarmError>;
226
227 async fn read_piece(&self, key: &RecordKey) -> Result<Option<Piece>, FarmError>;
231}
232
233#[derive(Debug, Copy, Clone, Encode, Decode)]
235pub struct AuditingDetails {
236 pub sectors_count: SectorIndex,
238 pub time: Duration,
240}
241
242#[derive(Debug, Copy, Clone, Encode, Decode)]
244pub enum ProvingResult {
245 Success,
247 Timeout,
249 Rejected,
252 Failed,
254}
255
256impl fmt::Display for ProvingResult {
257 #[inline]
258 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
259 f.write_str(match self {
260 Self::Success => "Success",
261 Self::Timeout => "Timeout",
262 Self::Rejected => "Rejected",
263 Self::Failed => "Failed",
264 })
265 }
266}
267
268#[derive(Debug, Copy, Clone, Encode, Decode)]
270pub struct ProvingDetails {
271 pub result: ProvingResult,
273 pub time: Duration,
275}
276
277#[derive(Debug, Encode, Decode)]
279pub struct DecodedFarmingError {
280 error: String,
282 is_fatal: bool,
284}
285
286impl fmt::Display for DecodedFarmingError {
287 #[inline]
288 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
289 self.error.fmt(f)
290 }
291}
292
293#[derive(Debug, Error)]
295pub enum FarmingError {
296 #[error("Failed to subscribe to slot info notifications: {error}")]
298 FailedToSubscribeSlotInfo {
299 error: anyhow::Error,
301 },
302 #[error("Failed to retrieve farmer info: {error}")]
304 FailedToGetFarmerInfo {
305 error: anyhow::Error,
307 },
308 #[error("Slot info notification stream ended")]
310 SlotNotificationStreamEnded,
311 #[error("Low-level auditing error: {0}")]
313 LowLevelAuditing(#[from] AuditingError),
314 #[error("Low-level proving error: {0}")]
316 LowLevelProving(#[from] ProvingError),
317 #[error("Farming I/O error: {0}")]
319 Io(#[from] io::Error),
320 #[error("Decoded farming error {0}")]
322 Decoded(DecodedFarmingError),
323}
324
325impl Encode for FarmingError {
326 #[inline]
327 fn encode_to<O: Output + ?Sized>(&self, dest: &mut O) {
328 let error = DecodedFarmingError {
329 error: self.to_string(),
330 is_fatal: self.is_fatal(),
331 };
332
333 error.encode_to(dest)
334 }
335}
336
337impl Decode for FarmingError {
338 #[inline]
339 fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
340 DecodedFarmingError::decode(input).map(FarmingError::Decoded)
341 }
342}
343
344impl FarmingError {
345 #[inline]
347 pub fn str_variant(&self) -> &str {
348 match self {
349 FarmingError::FailedToSubscribeSlotInfo { .. } => "FailedToSubscribeSlotInfo",
350 FarmingError::FailedToGetFarmerInfo { .. } => "FailedToGetFarmerInfo",
351 FarmingError::LowLevelAuditing(_) => "LowLevelAuditing",
352 FarmingError::LowLevelProving(_) => "LowLevelProving",
353 FarmingError::Io(_) => "Io",
354 FarmingError::Decoded(_) => "Decoded",
355 FarmingError::SlotNotificationStreamEnded => "SlotNotificationStreamEnded",
356 }
357 }
358
359 pub fn is_fatal(&self) -> bool {
361 match self {
362 FarmingError::FailedToSubscribeSlotInfo { .. } => true,
363 FarmingError::FailedToGetFarmerInfo { .. } => true,
364 FarmingError::LowLevelAuditing(_) => true,
365 FarmingError::LowLevelProving(error) => error.is_fatal(),
366 FarmingError::Io(_) => true,
367 FarmingError::Decoded(error) => error.is_fatal,
368 FarmingError::SlotNotificationStreamEnded => true,
369 }
370 }
371}
372
373#[derive(Debug, Clone, Encode, Decode)]
375pub enum FarmingNotification {
376 Auditing(AuditingDetails),
378 Proving(ProvingDetails),
380 NonFatalError(Arc<FarmingError>),
382}
383
384#[derive(Debug, Clone, Encode, Decode)]
386pub enum SectorPlottingDetails {
387 Starting {
389 progress: f32,
391 replotting: bool,
393 last_queued: bool,
395 },
396 Downloading,
398 Downloaded(Duration),
400 Encoding,
402 Encoded(Duration),
404 Writing,
406 Written(Duration),
408 Finished {
410 plotted_sector: PlottedSector,
412 old_plotted_sector: Option<PlottedSector>,
414 time: Duration,
416 },
417 Error(String),
419}
420
421#[derive(Debug, Clone, Encode, Decode)]
423pub enum SectorExpirationDetails {
424 Determined {
426 expires_at: SegmentIndex,
428 },
429 AboutToExpire,
431 Expired,
433}
434
435#[derive(Debug, Clone, Encode, Decode)]
437pub enum SectorUpdate {
438 Plotting(SectorPlottingDetails),
440 Expiration(SectorExpirationDetails),
442}
443
444#[async_trait]
446pub trait PieceReader: Send + Sync + fmt::Debug {
447 async fn read_piece(
450 &self,
451 sector_index: SectorIndex,
452 piece_offset: PieceOffset,
453 ) -> Result<Option<Piece>, FarmError>;
454}
455
456pub trait HandlerId: Send + Sync + fmt::Debug {
458 fn detach(&self);
460}
461
462impl HandlerId for event_listener_primitives::HandlerId {
463 #[inline]
464 fn detach(&self) {
465 self.detach();
466 }
467}
468
469#[derive(
471 Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize, Display, From,
472)]
473#[serde(untagged)]
474pub enum FarmId {
475 Ulid(Ulid),
477}
478
479impl Encode for FarmId {
480 #[inline]
481 fn size_hint(&self) -> usize {
482 1_usize
483 + match self {
484 FarmId::Ulid(ulid) => 0_usize.saturating_add(Encode::size_hint(&ulid.0)),
485 }
486 }
487
488 #[inline]
489 fn encode_to<O: Output + ?Sized>(&self, output: &mut O) {
490 match self {
491 FarmId::Ulid(ulid) => {
492 output.push_byte(0);
493 Encode::encode_to(&ulid.0, output);
494 }
495 }
496 }
497}
498
499impl EncodeLike for FarmId {}
500
501impl Decode for FarmId {
502 #[inline]
503 fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
504 match input
505 .read_byte()
506 .map_err(|e| e.chain("Could not decode `FarmId`, failed to read variant byte"))?
507 {
508 0 => u128::decode(input)
509 .map(|ulid| FarmId::Ulid(Ulid(ulid)))
510 .map_err(|e| e.chain("Could not decode `FarmId::Ulid.0`")),
511 _ => Err("Could not decode `FarmId`, variant doesn't exist".into()),
512 }
513 }
514}
515
516#[allow(clippy::new_without_default)]
517impl FarmId {
518 #[inline]
520 pub fn new() -> Self {
521 Self::Ulid(Ulid::new())
522 }
523}
524
525#[async_trait(?Send)]
527pub trait Farm {
528 fn id(&self) -> &FarmId;
530
531 fn total_sectors_count(&self) -> SectorIndex;
533
534 fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static>;
536
537 fn piece_reader(&self) -> Arc<dyn PieceReader + 'static>;
539
540 fn on_sector_update(
542 &self,
543 callback: HandlerFn<(SectorIndex, SectorUpdate)>,
544 ) -> Box<dyn HandlerId>;
545
546 fn on_farming_notification(
548 &self,
549 callback: HandlerFn<FarmingNotification>,
550 ) -> Box<dyn HandlerId>;
551
552 fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> Box<dyn HandlerId>;
554
555 fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>;
557}
558
559#[async_trait]
560impl<T> Farm for Box<T>
561where
562 T: Farm + ?Sized,
563{
564 #[inline]
565 fn id(&self) -> &FarmId {
566 self.as_ref().id()
567 }
568
569 #[inline]
570 fn total_sectors_count(&self) -> SectorIndex {
571 self.as_ref().total_sectors_count()
572 }
573
574 #[inline]
575 fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static> {
576 self.as_ref().plotted_sectors()
577 }
578
579 #[inline]
580 fn piece_reader(&self) -> Arc<dyn PieceReader + 'static> {
581 self.as_ref().piece_reader()
582 }
583
584 #[inline]
585 fn on_sector_update(
586 &self,
587 callback: HandlerFn<(SectorIndex, SectorUpdate)>,
588 ) -> Box<dyn HandlerId> {
589 self.as_ref().on_sector_update(callback)
590 }
591
592 #[inline]
593 fn on_farming_notification(
594 &self,
595 callback: HandlerFn<FarmingNotification>,
596 ) -> Box<dyn HandlerId> {
597 self.as_ref().on_farming_notification(callback)
598 }
599
600 #[inline]
601 fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> Box<dyn HandlerId> {
602 self.as_ref().on_solution(callback)
603 }
604
605 #[inline]
606 fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
607 (*self).run()
608 }
609}