1pub mod caches;
10pub mod farms;
11mod stream_map;
12
13use crate::cluster::cache::{ClusterCacheReadPieceRequest, ClusterCacheReadPiecesRequest};
14use crate::cluster::nats_client::{
15 GenericBroadcast, GenericNotification, GenericRequest, GenericStreamRequest, NatsClient,
16};
17use crate::farm::{PieceCacheId, PieceCacheOffset};
18use crate::farmer_cache::FarmerCache;
19use crate::node_client::NodeClient;
20use anyhow::anyhow;
21use async_nats::HeaderValue;
22use async_trait::async_trait;
23use futures::channel::mpsc;
24use futures::future::FusedFuture;
25use futures::stream::FuturesUnordered;
26use futures::{select, stream, FutureExt, Stream, StreamExt};
27use parity_scale_codec::{Decode, Encode};
28use parking_lot::Mutex;
29use rand::prelude::*;
30use std::collections::{HashMap, HashSet};
31use std::pin::Pin;
32use std::sync::Arc;
33use std::task::Poll;
34use subspace_core_primitives::pieces::{Piece, PieceIndex};
35use subspace_core_primitives::segments::{SegmentHeader, SegmentIndex};
36use subspace_data_retrieval::piece_getter::PieceGetter;
37use subspace_rpc_primitives::{
38 FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
39};
40use tracing::{debug, error, trace, warn};
41
42const GLOBAL_CACHE_GROUP: &str = "_";
45
46#[derive(Debug, Copy, Clone, Encode, Decode)]
48pub struct ClusterControllerFarmerIdentifyBroadcast;
49
50impl GenericBroadcast for ClusterControllerFarmerIdentifyBroadcast {
51 const SUBJECT: &'static str = "subspace.controller.farmer-identify";
52}
53
54#[derive(Debug, Copy, Clone, Encode, Decode)]
56pub struct ClusterControllerCacheIdentifyBroadcast;
57
58impl GenericBroadcast for ClusterControllerCacheIdentifyBroadcast {
59 const SUBJECT: &'static str = "subspace.controller.*.cache-identify";
61}
62
63#[derive(Debug, Clone, Encode, Decode)]
65struct ClusterControllerSlotInfoBroadcast {
66 slot_info: SlotInfo,
67 instance: String,
68}
69
70impl GenericBroadcast for ClusterControllerSlotInfoBroadcast {
71 const SUBJECT: &'static str = "subspace.controller.slot-info";
72
73 fn deterministic_message_id(&self) -> Option<HeaderValue> {
74 Some(HeaderValue::from(
77 format!("slot-info-{}", self.slot_info.slot_number).as_str(),
78 ))
79 }
80}
81
82#[derive(Debug, Clone, Encode, Decode)]
84struct ClusterControllerRewardSigningBroadcast {
85 reward_signing_info: RewardSigningInfo,
86}
87
88impl GenericBroadcast for ClusterControllerRewardSigningBroadcast {
89 const SUBJECT: &'static str = "subspace.controller.reward-signing-info";
90}
91
92#[derive(Debug, Clone, Encode, Decode)]
94struct ClusterControllerArchivedSegmentHeaderBroadcast {
95 archived_segment_header: SegmentHeader,
96}
97
98impl GenericBroadcast for ClusterControllerArchivedSegmentHeaderBroadcast {
99 const SUBJECT: &'static str = "subspace.controller.archived-segment-header";
100
101 fn deterministic_message_id(&self) -> Option<HeaderValue> {
102 Some(HeaderValue::from(
105 format!(
106 "archived-segment-{}",
107 self.archived_segment_header.segment_index()
108 )
109 .as_str(),
110 ))
111 }
112}
113
114#[derive(Debug, Clone, Encode, Decode)]
116struct ClusterControllerSolutionNotification {
117 solution_response: SolutionResponse,
118}
119
120impl GenericNotification for ClusterControllerSolutionNotification {
121 const SUBJECT: &'static str = "subspace.controller.*.solution";
122}
123
124#[derive(Debug, Clone, Encode, Decode)]
126struct ClusterControllerRewardSignatureNotification {
127 reward_signature: RewardSignatureResponse,
128}
129
130impl GenericNotification for ClusterControllerRewardSignatureNotification {
131 const SUBJECT: &'static str = "subspace.controller.reward-signature";
132}
133
134#[derive(Debug, Clone, Encode, Decode)]
136struct ClusterControllerFarmerAppInfoRequest;
137
138impl GenericRequest for ClusterControllerFarmerAppInfoRequest {
139 const SUBJECT: &'static str = "subspace.controller.farmer-app-info";
140 type Response = Result<FarmerAppInfo, String>;
141}
142
143#[derive(Debug, Clone, Encode, Decode)]
145struct ClusterControllerSegmentHeadersRequest {
146 segment_indices: Vec<SegmentIndex>,
147}
148
149impl GenericRequest for ClusterControllerSegmentHeadersRequest {
150 const SUBJECT: &'static str = "subspace.controller.segment-headers";
151 type Response = Vec<Option<SegmentHeader>>;
152}
153
154#[derive(Debug, Clone, Encode, Decode)]
156struct ClusterControllerFindPieceInCacheRequest {
157 piece_index: PieceIndex,
158}
159
160impl GenericRequest for ClusterControllerFindPieceInCacheRequest {
161 const SUBJECT: &'static str = "subspace.controller.*.find-piece-in-cache";
162 type Response = Option<(PieceCacheId, PieceCacheOffset)>;
163}
164
165#[derive(Debug, Clone, Encode, Decode)]
167struct ClusterControllerFindPiecesInCacheRequest {
168 piece_indices: Vec<PieceIndex>,
169}
170
171impl GenericStreamRequest for ClusterControllerFindPiecesInCacheRequest {
172 const SUBJECT: &'static str = "subspace.controller.*.find-pieces-in-cache";
173 type Response = (PieceIndex, PieceCacheId, PieceCacheOffset);
175}
176
177#[derive(Debug, Clone, Encode, Decode)]
179struct ClusterControllerPieceRequest {
180 piece_index: PieceIndex,
181}
182
183impl GenericRequest for ClusterControllerPieceRequest {
184 const SUBJECT: &'static str = "subspace.controller.piece";
185 type Response = Option<Piece>;
186}
187
188#[derive(Debug, Clone, Encode, Decode)]
190struct ClusterControllerPiecesRequest {
191 piece_indices: Vec<PieceIndex>,
192}
193
194impl GenericStreamRequest for ClusterControllerPiecesRequest {
195 const SUBJECT: &'static str = "subspace.controller.pieces";
196 type Response = (PieceIndex, Piece);
198}
199
200#[derive(Debug, Clone)]
202pub struct ClusterPieceGetter {
203 nats_client: NatsClient,
204 cache_group: String,
205}
206
207#[async_trait]
208impl PieceGetter for ClusterPieceGetter {
209 async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
210 if let Some((piece_cache_id, piece_cache_offset)) = self
211 .nats_client
212 .request(
213 &ClusterControllerFindPieceInCacheRequest { piece_index },
214 Some(&self.cache_group),
215 )
216 .await?
217 {
218 trace!(
219 %piece_index,
220 %piece_cache_id,
221 %piece_cache_offset,
222 "Found piece in cache, retrieving"
223 );
224
225 match self
226 .nats_client
227 .request(
228 &ClusterCacheReadPieceRequest {
229 offset: piece_cache_offset,
230 },
231 Some(&piece_cache_id.to_string()),
232 )
233 .await
234 .map_err(|error| error.to_string())
235 .flatten()
236 {
237 Ok(Some((retrieved_piece_index, piece))) => {
238 if retrieved_piece_index == piece_index {
239 trace!(
240 %piece_index,
241 %piece_cache_id,
242 %piece_cache_offset,
243 "Retrieved piece from cache successfully"
244 );
245
246 return Ok(Some(piece));
247 } else {
248 trace!(
249 %piece_index,
250 %piece_cache_id,
251 %piece_cache_offset,
252 "Retrieving piece was replaced in cache during retrieval"
253 );
254 }
255 }
256 Ok(None) => {
257 trace!(
258 %piece_index,
259 %piece_cache_id,
260 %piece_cache_offset,
261 "Piece cache didn't have piece at offset"
262 );
263 }
264 Err(error) => {
265 debug!(
266 %piece_index,
267 %piece_cache_id,
268 %piece_cache_offset,
269 %error,
270 "Retrieving piece from cache failed"
271 );
272 }
273 }
274 } else {
275 trace!(%piece_index, "Piece not found in cache");
276 }
277
278 Ok(self
279 .nats_client
280 .request(&ClusterControllerPieceRequest { piece_index }, None)
281 .await?)
282 }
283
284 async fn get_pieces<'a>(
285 &'a self,
286 piece_indices: Vec<PieceIndex>,
287 ) -> anyhow::Result<
288 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
289 > {
290 let (tx, mut rx) = mpsc::unbounded();
291
292 let piece_indices_to_get =
293 Mutex::new(piece_indices.iter().copied().collect::<HashSet<_>>());
294
295 let mut cached_pieces_by_cache_id = HashMap::<PieceCacheId, Vec<PieceCacheOffset>>::new();
296
297 {
298 let mut cached_pieces = self
299 .nats_client
300 .stream_request(
301 &ClusterControllerFindPiecesInCacheRequest { piece_indices },
302 Some(&self.cache_group),
303 )
304 .await?;
305
306 while let Some((_piece_index, piece_cache_id, piece_cache_offset)) =
307 cached_pieces.next().await
308 {
309 cached_pieces_by_cache_id
310 .entry(piece_cache_id)
311 .or_default()
312 .push(piece_cache_offset);
313 }
314 }
315
316 let fut = async move {
317 let tx = &tx;
318
319 cached_pieces_by_cache_id
320 .into_iter()
321 .map(|(piece_cache_id, offsets)| {
322 let piece_indices_to_get = &piece_indices_to_get;
323
324 async move {
325 let mut pieces_stream = match self
326 .nats_client
327 .stream_request(
328 &ClusterCacheReadPiecesRequest { offsets },
329 Some(&piece_cache_id.to_string()),
330 )
331 .await
332 {
333 Ok(pieces) => pieces,
334 Err(error) => {
335 warn!(
336 %error,
337 %piece_cache_id,
338 "Failed to request pieces from cache"
339 );
340
341 return;
342 }
343 };
344
345 while let Some(piece_result) = pieces_stream.next().await {
346 let (piece_offset, maybe_piece) = match piece_result {
347 Ok(result) => result,
348 Err(error) => {
349 warn!(%error, "Failed to get piece from cache");
350 continue;
351 }
352 };
353
354 if let Some((piece_index, piece)) = maybe_piece {
355 piece_indices_to_get.lock().remove(&piece_index);
356
357 tx.unbounded_send((piece_index, Ok(Some(piece)))).expect(
358 "This future isn't polled after receiver is dropped; qed",
359 );
360 } else {
361 warn!(
362 %piece_cache_id,
363 %piece_offset,
364 "Failed to get piece from cache, it was missing or already gone"
365 );
366 }
367 }
368 }
369 })
370 .collect::<FuturesUnordered<_>>()
371 .for_each(|()| async {})
373 .await;
374
375 let mut piece_indices_to_get = piece_indices_to_get.into_inner();
376 if piece_indices_to_get.is_empty() {
377 return;
378 }
379
380 let mut pieces_from_controller = match self
381 .nats_client
382 .stream_request(
383 &ClusterControllerPiecesRequest {
384 piece_indices: piece_indices_to_get.iter().copied().collect(),
385 },
386 None,
387 )
388 .await
389 {
390 Ok(pieces_from_controller) => pieces_from_controller,
391 Err(error) => {
392 error!(%error, "Failed to get pieces from controller");
393
394 for piece_index in piece_indices_to_get {
395 tx.unbounded_send((
396 piece_index,
397 Err(anyhow::anyhow!("Failed to get piece from controller")),
398 ))
399 .expect("This future isn't polled after receiver is dropped; qed");
400 }
401 return;
402 }
403 };
404
405 while let Some((piece_index, piece)) = pieces_from_controller.next().await {
406 piece_indices_to_get.remove(&piece_index);
407 tx.unbounded_send((piece_index, Ok(Some(piece))))
408 .expect("This future isn't polled after receiver is dropped; qed");
409 }
410
411 for piece_index in piece_indices_to_get {
412 tx.unbounded_send((piece_index, Err(anyhow::anyhow!("Failed to get piece"))))
413 .expect("This future isn't polled after receiver is dropped; qed");
414 }
415 };
416 let mut fut = Box::pin(fut.fuse());
417
418 Ok(Box::new(stream::poll_fn(move |cx| {
420 if !fut.is_terminated() {
421 let _ = fut.poll_unpin(cx);
423 }
424
425 if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) {
426 return Poll::Ready(maybe_result);
427 }
428
429 Poll::Pending
431 })))
432 }
433}
434
435impl ClusterPieceGetter {
436 #[inline]
438 pub fn new(nats_client: NatsClient, cache_group: Option<String>) -> Self {
439 Self {
440 nats_client,
441 cache_group: cache_group.unwrap_or_else(|| GLOBAL_CACHE_GROUP.to_string()),
442 }
443 }
444}
445
446#[derive(Debug, Clone)]
449pub struct ClusterNodeClient {
450 nats_client: NatsClient,
451 last_slot_info_instance: Arc<Mutex<String>>,
454}
455
456impl ClusterNodeClient {
457 pub fn new(nats_client: NatsClient) -> Self {
459 Self {
460 nats_client,
461 last_slot_info_instance: Arc::default(),
462 }
463 }
464}
465
466#[async_trait]
467impl NodeClient for ClusterNodeClient {
468 async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> {
469 Ok(self
470 .nats_client
471 .request(&ClusterControllerFarmerAppInfoRequest, None)
472 .await?
473 .map_err(anyhow::Error::msg)?)
474 }
475
476 async fn subscribe_slot_info(
477 &self,
478 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> {
479 let subscription = self
480 .nats_client
481 .subscribe_to_broadcasts::<ClusterControllerSlotInfoBroadcast>(None, None)
482 .await?
483 .filter_map({
484 let mut last_slot_number = None;
485 let last_slot_info_instance = Arc::clone(&self.last_slot_info_instance);
486
487 move |broadcast| {
488 let slot_info = broadcast.slot_info;
489
490 let maybe_slot_info = if let Some(last_slot_number) = last_slot_number
491 && last_slot_number >= slot_info.slot_number
492 {
493 None
494 } else {
495 last_slot_number.replace(slot_info.slot_number);
496 *last_slot_info_instance.lock() = broadcast.instance;
497
498 Some(slot_info)
499 };
500
501 async move { maybe_slot_info }
502 }
503 });
504
505 Ok(Box::pin(subscription))
506 }
507
508 async fn submit_solution_response(
509 &self,
510 solution_response: SolutionResponse,
511 ) -> anyhow::Result<()> {
512 let last_slot_info_instance = self.last_slot_info_instance.lock().clone();
513 Ok(self
514 .nats_client
515 .notification(
516 &ClusterControllerSolutionNotification { solution_response },
517 Some(&last_slot_info_instance),
518 )
519 .await?)
520 }
521
522 async fn subscribe_reward_signing(
523 &self,
524 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = RewardSigningInfo> + Send + 'static>>> {
525 let subscription = self
526 .nats_client
527 .subscribe_to_broadcasts::<ClusterControllerRewardSigningBroadcast>(None, None)
528 .await?
529 .map(|broadcast| broadcast.reward_signing_info);
530
531 Ok(Box::pin(subscription))
532 }
533
534 async fn submit_reward_signature(
536 &self,
537 reward_signature: RewardSignatureResponse,
538 ) -> anyhow::Result<()> {
539 Ok(self
540 .nats_client
541 .notification(
542 &ClusterControllerRewardSignatureNotification { reward_signature },
543 None,
544 )
545 .await?)
546 }
547
548 async fn subscribe_archived_segment_headers(
549 &self,
550 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>> {
551 let subscription = self
552 .nats_client
553 .subscribe_to_broadcasts::<ClusterControllerArchivedSegmentHeaderBroadcast>(None, None)
554 .await?
555 .filter_map({
556 let mut last_archived_segment_index = None;
557
558 move |broadcast| {
559 let archived_segment_header = broadcast.archived_segment_header;
560 let segment_index = archived_segment_header.segment_index();
561
562 let maybe_archived_segment_header = if let Some(last_archived_segment_index) =
563 last_archived_segment_index
564 && last_archived_segment_index >= segment_index
565 {
566 None
567 } else {
568 last_archived_segment_index.replace(segment_index);
569
570 Some(archived_segment_header)
571 };
572
573 async move { maybe_archived_segment_header }
574 }
575 });
576
577 Ok(Box::pin(subscription))
578 }
579
580 async fn segment_headers(
581 &self,
582 segment_indices: Vec<SegmentIndex>,
583 ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
584 Ok(self
585 .nats_client
586 .request(
587 &ClusterControllerSegmentHeadersRequest { segment_indices },
588 None,
589 )
590 .await?)
591 }
592
593 async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
594 Ok(self
595 .nats_client
596 .request(&ClusterControllerPieceRequest { piece_index }, None)
597 .await?)
598 }
599
600 async fn acknowledge_archived_segment_header(
601 &self,
602 _segment_index: SegmentIndex,
603 ) -> anyhow::Result<()> {
604 Ok(())
606 }
607}
608
609pub async fn controller_service<NC, PG>(
615 nats_client: &NatsClient,
616 node_client: &NC,
617 piece_getter: &PG,
618 farmer_caches: &[(&str, &FarmerCache)],
619 instance: &str,
620 primary_instance: bool,
621) -> anyhow::Result<()>
622where
623 NC: NodeClient,
624 PG: PieceGetter + Sync,
625{
626 if primary_instance {
627 select! {
628 result = slot_info_broadcaster(nats_client, node_client, instance).fuse() => {
629 result
630 },
631 result = reward_signing_broadcaster(nats_client, node_client, instance).fuse() => {
632 result
633 },
634 result = archived_segment_headers_broadcaster(nats_client, node_client, instance).fuse() => {
635 result
636 },
637 result = solution_response_forwarder(nats_client, node_client, instance).fuse() => {
638 result
639 },
640 result = reward_signature_forwarder(nats_client, node_client, instance).fuse() => {
641 result
642 },
643 result = farmer_app_info_responder(nats_client, node_client).fuse() => {
644 result
645 },
646 result = segment_headers_responder(nats_client, node_client).fuse() => {
647 result
648 },
649 result = find_piece_responder(nats_client, farmer_caches).fuse() => {
650 result
651 },
652 result = find_pieces_responder(nats_client, farmer_caches).fuse() => {
653 result
654 },
655 result = piece_responder(nats_client, piece_getter).fuse() => {
656 result
657 },
658 result = pieces_responder(nats_client, piece_getter).fuse() => {
659 result
660 },
661 }
662 } else {
663 select! {
664 result = farmer_app_info_responder(nats_client, node_client).fuse() => {
665 result
666 },
667 result = segment_headers_responder(nats_client, node_client).fuse() => {
668 result
669 },
670 result = find_piece_responder(nats_client, farmer_caches).fuse() => {
671 result
672 },
673 result = find_pieces_responder(nats_client, farmer_caches).fuse() => {
674 result
675 },
676 result = piece_responder(nats_client, piece_getter).fuse() => {
677 result
678 },
679 result = pieces_responder(nats_client, piece_getter).fuse() => {
680 result
681 },
682 }
683 }
684}
685
686async fn slot_info_broadcaster<NC>(
687 nats_client: &NatsClient,
688 node_client: &NC,
689 instance: &str,
690) -> anyhow::Result<()>
691where
692 NC: NodeClient,
693{
694 let mut slot_info_notifications = node_client
695 .subscribe_slot_info()
696 .await
697 .map_err(|error| anyhow!("Failed to subscribe to slot info notifications: {error}"))?;
698
699 while let Some(slot_info) = slot_info_notifications.next().await {
700 debug!(?slot_info, "New slot");
701
702 let slot = slot_info.slot_number;
703
704 if let Err(error) = nats_client
705 .broadcast(
706 &ClusterControllerSlotInfoBroadcast {
707 slot_info,
708 instance: instance.to_string(),
709 },
710 instance,
711 )
712 .await
713 {
714 warn!(%slot, %error, "Failed to broadcast slot info");
715 }
716 }
717
718 Ok(())
719}
720
721async fn reward_signing_broadcaster<NC>(
722 nats_client: &NatsClient,
723 node_client: &NC,
724 instance: &str,
725) -> anyhow::Result<()>
726where
727 NC: NodeClient,
728{
729 let mut reward_signing_notifications = node_client
730 .subscribe_reward_signing()
731 .await
732 .map_err(|error| anyhow!("Failed to subscribe to reward signing notifications: {error}"))?;
733
734 while let Some(reward_signing_info) = reward_signing_notifications.next().await {
735 trace!(?reward_signing_info, "New reward signing notification");
736
737 if let Err(error) = nats_client
738 .broadcast(
739 &ClusterControllerRewardSigningBroadcast {
740 reward_signing_info,
741 },
742 instance,
743 )
744 .await
745 {
746 warn!(%error, "Failed to broadcast reward signing info");
747 }
748 }
749
750 Ok(())
751}
752
753async fn archived_segment_headers_broadcaster<NC>(
754 nats_client: &NatsClient,
755 node_client: &NC,
756 instance: &str,
757) -> anyhow::Result<()>
758where
759 NC: NodeClient,
760{
761 let mut archived_segments_notifications = node_client
762 .subscribe_archived_segment_headers()
763 .await
764 .map_err(|error| {
765 anyhow!("Failed to subscribe to archived segment header notifications: {error}")
766 })?;
767
768 while let Some(archived_segment_header) = archived_segments_notifications.next().await {
769 trace!(
770 ?archived_segment_header,
771 "New archived archived segment header notification"
772 );
773
774 node_client
775 .acknowledge_archived_segment_header(archived_segment_header.segment_index())
776 .await
777 .map_err(|error| anyhow!("Failed to acknowledge archived segment header: {error}"))?;
778
779 if let Err(error) = nats_client
780 .broadcast(
781 &ClusterControllerArchivedSegmentHeaderBroadcast {
782 archived_segment_header,
783 },
784 instance,
785 )
786 .await
787 {
788 warn!(%error, "Failed to broadcast archived segment header info");
789 }
790 }
791
792 Ok(())
793}
794
795async fn solution_response_forwarder<NC>(
796 nats_client: &NatsClient,
797 node_client: &NC,
798 instance: &str,
799) -> anyhow::Result<()>
800where
801 NC: NodeClient,
802{
803 let mut subscription = nats_client
804 .subscribe_to_notifications::<ClusterControllerSolutionNotification>(
805 Some(instance),
806 Some(instance.to_string()),
807 )
808 .await
809 .map_err(|error| anyhow!("Failed to subscribe to solution notifications: {error}"))?;
810
811 while let Some(notification) = subscription.next().await {
812 debug!(?notification, "Solution notification");
813
814 let slot = notification.solution_response.slot_number;
815 let public_key = notification.solution_response.solution.public_key;
816 let sector_index = notification.solution_response.solution.sector_index;
817
818 if let Err(error) = node_client
819 .submit_solution_response(notification.solution_response)
820 .await
821 {
822 warn!(
823 %error,
824 %slot,
825 %public_key,
826 %sector_index,
827 "Failed to send solution response"
828 );
829 }
830 }
831
832 Ok(())
833}
834
835async fn reward_signature_forwarder<NC>(
836 nats_client: &NatsClient,
837 node_client: &NC,
838 instance: &str,
839) -> anyhow::Result<()>
840where
841 NC: NodeClient,
842{
843 let mut subscription = nats_client
844 .subscribe_to_notifications::<ClusterControllerRewardSignatureNotification>(
845 None,
846 Some(instance.to_string()),
847 )
848 .await
849 .map_err(|error| {
850 anyhow!("Failed to subscribe to reward signature notifications: {error}")
851 })?;
852
853 while let Some(notification) = subscription.next().await {
854 debug!(?notification, "Reward signature notification");
855
856 if let Err(error) = node_client
857 .submit_reward_signature(notification.reward_signature)
858 .await
859 {
860 warn!(%error, "Failed to send reward signature");
861 }
862 }
863
864 Ok(())
865}
866
867async fn farmer_app_info_responder<NC>(
868 nats_client: &NatsClient,
869 node_client: &NC,
870) -> anyhow::Result<()>
871where
872 NC: NodeClient,
873{
874 nats_client
875 .request_responder(
876 None,
877 Some("subspace.controller".to_string()),
878 |_: ClusterControllerFarmerAppInfoRequest| async move {
879 Some(
880 node_client
881 .farmer_app_info()
882 .await
883 .map_err(|error| error.to_string()),
884 )
885 },
886 )
887 .await
888}
889
890async fn segment_headers_responder<NC>(
891 nats_client: &NatsClient,
892 node_client: &NC,
893) -> anyhow::Result<()>
894where
895 NC: NodeClient,
896{
897 nats_client
898 .request_responder(
899 None,
900 Some("subspace.controller".to_string()),
901 |ClusterControllerSegmentHeadersRequest { segment_indices }| async move {
902 node_client
903 .segment_headers(segment_indices.clone())
904 .await
905 .inspect_err(|error| {
906 warn!(%error, ?segment_indices, "Failed to get segment headers");
907 })
908 .ok()
909 },
910 )
911 .await
912}
913
914async fn find_piece_responder(
915 nats_client: &NatsClient,
916 farmer_caches: &[(&str, &FarmerCache)],
917) -> anyhow::Result<()> {
918 futures::future::try_join(
919 farmer_caches
920 .iter()
921 .map(|(cache_group, farmer_cache)| {
922 nats_client.request_responder(
923 Some(cache_group),
924 Some("subspace.controller".to_string()),
925 move |ClusterControllerFindPieceInCacheRequest { piece_index }| async move {
926 Some(farmer_cache.find_piece(piece_index).await)
927 },
928 )
929 })
930 .collect::<FuturesUnordered<_>>()
931 .next()
932 .map(|result| result.unwrap_or(Ok(()))),
933 nats_client.request_responder(
934 Some(GLOBAL_CACHE_GROUP),
935 Some("subspace.controller".to_string()),
936 |ClusterControllerFindPieceInCacheRequest { piece_index }| async move {
937 let (_cache_group, farmer_cache) = farmer_caches.iter().choose(&mut thread_rng())?;
938 Some(farmer_cache.find_piece(piece_index).await)
939 },
940 ),
941 )
942 .await
943 .map(|((), ())| ())
944}
945
946async fn find_pieces_responder(
947 nats_client: &NatsClient,
948 farmer_caches: &[(&str, &FarmerCache)],
949) -> anyhow::Result<()> {
950 futures::future::try_join(
951 farmer_caches
952 .iter()
953 .map(|(cache_group, farmer_cache)| {
954 nats_client.stream_request_responder(
955 Some(cache_group),
956 Some("subspace.controller".to_string()),
957 move |ClusterControllerFindPiecesInCacheRequest { piece_indices }| async move {
958 Some(stream::iter(farmer_cache.find_pieces(piece_indices).await))
959 },
960 )
961 })
962 .collect::<FuturesUnordered<_>>()
963 .next()
964 .map(|result| result.unwrap_or(Ok(()))),
965 nats_client.stream_request_responder(
966 Some(GLOBAL_CACHE_GROUP),
967 Some("subspace.controller".to_string()),
968 |ClusterControllerFindPiecesInCacheRequest { piece_indices }| async move {
969 let (_cache_group, farmer_cache) = farmer_caches.iter().choose(&mut thread_rng())?;
970 Some(stream::iter(farmer_cache.find_pieces(piece_indices).await))
971 },
972 ),
973 )
974 .await
975 .map(|((), ())| ())
976}
977
978async fn piece_responder<PG>(nats_client: &NatsClient, piece_getter: &PG) -> anyhow::Result<()>
979where
980 PG: PieceGetter + Sync,
981{
982 nats_client
983 .request_responder(
984 None,
985 Some("subspace.controller".to_string()),
986 |ClusterControllerPieceRequest { piece_index }| async move {
987 piece_getter
988 .get_piece(piece_index)
989 .await
990 .inspect_err(|error| warn!(%error, %piece_index, "Failed to get piece"))
991 .ok()
992 },
993 )
994 .await
995}
996
997async fn pieces_responder<PG>(nats_client: &NatsClient, piece_getter: &PG) -> anyhow::Result<()>
998where
999 PG: PieceGetter + Sync,
1000{
1001 nats_client
1002 .stream_request_responder(
1003 None,
1004 Some("subspace.controller".to_string()),
1005 |ClusterControllerPiecesRequest { piece_indices }| async move {
1006 piece_getter
1007 .get_pieces(piece_indices)
1008 .await
1009 .map(|stream| {
1010 Box::pin(stream.filter_map(
1011 |(piece_index, maybe_piece_result)| async move {
1012 match maybe_piece_result {
1013 Ok(Some(piece)) => Some((piece_index, piece)),
1014 Ok(None) => None,
1015 Err(error) => {
1016 warn!(%error, %piece_index, "Failed to get piece");
1017 None
1018 }
1019 }
1020 },
1021 ))
1022 })
1023 .inspect_err(|error| warn!(%error, "Failed to get pieces"))
1024 .ok()
1025 },
1026 )
1027 .await
1028}