1use crate::cluster::controller::ClusterControllerFarmerIdentifyBroadcast;
11use crate::cluster::nats_client::{
12 GenericBroadcast, GenericRequest, GenericStreamRequest, NatsClient,
13};
14use crate::farm::{
15 Farm, FarmError, FarmId, FarmingNotification, HandlerFn, HandlerId, PieceReader,
16 PlottedSectors, SectorUpdate,
17};
18use crate::utils::AsyncJoinOnDrop;
19use anyhow::anyhow;
20use async_trait::async_trait;
21use derive_more::{Display, From};
22use event_listener_primitives::Bag;
23use futures::channel::mpsc;
24use futures::stream::FuturesUnordered;
25use futures::{select, stream, FutureExt, Stream, StreamExt};
26use parity_scale_codec::{Decode, Encode, EncodeLike, Input, Output};
27use std::future::Future;
28use std::pin::{pin, Pin};
29use std::sync::Arc;
30use std::time::{Duration, Instant};
31use subspace_core_primitives::pieces::{Piece, PieceOffset};
32use subspace_core_primitives::sectors::SectorIndex;
33use subspace_farmer_components::plotting::PlottedSector;
34use subspace_rpc_primitives::SolutionResponse;
35use tokio::time::MissedTickBehavior;
36use tracing::{debug, error, info_span, trace, warn, Instrument};
37use ulid::Ulid;
38
39const BROADCAST_NOTIFICATIONS_BUFFER: usize = 1000;
40const MIN_FARMER_IDENTIFICATION_INTERVAL: Duration = Duration::from_secs(1);
41
42type Handler<A> = Bag<HandlerFn<A>, A>;
43
44#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Display, From)]
46pub struct ClusterFarmerId(Ulid);
47
48impl Encode for ClusterFarmerId {
49 #[inline]
50 fn size_hint(&self) -> usize {
51 Encode::size_hint(&self.0 .0)
52 }
53
54 #[inline]
55 fn encode_to<O: Output + ?Sized>(&self, output: &mut O) {
56 Encode::encode_to(&self.0 .0, output)
57 }
58}
59
60impl EncodeLike for ClusterFarmerId {}
61
62impl Decode for ClusterFarmerId {
63 #[inline]
64 fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
65 u128::decode(input)
66 .map(|ulid| Self(Ulid(ulid)))
67 .map_err(|e| e.chain("Could not decode `ClusterFarmerId.0.0`"))
68 }
69}
70
71#[allow(clippy::new_without_default)]
72impl ClusterFarmerId {
73 pub fn new() -> Self {
75 Self(Ulid::new())
76 }
77}
78
79#[derive(Debug, Clone, Encode, Decode)]
81pub struct ClusterFarmerIdentifyBroadcast {
82 pub farmer_id: ClusterFarmerId,
84}
85
86impl GenericBroadcast for ClusterFarmerIdentifyBroadcast {
87 const SUBJECT: &'static str = "subspace.farmer.*.farmer-identify";
89}
90
91#[derive(Debug, Clone, Encode, Decode)]
93pub struct ClusterFarmerFarmDetailsRequest;
94
95impl GenericStreamRequest for ClusterFarmerFarmDetailsRequest {
96 const SUBJECT: &'static str = "subspace.farmer.*.farm.details";
98 type Response = ClusterFarmerFarmDetails;
99}
100
101#[derive(Debug, Clone, Encode, Decode)]
103pub struct ClusterFarmerFarmDetails {
104 pub farm_id: FarmId,
106 pub total_sectors_count: SectorIndex,
108}
109
110#[derive(Debug, Clone, Encode, Decode)]
112struct ClusterFarmerSectorUpdateBroadcast {
113 farm_id: FarmId,
115 sector_index: SectorIndex,
117 sector_update: SectorUpdate,
119}
120
121impl GenericBroadcast for ClusterFarmerSectorUpdateBroadcast {
122 const SUBJECT: &'static str = "subspace.farmer.*.sector-update";
124}
125
126#[derive(Debug, Clone, Encode, Decode)]
128struct ClusterFarmerFarmingNotificationBroadcast {
129 farm_id: FarmId,
131 farming_notification: FarmingNotification,
133}
134
135impl GenericBroadcast for ClusterFarmerFarmingNotificationBroadcast {
136 const SUBJECT: &'static str = "subspace.farmer.*.farming-notification";
138}
139
140#[derive(Debug, Clone, Encode, Decode)]
142struct ClusterFarmerSolutionBroadcast {
143 farm_id: FarmId,
145 solution_response: SolutionResponse,
147}
148
149impl GenericBroadcast for ClusterFarmerSolutionBroadcast {
150 const SUBJECT: &'static str = "subspace.farmer.*.solution-response";
152}
153
154#[derive(Debug, Clone, Encode, Decode)]
156struct ClusterFarmerReadPieceRequest {
157 sector_index: SectorIndex,
158 piece_offset: PieceOffset,
159}
160
161impl GenericRequest for ClusterFarmerReadPieceRequest {
162 const SUBJECT: &'static str = "subspace.farmer.*.farm.read-piece";
164 type Response = Result<Option<Piece>, String>;
165}
166
167#[derive(Debug, Clone, Encode, Decode)]
169struct ClusterFarmerPlottedSectorsRequest;
170
171impl GenericStreamRequest for ClusterFarmerPlottedSectorsRequest {
172 const SUBJECT: &'static str = "subspace.farmer.*.farm.plotted-sectors";
174 type Response = Result<PlottedSector, String>;
175}
176
177#[derive(Debug)]
178struct ClusterPlottedSectors {
179 farm_id_string: String,
180 nats_client: NatsClient,
181}
182
183#[async_trait]
184impl PlottedSectors for ClusterPlottedSectors {
185 async fn get(
186 &self,
187 ) -> Result<
188 Box<dyn Stream<Item = Result<PlottedSector, FarmError>> + Unpin + Send + '_>,
189 FarmError,
190 > {
191 Ok(Box::new(
192 self.nats_client
193 .stream_request(
194 &ClusterFarmerPlottedSectorsRequest,
195 Some(&self.farm_id_string),
196 )
197 .await?
198 .map(|response| response.map_err(FarmError::from)),
199 ))
200 }
201}
202
203#[derive(Debug)]
204struct ClusterPieceReader {
205 farm_id_string: String,
206 nats_client: NatsClient,
207}
208
209#[async_trait]
210impl PieceReader for ClusterPieceReader {
211 async fn read_piece(
212 &self,
213 sector_index: SectorIndex,
214 piece_offset: PieceOffset,
215 ) -> Result<Option<Piece>, FarmError> {
216 Ok(self
217 .nats_client
218 .request(
219 &ClusterFarmerReadPieceRequest {
220 sector_index,
221 piece_offset,
222 },
223 Some(&self.farm_id_string),
224 )
225 .await??)
226 }
227}
228
229#[derive(Default, Debug)]
230struct Handlers {
231 sector_update: Handler<(SectorIndex, SectorUpdate)>,
232 farming_notification: Handler<FarmingNotification>,
233 solution: Handler<SolutionResponse>,
234}
235
236#[derive(Debug)]
238pub struct ClusterFarm {
239 farm_id: FarmId,
240 farm_id_string: String,
241 total_sectors_count: SectorIndex,
242 nats_client: NatsClient,
243 handlers: Arc<Handlers>,
244 background_tasks: AsyncJoinOnDrop<()>,
245}
246
247#[async_trait(?Send)]
248impl Farm for ClusterFarm {
249 fn id(&self) -> &FarmId {
250 &self.farm_id
251 }
252
253 fn total_sectors_count(&self) -> SectorIndex {
254 self.total_sectors_count
255 }
256
257 fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static> {
258 Arc::new(ClusterPlottedSectors {
259 farm_id_string: self.farm_id_string.clone(),
260 nats_client: self.nats_client.clone(),
261 })
262 }
263
264 fn piece_reader(&self) -> Arc<dyn PieceReader + 'static> {
265 Arc::new(ClusterPieceReader {
266 farm_id_string: self.farm_id_string.clone(),
267 nats_client: self.nats_client.clone(),
268 })
269 }
270
271 fn on_sector_update(
272 &self,
273 callback: HandlerFn<(SectorIndex, SectorUpdate)>,
274 ) -> Box<dyn HandlerId> {
275 Box::new(self.handlers.sector_update.add(callback))
276 }
277
278 fn on_farming_notification(
279 &self,
280 callback: HandlerFn<FarmingNotification>,
281 ) -> Box<dyn HandlerId> {
282 Box::new(self.handlers.farming_notification.add(callback))
283 }
284
285 fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> Box<dyn HandlerId> {
286 Box::new(self.handlers.solution.add(callback))
287 }
288
289 fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
290 Box::pin((*self).run())
291 }
292}
293
294impl ClusterFarm {
295 pub async fn new(
298 farm_id: FarmId,
299 total_sectors_count: SectorIndex,
300 nats_client: NatsClient,
301 ) -> anyhow::Result<Self> {
302 let farm_id_string = farm_id.to_string();
303
304 let sector_updates_subscription = nats_client
305 .subscribe_to_broadcasts::<ClusterFarmerSectorUpdateBroadcast>(
306 Some(&farm_id_string),
307 None,
308 )
309 .await
310 .map_err(|error| anyhow!("Failed to subscribe to sector updates broadcast: {error}"))?;
311 let farming_notifications_subscription = nats_client
312 .subscribe_to_broadcasts::<ClusterFarmerFarmingNotificationBroadcast>(
313 Some(&farm_id_string),
314 None,
315 )
316 .await
317 .map_err(|error| {
318 anyhow!("Failed to subscribe to farming notifications broadcast: {error}")
319 })?;
320 let solution_subscription = nats_client
321 .subscribe_to_broadcasts::<ClusterFarmerSolutionBroadcast>(Some(&farm_id_string), None)
322 .await
323 .map_err(|error| {
324 anyhow!("Failed to subscribe to solution responses broadcast: {error}")
325 })?;
326
327 let handlers = Arc::<Handlers>::default();
328 let background_tasks = {
330 let handlers = Arc::clone(&handlers);
331
332 async move {
333 let mut sector_updates_subscription = pin!(sector_updates_subscription);
334 let mut farming_notifications_subscription =
335 pin!(farming_notifications_subscription);
336 let mut solution_subscription = pin!(solution_subscription);
337
338 let sector_updates_fut = async {
339 while let Some(ClusterFarmerSectorUpdateBroadcast {
340 sector_index,
341 sector_update,
342 ..
343 }) = sector_updates_subscription.next().await
344 {
345 handlers
346 .sector_update
347 .call_simple(&(sector_index, sector_update));
348 }
349 };
350 let farming_notifications_fut = async {
351 while let Some(ClusterFarmerFarmingNotificationBroadcast {
352 farming_notification,
353 ..
354 }) = farming_notifications_subscription.next().await
355 {
356 handlers
357 .farming_notification
358 .call_simple(&farming_notification);
359 }
360 };
361 let solutions_fut = async {
362 while let Some(ClusterFarmerSolutionBroadcast {
363 solution_response, ..
364 }) = solution_subscription.next().await
365 {
366 handlers.solution.call_simple(&solution_response);
367 }
368 };
369
370 select! {
371 _ = sector_updates_fut.fuse() => {}
372 _ = farming_notifications_fut.fuse() => {}
373 _ = solutions_fut.fuse() => {}
374 }
375 }
376 };
377
378 Ok(Self {
379 farm_id,
380 farm_id_string,
381 total_sectors_count,
382 nats_client,
383 handlers,
384 background_tasks: AsyncJoinOnDrop::new(tokio::spawn(background_tasks), true),
385 })
386 }
387
388 pub async fn run(self) -> anyhow::Result<()> {
390 Ok(self.background_tasks.await?)
391 }
392}
393
394#[derive(Debug)]
395struct FarmDetails {
396 farm_id: FarmId,
397 farm_id_string: String,
398 total_sectors_count: SectorIndex,
399 piece_reader: Arc<dyn PieceReader + 'static>,
400 plotted_sectors: Arc<dyn PlottedSectors + 'static>,
401 _background_tasks: Option<AsyncJoinOnDrop<()>>,
402}
403
404pub fn farmer_service<F>(
410 nats_client: NatsClient,
411 farms: &[F],
412 identification_broadcast_interval: Duration,
413 primary_instance: bool,
414) -> impl Future<Output = anyhow::Result<()>> + Send + 'static
415where
416 F: Farm,
417{
418 let farmer_id = ClusterFarmerId::new();
419 let farmer_id_string = farmer_id.to_string();
420
421 let farms_details = farms
424 .iter()
425 .map(|farm| {
426 let farm_id = *farm.id();
427 let nats_client = nats_client.clone();
428
429 let background_tasks = if primary_instance {
430 let (sector_updates_sender, mut sector_updates_receiver) =
431 mpsc::channel(BROADCAST_NOTIFICATIONS_BUFFER);
432 let (farming_notifications_sender, mut farming_notifications_receiver) =
433 mpsc::channel(BROADCAST_NOTIFICATIONS_BUFFER);
434 let (solutions_sender, mut solutions_receiver) =
435 mpsc::channel(BROADCAST_NOTIFICATIONS_BUFFER);
436
437 let sector_updates_handler_id =
438 farm.on_sector_update(Arc::new(move |(sector_index, sector_update)| {
439 if let Err(error) = sector_updates_sender.clone().try_send(
440 ClusterFarmerSectorUpdateBroadcast {
441 farm_id,
442 sector_index: *sector_index,
443 sector_update: sector_update.clone(),
444 },
445 ) {
446 warn!(%farm_id, %error, "Failed to send sector update notification");
447 }
448 }));
449
450 let farming_notifications_handler_id =
451 farm.on_farming_notification(Arc::new(move |farming_notification| {
452 if let Err(error) = farming_notifications_sender.clone().try_send(
453 ClusterFarmerFarmingNotificationBroadcast {
454 farm_id,
455 farming_notification: farming_notification.clone(),
456 },
457 ) {
458 warn!(%farm_id, %error, "Failed to send farming notification");
459 }
460 }));
461
462 let solutions_handler_id = farm.on_solution(Arc::new(move |solution_response| {
463 if let Err(error) =
464 solutions_sender
465 .clone()
466 .try_send(ClusterFarmerSolutionBroadcast {
467 farm_id,
468 solution_response: solution_response.clone(),
469 })
470 {
471 warn!(%farm_id, %error, "Failed to send solution notification");
472 }
473 }));
474
475 Some(AsyncJoinOnDrop::new(
476 tokio::spawn(async move {
477 let farm_id_string = farm_id.to_string();
478
479 let sector_updates_fut = async {
480 while let Some(broadcast) = sector_updates_receiver.next().await {
481 if let Err(error) =
482 nats_client.broadcast(&broadcast, &farm_id_string).await
483 {
484 warn!(%farm_id, %error, "Failed to broadcast sector update");
485 }
486 }
487 };
488 let farming_notifications_fut = async {
489 while let Some(broadcast) = farming_notifications_receiver.next().await
490 {
491 if let Err(error) =
492 nats_client.broadcast(&broadcast, &farm_id_string).await
493 {
494 warn!(
495 %farm_id,
496 %error,
497 "Failed to broadcast farming notification"
498 );
499 }
500 }
501 };
502 let solutions_fut = async {
503 while let Some(broadcast) = solutions_receiver.next().await {
504 if let Err(error) =
505 nats_client.broadcast(&broadcast, &farm_id_string).await
506 {
507 warn!(%farm_id, %error, "Failed to broadcast solution");
508 }
509 }
510 };
511
512 select! {
513 _ = sector_updates_fut.fuse() => {}
514 _ = farming_notifications_fut.fuse() => {}
515 _ = solutions_fut.fuse() => {}
516 }
517
518 drop(sector_updates_handler_id);
519 drop(farming_notifications_handler_id);
520 drop(solutions_handler_id);
521 }),
522 true,
523 ))
524 } else {
525 None
526 };
527
528 FarmDetails {
529 farm_id,
530 farm_id_string: farm_id.to_string(),
531 total_sectors_count: farm.total_sectors_count(),
532 piece_reader: farm.piece_reader(),
533 plotted_sectors: farm.plotted_sectors(),
534 _background_tasks: background_tasks,
535 }
536 })
537 .collect::<Vec<_>>();
538
539 async move {
540 if primary_instance {
541 select! {
542 result = identify_responder(
543 &nats_client,
544 farmer_id,
545 &farmer_id_string,
546 identification_broadcast_interval
547 ).fuse() => {
548 result
549 },
550 result = farms_details_responder(
551 &nats_client,
552 &farmer_id_string,
553 &farms_details
554 ).fuse() => {
555 result
556 },
557 result = plotted_sectors_responder(&nats_client, &farms_details).fuse() => {
558 result
559 },
560 result = read_piece_responder(&nats_client, &farms_details).fuse() => {
561 result
562 },
563 }
564 } else {
565 select! {
566 result = plotted_sectors_responder(&nats_client, &farms_details).fuse() => {
567 result
568 },
569 result = read_piece_responder(&nats_client, &farms_details).fuse() => {
570 result
571 },
572 }
573 }
574 }
575}
576
577async fn identify_responder(
580 nats_client: &NatsClient,
581 farmer_id: ClusterFarmerId,
582 farmer_id_string: &str,
583 identification_broadcast_interval: Duration,
584) -> anyhow::Result<()> {
585 let mut subscription = nats_client
586 .subscribe_to_broadcasts::<ClusterControllerFarmerIdentifyBroadcast>(None, None)
587 .await
588 .map_err(|error| {
589 anyhow!("Failed to subscribe to farmer identify broadcast requests: {error}")
590 })?
591 .fuse();
592
593 let mut interval = tokio::time::interval(identification_broadcast_interval);
595 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
596
597 let mut last_identification = Instant::now();
598
599 loop {
600 select! {
601 maybe_message = subscription.next() => {
602 let Some(message) = maybe_message else {
603 debug!("Identify broadcast stream ended");
604 break;
605 };
606
607 trace!(?message, "Farmer received identify broadcast message");
608
609 if last_identification.elapsed() < MIN_FARMER_IDENTIFICATION_INTERVAL {
610 continue;
612 }
613
614 last_identification = Instant::now();
615 send_identify_broadcast(nats_client, farmer_id, farmer_id_string).await;
616 interval.reset();
617 }
618 _ = interval.tick().fuse() => {
619 last_identification = Instant::now();
620 trace!("Farmer self-identification");
621
622 send_identify_broadcast(nats_client, farmer_id, farmer_id_string).await;
623 }
624 }
625 }
626
627 Ok(())
628}
629
630async fn send_identify_broadcast(
631 nats_client: &NatsClient,
632 farmer_id: ClusterFarmerId,
633 farmer_id_string: &str,
634) {
635 if let Err(error) = nats_client
636 .broadcast(&new_identify_message(farmer_id), farmer_id_string)
637 .await
638 {
639 warn!(%farmer_id, %error, "Failed to send farmer identify notification");
640 }
641}
642
643fn new_identify_message(farmer_id: ClusterFarmerId) -> ClusterFarmerIdentifyBroadcast {
644 ClusterFarmerIdentifyBroadcast { farmer_id }
645}
646
647async fn farms_details_responder(
648 nats_client: &NatsClient,
649 farmer_id_string: &str,
650 farms_details: &[FarmDetails],
651) -> anyhow::Result<()> {
652 nats_client
653 .stream_request_responder(
654 Some(farmer_id_string),
655 Some(farmer_id_string.to_string()),
656 |_request: ClusterFarmerFarmDetailsRequest| async {
657 Some(stream::iter(farms_details.iter().map(|farm_details| {
658 ClusterFarmerFarmDetails {
659 farm_id: farm_details.farm_id,
660 total_sectors_count: farm_details.total_sectors_count,
661 }
662 })))
663 },
664 )
665 .await
666}
667
668async fn plotted_sectors_responder(
669 nats_client: &NatsClient,
670 farms_details: &[FarmDetails],
671) -> anyhow::Result<()> {
672 farms_details
673 .iter()
674 .map(|farm_details| async move {
675 nats_client
676 .stream_request_responder::<_, _, Pin<Box<dyn Stream<Item = _> + Send>>, _>(
677 Some(&farm_details.farm_id_string),
678 Some(farm_details.farm_id_string.clone()),
679 |_request: ClusterFarmerPlottedSectorsRequest| async move {
680 Some(match farm_details.plotted_sectors.get().await {
681 Ok(plotted_sectors) => {
682 Box::pin(plotted_sectors.map(|maybe_plotted_sector| {
683 maybe_plotted_sector.map_err(|error| error.to_string())
684 })) as _
685 }
686 Err(error) => {
687 error!(
688 %error,
689 farm_id = %farm_details.farm_id,
690 "Failed to get plotted sectors"
691 );
692
693 Box::pin(stream::once(async move {
694 Err(format!("Failed to get plotted sectors: {error}"))
695 })) as _
696 }
697 })
698 },
699 )
700 .instrument(info_span!("", cache_id = %farm_details.farm_id))
701 .await
702 })
703 .collect::<FuturesUnordered<_>>()
704 .next()
705 .await
706 .ok_or_else(|| anyhow!("No farms"))?
707}
708
709async fn read_piece_responder(
710 nats_client: &NatsClient,
711 farms_details: &[FarmDetails],
712) -> anyhow::Result<()> {
713 farms_details
714 .iter()
715 .map(|farm_details| async move {
716 nats_client
717 .request_responder(
718 Some(farm_details.farm_id_string.as_str()),
719 Some(farm_details.farm_id_string.clone()),
720 |request: ClusterFarmerReadPieceRequest| async move {
721 Some(
722 farm_details
723 .piece_reader
724 .read_piece(request.sector_index, request.piece_offset)
725 .await
726 .map_err(|error| error.to_string()),
727 )
728 },
729 )
730 .instrument(info_span!("", cache_id = %farm_details.farm_id))
731 .await
732 })
733 .collect::<FuturesUnordered<_>>()
734 .next()
735 .await
736 .ok_or_else(|| anyhow!("No farms"))?
737}