subspace_farmer/single_disk_farm/
farming.rs

1//! Farming-related utilities
2//!
3//! These utilities do not expose the whole farming workflow, but rather small bits of it that can
4//! be useful externally (for example for benchmarking purposes in CLI).
5
6pub mod rayon_files;
7
8use crate::farm::{
9    AuditingDetails, FarmingError, FarmingNotification, ProvingDetails, ProvingResult,
10};
11use crate::node_client::NodeClient;
12use crate::single_disk_farm::Handlers;
13use crate::single_disk_farm::metrics::SingleDiskFarmMetrics;
14use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
15use futures::StreamExt;
16use futures::channel::mpsc;
17use parking_lot::Mutex;
18use rayon::ThreadPool;
19use std::collections::HashSet;
20use std::sync::Arc;
21use std::time::Instant;
22use subspace_core_primitives::PublicKey;
23use subspace_core_primitives::pieces::Record;
24use subspace_core_primitives::pos::PosSeed;
25use subspace_core_primitives::sectors::SectorIndex;
26use subspace_core_primitives::segments::{HistorySize, SegmentIndex};
27use subspace_core_primitives::solutions::{Solution, SolutionRange};
28use subspace_erasure_coding::ErasureCoding;
29use subspace_farmer_components::ReadAtSync;
30use subspace_farmer_components::auditing::{AuditingError, audit_plot_sync};
31use subspace_farmer_components::proving::{ProvableSolutions, ProvingError};
32use subspace_farmer_components::reading::ReadSectorRecordChunksMode;
33use subspace_farmer_components::sector::{SectorMetadata, SectorMetadataChecksummed};
34use subspace_kzg::Kzg;
35use subspace_proof_of_space::{Table, TableGenerator};
36use subspace_rpc_primitives::{SlotInfo, SolutionResponse};
37use tracing::{Span, debug, error, info, trace, warn};
38
/// Number of consecutive non-fatal errors after which the farm is considered non-operational
const NON_FATAL_ERROR_LIMIT: usize = 10;
41
42pub(super) async fn slot_notification_forwarder<NC>(
43    node_client: &NC,
44    mut slot_info_forwarder_sender: mpsc::Sender<SlotInfo>,
45    metrics: Option<Arc<SingleDiskFarmMetrics>>,
46) -> Result<(), FarmingError>
47where
48    NC: NodeClient,
49{
50    info!("Subscribing to slot info notifications");
51
52    let mut slot_info_notifications = node_client
53        .subscribe_slot_info()
54        .await
55        .map_err(|error| FarmingError::FailedToSubscribeSlotInfo { error })?;
56
57    while let Some(slot_info) = slot_info_notifications.next().await {
58        debug!(?slot_info, "New slot");
59
60        let slot = slot_info.slot_number;
61
62        // Error means farmer is still solving for previous slot, which is too late, and we need to
63        // skip this slot
64        if slot_info_forwarder_sender.try_send(slot_info).is_err() {
65            if let Some(metrics) = &metrics {
66                metrics.skipped_slots.inc();
67            }
68            debug!(%slot, "Slow farming, skipping slot");
69        }
70    }
71
72    Err(FarmingError::SlotNotificationStreamEnded)
73}
74
75/// Plot audit options
76#[derive(Debug)]
77pub struct PlotAuditOptions<'a, 'b, PosTable>
78where
79    PosTable: Table,
80{
81    /// Public key of the farm
82    pub public_key: &'a PublicKey,
83    /// Reward address to use for solutions
84    pub reward_address: &'a PublicKey,
85    /// Slot info for the audit
86    pub slot_info: SlotInfo,
87    /// Metadata of all sectors plotted so far
88    pub sectors_metadata: &'a [SectorMetadataChecksummed],
89    /// Kzg instance
90    pub kzg: &'a Kzg,
91    /// Erasure coding instance
92    pub erasure_coding: &'a ErasureCoding,
93    /// Optional sector that is currently being modified (for example replotted) and should not be
94    /// audited
95    pub sectors_being_modified: &'b HashSet<SectorIndex>,
96    /// Mode of reading chunks during proving
97    pub read_sector_record_chunks_mode: ReadSectorRecordChunksMode,
98    /// Proof of space table generator
99    pub table_generator: &'a Mutex<PosTable::Generator>,
100}
101
102impl<PosTable> Clone for PlotAuditOptions<'_, '_, PosTable>
103where
104    PosTable: Table,
105{
106    #[inline]
107    fn clone(&self) -> Self {
108        *self
109    }
110}
111
112impl<PosTable> Copy for PlotAuditOptions<'_, '_, PosTable> where PosTable: Table {}
113
114/// Plot auditing implementation
115#[derive(Debug)]
116pub struct PlotAudit<Plot>(Plot)
117where
118    Plot: ReadAtSync;
119
120impl<'a, Plot> PlotAudit<Plot>
121where
122    Plot: ReadAtSync + 'a,
123{
124    /// Create new instance
125    pub fn new(plot: Plot) -> Self {
126        Self(plot)
127    }
128
129    /// Audit this plot
130    #[allow(clippy::type_complexity)]
131    pub fn audit<'b, PosTable>(
132        &'a self,
133        options: PlotAuditOptions<'a, 'b, PosTable>,
134    ) -> Result<
135        Vec<(
136            SectorIndex,
137            impl ProvableSolutions<Item = Result<Solution<PublicKey>, ProvingError>>
138            + use<'a, PosTable, Plot>,
139        )>,
140        AuditingError,
141    >
142    where
143        PosTable: Table,
144    {
145        let PlotAuditOptions {
146            public_key,
147            reward_address,
148            slot_info,
149            sectors_metadata,
150            kzg,
151            erasure_coding,
152            sectors_being_modified,
153            read_sector_record_chunks_mode: mode,
154            table_generator,
155        } = options;
156
157        let audit_results = audit_plot_sync(
158            public_key,
159            &slot_info.global_challenge,
160            slot_info.voting_solution_range,
161            &self.0,
162            sectors_metadata,
163            sectors_being_modified,
164        )?;
165
166        Ok(audit_results
167            .into_iter()
168            .filter_map(|audit_results| {
169                let sector_index = audit_results.sector_index;
170
171                let sector_solutions = audit_results.solution_candidates.into_solutions(
172                    reward_address,
173                    kzg,
174                    erasure_coding,
175                    mode,
176                    |seed: &PosSeed| table_generator.lock().generate_parallel(seed),
177                );
178
179                let sector_solutions = match sector_solutions {
180                    Ok(solutions) => solutions,
181                    Err(error) => {
182                        warn!(
183                            %error,
184                            %sector_index,
185                            "Failed to turn solution candidates into solutions",
186                        );
187
188                        return None;
189                    }
190                };
191
192                if sector_solutions.len() == 0 {
193                    return None;
194                }
195
196                Some((sector_index, sector_solutions))
197            })
198            .collect())
199    }
200}
201
202pub(super) struct FarmingOptions<NC, PlotAudit> {
203    pub(super) public_key: PublicKey,
204    pub(super) reward_address: PublicKey,
205    pub(super) node_client: NC,
206    pub(super) plot_audit: PlotAudit,
207    pub(super) sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
208    pub(super) kzg: Kzg,
209    pub(super) erasure_coding: ErasureCoding,
210    pub(super) handlers: Arc<Handlers>,
211    pub(super) sectors_being_modified: Arc<AsyncRwLock<HashSet<SectorIndex>>>,
212    pub(super) slot_info_notifications: mpsc::Receiver<SlotInfo>,
213    pub(super) thread_pool: ThreadPool,
214    pub(super) read_sector_record_chunks_mode: ReadSectorRecordChunksMode,
215    pub(super) global_mutex: Arc<AsyncMutex<()>>,
216    pub(super) metrics: Option<Arc<SingleDiskFarmMetrics>>,
217}
218
219/// Starts farming process.
220///
221/// NOTE: Returned future is async, but does blocking operations and should be running in dedicated
222/// thread.
223pub(super) async fn farming<'a, PosTable, NC, Plot>(
224    farming_options: FarmingOptions<NC, PlotAudit<Plot>>,
225) -> Result<(), FarmingError>
226where
227    PosTable: Table,
228    NC: NodeClient,
229    Plot: ReadAtSync + 'a,
230{
231    let FarmingOptions {
232        public_key,
233        reward_address,
234        node_client,
235        plot_audit,
236        sectors_metadata,
237        kzg,
238        erasure_coding,
239        handlers,
240        sectors_being_modified,
241        mut slot_info_notifications,
242        thread_pool,
243        read_sector_record_chunks_mode,
244        global_mutex,
245        metrics,
246    } = farming_options;
247
248    let farmer_app_info = node_client
249        .farmer_app_info()
250        .await
251        .map_err(|error| FarmingError::FailedToGetFarmerInfo { error })?;
252
253    // We assume that each slot is one second
254    let farming_timeout = farmer_app_info.farming_timeout;
255
256    let table_generator = Arc::new(Mutex::new(PosTable::generator()));
257    let span = Span::current();
258
259    let mut non_fatal_errors = 0;
260
261    while let Some(slot_info) = slot_info_notifications.next().await {
262        let slot = slot_info.slot_number;
263
264        // Take mutex briefly to make sure farming is allowed right now
265        global_mutex.lock().await;
266
267        let mut problematic_sectors = Vec::new();
268        let result: Result<(), FarmingError> = try {
269            let start = Instant::now();
270            let sectors_metadata = sectors_metadata.read().await;
271
272            debug!(%slot, sector_count = %sectors_metadata.len(), "Reading sectors");
273
274            let mut sectors_solutions = {
275                let sectors_being_modified = &*sectors_being_modified.read().await;
276
277                thread_pool.install(|| {
278                    let _span_guard = span.enter();
279
280                    plot_audit.audit(PlotAuditOptions::<PosTable> {
281                        public_key: &public_key,
282                        reward_address: &reward_address,
283                        slot_info,
284                        sectors_metadata: &sectors_metadata,
285                        kzg: &kzg,
286                        erasure_coding: &erasure_coding,
287                        sectors_being_modified,
288                        read_sector_record_chunks_mode,
289                        table_generator: &table_generator,
290                    })
291                })?
292            };
293
294            sectors_solutions.sort_by(|a, b| {
295                let a_solution_distance =
296                    a.1.best_solution_distance().unwrap_or(SolutionRange::MAX);
297                let b_solution_distance =
298                    b.1.best_solution_distance().unwrap_or(SolutionRange::MAX);
299
300                a_solution_distance.cmp(&b_solution_distance)
301            });
302
303            {
304                let time = start.elapsed();
305                if let Some(metrics) = &metrics {
306                    metrics.auditing_time.observe(time.as_secs_f64());
307                }
308                handlers
309                    .farming_notification
310                    .call_simple(&FarmingNotification::Auditing(AuditingDetails {
311                        sectors_count: sectors_metadata.len() as SectorIndex,
312                        time,
313                    }));
314            }
315
316            // Take mutex and hold until proving end to make sure nothing else major happens at the
317            // same time
318            let _proving_guard = global_mutex.lock().await;
319
320            'solutions_processing: for (sector_index, mut sector_solutions) in sectors_solutions {
321                if sector_solutions.is_empty() {
322                    continue;
323                }
324                let mut start = Instant::now();
325                while let Some(maybe_solution) = thread_pool.install(|| {
326                    let _span_guard = span.enter();
327
328                    sector_solutions.next()
329                }) {
330                    let solution = match maybe_solution {
331                        Ok(solution) => solution,
332                        Err(error) => {
333                            if let Some(metrics) = &metrics {
334                                metrics
335                                    .observe_proving_time(&start.elapsed(), ProvingResult::Failed);
336                            }
337                            error!(
338                                %slot,
339                                %sector_index,
340                                %error,
341                                "Failed to prove, scheduling sector for replotting"
342                            );
343                            problematic_sectors.push(sector_index);
344                            // Do not error completely as disk corruption or other reasons why
345                            // proving might fail
346                            start = Instant::now();
347                            continue;
348                        }
349                    };
350
351                    debug!(%slot, %sector_index, "Solution found");
352                    trace!(?solution, "Solution found");
353
354                    {
355                        let time = start.elapsed();
356                        if time >= farming_timeout {
357                            if let Some(metrics) = &metrics {
358                                metrics.observe_proving_time(&time, ProvingResult::Timeout);
359                            }
360                            handlers.farming_notification.call_simple(
361                                &FarmingNotification::Proving(ProvingDetails {
362                                    result: ProvingResult::Timeout,
363                                    time,
364                                }),
365                            );
366                            warn!(
367                                %slot,
368                                %sector_index,
369                                "Proving for solution skipped due to farming time limit",
370                            );
371
372                            break 'solutions_processing;
373                        }
374                    }
375
376                    let response = SolutionResponse {
377                        slot_number: slot,
378                        solution,
379                    };
380
381                    handlers.solution.call_simple(&response);
382
383                    if let Err(error) = node_client.submit_solution_response(response).await {
384                        let time = start.elapsed();
385                        if let Some(metrics) = &metrics {
386                            metrics.observe_proving_time(&time, ProvingResult::Rejected);
387                        }
388                        handlers
389                            .farming_notification
390                            .call_simple(&FarmingNotification::Proving(ProvingDetails {
391                                result: ProvingResult::Rejected,
392                                time,
393                            }));
394                        warn!(
395                            %slot,
396                            %sector_index,
397                            %error,
398                            "Failed to send solution to node, skipping further proving for this slot",
399                        );
400                        break 'solutions_processing;
401                    }
402
403                    let time = start.elapsed();
404                    if let Some(metrics) = &metrics {
405                        metrics.observe_proving_time(&time, ProvingResult::Success);
406                    }
407                    handlers
408                        .farming_notification
409                        .call_simple(&FarmingNotification::Proving(ProvingDetails {
410                            result: ProvingResult::Success,
411                            time,
412                        }));
413                    start = Instant::now();
414                }
415            }
416        };
417
418        if let Err(error) = result {
419            if error.is_fatal() {
420                return Err(error);
421            }
422
423            non_fatal_errors += 1;
424
425            if non_fatal_errors >= NON_FATAL_ERROR_LIMIT {
426                return Err(error);
427            }
428
429            warn!(
430                %error,
431                "Non-fatal farming error"
432            );
433
434            if let Some(metrics) = &metrics {
435                metrics.note_farming_error(&error);
436            }
437            handlers
438                .farming_notification
439                .call_simple(&FarmingNotification::NonFatalError(Arc::new(error)));
440
441            for sector_index in problematic_sectors.drain(..) {
442                // Inform others that this sector is being modified
443                sectors_being_modified.write().await.insert(sector_index);
444                // Replace metadata with a dummy one, so it will be picked up for replotting next
445                if let Some(existing_sector_metadata) = sectors_metadata
446                    .write()
447                    .await
448                    .get_mut(sector_index as usize)
449                {
450                    *existing_sector_metadata = SectorMetadataChecksummed::from(SectorMetadata {
451                        sector_index,
452                        pieces_in_sector: existing_sector_metadata.pieces_in_sector,
453                        s_bucket_sizes: Box::new([0; Record::NUM_S_BUCKETS]),
454                        history_size: HistorySize::from(SegmentIndex::ZERO),
455                    });
456                }
457                // Inform others that this sector is no longer being modified
458                sectors_being_modified.write().await.remove(&sector_index);
459            }
460        } else {
461            non_fatal_errors = 0;
462        }
463    }
464
465    Ok(())
466}