subspace_farmer/single_disk_farm/
farming.rs

1//! Farming-related utilities
2//!
3//! These utilities do not expose the whole farming workflow, but rather small bits of it that can
4//! be useful externally (for example for benchmarking purposes in CLI).
5
6pub mod rayon_files;
7
8use crate::farm::{
9    AuditingDetails, FarmingError, FarmingNotification, ProvingDetails, ProvingResult,
10};
11use crate::node_client::NodeClient;
12use crate::single_disk_farm::metrics::SingleDiskFarmMetrics;
13use crate::single_disk_farm::Handlers;
14use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
15use futures::channel::mpsc;
16use futures::StreamExt;
17use parking_lot::Mutex;
18use rayon::ThreadPool;
19use std::collections::HashSet;
20use std::sync::Arc;
21use std::time::Instant;
22use subspace_core_primitives::pieces::Record;
23use subspace_core_primitives::pos::PosSeed;
24use subspace_core_primitives::sectors::SectorIndex;
25use subspace_core_primitives::segments::{HistorySize, SegmentIndex};
26use subspace_core_primitives::solutions::{Solution, SolutionRange};
27use subspace_core_primitives::PublicKey;
28use subspace_erasure_coding::ErasureCoding;
29use subspace_farmer_components::auditing::{audit_plot_sync, AuditingError};
30use subspace_farmer_components::proving::{ProvableSolutions, ProvingError};
31use subspace_farmer_components::reading::ReadSectorRecordChunksMode;
32use subspace_farmer_components::sector::{SectorMetadata, SectorMetadataChecksummed};
33use subspace_farmer_components::ReadAtSync;
34use subspace_kzg::Kzg;
35use subspace_proof_of_space::{Table, TableGenerator};
36use subspace_rpc_primitives::{SlotInfo, SolutionResponse};
37use tracing::{debug, error, info, trace, warn, Span};
38
/// Number of consecutive non-fatal farming errors at which the farm is considered
/// non-operational and farming stops, returning the last error. The counter is reset by every
/// slot that is processed without an error.
const NON_FATAL_ERROR_LIMIT: usize = 10;
41
42pub(super) async fn slot_notification_forwarder<NC>(
43    node_client: &NC,
44    mut slot_info_forwarder_sender: mpsc::Sender<SlotInfo>,
45    metrics: Option<Arc<SingleDiskFarmMetrics>>,
46) -> Result<(), FarmingError>
47where
48    NC: NodeClient,
49{
50    info!("Subscribing to slot info notifications");
51
52    let mut slot_info_notifications = node_client
53        .subscribe_slot_info()
54        .await
55        .map_err(|error| FarmingError::FailedToSubscribeSlotInfo { error })?;
56
57    while let Some(slot_info) = slot_info_notifications.next().await {
58        debug!(?slot_info, "New slot");
59
60        let slot = slot_info.slot_number;
61
62        // Error means farmer is still solving for previous slot, which is too late, and we need to
63        // skip this slot
64        if slot_info_forwarder_sender.try_send(slot_info).is_err() {
65            if let Some(metrics) = &metrics {
66                metrics.skipped_slots.inc();
67            }
68            debug!(%slot, "Slow farming, skipping slot");
69        }
70    }
71
72    Err(FarmingError::SlotNotificationStreamEnded)
73}
74
75/// Plot audit options
76#[derive(Debug)]
77pub struct PlotAuditOptions<'a, 'b, PosTable>
78where
79    PosTable: Table,
80{
81    /// Public key of the farm
82    pub public_key: &'a PublicKey,
83    /// Reward address to use for solutions
84    pub reward_address: &'a PublicKey,
85    /// Slot info for the audit
86    pub slot_info: SlotInfo,
87    /// Metadata of all sectors plotted so far
88    pub sectors_metadata: &'a [SectorMetadataChecksummed],
89    /// Kzg instance
90    pub kzg: &'a Kzg,
91    /// Erasure coding instance
92    pub erasure_coding: &'a ErasureCoding,
93    /// Optional sector that is currently being modified (for example replotted) and should not be
94    /// audited
95    pub sectors_being_modified: &'b HashSet<SectorIndex>,
96    /// Mode of reading chunks during proving
97    pub read_sector_record_chunks_mode: ReadSectorRecordChunksMode,
98    /// Proof of space table generator
99    pub table_generator: &'a Mutex<PosTable::Generator>,
100}
101
102impl<PosTable> Clone for PlotAuditOptions<'_, '_, PosTable>
103where
104    PosTable: Table,
105{
106    #[inline]
107    fn clone(&self) -> Self {
108        *self
109    }
110}
111
112impl<PosTable> Copy for PlotAuditOptions<'_, '_, PosTable> where PosTable: Table {}
113
114/// Plot auditing implementation
115#[derive(Debug)]
116pub struct PlotAudit<Plot>(Plot)
117where
118    Plot: ReadAtSync;
119
120impl<'a, Plot> PlotAudit<Plot>
121where
122    Plot: ReadAtSync + 'a,
123{
124    /// Create new instance
125    pub fn new(plot: Plot) -> Self {
126        Self(plot)
127    }
128
129    /// Audit this plot
130    #[allow(clippy::type_complexity)]
131    pub fn audit<'b, PosTable>(
132        &'a self,
133        options: PlotAuditOptions<'a, 'b, PosTable>,
134    ) -> Result<
135        Vec<(
136            SectorIndex,
137            impl ProvableSolutions<Item = Result<Solution<PublicKey>, ProvingError>> + 'a,
138        )>,
139        AuditingError,
140    >
141    where
142        PosTable: Table,
143    {
144        let PlotAuditOptions {
145            public_key,
146            reward_address,
147            slot_info,
148            sectors_metadata,
149            kzg,
150            erasure_coding,
151            sectors_being_modified,
152            read_sector_record_chunks_mode: mode,
153            table_generator,
154        } = options;
155
156        let audit_results = audit_plot_sync(
157            public_key,
158            &slot_info.global_challenge,
159            slot_info.voting_solution_range,
160            &self.0,
161            sectors_metadata,
162            sectors_being_modified,
163        )?;
164
165        Ok(audit_results
166            .into_iter()
167            .filter_map(|audit_results| {
168                let sector_index = audit_results.sector_index;
169
170                let sector_solutions = audit_results.solution_candidates.into_solutions(
171                    reward_address,
172                    kzg,
173                    erasure_coding,
174                    mode,
175                    |seed: &PosSeed| table_generator.lock().generate_parallel(seed),
176                );
177
178                let sector_solutions = match sector_solutions {
179                    Ok(solutions) => solutions,
180                    Err(error) => {
181                        warn!(
182                            %error,
183                            %sector_index,
184                            "Failed to turn solution candidates into solutions",
185                        );
186
187                        return None;
188                    }
189                };
190
191                if sector_solutions.len() == 0 {
192                    return None;
193                }
194
195                Some((sector_index, sector_solutions))
196            })
197            .collect())
198    }
199}
200
/// Options for the `farming` function
pub(super) struct FarmingOptions<NC, PlotAudit> {
    /// Public key of the farm, used when auditing
    pub(super) public_key: PublicKey,
    /// Reward address used in produced solutions
    pub(super) reward_address: PublicKey,
    /// Node client used to get farmer app info (including the farming timeout) and to submit
    /// solutions
    pub(super) node_client: NC,
    /// Plot auditor. NOTE: this generic parameter shadows the `PlotAudit` type name; `farming`
    /// instantiates it as `PlotAudit<Plot>`.
    pub(super) plot_audit: PlotAudit,
    /// Metadata of plotted sectors; read during audit, and an entry may be overwritten with dummy
    /// metadata to schedule that sector for replotting
    pub(super) sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    /// Kzg instance
    pub(super) kzg: Kzg,
    /// Erasure coding instance
    pub(super) erasure_coding: ErasureCoding,
    /// Handlers notified about found solutions and farming notifications
    pub(super) handlers: Arc<Handlers>,
    /// Sectors excluded from auditing because they are being modified
    pub(super) sectors_being_modified: Arc<AsyncRwLock<HashSet<SectorIndex>>>,
    /// Slots to farm; presumably fed by `slot_notification_forwarder` (TODO confirm in caller)
    pub(super) slot_info_notifications: mpsc::Receiver<SlotInfo>,
    /// Thread pool in which auditing and proving run
    pub(super) thread_pool: ThreadPool,
    /// Mode of reading chunks during proving
    pub(super) read_sector_record_chunks_mode: ReadSectorRecordChunksMode,
    /// Mutex taken briefly before each slot and held for the whole proving phase, so that nothing
    /// else major happens at the same time
    pub(super) global_mutex: Arc<AsyncMutex<()>>,
    /// Optional metrics for auditing/proving times, skipped work and errors
    pub(super) metrics: Option<Arc<SingleDiskFarmMetrics>>,
}
217
218/// Starts farming process.
219///
220/// NOTE: Returned future is async, but does blocking operations and should be running in dedicated
221/// thread.
222pub(super) async fn farming<'a, PosTable, NC, Plot>(
223    farming_options: FarmingOptions<NC, PlotAudit<Plot>>,
224) -> Result<(), FarmingError>
225where
226    PosTable: Table,
227    NC: NodeClient,
228    Plot: ReadAtSync + 'a,
229{
230    let FarmingOptions {
231        public_key,
232        reward_address,
233        node_client,
234        plot_audit,
235        sectors_metadata,
236        kzg,
237        erasure_coding,
238        handlers,
239        sectors_being_modified,
240        mut slot_info_notifications,
241        thread_pool,
242        read_sector_record_chunks_mode,
243        global_mutex,
244        metrics,
245    } = farming_options;
246
247    let farmer_app_info = node_client
248        .farmer_app_info()
249        .await
250        .map_err(|error| FarmingError::FailedToGetFarmerInfo { error })?;
251
252    // We assume that each slot is one second
253    let farming_timeout = farmer_app_info.farming_timeout;
254
255    let table_generator = Arc::new(Mutex::new(PosTable::generator()));
256    let span = Span::current();
257
258    let mut non_fatal_errors = 0;
259
260    while let Some(slot_info) = slot_info_notifications.next().await {
261        let slot = slot_info.slot_number;
262
263        // Take mutex briefly to make sure farming is allowed right now
264        global_mutex.lock().await;
265
266        let mut problematic_sectors = Vec::new();
267        let result: Result<(), FarmingError> = try {
268            let start = Instant::now();
269            let sectors_metadata = sectors_metadata.read().await;
270
271            debug!(%slot, sector_count = %sectors_metadata.len(), "Reading sectors");
272
273            let mut sectors_solutions = {
274                let sectors_being_modified = &*sectors_being_modified.read().await;
275
276                thread_pool.install(|| {
277                    let _span_guard = span.enter();
278
279                    plot_audit.audit(PlotAuditOptions::<PosTable> {
280                        public_key: &public_key,
281                        reward_address: &reward_address,
282                        slot_info,
283                        sectors_metadata: &sectors_metadata,
284                        kzg: &kzg,
285                        erasure_coding: &erasure_coding,
286                        sectors_being_modified,
287                        read_sector_record_chunks_mode,
288                        table_generator: &table_generator,
289                    })
290                })?
291            };
292
293            sectors_solutions.sort_by(|a, b| {
294                let a_solution_distance =
295                    a.1.best_solution_distance().unwrap_or(SolutionRange::MAX);
296                let b_solution_distance =
297                    b.1.best_solution_distance().unwrap_or(SolutionRange::MAX);
298
299                a_solution_distance.cmp(&b_solution_distance)
300            });
301
302            {
303                let time = start.elapsed();
304                if let Some(metrics) = &metrics {
305                    metrics.auditing_time.observe(time.as_secs_f64());
306                }
307                handlers
308                    .farming_notification
309                    .call_simple(&FarmingNotification::Auditing(AuditingDetails {
310                        sectors_count: sectors_metadata.len() as SectorIndex,
311                        time,
312                    }));
313            }
314
315            // Take mutex and hold until proving end to make sure nothing else major happens at the
316            // same time
317            let _proving_guard = global_mutex.lock().await;
318
319            'solutions_processing: for (sector_index, mut sector_solutions) in sectors_solutions {
320                if sector_solutions.is_empty() {
321                    continue;
322                }
323                let mut start = Instant::now();
324                while let Some(maybe_solution) = thread_pool.install(|| {
325                    let _span_guard = span.enter();
326
327                    sector_solutions.next()
328                }) {
329                    let solution = match maybe_solution {
330                        Ok(solution) => solution,
331                        Err(error) => {
332                            if let Some(metrics) = &metrics {
333                                metrics
334                                    .observe_proving_time(&start.elapsed(), ProvingResult::Failed);
335                            }
336                            error!(
337                                %slot,
338                                %sector_index,
339                                %error,
340                                "Failed to prove, scheduling sector for replotting"
341                            );
342                            problematic_sectors.push(sector_index);
343                            // Do not error completely as disk corruption or other reasons why
344                            // proving might fail
345                            start = Instant::now();
346                            continue;
347                        }
348                    };
349
350                    debug!(%slot, %sector_index, "Solution found");
351                    trace!(?solution, "Solution found");
352
353                    {
354                        let time = start.elapsed();
355                        if time >= farming_timeout {
356                            if let Some(metrics) = &metrics {
357                                metrics.observe_proving_time(&time, ProvingResult::Timeout);
358                            }
359                            handlers.farming_notification.call_simple(
360                                &FarmingNotification::Proving(ProvingDetails {
361                                    result: ProvingResult::Timeout,
362                                    time,
363                                }),
364                            );
365                            warn!(
366                                %slot,
367                                %sector_index,
368                                "Proving for solution skipped due to farming time limit",
369                            );
370
371                            break 'solutions_processing;
372                        }
373                    }
374
375                    let response = SolutionResponse {
376                        slot_number: slot,
377                        solution,
378                    };
379
380                    handlers.solution.call_simple(&response);
381
382                    if let Err(error) = node_client.submit_solution_response(response).await {
383                        let time = start.elapsed();
384                        if let Some(metrics) = &metrics {
385                            metrics.observe_proving_time(&time, ProvingResult::Rejected);
386                        }
387                        handlers
388                            .farming_notification
389                            .call_simple(&FarmingNotification::Proving(ProvingDetails {
390                                result: ProvingResult::Rejected,
391                                time,
392                            }));
393                        warn!(
394                            %slot,
395                            %sector_index,
396                            %error,
397                            "Failed to send solution to node, skipping further proving for this slot",
398                        );
399                        break 'solutions_processing;
400                    }
401
402                    let time = start.elapsed();
403                    if let Some(metrics) = &metrics {
404                        metrics.observe_proving_time(&time, ProvingResult::Success);
405                    }
406                    handlers
407                        .farming_notification
408                        .call_simple(&FarmingNotification::Proving(ProvingDetails {
409                            result: ProvingResult::Success,
410                            time,
411                        }));
412                    start = Instant::now();
413                }
414            }
415        };
416
417        if let Err(error) = result {
418            if error.is_fatal() {
419                return Err(error);
420            }
421
422            non_fatal_errors += 1;
423
424            if non_fatal_errors >= NON_FATAL_ERROR_LIMIT {
425                return Err(error);
426            }
427
428            warn!(
429                %error,
430                "Non-fatal farming error"
431            );
432
433            if let Some(metrics) = &metrics {
434                metrics.note_farming_error(&error);
435            }
436            handlers
437                .farming_notification
438                .call_simple(&FarmingNotification::NonFatalError(Arc::new(error)));
439
440            for sector_index in problematic_sectors.drain(..) {
441                // Inform others that this sector is being modified
442                sectors_being_modified.write().await.insert(sector_index);
443                // Replace metadata with a dummy one, so it will be picked up for replotting next
444                if let Some(existing_sector_metadata) = sectors_metadata
445                    .write()
446                    .await
447                    .get_mut(sector_index as usize)
448                {
449                    *existing_sector_metadata = SectorMetadataChecksummed::from(SectorMetadata {
450                        sector_index,
451                        pieces_in_sector: existing_sector_metadata.pieces_in_sector,
452                        s_bucket_sizes: Box::new([0; Record::NUM_S_BUCKETS]),
453                        history_size: HistorySize::from(SegmentIndex::ZERO),
454                    });
455                }
456                // Inform others that this sector is no longer being modified
457                sectors_being_modified.write().await.remove(&sector_index);
458            }
459        } else {
460            non_fatal_errors = 0;
461        }
462    }
463
464    Ok(())
465}