1use async_trait::async_trait;
11use derive_more::{Display, From};
12use futures::Stream;
13use parity_scale_codec::{Decode, Encode, EncodeLike, Input, Output};
14use serde::{Deserialize, Serialize};
15use std::future::Future;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::time::Duration;
19use std::{fmt, io};
20use subspace_core_primitives::pieces::{Piece, PieceIndex, PieceOffset};
21use subspace_core_primitives::sectors::SectorIndex;
22use subspace_core_primitives::segments::SegmentIndex;
23use subspace_farmer_components::auditing::AuditingError;
24use subspace_farmer_components::plotting::PlottedSector;
25use subspace_farmer_components::proving::ProvingError;
26use subspace_networking::libp2p::kad::RecordKey;
27use subspace_rpc_primitives::SolutionResponse;
28use thiserror::Error;
29use ulid::Ulid;
30
31pub mod plotted_pieces;
32
33pub type FarmError = Box<dyn std::error::Error + Send + Sync + 'static>;
35pub type HandlerFn<A> = Arc<dyn Fn(&A) + Send + Sync + 'static>;
37
38#[async_trait]
40pub trait PlottedSectors: Send + Sync + fmt::Debug {
41 async fn get(
43 &self,
44 ) -> Result<
45 Box<dyn Stream<Item = Result<PlottedSector, FarmError>> + Unpin + Send + '_>,
46 FarmError,
47 >;
48}
49
50#[derive(
52 Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize, Display, From,
53)]
54#[serde(untagged)]
55pub enum PieceCacheId {
56 Ulid(Ulid),
58}
59
60impl Encode for PieceCacheId {
61 #[inline]
62 fn size_hint(&self) -> usize {
63 1_usize
64 + match self {
65 PieceCacheId::Ulid(ulid) => 0_usize.saturating_add(Encode::size_hint(&ulid.0)),
66 }
67 }
68
69 #[inline]
70 fn encode_to<O: Output + ?Sized>(&self, output: &mut O) {
71 match self {
72 PieceCacheId::Ulid(ulid) => {
73 output.push_byte(0);
74 Encode::encode_to(&ulid.0, output);
75 }
76 }
77 }
78}
79
80impl EncodeLike for PieceCacheId {}
81
82impl Decode for PieceCacheId {
83 #[inline]
84 fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
85 match input
86 .read_byte()
87 .map_err(|e| e.chain("Could not decode `PieceCacheId`, failed to read variant byte"))?
88 {
89 0 => u128::decode(input)
90 .map(|ulid| PieceCacheId::Ulid(Ulid(ulid)))
91 .map_err(|e| e.chain("Could not decode `PieceCacheId::Ulid.0`")),
92 _ => Err("Could not decode `PieceCacheId`, variant doesn't exist".into()),
93 }
94 }
95}
96
97#[allow(clippy::new_without_default)]
98impl PieceCacheId {
99 #[inline]
101 pub fn new() -> Self {
102 Self::Ulid(Ulid::new())
103 }
104}
105
106#[derive(Debug, Display, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Encode, Decode)]
108#[repr(transparent)]
109pub struct PieceCacheOffset(pub(crate) u32);
110
111#[async_trait]
117pub trait PieceCache: Send + Sync + fmt::Debug {
118 fn id(&self) -> &PieceCacheId;
120
121 fn max_num_elements(&self) -> u32;
123
124 async fn contents(
129 &self,
130 ) -> Result<
131 Box<
132 dyn Stream<Item = Result<(PieceCacheOffset, Option<PieceIndex>), FarmError>>
133 + Unpin
134 + Send
135 + '_,
136 >,
137 FarmError,
138 >;
139
140 async fn write_piece(
145 &self,
146 offset: PieceCacheOffset,
147 piece_index: PieceIndex,
148 piece: &Piece,
149 ) -> Result<(), FarmError>;
150
151 async fn read_piece_index(
158 &self,
159 offset: PieceCacheOffset,
160 ) -> Result<Option<PieceIndex>, FarmError>;
161
162 async fn read_piece(
169 &self,
170 offset: PieceCacheOffset,
171 ) -> Result<Option<(PieceIndex, Piece)>, FarmError>;
172
173 async fn read_pieces(
181 &self,
182 offsets: Box<dyn Iterator<Item = PieceCacheOffset> + Send>,
183 ) -> Result<
184 Box<
185 dyn Stream<Item = Result<(PieceCacheOffset, Option<(PieceIndex, Piece)>), FarmError>>
186 + Send
187 + Unpin
188 + '_,
189 >,
190 FarmError,
191 >;
192}
193
194#[derive(Debug, Copy, Clone, Encode, Decode)]
196pub enum MaybePieceStoredResult {
197 No,
199 Vacant,
203 Yes,
205}
206
207#[async_trait]
214pub trait PlotCache: Send + Sync + fmt::Debug {
215 async fn is_piece_maybe_stored(
219 &self,
220 key: &RecordKey,
221 ) -> Result<MaybePieceStoredResult, FarmError>;
222
223 async fn try_store_piece(
226 &self,
227 piece_index: PieceIndex,
228 piece: &Piece,
229 ) -> Result<bool, FarmError>;
230
231 async fn read_piece(&self, key: &RecordKey) -> Result<Option<Piece>, FarmError>;
235}
236
237#[derive(Debug, Copy, Clone, Encode, Decode)]
239pub struct AuditingDetails {
240 pub sectors_count: SectorIndex,
242 pub time: Duration,
244}
245
246#[derive(Debug, Copy, Clone, Encode, Decode)]
248pub enum ProvingResult {
249 Success,
251 Timeout,
253 Rejected,
256 Failed,
258}
259
260impl fmt::Display for ProvingResult {
261 #[inline]
262 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
263 f.write_str(match self {
264 Self::Success => "Success",
265 Self::Timeout => "Timeout",
266 Self::Rejected => "Rejected",
267 Self::Failed => "Failed",
268 })
269 }
270}
271
272#[derive(Debug, Copy, Clone, Encode, Decode)]
274pub struct ProvingDetails {
275 pub result: ProvingResult,
277 pub time: Duration,
279}
280
281#[derive(Debug, Encode, Decode)]
283pub struct DecodedFarmingError {
284 error: String,
286 is_fatal: bool,
288}
289
290impl fmt::Display for DecodedFarmingError {
291 #[inline]
292 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
293 self.error.fmt(f)
294 }
295}
296
297#[derive(Debug, Error)]
299pub enum FarmingError {
300 #[error("Failed to subscribe to slot info notifications: {error}")]
302 FailedToSubscribeSlotInfo {
303 error: anyhow::Error,
305 },
306 #[error("Failed to retrieve farmer info: {error}")]
308 FailedToGetFarmerInfo {
309 error: anyhow::Error,
311 },
312 #[error("Slot info notification stream ended")]
314 SlotNotificationStreamEnded,
315 #[error("Low-level auditing error: {0}")]
317 LowLevelAuditing(#[from] AuditingError),
318 #[error("Low-level proving error: {0}")]
320 LowLevelProving(#[from] ProvingError),
321 #[error("Farming I/O error: {0}")]
323 Io(#[from] io::Error),
324 #[error("Decoded farming error {0}")]
326 Decoded(DecodedFarmingError),
327}
328
329impl Encode for FarmingError {
330 #[inline]
331 fn encode_to<O: Output + ?Sized>(&self, dest: &mut O) {
332 let error = DecodedFarmingError {
333 error: self.to_string(),
334 is_fatal: self.is_fatal(),
335 };
336
337 error.encode_to(dest)
338 }
339}
340
341impl Decode for FarmingError {
342 #[inline]
343 fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
344 DecodedFarmingError::decode(input).map(FarmingError::Decoded)
345 }
346}
347
348impl FarmingError {
349 #[inline]
351 pub fn str_variant(&self) -> &str {
352 match self {
353 FarmingError::FailedToSubscribeSlotInfo { .. } => "FailedToSubscribeSlotInfo",
354 FarmingError::FailedToGetFarmerInfo { .. } => "FailedToGetFarmerInfo",
355 FarmingError::LowLevelAuditing(_) => "LowLevelAuditing",
356 FarmingError::LowLevelProving(_) => "LowLevelProving",
357 FarmingError::Io(_) => "Io",
358 FarmingError::Decoded(_) => "Decoded",
359 FarmingError::SlotNotificationStreamEnded => "SlotNotificationStreamEnded",
360 }
361 }
362
363 pub fn is_fatal(&self) -> bool {
365 match self {
366 FarmingError::FailedToSubscribeSlotInfo { .. } => true,
367 FarmingError::FailedToGetFarmerInfo { .. } => true,
368 FarmingError::LowLevelAuditing(_) => true,
369 FarmingError::LowLevelProving(error) => error.is_fatal(),
370 FarmingError::Io(_) => true,
371 FarmingError::Decoded(error) => error.is_fatal,
372 FarmingError::SlotNotificationStreamEnded => true,
373 }
374 }
375}
376
377#[derive(Debug, Clone, Encode, Decode)]
379pub enum FarmingNotification {
380 Auditing(AuditingDetails),
382 Proving(ProvingDetails),
384 NonFatalError(Arc<FarmingError>),
386}
387
388#[derive(Debug, Clone, Encode, Decode)]
390pub enum SectorPlottingDetails {
391 Starting {
393 progress: f32,
395 replotting: bool,
397 last_queued: bool,
399 },
400 Downloading,
402 Downloaded(Duration),
404 Encoding,
406 Encoded(Duration),
408 Writing,
410 Written(Duration),
412 Finished {
414 plotted_sector: PlottedSector,
416 old_plotted_sector: Option<PlottedSector>,
418 time: Duration,
420 },
421 Error(String),
423}
424
425#[derive(Debug, Clone, Encode, Decode)]
427pub enum SectorExpirationDetails {
428 Determined {
430 expires_at: SegmentIndex,
432 },
433 AboutToExpire,
435 Expired,
437}
438
439#[derive(Debug, Clone, Encode, Decode)]
441pub enum SectorUpdate {
442 Plotting(SectorPlottingDetails),
444 Expiration(SectorExpirationDetails),
446}
447
448#[async_trait]
450pub trait PieceReader: Send + Sync + fmt::Debug {
451 async fn read_piece(
454 &self,
455 sector_index: SectorIndex,
456 piece_offset: PieceOffset,
457 ) -> Result<Option<Piece>, FarmError>;
458}
459
/// Handle returned by the `on_*` subscription methods of [`Farm`].
pub trait HandlerId: fmt::Debug + Send + Sync {
    /// Detaches the handler from this id.
    // NOTE(review): for `event_listener_primitives::HandlerId` this presumably keeps the callback
    // registered after the id is dropped; confirm that other implementations behave the same.
    fn detach(&self);
}
465
466impl HandlerId for event_listener_primitives::HandlerId {
467 #[inline]
468 fn detach(&self) {
469 self.detach();
470 }
471}
472
473#[derive(
475 Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize, Display, From,
476)]
477#[serde(untagged)]
478pub enum FarmId {
479 Ulid(Ulid),
481}
482
483impl Encode for FarmId {
484 #[inline]
485 fn size_hint(&self) -> usize {
486 1_usize
487 + match self {
488 FarmId::Ulid(ulid) => 0_usize.saturating_add(Encode::size_hint(&ulid.0)),
489 }
490 }
491
492 #[inline]
493 fn encode_to<O: Output + ?Sized>(&self, output: &mut O) {
494 match self {
495 FarmId::Ulid(ulid) => {
496 output.push_byte(0);
497 Encode::encode_to(&ulid.0, output);
498 }
499 }
500 }
501}
502
503impl EncodeLike for FarmId {}
504
505impl Decode for FarmId {
506 #[inline]
507 fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
508 match input
509 .read_byte()
510 .map_err(|e| e.chain("Could not decode `FarmId`, failed to read variant byte"))?
511 {
512 0 => u128::decode(input)
513 .map(|ulid| FarmId::Ulid(Ulid(ulid)))
514 .map_err(|e| e.chain("Could not decode `FarmId::Ulid.0`")),
515 _ => Err("Could not decode `FarmId`, variant doesn't exist".into()),
516 }
517 }
518}
519
520#[allow(clippy::new_without_default)]
521impl FarmId {
522 #[inline]
524 pub fn new() -> Self {
525 Self::Ulid(Ulid::new())
526 }
527}
528
529#[async_trait(?Send)]
531pub trait Farm {
532 fn id(&self) -> &FarmId;
534
535 fn total_sectors_count(&self) -> SectorIndex;
537
538 fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static>;
540
541 fn piece_reader(&self) -> Arc<dyn PieceReader + 'static>;
543
544 fn on_sector_update(
546 &self,
547 callback: HandlerFn<(SectorIndex, SectorUpdate)>,
548 ) -> Box<dyn HandlerId>;
549
550 fn on_farming_notification(
552 &self,
553 callback: HandlerFn<FarmingNotification>,
554 ) -> Box<dyn HandlerId>;
555
556 fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> Box<dyn HandlerId>;
558
559 fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>;
561}
562
563#[async_trait]
564impl<T> Farm for Box<T>
565where
566 T: Farm + ?Sized,
567{
568 #[inline]
569 fn id(&self) -> &FarmId {
570 self.as_ref().id()
571 }
572
573 #[inline]
574 fn total_sectors_count(&self) -> SectorIndex {
575 self.as_ref().total_sectors_count()
576 }
577
578 #[inline]
579 fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static> {
580 self.as_ref().plotted_sectors()
581 }
582
583 #[inline]
584 fn piece_reader(&self) -> Arc<dyn PieceReader + 'static> {
585 self.as_ref().piece_reader()
586 }
587
588 #[inline]
589 fn on_sector_update(
590 &self,
591 callback: HandlerFn<(SectorIndex, SectorUpdate)>,
592 ) -> Box<dyn HandlerId> {
593 self.as_ref().on_sector_update(callback)
594 }
595
596 #[inline]
597 fn on_farming_notification(
598 &self,
599 callback: HandlerFn<FarmingNotification>,
600 ) -> Box<dyn HandlerId> {
601 self.as_ref().on_farming_notification(callback)
602 }
603
604 #[inline]
605 fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> Box<dyn HandlerId> {
606 self.as_ref().on_solution(callback)
607 }
608
609 #[inline]
610 fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
611 (*self).run()
612 }
613}