subspace_farmer/cluster/
controller.rs

1//! Farming cluster controller
2//!
3//! Controller is responsible for managing farming cluster.
4//!
5//! This module exposes some data structures for NATS communication, custom piece getter and node
6//! client implementations designed to work with cluster controller and a service function to drive
7//! the backend part of the controller.
8
9pub mod caches;
10pub mod farms;
11mod stream_map;
12
13use crate::cluster::cache::{ClusterCacheReadPieceRequest, ClusterCacheReadPiecesRequest};
14use crate::cluster::nats_client::{
15    GenericBroadcast, GenericNotification, GenericRequest, GenericStreamRequest, NatsClient,
16};
17use crate::farm::{PieceCacheId, PieceCacheOffset};
18use crate::farmer_cache::FarmerCache;
19use crate::node_client::NodeClient;
20use anyhow::anyhow;
21use async_nats::HeaderValue;
22use async_trait::async_trait;
23use futures::channel::mpsc;
24use futures::future::FusedFuture;
25use futures::stream::FuturesUnordered;
26use futures::{select, stream, FutureExt, Stream, StreamExt};
27use parity_scale_codec::{Decode, Encode};
28use parking_lot::Mutex;
29use rand::prelude::*;
30use std::collections::{HashMap, HashSet};
31use std::pin::Pin;
32use std::sync::Arc;
33use std::task::Poll;
34use subspace_core_primitives::pieces::{Piece, PieceIndex};
35use subspace_core_primitives::segments::{SegmentHeader, SegmentIndex};
36use subspace_data_retrieval::piece_getter::PieceGetter;
37use subspace_rpc_primitives::{
38    FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
39};
40use tracing::{debug, error, trace, warn};
41
42/// Special "cache group" that all controllers subscribe to and that can be used to query any cache
43/// group. The cache group for each query is chosen at random.
44const GLOBAL_CACHE_GROUP: &str = "_";
45
46/// Broadcast sent by controllers requesting farmers to identify themselves
47#[derive(Debug, Copy, Clone, Encode, Decode)]
48pub struct ClusterControllerFarmerIdentifyBroadcast;
49
50impl GenericBroadcast for ClusterControllerFarmerIdentifyBroadcast {
51    const SUBJECT: &'static str = "subspace.controller.farmer-identify";
52}
53
54/// Broadcast sent by controllers requesting caches in cache group to identify themselves
55#[derive(Debug, Copy, Clone, Encode, Decode)]
56pub struct ClusterControllerCacheIdentifyBroadcast;
57
58impl GenericBroadcast for ClusterControllerCacheIdentifyBroadcast {
59    /// `*` here stands for cache group
60    const SUBJECT: &'static str = "subspace.controller.*.cache-identify";
61}
62
63/// Broadcast with slot info sent by controllers
64#[derive(Debug, Clone, Encode, Decode)]
65struct ClusterControllerSlotInfoBroadcast {
66    slot_info: SlotInfo,
67    instance: String,
68}
69
70impl GenericBroadcast for ClusterControllerSlotInfoBroadcast {
71    const SUBJECT: &'static str = "subspace.controller.slot-info";
72
73    fn deterministic_message_id(&self) -> Option<HeaderValue> {
74        // TODO: Depending on answer in `https://github.com/nats-io/nats.docs/issues/663` this might
75        //  be simplified to just a slot number
76        Some(HeaderValue::from(
77            format!("slot-info-{}", self.slot_info.slot_number).as_str(),
78        ))
79    }
80}
81
82/// Broadcast with reward signing info by controllers
83#[derive(Debug, Clone, Encode, Decode)]
84struct ClusterControllerRewardSigningBroadcast {
85    reward_signing_info: RewardSigningInfo,
86}
87
88impl GenericBroadcast for ClusterControllerRewardSigningBroadcast {
89    const SUBJECT: &'static str = "subspace.controller.reward-signing-info";
90}
91
92/// Broadcast with archived segment headers by controllers
93#[derive(Debug, Clone, Encode, Decode)]
94struct ClusterControllerArchivedSegmentHeaderBroadcast {
95    archived_segment_header: SegmentHeader,
96}
97
98impl GenericBroadcast for ClusterControllerArchivedSegmentHeaderBroadcast {
99    const SUBJECT: &'static str = "subspace.controller.archived-segment-header";
100
101    fn deterministic_message_id(&self) -> Option<HeaderValue> {
102        // TODO: Depending on answer in `https://github.com/nats-io/nats.docs/issues/663` this might
103        //  be simplified to just a segment index
104        Some(HeaderValue::from(
105            format!(
106                "archived-segment-{}",
107                self.archived_segment_header.segment_index()
108            )
109            .as_str(),
110        ))
111    }
112}
113
114/// Notification messages with solution by farmers
115#[derive(Debug, Clone, Encode, Decode)]
116struct ClusterControllerSolutionNotification {
117    solution_response: SolutionResponse,
118}
119
120impl GenericNotification for ClusterControllerSolutionNotification {
121    const SUBJECT: &'static str = "subspace.controller.*.solution";
122}
123
124/// Notification messages with reward signature by farmers
125#[derive(Debug, Clone, Encode, Decode)]
126struct ClusterControllerRewardSignatureNotification {
127    reward_signature: RewardSignatureResponse,
128}
129
130impl GenericNotification for ClusterControllerRewardSignatureNotification {
131    const SUBJECT: &'static str = "subspace.controller.reward-signature";
132}
133
134/// Request farmer app info from controller
135#[derive(Debug, Clone, Encode, Decode)]
136struct ClusterControllerFarmerAppInfoRequest;
137
138impl GenericRequest for ClusterControllerFarmerAppInfoRequest {
139    const SUBJECT: &'static str = "subspace.controller.farmer-app-info";
140    type Response = Result<FarmerAppInfo, String>;
141}
142
143/// Request segment headers with specified segment indices
144#[derive(Debug, Clone, Encode, Decode)]
145struct ClusterControllerSegmentHeadersRequest {
146    segment_indices: Vec<SegmentIndex>,
147}
148
149impl GenericRequest for ClusterControllerSegmentHeadersRequest {
150    const SUBJECT: &'static str = "subspace.controller.segment-headers";
151    type Response = Vec<Option<SegmentHeader>>;
152}
153
154/// Find piece with specified index in cache
155#[derive(Debug, Clone, Encode, Decode)]
156struct ClusterControllerFindPieceInCacheRequest {
157    piece_index: PieceIndex,
158}
159
160impl GenericRequest for ClusterControllerFindPieceInCacheRequest {
161    const SUBJECT: &'static str = "subspace.controller.*.find-piece-in-cache";
162    type Response = Option<(PieceCacheId, PieceCacheOffset)>;
163}
164
165/// Find pieces with specified indices in cache
166#[derive(Debug, Clone, Encode, Decode)]
167struct ClusterControllerFindPiecesInCacheRequest {
168    piece_indices: Vec<PieceIndex>,
169}
170
171impl GenericStreamRequest for ClusterControllerFindPiecesInCacheRequest {
172    const SUBJECT: &'static str = "subspace.controller.*.find-pieces-in-cache";
173    /// Only pieces that were found are returned
174    type Response = (PieceIndex, PieceCacheId, PieceCacheOffset);
175}
176
177/// Request piece with specified index
178#[derive(Debug, Clone, Encode, Decode)]
179struct ClusterControllerPieceRequest {
180    piece_index: PieceIndex,
181}
182
183impl GenericRequest for ClusterControllerPieceRequest {
184    const SUBJECT: &'static str = "subspace.controller.piece";
185    type Response = Option<Piece>;
186}
187
188/// Request pieces with specified index
189#[derive(Debug, Clone, Encode, Decode)]
190struct ClusterControllerPiecesRequest {
191    piece_indices: Vec<PieceIndex>,
192}
193
194impl GenericStreamRequest for ClusterControllerPiecesRequest {
195    const SUBJECT: &'static str = "subspace.controller.pieces";
196    /// Only pieces that were found are returned
197    type Response = (PieceIndex, Piece);
198}
199
200/// Cluster piece getter
201#[derive(Debug, Clone)]
202pub struct ClusterPieceGetter {
203    nats_client: NatsClient,
204    cache_group: String,
205}
206
207#[async_trait]
208impl PieceGetter for ClusterPieceGetter {
209    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
210        if let Some((piece_cache_id, piece_cache_offset)) = self
211            .nats_client
212            .request(
213                &ClusterControllerFindPieceInCacheRequest { piece_index },
214                Some(&self.cache_group),
215            )
216            .await?
217        {
218            trace!(
219                %piece_index,
220                %piece_cache_id,
221                %piece_cache_offset,
222                "Found piece in cache, retrieving"
223            );
224
225            match self
226                .nats_client
227                .request(
228                    &ClusterCacheReadPieceRequest {
229                        offset: piece_cache_offset,
230                    },
231                    Some(&piece_cache_id.to_string()),
232                )
233                .await
234                .map_err(|error| error.to_string())
235                .flatten()
236            {
237                Ok(Some((retrieved_piece_index, piece))) => {
238                    if retrieved_piece_index == piece_index {
239                        trace!(
240                            %piece_index,
241                            %piece_cache_id,
242                            %piece_cache_offset,
243                            "Retrieved piece from cache successfully"
244                        );
245
246                        return Ok(Some(piece));
247                    } else {
248                        trace!(
249                            %piece_index,
250                            %piece_cache_id,
251                            %piece_cache_offset,
252                            "Retrieving piece was replaced in cache during retrieval"
253                        );
254                    }
255                }
256                Ok(None) => {
257                    trace!(
258                        %piece_index,
259                        %piece_cache_id,
260                        %piece_cache_offset,
261                        "Piece cache didn't have piece at offset"
262                    );
263                }
264                Err(error) => {
265                    debug!(
266                        %piece_index,
267                        %piece_cache_id,
268                        %piece_cache_offset,
269                        %error,
270                        "Retrieving piece from cache failed"
271                    );
272                }
273            }
274        } else {
275            trace!(%piece_index, "Piece not found in cache");
276        }
277
278        Ok(self
279            .nats_client
280            .request(&ClusterControllerPieceRequest { piece_index }, None)
281            .await?)
282    }
283
284    async fn get_pieces<'a>(
285        &'a self,
286        piece_indices: Vec<PieceIndex>,
287    ) -> anyhow::Result<
288        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
289    > {
290        let (tx, mut rx) = mpsc::unbounded();
291
292        let piece_indices_to_get =
293            Mutex::new(piece_indices.iter().copied().collect::<HashSet<_>>());
294
295        let mut cached_pieces_by_cache_id = HashMap::<PieceCacheId, Vec<PieceCacheOffset>>::new();
296
297        {
298            let mut cached_pieces = self
299                .nats_client
300                .stream_request(
301                    &ClusterControllerFindPiecesInCacheRequest { piece_indices },
302                    Some(&self.cache_group),
303                )
304                .await?;
305
306            while let Some((_piece_index, piece_cache_id, piece_cache_offset)) =
307                cached_pieces.next().await
308            {
309                cached_pieces_by_cache_id
310                    .entry(piece_cache_id)
311                    .or_default()
312                    .push(piece_cache_offset);
313            }
314        }
315
316        let fut = async move {
317            let tx = &tx;
318
319            cached_pieces_by_cache_id
320                .into_iter()
321                .map(|(piece_cache_id, offsets)| {
322                    let piece_indices_to_get = &piece_indices_to_get;
323
324                    async move {
325                        let mut pieces_stream = match self
326                            .nats_client
327                            .stream_request(
328                                &ClusterCacheReadPiecesRequest { offsets },
329                                Some(&piece_cache_id.to_string()),
330                            )
331                            .await
332                        {
333                            Ok(pieces) => pieces,
334                            Err(error) => {
335                                warn!(
336                                    %error,
337                                    %piece_cache_id,
338                                    "Failed to request pieces from cache"
339                                );
340
341                                return;
342                            }
343                        };
344
345                        while let Some(piece_result) = pieces_stream.next().await {
346                            let (piece_offset, maybe_piece) = match piece_result {
347                                Ok(result) => result,
348                                Err(error) => {
349                                    warn!(%error, "Failed to get piece from cache");
350                                    continue;
351                                }
352                            };
353
354                            if let Some((piece_index, piece)) = maybe_piece {
355                                piece_indices_to_get.lock().remove(&piece_index);
356
357                                tx.unbounded_send((piece_index, Ok(Some(piece)))).expect(
358                                    "This future isn't polled after receiver is dropped; qed",
359                                );
360                            } else {
361                                warn!(
362                                    %piece_cache_id,
363                                    %piece_offset,
364                                    "Failed to get piece from cache, it was missing or already gone"
365                                );
366                            }
367                        }
368                    }
369                })
370                .collect::<FuturesUnordered<_>>()
371                // Simply drain everything
372                .for_each(|()| async {})
373                .await;
374
375            let mut piece_indices_to_get = piece_indices_to_get.into_inner();
376            if piece_indices_to_get.is_empty() {
377                return;
378            }
379
380            let mut pieces_from_controller = match self
381                .nats_client
382                .stream_request(
383                    &ClusterControllerPiecesRequest {
384                        piece_indices: piece_indices_to_get.iter().copied().collect(),
385                    },
386                    None,
387                )
388                .await
389            {
390                Ok(pieces_from_controller) => pieces_from_controller,
391                Err(error) => {
392                    error!(%error, "Failed to get pieces from controller");
393
394                    for piece_index in piece_indices_to_get {
395                        tx.unbounded_send((
396                            piece_index,
397                            Err(anyhow::anyhow!("Failed to get piece from controller")),
398                        ))
399                        .expect("This future isn't polled after receiver is dropped; qed");
400                    }
401                    return;
402                }
403            };
404
405            while let Some((piece_index, piece)) = pieces_from_controller.next().await {
406                piece_indices_to_get.remove(&piece_index);
407                tx.unbounded_send((piece_index, Ok(Some(piece))))
408                    .expect("This future isn't polled after receiver is dropped; qed");
409            }
410
411            for piece_index in piece_indices_to_get {
412                tx.unbounded_send((piece_index, Err(anyhow::anyhow!("Failed to get piece"))))
413                    .expect("This future isn't polled after receiver is dropped; qed");
414            }
415        };
416        let mut fut = Box::pin(fut.fuse());
417
418        // Drive above future and stream back any pieces that were downloaded so far
419        Ok(Box::new(stream::poll_fn(move |cx| {
420            if !fut.is_terminated() {
421                // Result doesn't matter, we'll need to poll stream below anyway
422                let _ = fut.poll_unpin(cx);
423            }
424
425            if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) {
426                return Poll::Ready(maybe_result);
427            }
428
429            // Exit will be done by the stream above
430            Poll::Pending
431        })))
432    }
433}
434
435impl ClusterPieceGetter {
436    /// Create new instance
437    #[inline]
438    pub fn new(nats_client: NatsClient, cache_group: Option<String>) -> Self {
439        Self {
440            nats_client,
441            cache_group: cache_group.unwrap_or_else(|| GLOBAL_CACHE_GROUP.to_string()),
442        }
443    }
444}
445
446/// [`NodeClient`] used in cluster environment that connects to node through a controller instead
447/// of to the node directly
448#[derive(Debug, Clone)]
449pub struct ClusterNodeClient {
450    nats_client: NatsClient,
451    // Store last slot info instance that can be used to send solution response to (some instances
452    // may be not synced and not able to receive solution responses)
453    last_slot_info_instance: Arc<Mutex<String>>,
454}
455
456impl ClusterNodeClient {
457    /// Create a new instance
458    pub fn new(nats_client: NatsClient) -> Self {
459        Self {
460            nats_client,
461            last_slot_info_instance: Arc::default(),
462        }
463    }
464}
465
466#[async_trait]
467impl NodeClient for ClusterNodeClient {
468    async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> {
469        Ok(self
470            .nats_client
471            .request(&ClusterControllerFarmerAppInfoRequest, None)
472            .await?
473            .map_err(anyhow::Error::msg)?)
474    }
475
476    async fn subscribe_slot_info(
477        &self,
478    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> {
479        let subscription = self
480            .nats_client
481            .subscribe_to_broadcasts::<ClusterControllerSlotInfoBroadcast>(None, None)
482            .await?
483            .filter_map({
484                let mut last_slot_number = None;
485                let last_slot_info_instance = Arc::clone(&self.last_slot_info_instance);
486
487                move |broadcast| {
488                    let slot_info = broadcast.slot_info;
489
490                    let maybe_slot_info = if let Some(last_slot_number) = last_slot_number
491                        && last_slot_number >= slot_info.slot_number
492                    {
493                        None
494                    } else {
495                        last_slot_number.replace(slot_info.slot_number);
496                        *last_slot_info_instance.lock() = broadcast.instance;
497
498                        Some(slot_info)
499                    };
500
501                    async move { maybe_slot_info }
502                }
503            });
504
505        Ok(Box::pin(subscription))
506    }
507
508    async fn submit_solution_response(
509        &self,
510        solution_response: SolutionResponse,
511    ) -> anyhow::Result<()> {
512        let last_slot_info_instance = self.last_slot_info_instance.lock().clone();
513        Ok(self
514            .nats_client
515            .notification(
516                &ClusterControllerSolutionNotification { solution_response },
517                Some(&last_slot_info_instance),
518            )
519            .await?)
520    }
521
522    async fn subscribe_reward_signing(
523        &self,
524    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = RewardSigningInfo> + Send + 'static>>> {
525        let subscription = self
526            .nats_client
527            .subscribe_to_broadcasts::<ClusterControllerRewardSigningBroadcast>(None, None)
528            .await?
529            .map(|broadcast| broadcast.reward_signing_info);
530
531        Ok(Box::pin(subscription))
532    }
533
534    /// Submit a block signature
535    async fn submit_reward_signature(
536        &self,
537        reward_signature: RewardSignatureResponse,
538    ) -> anyhow::Result<()> {
539        Ok(self
540            .nats_client
541            .notification(
542                &ClusterControllerRewardSignatureNotification { reward_signature },
543                None,
544            )
545            .await?)
546    }
547
548    async fn subscribe_archived_segment_headers(
549        &self,
550    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>> {
551        let subscription = self
552            .nats_client
553            .subscribe_to_broadcasts::<ClusterControllerArchivedSegmentHeaderBroadcast>(None, None)
554            .await?
555            .filter_map({
556                let mut last_archived_segment_index = None;
557
558                move |broadcast| {
559                    let archived_segment_header = broadcast.archived_segment_header;
560                    let segment_index = archived_segment_header.segment_index();
561
562                    let maybe_archived_segment_header = if let Some(last_archived_segment_index) =
563                        last_archived_segment_index
564                        && last_archived_segment_index >= segment_index
565                    {
566                        None
567                    } else {
568                        last_archived_segment_index.replace(segment_index);
569
570                        Some(archived_segment_header)
571                    };
572
573                    async move { maybe_archived_segment_header }
574                }
575            });
576
577        Ok(Box::pin(subscription))
578    }
579
580    async fn segment_headers(
581        &self,
582        segment_indices: Vec<SegmentIndex>,
583    ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
584        Ok(self
585            .nats_client
586            .request(
587                &ClusterControllerSegmentHeadersRequest { segment_indices },
588                None,
589            )
590            .await?)
591    }
592
593    async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
594        Ok(self
595            .nats_client
596            .request(&ClusterControllerPieceRequest { piece_index }, None)
597            .await?)
598    }
599
600    async fn acknowledge_archived_segment_header(
601        &self,
602        _segment_index: SegmentIndex,
603    ) -> anyhow::Result<()> {
604        // Acknowledgement is unnecessary/unsupported
605        Ok(())
606    }
607}
608
609/// Create controller service that handles things like broadcasting information (for example slot
610/// notifications) as well as responding to incoming requests (like piece requests).
611///
612/// Implementation is using concurrency with multiple tokio tasks, but can be started multiple times
613/// per controller instance in order to parallelize more work across threads if needed.
614pub async fn controller_service<NC, PG>(
615    nats_client: &NatsClient,
616    node_client: &NC,
617    piece_getter: &PG,
618    farmer_caches: &[(&str, &FarmerCache)],
619    instance: &str,
620    primary_instance: bool,
621) -> anyhow::Result<()>
622where
623    NC: NodeClient,
624    PG: PieceGetter + Sync,
625{
626    if primary_instance {
627        select! {
628            result = slot_info_broadcaster(nats_client, node_client, instance).fuse() => {
629                result
630            },
631            result = reward_signing_broadcaster(nats_client, node_client, instance).fuse() => {
632                result
633            },
634            result = archived_segment_headers_broadcaster(nats_client, node_client, instance).fuse() => {
635                result
636            },
637            result = solution_response_forwarder(nats_client, node_client, instance).fuse() => {
638                result
639            },
640            result = reward_signature_forwarder(nats_client, node_client, instance).fuse() => {
641                result
642            },
643            result = farmer_app_info_responder(nats_client, node_client).fuse() => {
644                result
645            },
646            result = segment_headers_responder(nats_client, node_client).fuse() => {
647                result
648            },
649            result = find_piece_responder(nats_client, farmer_caches).fuse() => {
650                result
651            },
652            result = find_pieces_responder(nats_client, farmer_caches).fuse() => {
653                result
654            },
655            result = piece_responder(nats_client, piece_getter).fuse() => {
656                result
657            },
658            result = pieces_responder(nats_client, piece_getter).fuse() => {
659                result
660            },
661        }
662    } else {
663        select! {
664            result = farmer_app_info_responder(nats_client, node_client).fuse() => {
665                result
666            },
667            result = segment_headers_responder(nats_client, node_client).fuse() => {
668                result
669            },
670            result = find_piece_responder(nats_client, farmer_caches).fuse() => {
671                result
672            },
673            result = find_pieces_responder(nats_client, farmer_caches).fuse() => {
674                result
675            },
676            result = piece_responder(nats_client, piece_getter).fuse() => {
677                result
678            },
679            result = pieces_responder(nats_client, piece_getter).fuse() => {
680                result
681            },
682        }
683    }
684}
685
686async fn slot_info_broadcaster<NC>(
687    nats_client: &NatsClient,
688    node_client: &NC,
689    instance: &str,
690) -> anyhow::Result<()>
691where
692    NC: NodeClient,
693{
694    let mut slot_info_notifications = node_client
695        .subscribe_slot_info()
696        .await
697        .map_err(|error| anyhow!("Failed to subscribe to slot info notifications: {error}"))?;
698
699    while let Some(slot_info) = slot_info_notifications.next().await {
700        debug!(?slot_info, "New slot");
701
702        let slot = slot_info.slot_number;
703
704        if let Err(error) = nats_client
705            .broadcast(
706                &ClusterControllerSlotInfoBroadcast {
707                    slot_info,
708                    instance: instance.to_string(),
709                },
710                instance,
711            )
712            .await
713        {
714            warn!(%slot, %error, "Failed to broadcast slot info");
715        }
716    }
717
718    Ok(())
719}
720
721async fn reward_signing_broadcaster<NC>(
722    nats_client: &NatsClient,
723    node_client: &NC,
724    instance: &str,
725) -> anyhow::Result<()>
726where
727    NC: NodeClient,
728{
729    let mut reward_signing_notifications = node_client
730        .subscribe_reward_signing()
731        .await
732        .map_err(|error| anyhow!("Failed to subscribe to reward signing notifications: {error}"))?;
733
734    while let Some(reward_signing_info) = reward_signing_notifications.next().await {
735        trace!(?reward_signing_info, "New reward signing notification");
736
737        if let Err(error) = nats_client
738            .broadcast(
739                &ClusterControllerRewardSigningBroadcast {
740                    reward_signing_info,
741                },
742                instance,
743            )
744            .await
745        {
746            warn!(%error, "Failed to broadcast reward signing info");
747        }
748    }
749
750    Ok(())
751}
752
753async fn archived_segment_headers_broadcaster<NC>(
754    nats_client: &NatsClient,
755    node_client: &NC,
756    instance: &str,
757) -> anyhow::Result<()>
758where
759    NC: NodeClient,
760{
761    let mut archived_segments_notifications = node_client
762        .subscribe_archived_segment_headers()
763        .await
764        .map_err(|error| {
765            anyhow!("Failed to subscribe to archived segment header notifications: {error}")
766        })?;
767
768    while let Some(archived_segment_header) = archived_segments_notifications.next().await {
769        trace!(
770            ?archived_segment_header,
771            "New archived archived segment header notification"
772        );
773
774        node_client
775            .acknowledge_archived_segment_header(archived_segment_header.segment_index())
776            .await
777            .map_err(|error| anyhow!("Failed to acknowledge archived segment header: {error}"))?;
778
779        if let Err(error) = nats_client
780            .broadcast(
781                &ClusterControllerArchivedSegmentHeaderBroadcast {
782                    archived_segment_header,
783                },
784                instance,
785            )
786            .await
787        {
788            warn!(%error, "Failed to broadcast archived segment header info");
789        }
790    }
791
792    Ok(())
793}
794
795async fn solution_response_forwarder<NC>(
796    nats_client: &NatsClient,
797    node_client: &NC,
798    instance: &str,
799) -> anyhow::Result<()>
800where
801    NC: NodeClient,
802{
803    let mut subscription = nats_client
804        .subscribe_to_notifications::<ClusterControllerSolutionNotification>(
805            Some(instance),
806            Some(instance.to_string()),
807        )
808        .await
809        .map_err(|error| anyhow!("Failed to subscribe to solution notifications: {error}"))?;
810
811    while let Some(notification) = subscription.next().await {
812        debug!(?notification, "Solution notification");
813
814        let slot = notification.solution_response.slot_number;
815        let public_key = notification.solution_response.solution.public_key;
816        let sector_index = notification.solution_response.solution.sector_index;
817
818        if let Err(error) = node_client
819            .submit_solution_response(notification.solution_response)
820            .await
821        {
822            warn!(
823                %error,
824                %slot,
825                %public_key,
826                %sector_index,
827                "Failed to send solution response"
828            );
829        }
830    }
831
832    Ok(())
833}
834
835async fn reward_signature_forwarder<NC>(
836    nats_client: &NatsClient,
837    node_client: &NC,
838    instance: &str,
839) -> anyhow::Result<()>
840where
841    NC: NodeClient,
842{
843    let mut subscription = nats_client
844        .subscribe_to_notifications::<ClusterControllerRewardSignatureNotification>(
845            None,
846            Some(instance.to_string()),
847        )
848        .await
849        .map_err(|error| {
850            anyhow!("Failed to subscribe to reward signature notifications: {error}")
851        })?;
852
853    while let Some(notification) = subscription.next().await {
854        debug!(?notification, "Reward signature notification");
855
856        if let Err(error) = node_client
857            .submit_reward_signature(notification.reward_signature)
858            .await
859        {
860            warn!(%error, "Failed to send reward signature");
861        }
862    }
863
864    Ok(())
865}
866
867async fn farmer_app_info_responder<NC>(
868    nats_client: &NatsClient,
869    node_client: &NC,
870) -> anyhow::Result<()>
871where
872    NC: NodeClient,
873{
874    nats_client
875        .request_responder(
876            None,
877            Some("subspace.controller".to_string()),
878            |_: ClusterControllerFarmerAppInfoRequest| async move {
879                Some(
880                    node_client
881                        .farmer_app_info()
882                        .await
883                        .map_err(|error| error.to_string()),
884                )
885            },
886        )
887        .await
888}
889
890async fn segment_headers_responder<NC>(
891    nats_client: &NatsClient,
892    node_client: &NC,
893) -> anyhow::Result<()>
894where
895    NC: NodeClient,
896{
897    nats_client
898        .request_responder(
899            None,
900            Some("subspace.controller".to_string()),
901            |ClusterControllerSegmentHeadersRequest { segment_indices }| async move {
902                node_client
903                    .segment_headers(segment_indices.clone())
904                    .await
905                    .inspect_err(|error| {
906                        warn!(%error, ?segment_indices, "Failed to get segment headers");
907                    })
908                    .ok()
909            },
910        )
911        .await
912}
913
914async fn find_piece_responder(
915    nats_client: &NatsClient,
916    farmer_caches: &[(&str, &FarmerCache)],
917) -> anyhow::Result<()> {
918    futures::future::try_join(
919        farmer_caches
920            .iter()
921            .map(|(cache_group, farmer_cache)| {
922                nats_client.request_responder(
923                    Some(cache_group),
924                    Some("subspace.controller".to_string()),
925                    move |ClusterControllerFindPieceInCacheRequest { piece_index }| async move {
926                        Some(farmer_cache.find_piece(piece_index).await)
927                    },
928                )
929            })
930            .collect::<FuturesUnordered<_>>()
931            .next()
932            .map(|result| result.unwrap_or(Ok(()))),
933        nats_client.request_responder(
934            Some(GLOBAL_CACHE_GROUP),
935            Some("subspace.controller".to_string()),
936            |ClusterControllerFindPieceInCacheRequest { piece_index }| async move {
937                let (_cache_group, farmer_cache) = farmer_caches.iter().choose(&mut thread_rng())?;
938                Some(farmer_cache.find_piece(piece_index).await)
939            },
940        ),
941    )
942    .await
943    .map(|((), ())| ())
944}
945
946async fn find_pieces_responder(
947    nats_client: &NatsClient,
948    farmer_caches: &[(&str, &FarmerCache)],
949) -> anyhow::Result<()> {
950    futures::future::try_join(
951        farmer_caches
952            .iter()
953            .map(|(cache_group, farmer_cache)| {
954                nats_client.stream_request_responder(
955                    Some(cache_group),
956                    Some("subspace.controller".to_string()),
957                    move |ClusterControllerFindPiecesInCacheRequest { piece_indices }| async move {
958                        Some(stream::iter(farmer_cache.find_pieces(piece_indices).await))
959                    },
960                )
961            })
962            .collect::<FuturesUnordered<_>>()
963            .next()
964            .map(|result| result.unwrap_or(Ok(()))),
965        nats_client.stream_request_responder(
966            Some(GLOBAL_CACHE_GROUP),
967            Some("subspace.controller".to_string()),
968            |ClusterControllerFindPiecesInCacheRequest { piece_indices }| async move {
969                let (_cache_group, farmer_cache) = farmer_caches.iter().choose(&mut thread_rng())?;
970                Some(stream::iter(farmer_cache.find_pieces(piece_indices).await))
971            },
972        ),
973    )
974    .await
975    .map(|((), ())| ())
976}
977
978async fn piece_responder<PG>(nats_client: &NatsClient, piece_getter: &PG) -> anyhow::Result<()>
979where
980    PG: PieceGetter + Sync,
981{
982    nats_client
983        .request_responder(
984            None,
985            Some("subspace.controller".to_string()),
986            |ClusterControllerPieceRequest { piece_index }| async move {
987                piece_getter
988                    .get_piece(piece_index)
989                    .await
990                    .inspect_err(|error| warn!(%error, %piece_index, "Failed to get piece"))
991                    .ok()
992            },
993        )
994        .await
995}
996
997async fn pieces_responder<PG>(nats_client: &NatsClient, piece_getter: &PG) -> anyhow::Result<()>
998where
999    PG: PieceGetter + Sync,
1000{
1001    nats_client
1002        .stream_request_responder(
1003            None,
1004            Some("subspace.controller".to_string()),
1005            |ClusterControllerPiecesRequest { piece_indices }| async move {
1006                piece_getter
1007                    .get_pieces(piece_indices)
1008                    .await
1009                    .map(|stream| {
1010                        Box::pin(stream.filter_map(
1011                            |(piece_index, maybe_piece_result)| async move {
1012                                match maybe_piece_result {
1013                                    Ok(Some(piece)) => Some((piece_index, piece)),
1014                                    Ok(None) => None,
1015                                    Err(error) => {
1016                                        warn!(%error, %piece_index, "Failed to get piece");
1017                                        None
1018                                    }
1019                                }
1020                            },
1021                        ))
1022                    })
1023                    .inspect_err(|error| warn!(%error, "Failed to get pieces"))
1024                    .ok()
1025            },
1026        )
1027        .await
1028}