subspace_farmer/cluster/
plotter.rs

1//! Farming cluster plotter
2//!
3//! Plotter is responsible for plotting sectors in response to farmer requests.
4//!
//! This module exposes some data structures for NATS communication, a custom plotter
//! implementation designed to work with the cluster plotter service, and a service function to
//! drive the backend part of the plotter.
8
9use crate::cluster::nats_client::{GenericRequest, GenericStreamRequest, NatsClient};
10use crate::plotter::{Plotter, SectorPlottingProgress};
11use crate::utils::AsyncJoinOnDrop;
12use anyhow::anyhow;
13use async_nats::RequestErrorKind;
14use async_trait::async_trait;
15use backoff::backoff::Backoff;
16use backoff::ExponentialBackoff;
17use bytes::Bytes;
18use derive_more::Display;
19use event_listener_primitives::{Bag, HandlerId};
20use futures::channel::mpsc;
21use futures::future::FusedFuture;
22use futures::stream::FuturesUnordered;
23use futures::{select, stream, FutureExt, Sink, SinkExt, StreamExt};
24use parity_scale_codec::{Decode, Encode};
25use std::error::Error;
26use std::future::pending;
27use std::num::NonZeroUsize;
28use std::pin::pin;
29use std::sync::Arc;
30use std::task::Poll;
31use std::time::{Duration, Instant};
32use subspace_core_primitives::sectors::SectorIndex;
33use subspace_core_primitives::PublicKey;
34use subspace_farmer_components::plotting::PlottedSector;
35use subspace_farmer_components::sector::sector_size;
36use subspace_farmer_components::FarmerProtocolInfo;
37use tokio::sync::{OwnedSemaphorePermit, Semaphore};
38use tokio::time::MissedTickBehavior;
39use tracing::{debug, info, info_span, trace, warn, Instrument};
40use ulid::Ulid;
41
/// Delay between free capacity checks while the plotter service waits for capacity to appear
const FREE_CAPACITY_CHECK_INTERVAL: Duration = Duration::from_secs(1);
/// Interval between pings sent from plotter server to client
const PING_INTERVAL: Duration = Duration::from_secs(10);
/// Timeout after which a plotter that doesn't send pings is assumed to be down
const PING_TIMEOUT: Duration = Duration::from_secs(60);
47
/// Type alias used for event handlers that take three arguments by reference
pub type HandlerFn3<A, B, C> = Arc<dyn Fn(&A, &B, &C) + Send + Sync + 'static>;
/// [`Bag`] of [`HandlerFn3`] event handlers
type Handler3<A, B, C> = Bag<HandlerFn3<A, B, C>, A, B, C>;
51
/// An ephemeral identifier for a plotter
///
/// A new identifier is generated by [`plotter_service`] every time it is started and is sent to
/// clients in its string form (see the `Display` derive) as the free instance response.
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Display)]
pub enum ClusterPlotterId {
    /// Plotter ID backed by a [`Ulid`]
    Ulid(Ulid),
}
58
// NOTE(review): presumably `Default` is not implemented on purpose because every call produces a
// different ID - confirm
#[allow(clippy::new_without_default)]
impl ClusterPlotterId {
    /// Creates new ID backed by a freshly generated [`Ulid`]
    pub fn new() -> Self {
        Self::Ulid(Ulid::new())
    }
}
66
/// Request for free plotter instance
///
/// Carries no payload. It is sent by [`ClusterPlotter`] before plotting and answered by the
/// plotter service, see `free_instance_responder()`.
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterPlotterFreeInstanceRequest;
70
impl GenericRequest for ClusterPlotterFreeInstanceRequest {
    /// Subject on which free instance requests are sent and on which plotters listen for them
    const SUBJECT: &'static str = "subspace.plotter.free-instance";
    /// String form of the ID of the plotter instance that responded.
    ///
    /// Might be `None` if instance had to respond, but turned out it was fully occupied already
    type Response = Option<String>;
}
76
/// Sector plotting progress notification, streamed back as a response to
/// [`ClusterPlotterPlotSectorRequest`].
///
/// Most variants mirror [`SectorPlottingProgress`], with a few cluster-specific additions
/// (`Occupied`, `Ping` and `SectorChunk`).
#[derive(Debug, Encode, Decode)]
enum ClusterSectorPlottingProgress {
    /// Plotter is already fully occupied with other work
    Occupied,
    /// Periodic ping indicating plotter is still busy
    Ping,
    /// Downloading sector pieces
    Downloading,
    /// Downloaded sector pieces
    Downloaded(Duration),
    /// Encoding sector pieces
    Encoding,
    /// Encoded sector pieces
    Encoded(Duration),
    /// Finished plotting, followed by a series of sector chunks
    Finished {
        /// Information about plotted sector
        plotted_sector: PlottedSector,
        /// How much time it took to plot a sector
        time: Duration,
    },
    /// Sector chunk after finished plotting, an `Err` carries an error message instead of bytes
    SectorChunk(Result<Bytes, String>),
    /// Plotting failed
    Error {
        /// Error message
        error: String,
    },
}
106
/// Request to plot sector from plotter
///
/// On the plotter side all fields are forwarded as is to [`Plotter::try_plot_sector()`].
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterPlotterPlotSectorRequest {
    public_key: PublicKey,
    sector_index: SectorIndex,
    farmer_protocol_info: FarmerProtocolInfo,
    pieces_in_sector: u16,
}
115
impl GenericStreamRequest for ClusterPlotterPlotSectorRequest {
    // NOTE(review): `*` is presumably substituted with the plotter instance ID that is passed to
    // `NatsClient::stream_request()` / `NatsClient::stream_request_responder()` - confirm
    const SUBJECT: &'static str = "subspace.plotter.*.plot-sector";
    /// Individual progress notification of the response stream
    type Response = ClusterSectorPlottingProgress;
}
120
/// Event handlers registered on [`ClusterPlotter`]
#[derive(Default, Debug)]
struct Handlers {
    /// Called with public key, sector index and progress on every plotting progress update
    plotting_progress: Handler3<PublicKey, SectorIndex, SectorPlottingProgress>,
}
125
/// Cluster plotter
///
/// [`Plotter`] implementation that sends plotting requests to plotter instances over NATS and
/// forwards progress notifications received in response.
#[derive(Debug)]
pub struct ClusterPlotter {
    /// Limits how many sectors can be in the process of plotting through this instance
    sector_encoding_semaphore: Arc<Semaphore>,
    /// Backoff policy template, cloned and reset for every plotting request
    retry_backoff_policy: ExponentialBackoff,
    /// Client used for all requests to plotter instances
    nats_client: NatsClient,
    /// Registered event handlers
    handlers: Arc<Handlers>,
    /// Sending end of the channel through which spawned plotting tasks are handed over to the
    /// background task
    tasks_sender: mpsc::Sender<AsyncJoinOnDrop<()>>,
    /// Background task that owns and drives spawned plotting tasks
    _background_tasks: AsyncJoinOnDrop<()>,
}
136
impl Drop for ClusterPlotter {
    #[inline]
    fn drop(&mut self) {
        // Closing the channel lets the receiving loop of the background task created in
        // `ClusterPlotter::new()` observe the end of the stream and exit
        self.tasks_sender.close_channel();
    }
}
143
144#[async_trait]
145impl Plotter for ClusterPlotter {
146    async fn has_free_capacity(&self) -> Result<bool, String> {
147        Ok(self.sector_encoding_semaphore.available_permits() > 0
148            && self
149                .nats_client
150                .request(&ClusterPlotterFreeInstanceRequest, None)
151                .await
152                .map_err(|error| error.to_string())?
153                .is_some())
154    }
155
156    async fn plot_sector(
157        &self,
158        public_key: PublicKey,
159        sector_index: SectorIndex,
160        farmer_protocol_info: FarmerProtocolInfo,
161        pieces_in_sector: u16,
162        _replotting: bool,
163        mut progress_sender: mpsc::Sender<SectorPlottingProgress>,
164    ) {
165        let start = Instant::now();
166
167        // Done outside the future below as a backpressure, ensuring that it is not possible to
168        // schedule unbounded number of plotting tasks
169        let sector_encoding_permit = match Arc::clone(&self.sector_encoding_semaphore)
170            .acquire_owned()
171            .await
172        {
173            Ok(sector_encoding_permit) => sector_encoding_permit,
174            Err(error) => {
175                warn!(%error, "Failed to acquire sector encoding permit");
176
177                let progress_updater = ProgressUpdater {
178                    public_key,
179                    sector_index,
180                    handlers: Arc::clone(&self.handlers),
181                };
182
183                progress_updater
184                    .update_progress_and_events(
185                        &mut progress_sender,
186                        SectorPlottingProgress::Error {
187                            error: format!("Failed to acquire sector encoding permit: {error}"),
188                        },
189                    )
190                    .await;
191
192                return;
193            }
194        };
195
196        self.plot_sector_internal(
197            start,
198            sector_encoding_permit,
199            public_key,
200            sector_index,
201            farmer_protocol_info,
202            pieces_in_sector,
203            progress_sender,
204        )
205        .await
206    }
207
208    async fn try_plot_sector(
209        &self,
210        public_key: PublicKey,
211        sector_index: SectorIndex,
212        farmer_protocol_info: FarmerProtocolInfo,
213        pieces_in_sector: u16,
214        _replotting: bool,
215        progress_sender: mpsc::Sender<SectorPlottingProgress>,
216    ) -> bool {
217        let start = Instant::now();
218
219        let Ok(sector_encoding_permit) =
220            Arc::clone(&self.sector_encoding_semaphore).try_acquire_owned()
221        else {
222            return false;
223        };
224
225        self.plot_sector_internal(
226            start,
227            sector_encoding_permit,
228            public_key,
229            sector_index,
230            farmer_protocol_info,
231            pieces_in_sector,
232            progress_sender,
233        )
234        .await;
235
236        true
237    }
238}
239
240impl ClusterPlotter {
241    /// Create new instance
242    pub fn new(
243        nats_client: NatsClient,
244        sector_encoding_concurrency: NonZeroUsize,
245        retry_backoff_policy: ExponentialBackoff,
246    ) -> Self {
247        let sector_encoding_semaphore = Arc::new(Semaphore::new(sector_encoding_concurrency.get()));
248
249        let (tasks_sender, mut tasks_receiver) = mpsc::channel(1);
250
251        // Basically runs plotting tasks in the background and allows to abort on drop
252        let background_tasks = AsyncJoinOnDrop::new(
253            tokio::spawn(async move {
254                let background_tasks = FuturesUnordered::new();
255                let mut background_tasks = pin!(background_tasks);
256                // Just so that `FuturesUnordered` will never end
257                background_tasks.push(AsyncJoinOnDrop::new(tokio::spawn(pending::<()>()), true));
258
259                loop {
260                    select! {
261                        maybe_background_task = tasks_receiver.next().fuse() => {
262                            let Some(background_task) = maybe_background_task else {
263                                break;
264                            };
265
266                            background_tasks.push(background_task);
267                        },
268                        _ = background_tasks.select_next_some() => {
269                            // Nothing to do
270                        }
271                    }
272                }
273            }),
274            true,
275        );
276
277        Self {
278            sector_encoding_semaphore,
279            retry_backoff_policy,
280            nats_client,
281            handlers: Arc::default(),
282            tasks_sender,
283            _background_tasks: background_tasks,
284        }
285    }
286
287    /// Subscribe to plotting progress notifications
288    pub fn on_plotting_progress(
289        &self,
290        callback: HandlerFn3<PublicKey, SectorIndex, SectorPlottingProgress>,
291    ) -> HandlerId {
292        self.handlers.plotting_progress.add(callback)
293    }
294
295    #[allow(clippy::too_many_arguments)]
296    async fn plot_sector_internal<PS>(
297        &self,
298        start: Instant,
299        sector_encoding_permit: OwnedSemaphorePermit,
300        public_key: PublicKey,
301        sector_index: SectorIndex,
302        farmer_protocol_info: FarmerProtocolInfo,
303        pieces_in_sector: u16,
304        mut progress_sender: PS,
305    ) where
306        PS: Sink<SectorPlottingProgress> + Unpin + Send + 'static,
307        PS::Error: Error,
308    {
309        trace!("Starting plotting, getting plotting permit");
310
311        let progress_updater = ProgressUpdater {
312            public_key,
313            sector_index,
314            handlers: Arc::clone(&self.handlers),
315        };
316
317        let mut retry_backoff_policy = self.retry_backoff_policy.clone();
318        retry_backoff_policy.reset();
319
320        // Try to get plotter instance here first as a backpressure measure
321        let free_plotter_instance_fut = get_free_plotter_instance(
322            &self.nats_client,
323            &progress_updater,
324            &mut progress_sender,
325            &mut retry_backoff_policy,
326        );
327        let mut maybe_free_instance = free_plotter_instance_fut.await;
328        if maybe_free_instance.is_none() {
329            return;
330        }
331
332        trace!("Got plotting permit #1");
333
334        let nats_client = self.nats_client.clone();
335
336        let plotting_fut = async move {
337            'outer: loop {
338                // Take free instance that was found earlier if available or try to find a new one
339                let free_instance = match maybe_free_instance.take() {
340                    Some(free_instance) => free_instance,
341                    None => {
342                        let free_plotter_instance_fut = get_free_plotter_instance(
343                            &nats_client,
344                            &progress_updater,
345                            &mut progress_sender,
346                            &mut retry_backoff_policy,
347                        );
348                        let Some(free_instance) = free_plotter_instance_fut.await else {
349                            break;
350                        };
351                        trace!("Got plotting permit #2");
352                        free_instance
353                    }
354                };
355
356                let response_stream_result = nats_client
357                    .stream_request(
358                        &ClusterPlotterPlotSectorRequest {
359                            public_key,
360                            sector_index,
361                            farmer_protocol_info,
362                            pieces_in_sector,
363                        },
364                        Some(&free_instance),
365                    )
366                    .await;
367                trace!("Subscribed to plotting notifications");
368
369                let mut response_stream = match response_stream_result {
370                    Ok(response_stream) => response_stream,
371                    Err(error) => {
372                        progress_updater
373                            .update_progress_and_events(
374                                &mut progress_sender,
375                                SectorPlottingProgress::Error {
376                                    error: format!("Failed make stream request: {error}"),
377                                },
378                            )
379                            .await;
380
381                        break;
382                    }
383                };
384
385                // Allow to buffer up to the whole sector in memory to not block plotter on the
386                // other side
387                let (mut sector_sender, sector_receiver) = mpsc::channel(
388                    (sector_size(pieces_in_sector) / nats_client.approximate_max_message_size())
389                        .max(1),
390                );
391                let mut maybe_sector_receiver = Some(sector_receiver);
392                loop {
393                    match tokio::time::timeout(PING_TIMEOUT, response_stream.next()).await {
394                        Ok(Some(response)) => {
395                            match process_response_notification(
396                                &start,
397                                &free_instance,
398                                &progress_updater,
399                                &mut progress_sender,
400                                &mut retry_backoff_policy,
401                                response,
402                                &mut sector_sender,
403                                &mut maybe_sector_receiver,
404                            )
405                            .await
406                            {
407                                ResponseProcessingResult::Retry => {
408                                    debug!("Retrying");
409                                    continue 'outer;
410                                }
411                                ResponseProcessingResult::Abort => {
412                                    debug!("Aborting");
413                                    break 'outer;
414                                }
415                                ResponseProcessingResult::Continue => {
416                                    trace!("Continue");
417                                    // Nothing to do
418                                }
419                            }
420                        }
421                        Ok(None) => {
422                            trace!("Plotting done");
423                            break;
424                        }
425                        Err(_error) => {
426                            progress_updater
427                                .update_progress_and_events(
428                                    &mut progress_sender,
429                                    SectorPlottingProgress::Error {
430                                        error: "Timed out without ping from plotter".to_string(),
431                                    },
432                                )
433                                .await;
434                            break;
435                        }
436                    }
437                }
438
439                break;
440            }
441
442            drop(sector_encoding_permit);
443        };
444
445        let plotting_task =
446            AsyncJoinOnDrop::new(tokio::spawn(plotting_fut.in_current_span()), true);
447        if let Err(error) = self.tasks_sender.clone().send(plotting_task).await {
448            warn!(%error, "Failed to send plotting task");
449
450            let progress = SectorPlottingProgress::Error {
451                error: format!("Failed to send plotting task: {error}"),
452            };
453
454            self.handlers
455                .plotting_progress
456                .call_simple(&public_key, &sector_index, &progress);
457        }
458    }
459}
460
461// Try to get free plotter instance and return `None` if it is not possible
462async fn get_free_plotter_instance<PS>(
463    nats_client: &NatsClient,
464    progress_updater: &ProgressUpdater,
465    progress_sender: &mut PS,
466    retry_backoff_policy: &mut ExponentialBackoff,
467) -> Option<String>
468where
469    PS: Sink<SectorPlottingProgress> + Unpin + Send + 'static,
470    PS::Error: Error,
471{
472    loop {
473        match nats_client
474            .request(&ClusterPlotterFreeInstanceRequest, None)
475            .await
476        {
477            Ok(Some(free_instance)) => {
478                return Some(free_instance);
479            }
480            Ok(None) => {
481                if let Some(delay) = retry_backoff_policy.next_backoff() {
482                    debug!("Instance was occupied, retrying #1");
483
484                    tokio::time::sleep(delay).await;
485                    continue;
486                } else {
487                    progress_updater
488                        .update_progress_and_events(
489                            progress_sender,
490                            SectorPlottingProgress::Error {
491                                error: "Instance was occupied, exiting #1".to_string(),
492                            },
493                        )
494                        .await;
495                    return None;
496                }
497            }
498            Err(error) => match error.kind() {
499                RequestErrorKind::TimedOut => {
500                    if let Some(delay) = retry_backoff_policy.next_backoff() {
501                        debug!("Plotter request timed out, retrying");
502
503                        tokio::time::sleep(delay).await;
504                        continue;
505                    } else {
506                        progress_updater
507                            .update_progress_and_events(
508                                progress_sender,
509                                SectorPlottingProgress::Error {
510                                    error: "Plotter request timed out, exiting".to_string(),
511                                },
512                            )
513                            .await;
514                        return None;
515                    }
516                }
517                RequestErrorKind::NoResponders => {
518                    if let Some(delay) = retry_backoff_policy.next_backoff() {
519                        debug!("No plotters, retrying");
520
521                        tokio::time::sleep(delay).await;
522                        continue;
523                    } else {
524                        progress_updater
525                            .update_progress_and_events(
526                                progress_sender,
527                                SectorPlottingProgress::Error {
528                                    error: "No plotters, exiting".to_string(),
529                                },
530                            )
531                            .await;
532                        return None;
533                    }
534                }
535                RequestErrorKind::Other => {
536                    progress_updater
537                        .update_progress_and_events(
538                            progress_sender,
539                            SectorPlottingProgress::Error {
540                                error: format!("Failed to get free plotter instance: {error}"),
541                            },
542                        )
543                        .await;
544                    return None;
545                }
546            },
547        };
548    }
549}
550
/// Outcome of processing a single notification from the plotting response stream
enum ResponseProcessingResult {
    /// Plotting request needs to be made again from the beginning (plotter instance was occupied)
    Retry,
    /// Processing of the response stream must stop
    Abort,
    /// Notification was processed, continue with the next one
    Continue,
}
556
557#[allow(clippy::too_many_arguments)]
558async fn process_response_notification<PS>(
559    start: &Instant,
560    free_instance: &str,
561    progress_updater: &ProgressUpdater,
562    progress_sender: &mut PS,
563    retry_backoff_policy: &mut ExponentialBackoff,
564    response: ClusterSectorPlottingProgress,
565    sector_sender: &mut mpsc::Sender<Result<Bytes, String>>,
566    maybe_sector_receiver: &mut Option<mpsc::Receiver<Result<Bytes, String>>>,
567) -> ResponseProcessingResult
568where
569    PS: Sink<SectorPlottingProgress> + Unpin + Send + 'static,
570    PS::Error: Error,
571{
572    if !matches!(response, ClusterSectorPlottingProgress::SectorChunk(_)) {
573        trace!(?response, "Processing plotting response notification");
574    } else {
575        trace!("Processing plotting response notification (sector chunk)");
576    }
577
578    match response {
579        ClusterSectorPlottingProgress::Occupied => {
580            debug!(%free_instance, "Instance was occupied, retrying #2");
581
582            if let Some(delay) = retry_backoff_policy.next_backoff() {
583                debug!("Instance was occupied, retrying #2");
584
585                tokio::time::sleep(delay).await;
586                return ResponseProcessingResult::Retry;
587            } else {
588                debug!("Instance was occupied, exiting #2");
589                return ResponseProcessingResult::Abort;
590            }
591        }
592        ClusterSectorPlottingProgress::Ping => {
593            // Expected
594        }
595        ClusterSectorPlottingProgress::Downloading => {
596            if !progress_updater
597                .update_progress_and_events(progress_sender, SectorPlottingProgress::Downloading)
598                .await
599            {
600                return ResponseProcessingResult::Abort;
601            }
602        }
603        ClusterSectorPlottingProgress::Downloaded(time) => {
604            if !progress_updater
605                .update_progress_and_events(
606                    progress_sender,
607                    SectorPlottingProgress::Downloaded(time),
608                )
609                .await
610            {
611                return ResponseProcessingResult::Abort;
612            }
613        }
614        ClusterSectorPlottingProgress::Encoding => {
615            if !progress_updater
616                .update_progress_and_events(progress_sender, SectorPlottingProgress::Encoding)
617                .await
618            {
619                return ResponseProcessingResult::Abort;
620            }
621        }
622        ClusterSectorPlottingProgress::Encoded(time) => {
623            if !progress_updater
624                .update_progress_and_events(progress_sender, SectorPlottingProgress::Encoded(time))
625                .await
626            {
627                return ResponseProcessingResult::Abort;
628            }
629        }
630        ClusterSectorPlottingProgress::Finished {
631            plotted_sector,
632            time: _,
633        } => {
634            let Some(sector_receiver) = maybe_sector_receiver.take() else {
635                debug!("Unexpected duplicated sector plotting progress Finished");
636
637                progress_updater
638                    .update_progress_and_events(
639                        progress_sender,
640                        SectorPlottingProgress::Error {
641                            error: "Unexpected duplicated sector plotting progress Finished"
642                                .to_string(),
643                        },
644                    )
645                    .await;
646                return ResponseProcessingResult::Abort;
647            };
648
649            let progress = SectorPlottingProgress::Finished {
650                plotted_sector,
651                // Use local time instead of reported by remote plotter
652                time: start.elapsed(),
653                sector: Box::pin(sector_receiver),
654            };
655            if !progress_updater
656                .update_progress_and_events(progress_sender, progress)
657                .await
658            {
659                return ResponseProcessingResult::Abort;
660            }
661
662            return ResponseProcessingResult::Continue;
663        }
664        // This variant must be sent after Finished and it handled above
665        ClusterSectorPlottingProgress::SectorChunk(maybe_sector_chunk) => {
666            if let Err(error) = sector_sender.send(maybe_sector_chunk).await {
667                warn!(%error, "Failed to send sector chunk");
668                return ResponseProcessingResult::Abort;
669            }
670            return ResponseProcessingResult::Continue;
671        }
672        ClusterSectorPlottingProgress::Error { error } => {
673            if !progress_updater
674                .update_progress_and_events(
675                    progress_sender,
676                    SectorPlottingProgress::Error { error },
677                )
678                .await
679            {
680                return ResponseProcessingResult::Abort;
681            }
682        }
683    }
684
685    ResponseProcessingResult::Continue
686}
687
/// Helper that reports plotting progress of a particular sector both to event handlers and to the
/// progress sender provided by the caller
struct ProgressUpdater {
    /// Public key passed to event handlers with every update
    public_key: PublicKey,
    /// Sector index passed to event handlers with every update
    sector_index: SectorIndex,
    /// Event handlers to call on every update
    handlers: Arc<Handlers>,
}
693
694impl ProgressUpdater {
695    /// Returns `true` on success and `false` if progress receiver channel is gone
696    async fn update_progress_and_events<PS>(
697        &self,
698        progress_sender: &mut PS,
699        progress: SectorPlottingProgress,
700    ) -> bool
701    where
702        PS: Sink<SectorPlottingProgress> + Unpin,
703        PS::Error: Error,
704    {
705        self.handlers.plotting_progress.call_simple(
706            &self.public_key,
707            &self.sector_index,
708            &progress,
709        );
710
711        if let Err(error) = progress_sender.send(progress).await {
712            warn!(%error, "Failed to send error progress update");
713
714            false
715        } else {
716            true
717        }
718    }
719}
720
721/// Create plotter service that will be processing incoming requests.
722///
723/// Implementation is using concurrency with multiple tokio tasks, but can be started multiple times
724/// per controller instance in order to parallelize more work across threads if needed.
725pub async fn plotter_service<P>(nats_client: &NatsClient, plotter: &P) -> anyhow::Result<()>
726where
727    P: Plotter + Sync,
728{
729    let plotter_id = ClusterPlotterId::new();
730
731    select! {
732        result = free_instance_responder(&plotter_id, nats_client, plotter).fuse() => {
733            result
734        }
735        result = plot_sector_responder(&plotter_id, nats_client, plotter).fuse() => {
736            result
737        }
738    }
739}
740
741async fn free_instance_responder<P>(
742    plotter_id: &ClusterPlotterId,
743    nats_client: &NatsClient,
744    plotter: &P,
745) -> anyhow::Result<()>
746where
747    P: Plotter + Sync,
748{
749    loop {
750        while !plotter.has_free_capacity().await.unwrap_or_default() {
751            tokio::time::sleep(FREE_CAPACITY_CHECK_INTERVAL).await;
752        }
753
754        let mut subscription = nats_client
755            .queue_subscribe(
756                ClusterPlotterFreeInstanceRequest::SUBJECT,
757                "subspace.plotter".to_string(),
758            )
759            .await
760            .map_err(|error| anyhow!("Failed to subscribe to free instance requests: {error}"))?;
761        debug!(?subscription, "Free instance subscription");
762
763        while let Some(message) = subscription.next().await {
764            let Some(reply_subject) = message.reply else {
765                continue;
766            };
767
768            debug!(%reply_subject, "Free instance request");
769
770            let has_free_capacity = plotter.has_free_capacity().await.unwrap_or_default();
771            let response: <ClusterPlotterFreeInstanceRequest as GenericRequest>::Response =
772                has_free_capacity.then(|| plotter_id.to_string());
773
774            if let Err(error) = nats_client
775                .publish(reply_subject, response.encode().into())
776                .await
777            {
778                warn!(%error, "Failed to send free instance response");
779            }
780
781            if !has_free_capacity {
782                subscription.unsubscribe().await.map_err(|error| {
783                    anyhow!("Failed to unsubscribe from free instance requests: {error}")
784                })?;
785            }
786        }
787    }
788}
789
790async fn plot_sector_responder<P>(
791    plotter_id: &ClusterPlotterId,
792    nats_client: &NatsClient,
793    plotter: &P,
794) -> anyhow::Result<()>
795where
796    P: Plotter + Sync,
797{
798    let plotter_id_string = plotter_id.to_string();
799
800    nats_client
801        .stream_request_responder(
802            Some(&plotter_id_string),
803            Some(plotter_id_string.clone()),
804            |request| async move {
805                let (progress_sender, mut progress_receiver) = mpsc::channel(10);
806
807                let fut =
808                    process_plot_sector_request(nats_client, plotter, request, progress_sender);
809                let mut fut = Box::pin(fut.fuse());
810
811                Some(
812                    // Drive above future and stream back any pieces that were downloaded so far
813                    stream::poll_fn(move |cx| {
814                        if !fut.is_terminated() {
815                            // Result doesn't matter, we'll need to poll stream below anyway
816                            let _ = fut.poll_unpin(cx);
817                        }
818
819                        if let Poll::Ready(maybe_result) = progress_receiver.poll_next_unpin(cx) {
820                            return Poll::Ready(maybe_result);
821                        }
822
823                        // Exit will be done by the stream above
824                        Poll::Pending
825                    }),
826                )
827            },
828        )
829        .await
830}
831
832async fn process_plot_sector_request<P>(
833    nats_client: &NatsClient,
834    plotter: &P,
835    request: ClusterPlotterPlotSectorRequest,
836    mut response_proxy_sender: mpsc::Sender<ClusterSectorPlottingProgress>,
837) where
838    P: Plotter,
839{
840    let ClusterPlotterPlotSectorRequest {
841        public_key,
842        sector_index,
843        farmer_protocol_info,
844        pieces_in_sector,
845    } = request;
846
847    // Wrapper future just for instrumentation below
848    let inner_fut = async {
849        info!("Plot sector request");
850
851        let (progress_sender, mut progress_receiver) = mpsc::channel(1);
852
853        if !plotter
854            .try_plot_sector(
855                public_key,
856                sector_index,
857                farmer_protocol_info,
858                pieces_in_sector,
859                false,
860                progress_sender,
861            )
862            .await
863        {
864            debug!("Plotter is currently occupied and can't plot more sectors");
865
866            if let Err(error) = response_proxy_sender
867                .send(ClusterSectorPlottingProgress::Occupied)
868                .await
869            {
870                warn!(%error, "Failed to send plotting progress");
871                return;
872            }
873            return;
874        }
875
876        let progress_proxy_fut = {
877            let mut response_proxy_sender = response_proxy_sender.clone();
878            let approximate_max_message_size = nats_client.approximate_max_message_size();
879
880            async move {
881                while let Some(progress) = progress_receiver.next().await {
882                    send_publish_progress(
883                        &mut response_proxy_sender,
884                        progress,
885                        approximate_max_message_size,
886                    )
887                    .await;
888                }
889            }
890        };
891
892        let mut ping_interval = tokio::time::interval(PING_INTERVAL);
893        ping_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
894        let ping_fut = async {
895            loop {
896                ping_interval.tick().await;
897                if let Err(error) = response_proxy_sender
898                    .send(ClusterSectorPlottingProgress::Ping)
899                    .await
900                {
901                    warn!(%error, "Failed to send plotting ping");
902                    return;
903                }
904            }
905        };
906
907        select! {
908            _ = progress_proxy_fut.fuse() => {
909                // Done
910            }
911            _ = ping_fut.fuse() => {
912                unreachable!("Ping loop never ends");
913            }
914        }
915
916        info!("Finished plotting sector successfully");
917    };
918
919    inner_fut
920        .instrument(info_span!("", %public_key, %sector_index))
921        .await
922}
923
924async fn send_publish_progress(
925    response_sender: &mut mpsc::Sender<ClusterSectorPlottingProgress>,
926    progress: SectorPlottingProgress,
927    approximate_max_message_size: usize,
928) {
929    // Finished response is large and needs special care
930    let cluster_progress = match progress {
931        SectorPlottingProgress::Downloading => ClusterSectorPlottingProgress::Downloading,
932        SectorPlottingProgress::Downloaded(time) => ClusterSectorPlottingProgress::Downloaded(time),
933        SectorPlottingProgress::Encoding => ClusterSectorPlottingProgress::Encoding,
934        SectorPlottingProgress::Encoded(time) => ClusterSectorPlottingProgress::Encoded(time),
935        SectorPlottingProgress::Finished {
936            plotted_sector,
937            time,
938            mut sector,
939        } => {
940            if let Err(error) = response_sender
941                .send(ClusterSectorPlottingProgress::Finished {
942                    plotted_sector,
943                    time,
944                })
945                .await
946            {
947                warn!(%error, "Failed to send plotting progress");
948                return;
949            }
950
951            while let Some(maybe_sector_chunk) = sector.next().await {
952                match maybe_sector_chunk {
953                    Ok(sector_chunk) => {
954                        // Slice large chunks into smaller ones before publishing
955                        for small_sector_chunk in sector_chunk.chunks(approximate_max_message_size)
956                        {
957                            if let Err(error) = response_sender
958                                .send(ClusterSectorPlottingProgress::SectorChunk(Ok(
959                                    sector_chunk.slice_ref(small_sector_chunk)
960                                )))
961                                .await
962                            {
963                                warn!(%error, "Failed to send plotting progress");
964                                return;
965                            }
966                        }
967                    }
968                    Err(error) => {
969                        if let Err(error) = response_sender
970                            .send(ClusterSectorPlottingProgress::SectorChunk(Err(error)))
971                            .await
972                        {
973                            warn!(%error, "Failed to send plotting progress");
974                            return;
975                        }
976                    }
977                }
978            }
979
980            return;
981        }
982        SectorPlottingProgress::Error { error } => ClusterSectorPlottingProgress::Error { error },
983    };
984
985    if let Err(error) = response_sender.send(cluster_progress).await {
986        warn!(%error, "Failed to send plotting progress");
987    }
988}