1pub mod rayon_files;
7
8use crate::farm::{
9 AuditingDetails, FarmingError, FarmingNotification, ProvingDetails, ProvingResult,
10};
11use crate::node_client::NodeClient;
12use crate::single_disk_farm::Handlers;
13use crate::single_disk_farm::metrics::SingleDiskFarmMetrics;
14use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
15use futures::StreamExt;
16use futures::channel::mpsc;
17use parking_lot::Mutex;
18use rayon::ThreadPool;
19use std::collections::HashSet;
20use std::sync::Arc;
21use std::time::Instant;
22use subspace_core_primitives::PublicKey;
23use subspace_core_primitives::pieces::Record;
24use subspace_core_primitives::pos::PosSeed;
25use subspace_core_primitives::sectors::SectorIndex;
26use subspace_core_primitives::segments::{HistorySize, SegmentIndex};
27use subspace_core_primitives::solutions::{Solution, SolutionRange};
28use subspace_erasure_coding::ErasureCoding;
29use subspace_farmer_components::ReadAtSync;
30use subspace_farmer_components::auditing::{AuditingError, audit_plot_sync};
31use subspace_farmer_components::proving::{ProvableSolutions, ProvingError};
32use subspace_farmer_components::reading::ReadSectorRecordChunksMode;
33use subspace_farmer_components::sector::{SectorMetadata, SectorMetadataChecksummed};
34use subspace_kzg::Kzg;
35use subspace_proof_of_space::{Table, TableGenerator};
36use subspace_rpc_primitives::{SlotInfo, SolutionResponse};
37use tracing::{Span, debug, error, info, trace, warn};
38
/// Number of slots in a row that may fail with a non-fatal farming error; once this many
/// consecutive failures have been counted, farming stops and returns the last error. The counter
/// is reset after every slot that is processed without an error.
const NON_FATAL_ERROR_LIMIT: usize = 10;
41
42pub(super) async fn slot_notification_forwarder<NC>(
43 node_client: &NC,
44 mut slot_info_forwarder_sender: mpsc::Sender<SlotInfo>,
45 metrics: Option<Arc<SingleDiskFarmMetrics>>,
46) -> Result<(), FarmingError>
47where
48 NC: NodeClient,
49{
50 info!("Subscribing to slot info notifications");
51
52 let mut slot_info_notifications = node_client
53 .subscribe_slot_info()
54 .await
55 .map_err(|error| FarmingError::FailedToSubscribeSlotInfo { error })?;
56
57 while let Some(slot_info) = slot_info_notifications.next().await {
58 debug!(?slot_info, "New slot");
59
60 let slot = slot_info.slot_number;
61
62 if slot_info_forwarder_sender.try_send(slot_info).is_err() {
65 if let Some(metrics) = &metrics {
66 metrics.skipped_slots.inc();
67 }
68 debug!(%slot, "Slow farming, skipping slot");
69 }
70 }
71
72 Err(FarmingError::SlotNotificationStreamEnded)
73}
74
/// Options for [`PlotAudit::audit`]
#[derive(Debug)]
pub struct PlotAuditOptions<'a, 'b, PosTable>
where
    PosTable: Table,
{
    /// Public key that is passed to the plot audit
    pub public_key: &'a PublicKey,
    /// Reward address that is passed on when solution candidates are turned into solutions
    pub reward_address: &'a PublicKey,
    /// Slot info; its global challenge and voting solution range are used for the audit
    pub slot_info: SlotInfo,
    /// Metadata of the sectors to audit
    pub sectors_metadata: &'a [SectorMetadataChecksummed],
    /// KZG instance used when solution candidates are turned into solutions
    pub kzg: &'a Kzg,
    /// Erasure coding instance used when solution candidates are turned into solutions
    pub erasure_coding: &'a ErasureCoding,
    /// Indices of sectors that are currently being modified; forwarded to the audit
    /// (NOTE(review): presumably such sectors are skipped by the audit - confirm against
    /// `audit_plot_sync`)
    pub sectors_being_modified: &'b HashSet<SectorIndex>,
    /// Mode for reading sector record chunks, used when solution candidates are turned into
    /// solutions
    pub read_sector_record_chunks_mode: ReadSectorRecordChunksMode,
    /// Proof of space table generator; it is locked whenever a table has to be generated for a
    /// seed
    pub table_generator: &'a Mutex<PosTable::Generator>,
}
101
// Implemented by hand because `#[derive(Clone)]` would add a `PosTable: Clone` bound, while the
// struct only stores a reference to the generator of `PosTable`.
impl<PosTable> Clone for PlotAuditOptions<'_, '_, PosTable>
where
    PosTable: Table,
{
    #[inline]
    fn clone(&self) -> Self {
        // The type is `Copy`, so cloning is a plain bitwise copy
        *self
    }
}
111
// Implemented by hand for the same reason as `Clone`: a derive would require `PosTable: Copy`.
impl<PosTable> Copy for PlotAuditOptions<'_, '_, PosTable> where PosTable: Table {}
113
/// Wrapper around a plot (anything that implements [`ReadAtSync`]) that can be audited with
/// [`PlotAudit::audit`]
#[derive(Debug)]
pub struct PlotAudit<Plot>(Plot)
where
    Plot: ReadAtSync;
119
120impl<'a, Plot> PlotAudit<Plot>
121where
122 Plot: ReadAtSync + 'a,
123{
124 pub fn new(plot: Plot) -> Self {
126 Self(plot)
127 }
128
129 #[allow(clippy::type_complexity)]
131 pub fn audit<'b, PosTable>(
132 &'a self,
133 options: PlotAuditOptions<'a, 'b, PosTable>,
134 ) -> Result<
135 Vec<(
136 SectorIndex,
137 impl ProvableSolutions<Item = Result<Solution<PublicKey>, ProvingError>>
138 + use<'a, PosTable, Plot>,
139 )>,
140 AuditingError,
141 >
142 where
143 PosTable: Table,
144 {
145 let PlotAuditOptions {
146 public_key,
147 reward_address,
148 slot_info,
149 sectors_metadata,
150 kzg,
151 erasure_coding,
152 sectors_being_modified,
153 read_sector_record_chunks_mode: mode,
154 table_generator,
155 } = options;
156
157 let audit_results = audit_plot_sync(
158 public_key,
159 &slot_info.global_challenge,
160 slot_info.voting_solution_range,
161 &self.0,
162 sectors_metadata,
163 sectors_being_modified,
164 )?;
165
166 Ok(audit_results
167 .into_iter()
168 .filter_map(|audit_results| {
169 let sector_index = audit_results.sector_index;
170
171 let sector_solutions = audit_results.solution_candidates.into_solutions(
172 reward_address,
173 kzg,
174 erasure_coding,
175 mode,
176 |seed: &PosSeed| table_generator.lock().generate_parallel(seed),
177 );
178
179 let sector_solutions = match sector_solutions {
180 Ok(solutions) => solutions,
181 Err(error) => {
182 warn!(
183 %error,
184 %sector_index,
185 "Failed to turn solution candidates into solutions",
186 );
187
188 return None;
189 }
190 };
191
192 if sector_solutions.len() == 0 {
193 return None;
194 }
195
196 Some((sector_index, sector_solutions))
197 })
198 .collect())
199 }
200}
201
/// Options for [`farming`]
pub(super) struct FarmingOptions<NC, PlotAudit> {
    /// Public key passed to every plot audit
    pub(super) public_key: PublicKey,
    /// Reward address passed to every plot audit
    pub(super) reward_address: PublicKey,
    /// Node client used to query farmer app info and to submit solution responses
    pub(super) node_client: NC,
    /// Plot auditing implementation that is invoked once per slot
    pub(super) plot_audit: PlotAudit,
    /// Sectors metadata; a read lock is held while a slot is being processed
    pub(super) sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    /// KZG instance passed to every plot audit
    pub(super) kzg: Kzg,
    /// Erasure coding instance passed to every plot audit
    pub(super) erasure_coding: ErasureCoding,
    /// Handlers that are called with farming notifications and with found solutions
    pub(super) handlers: Arc<Handlers>,
    /// Indices of sectors that are currently being modified; passed to every plot audit
    pub(super) sectors_being_modified: Arc<AsyncRwLock<HashSet<SectorIndex>>>,
    /// Receiver of slot infos; every received slot triggers one round of auditing and proving
    pub(super) slot_info_notifications: mpsc::Receiver<SlotInfo>,
    /// Thread pool in which auditing and proving are executed
    pub(super) thread_pool: ThreadPool,
    /// Mode for reading sector record chunks, passed to every plot audit
    pub(super) read_sector_record_chunks_mode: ReadSectorRecordChunksMode,
    /// Mutex that is acquired briefly before a slot is processed and held for the whole duration
    /// of proving
    pub(super) global_mutex: Arc<AsyncMutex<()>>,
    /// Optional metrics to update while farming
    pub(super) metrics: Option<Arc<SingleDiskFarmMetrics>>,
}
218
219pub(super) async fn farming<'a, PosTable, NC, Plot>(
224 farming_options: FarmingOptions<NC, PlotAudit<Plot>>,
225) -> Result<(), FarmingError>
226where
227 PosTable: Table,
228 NC: NodeClient,
229 Plot: ReadAtSync + 'a,
230{
231 let FarmingOptions {
232 public_key,
233 reward_address,
234 node_client,
235 plot_audit,
236 sectors_metadata,
237 kzg,
238 erasure_coding,
239 handlers,
240 sectors_being_modified,
241 mut slot_info_notifications,
242 thread_pool,
243 read_sector_record_chunks_mode,
244 global_mutex,
245 metrics,
246 } = farming_options;
247
248 let farmer_app_info = node_client
249 .farmer_app_info()
250 .await
251 .map_err(|error| FarmingError::FailedToGetFarmerInfo { error })?;
252
253 let farming_timeout = farmer_app_info.farming_timeout;
255
256 let table_generator = Arc::new(Mutex::new(PosTable::generator()));
257 let span = Span::current();
258
259 let mut non_fatal_errors = 0;
260
261 while let Some(slot_info) = slot_info_notifications.next().await {
262 let slot = slot_info.slot_number;
263
264 global_mutex.lock().await;
266
267 let mut problematic_sectors = Vec::new();
268 let result = try {
269 let start = Instant::now();
270 let sectors_metadata = sectors_metadata.read().await;
271
272 debug!(%slot, sector_count = %sectors_metadata.len(), "Reading sectors");
273
274 let mut sectors_solutions = {
275 let sectors_being_modified = &*sectors_being_modified.read().await;
276
277 thread_pool
278 .install(|| {
279 let _span_guard = span.enter();
280
281 plot_audit.audit(PlotAuditOptions::<PosTable> {
282 public_key: &public_key,
283 reward_address: &reward_address,
284 slot_info,
285 sectors_metadata: §ors_metadata,
286 kzg: &kzg,
287 erasure_coding: &erasure_coding,
288 sectors_being_modified,
289 read_sector_record_chunks_mode,
290 table_generator: &table_generator,
291 })
292 })
293 .map_err(FarmingError::LowLevelAuditing)?
294 };
295
296 sectors_solutions.sort_by(|a, b| {
297 let a_solution_distance =
298 a.1.best_solution_distance().unwrap_or(SolutionRange::MAX);
299 let b_solution_distance =
300 b.1.best_solution_distance().unwrap_or(SolutionRange::MAX);
301
302 a_solution_distance.cmp(&b_solution_distance)
303 });
304
305 {
306 let time = start.elapsed();
307 if let Some(metrics) = &metrics {
308 metrics.auditing_time.observe(time.as_secs_f64());
309 }
310 handlers
311 .farming_notification
312 .call_simple(&FarmingNotification::Auditing(AuditingDetails {
313 sectors_count: sectors_metadata.len() as SectorIndex,
314 time,
315 }));
316 }
317
318 let _proving_guard = global_mutex.lock().await;
321
322 'solutions_processing: for (sector_index, mut sector_solutions) in sectors_solutions {
323 if sector_solutions.is_empty() {
324 continue;
325 }
326 let mut start = Instant::now();
327 while let Some(maybe_solution) = thread_pool.install(|| {
328 let _span_guard = span.enter();
329
330 sector_solutions.next()
331 }) {
332 let solution = match maybe_solution {
333 Ok(solution) => solution,
334 Err(error) => {
335 if let Some(metrics) = &metrics {
336 metrics
337 .observe_proving_time(&start.elapsed(), ProvingResult::Failed);
338 }
339 error!(
340 %slot,
341 %sector_index,
342 %error,
343 "Failed to prove, scheduling sector for replotting"
344 );
345 problematic_sectors.push(sector_index);
346 start = Instant::now();
349 continue;
350 }
351 };
352
353 debug!(%slot, %sector_index, "Solution found");
354 trace!(?solution, "Solution found");
355
356 {
357 let time = start.elapsed();
358 if time >= farming_timeout {
359 if let Some(metrics) = &metrics {
360 metrics.observe_proving_time(&time, ProvingResult::Timeout);
361 }
362 handlers.farming_notification.call_simple(
363 &FarmingNotification::Proving(ProvingDetails {
364 result: ProvingResult::Timeout,
365 time,
366 }),
367 );
368 warn!(
369 %slot,
370 %sector_index,
371 "Proving for solution skipped due to farming time limit",
372 );
373
374 break 'solutions_processing;
375 }
376 }
377
378 let response = SolutionResponse {
379 slot_number: slot,
380 solution,
381 };
382
383 handlers.solution.call_simple(&response);
384
385 if let Err(error) = node_client.submit_solution_response(response).await {
386 let time = start.elapsed();
387 if let Some(metrics) = &metrics {
388 metrics.observe_proving_time(&time, ProvingResult::Rejected);
389 }
390 handlers
391 .farming_notification
392 .call_simple(&FarmingNotification::Proving(ProvingDetails {
393 result: ProvingResult::Rejected,
394 time,
395 }));
396 warn!(
397 %slot,
398 %sector_index,
399 %error,
400 "Failed to send solution to node, skipping further proving for this slot",
401 );
402 break 'solutions_processing;
403 }
404
405 let time = start.elapsed();
406 if let Some(metrics) = &metrics {
407 metrics.observe_proving_time(&time, ProvingResult::Success);
408 }
409 handlers
410 .farming_notification
411 .call_simple(&FarmingNotification::Proving(ProvingDetails {
412 result: ProvingResult::Success,
413 time,
414 }));
415 start = Instant::now();
416 }
417 }
418 };
419
420 if let Err(error) = result {
421 if error.is_fatal() {
422 return Err(error);
423 }
424
425 non_fatal_errors += 1;
426
427 if non_fatal_errors >= NON_FATAL_ERROR_LIMIT {
428 return Err(error);
429 }
430
431 warn!(
432 %error,
433 "Non-fatal farming error"
434 );
435
436 if let Some(metrics) = &metrics {
437 metrics.note_farming_error(&error);
438 }
439 handlers
440 .farming_notification
441 .call_simple(&FarmingNotification::NonFatalError(Arc::new(error)));
442
443 for sector_index in problematic_sectors.drain(..) {
444 sectors_being_modified.write().await.insert(sector_index);
446 if let Some(existing_sector_metadata) = sectors_metadata
448 .write()
449 .await
450 .get_mut(sector_index as usize)
451 {
452 *existing_sector_metadata = SectorMetadataChecksummed::from(SectorMetadata {
453 sector_index,
454 pieces_in_sector: existing_sector_metadata.pieces_in_sector,
455 s_bucket_sizes: Box::new([0; Record::NUM_S_BUCKETS]),
456 history_size: HistorySize::from(SegmentIndex::ZERO),
457 });
458 }
459 sectors_being_modified.write().await.remove(§or_index);
461 }
462 } else {
463 non_fatal_errors = 0;
464 }
465 }
466
467 Ok(())
468}