1use crate::cluster::nats_client::{GenericRequest, GenericStreamRequest, NatsClient};
10use crate::plotter::{Plotter, SectorPlottingProgress};
11use crate::utils::AsyncJoinOnDrop;
12use anyhow::anyhow;
13use async_nats::RequestErrorKind;
14use async_trait::async_trait;
15use backoff::backoff::Backoff;
16use backoff::ExponentialBackoff;
17use bytes::Bytes;
18use derive_more::Display;
19use event_listener_primitives::{Bag, HandlerId};
20use futures::channel::mpsc;
21use futures::future::FusedFuture;
22use futures::stream::FuturesUnordered;
23use futures::{select, stream, FutureExt, Sink, SinkExt, StreamExt};
24use parity_scale_codec::{Decode, Encode};
25use std::error::Error;
26use std::future::pending;
27use std::num::NonZeroUsize;
28use std::pin::pin;
29use std::sync::Arc;
30use std::task::Poll;
31use std::time::{Duration, Instant};
32use subspace_core_primitives::sectors::SectorIndex;
33use subspace_core_primitives::PublicKey;
34use subspace_farmer_components::plotting::PlottedSector;
35use subspace_farmer_components::sector::sector_size;
36use subspace_farmer_components::FarmerProtocolInfo;
37use tokio::sync::{OwnedSemaphorePermit, Semaphore};
38use tokio::time::MissedTickBehavior;
39use tracing::{debug, info, info_span, trace, warn, Instrument};
40use ulid::Ulid;
41
/// Delay between consecutive free capacity checks while the local plotter has no capacity and
/// free instance requests are not being served
const FREE_CAPACITY_CHECK_INTERVAL: Duration = Duration::from_secs(1);
/// Interval at which the plotter side sends `Ping` messages while a plot sector request is being
/// processed
const PING_INTERVAL: Duration = Duration::from_secs(10);
/// Maximum time the requesting side waits for the next response message (including pings) before
/// reporting a timeout
// Expressed in seconds (1 minute) so that it doesn't depend on `Duration::from_mins()`, which is
// not available on older stable toolchains
const PING_TIMEOUT: Duration = Duration::from_secs(60);
47
/// Shared, thread-safe callback that receives references to three arguments
pub type HandlerFn3<A, B, C> = Arc<dyn Fn(&A, &B, &C) + Send + Sync + 'static>;
/// [`Bag`] of [`HandlerFn3`] callbacks
type Handler3<A, B, C> = Bag<HandlerFn3<A, B, C>, A, B, C>;
51
/// Identifier of a plotter instance in the cluster
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Display)]
pub enum ClusterPlotterId {
    /// Identifier backed by a [`Ulid`]
    Ulid(Ulid),
}
58
// `Default` is not implemented for this type, hence the lint about `new()` without `Default` is
// silenced
#[allow(clippy::new_without_default)]
impl ClusterPlotterId {
    /// Creates a new identifier from a freshly generated [`Ulid`]
    pub fn new() -> Self {
        Self::Ulid(Ulid::new())
    }
}
66
/// Request for a plotter instance that currently has free capacity
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterPlotterFreeInstanceRequest;
70
impl GenericRequest for ClusterPlotterFreeInstanceRequest {
    const SUBJECT: &'static str = "subspace.plotter.free-instance";
    /// String form of the responding plotter's ID when it has free capacity, `None` otherwise
    type Response = Option<String>;
}
76
/// Messages streamed by a plotter in response to [`ClusterPlotterPlotSectorRequest`]
#[derive(Debug, Encode, Decode)]
enum ClusterSectorPlottingProgress {
    /// Plotter refused to start plotting because it is occupied, the requester retries with
    /// backoff
    Occupied,
    /// Periodic keep-alive message sent while the request is being processed
    Ping,
    /// Counterpart of [`SectorPlottingProgress::Downloading`]
    Downloading,
    /// Counterpart of [`SectorPlottingProgress::Downloaded`]
    Downloaded(Duration),
    /// Counterpart of [`SectorPlottingProgress::Encoding`]
    Encoding,
    /// Counterpart of [`SectorPlottingProgress::Encoded`]
    Encoded(Duration),
    /// Counterpart of [`SectorPlottingProgress::Finished`] without sector contents, which are
    /// sent afterwards as [`Self::SectorChunk`] messages
    Finished {
        /// Information about the plotted sector
        plotted_sector: PlottedSector,
        /// Time reported by the plotter (ignored by the requester, which measures its own time)
        time: Duration,
    },
    /// Chunk of sector contents or an error produced by the sector stream
    SectorChunk(Result<Bytes, String>),
    /// Counterpart of [`SectorPlottingProgress::Error`]
    Error {
        /// Error message
        error: String,
    },
}
106
/// Request to plot a sector; its fields are passed to [`Plotter::try_plot_sector()`] on the
/// plotter side
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterPlotterPlotSectorRequest {
    public_key: PublicKey,
    sector_index: SectorIndex,
    farmer_protocol_info: FarmerProtocolInfo,
    pieces_in_sector: u16,
}
115
impl GenericStreamRequest for ClusterPlotterPlotSectorRequest {
    // NOTE(review): `*` is presumably substituted with the plotter instance ID that is passed
    // alongside the request and to the responder; confirm against `NatsClient`
    const SUBJECT: &'static str = "subspace.plotter.*.plot-sector";
    /// Stream of plotting progress messages
    type Response = ClusterSectorPlottingProgress;
}
120
/// Event handlers registered on [`ClusterPlotter`]
#[derive(Default, Debug)]
struct Handlers {
    /// Callbacks invoked with public key, sector index and progress on every progress update
    plotting_progress: Handler3<PublicKey, SectorIndex, SectorPlottingProgress>,
}
125
/// [`Plotter`] implementation that delegates sector plotting to plotter instances reachable
/// through [`NatsClient`]
#[derive(Debug)]
pub struct ClusterPlotter {
    /// Limits the number of sectors being plotted at the same time; a permit is held for the
    /// whole duration of a plotting task
    sector_encoding_semaphore: Arc<Semaphore>,
    /// Backoff policy template, cloned and reset for every sector
    retry_backoff_policy: ExponentialBackoff,
    nats_client: NatsClient,
    handlers: Arc<Handlers>,
    /// Sender used to hand spawned plotting tasks over to the background task
    tasks_sender: mpsc::Sender<AsyncJoinOnDrop<()>>,
    /// Background task that receives and drives plotting tasks
    _background_tasks: AsyncJoinOnDrop<()>,
}
136
impl Drop for ClusterPlotter {
    #[inline]
    fn drop(&mut self) {
        // Closing the channel makes the receiving side in the background task observe the end of
        // the stream and break out of its loop
        self.tasks_sender.close_channel();
    }
}
143
144#[async_trait]
145impl Plotter for ClusterPlotter {
146 async fn has_free_capacity(&self) -> Result<bool, String> {
147 Ok(self.sector_encoding_semaphore.available_permits() > 0
148 && self
149 .nats_client
150 .request(&ClusterPlotterFreeInstanceRequest, None)
151 .await
152 .map_err(|error| error.to_string())?
153 .is_some())
154 }
155
156 async fn plot_sector(
157 &self,
158 public_key: PublicKey,
159 sector_index: SectorIndex,
160 farmer_protocol_info: FarmerProtocolInfo,
161 pieces_in_sector: u16,
162 _replotting: bool,
163 mut progress_sender: mpsc::Sender<SectorPlottingProgress>,
164 ) {
165 let start = Instant::now();
166
167 let sector_encoding_permit = match Arc::clone(&self.sector_encoding_semaphore)
170 .acquire_owned()
171 .await
172 {
173 Ok(sector_encoding_permit) => sector_encoding_permit,
174 Err(error) => {
175 warn!(%error, "Failed to acquire sector encoding permit");
176
177 let progress_updater = ProgressUpdater {
178 public_key,
179 sector_index,
180 handlers: Arc::clone(&self.handlers),
181 };
182
183 progress_updater
184 .update_progress_and_events(
185 &mut progress_sender,
186 SectorPlottingProgress::Error {
187 error: format!("Failed to acquire sector encoding permit: {error}"),
188 },
189 )
190 .await;
191
192 return;
193 }
194 };
195
196 self.plot_sector_internal(
197 start,
198 sector_encoding_permit,
199 public_key,
200 sector_index,
201 farmer_protocol_info,
202 pieces_in_sector,
203 progress_sender,
204 )
205 .await
206 }
207
208 async fn try_plot_sector(
209 &self,
210 public_key: PublicKey,
211 sector_index: SectorIndex,
212 farmer_protocol_info: FarmerProtocolInfo,
213 pieces_in_sector: u16,
214 _replotting: bool,
215 progress_sender: mpsc::Sender<SectorPlottingProgress>,
216 ) -> bool {
217 let start = Instant::now();
218
219 let Ok(sector_encoding_permit) =
220 Arc::clone(&self.sector_encoding_semaphore).try_acquire_owned()
221 else {
222 return false;
223 };
224
225 self.plot_sector_internal(
226 start,
227 sector_encoding_permit,
228 public_key,
229 sector_index,
230 farmer_protocol_info,
231 pieces_in_sector,
232 progress_sender,
233 )
234 .await;
235
236 true
237 }
238}
239
240impl ClusterPlotter {
241 pub fn new(
243 nats_client: NatsClient,
244 sector_encoding_concurrency: NonZeroUsize,
245 retry_backoff_policy: ExponentialBackoff,
246 ) -> Self {
247 let sector_encoding_semaphore = Arc::new(Semaphore::new(sector_encoding_concurrency.get()));
248
249 let (tasks_sender, mut tasks_receiver) = mpsc::channel(1);
250
251 let background_tasks = AsyncJoinOnDrop::new(
253 tokio::spawn(async move {
254 let background_tasks = FuturesUnordered::new();
255 let mut background_tasks = pin!(background_tasks);
256 background_tasks.push(AsyncJoinOnDrop::new(tokio::spawn(pending::<()>()), true));
258
259 loop {
260 select! {
261 maybe_background_task = tasks_receiver.next().fuse() => {
262 let Some(background_task) = maybe_background_task else {
263 break;
264 };
265
266 background_tasks.push(background_task);
267 },
268 _ = background_tasks.select_next_some() => {
269 }
271 }
272 }
273 }),
274 true,
275 );
276
277 Self {
278 sector_encoding_semaphore,
279 retry_backoff_policy,
280 nats_client,
281 handlers: Arc::default(),
282 tasks_sender,
283 _background_tasks: background_tasks,
284 }
285 }
286
287 pub fn on_plotting_progress(
289 &self,
290 callback: HandlerFn3<PublicKey, SectorIndex, SectorPlottingProgress>,
291 ) -> HandlerId {
292 self.handlers.plotting_progress.add(callback)
293 }
294
295 #[allow(clippy::too_many_arguments)]
296 async fn plot_sector_internal<PS>(
297 &self,
298 start: Instant,
299 sector_encoding_permit: OwnedSemaphorePermit,
300 public_key: PublicKey,
301 sector_index: SectorIndex,
302 farmer_protocol_info: FarmerProtocolInfo,
303 pieces_in_sector: u16,
304 mut progress_sender: PS,
305 ) where
306 PS: Sink<SectorPlottingProgress> + Unpin + Send + 'static,
307 PS::Error: Error,
308 {
309 trace!("Starting plotting, getting plotting permit");
310
311 let progress_updater = ProgressUpdater {
312 public_key,
313 sector_index,
314 handlers: Arc::clone(&self.handlers),
315 };
316
317 let mut retry_backoff_policy = self.retry_backoff_policy.clone();
318 retry_backoff_policy.reset();
319
320 let free_plotter_instance_fut = get_free_plotter_instance(
322 &self.nats_client,
323 &progress_updater,
324 &mut progress_sender,
325 &mut retry_backoff_policy,
326 );
327 let mut maybe_free_instance = free_plotter_instance_fut.await;
328 if maybe_free_instance.is_none() {
329 return;
330 }
331
332 trace!("Got plotting permit #1");
333
334 let nats_client = self.nats_client.clone();
335
336 let plotting_fut = async move {
337 'outer: loop {
338 let free_instance = match maybe_free_instance.take() {
340 Some(free_instance) => free_instance,
341 None => {
342 let free_plotter_instance_fut = get_free_plotter_instance(
343 &nats_client,
344 &progress_updater,
345 &mut progress_sender,
346 &mut retry_backoff_policy,
347 );
348 let Some(free_instance) = free_plotter_instance_fut.await else {
349 break;
350 };
351 trace!("Got plotting permit #2");
352 free_instance
353 }
354 };
355
356 let response_stream_result = nats_client
357 .stream_request(
358 &ClusterPlotterPlotSectorRequest {
359 public_key,
360 sector_index,
361 farmer_protocol_info,
362 pieces_in_sector,
363 },
364 Some(&free_instance),
365 )
366 .await;
367 trace!("Subscribed to plotting notifications");
368
369 let mut response_stream = match response_stream_result {
370 Ok(response_stream) => response_stream,
371 Err(error) => {
372 progress_updater
373 .update_progress_and_events(
374 &mut progress_sender,
375 SectorPlottingProgress::Error {
376 error: format!("Failed make stream request: {error}"),
377 },
378 )
379 .await;
380
381 break;
382 }
383 };
384
385 let (mut sector_sender, sector_receiver) = mpsc::channel(
388 (sector_size(pieces_in_sector) / nats_client.approximate_max_message_size())
389 .max(1),
390 );
391 let mut maybe_sector_receiver = Some(sector_receiver);
392 loop {
393 match tokio::time::timeout(PING_TIMEOUT, response_stream.next()).await {
394 Ok(Some(response)) => {
395 match process_response_notification(
396 &start,
397 &free_instance,
398 &progress_updater,
399 &mut progress_sender,
400 &mut retry_backoff_policy,
401 response,
402 &mut sector_sender,
403 &mut maybe_sector_receiver,
404 )
405 .await
406 {
407 ResponseProcessingResult::Retry => {
408 debug!("Retrying");
409 continue 'outer;
410 }
411 ResponseProcessingResult::Abort => {
412 debug!("Aborting");
413 break 'outer;
414 }
415 ResponseProcessingResult::Continue => {
416 trace!("Continue");
417 }
419 }
420 }
421 Ok(None) => {
422 trace!("Plotting done");
423 break;
424 }
425 Err(_error) => {
426 progress_updater
427 .update_progress_and_events(
428 &mut progress_sender,
429 SectorPlottingProgress::Error {
430 error: "Timed out without ping from plotter".to_string(),
431 },
432 )
433 .await;
434 break;
435 }
436 }
437 }
438
439 break;
440 }
441
442 drop(sector_encoding_permit);
443 };
444
445 let plotting_task =
446 AsyncJoinOnDrop::new(tokio::spawn(plotting_fut.in_current_span()), true);
447 if let Err(error) = self.tasks_sender.clone().send(plotting_task).await {
448 warn!(%error, "Failed to send plotting task");
449
450 let progress = SectorPlottingProgress::Error {
451 error: format!("Failed to send plotting task: {error}"),
452 };
453
454 self.handlers
455 .plotting_progress
456 .call_simple(&public_key, §or_index, &progress);
457 }
458 }
459}
460
461async fn get_free_plotter_instance<PS>(
463 nats_client: &NatsClient,
464 progress_updater: &ProgressUpdater,
465 progress_sender: &mut PS,
466 retry_backoff_policy: &mut ExponentialBackoff,
467) -> Option<String>
468where
469 PS: Sink<SectorPlottingProgress> + Unpin + Send + 'static,
470 PS::Error: Error,
471{
472 loop {
473 match nats_client
474 .request(&ClusterPlotterFreeInstanceRequest, None)
475 .await
476 {
477 Ok(Some(free_instance)) => {
478 return Some(free_instance);
479 }
480 Ok(None) => {
481 if let Some(delay) = retry_backoff_policy.next_backoff() {
482 debug!("Instance was occupied, retrying #1");
483
484 tokio::time::sleep(delay).await;
485 continue;
486 } else {
487 progress_updater
488 .update_progress_and_events(
489 progress_sender,
490 SectorPlottingProgress::Error {
491 error: "Instance was occupied, exiting #1".to_string(),
492 },
493 )
494 .await;
495 return None;
496 }
497 }
498 Err(error) => match error.kind() {
499 RequestErrorKind::TimedOut => {
500 if let Some(delay) = retry_backoff_policy.next_backoff() {
501 debug!("Plotter request timed out, retrying");
502
503 tokio::time::sleep(delay).await;
504 continue;
505 } else {
506 progress_updater
507 .update_progress_and_events(
508 progress_sender,
509 SectorPlottingProgress::Error {
510 error: "Plotter request timed out, exiting".to_string(),
511 },
512 )
513 .await;
514 return None;
515 }
516 }
517 RequestErrorKind::NoResponders => {
518 if let Some(delay) = retry_backoff_policy.next_backoff() {
519 debug!("No plotters, retrying");
520
521 tokio::time::sleep(delay).await;
522 continue;
523 } else {
524 progress_updater
525 .update_progress_and_events(
526 progress_sender,
527 SectorPlottingProgress::Error {
528 error: "No plotters, exiting".to_string(),
529 },
530 )
531 .await;
532 return None;
533 }
534 }
535 RequestErrorKind::Other => {
536 progress_updater
537 .update_progress_and_events(
538 progress_sender,
539 SectorPlottingProgress::Error {
540 error: format!("Failed to get free plotter instance: {error}"),
541 },
542 )
543 .await;
544 return None;
545 }
546 },
547 };
548 }
549}
550
/// Outcome of processing a single response message from the plotter
enum ResponseProcessingResult {
    /// Start over with a new free plotter instance
    Retry,
    /// Stop processing of the sector
    Abort,
    /// Wait for the next response message
    Continue,
}
556
557#[allow(clippy::too_many_arguments)]
558async fn process_response_notification<PS>(
559 start: &Instant,
560 free_instance: &str,
561 progress_updater: &ProgressUpdater,
562 progress_sender: &mut PS,
563 retry_backoff_policy: &mut ExponentialBackoff,
564 response: ClusterSectorPlottingProgress,
565 sector_sender: &mut mpsc::Sender<Result<Bytes, String>>,
566 maybe_sector_receiver: &mut Option<mpsc::Receiver<Result<Bytes, String>>>,
567) -> ResponseProcessingResult
568where
569 PS: Sink<SectorPlottingProgress> + Unpin + Send + 'static,
570 PS::Error: Error,
571{
572 if !matches!(response, ClusterSectorPlottingProgress::SectorChunk(_)) {
573 trace!(?response, "Processing plotting response notification");
574 } else {
575 trace!("Processing plotting response notification (sector chunk)");
576 }
577
578 match response {
579 ClusterSectorPlottingProgress::Occupied => {
580 debug!(%free_instance, "Instance was occupied, retrying #2");
581
582 if let Some(delay) = retry_backoff_policy.next_backoff() {
583 debug!("Instance was occupied, retrying #2");
584
585 tokio::time::sleep(delay).await;
586 return ResponseProcessingResult::Retry;
587 } else {
588 debug!("Instance was occupied, exiting #2");
589 return ResponseProcessingResult::Abort;
590 }
591 }
592 ClusterSectorPlottingProgress::Ping => {
593 }
595 ClusterSectorPlottingProgress::Downloading => {
596 if !progress_updater
597 .update_progress_and_events(progress_sender, SectorPlottingProgress::Downloading)
598 .await
599 {
600 return ResponseProcessingResult::Abort;
601 }
602 }
603 ClusterSectorPlottingProgress::Downloaded(time) => {
604 if !progress_updater
605 .update_progress_and_events(
606 progress_sender,
607 SectorPlottingProgress::Downloaded(time),
608 )
609 .await
610 {
611 return ResponseProcessingResult::Abort;
612 }
613 }
614 ClusterSectorPlottingProgress::Encoding => {
615 if !progress_updater
616 .update_progress_and_events(progress_sender, SectorPlottingProgress::Encoding)
617 .await
618 {
619 return ResponseProcessingResult::Abort;
620 }
621 }
622 ClusterSectorPlottingProgress::Encoded(time) => {
623 if !progress_updater
624 .update_progress_and_events(progress_sender, SectorPlottingProgress::Encoded(time))
625 .await
626 {
627 return ResponseProcessingResult::Abort;
628 }
629 }
630 ClusterSectorPlottingProgress::Finished {
631 plotted_sector,
632 time: _,
633 } => {
634 let Some(sector_receiver) = maybe_sector_receiver.take() else {
635 debug!("Unexpected duplicated sector plotting progress Finished");
636
637 progress_updater
638 .update_progress_and_events(
639 progress_sender,
640 SectorPlottingProgress::Error {
641 error: "Unexpected duplicated sector plotting progress Finished"
642 .to_string(),
643 },
644 )
645 .await;
646 return ResponseProcessingResult::Abort;
647 };
648
649 let progress = SectorPlottingProgress::Finished {
650 plotted_sector,
651 time: start.elapsed(),
653 sector: Box::pin(sector_receiver),
654 };
655 if !progress_updater
656 .update_progress_and_events(progress_sender, progress)
657 .await
658 {
659 return ResponseProcessingResult::Abort;
660 }
661
662 return ResponseProcessingResult::Continue;
663 }
664 ClusterSectorPlottingProgress::SectorChunk(maybe_sector_chunk) => {
666 if let Err(error) = sector_sender.send(maybe_sector_chunk).await {
667 warn!(%error, "Failed to send sector chunk");
668 return ResponseProcessingResult::Abort;
669 }
670 return ResponseProcessingResult::Continue;
671 }
672 ClusterSectorPlottingProgress::Error { error } => {
673 if !progress_updater
674 .update_progress_and_events(
675 progress_sender,
676 SectorPlottingProgress::Error { error },
677 )
678 .await
679 {
680 return ResponseProcessingResult::Abort;
681 }
682 }
683 }
684
685 ResponseProcessingResult::Continue
686}
687
/// Helper that reports plotting progress of a particular sector to both event handlers and a
/// progress sink
struct ProgressUpdater {
    public_key: PublicKey,
    sector_index: SectorIndex,
    handlers: Arc<Handlers>,
}
693
694impl ProgressUpdater {
695 async fn update_progress_and_events<PS>(
697 &self,
698 progress_sender: &mut PS,
699 progress: SectorPlottingProgress,
700 ) -> bool
701 where
702 PS: Sink<SectorPlottingProgress> + Unpin,
703 PS::Error: Error,
704 {
705 self.handlers.plotting_progress.call_simple(
706 &self.public_key,
707 &self.sector_index,
708 &progress,
709 );
710
711 if let Err(error) = progress_sender.send(progress).await {
712 warn!(%error, "Failed to send error progress update");
713
714 false
715 } else {
716 true
717 }
718 }
719}
720
721pub async fn plotter_service<P>(nats_client: &NatsClient, plotter: &P) -> anyhow::Result<()>
726where
727 P: Plotter + Sync,
728{
729 let plotter_id = ClusterPlotterId::new();
730
731 select! {
732 result = free_instance_responder(&plotter_id, nats_client, plotter).fuse() => {
733 result
734 }
735 result = plot_sector_responder(&plotter_id, nats_client, plotter).fuse() => {
736 result
737 }
738 }
739}
740
741async fn free_instance_responder<P>(
742 plotter_id: &ClusterPlotterId,
743 nats_client: &NatsClient,
744 plotter: &P,
745) -> anyhow::Result<()>
746where
747 P: Plotter + Sync,
748{
749 loop {
750 while !plotter.has_free_capacity().await.unwrap_or_default() {
751 tokio::time::sleep(FREE_CAPACITY_CHECK_INTERVAL).await;
752 }
753
754 let mut subscription = nats_client
755 .queue_subscribe(
756 ClusterPlotterFreeInstanceRequest::SUBJECT,
757 "subspace.plotter".to_string(),
758 )
759 .await
760 .map_err(|error| anyhow!("Failed to subscribe to free instance requests: {error}"))?;
761 debug!(?subscription, "Free instance subscription");
762
763 while let Some(message) = subscription.next().await {
764 let Some(reply_subject) = message.reply else {
765 continue;
766 };
767
768 debug!(%reply_subject, "Free instance request");
769
770 let has_free_capacity = plotter.has_free_capacity().await.unwrap_or_default();
771 let response: <ClusterPlotterFreeInstanceRequest as GenericRequest>::Response =
772 has_free_capacity.then(|| plotter_id.to_string());
773
774 if let Err(error) = nats_client
775 .publish(reply_subject, response.encode().into())
776 .await
777 {
778 warn!(%error, "Failed to send free instance response");
779 }
780
781 if !has_free_capacity {
782 subscription.unsubscribe().await.map_err(|error| {
783 anyhow!("Failed to unsubscribe from free instance requests: {error}")
784 })?;
785 }
786 }
787 }
788}
789
/// Serves plot sector stream requests addressed to this plotter.
///
/// String form of `plotter_id` is passed to [`NatsClient::stream_request_responder()`] both by
/// reference and as an owned value. Each request is handled by
/// [`process_plot_sector_request()`], whose progress messages form the response stream.
async fn plot_sector_responder<P>(
    plotter_id: &ClusterPlotterId,
    nats_client: &NatsClient,
    plotter: &P,
) -> anyhow::Result<()>
where
    P: Plotter + Sync,
{
    let plotter_id_string = plotter_id.to_string();

    nats_client
        .stream_request_responder(
            Some(&plotter_id_string),
            Some(plotter_id_string.clone()),
            |request| async move {
                let (progress_sender, mut progress_receiver) = mpsc::channel(10);

                let fut =
                    process_plot_sector_request(nats_client, plotter, request, progress_sender);
                let mut fut = Box::pin(fut.fuse());

                Some(
                    // Stream that drives request processing future while yielding progress
                    // messages it produces
                    stream::poll_fn(move |cx| {
                        // Polling a fused future after completion is avoided explicitly; the
                        // result is ignored since the future only produces `()`
                        if !fut.is_terminated() {
                            let _ = fut.poll_unpin(cx);
                        }

                        // The stream ends once receiver yields `None`, which can only happen
                        // after all senders are gone
                        if let Poll::Ready(maybe_result) = progress_receiver.poll_next_unpin(cx) {
                            return Poll::Ready(maybe_result);
                        }

                        Poll::Pending
                    }),
                )
            },
        )
        .await
}
831
832async fn process_plot_sector_request<P>(
833 nats_client: &NatsClient,
834 plotter: &P,
835 request: ClusterPlotterPlotSectorRequest,
836 mut response_proxy_sender: mpsc::Sender<ClusterSectorPlottingProgress>,
837) where
838 P: Plotter,
839{
840 let ClusterPlotterPlotSectorRequest {
841 public_key,
842 sector_index,
843 farmer_protocol_info,
844 pieces_in_sector,
845 } = request;
846
847 let inner_fut = async {
849 info!("Plot sector request");
850
851 let (progress_sender, mut progress_receiver) = mpsc::channel(1);
852
853 if !plotter
854 .try_plot_sector(
855 public_key,
856 sector_index,
857 farmer_protocol_info,
858 pieces_in_sector,
859 false,
860 progress_sender,
861 )
862 .await
863 {
864 debug!("Plotter is currently occupied and can't plot more sectors");
865
866 if let Err(error) = response_proxy_sender
867 .send(ClusterSectorPlottingProgress::Occupied)
868 .await
869 {
870 warn!(%error, "Failed to send plotting progress");
871 return;
872 }
873 return;
874 }
875
876 let progress_proxy_fut = {
877 let mut response_proxy_sender = response_proxy_sender.clone();
878 let approximate_max_message_size = nats_client.approximate_max_message_size();
879
880 async move {
881 while let Some(progress) = progress_receiver.next().await {
882 send_publish_progress(
883 &mut response_proxy_sender,
884 progress,
885 approximate_max_message_size,
886 )
887 .await;
888 }
889 }
890 };
891
892 let mut ping_interval = tokio::time::interval(PING_INTERVAL);
893 ping_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
894 let ping_fut = async {
895 loop {
896 ping_interval.tick().await;
897 if let Err(error) = response_proxy_sender
898 .send(ClusterSectorPlottingProgress::Ping)
899 .await
900 {
901 warn!(%error, "Failed to send plotting ping");
902 return;
903 }
904 }
905 };
906
907 select! {
908 _ = progress_proxy_fut.fuse() => {
909 }
911 _ = ping_fut.fuse() => {
912 unreachable!("Ping loop never ends");
913 }
914 }
915
916 info!("Finished plotting sector successfully");
917 };
918
919 inner_fut
920 .instrument(info_span!("", %public_key, %sector_index))
921 .await
922}
923
924async fn send_publish_progress(
925 response_sender: &mut mpsc::Sender<ClusterSectorPlottingProgress>,
926 progress: SectorPlottingProgress,
927 approximate_max_message_size: usize,
928) {
929 let cluster_progress = match progress {
931 SectorPlottingProgress::Downloading => ClusterSectorPlottingProgress::Downloading,
932 SectorPlottingProgress::Downloaded(time) => ClusterSectorPlottingProgress::Downloaded(time),
933 SectorPlottingProgress::Encoding => ClusterSectorPlottingProgress::Encoding,
934 SectorPlottingProgress::Encoded(time) => ClusterSectorPlottingProgress::Encoded(time),
935 SectorPlottingProgress::Finished {
936 plotted_sector,
937 time,
938 mut sector,
939 } => {
940 if let Err(error) = response_sender
941 .send(ClusterSectorPlottingProgress::Finished {
942 plotted_sector,
943 time,
944 })
945 .await
946 {
947 warn!(%error, "Failed to send plotting progress");
948 return;
949 }
950
951 while let Some(maybe_sector_chunk) = sector.next().await {
952 match maybe_sector_chunk {
953 Ok(sector_chunk) => {
954 for small_sector_chunk in sector_chunk.chunks(approximate_max_message_size)
956 {
957 if let Err(error) = response_sender
958 .send(ClusterSectorPlottingProgress::SectorChunk(Ok(
959 sector_chunk.slice_ref(small_sector_chunk)
960 )))
961 .await
962 {
963 warn!(%error, "Failed to send plotting progress");
964 return;
965 }
966 }
967 }
968 Err(error) => {
969 if let Err(error) = response_sender
970 .send(ClusterSectorPlottingProgress::SectorChunk(Err(error)))
971 .await
972 {
973 warn!(%error, "Failed to send plotting progress");
974 return;
975 }
976 }
977 }
978 }
979
980 return;
981 }
982 SectorPlottingProgress::Error { error } => ClusterSectorPlottingProgress::Error { error },
983 };
984
985 if let Err(error) = response_sender.send(cluster_progress).await {
986 warn!(%error, "Failed to send plotting progress");
987 }
988}