subspace_farmer/plotter/
pool.rs1use crate::plotter::{Plotter, SectorPlottingProgress};
4use async_trait::async_trait;
5use event_listener::Event;
6use futures::channel::mpsc;
7use futures::future;
8use std::any::type_name_of_val;
9use std::pin::pin;
10use std::time::Duration;
11use subspace_core_primitives::sectors::SectorIndex;
12use subspace_core_primitives::PublicKey;
13use subspace_farmer_components::FarmerProtocolInfo;
14use tracing::{error, trace};
15
16#[derive(Debug)]
21pub struct PoolPlotter {
22 plotters: Vec<Box<dyn Plotter + Send + Sync>>,
23 retry_interval: Duration,
24 notification: Event,
25}
26
27#[async_trait]
28impl Plotter for PoolPlotter {
29 async fn has_free_capacity(&self) -> Result<bool, String> {
30 for (index, plotter) in self.plotters.iter().enumerate() {
31 match plotter.has_free_capacity().await {
32 Ok(result) => {
33 if result {
34 return Ok(true);
35 }
36 }
37 Err(error) => {
38 error!(
39 %error,
40 %index,
41 r#type = type_name_of_val(plotter),
42 "Failed to check free capacity for plotter"
43 );
44 }
45 }
46 }
47
48 Ok(false)
49 }
50
51 async fn plot_sector(
52 &self,
53 public_key: PublicKey,
54 sector_index: SectorIndex,
55 farmer_protocol_info: FarmerProtocolInfo,
56 pieces_in_sector: u16,
57 replotting: bool,
58 progress_sender: mpsc::Sender<SectorPlottingProgress>,
59 ) {
60 loop {
61 for plotter in &self.plotters {
62 if plotter
63 .try_plot_sector(
64 public_key,
65 sector_index,
66 farmer_protocol_info,
67 pieces_in_sector,
68 replotting,
69 progress_sender.clone(),
70 )
71 .await
72 {
73 self.notification.notify_relaxed(1);
74 return;
75 }
76 }
77
78 trace!(
79 retry_interval = ?self.retry_interval,
80 "All plotters are busy, will wait and try again later"
81 );
82 future::select(
83 pin!(tokio::time::sleep(self.retry_interval)),
84 self.notification.listen(),
85 )
86 .await;
87 }
88 }
89
90 async fn try_plot_sector(
91 &self,
92 public_key: PublicKey,
93 sector_index: SectorIndex,
94 farmer_protocol_info: FarmerProtocolInfo,
95 pieces_in_sector: u16,
96 replotting: bool,
97 progress_sender: mpsc::Sender<SectorPlottingProgress>,
98 ) -> bool {
99 for plotter in &self.plotters {
100 if plotter
101 .try_plot_sector(
102 public_key,
103 sector_index,
104 farmer_protocol_info,
105 pieces_in_sector,
106 replotting,
107 progress_sender.clone(),
108 )
109 .await
110 {
111 self.notification.notify_relaxed(1);
112 return true;
113 }
114 }
115
116 false
117 }
118}
119
120impl PoolPlotter {
121 pub fn new(plotters: Vec<Box<dyn Plotter + Send + Sync>>, retry_interval: Duration) -> Self {
123 Self {
124 plotters,
125 retry_interval,
126 notification: Event::new(),
127 }
128 }
129}