subspace_farmer/plotter/
pool.rs

1//! Pool plotter
2
3use crate::plotter::{Plotter, SectorPlottingProgress};
4use async_trait::async_trait;
5use event_listener::Event;
6use futures::channel::mpsc;
7use futures::future;
8use std::any::type_name_of_val;
9use std::pin::pin;
10use std::time::Duration;
11use subspace_core_primitives::sectors::SectorIndex;
12use subspace_core_primitives::PublicKey;
13use subspace_farmer_components::FarmerProtocolInfo;
14use tracing::{error, trace};
15
16/// Pool plotter.
17///
18/// This plotter implementation relies on retries and should only be used with local plotter
19/// implementations (like CPU and GPU).
20#[derive(Debug)]
21pub struct PoolPlotter {
22    plotters: Vec<Box<dyn Plotter + Send + Sync>>,
23    retry_interval: Duration,
24    notification: Event,
25}
26
27#[async_trait]
28impl Plotter for PoolPlotter {
29    async fn has_free_capacity(&self) -> Result<bool, String> {
30        for (index, plotter) in self.plotters.iter().enumerate() {
31            match plotter.has_free_capacity().await {
32                Ok(result) => {
33                    if result {
34                        return Ok(true);
35                    }
36                }
37                Err(error) => {
38                    error!(
39                        %error,
40                        %index,
41                        r#type = type_name_of_val(plotter),
42                        "Failed to check free capacity for plotter"
43                    );
44                }
45            }
46        }
47
48        Ok(false)
49    }
50
51    async fn plot_sector(
52        &self,
53        public_key: PublicKey,
54        sector_index: SectorIndex,
55        farmer_protocol_info: FarmerProtocolInfo,
56        pieces_in_sector: u16,
57        replotting: bool,
58        progress_sender: mpsc::Sender<SectorPlottingProgress>,
59    ) {
60        loop {
61            for plotter in &self.plotters {
62                if plotter
63                    .try_plot_sector(
64                        public_key,
65                        sector_index,
66                        farmer_protocol_info,
67                        pieces_in_sector,
68                        replotting,
69                        progress_sender.clone(),
70                    )
71                    .await
72                {
73                    self.notification.notify_relaxed(1);
74                    return;
75                }
76            }
77
78            trace!(
79                retry_interval = ?self.retry_interval,
80                "All plotters are busy, will wait and try again later"
81            );
82            future::select(
83                pin!(tokio::time::sleep(self.retry_interval)),
84                self.notification.listen(),
85            )
86            .await;
87        }
88    }
89
90    async fn try_plot_sector(
91        &self,
92        public_key: PublicKey,
93        sector_index: SectorIndex,
94        farmer_protocol_info: FarmerProtocolInfo,
95        pieces_in_sector: u16,
96        replotting: bool,
97        progress_sender: mpsc::Sender<SectorPlottingProgress>,
98    ) -> bool {
99        for plotter in &self.plotters {
100            if plotter
101                .try_plot_sector(
102                    public_key,
103                    sector_index,
104                    farmer_protocol_info,
105                    pieces_in_sector,
106                    replotting,
107                    progress_sender.clone(),
108                )
109                .await
110            {
111                self.notification.notify_relaxed(1);
112                return true;
113            }
114        }
115
116        false
117    }
118}
119
120impl PoolPlotter {
121    /// Create new instance
122    pub fn new(plotters: Vec<Box<dyn Plotter + Send + Sync>>, retry_interval: Duration) -> Self {
123        Self {
124            plotters,
125            retry_interval,
126            notification: Event::new(),
127        }
128    }
129}