1pub mod rayon_files;
7
8use crate::farm::{
9 AuditingDetails, FarmingError, FarmingNotification, ProvingDetails, ProvingResult,
10};
11use crate::node_client::NodeClient;
12use crate::single_disk_farm::metrics::SingleDiskFarmMetrics;
13use crate::single_disk_farm::Handlers;
14use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
15use futures::channel::mpsc;
16use futures::StreamExt;
17use parking_lot::Mutex;
18use rayon::ThreadPool;
19use std::collections::HashSet;
20use std::sync::Arc;
21use std::time::Instant;
22use subspace_core_primitives::pieces::Record;
23use subspace_core_primitives::pos::PosSeed;
24use subspace_core_primitives::sectors::SectorIndex;
25use subspace_core_primitives::segments::{HistorySize, SegmentIndex};
26use subspace_core_primitives::solutions::{Solution, SolutionRange};
27use subspace_core_primitives::PublicKey;
28use subspace_erasure_coding::ErasureCoding;
29use subspace_farmer_components::auditing::{audit_plot_sync, AuditingError};
30use subspace_farmer_components::proving::{ProvableSolutions, ProvingError};
31use subspace_farmer_components::reading::ReadSectorRecordChunksMode;
32use subspace_farmer_components::sector::{SectorMetadata, SectorMetadataChecksummed};
33use subspace_farmer_components::ReadAtSync;
34use subspace_kzg::Kzg;
35use subspace_proof_of_space::{Table, TableGenerator};
36use subspace_rpc_primitives::{SlotInfo, SolutionResponse};
37use tracing::{debug, error, info, trace, warn, Span};
38
/// Number of slots in a row that may end with a non-fatal farming error before `farming` gives up
/// and returns the last error. The counter is reset by any slot that completes without an error.
const NON_FATAL_ERROR_LIMIT: usize = 10;
41
42pub(super) async fn slot_notification_forwarder<NC>(
43 node_client: &NC,
44 mut slot_info_forwarder_sender: mpsc::Sender<SlotInfo>,
45 metrics: Option<Arc<SingleDiskFarmMetrics>>,
46) -> Result<(), FarmingError>
47where
48 NC: NodeClient,
49{
50 info!("Subscribing to slot info notifications");
51
52 let mut slot_info_notifications = node_client
53 .subscribe_slot_info()
54 .await
55 .map_err(|error| FarmingError::FailedToSubscribeSlotInfo { error })?;
56
57 while let Some(slot_info) = slot_info_notifications.next().await {
58 debug!(?slot_info, "New slot");
59
60 let slot = slot_info.slot_number;
61
62 if slot_info_forwarder_sender.try_send(slot_info).is_err() {
65 if let Some(metrics) = &metrics {
66 metrics.skipped_slots.inc();
67 }
68 debug!(%slot, "Slow farming, skipping slot");
69 }
70 }
71
72 Err(FarmingError::SlotNotificationStreamEnded)
73}
74
/// Options for [`PlotAudit::audit`].
#[derive(Debug)]
pub struct PlotAuditOptions<'a, 'b, PosTable>
where
    PosTable: Table,
{
    /// Public key of the farm, passed to `audit_plot_sync`
    pub public_key: &'a PublicKey,
    /// Reward address, passed to `into_solutions` when building solutions
    pub reward_address: &'a PublicKey,
    /// Slot to audit; its global challenge and voting solution range drive the audit
    pub slot_info: SlotInfo,
    /// Metadata of the sectors in the plot
    pub sectors_metadata: &'a [SectorMetadataChecksummed],
    /// KZG instance used when turning solution candidates into solutions
    pub kzg: &'a Kzg,
    /// Erasure coding instance used when turning solution candidates into solutions
    pub erasure_coding: &'a ErasureCoding,
    /// Sectors that are currently being modified, passed to `audit_plot_sync` (presumably so they
    /// are excluded from the audit — confirm against `audit_plot_sync`)
    pub sectors_being_modified: &'b HashSet<SectorIndex>,
    /// Mode used to read sector record chunks while proving
    pub read_sector_record_chunks_mode: ReadSectorRecordChunksMode,
    /// Proof-of-space table generator, locked every time a table has to be generated for a seed
    pub table_generator: &'a Mutex<PosTable::Generator>,
}
101
// `Clone`/`Copy` are implemented by hand rather than derived: `#[derive(Clone, Copy)]` would add
// `PosTable: Clone`/`PosTable: Copy` bounds, even though the fields themselves (shared references
// and small values) are copyable regardless of `PosTable`.
impl<PosTable> Clone for PlotAuditOptions<'_, '_, PosTable>
where
    PosTable: Table,
{
    #[inline]
    fn clone(&self) -> Self {
        *self
    }
}

impl<PosTable> Copy for PlotAuditOptions<'_, '_, PosTable> where PosTable: Table {}
113
/// Plot auditing implementation: a thin wrapper around a plot that can be read synchronously
/// (`ReadAtSync`).
#[derive(Debug)]
pub struct PlotAudit<Plot>(Plot)
where
    Plot: ReadAtSync;
119
120impl<'a, Plot> PlotAudit<Plot>
121where
122 Plot: ReadAtSync + 'a,
123{
124 pub fn new(plot: Plot) -> Self {
126 Self(plot)
127 }
128
129 #[allow(clippy::type_complexity)]
131 pub fn audit<'b, PosTable>(
132 &'a self,
133 options: PlotAuditOptions<'a, 'b, PosTable>,
134 ) -> Result<
135 Vec<(
136 SectorIndex,
137 impl ProvableSolutions<Item = Result<Solution<PublicKey>, ProvingError>> + 'a,
138 )>,
139 AuditingError,
140 >
141 where
142 PosTable: Table,
143 {
144 let PlotAuditOptions {
145 public_key,
146 reward_address,
147 slot_info,
148 sectors_metadata,
149 kzg,
150 erasure_coding,
151 sectors_being_modified,
152 read_sector_record_chunks_mode: mode,
153 table_generator,
154 } = options;
155
156 let audit_results = audit_plot_sync(
157 public_key,
158 &slot_info.global_challenge,
159 slot_info.voting_solution_range,
160 &self.0,
161 sectors_metadata,
162 sectors_being_modified,
163 )?;
164
165 Ok(audit_results
166 .into_iter()
167 .filter_map(|audit_results| {
168 let sector_index = audit_results.sector_index;
169
170 let sector_solutions = audit_results.solution_candidates.into_solutions(
171 reward_address,
172 kzg,
173 erasure_coding,
174 mode,
175 |seed: &PosSeed| table_generator.lock().generate_parallel(seed),
176 );
177
178 let sector_solutions = match sector_solutions {
179 Ok(solutions) => solutions,
180 Err(error) => {
181 warn!(
182 %error,
183 %sector_index,
184 "Failed to turn solution candidates into solutions",
185 );
186
187 return None;
188 }
189 };
190
191 if sector_solutions.len() == 0 {
192 return None;
193 }
194
195 Some((sector_index, sector_solutions))
196 })
197 .collect())
198 }
199}
200
/// Everything `farming` needs to run its loop.
pub(super) struct FarmingOptions<NC, PlotAudit> {
    /// Public key of the farm, used for auditing
    pub(super) public_key: PublicKey,
    /// Reward address used when building solutions
    pub(super) reward_address: PublicKey,
    /// Node client, queried for farmer app info (farming timeout) and used to submit solutions
    pub(super) node_client: NC,
    /// Plot auditing implementation
    pub(super) plot_audit: PlotAudit,
    /// Shared sector metadata: read while auditing, written when a sector that failed proving is
    /// reset
    pub(super) sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    /// KZG instance used for proving
    pub(super) kzg: Kzg,
    /// Erasure coding instance used for proving
    pub(super) erasure_coding: ErasureCoding,
    /// Handlers notified about farming progress and found solutions
    pub(super) handlers: Arc<Handlers>,
    /// Sectors currently being modified, passed to the audit and updated while resetting sectors
    pub(super) sectors_being_modified: Arc<AsyncRwLock<HashSet<SectorIndex>>>,
    /// Slots to farm; the farming loop ends when this channel is closed
    pub(super) slot_info_notifications: mpsc::Receiver<SlotInfo>,
    /// Rayon thread pool on which auditing and proving run
    pub(super) thread_pool: ThreadPool,
    /// Mode used to read sector record chunks while proving
    pub(super) read_sector_record_chunks_mode: ReadSectorRecordChunksMode,
    /// Shared mutex, briefly acquired before every slot and held during proving
    pub(super) global_mutex: Arc<AsyncMutex<()>>,
    /// Optional farm metrics
    pub(super) metrics: Option<Arc<SingleDiskFarmMetrics>>,
}
217
218pub(super) async fn farming<'a, PosTable, NC, Plot>(
223 farming_options: FarmingOptions<NC, PlotAudit<Plot>>,
224) -> Result<(), FarmingError>
225where
226 PosTable: Table,
227 NC: NodeClient,
228 Plot: ReadAtSync + 'a,
229{
230 let FarmingOptions {
231 public_key,
232 reward_address,
233 node_client,
234 plot_audit,
235 sectors_metadata,
236 kzg,
237 erasure_coding,
238 handlers,
239 sectors_being_modified,
240 mut slot_info_notifications,
241 thread_pool,
242 read_sector_record_chunks_mode,
243 global_mutex,
244 metrics,
245 } = farming_options;
246
247 let farmer_app_info = node_client
248 .farmer_app_info()
249 .await
250 .map_err(|error| FarmingError::FailedToGetFarmerInfo { error })?;
251
252 let farming_timeout = farmer_app_info.farming_timeout;
254
255 let table_generator = Arc::new(Mutex::new(PosTable::generator()));
256 let span = Span::current();
257
258 let mut non_fatal_errors = 0;
259
260 while let Some(slot_info) = slot_info_notifications.next().await {
261 let slot = slot_info.slot_number;
262
263 global_mutex.lock().await;
265
266 let mut problematic_sectors = Vec::new();
267 let result: Result<(), FarmingError> = try {
268 let start = Instant::now();
269 let sectors_metadata = sectors_metadata.read().await;
270
271 debug!(%slot, sector_count = %sectors_metadata.len(), "Reading sectors");
272
273 let mut sectors_solutions = {
274 let sectors_being_modified = &*sectors_being_modified.read().await;
275
276 thread_pool.install(|| {
277 let _span_guard = span.enter();
278
279 plot_audit.audit(PlotAuditOptions::<PosTable> {
280 public_key: &public_key,
281 reward_address: &reward_address,
282 slot_info,
283 sectors_metadata: §ors_metadata,
284 kzg: &kzg,
285 erasure_coding: &erasure_coding,
286 sectors_being_modified,
287 read_sector_record_chunks_mode,
288 table_generator: &table_generator,
289 })
290 })?
291 };
292
293 sectors_solutions.sort_by(|a, b| {
294 let a_solution_distance =
295 a.1.best_solution_distance().unwrap_or(SolutionRange::MAX);
296 let b_solution_distance =
297 b.1.best_solution_distance().unwrap_or(SolutionRange::MAX);
298
299 a_solution_distance.cmp(&b_solution_distance)
300 });
301
302 {
303 let time = start.elapsed();
304 if let Some(metrics) = &metrics {
305 metrics.auditing_time.observe(time.as_secs_f64());
306 }
307 handlers
308 .farming_notification
309 .call_simple(&FarmingNotification::Auditing(AuditingDetails {
310 sectors_count: sectors_metadata.len() as SectorIndex,
311 time,
312 }));
313 }
314
315 let _proving_guard = global_mutex.lock().await;
318
319 'solutions_processing: for (sector_index, mut sector_solutions) in sectors_solutions {
320 if sector_solutions.is_empty() {
321 continue;
322 }
323 let mut start = Instant::now();
324 while let Some(maybe_solution) = thread_pool.install(|| {
325 let _span_guard = span.enter();
326
327 sector_solutions.next()
328 }) {
329 let solution = match maybe_solution {
330 Ok(solution) => solution,
331 Err(error) => {
332 if let Some(metrics) = &metrics {
333 metrics
334 .observe_proving_time(&start.elapsed(), ProvingResult::Failed);
335 }
336 error!(
337 %slot,
338 %sector_index,
339 %error,
340 "Failed to prove, scheduling sector for replotting"
341 );
342 problematic_sectors.push(sector_index);
343 start = Instant::now();
346 continue;
347 }
348 };
349
350 debug!(%slot, %sector_index, "Solution found");
351 trace!(?solution, "Solution found");
352
353 {
354 let time = start.elapsed();
355 if time >= farming_timeout {
356 if let Some(metrics) = &metrics {
357 metrics.observe_proving_time(&time, ProvingResult::Timeout);
358 }
359 handlers.farming_notification.call_simple(
360 &FarmingNotification::Proving(ProvingDetails {
361 result: ProvingResult::Timeout,
362 time,
363 }),
364 );
365 warn!(
366 %slot,
367 %sector_index,
368 "Proving for solution skipped due to farming time limit",
369 );
370
371 break 'solutions_processing;
372 }
373 }
374
375 let response = SolutionResponse {
376 slot_number: slot,
377 solution,
378 };
379
380 handlers.solution.call_simple(&response);
381
382 if let Err(error) = node_client.submit_solution_response(response).await {
383 let time = start.elapsed();
384 if let Some(metrics) = &metrics {
385 metrics.observe_proving_time(&time, ProvingResult::Rejected);
386 }
387 handlers
388 .farming_notification
389 .call_simple(&FarmingNotification::Proving(ProvingDetails {
390 result: ProvingResult::Rejected,
391 time,
392 }));
393 warn!(
394 %slot,
395 %sector_index,
396 %error,
397 "Failed to send solution to node, skipping further proving for this slot",
398 );
399 break 'solutions_processing;
400 }
401
402 let time = start.elapsed();
403 if let Some(metrics) = &metrics {
404 metrics.observe_proving_time(&time, ProvingResult::Success);
405 }
406 handlers
407 .farming_notification
408 .call_simple(&FarmingNotification::Proving(ProvingDetails {
409 result: ProvingResult::Success,
410 time,
411 }));
412 start = Instant::now();
413 }
414 }
415 };
416
417 if let Err(error) = result {
418 if error.is_fatal() {
419 return Err(error);
420 }
421
422 non_fatal_errors += 1;
423
424 if non_fatal_errors >= NON_FATAL_ERROR_LIMIT {
425 return Err(error);
426 }
427
428 warn!(
429 %error,
430 "Non-fatal farming error"
431 );
432
433 if let Some(metrics) = &metrics {
434 metrics.note_farming_error(&error);
435 }
436 handlers
437 .farming_notification
438 .call_simple(&FarmingNotification::NonFatalError(Arc::new(error)));
439
440 for sector_index in problematic_sectors.drain(..) {
441 sectors_being_modified.write().await.insert(sector_index);
443 if let Some(existing_sector_metadata) = sectors_metadata
445 .write()
446 .await
447 .get_mut(sector_index as usize)
448 {
449 *existing_sector_metadata = SectorMetadataChecksummed::from(SectorMetadata {
450 sector_index,
451 pieces_in_sector: existing_sector_metadata.pieces_in_sector,
452 s_bucket_sizes: Box::new([0; Record::NUM_S_BUCKETS]),
453 history_size: HistorySize::from(SegmentIndex::ZERO),
454 });
455 }
456 sectors_being_modified.write().await.remove(§or_index);
458 }
459 } else {
460 non_fatal_errors = 0;
461 }
462 }
463
464 Ok(())
465}