1use crate::farm::{SectorExpirationDetails, SectorPlottingDetails, SectorUpdate};
2use crate::node_client::NodeClient;
3use crate::plotter::{Plotter, SectorPlottingProgress};
4use crate::single_disk_farm::direct_io_file::DirectIoFile;
5use crate::single_disk_farm::metrics::{SectorState, SingleDiskFarmMetrics};
6use crate::single_disk_farm::{
7 BackgroundTaskError, Handlers, PlotMetadataHeader, RESERVED_PLOT_METADATA,
8};
9use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock, Semaphore, SemaphoreGuard};
10use futures::channel::{mpsc, oneshot};
11use futures::stream::FuturesOrdered;
12use futures::{select, FutureExt, SinkExt, StreamExt};
13use parity_scale_codec::Encode;
14use rand::prelude::*;
15use std::collections::HashSet;
16use std::future::Future;
17use std::io;
18use std::num::NonZeroUsize;
19use std::ops::Range;
20use std::pin::pin;
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23use subspace_core_primitives::hashes::Blake3Hash;
24use subspace_core_primitives::pieces::PieceOffset;
25use subspace_core_primitives::sectors::{SectorId, SectorIndex};
26use subspace_core_primitives::segments::{HistorySize, SegmentHeader, SegmentIndex};
27use subspace_core_primitives::PublicKey;
28use subspace_farmer_components::file_ext::FileExt;
29use subspace_farmer_components::plotting::PlottedSector;
30use subspace_farmer_components::sector::SectorMetadataChecksummed;
31use thiserror::Error;
32use tokio::sync::watch;
33use tokio::task;
34use tracing::{debug, info, info_span, trace, warn, Instrument};
35
/// Pause between attempts to fetch farmer app info while waiting for the node to report a history
/// size newer than the one of the sector that is being replotted
const FARMER_APP_INFO_RETRY_INTERVAL: Duration = Duration::from_millis(500);
/// Pause before sector plotting is retried after a failed (non-fatal) attempt
const PLOTTING_RETRY_DELAY: Duration = Duration::from_secs(1);
38
/// Request to plot (or replot) a single sector, sent by the plotting scheduler to the plotting
/// loop
pub(super) struct SectorToPlot {
    /// Index of the sector to plot
    sector_index: SectorIndex,
    /// Progress so far in % (not including this sector)
    progress: f32,
    /// Whether this is the last sector queued so far
    last_queued: bool,
    /// Never sent on in this file, only dropped once plotting of the sector was scheduled, skipped
    /// or failed, which wakes up the scheduler waiting on the receiving side
    acknowledgement_sender: oneshot::Sender<()>,
}
47
/// Errors that happen during plotting
#[derive(Debug, Error)]
pub enum PlottingError {
    /// Failed to retrieve farmer info
    #[error("Failed to retrieve farmer info: {error}")]
    FailedToGetFarmerInfo {
        /// Lower-level error
        error: anyhow::Error,
    },
    /// Failed to get segment header
    #[error("Failed to get segment header: {error}")]
    FailedToGetSegmentHeader {
        /// Lower-level error
        error: anyhow::Error,
    },
    /// Node did not return a header for the requested archived segment
    #[error("Missing archived segment header: {segment_index}")]
    MissingArchivedSegmentHeader {
        /// Segment index that was missing
        segment_index: SegmentIndex,
    },
    /// Failed to subscribe to archived segments
    #[error("Failed to subscribe to archived segments: {error}")]
    FailedToSubscribeArchivedSegments {
        /// Lower-level error
        error: anyhow::Error,
    },
    /// Low-level plotting error described by a message
    #[error("Low-level plotting error: {0}")]
    LowLevel(String),
    /// I/O error that occurred during plotting
    #[error("Plotting I/O error: {0}")]
    Io(#[from] io::Error),
    /// Background downloading panicked
    #[error("Background downloading panicked")]
    BackgroundDownloadingPanicked,
}
85
/// Options shared by all sectors plotted by a single farm
pub(super) struct SectorPlottingOptions<'a, NC> {
    /// Public key passed to the plotter for every sector
    pub(super) public_key: PublicKey,
    /// Client used to query farmer app info from the node
    pub(super) node_client: &'a NC,
    /// Number of pieces in each sector
    pub(super) pieces_in_sector: u16,
    /// Size of a single sector in bytes, also used to compute sector offset in the plot file
    pub(super) sector_size: usize,
    /// File plotted sectors are written to
    pub(super) plot_file: Arc<DirectIoFile>,
    /// File metadata header and sector metadata are written to
    pub(super) metadata_file: Arc<DirectIoFile>,
    /// Handlers notified about sector updates
    pub(super) handlers: &'a Handlers,
    /// Mutex that is taken briefly before a sector is written to disk
    pub(super) global_mutex: &'a AsyncMutex<()>,
    /// Plotter that produces sector contents
    pub(super) plotter: Arc<dyn Plotter>,
    /// Optional metrics updated as plotting progresses
    pub(super) metrics: Option<Arc<SingleDiskFarmMetrics>>,
}
98
/// Options for [`plotting`]
pub(super) struct PlottingOptions<'a, NC> {
    /// Metadata header, updated and written to the metadata file when plotted sector count grows
    pub(super) metadata_header: PlotMetadataHeader,
    /// Metadata of sectors plotted so far
    pub(super) sectors_metadata: &'a AsyncRwLock<Vec<SectorMetadataChecksummed>>,
    /// Indices of sectors that are currently being plotted/replotted
    pub(super) sectors_being_modified: &'a AsyncRwLock<HashSet<SectorIndex>>,
    /// Receiving side of the channel with sectors that need to be plotted
    pub(super) sectors_to_plot_receiver: mpsc::Receiver<SectorToPlot>,
    /// Options used for plotting of every individual sector
    pub(super) sector_plotting_options: SectorPlottingOptions<'a, NC>,
    /// Number of permits of the plotting semaphore, i.e. how many sectors of this farm can hold a
    /// plotting permit at the same time
    pub(super) max_plotting_sectors_per_farm: NonZeroUsize,
}
107
108pub(super) async fn plotting<NC>(
113 plotting_options: PlottingOptions<'_, NC>,
114) -> Result<(), PlottingError>
115where
116 NC: NodeClient,
117{
118 let PlottingOptions {
119 mut metadata_header,
120 sectors_metadata,
121 sectors_being_modified,
122 mut sectors_to_plot_receiver,
123 sector_plotting_options,
124 max_plotting_sectors_per_farm,
125 } = plotting_options;
126
127 let sector_plotting_options = §or_plotting_options;
128 let plotting_semaphore = Semaphore::new(max_plotting_sectors_per_farm.get());
129 let mut sectors_being_plotted = FuturesOrdered::new();
130 let (sector_plotting_result_sender, mut sector_plotting_result_receiver) = mpsc::unbounded();
133 let process_plotting_result_fut = async move {
134 while let Some(sector_plotting_result) = sector_plotting_result_receiver.next().await {
135 process_plotting_result(
136 sector_plotting_result,
137 sectors_metadata,
138 sectors_being_modified,
139 &mut metadata_header,
140 Arc::clone(§or_plotting_options.metadata_file),
141 )
142 .await?;
143 }
144
145 unreachable!(
146 "Stream will not end before the rest of the plotting process is shutting down"
147 );
148 };
149 let process_plotting_result_fut = process_plotting_result_fut.fuse();
150 let mut process_plotting_result_fut = pin!(process_plotting_result_fut);
151
152 loop {
155 select! {
156 maybe_sector_to_plot = sectors_to_plot_receiver.next() => {
157 let Some(sector_to_plot) = maybe_sector_to_plot else {
158 break;
159 };
160
161 let sector_index = sector_to_plot.sector_index;
162 let sector_plotting_init_fut = plot_single_sector(
163 sector_to_plot,
164 sector_plotting_options,
165 sectors_metadata,
166 sectors_being_modified,
167 &plotting_semaphore,
168 )
169 .instrument(info_span!("", %sector_index))
170 .fuse();
171 let mut sector_plotting_init_fut = pin!(sector_plotting_init_fut);
172
173 loop {
177 select! {
178 sector_plotting_init_result = sector_plotting_init_fut => {
179 let sector_plotting_fut = match sector_plotting_init_result {
180 PlotSingleSectorResult::Scheduled(future) => future,
181 PlotSingleSectorResult::Skipped => {
182 break;
183 }
184 PlotSingleSectorResult::FatalError(error) => {
185 return Err(error);
186 }
187 };
188 sectors_being_plotted.push_back(
189 sector_plotting_fut.instrument(info_span!("", %sector_index))
190 );
191 break;
192 }
193 maybe_sector_plotting_result = sectors_being_plotted.select_next_some() => {
194 sector_plotting_result_sender
195 .unbounded_send(maybe_sector_plotting_result?)
196 .expect("Sending means receiver is not dropped yet; qed");
197 }
198 result = process_plotting_result_fut => {
199 return result;
200 }
201 }
202 }
203 }
204 maybe_sector_plotting_result = sectors_being_plotted.select_next_some() => {
205 sector_plotting_result_sender
206 .unbounded_send(maybe_sector_plotting_result?)
207 .expect("Sending means receiver is not dropped yet; qed");
208 }
209 result = process_plotting_result_fut => {
210 return result;
211 }
212 }
213 }
214
215 Ok(())
216}
217
218async fn process_plotting_result(
219 sector_plotting_result: SectorPlottingResult<'_>,
220 sectors_metadata: &AsyncRwLock<Vec<SectorMetadataChecksummed>>,
221 sectors_being_modified: &AsyncRwLock<HashSet<SectorIndex>>,
222 metadata_header: &mut PlotMetadataHeader,
223 metadata_file: Arc<DirectIoFile>,
224) -> Result<(), PlottingError> {
225 let SectorPlottingResult {
226 sector_metadata,
227 replotting,
228 last_queued,
229 plotting_permit,
230 } = sector_plotting_result;
231
232 let sector_index = sector_metadata.sector_index;
233
234 {
235 let mut sectors_metadata = sectors_metadata.write().await;
236 if let Some(existing_sector_metadata) = sectors_metadata.get_mut(sector_index as usize) {
238 *existing_sector_metadata = sector_metadata;
239 } else {
240 sectors_metadata.push(sector_metadata);
241 }
242 }
243
244 sectors_being_modified.write().await.remove(§or_index);
246
247 if sector_index + 1 > metadata_header.plotted_sector_count {
248 metadata_header.plotted_sector_count = sector_index + 1;
249
250 let encoded_metadata_header = metadata_header.encode();
251 let write_fut =
252 task::spawn_blocking(move || metadata_file.write_all_at(&encoded_metadata_header, 0));
253 write_fut.await.map_err(|error| {
254 PlottingError::LowLevel(format!("Failed to spawn blocking tokio task: {error}"))
255 })??;
256 }
257
258 if last_queued {
259 if replotting {
260 info!("Replotting complete");
261 } else {
262 info!("Initial plotting complete");
263 }
264 }
265
266 drop(plotting_permit);
267
268 Ok(())
269}
270
/// Outcome of an attempt to initiate plotting of a single sector
enum PlotSingleSectorResult<F> {
    /// Plotting was initiated, the future resolves once the sector is plotted
    Scheduled(F),
    /// Sector plotting was skipped
    Skipped,
    /// Plotting can't proceed due to an error
    FatalError(PlottingError),
}
276
/// Result of successful plotting of a single sector
struct SectorPlottingResult<'a> {
    /// Metadata of the newly plotted sector
    sector_metadata: SectorMetadataChecksummed,
    /// Whether the sector had metadata stored before, i.e. it was replotted
    replotting: bool,
    /// Whether this was the last sector queued so far
    last_queued: bool,
    /// Plotting permit that is held until the result is fully processed
    plotting_permit: SemaphoreGuard<'a>,
}
283
284async fn plot_single_sector<'a, NC>(
285 sector_to_plot: SectorToPlot,
286 sector_plotting_options: &'a SectorPlottingOptions<'a, NC>,
287 sectors_metadata: &'a AsyncRwLock<Vec<SectorMetadataChecksummed>>,
288 sectors_being_modified: &'a AsyncRwLock<HashSet<SectorIndex>>,
289 plotting_semaphore: &'a Semaphore,
290) -> PlotSingleSectorResult<
291 impl Future<Output = Result<SectorPlottingResult<'a>, PlottingError>> + 'a,
292>
293where
294 NC: NodeClient,
295{
296 let SectorPlottingOptions {
297 public_key,
298 node_client,
299 pieces_in_sector,
300 sector_size,
301 plot_file,
302 metadata_file,
303 handlers,
304 global_mutex,
305 plotter,
306 metrics,
307 } = sector_plotting_options;
308
309 let SectorToPlot {
310 sector_index,
311 progress,
312 last_queued,
313 acknowledgement_sender: _acknowledgement_sender,
314 } = sector_to_plot;
315 trace!("Preparing to plot sector");
316
317 {
319 let mut sectors_being_modified = sectors_being_modified.write().await;
320 if !sectors_being_modified.insert(sector_index) {
321 debug!("Skipped sector plotting, it is already in progress");
322 return PlotSingleSectorResult::Skipped;
323 }
324 }
325
326 let plotting_permit = plotting_semaphore.acquire().await;
327
328 let maybe_old_sector_metadata = sectors_metadata
329 .read()
330 .await
331 .get(sector_index as usize)
332 .cloned();
333 let replotting = maybe_old_sector_metadata.is_some();
334
335 if let Some(metrics) = metrics {
336 metrics.sector_plotting.inc();
337 }
338 let sector_state = SectorUpdate::Plotting(SectorPlottingDetails::Starting {
339 progress,
340 replotting,
341 last_queued,
342 });
343 handlers
344 .sector_update
345 .call_simple(&(sector_index, sector_state));
346
347 let start = Instant::now();
348
349 let farmer_app_info = loop {
355 let farmer_app_info = match node_client.farmer_app_info().await {
356 Ok(farmer_app_info) => farmer_app_info,
357 Err(error) => {
358 return PlotSingleSectorResult::FatalError(PlottingError::FailedToGetFarmerInfo {
359 error,
360 });
361 }
362 };
363
364 if let Some(old_sector_metadata) = &maybe_old_sector_metadata {
365 if farmer_app_info.protocol_info.history_size <= old_sector_metadata.history_size {
366 if farmer_app_info.protocol_info.min_sector_lifetime == HistorySize::ONE {
367 debug!(
368 current_history_size = %farmer_app_info.protocol_info.history_size,
369 old_sector_history_size = %old_sector_metadata.history_size,
370 "Latest protocol history size is not yet newer than old sector history \
371 size, wait for a bit and try again"
372 );
373 tokio::time::sleep(FARMER_APP_INFO_RETRY_INTERVAL).await;
374 continue;
375 } else {
376 debug!(
377 current_history_size = %farmer_app_info.protocol_info.history_size,
378 old_sector_history_size = %old_sector_metadata.history_size,
379 "Skipped sector plotting, likely redundant due to redundant archived \
380 segment notification"
381 );
382 return PlotSingleSectorResult::Skipped;
383 }
384 }
385 }
386
387 break farmer_app_info;
388 };
389
390 let (progress_sender, mut progress_receiver) = mpsc::channel(10);
391
392 plotter
394 .plot_sector(
395 *public_key,
396 sector_index,
397 farmer_app_info.protocol_info,
398 *pieces_in_sector,
399 replotting,
400 progress_sender,
401 )
402 .await;
403
404 if replotting {
405 info!("Replotting sector ({progress:.2}% complete)");
406 } else {
407 info!("Plotting sector ({progress:.2}% complete)");
408 }
409
410 PlotSingleSectorResult::Scheduled(async move {
411 let plotted_sector = loop {
412 match plot_single_sector_internal(
413 sector_index,
414 *sector_size,
415 plot_file,
416 metadata_file,
417 handlers,
418 global_mutex,
419 progress_receiver,
420 metrics,
421 )
422 .await?
423 {
424 Ok(plotted_sector) => {
425 break plotted_sector;
426 }
427 Err(error) => {
428 warn!(
429 %error,
430 "Failed to plot sector, retrying in {PLOTTING_RETRY_DELAY:?}"
431 );
432
433 tokio::time::sleep(PLOTTING_RETRY_DELAY).await;
434 }
435 }
436
437 let (retry_progress_sender, retry_progress_receiver) = mpsc::channel(10);
438 progress_receiver = retry_progress_receiver;
439
440 plotter
442 .plot_sector(
443 *public_key,
444 sector_index,
445 farmer_app_info.protocol_info,
446 *pieces_in_sector,
447 replotting,
448 retry_progress_sender,
449 )
450 .await;
451
452 if replotting {
453 info!("Replotting sector retry");
454 } else {
455 info!("Plotting sector retry");
456 }
457 };
458
459 let maybe_old_plotted_sector = maybe_old_sector_metadata.map(|old_sector_metadata| {
460 let old_history_size = old_sector_metadata.history_size;
461
462 PlottedSector {
463 sector_id: plotted_sector.sector_id,
464 sector_index: plotted_sector.sector_index,
465 sector_metadata: old_sector_metadata,
466 piece_indexes: {
467 let mut piece_indexes = Vec::with_capacity(usize::from(*pieces_in_sector));
468 (PieceOffset::ZERO..)
469 .take(usize::from(*pieces_in_sector))
470 .map(|piece_offset| {
471 plotted_sector.sector_id.derive_piece_index(
472 piece_offset,
473 old_history_size,
474 farmer_app_info.protocol_info.max_pieces_in_sector,
475 farmer_app_info.protocol_info.recent_segments,
476 farmer_app_info.protocol_info.recent_history_fraction,
477 )
478 })
479 .collect_into(&mut piece_indexes);
480 piece_indexes
481 },
482 }
483 });
484
485 if replotting {
486 debug!("Sector replotted successfully");
487 } else {
488 debug!("Sector plotted successfully");
489 }
490
491 let sector_metadata = plotted_sector.sector_metadata.clone();
492
493 let time = start.elapsed();
494 if let Some(metrics) = metrics {
495 metrics.sector_plotting_time.observe(time.as_secs_f64());
496 metrics.sector_plotted.inc();
497 metrics.update_sector_state(SectorState::Plotted);
498 }
499 let sector_state = SectorUpdate::Plotting(SectorPlottingDetails::Finished {
500 plotted_sector,
501 old_plotted_sector: maybe_old_plotted_sector,
502 time,
503 });
504 handlers
505 .sector_update
506 .call_simple(&(sector_index, sector_state));
507
508 Ok(SectorPlottingResult {
509 sector_metadata,
510 replotting,
511 last_queued,
512 plotting_permit,
513 })
514 })
515}
516
517#[allow(clippy::too_many_arguments)]
520async fn plot_single_sector_internal(
521 sector_index: SectorIndex,
522 sector_size: usize,
523 plot_file: &Arc<DirectIoFile>,
524 metadata_file: &Arc<DirectIoFile>,
525 handlers: &Handlers,
526 global_mutex: &AsyncMutex<()>,
527 mut progress_receiver: mpsc::Receiver<SectorPlottingProgress>,
528 metrics: &Option<Arc<SingleDiskFarmMetrics>>,
529) -> Result<Result<PlottedSector, PlottingError>, PlottingError> {
530 let progress_processor_fut = async {
532 while let Some(progress) = progress_receiver.next().await {
533 match progress {
534 SectorPlottingProgress::Downloading => {
535 if let Some(metrics) = metrics {
536 metrics.sector_downloading.inc();
537 }
538 handlers.sector_update.call_simple(&(
539 sector_index,
540 SectorUpdate::Plotting(SectorPlottingDetails::Downloading),
541 ));
542 }
543 SectorPlottingProgress::Downloaded(time) => {
544 if let Some(metrics) = metrics {
545 metrics.sector_downloading_time.observe(time.as_secs_f64());
546 metrics.sector_downloaded.inc();
547 }
548 handlers.sector_update.call_simple(&(
549 sector_index,
550 SectorUpdate::Plotting(SectorPlottingDetails::Downloaded(time)),
551 ));
552 }
553 SectorPlottingProgress::Encoding => {
554 if let Some(metrics) = metrics {
555 metrics.sector_encoding.inc();
556 }
557 handlers.sector_update.call_simple(&(
558 sector_index,
559 SectorUpdate::Plotting(SectorPlottingDetails::Encoding),
560 ));
561 }
562 SectorPlottingProgress::Encoded(time) => {
563 if let Some(metrics) = metrics {
564 metrics.sector_encoding_time.observe(time.as_secs_f64());
565 metrics.sector_encoded.inc();
566 }
567 handlers.sector_update.call_simple(&(
568 sector_index,
569 SectorUpdate::Plotting(SectorPlottingDetails::Encoded(time)),
570 ));
571 }
572 SectorPlottingProgress::Finished {
573 plotted_sector,
574 time: _,
575 sector,
576 } => {
577 return Ok((plotted_sector, sector));
578 }
579 SectorPlottingProgress::Error { error } => {
580 if let Some(metrics) = metrics {
581 metrics.sector_plotting_error.inc();
582 }
583 handlers.sector_update.call_simple(&(
584 sector_index,
585 SectorUpdate::Plotting(SectorPlottingDetails::Error(error.clone())),
586 ));
587 return Err(error);
588 }
589 }
590 }
591
592 Err("Plotting progress stream ended before plotting finished".to_string())
593 };
594
595 let (plotted_sector, mut sector) = match progress_processor_fut.await {
596 Ok(result) => result,
597 Err(error) => {
598 return Ok(Err(PlottingError::LowLevel(error)));
599 }
600 };
601
602 {
603 global_mutex.lock().await;
605
606 if let Some(metrics) = metrics {
607 metrics.sector_writing.inc();
608 }
609 handlers.sector_update.call_simple(&(
610 sector_index,
611 SectorUpdate::Plotting(SectorPlottingDetails::Writing),
612 ));
613
614 let start = Instant::now();
615
616 {
617 let sector_write_base_offset = u64::from(sector_index) * sector_size as u64;
618 let mut total_received = 0;
619 let mut sector_write_offset = sector_write_base_offset;
620 while let Some(maybe_sector_chunk) = sector.next().await {
621 let sector_chunk = match maybe_sector_chunk {
622 Ok(sector_chunk) => sector_chunk,
623 Err(error) => {
624 return Ok(Err(PlottingError::LowLevel(format!(
625 "Sector chunk receive error: {error}"
626 ))));
627 }
628 };
629
630 total_received += sector_chunk.len();
631
632 if total_received > sector_size {
633 return Ok(Err(PlottingError::LowLevel(format!(
634 "Received too many bytes {total_received} instead of expected \
635 {sector_size} bytes"
636 ))));
637 }
638
639 let sector_chunk_size = sector_chunk.len() as u64;
640
641 trace!(sector_chunk_size, "Writing sector chunk to disk");
642 let write_fut = task::spawn_blocking({
643 let plot_file = Arc::clone(plot_file);
644
645 move || plot_file.write_all_at(§or_chunk, sector_write_offset)
646 });
647 write_fut.await.map_err(|error| {
648 PlottingError::LowLevel(format!("Failed to spawn blocking tokio task: {error}"))
649 })??;
650
651 sector_write_offset += sector_chunk_size;
652 }
653 drop(sector);
654
655 if total_received != sector_size {
656 return Ok(Err(PlottingError::LowLevel(format!(
657 "Received only {total_received} sector bytes out of {sector_size} \
658 expected bytes"
659 ))));
660 }
661 }
662 {
663 let encoded_sector_metadata = plotted_sector.sector_metadata.encode();
664 let write_fut = task::spawn_blocking({
665 let metadata_file = Arc::clone(metadata_file);
666
667 move || {
668 metadata_file.write_all_at(
669 &encoded_sector_metadata,
670 RESERVED_PLOT_METADATA
671 + (u64::from(sector_index) * encoded_sector_metadata.len() as u64),
672 )
673 }
674 });
675 write_fut.await.map_err(|error| {
676 PlottingError::LowLevel(format!("Failed to spawn blocking tokio task: {error}"))
677 })??;
678 }
679
680 let time = start.elapsed();
681 if let Some(metrics) = metrics {
682 metrics.sector_writing_time.observe(time.as_secs_f64());
683 metrics.sector_written.inc();
684 }
685 handlers.sector_update.call_simple(&(
686 sector_index,
687 SectorUpdate::Plotting(SectorPlottingDetails::Written(time)),
688 ));
689 }
690
691 Ok(Ok(plotted_sector))
692}
693
/// Options for [`plotting_scheduler`]
pub(super) struct PlottingSchedulerOptions<NC> {
    /// Hash of the public key, used to derive sector IDs
    pub(super) public_key_hash: Blake3Hash,
    /// Indices of sectors that still need to be plotted initially
    pub(super) sectors_indices_left_to_plot: Range<SectorIndex>,
    /// Total number of sectors the farm is supposed to contain
    pub(super) target_sector_count: SectorIndex,
    /// Index of the segment whose header seeds the archived segments channel
    pub(super) last_archived_segment_index: SegmentIndex,
    /// Minimum sector lifetime, used for sector expiration calculations
    pub(super) min_sector_lifetime: HistorySize,
    /// Client used to talk to the node
    pub(super) node_client: NC,
    /// Handlers notified about sector updates
    pub(super) handlers: Arc<Handlers>,
    /// Metadata of sectors plotted so far
    pub(super) sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    /// Sending side of the channel with sectors that need to be plotted
    pub(super) sectors_to_plot_sender: mpsc::Sender<SectorToPlot>,
    /// Upper bound (in whole seconds) of the random delay between receiving a newly archived
    /// segment and forwarding it for processing; lower bound is 1/10 of it
    pub(super) new_segment_processing_delay: Duration,
    /// Optional metrics updated when sector expiration state changes
    pub(super) metrics: Option<Arc<SingleDiskFarmMetrics>>,
}
709
710pub(super) async fn plotting_scheduler<NC>(
711 plotting_scheduler_options: PlottingSchedulerOptions<NC>,
712) -> Result<(), BackgroundTaskError>
713where
714 NC: NodeClient,
715{
716 let PlottingSchedulerOptions {
717 public_key_hash,
718 sectors_indices_left_to_plot,
719 target_sector_count,
720 last_archived_segment_index,
721 min_sector_lifetime,
722 node_client,
723 handlers,
724 sectors_metadata,
725 sectors_to_plot_sender,
726 new_segment_processing_delay,
727 metrics,
728 } = plotting_scheduler_options;
729
730 let last_archived_segment = node_client
734 .segment_headers(vec![last_archived_segment_index])
735 .await
736 .map_err(|error| PlottingError::FailedToGetSegmentHeader { error })?
737 .into_iter()
738 .next()
739 .flatten()
740 .ok_or(PlottingError::MissingArchivedSegmentHeader {
741 segment_index: last_archived_segment_index,
742 })?;
743
744 let (archived_segments_sender, archived_segments_receiver) =
745 watch::channel(last_archived_segment);
746
747 let read_archived_segments_notifications_fut = read_archived_segments_notifications(
748 &node_client,
749 archived_segments_sender,
750 new_segment_processing_delay,
751 );
752
753 let send_plotting_notifications_fut = send_plotting_notifications(
754 public_key_hash,
755 sectors_indices_left_to_plot,
756 target_sector_count,
757 min_sector_lifetime,
758 &node_client,
759 &handlers,
760 sectors_metadata,
761 archived_segments_receiver,
762 sectors_to_plot_sender,
763 &metrics,
764 );
765
766 select! {
767 result = read_archived_segments_notifications_fut.fuse() => {
768 result
769 }
770 result = send_plotting_notifications_fut.fuse() => {
771 result
772 }
773 }
774}
775
776async fn read_archived_segments_notifications<NC>(
777 node_client: &NC,
778 archived_segments_sender: watch::Sender<SegmentHeader>,
779 new_segment_processing_delay: Duration,
780) -> Result<(), BackgroundTaskError>
781where
782 NC: NodeClient,
783{
784 info!("Subscribing to archived segments");
785
786 let mut archived_segments_notifications = node_client
787 .subscribe_archived_segment_headers()
788 .await
789 .map_err(|error| PlottingError::FailedToSubscribeArchivedSegments { error })?;
790
791 while let Some(segment_header) = archived_segments_notifications.next().await {
792 debug!(?segment_header, "New archived segment");
793 if let Err(error) = node_client
794 .acknowledge_archived_segment_header(segment_header.segment_index())
795 .await
796 {
797 debug!(%error, "Failed to acknowledge segment header");
798 }
799
800 let delay = Duration::from_secs(thread_rng().gen_range(
803 new_segment_processing_delay.as_secs() / 10..=new_segment_processing_delay.as_secs(),
804 ));
805 tokio::time::sleep(delay).await;
806
807 if archived_segments_sender.send(segment_header).is_err() {
808 break;
809 }
810 }
811
812 Ok(())
813}
814
/// Sector that was found to be expired or about to expire and needs to be replotted
struct SectorToReplot {
    /// Index of the sector to replot
    sector_index: SectorIndex,
    /// Segment index at which the sector expires, sectors are queued for replotting in the order
    /// of this value
    expires_at: SegmentIndex,
}
819
820#[allow(clippy::too_many_arguments)]
821async fn send_plotting_notifications<NC>(
822 public_key_hash: Blake3Hash,
823 sectors_indices_left_to_plot: Range<SectorIndex>,
824 target_sector_count: SectorIndex,
825 min_sector_lifetime: HistorySize,
826 node_client: &NC,
827 handlers: &Handlers,
828 sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
829 mut archived_segments_receiver: watch::Receiver<SegmentHeader>,
830 mut sectors_to_plot_sender: mpsc::Sender<SectorToPlot>,
831 metrics: &Option<Arc<SingleDiskFarmMetrics>>,
832) -> Result<(), BackgroundTaskError>
833where
834 NC: NodeClient,
835{
836 for sector_index in sectors_indices_left_to_plot {
838 let (acknowledgement_sender, acknowledgement_receiver) = oneshot::channel();
839 if let Err(error) = sectors_to_plot_sender
840 .send(SectorToPlot {
841 sector_index,
842 progress: sector_index as f32 / target_sector_count as f32 * 100.0,
843 last_queued: sector_index + 1 == target_sector_count,
844 acknowledgement_sender,
845 })
846 .await
847 {
848 warn!(%error, "Failed to send sector index for initial plotting");
849 return Ok(());
850 }
851
852 let _ = acknowledgement_receiver.await;
854 }
855
856 let mut sectors_expire_at = vec![None::<SegmentIndex>; usize::from(target_sector_count)];
857 let mut sectors_to_replot = Vec::with_capacity(usize::from(target_sector_count) / 10);
859
860 loop {
861 let segment_index = archived_segments_receiver
862 .borrow_and_update()
863 .segment_index();
864 trace!(%segment_index, "New archived segment received");
865
866 let sectors_metadata = sectors_metadata.read().await;
867 let sectors_to_check = sectors_metadata
868 .iter()
869 .map(|sector_metadata| (sector_metadata.sector_index, sector_metadata.history_size));
870 for (sector_index, history_size) in sectors_to_check {
871 if let Some(Some(expires_at)) =
872 sectors_expire_at.get(usize::from(sector_index)).copied()
873 {
874 trace!(
875 %sector_index,
876 %history_size,
877 %expires_at,
878 "Checking sector for expiration"
879 );
880 if expires_at <= (segment_index + SegmentIndex::ONE) {
883 debug!(
884 %sector_index,
885 %history_size,
886 %expires_at,
887 "Sector expires soon #1, scheduling replotting"
888 );
889
890 let expiration_details = if expires_at <= segment_index {
891 if let Some(metrics) = metrics {
892 metrics.update_sector_state(SectorState::Expired);
893 }
894 SectorExpirationDetails::Expired
895 } else {
896 if let Some(metrics) = metrics {
897 metrics.update_sector_state(SectorState::AboutToExpire);
898 }
899 SectorExpirationDetails::AboutToExpire
900 };
901 handlers
902 .sector_update
903 .call_simple(&(sector_index, SectorUpdate::Expiration(expiration_details)));
904
905 sectors_to_replot.push(SectorToReplot {
907 sector_index,
908 expires_at,
909 });
910 }
911 continue;
912 }
913
914 if let Some(expiration_check_segment_index) = history_size
915 .sector_expiration_check(min_sector_lifetime)
916 .map(|expiration_check_history_size| expiration_check_history_size.segment_index())
917 {
918 trace!(
919 %sector_index,
920 %history_size,
921 %expiration_check_segment_index,
922 "Determined sector expiration check segment index"
923 );
924 let maybe_sector_expiration_check_segment_commitment = node_client
925 .segment_headers(vec![expiration_check_segment_index])
926 .await
927 .map_err(|error| PlottingError::FailedToGetSegmentHeader { error })?
928 .into_iter()
929 .next()
930 .flatten()
931 .map(|segment_header| segment_header.segment_commitment());
932
933 if let Some(sector_expiration_check_segment_commitment) =
934 maybe_sector_expiration_check_segment_commitment
935 {
936 let sector_id = SectorId::new(public_key_hash, sector_index, history_size);
937 let expiration_history_size = sector_id
938 .derive_expiration_history_size(
939 history_size,
940 §or_expiration_check_segment_commitment,
941 min_sector_lifetime,
942 )
943 .expect(
944 "Farmers internally stores correct history size in sector \
945 metadata; qed",
946 );
947
948 let expires_at = expiration_history_size.segment_index();
949
950 trace!(
951 %sector_index,
952 %history_size,
953 sector_expire_at = %expires_at,
954 "Determined sector expiration segment index"
955 );
956 if expires_at <= (segment_index + SegmentIndex::ONE) {
959 debug!(
960 %sector_index,
961 %history_size,
962 %expires_at,
963 "Sector expires soon #2, scheduling replotting"
964 );
965
966 let expiration_details = if expires_at <= segment_index {
967 if let Some(metrics) = metrics {
968 metrics.update_sector_state(SectorState::Expired);
969 }
970 SectorExpirationDetails::Expired
971 } else {
972 if let Some(metrics) = metrics {
973 metrics.update_sector_state(SectorState::AboutToExpire);
974 }
975 SectorExpirationDetails::AboutToExpire
976 };
977 handlers.sector_update.call_simple(&(
978 sector_index,
979 SectorUpdate::Expiration(expiration_details),
980 ));
981
982 sectors_to_replot.push(SectorToReplot {
984 sector_index,
985 expires_at,
986 });
987 } else {
988 trace!(
989 %sector_index,
990 %history_size,
991 sector_expire_at = %expires_at,
992 "Sector expires later, remembering sector expiration"
993 );
994
995 handlers.sector_update.call_simple(&(
996 sector_index,
997 SectorUpdate::Expiration(SectorExpirationDetails::Determined {
998 expires_at,
999 }),
1000 ));
1001
1002 if let Some(expires_at_entry) =
1004 sectors_expire_at.get_mut(usize::from(sector_index))
1005 {
1006 expires_at_entry.replace(expires_at);
1007 }
1008 }
1009 }
1010 }
1011 }
1012 drop(sectors_metadata);
1013
1014 let sectors_queued = sectors_to_replot.len();
1015 sectors_to_replot.sort_by_key(|sector_to_replot| sector_to_replot.expires_at);
1016 for (index, SectorToReplot { sector_index, .. }) in sectors_to_replot.drain(..).enumerate()
1017 {
1018 let (acknowledgement_sender, acknowledgement_receiver) = oneshot::channel();
1019 if let Err(error) = sectors_to_plot_sender
1020 .send(SectorToPlot {
1021 sector_index,
1022 progress: index as f32 / sectors_queued as f32 * 100.0,
1023 last_queued: index + 1 == sectors_queued,
1024 acknowledgement_sender,
1025 })
1026 .await
1027 {
1028 warn!(%error, "Failed to send sector index for replotting");
1029 return Ok(());
1030 }
1031
1032 let _ = acknowledgement_receiver.await;
1034
1035 if let Some(expires_at_entry) = sectors_expire_at.get_mut(usize::from(sector_index)) {
1036 expires_at_entry.take();
1037 }
1038 }
1039
1040 if archived_segments_receiver.changed().await.is_err() {
1041 break;
1042 }
1043 }
1044
1045 Ok(())
1046}