subspace_farmer/single_disk_farm/
plotting.rs

1use crate::farm::{SectorExpirationDetails, SectorPlottingDetails, SectorUpdate};
2use crate::node_client::NodeClient;
3use crate::plotter::{Plotter, SectorPlottingProgress};
4use crate::single_disk_farm::direct_io_file::DirectIoFile;
5use crate::single_disk_farm::metrics::{SectorState, SingleDiskFarmMetrics};
6use crate::single_disk_farm::{
7    BackgroundTaskError, Handlers, PlotMetadataHeader, RESERVED_PLOT_METADATA,
8};
9use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock, Semaphore, SemaphoreGuard};
10use futures::channel::{mpsc, oneshot};
11use futures::stream::FuturesOrdered;
12use futures::{select, FutureExt, SinkExt, StreamExt};
13use parity_scale_codec::Encode;
14use rand::prelude::*;
15use std::collections::HashSet;
16use std::future::Future;
17use std::io;
18use std::num::NonZeroUsize;
19use std::ops::Range;
20use std::pin::pin;
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23use subspace_core_primitives::hashes::Blake3Hash;
24use subspace_core_primitives::pieces::PieceOffset;
25use subspace_core_primitives::sectors::{SectorId, SectorIndex};
26use subspace_core_primitives::segments::{HistorySize, SegmentHeader, SegmentIndex};
27use subspace_core_primitives::PublicKey;
28use subspace_farmer_components::file_ext::FileExt;
29use subspace_farmer_components::plotting::PlottedSector;
30use subspace_farmer_components::sector::SectorMetadataChecksummed;
31use thiserror::Error;
32use tokio::sync::watch;
33use tokio::task;
34use tracing::{debug, info, info_span, trace, warn, Instrument};
35
/// Pause between attempts to retrieve farmer app info while waiting for the node to report a
/// history size newer than the one of the sector that is being replotted
const FARMER_APP_INFO_RETRY_INTERVAL: Duration = Duration::from_millis(500);
/// Pause before plotting of a sector is attempted again after a recoverable plotting failure
const PLOTTING_RETRY_DELAY: Duration = Duration::from_secs(1);
38
39pub(super) struct SectorToPlot {
40    sector_index: SectorIndex,
41    /// Progress so far in % (not including this sector)
42    progress: f32,
43    /// Whether this is the last sector queued so far
44    last_queued: bool,
45    acknowledgement_sender: oneshot::Sender<()>,
46}
47
48/// Errors that happen during plotting
49#[derive(Debug, Error)]
50pub enum PlottingError {
51    /// Failed to retrieve farmer info
52    #[error("Failed to retrieve farmer info: {error}")]
53    FailedToGetFarmerInfo {
54        /// Lower-level error
55        error: anyhow::Error,
56    },
57    /// Failed to get segment header
58    #[error("Failed to get segment header: {error}")]
59    FailedToGetSegmentHeader {
60        /// Lower-level error
61        error: anyhow::Error,
62    },
63    /// Missing archived segment header
64    #[error("Missing archived segment header: {segment_index}")]
65    MissingArchivedSegmentHeader {
66        /// Segment index that was missing
67        segment_index: SegmentIndex,
68    },
69    /// Failed to subscribe to archived segments
70    #[error("Failed to subscribe to archived segments: {error}")]
71    FailedToSubscribeArchivedSegments {
72        /// Lower-level error
73        error: anyhow::Error,
74    },
75    /// Low-level plotting error
76    #[error("Low-level plotting error: {0}")]
77    LowLevel(String),
78    /// I/O error occurred
79    #[error("Plotting I/O error: {0}")]
80    Io(#[from] io::Error),
81    /// Background downloading panicked
82    #[error("Background downloading panicked")]
83    BackgroundDownloadingPanicked,
84}
85
86pub(super) struct SectorPlottingOptions<'a, NC> {
87    pub(super) public_key: PublicKey,
88    pub(super) node_client: &'a NC,
89    pub(super) pieces_in_sector: u16,
90    pub(super) sector_size: usize,
91    pub(super) plot_file: Arc<DirectIoFile>,
92    pub(super) metadata_file: Arc<DirectIoFile>,
93    pub(super) handlers: &'a Handlers,
94    pub(super) global_mutex: &'a AsyncMutex<()>,
95    pub(super) plotter: Arc<dyn Plotter>,
96    pub(super) metrics: Option<Arc<SingleDiskFarmMetrics>>,
97}
98
99pub(super) struct PlottingOptions<'a, NC> {
100    pub(super) metadata_header: PlotMetadataHeader,
101    pub(super) sectors_metadata: &'a AsyncRwLock<Vec<SectorMetadataChecksummed>>,
102    pub(super) sectors_being_modified: &'a AsyncRwLock<HashSet<SectorIndex>>,
103    pub(super) sectors_to_plot_receiver: mpsc::Receiver<SectorToPlot>,
104    pub(super) sector_plotting_options: SectorPlottingOptions<'a, NC>,
105    pub(super) max_plotting_sectors_per_farm: NonZeroUsize,
106}
107
108/// Starts plotting process.
109///
110/// NOTE: Returned future is async, but does blocking operations and should be running in dedicated
111/// thread.
112pub(super) async fn plotting<NC>(
113    plotting_options: PlottingOptions<'_, NC>,
114) -> Result<(), PlottingError>
115where
116    NC: NodeClient,
117{
118    let PlottingOptions {
119        mut metadata_header,
120        sectors_metadata,
121        sectors_being_modified,
122        mut sectors_to_plot_receiver,
123        sector_plotting_options,
124        max_plotting_sectors_per_farm,
125    } = plotting_options;
126
127    let sector_plotting_options = &sector_plotting_options;
128    let plotting_semaphore = Semaphore::new(max_plotting_sectors_per_farm.get());
129    let mut sectors_being_plotted = FuturesOrdered::new();
130    // Channel size is intentionally unbounded for easier analysis, but it is bounded by plotting
131    // semaphore in practice due to permit stored in `SectorPlottingResult`
132    let (sector_plotting_result_sender, mut sector_plotting_result_receiver) = mpsc::unbounded();
133    let process_plotting_result_fut = async move {
134        while let Some(sector_plotting_result) = sector_plotting_result_receiver.next().await {
135            process_plotting_result(
136                sector_plotting_result,
137                sectors_metadata,
138                sectors_being_modified,
139                &mut metadata_header,
140                Arc::clone(&sector_plotting_options.metadata_file),
141            )
142            .await?;
143        }
144
145        unreachable!(
146            "Stream will not end before the rest of the plotting process is shutting down"
147        );
148    };
149    let process_plotting_result_fut = process_plotting_result_fut.fuse();
150    let mut process_plotting_result_fut = pin!(process_plotting_result_fut);
151
152    // Wait for new sectors to plot from `sectors_to_plot_receiver` and wait for sectors that
153    // already started plotting to finish plotting and then update metadata header
154    loop {
155        select! {
156            maybe_sector_to_plot = sectors_to_plot_receiver.next() => {
157                let Some(sector_to_plot) = maybe_sector_to_plot else {
158                    break;
159                };
160
161                let sector_index = sector_to_plot.sector_index;
162                let sector_plotting_init_fut = plot_single_sector(
163                    sector_to_plot,
164                    sector_plotting_options,
165                    sectors_metadata,
166                    sectors_being_modified,
167                    &plotting_semaphore,
168                )
169                    .instrument(info_span!("", %sector_index))
170                    .fuse();
171                let mut sector_plotting_init_fut = pin!(sector_plotting_init_fut);
172
173                // Wait for plotting of new sector to start (backpressure), while also waiting
174                // for sectors that already started plotting to finish plotting and then update
175                // metadata header
176                loop {
177                    select! {
178                        sector_plotting_init_result = sector_plotting_init_fut => {
179                            let sector_plotting_fut = match sector_plotting_init_result {
180                                PlotSingleSectorResult::Scheduled(future) => future,
181                                PlotSingleSectorResult::Skipped => {
182                                    break;
183                                }
184                                PlotSingleSectorResult::FatalError(error) => {
185                                    return Err(error);
186                                }
187                            };
188                            sectors_being_plotted.push_back(
189                                sector_plotting_fut.instrument(info_span!("", %sector_index))
190                            );
191                            break;
192                        }
193                        maybe_sector_plotting_result = sectors_being_plotted.select_next_some() => {
194                            sector_plotting_result_sender
195                                .unbounded_send(maybe_sector_plotting_result?)
196                                .expect("Sending means receiver is not dropped yet; qed");
197                        }
198                        result = process_plotting_result_fut => {
199                            return result;
200                        }
201                    }
202                }
203            }
204            maybe_sector_plotting_result = sectors_being_plotted.select_next_some() => {
205                sector_plotting_result_sender
206                    .unbounded_send(maybe_sector_plotting_result?)
207                    .expect("Sending means receiver is not dropped yet; qed");
208            }
209            result = process_plotting_result_fut => {
210                return result;
211            }
212        }
213    }
214
215    Ok(())
216}
217
218async fn process_plotting_result(
219    sector_plotting_result: SectorPlottingResult<'_>,
220    sectors_metadata: &AsyncRwLock<Vec<SectorMetadataChecksummed>>,
221    sectors_being_modified: &AsyncRwLock<HashSet<SectorIndex>>,
222    metadata_header: &mut PlotMetadataHeader,
223    metadata_file: Arc<DirectIoFile>,
224) -> Result<(), PlottingError> {
225    let SectorPlottingResult {
226        sector_metadata,
227        replotting,
228        last_queued,
229        plotting_permit,
230    } = sector_plotting_result;
231
232    let sector_index = sector_metadata.sector_index;
233
234    {
235        let mut sectors_metadata = sectors_metadata.write().await;
236        // If exists then we're replotting, otherwise we create sector for the first time
237        if let Some(existing_sector_metadata) = sectors_metadata.get_mut(sector_index as usize) {
238            *existing_sector_metadata = sector_metadata;
239        } else {
240            sectors_metadata.push(sector_metadata);
241        }
242    }
243
244    // Inform others that this sector is no longer being modified
245    sectors_being_modified.write().await.remove(&sector_index);
246
247    if sector_index + 1 > metadata_header.plotted_sector_count {
248        metadata_header.plotted_sector_count = sector_index + 1;
249
250        let encoded_metadata_header = metadata_header.encode();
251        let write_fut =
252            task::spawn_blocking(move || metadata_file.write_all_at(&encoded_metadata_header, 0));
253        write_fut.await.map_err(|error| {
254            PlottingError::LowLevel(format!("Failed to spawn blocking tokio task: {error}"))
255        })??;
256    }
257
258    if last_queued {
259        if replotting {
260            info!("Replotting complete");
261        } else {
262            info!("Initial plotting complete");
263        }
264    }
265
266    drop(plotting_permit);
267
268    Ok(())
269}
270
271enum PlotSingleSectorResult<F> {
272    Scheduled(F),
273    Skipped,
274    FatalError(PlottingError),
275}
276
277struct SectorPlottingResult<'a> {
278    sector_metadata: SectorMetadataChecksummed,
279    replotting: bool,
280    last_queued: bool,
281    plotting_permit: SemaphoreGuard<'a>,
282}
283
284async fn plot_single_sector<'a, NC>(
285    sector_to_plot: SectorToPlot,
286    sector_plotting_options: &'a SectorPlottingOptions<'a, NC>,
287    sectors_metadata: &'a AsyncRwLock<Vec<SectorMetadataChecksummed>>,
288    sectors_being_modified: &'a AsyncRwLock<HashSet<SectorIndex>>,
289    plotting_semaphore: &'a Semaphore,
290) -> PlotSingleSectorResult<
291    impl Future<Output = Result<SectorPlottingResult<'a>, PlottingError>> + 'a,
292>
293where
294    NC: NodeClient,
295{
296    let SectorPlottingOptions {
297        public_key,
298        node_client,
299        pieces_in_sector,
300        sector_size,
301        plot_file,
302        metadata_file,
303        handlers,
304        global_mutex,
305        plotter,
306        metrics,
307    } = sector_plotting_options;
308
309    let SectorToPlot {
310        sector_index,
311        progress,
312        last_queued,
313        acknowledgement_sender: _acknowledgement_sender,
314    } = sector_to_plot;
315    trace!("Preparing to plot sector");
316
317    // Inform others that this sector is being modified
318    {
319        let mut sectors_being_modified = sectors_being_modified.write().await;
320        if !sectors_being_modified.insert(sector_index) {
321            debug!("Skipped sector plotting, it is already in progress");
322            return PlotSingleSectorResult::Skipped;
323        }
324    }
325
326    let plotting_permit = plotting_semaphore.acquire().await;
327
328    let maybe_old_sector_metadata = sectors_metadata
329        .read()
330        .await
331        .get(sector_index as usize)
332        .cloned();
333    let replotting = maybe_old_sector_metadata.is_some();
334
335    if let Some(metrics) = metrics {
336        metrics.sector_plotting.inc();
337    }
338    let sector_state = SectorUpdate::Plotting(SectorPlottingDetails::Starting {
339        progress,
340        replotting,
341        last_queued,
342    });
343    handlers
344        .sector_update
345        .call_simple(&(sector_index, sector_state));
346
347    let start = Instant::now();
348
349    // This `loop` is a workaround for edge-case in local setup if expiration is configured to 1.
350    // In that scenario we get replotting notification essentially straight from block import
351    // pipeline of the node, before block is imported. This can result in subsequent request for
352    // farmer app info to return old data, meaning we're replotting exactly the same sector that
353    // just expired.
354    let farmer_app_info = loop {
355        let farmer_app_info = match node_client.farmer_app_info().await {
356            Ok(farmer_app_info) => farmer_app_info,
357            Err(error) => {
358                return PlotSingleSectorResult::FatalError(PlottingError::FailedToGetFarmerInfo {
359                    error,
360                });
361            }
362        };
363
364        if let Some(old_sector_metadata) = &maybe_old_sector_metadata {
365            if farmer_app_info.protocol_info.history_size <= old_sector_metadata.history_size {
366                if farmer_app_info.protocol_info.min_sector_lifetime == HistorySize::ONE {
367                    debug!(
368                        current_history_size = %farmer_app_info.protocol_info.history_size,
369                        old_sector_history_size = %old_sector_metadata.history_size,
370                        "Latest protocol history size is not yet newer than old sector history \
371                        size, wait for a bit and try again"
372                    );
373                    tokio::time::sleep(FARMER_APP_INFO_RETRY_INTERVAL).await;
374                    continue;
375                } else {
376                    debug!(
377                        current_history_size = %farmer_app_info.protocol_info.history_size,
378                        old_sector_history_size = %old_sector_metadata.history_size,
379                        "Skipped sector plotting, likely redundant due to redundant archived \
380                        segment notification"
381                    );
382                    return PlotSingleSectorResult::Skipped;
383                }
384            }
385        }
386
387        break farmer_app_info;
388    };
389
390    let (progress_sender, mut progress_receiver) = mpsc::channel(10);
391
392    // Initiate plotting
393    plotter
394        .plot_sector(
395            *public_key,
396            sector_index,
397            farmer_app_info.protocol_info,
398            *pieces_in_sector,
399            replotting,
400            progress_sender,
401        )
402        .await;
403
404    if replotting {
405        info!("Replotting sector ({progress:.2}% complete)");
406    } else {
407        info!("Plotting sector ({progress:.2}% complete)");
408    }
409
410    PlotSingleSectorResult::Scheduled(async move {
411        let plotted_sector = loop {
412            match plot_single_sector_internal(
413                sector_index,
414                *sector_size,
415                plot_file,
416                metadata_file,
417                handlers,
418                global_mutex,
419                progress_receiver,
420                metrics,
421            )
422            .await?
423            {
424                Ok(plotted_sector) => {
425                    break plotted_sector;
426                }
427                Err(error) => {
428                    warn!(
429                        %error,
430                        "Failed to plot sector, retrying in {PLOTTING_RETRY_DELAY:?}"
431                    );
432
433                    tokio::time::sleep(PLOTTING_RETRY_DELAY).await;
434                }
435            }
436
437            let (retry_progress_sender, retry_progress_receiver) = mpsc::channel(10);
438            progress_receiver = retry_progress_receiver;
439
440            // Initiate plotting
441            plotter
442                .plot_sector(
443                    *public_key,
444                    sector_index,
445                    farmer_app_info.protocol_info,
446                    *pieces_in_sector,
447                    replotting,
448                    retry_progress_sender,
449                )
450                .await;
451
452            if replotting {
453                info!("Replotting sector retry");
454            } else {
455                info!("Plotting sector retry");
456            }
457        };
458
459        let maybe_old_plotted_sector = maybe_old_sector_metadata.map(|old_sector_metadata| {
460            let old_history_size = old_sector_metadata.history_size;
461
462            PlottedSector {
463                sector_id: plotted_sector.sector_id,
464                sector_index: plotted_sector.sector_index,
465                sector_metadata: old_sector_metadata,
466                piece_indexes: {
467                    let mut piece_indexes = Vec::with_capacity(usize::from(*pieces_in_sector));
468                    (PieceOffset::ZERO..)
469                        .take(usize::from(*pieces_in_sector))
470                        .map(|piece_offset| {
471                            plotted_sector.sector_id.derive_piece_index(
472                                piece_offset,
473                                old_history_size,
474                                farmer_app_info.protocol_info.max_pieces_in_sector,
475                                farmer_app_info.protocol_info.recent_segments,
476                                farmer_app_info.protocol_info.recent_history_fraction,
477                            )
478                        })
479                        .collect_into(&mut piece_indexes);
480                    piece_indexes
481                },
482            }
483        });
484
485        if replotting {
486            debug!("Sector replotted successfully");
487        } else {
488            debug!("Sector plotted successfully");
489        }
490
491        let sector_metadata = plotted_sector.sector_metadata.clone();
492
493        let time = start.elapsed();
494        if let Some(metrics) = metrics {
495            metrics.sector_plotting_time.observe(time.as_secs_f64());
496            metrics.sector_plotted.inc();
497            metrics.update_sector_state(SectorState::Plotted);
498        }
499        let sector_state = SectorUpdate::Plotting(SectorPlottingDetails::Finished {
500            plotted_sector,
501            old_plotted_sector: maybe_old_plotted_sector,
502            time,
503        });
504        handlers
505            .sector_update
506            .call_simple(&(sector_index, sector_state));
507
508        Ok(SectorPlottingResult {
509            sector_metadata,
510            replotting,
511            last_queued,
512            plotting_permit,
513        })
514    })
515}
516
517/// Outer error is used to indicate irrecoverable plotting errors, while inner result is for
518/// recoverable errors
519#[allow(clippy::too_many_arguments)]
520async fn plot_single_sector_internal(
521    sector_index: SectorIndex,
522    sector_size: usize,
523    plot_file: &Arc<DirectIoFile>,
524    metadata_file: &Arc<DirectIoFile>,
525    handlers: &Handlers,
526    global_mutex: &AsyncMutex<()>,
527    mut progress_receiver: mpsc::Receiver<SectorPlottingProgress>,
528    metrics: &Option<Arc<SingleDiskFarmMetrics>>,
529) -> Result<Result<PlottedSector, PlottingError>, PlottingError> {
530    // Process plotting progress notifications
531    let progress_processor_fut = async {
532        while let Some(progress) = progress_receiver.next().await {
533            match progress {
534                SectorPlottingProgress::Downloading => {
535                    if let Some(metrics) = metrics {
536                        metrics.sector_downloading.inc();
537                    }
538                    handlers.sector_update.call_simple(&(
539                        sector_index,
540                        SectorUpdate::Plotting(SectorPlottingDetails::Downloading),
541                    ));
542                }
543                SectorPlottingProgress::Downloaded(time) => {
544                    if let Some(metrics) = metrics {
545                        metrics.sector_downloading_time.observe(time.as_secs_f64());
546                        metrics.sector_downloaded.inc();
547                    }
548                    handlers.sector_update.call_simple(&(
549                        sector_index,
550                        SectorUpdate::Plotting(SectorPlottingDetails::Downloaded(time)),
551                    ));
552                }
553                SectorPlottingProgress::Encoding => {
554                    if let Some(metrics) = metrics {
555                        metrics.sector_encoding.inc();
556                    }
557                    handlers.sector_update.call_simple(&(
558                        sector_index,
559                        SectorUpdate::Plotting(SectorPlottingDetails::Encoding),
560                    ));
561                }
562                SectorPlottingProgress::Encoded(time) => {
563                    if let Some(metrics) = metrics {
564                        metrics.sector_encoding_time.observe(time.as_secs_f64());
565                        metrics.sector_encoded.inc();
566                    }
567                    handlers.sector_update.call_simple(&(
568                        sector_index,
569                        SectorUpdate::Plotting(SectorPlottingDetails::Encoded(time)),
570                    ));
571                }
572                SectorPlottingProgress::Finished {
573                    plotted_sector,
574                    time: _,
575                    sector,
576                } => {
577                    return Ok((plotted_sector, sector));
578                }
579                SectorPlottingProgress::Error { error } => {
580                    if let Some(metrics) = metrics {
581                        metrics.sector_plotting_error.inc();
582                    }
583                    handlers.sector_update.call_simple(&(
584                        sector_index,
585                        SectorUpdate::Plotting(SectorPlottingDetails::Error(error.clone())),
586                    ));
587                    return Err(error);
588                }
589            }
590        }
591
592        Err("Plotting progress stream ended before plotting finished".to_string())
593    };
594
595    let (plotted_sector, mut sector) = match progress_processor_fut.await {
596        Ok(result) => result,
597        Err(error) => {
598            return Ok(Err(PlottingError::LowLevel(error)));
599        }
600    };
601
602    {
603        // Take mutex briefly to make sure writing is allowed right now
604        global_mutex.lock().await;
605
606        if let Some(metrics) = metrics {
607            metrics.sector_writing.inc();
608        }
609        handlers.sector_update.call_simple(&(
610            sector_index,
611            SectorUpdate::Plotting(SectorPlottingDetails::Writing),
612        ));
613
614        let start = Instant::now();
615
616        {
617            let sector_write_base_offset = u64::from(sector_index) * sector_size as u64;
618            let mut total_received = 0;
619            let mut sector_write_offset = sector_write_base_offset;
620            while let Some(maybe_sector_chunk) = sector.next().await {
621                let sector_chunk = match maybe_sector_chunk {
622                    Ok(sector_chunk) => sector_chunk,
623                    Err(error) => {
624                        return Ok(Err(PlottingError::LowLevel(format!(
625                            "Sector chunk receive error: {error}"
626                        ))));
627                    }
628                };
629
630                total_received += sector_chunk.len();
631
632                if total_received > sector_size {
633                    return Ok(Err(PlottingError::LowLevel(format!(
634                        "Received too many bytes {total_received} instead of expected \
635                        {sector_size} bytes"
636                    ))));
637                }
638
639                let sector_chunk_size = sector_chunk.len() as u64;
640
641                trace!(sector_chunk_size, "Writing sector chunk to disk");
642                let write_fut = task::spawn_blocking({
643                    let plot_file = Arc::clone(plot_file);
644
645                    move || plot_file.write_all_at(&sector_chunk, sector_write_offset)
646                });
647                write_fut.await.map_err(|error| {
648                    PlottingError::LowLevel(format!("Failed to spawn blocking tokio task: {error}"))
649                })??;
650
651                sector_write_offset += sector_chunk_size;
652            }
653            drop(sector);
654
655            if total_received != sector_size {
656                return Ok(Err(PlottingError::LowLevel(format!(
657                    "Received only {total_received} sector bytes out of {sector_size} \
658                    expected bytes"
659                ))));
660            }
661        }
662        {
663            let encoded_sector_metadata = plotted_sector.sector_metadata.encode();
664            let write_fut = task::spawn_blocking({
665                let metadata_file = Arc::clone(metadata_file);
666
667                move || {
668                    metadata_file.write_all_at(
669                        &encoded_sector_metadata,
670                        RESERVED_PLOT_METADATA
671                            + (u64::from(sector_index) * encoded_sector_metadata.len() as u64),
672                    )
673                }
674            });
675            write_fut.await.map_err(|error| {
676                PlottingError::LowLevel(format!("Failed to spawn blocking tokio task: {error}"))
677            })??;
678        }
679
680        let time = start.elapsed();
681        if let Some(metrics) = metrics {
682            metrics.sector_writing_time.observe(time.as_secs_f64());
683            metrics.sector_written.inc();
684        }
685        handlers.sector_update.call_simple(&(
686            sector_index,
687            SectorUpdate::Plotting(SectorPlottingDetails::Written(time)),
688        ));
689    }
690
691    Ok(Ok(plotted_sector))
692}
693
694pub(super) struct PlottingSchedulerOptions<NC> {
695    pub(super) public_key_hash: Blake3Hash,
696    pub(super) sectors_indices_left_to_plot: Range<SectorIndex>,
697    pub(super) target_sector_count: SectorIndex,
698    pub(super) last_archived_segment_index: SegmentIndex,
699    pub(super) min_sector_lifetime: HistorySize,
700    pub(super) node_client: NC,
701    pub(super) handlers: Arc<Handlers>,
702    pub(super) sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
703    pub(super) sectors_to_plot_sender: mpsc::Sender<SectorToPlot>,
704    // Max delay between segment header being acknowledged by farmer and potentially
705    // triggering replotting
706    pub(super) new_segment_processing_delay: Duration,
707    pub(super) metrics: Option<Arc<SingleDiskFarmMetrics>>,
708}
709
710pub(super) async fn plotting_scheduler<NC>(
711    plotting_scheduler_options: PlottingSchedulerOptions<NC>,
712) -> Result<(), BackgroundTaskError>
713where
714    NC: NodeClient,
715{
716    let PlottingSchedulerOptions {
717        public_key_hash,
718        sectors_indices_left_to_plot,
719        target_sector_count,
720        last_archived_segment_index,
721        min_sector_lifetime,
722        node_client,
723        handlers,
724        sectors_metadata,
725        sectors_to_plot_sender,
726        new_segment_processing_delay,
727        metrics,
728    } = plotting_scheduler_options;
729
730    // Create a proxy channel with atomically updatable last archived segment that
731    // allows to not buffer messages from RPC subscription, but also access the most
732    // recent value at any time
733    let last_archived_segment = node_client
734        .segment_headers(vec![last_archived_segment_index])
735        .await
736        .map_err(|error| PlottingError::FailedToGetSegmentHeader { error })?
737        .into_iter()
738        .next()
739        .flatten()
740        .ok_or(PlottingError::MissingArchivedSegmentHeader {
741            segment_index: last_archived_segment_index,
742        })?;
743
744    let (archived_segments_sender, archived_segments_receiver) =
745        watch::channel(last_archived_segment);
746
747    let read_archived_segments_notifications_fut = read_archived_segments_notifications(
748        &node_client,
749        archived_segments_sender,
750        new_segment_processing_delay,
751    );
752
753    let send_plotting_notifications_fut = send_plotting_notifications(
754        public_key_hash,
755        sectors_indices_left_to_plot,
756        target_sector_count,
757        min_sector_lifetime,
758        &node_client,
759        &handlers,
760        sectors_metadata,
761        archived_segments_receiver,
762        sectors_to_plot_sender,
763        &metrics,
764    );
765
766    select! {
767        result = read_archived_segments_notifications_fut.fuse() => {
768            result
769        }
770        result = send_plotting_notifications_fut.fuse() => {
771            result
772        }
773    }
774}
775
776async fn read_archived_segments_notifications<NC>(
777    node_client: &NC,
778    archived_segments_sender: watch::Sender<SegmentHeader>,
779    new_segment_processing_delay: Duration,
780) -> Result<(), BackgroundTaskError>
781where
782    NC: NodeClient,
783{
784    info!("Subscribing to archived segments");
785
786    let mut archived_segments_notifications = node_client
787        .subscribe_archived_segment_headers()
788        .await
789        .map_err(|error| PlottingError::FailedToSubscribeArchivedSegments { error })?;
790
791    while let Some(segment_header) = archived_segments_notifications.next().await {
792        debug!(?segment_header, "New archived segment");
793        if let Err(error) = node_client
794            .acknowledge_archived_segment_header(segment_header.segment_index())
795            .await
796        {
797            debug!(%error, "Failed to acknowledge segment header");
798        }
799
800        // There is no urgent need to rush replotting sectors immediately and this delay allows for
801        // newly archived pieces to be both cached locally and on other farmers on the network
802        let delay = Duration::from_secs(thread_rng().gen_range(
803            new_segment_processing_delay.as_secs() / 10..=new_segment_processing_delay.as_secs(),
804        ));
805        tokio::time::sleep(delay).await;
806
807        if archived_segments_sender.send(segment_header).is_err() {
808            break;
809        }
810    }
811
812    Ok(())
813}
814
815struct SectorToReplot {
816    sector_index: SectorIndex,
817    expires_at: SegmentIndex,
818}
819
820#[allow(clippy::too_many_arguments)]
821async fn send_plotting_notifications<NC>(
822    public_key_hash: Blake3Hash,
823    sectors_indices_left_to_plot: Range<SectorIndex>,
824    target_sector_count: SectorIndex,
825    min_sector_lifetime: HistorySize,
826    node_client: &NC,
827    handlers: &Handlers,
828    sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
829    mut archived_segments_receiver: watch::Receiver<SegmentHeader>,
830    mut sectors_to_plot_sender: mpsc::Sender<SectorToPlot>,
831    metrics: &Option<Arc<SingleDiskFarmMetrics>>,
832) -> Result<(), BackgroundTaskError>
833where
834    NC: NodeClient,
835{
836    // Finish initial plotting if some sectors were not plotted fully yet
837    for sector_index in sectors_indices_left_to_plot {
838        let (acknowledgement_sender, acknowledgement_receiver) = oneshot::channel();
839        if let Err(error) = sectors_to_plot_sender
840            .send(SectorToPlot {
841                sector_index,
842                progress: sector_index as f32 / target_sector_count as f32 * 100.0,
843                last_queued: sector_index + 1 == target_sector_count,
844                acknowledgement_sender,
845            })
846            .await
847        {
848            warn!(%error, "Failed to send sector index for initial plotting");
849            return Ok(());
850        }
851
852        // We do not care if message was sent back or sender was just dropped
853        let _ = acknowledgement_receiver.await;
854    }
855
856    let mut sectors_expire_at = vec![None::<SegmentIndex>; usize::from(target_sector_count)];
857    // 10% capacity is generous and should prevent reallocation in most cases
858    let mut sectors_to_replot = Vec::with_capacity(usize::from(target_sector_count) / 10);
859
860    loop {
861        let segment_index = archived_segments_receiver
862            .borrow_and_update()
863            .segment_index();
864        trace!(%segment_index, "New archived segment received");
865
866        let sectors_metadata = sectors_metadata.read().await;
867        let sectors_to_check = sectors_metadata
868            .iter()
869            .map(|sector_metadata| (sector_metadata.sector_index, sector_metadata.history_size));
870        for (sector_index, history_size) in sectors_to_check {
871            if let Some(Some(expires_at)) =
872                sectors_expire_at.get(usize::from(sector_index)).copied()
873            {
874                trace!(
875                    %sector_index,
876                    %history_size,
877                    %expires_at,
878                    "Checking sector for expiration"
879                );
880                // +1 means we will start replotting a bit before it actually expires to avoid
881                // storing expired sectors
882                if expires_at <= (segment_index + SegmentIndex::ONE) {
883                    debug!(
884                        %sector_index,
885                        %history_size,
886                        %expires_at,
887                        "Sector expires soon #1, scheduling replotting"
888                    );
889
890                    let expiration_details = if expires_at <= segment_index {
891                        if let Some(metrics) = metrics {
892                            metrics.update_sector_state(SectorState::Expired);
893                        }
894                        SectorExpirationDetails::Expired
895                    } else {
896                        if let Some(metrics) = metrics {
897                            metrics.update_sector_state(SectorState::AboutToExpire);
898                        }
899                        SectorExpirationDetails::AboutToExpire
900                    };
901                    handlers
902                        .sector_update
903                        .call_simple(&(sector_index, SectorUpdate::Expiration(expiration_details)));
904
905                    // Time to replot
906                    sectors_to_replot.push(SectorToReplot {
907                        sector_index,
908                        expires_at,
909                    });
910                }
911                continue;
912            }
913
914            if let Some(expiration_check_segment_index) = history_size
915                .sector_expiration_check(min_sector_lifetime)
916                .map(|expiration_check_history_size| expiration_check_history_size.segment_index())
917            {
918                trace!(
919                    %sector_index,
920                    %history_size,
921                    %expiration_check_segment_index,
922                    "Determined sector expiration check segment index"
923                );
924                let maybe_sector_expiration_check_segment_commitment = node_client
925                    .segment_headers(vec![expiration_check_segment_index])
926                    .await
927                    .map_err(|error| PlottingError::FailedToGetSegmentHeader { error })?
928                    .into_iter()
929                    .next()
930                    .flatten()
931                    .map(|segment_header| segment_header.segment_commitment());
932
933                if let Some(sector_expiration_check_segment_commitment) =
934                    maybe_sector_expiration_check_segment_commitment
935                {
936                    let sector_id = SectorId::new(public_key_hash, sector_index, history_size);
937                    let expiration_history_size = sector_id
938                        .derive_expiration_history_size(
939                            history_size,
940                            &sector_expiration_check_segment_commitment,
941                            min_sector_lifetime,
942                        )
943                        .expect(
944                            "Farmers internally stores correct history size in sector \
945                            metadata; qed",
946                        );
947
948                    let expires_at = expiration_history_size.segment_index();
949
950                    trace!(
951                        %sector_index,
952                        %history_size,
953                        sector_expire_at = %expires_at,
954                        "Determined sector expiration segment index"
955                    );
956                    // +1 means we will start replotting a bit before it actually expires to avoid
957                    // storing expired sectors
958                    if expires_at <= (segment_index + SegmentIndex::ONE) {
959                        debug!(
960                            %sector_index,
961                            %history_size,
962                            %expires_at,
963                            "Sector expires soon #2, scheduling replotting"
964                        );
965
966                        let expiration_details = if expires_at <= segment_index {
967                            if let Some(metrics) = metrics {
968                                metrics.update_sector_state(SectorState::Expired);
969                            }
970                            SectorExpirationDetails::Expired
971                        } else {
972                            if let Some(metrics) = metrics {
973                                metrics.update_sector_state(SectorState::AboutToExpire);
974                            }
975                            SectorExpirationDetails::AboutToExpire
976                        };
977                        handlers.sector_update.call_simple(&(
978                            sector_index,
979                            SectorUpdate::Expiration(expiration_details),
980                        ));
981
982                        // Time to replot
983                        sectors_to_replot.push(SectorToReplot {
984                            sector_index,
985                            expires_at,
986                        });
987                    } else {
988                        trace!(
989                            %sector_index,
990                            %history_size,
991                            sector_expire_at = %expires_at,
992                            "Sector expires later, remembering sector expiration"
993                        );
994
995                        handlers.sector_update.call_simple(&(
996                            sector_index,
997                            SectorUpdate::Expiration(SectorExpirationDetails::Determined {
998                                expires_at,
999                            }),
1000                        ));
1001
1002                        // Store expiration so we don't have to recalculate it later
1003                        if let Some(expires_at_entry) =
1004                            sectors_expire_at.get_mut(usize::from(sector_index))
1005                        {
1006                            expires_at_entry.replace(expires_at);
1007                        }
1008                    }
1009                }
1010            }
1011        }
1012        drop(sectors_metadata);
1013
1014        let sectors_queued = sectors_to_replot.len();
1015        sectors_to_replot.sort_by_key(|sector_to_replot| sector_to_replot.expires_at);
1016        for (index, SectorToReplot { sector_index, .. }) in sectors_to_replot.drain(..).enumerate()
1017        {
1018            let (acknowledgement_sender, acknowledgement_receiver) = oneshot::channel();
1019            if let Err(error) = sectors_to_plot_sender
1020                .send(SectorToPlot {
1021                    sector_index,
1022                    progress: index as f32 / sectors_queued as f32 * 100.0,
1023                    last_queued: index + 1 == sectors_queued,
1024                    acknowledgement_sender,
1025                })
1026                .await
1027            {
1028                warn!(%error, "Failed to send sector index for replotting");
1029                return Ok(());
1030            }
1031
1032            // We do not care if message was sent back or sender was just dropped
1033            let _ = acknowledgement_receiver.await;
1034
1035            if let Some(expires_at_entry) = sectors_expire_at.get_mut(usize::from(sector_index)) {
1036                expires_at_entry.take();
1037            }
1038        }
1039
1040        if archived_segments_receiver.changed().await.is_err() {
1041            break;
1042        }
1043    }
1044
1045    Ok(())
1046}