subspace_farmer/cluster/
farmer.rs

1//! Farming cluster farmer
2//!
3//! Farmer is responsible for maintaining farms, doing audits and generating proofs when solution is
4//! found in one of the plots.
5//!
6//! This module exposes some data structures for NATS communication, custom farm implementation
7//! designed to work with cluster farmer and a service function to drive the backend part
8//! of the farmer.
9
10use crate::cluster::controller::ClusterControllerFarmerIdentifyBroadcast;
11use crate::cluster::nats_client::{
12    GenericBroadcast, GenericRequest, GenericStreamRequest, NatsClient,
13};
14use crate::farm::{
15    Farm, FarmError, FarmId, FarmingNotification, HandlerFn, HandlerId, PieceReader,
16    PlottedSectors, SectorUpdate,
17};
18use crate::utils::AsyncJoinOnDrop;
19use anyhow::anyhow;
20use async_trait::async_trait;
21use derive_more::{Display, From};
22use event_listener_primitives::Bag;
23use futures::channel::mpsc;
24use futures::stream::FuturesUnordered;
25use futures::{select, stream, FutureExt, Stream, StreamExt};
26use parity_scale_codec::{Decode, Encode, EncodeLike, Input, Output};
27use std::future::Future;
28use std::pin::{pin, Pin};
29use std::sync::Arc;
30use std::time::{Duration, Instant};
31use subspace_core_primitives::pieces::{Piece, PieceOffset};
32use subspace_core_primitives::sectors::SectorIndex;
33use subspace_farmer_components::plotting::PlottedSector;
34use subspace_rpc_primitives::SolutionResponse;
35use tokio::time::MissedTickBehavior;
36use tracing::{debug, error, info_span, trace, warn, Instrument};
37use ulid::Ulid;
38
/// Capacity of each per-farm channel that buffers notifications (sector updates, farming
/// notifications, solutions) between farm callbacks and the task that broadcasts them
const BROADCAST_NOTIFICATIONS_BUFFER: usize = 1000;
/// Identify requests that arrive sooner than this after the previous identification are skipped
const MIN_FARMER_IDENTIFICATION_INTERVAL: Duration = Duration::from_secs(1);

/// Collection of callbacks accepting an argument of type `A`
type Handler<A> = Bag<HandlerFn<A>, A>;
43
/// An identifier for a cluster farmer, can be used in logs, thread names, etc.
///
/// Thin wrapper around a [`Ulid`]; a fresh one is generated with [`ClusterFarmerId::new`].
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Display, From)]
pub struct ClusterFarmerId(Ulid);
47
48impl Encode for ClusterFarmerId {
49    #[inline]
50    fn size_hint(&self) -> usize {
51        Encode::size_hint(&self.0 .0)
52    }
53
54    #[inline]
55    fn encode_to<O: Output + ?Sized>(&self, output: &mut O) {
56        Encode::encode_to(&self.0 .0, output)
57    }
58}
59
// Marker impl: `EncodeLike` needs no methods here, the encoding itself is defined by the `Encode`
// implementation for `ClusterFarmerId`
impl EncodeLike for ClusterFarmerId {}
61
62impl Decode for ClusterFarmerId {
63    #[inline]
64    fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
65        u128::decode(input)
66            .map(|ulid| Self(Ulid(ulid)))
67            .map_err(|e| e.chain("Could not decode `ClusterFarmerId.0.0`"))
68    }
69}
70
71#[allow(clippy::new_without_default)]
72impl ClusterFarmerId {
73    /// Create a new cluster farmer ID
74    pub fn new() -> Self {
75        Self(Ulid::new())
76    }
77}
78
/// Broadcast with cluster farmer id for identification
///
/// Published by the farmer both in response to [`ClusterControllerFarmerIdentifyBroadcast`] and
/// periodically on its own.
#[derive(Debug, Clone, Encode, Decode)]
pub struct ClusterFarmerIdentifyBroadcast {
    /// Cluster farmer ID
    pub farmer_id: ClusterFarmerId,
}

impl GenericBroadcast for ClusterFarmerIdentifyBroadcast {
    /// `*` here stands for cluster farmer ID
    const SUBJECT: &'static str = "subspace.farmer.*.farmer-identify";
}
90
/// Request farm details from farmer
///
/// The farmer answers with a stream containing one [`ClusterFarmerFarmDetails`] per farm it
/// serves.
#[derive(Debug, Clone, Encode, Decode)]
pub struct ClusterFarmerFarmDetailsRequest;

impl GenericStreamRequest for ClusterFarmerFarmDetailsRequest {
    /// `*` here stands for cluster farmer ID
    const SUBJECT: &'static str = "subspace.farmer.*.farm.details";
    /// A single item of the response stream
    type Response = ClusterFarmerFarmDetails;
}
100
/// Farm details
///
/// Carries the two values required by [`ClusterFarm::new`] in addition to the NATS client.
#[derive(Debug, Clone, Encode, Decode)]
pub struct ClusterFarmerFarmDetails {
    /// Farm ID
    pub farm_id: FarmId,
    /// Total number of sectors in the farm
    pub total_sectors_count: SectorIndex,
}
109
/// Broadcast with sector updates by farmers
///
/// Created by [`farmer_service`] from `Farm::on_sector_update` callbacks and turned back into
/// handler calls by [`ClusterFarm`].
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterFarmerSectorUpdateBroadcast {
    /// Farm ID
    farm_id: FarmId,
    /// Sector index
    sector_index: SectorIndex,
    /// Sector update
    sector_update: SectorUpdate,
}

impl GenericBroadcast for ClusterFarmerSectorUpdateBroadcast {
    /// `*` here stands for single farm ID
    const SUBJECT: &'static str = "subspace.farmer.*.sector-update";
}
125
/// Broadcast with farming notifications by farmers
///
/// Created by [`farmer_service`] from `Farm::on_farming_notification` callbacks and turned back
/// into handler calls by [`ClusterFarm`].
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterFarmerFarmingNotificationBroadcast {
    /// Farm ID
    farm_id: FarmId,
    /// Farming notification
    farming_notification: FarmingNotification,
}

impl GenericBroadcast for ClusterFarmerFarmingNotificationBroadcast {
    /// `*` here stands for single farm ID
    const SUBJECT: &'static str = "subspace.farmer.*.farming-notification";
}
139
/// Broadcast with solutions by farmers
///
/// Created by [`farmer_service`] from `Farm::on_solution` callbacks and turned back into handler
/// calls by [`ClusterFarm`].
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterFarmerSolutionBroadcast {
    /// Farm ID
    farm_id: FarmId,
    /// Solution response
    solution_response: SolutionResponse,
}

impl GenericBroadcast for ClusterFarmerSolutionBroadcast {
    /// `*` here stands for single farm ID
    const SUBJECT: &'static str = "subspace.farmer.*.solution-response";
}
153
/// Read piece from farm
///
/// Both fields are forwarded as-is to [`PieceReader::read_piece`] of the farm that receives the
/// request.
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterFarmerReadPieceRequest {
    /// Index of the sector to read from
    sector_index: SectorIndex,
    /// Offset of the piece within the sector
    piece_offset: PieceOffset,
}

impl GenericRequest for ClusterFarmerReadPieceRequest {
    /// `*` here stands for single farm ID
    const SUBJECT: &'static str = "subspace.farmer.*.farm.read-piece";
    /// Result of the read; `Err` carries the stringified error of the farm's piece reader
    type Response = Result<Option<Piece>, String>;
}
166
/// Request plotted sectors from farmer
///
/// The farmer answers with a stream of plotted sectors of the farm addressed by the subject.
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterFarmerPlottedSectorsRequest;

impl GenericStreamRequest for ClusterFarmerPlottedSectorsRequest {
    /// `*` here stands for single farm ID
    const SUBJECT: &'static str = "subspace.farmer.*.farm.plotted-sectors";
    /// A single item of the response stream; `Err` carries a stringified farm error
    type Response = Result<PlottedSector, String>;
}
176
/// [`PlottedSectors`] implementation that retrieves plotted sectors of a farm over NATS
#[derive(Debug)]
struct ClusterPlottedSectors {
    /// String form of the farm ID, used as the subject instance of requests
    farm_id_string: String,
    /// Client used to send requests
    nats_client: NatsClient,
}
182
183#[async_trait]
184impl PlottedSectors for ClusterPlottedSectors {
185    async fn get(
186        &self,
187    ) -> Result<
188        Box<dyn Stream<Item = Result<PlottedSector, FarmError>> + Unpin + Send + '_>,
189        FarmError,
190    > {
191        Ok(Box::new(
192            self.nats_client
193                .stream_request(
194                    &ClusterFarmerPlottedSectorsRequest,
195                    Some(&self.farm_id_string),
196                )
197                .await?
198                .map(|response| response.map_err(FarmError::from)),
199        ))
200    }
201}
202
/// [`PieceReader`] implementation that reads pieces of a farm over NATS
#[derive(Debug)]
struct ClusterPieceReader {
    /// String form of the farm ID, used as the subject instance of requests
    farm_id_string: String,
    /// Client used to send requests
    nats_client: NatsClient,
}
208
209#[async_trait]
210impl PieceReader for ClusterPieceReader {
211    async fn read_piece(
212        &self,
213        sector_index: SectorIndex,
214        piece_offset: PieceOffset,
215    ) -> Result<Option<Piece>, FarmError> {
216        Ok(self
217            .nats_client
218            .request(
219                &ClusterFarmerReadPieceRequest {
220                    sector_index,
221                    piece_offset,
222                },
223                Some(&self.farm_id_string),
224            )
225            .await??)
226    }
227}
228
/// Callbacks registered on [`ClusterFarm`], one collection per kind of notification
#[derive(Default, Debug)]
struct Handlers {
    /// Called with sector index and update for every received sector update broadcast
    sector_update: Handler<(SectorIndex, SectorUpdate)>,
    /// Called for every received farming notification broadcast
    farming_notification: Handler<FarmingNotification>,
    /// Called for every received solution broadcast
    solution: Handler<SolutionResponse>,
}
235
/// Cluster farm implementation
///
/// Implements [`Farm`] on top of NATS: requests are sent to the subject of the farm and
/// broadcasts received for the farm are passed on to registered handlers.
#[derive(Debug)]
pub struct ClusterFarm {
    /// ID of the farm
    farm_id: FarmId,
    /// String form of `farm_id`, used as the subject instance of requests
    farm_id_string: String,
    /// Total number of sectors in the farm as provided at construction time
    total_sectors_count: SectorIndex,
    /// Client handed to piece reader and plotted sectors implementations
    nats_client: NatsClient,
    /// Handlers called from the background task
    handlers: Arc<Handlers>,
    /// Task that turns received broadcasts into handler calls, awaited in `run()`
    background_tasks: AsyncJoinOnDrop<()>,
}
246
247#[async_trait(?Send)]
248impl Farm for ClusterFarm {
249    fn id(&self) -> &FarmId {
250        &self.farm_id
251    }
252
253    fn total_sectors_count(&self) -> SectorIndex {
254        self.total_sectors_count
255    }
256
257    fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static> {
258        Arc::new(ClusterPlottedSectors {
259            farm_id_string: self.farm_id_string.clone(),
260            nats_client: self.nats_client.clone(),
261        })
262    }
263
264    fn piece_reader(&self) -> Arc<dyn PieceReader + 'static> {
265        Arc::new(ClusterPieceReader {
266            farm_id_string: self.farm_id_string.clone(),
267            nats_client: self.nats_client.clone(),
268        })
269    }
270
271    fn on_sector_update(
272        &self,
273        callback: HandlerFn<(SectorIndex, SectorUpdate)>,
274    ) -> Box<dyn HandlerId> {
275        Box::new(self.handlers.sector_update.add(callback))
276    }
277
278    fn on_farming_notification(
279        &self,
280        callback: HandlerFn<FarmingNotification>,
281    ) -> Box<dyn HandlerId> {
282        Box::new(self.handlers.farming_notification.add(callback))
283    }
284
285    fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> Box<dyn HandlerId> {
286        Box::new(self.handlers.solution.add(callback))
287    }
288
289    fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
290        Box::pin((*self).run())
291    }
292}
293
impl ClusterFarm {
    /// Create new instance for a farm with the given ID and total sector count (these match the
    /// fields of [`ClusterFarmerFarmDetails`]).
    ///
    /// Subscribes to sector update, farming notification and solution broadcasts of the farm and
    /// spawns a background task that calls registered handlers for every received message.
    ///
    /// Returns an error if any of the subscriptions can't be created.
    pub async fn new(
        farm_id: FarmId,
        total_sectors_count: SectorIndex,
        nats_client: NatsClient,
    ) -> anyhow::Result<Self> {
        // String form of the farm ID is the subject instance for all subscriptions below
        let farm_id_string = farm_id.to_string();

        let sector_updates_subscription = nats_client
            .subscribe_to_broadcasts::<ClusterFarmerSectorUpdateBroadcast>(
                Some(&farm_id_string),
                None,
            )
            .await
            .map_err(|error| anyhow!("Failed to subscribe to sector updates broadcast: {error}"))?;
        let farming_notifications_subscription = nats_client
            .subscribe_to_broadcasts::<ClusterFarmerFarmingNotificationBroadcast>(
                Some(&farm_id_string),
                None,
            )
            .await
            .map_err(|error| {
                anyhow!("Failed to subscribe to farming notifications broadcast: {error}")
            })?;
        let solution_subscription = nats_client
            .subscribe_to_broadcasts::<ClusterFarmerSolutionBroadcast>(Some(&farm_id_string), None)
            .await
            .map_err(|error| {
                anyhow!("Failed to subscribe to solution responses broadcast: {error}")
            })?;

        let handlers = Arc::<Handlers>::default();
        // Run background tasks and fire corresponding notifications
        let background_tasks = {
            // The task gets its own reference to handlers, the other one is stored in `Self`
            let handlers = Arc::clone(&handlers);

            async move {
                let mut sector_updates_subscription = pin!(sector_updates_subscription);
                let mut farming_notifications_subscription =
                    pin!(farming_notifications_subscription);
                let mut solution_subscription = pin!(solution_subscription);

                // Each future below forwards messages of one subscription until it ends
                let sector_updates_fut = async {
                    while let Some(ClusterFarmerSectorUpdateBroadcast {
                        sector_index,
                        sector_update,
                        ..
                    }) = sector_updates_subscription.next().await
                    {
                        handlers
                            .sector_update
                            .call_simple(&(sector_index, sector_update));
                    }
                };
                let farming_notifications_fut = async {
                    while let Some(ClusterFarmerFarmingNotificationBroadcast {
                        farming_notification,
                        ..
                    }) = farming_notifications_subscription.next().await
                    {
                        handlers
                            .farming_notification
                            .call_simple(&farming_notification);
                    }
                };
                let solutions_fut = async {
                    while let Some(ClusterFarmerSolutionBroadcast {
                        solution_response, ..
                    }) = solution_subscription.next().await
                    {
                        handlers.solution.call_simple(&solution_response);
                    }
                };

                // The task exits as soon as the first of the subscriptions ends
                select! {
                    _ = sector_updates_fut.fuse() => {}
                    _ = farming_notifications_fut.fuse() => {}
                    _ = solutions_fut.fuse() => {}
                }
            }
        };

        Ok(Self {
            farm_id,
            farm_id_string,
            total_sectors_count,
            nats_client,
            handlers,
            // NOTE(review): the `true` flag presumably makes the task abort on drop - confirm
            // against `AsyncJoinOnDrop::new`
            background_tasks: AsyncJoinOnDrop::new(tokio::spawn(background_tasks), true),
        })
    }

    /// Run and wait for background tasks to exit or return an error
    ///
    /// The error case corresponds to failure to join the background task.
    pub async fn run(self) -> anyhow::Result<()> {
        Ok(self.background_tasks.await?)
    }
}
393
/// Per-farm state kept by [`farmer_service`] for responding to incoming requests
#[derive(Debug)]
struct FarmDetails {
    /// ID of the farm
    farm_id: FarmId,
    /// String form of `farm_id`, used as the subject instance and queue group of responders
    farm_id_string: String,
    /// Total number of sectors in the farm, reported in farm details responses
    total_sectors_count: SectorIndex,
    /// Reader used to serve read piece requests
    piece_reader: Arc<dyn PieceReader + 'static>,
    /// Accessor used to serve plotted sectors requests
    plotted_sectors: Arc<dyn PlottedSectors + 'static>,
    /// Task broadcasting farm notifications, `None` for non-primary instances; stored only to
    /// keep the task owned for as long as the farm details exist
    _background_tasks: Option<AsyncJoinOnDrop<()>>,
}
403
/// Create farmer service for specified farms that will be processing incoming requests and send
/// periodic identify notifications.
///
/// Implementation is using concurrency with multiple tokio tasks, but can be started multiple times
/// per controller instance in order to parallelize more work across threads if needed.
///
/// When `primary_instance` is `true`, notifications of every farm (sector updates, farming
/// notifications and solutions) are forwarded as broadcasts and the returned future also responds
/// to identification and farm details requests. Otherwise only plotted sectors and read piece
/// requests are served.
///
/// The returned future resolves as soon as any of the responders exits, with the result of that
/// responder.
pub fn farmer_service<F>(
    nats_client: NatsClient,
    farms: &[F],
    identification_broadcast_interval: Duration,
    primary_instance: bool,
) -> impl Future<Output = anyhow::Result<()>> + Send + 'static
where
    F: Farm,
{
    // Every call gets its own farmer ID
    let farmer_id = ClusterFarmerId::new();
    let farmer_id_string = farmer_id.to_string();

    // For each farm start forwarding notifications as broadcast messages and create farm details
    // that can be used to respond to incoming requests
    let farms_details = farms
        .iter()
        .map(|farm| {
            let farm_id = *farm.id();
            let nats_client = nats_client.clone();

            let background_tasks = if primary_instance {
                // Bounded channels decouple farm callbacks from broadcasting over the network
                let (sector_updates_sender, mut sector_updates_receiver) =
                    mpsc::channel(BROADCAST_NOTIFICATIONS_BUFFER);
                let (farming_notifications_sender, mut farming_notifications_receiver) =
                    mpsc::channel(BROADCAST_NOTIFICATIONS_BUFFER);
                let (solutions_sender, mut solutions_receiver) =
                    mpsc::channel(BROADCAST_NOTIFICATIONS_BUFFER);

                // Callbacks never wait: when a message can't be queued it is dropped with a
                // warning. The sender is cloned because `try_send()` needs mutable access.
                let sector_updates_handler_id =
                    farm.on_sector_update(Arc::new(move |(sector_index, sector_update)| {
                        if let Err(error) = sector_updates_sender.clone().try_send(
                            ClusterFarmerSectorUpdateBroadcast {
                                farm_id,
                                sector_index: *sector_index,
                                sector_update: sector_update.clone(),
                            },
                        ) {
                            warn!(%farm_id, %error, "Failed to send sector update notification");
                        }
                    }));

                let farming_notifications_handler_id =
                    farm.on_farming_notification(Arc::new(move |farming_notification| {
                        if let Err(error) = farming_notifications_sender.clone().try_send(
                            ClusterFarmerFarmingNotificationBroadcast {
                                farm_id,
                                farming_notification: farming_notification.clone(),
                            },
                        ) {
                            warn!(%farm_id, %error, "Failed to send farming notification");
                        }
                    }));

                let solutions_handler_id = farm.on_solution(Arc::new(move |solution_response| {
                    if let Err(error) =
                        solutions_sender
                            .clone()
                            .try_send(ClusterFarmerSolutionBroadcast {
                                farm_id,
                                solution_response: solution_response.clone(),
                            })
                    {
                        warn!(%farm_id, %error, "Failed to send solution notification");
                    }
                }));

                Some(AsyncJoinOnDrop::new(
                    tokio::spawn(async move {
                        let farm_id_string = farm_id.to_string();

                        // Each future below broadcasts messages of one channel until it closes,
                        // failed broadcasts are logged and skipped
                        let sector_updates_fut = async {
                            while let Some(broadcast) = sector_updates_receiver.next().await {
                                if let Err(error) =
                                    nats_client.broadcast(&broadcast, &farm_id_string).await
                                {
                                    warn!(%farm_id, %error, "Failed to broadcast sector update");
                                }
                            }
                        };
                        let farming_notifications_fut = async {
                            while let Some(broadcast) = farming_notifications_receiver.next().await
                            {
                                if let Err(error) =
                                    nats_client.broadcast(&broadcast, &farm_id_string).await
                                {
                                    warn!(
                                        %farm_id,
                                        %error,
                                        "Failed to broadcast farming notification"
                                    );
                                }
                            }
                        };
                        let solutions_fut = async {
                            while let Some(broadcast) = solutions_receiver.next().await {
                                if let Err(error) =
                                    nats_client.broadcast(&broadcast, &farm_id_string).await
                                {
                                    warn!(%farm_id, %error, "Failed to broadcast solution");
                                }
                            }
                        };

                        // Exit as soon as the first of the channels is closed
                        select! {
                            _ = sector_updates_fut.fuse() => {}
                            _ = farming_notifications_fut.fuse() => {}
                            _ = solutions_fut.fuse() => {}
                        }

                        // Handler IDs are moved into the task and only dropped here, after
                        // forwarding has stopped
                        drop(sector_updates_handler_id);
                        drop(farming_notifications_handler_id);
                        drop(solutions_handler_id);
                    }),
                    true,
                ))
            } else {
                None
            };

            FarmDetails {
                farm_id,
                farm_id_string: farm_id.to_string(),
                total_sectors_count: farm.total_sectors_count(),
                piece_reader: farm.piece_reader(),
                plotted_sectors: farm.plotted_sectors(),
                _background_tasks: background_tasks,
            }
        })
        .collect::<Vec<_>>();

    // The future owns everything it needs, which is what allows it to be `'static`
    async move {
        if primary_instance {
            select! {
                result = identify_responder(
                    &nats_client,
                    farmer_id,
                    &farmer_id_string,
                    identification_broadcast_interval
                ).fuse() => {
                    result
                },
                result = farms_details_responder(
                    &nats_client,
                    &farmer_id_string,
                    &farms_details
                ).fuse() => {
                    result
                },
                result = plotted_sectors_responder(&nats_client, &farms_details).fuse() => {
                    result
                },
                result = read_piece_responder(&nats_client, &farms_details).fuse() => {
                    result
                },
            }
        } else {
            // Non-primary instances only help with serving per-farm requests
            select! {
                result = plotted_sectors_responder(&nats_client, &farms_details).fuse() => {
                    result
                },
                result = read_piece_responder(&nats_client, &farms_details).fuse() => {
                    result
                },
            }
        }
    }
}
576
577/// Listen for farmer identification broadcast from controller and publish identification
578/// broadcast in response, also send periodic notifications reminding that farm exists
579async fn identify_responder(
580    nats_client: &NatsClient,
581    farmer_id: ClusterFarmerId,
582    farmer_id_string: &str,
583    identification_broadcast_interval: Duration,
584) -> anyhow::Result<()> {
585    let mut subscription = nats_client
586        .subscribe_to_broadcasts::<ClusterControllerFarmerIdentifyBroadcast>(None, None)
587        .await
588        .map_err(|error| {
589            anyhow!("Failed to subscribe to farmer identify broadcast requests: {error}")
590        })?
591        .fuse();
592
593    // Also send periodic updates in addition to the subscription response
594    let mut interval = tokio::time::interval(identification_broadcast_interval);
595    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
596
597    let mut last_identification = Instant::now();
598
599    loop {
600        select! {
601            maybe_message = subscription.next() => {
602                let Some(message) = maybe_message else {
603                    debug!("Identify broadcast stream ended");
604                    break;
605                };
606
607                trace!(?message, "Farmer received identify broadcast message");
608
609                if last_identification.elapsed() < MIN_FARMER_IDENTIFICATION_INTERVAL {
610                    // Skip too frequent identification requests
611                    continue;
612                }
613
614                last_identification = Instant::now();
615                send_identify_broadcast(nats_client, farmer_id, farmer_id_string).await;
616                interval.reset();
617            }
618            _ = interval.tick().fuse() => {
619                last_identification = Instant::now();
620                trace!("Farmer self-identification");
621
622                send_identify_broadcast(nats_client, farmer_id, farmer_id_string).await;
623            }
624        }
625    }
626
627    Ok(())
628}
629
630async fn send_identify_broadcast(
631    nats_client: &NatsClient,
632    farmer_id: ClusterFarmerId,
633    farmer_id_string: &str,
634) {
635    if let Err(error) = nats_client
636        .broadcast(&new_identify_message(farmer_id), farmer_id_string)
637        .await
638    {
639        warn!(%farmer_id, %error, "Failed to send farmer identify notification");
640    }
641}
642
/// Build the identification broadcast message for the farmer with the given ID
fn new_identify_message(farmer_id: ClusterFarmerId) -> ClusterFarmerIdentifyBroadcast {
    ClusterFarmerIdentifyBroadcast { farmer_id }
}
646
/// Respond to [`ClusterFarmerFarmDetailsRequest`] addressed to this farmer with a stream that
/// contains ID and total sector count of every farm in `farms_details`.
///
/// Resolves with whatever the underlying stream request responder returns.
async fn farms_details_responder(
    nats_client: &NatsClient,
    farmer_id_string: &str,
    farms_details: &[FarmDetails],
) -> anyhow::Result<()> {
    nats_client
        .stream_request_responder(
            // Farmer ID is both the subject instance and the queue group
            Some(farmer_id_string),
            Some(farmer_id_string.to_string()),
            // Contents of the request are irrelevant, every request gets the full list
            |_request: ClusterFarmerFarmDetailsRequest| async {
                Some(stream::iter(farms_details.iter().map(|farm_details| {
                    ClusterFarmerFarmDetails {
                        farm_id: farm_details.farm_id,
                        total_sectors_count: farm_details.total_sectors_count,
                    }
                })))
            },
        )
        .await
}
667
668async fn plotted_sectors_responder(
669    nats_client: &NatsClient,
670    farms_details: &[FarmDetails],
671) -> anyhow::Result<()> {
672    farms_details
673        .iter()
674        .map(|farm_details| async move {
675            nats_client
676                .stream_request_responder::<_, _, Pin<Box<dyn Stream<Item = _> + Send>>, _>(
677                    Some(&farm_details.farm_id_string),
678                    Some(farm_details.farm_id_string.clone()),
679                    |_request: ClusterFarmerPlottedSectorsRequest| async move {
680                        Some(match farm_details.plotted_sectors.get().await {
681                            Ok(plotted_sectors) => {
682                                Box::pin(plotted_sectors.map(|maybe_plotted_sector| {
683                                    maybe_plotted_sector.map_err(|error| error.to_string())
684                                })) as _
685                            }
686                            Err(error) => {
687                                error!(
688                                    %error,
689                                    farm_id = %farm_details.farm_id,
690                                    "Failed to get plotted sectors"
691                                );
692
693                                Box::pin(stream::once(async move {
694                                    Err(format!("Failed to get plotted sectors: {error}"))
695                                })) as _
696                            }
697                        })
698                    },
699                )
700                .instrument(info_span!("", cache_id = %farm_details.farm_id))
701                .await
702        })
703        .collect::<FuturesUnordered<_>>()
704        .next()
705        .await
706        .ok_or_else(|| anyhow!("No farms"))?
707}
708
709async fn read_piece_responder(
710    nats_client: &NatsClient,
711    farms_details: &[FarmDetails],
712) -> anyhow::Result<()> {
713    farms_details
714        .iter()
715        .map(|farm_details| async move {
716            nats_client
717                .request_responder(
718                    Some(farm_details.farm_id_string.as_str()),
719                    Some(farm_details.farm_id_string.clone()),
720                    |request: ClusterFarmerReadPieceRequest| async move {
721                        Some(
722                            farm_details
723                                .piece_reader
724                                .read_piece(request.sector_index, request.piece_offset)
725                                .await
726                                .map_err(|error| error.to_string()),
727                        )
728                    },
729                )
730                .instrument(info_span!("", cache_id = %farm_details.farm_id))
731                .await
732        })
733        .collect::<FuturesUnordered<_>>()
734        .next()
735        .await
736        .ok_or_else(|| anyhow!("No farms"))?
737}