1pub mod rayon_files;
7
8use crate::farm::{
9 AuditingDetails, FarmingError, FarmingNotification, ProvingDetails, ProvingResult,
10};
11use crate::node_client::NodeClient;
12use crate::single_disk_farm::Handlers;
13use crate::single_disk_farm::metrics::SingleDiskFarmMetrics;
14use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
15use futures::StreamExt;
16use futures::channel::mpsc;
17use parking_lot::Mutex;
18use rayon::ThreadPool;
19use std::collections::HashSet;
20use std::sync::Arc;
21use std::time::Instant;
22use subspace_core_primitives::PublicKey;
23use subspace_core_primitives::pieces::Record;
24use subspace_core_primitives::pos::PosSeed;
25use subspace_core_primitives::sectors::SectorIndex;
26use subspace_core_primitives::segments::{HistorySize, SegmentIndex};
27use subspace_core_primitives::solutions::{Solution, SolutionRange};
28use subspace_erasure_coding::ErasureCoding;
29use subspace_farmer_components::ReadAtSync;
30use subspace_farmer_components::auditing::{AuditingError, audit_plot_sync};
31use subspace_farmer_components::proving::{ProvableSolutions, ProvingError};
32use subspace_farmer_components::reading::ReadSectorRecordChunksMode;
33use subspace_farmer_components::sector::{SectorMetadata, SectorMetadataChecksummed};
34use subspace_kzg::Kzg;
35use subspace_proof_of_space::{Table, TableGenerator};
36use subspace_rpc_primitives::{SlotInfo, SolutionResponse};
37use tracing::{Span, debug, error, info, trace, warn};
38
/// Number of consecutive slots that may end with a non-fatal farming error before the farming
/// loop gives up and returns the error (the counter is reset by a slot that finishes without
/// error)
const NON_FATAL_ERROR_LIMIT: usize = 10;
41
42pub(super) async fn slot_notification_forwarder<NC>(
43 node_client: &NC,
44 mut slot_info_forwarder_sender: mpsc::Sender<SlotInfo>,
45 metrics: Option<Arc<SingleDiskFarmMetrics>>,
46) -> Result<(), FarmingError>
47where
48 NC: NodeClient,
49{
50 info!("Subscribing to slot info notifications");
51
52 let mut slot_info_notifications = node_client
53 .subscribe_slot_info()
54 .await
55 .map_err(|error| FarmingError::FailedToSubscribeSlotInfo { error })?;
56
57 while let Some(slot_info) = slot_info_notifications.next().await {
58 debug!(?slot_info, "New slot");
59
60 let slot = slot_info.slot_number;
61
62 if slot_info_forwarder_sender.try_send(slot_info).is_err() {
65 if let Some(metrics) = &metrics {
66 metrics.skipped_slots.inc();
67 }
68 debug!(%slot, "Slow farming, skipping slot");
69 }
70 }
71
72 Err(FarmingError::SlotNotificationStreamEnded)
73}
74
/// Options consumed by [`PlotAudit::audit`]
#[derive(Debug)]
pub struct PlotAuditOptions<'a, 'b, PosTable>
where
    PosTable: Table,
{
    /// Public key the plot is audited with
    pub public_key: &'a PublicKey,
    /// Reward address used when turning solution candidates into solutions
    pub reward_address: &'a PublicKey,
    /// Slot info for the audit; its global challenge and voting solution range are used
    pub slot_info: SlotInfo,
    /// Metadata of the sectors to audit
    pub sectors_metadata: &'a [SectorMetadataChecksummed],
    /// KZG instance used when turning solution candidates into solutions
    pub kzg: &'a Kzg,
    /// Erasure coding instance used when turning solution candidates into solutions
    pub erasure_coding: &'a ErasureCoding,
    /// Sectors that are being modified right now; forwarded to the auditing routine
    /// (presumably so that they are not audited, confirm against `audit_plot_sync`)
    pub sectors_being_modified: &'b HashSet<SectorIndex>,
    /// Mode of reading sector record chunks, used when turning solution candidates into solutions
    pub read_sector_record_chunks_mode: ReadSectorRecordChunksMode,
    /// Proof of space table generator, locked whenever a table needs to be generated for a seed
    pub table_generator: &'a Mutex<PosTable::Generator>,
}
101
// `Clone` and `Copy` are implemented by hand rather than derived: a derive would additionally
// require `PosTable: Clone`/`PosTable: Copy`, while only `PosTable: Table` is asked for here.
impl<PosTable> Clone for PlotAuditOptions<'_, '_, PosTable>
where
    PosTable: Table,
{
    #[inline]
    fn clone(&self) -> Self {
        // The type is `Copy`, so cloning is a plain copy
        *self
    }
}

impl<PosTable> Copy for PlotAuditOptions<'_, '_, PosTable> where PosTable: Table {}
113
/// Wrapper around a plot (anything that implements [`ReadAtSync`]) that can be audited for
/// solutions with [`PlotAudit::audit`]
#[derive(Debug)]
pub struct PlotAudit<Plot>(Plot)
where
    Plot: ReadAtSync;
119
120impl<'a, Plot> PlotAudit<Plot>
121where
122 Plot: ReadAtSync + 'a,
123{
124 pub fn new(plot: Plot) -> Self {
126 Self(plot)
127 }
128
129 #[allow(clippy::type_complexity)]
131 pub fn audit<'b, PosTable>(
132 &'a self,
133 options: PlotAuditOptions<'a, 'b, PosTable>,
134 ) -> Result<
135 Vec<(
136 SectorIndex,
137 impl ProvableSolutions<Item = Result<Solution<PublicKey>, ProvingError>>
138 + use<'a, PosTable, Plot>,
139 )>,
140 AuditingError,
141 >
142 where
143 PosTable: Table,
144 {
145 let PlotAuditOptions {
146 public_key,
147 reward_address,
148 slot_info,
149 sectors_metadata,
150 kzg,
151 erasure_coding,
152 sectors_being_modified,
153 read_sector_record_chunks_mode: mode,
154 table_generator,
155 } = options;
156
157 let audit_results = audit_plot_sync(
158 public_key,
159 &slot_info.global_challenge,
160 slot_info.voting_solution_range,
161 &self.0,
162 sectors_metadata,
163 sectors_being_modified,
164 )?;
165
166 Ok(audit_results
167 .into_iter()
168 .filter_map(|audit_results| {
169 let sector_index = audit_results.sector_index;
170
171 let sector_solutions = audit_results.solution_candidates.into_solutions(
172 reward_address,
173 kzg,
174 erasure_coding,
175 mode,
176 |seed: &PosSeed| table_generator.lock().generate_parallel(seed),
177 );
178
179 let sector_solutions = match sector_solutions {
180 Ok(solutions) => solutions,
181 Err(error) => {
182 warn!(
183 %error,
184 %sector_index,
185 "Failed to turn solution candidates into solutions",
186 );
187
188 return None;
189 }
190 };
191
192 if sector_solutions.len() == 0 {
193 return None;
194 }
195
196 Some((sector_index, sector_solutions))
197 })
198 .collect())
199 }
200}
201
/// Everything the `farming` loop needs to run
pub(super) struct FarmingOptions<NC, PlotAudit> {
    /// Public key the plot is audited with
    pub(super) public_key: PublicKey,
    /// Reward address put into produced solutions
    pub(super) reward_address: PublicKey,
    /// Client of the node: provides farmer app info and receives solution responses
    pub(super) node_client: NC,
    /// Plot wrapper that is audited on every slot
    pub(super) plot_audit: PlotAudit,
    /// Metadata of the sectors of the farm, shared behind an async read-write lock
    pub(super) sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    /// KZG instance handed to auditing/proving
    pub(super) kzg: Kzg,
    /// Erasure coding instance handed to auditing/proving
    pub(super) erasure_coding: ErasureCoding,
    /// Handlers that are called with farming notifications and with found solutions
    pub(super) handlers: Arc<Handlers>,
    /// Set of sectors that are being modified right now, shared behind an async read-write lock
    pub(super) sectors_being_modified: Arc<AsyncRwLock<HashSet<SectorIndex>>>,
    /// Receiving side of the channel with slots to farm
    pub(super) slot_info_notifications: mpsc::Receiver<SlotInfo>,
    /// Thread pool in which auditing and proving are executed
    pub(super) thread_pool: ThreadPool,
    /// Mode of reading sector record chunks during proving
    pub(super) read_sector_record_chunks_mode: ReadSectorRecordChunksMode,
    /// Mutex that is acquired briefly before each slot is processed and held for the duration of
    /// proving
    pub(super) global_mutex: Arc<AsyncMutex<()>>,
    /// Optional metrics to update while farming
    pub(super) metrics: Option<Arc<SingleDiskFarmMetrics>>,
}
218
219pub(super) async fn farming<'a, PosTable, NC, Plot>(
224 farming_options: FarmingOptions<NC, PlotAudit<Plot>>,
225) -> Result<(), FarmingError>
226where
227 PosTable: Table,
228 NC: NodeClient,
229 Plot: ReadAtSync + 'a,
230{
231 let FarmingOptions {
232 public_key,
233 reward_address,
234 node_client,
235 plot_audit,
236 sectors_metadata,
237 kzg,
238 erasure_coding,
239 handlers,
240 sectors_being_modified,
241 mut slot_info_notifications,
242 thread_pool,
243 read_sector_record_chunks_mode,
244 global_mutex,
245 metrics,
246 } = farming_options;
247
248 let farmer_app_info = node_client
249 .farmer_app_info()
250 .await
251 .map_err(|error| FarmingError::FailedToGetFarmerInfo { error })?;
252
253 let farming_timeout = farmer_app_info.farming_timeout;
255
256 let table_generator = Arc::new(Mutex::new(PosTable::generator()));
257 let span = Span::current();
258
259 let mut non_fatal_errors = 0;
260
261 while let Some(slot_info) = slot_info_notifications.next().await {
262 let slot = slot_info.slot_number;
263
264 global_mutex.lock().await;
266
267 let mut problematic_sectors = Vec::new();
268 let result: Result<(), FarmingError> = try {
269 let start = Instant::now();
270 let sectors_metadata = sectors_metadata.read().await;
271
272 debug!(%slot, sector_count = %sectors_metadata.len(), "Reading sectors");
273
274 let mut sectors_solutions = {
275 let sectors_being_modified = &*sectors_being_modified.read().await;
276
277 thread_pool.install(|| {
278 let _span_guard = span.enter();
279
280 plot_audit.audit(PlotAuditOptions::<PosTable> {
281 public_key: &public_key,
282 reward_address: &reward_address,
283 slot_info,
284 sectors_metadata: §ors_metadata,
285 kzg: &kzg,
286 erasure_coding: &erasure_coding,
287 sectors_being_modified,
288 read_sector_record_chunks_mode,
289 table_generator: &table_generator,
290 })
291 })?
292 };
293
294 sectors_solutions.sort_by(|a, b| {
295 let a_solution_distance =
296 a.1.best_solution_distance().unwrap_or(SolutionRange::MAX);
297 let b_solution_distance =
298 b.1.best_solution_distance().unwrap_or(SolutionRange::MAX);
299
300 a_solution_distance.cmp(&b_solution_distance)
301 });
302
303 {
304 let time = start.elapsed();
305 if let Some(metrics) = &metrics {
306 metrics.auditing_time.observe(time.as_secs_f64());
307 }
308 handlers
309 .farming_notification
310 .call_simple(&FarmingNotification::Auditing(AuditingDetails {
311 sectors_count: sectors_metadata.len() as SectorIndex,
312 time,
313 }));
314 }
315
316 let _proving_guard = global_mutex.lock().await;
319
320 'solutions_processing: for (sector_index, mut sector_solutions) in sectors_solutions {
321 if sector_solutions.is_empty() {
322 continue;
323 }
324 let mut start = Instant::now();
325 while let Some(maybe_solution) = thread_pool.install(|| {
326 let _span_guard = span.enter();
327
328 sector_solutions.next()
329 }) {
330 let solution = match maybe_solution {
331 Ok(solution) => solution,
332 Err(error) => {
333 if let Some(metrics) = &metrics {
334 metrics
335 .observe_proving_time(&start.elapsed(), ProvingResult::Failed);
336 }
337 error!(
338 %slot,
339 %sector_index,
340 %error,
341 "Failed to prove, scheduling sector for replotting"
342 );
343 problematic_sectors.push(sector_index);
344 start = Instant::now();
347 continue;
348 }
349 };
350
351 debug!(%slot, %sector_index, "Solution found");
352 trace!(?solution, "Solution found");
353
354 {
355 let time = start.elapsed();
356 if time >= farming_timeout {
357 if let Some(metrics) = &metrics {
358 metrics.observe_proving_time(&time, ProvingResult::Timeout);
359 }
360 handlers.farming_notification.call_simple(
361 &FarmingNotification::Proving(ProvingDetails {
362 result: ProvingResult::Timeout,
363 time,
364 }),
365 );
366 warn!(
367 %slot,
368 %sector_index,
369 "Proving for solution skipped due to farming time limit",
370 );
371
372 break 'solutions_processing;
373 }
374 }
375
376 let response = SolutionResponse {
377 slot_number: slot,
378 solution,
379 };
380
381 handlers.solution.call_simple(&response);
382
383 if let Err(error) = node_client.submit_solution_response(response).await {
384 let time = start.elapsed();
385 if let Some(metrics) = &metrics {
386 metrics.observe_proving_time(&time, ProvingResult::Rejected);
387 }
388 handlers
389 .farming_notification
390 .call_simple(&FarmingNotification::Proving(ProvingDetails {
391 result: ProvingResult::Rejected,
392 time,
393 }));
394 warn!(
395 %slot,
396 %sector_index,
397 %error,
398 "Failed to send solution to node, skipping further proving for this slot",
399 );
400 break 'solutions_processing;
401 }
402
403 let time = start.elapsed();
404 if let Some(metrics) = &metrics {
405 metrics.observe_proving_time(&time, ProvingResult::Success);
406 }
407 handlers
408 .farming_notification
409 .call_simple(&FarmingNotification::Proving(ProvingDetails {
410 result: ProvingResult::Success,
411 time,
412 }));
413 start = Instant::now();
414 }
415 }
416 };
417
418 if let Err(error) = result {
419 if error.is_fatal() {
420 return Err(error);
421 }
422
423 non_fatal_errors += 1;
424
425 if non_fatal_errors >= NON_FATAL_ERROR_LIMIT {
426 return Err(error);
427 }
428
429 warn!(
430 %error,
431 "Non-fatal farming error"
432 );
433
434 if let Some(metrics) = &metrics {
435 metrics.note_farming_error(&error);
436 }
437 handlers
438 .farming_notification
439 .call_simple(&FarmingNotification::NonFatalError(Arc::new(error)));
440
441 for sector_index in problematic_sectors.drain(..) {
442 sectors_being_modified.write().await.insert(sector_index);
444 if let Some(existing_sector_metadata) = sectors_metadata
446 .write()
447 .await
448 .get_mut(sector_index as usize)
449 {
450 *existing_sector_metadata = SectorMetadataChecksummed::from(SectorMetadata {
451 sector_index,
452 pieces_in_sector: existing_sector_metadata.pieces_in_sector,
453 s_bucket_sizes: Box::new([0; Record::NUM_S_BUCKETS]),
454 history_size: HistorySize::from(SegmentIndex::ZERO),
455 });
456 }
457 sectors_being_modified.write().await.remove(§or_index);
459 }
460 } else {
461 non_fatal_errors = 0;
462 }
463 }
464
465 Ok(())
466}