subspace_farmer/cluster/controller/
farms.rs

//! This module exposes an implementation of farms maintenance.
//!
//! The goal is to observe farms in a cluster and keep the controller's data structures
//! about which pieces are plotted in which sectors of which farm up to date. The implementation
//! automatically handles dynamic farm addition and removal, etc.
6
7use crate::cluster::controller::stream_map::StreamMap;
8use crate::cluster::controller::ClusterControllerFarmerIdentifyBroadcast;
9use crate::cluster::farmer::{
10    ClusterFarm, ClusterFarmerFarmDetails, ClusterFarmerFarmDetailsRequest, ClusterFarmerId,
11    ClusterFarmerIdentifyBroadcast,
12};
13use crate::cluster::nats_client::NatsClient;
14use crate::farm::plotted_pieces::PlottedPieces;
15use crate::farm::{Farm, FarmId, SectorPlottingDetails, SectorUpdate};
16use anyhow::anyhow;
17use async_lock::RwLock as AsyncRwLock;
18use futures::stream::FuturesUnordered;
19use futures::{select, FutureExt, StreamExt};
20use parking_lot::Mutex;
21use std::collections::{HashMap, HashSet};
22use std::mem;
23use std::pin::pin;
24use std::sync::Arc;
25use std::time::{Duration, Instant};
26use subspace_core_primitives::sectors::SectorIndex;
27use tokio::sync::broadcast;
28use tokio::task;
29use tokio::time::MissedTickBehavior;
30use tracing::{debug, error, info, trace, warn};
31
/// Index assigned by the controller to a farm within the cluster.
///
/// Number of farms in a cluster is currently limited to 2^16
pub type FarmIndex = u16;
34
/// Outcome of an asynchronous farm addition or removal task
enum FarmAddRemoveResult {
    /// Farm was initialized successfully and is ready to be run
    Add {
        /// Receiver that resolves when the farm should stop running
        close_receiver: broadcast::Receiver<()>,
        /// Initialized farm
        farm: ClusterFarm,
    },
    /// Farm should be forgotten, either because it exited or because it failed to initialize
    Remove {
        /// Index of the farm to forget
        farm_index: FarmIndex,
    },
}
44
/// Result of registering a farmer with `KnownFarmers::add`
struct FarmerAddResult<I> {
    /// Receiver of the farmer's close notification, to be resubscribed for every added farm
    close_receiver: broadcast::Receiver<()>,
    /// Farms that were registered, paired with the farm index assigned to each of them
    added_farms: I,
}
49
/// Farmer known to the controller together with the farms registered for it
#[derive(Debug)]
struct KnownFarmer {
    /// Identifier of the farmer
    farmer_id: ClusterFarmerId,
    /// Moment when the farmer was added or last refreshed
    last_identification: Instant,
    /// Farms of this farmer, keyed by the farm index assigned to them
    known_farms: HashMap<FarmIndex, FarmId>,
    /// Sender of the close notification, `None` once the notification was sent
    close_sender: Option<broadcast::Sender<()>>,
}
57
58impl KnownFarmer {
59    fn notify_cleanup(&mut self) -> bool {
60        let Some(close_sender) = self.close_sender.take() else {
61            return false;
62        };
63        let _ = close_sender.send(());
64        true
65    }
66}
67
/// Registry of farmers (and their farms) currently known to the controller
#[derive(Debug)]
struct KnownFarmers {
    /// Base interval used for expiration: a farmer is considered expired once more than twice
    /// this interval has passed since its last identification
    identification_broadcast_interval: Duration,
    /// All farmers that were added and still have at least one farm registered (or were added
    /// without any farms)
    known_farmers: Vec<KnownFarmer>,
}
73
74impl KnownFarmers {
75    fn new(identification_broadcast_interval: Duration) -> Self {
76        Self {
77            identification_broadcast_interval,
78            known_farmers: Vec::new(),
79        }
80    }
81
82    /// Return `false` if the farmer is unknown and initialization is required
83    fn refresh(&mut self, farmer_id: ClusterFarmerId) -> bool {
84        self.known_farmers.iter_mut().any(|known_farmer| {
85            if known_farmer.farmer_id == farmer_id {
86                trace!(%farmer_id, "Updating last identification for farmer");
87                known_farmer.last_identification = Instant::now();
88                true
89            } else {
90                false
91            }
92        })
93    }
94
95    fn add(
96        &mut self,
97        farmer_id: ClusterFarmerId,
98        farms: Vec<ClusterFarmerFarmDetails>,
99    ) -> FarmerAddResult<impl Iterator<Item = (FarmIndex, ClusterFarmerFarmDetails)>> {
100        let farm_indices = self.pick_farm_indices(farms.len());
101        let farm_ids_to_add = farms
102            .iter()
103            .map(|farm_details| farm_details.farm_id)
104            .collect::<HashSet<FarmId>>();
105
106        if let Some(old_farmer) = self.known_farmers.iter_mut().find(|known_farmer| {
107            known_farmer
108                .known_farms
109                .values()
110                .any(|farm_id| farm_ids_to_add.contains(farm_id))
111        }) {
112            warn!(old_farmer_id = %old_farmer.farmer_id, "Some farms are already known, notify for cleanup them first");
113            old_farmer.notify_cleanup();
114        }
115
116        let (close_sender, close_receiver) = broadcast::channel(1);
117        self.known_farmers.push(KnownFarmer {
118            farmer_id,
119            last_identification: Instant::now(),
120            known_farms: farm_indices
121                .iter()
122                .copied()
123                .zip(farms.iter().map(|farm_details| farm_details.farm_id))
124                .collect(),
125            close_sender: Some(close_sender),
126        });
127
128        FarmerAddResult {
129            close_receiver,
130            added_farms: farm_indices.into_iter().zip(farms),
131        }
132    }
133
134    fn pick_farm_indices(&self, len: usize) -> Vec<u16> {
135        let used_indices = self
136            .known_farmers
137            .iter()
138            .flat_map(|known_farmer| known_farmer.known_farms.keys())
139            .collect::<HashSet<_>>();
140
141        let mut available_indices = Vec::with_capacity(len);
142
143        for farm_index in FarmIndex::MIN..=FarmIndex::MAX {
144            if !used_indices.contains(&farm_index) {
145                if available_indices.len() < len {
146                    available_indices.push(farm_index);
147                } else {
148                    return available_indices;
149                }
150            }
151        }
152
153        warn!(max_supported_farm_index = %FarmIndex::MAX, "Too many farms");
154        available_indices
155    }
156
157    fn remove_expired(&mut self) -> impl Iterator<Item = (ClusterFarmerId, &FarmIndex, &FarmId)> {
158        self.known_farmers
159            .iter_mut()
160            .filter_map(|known_farmer| {
161                if known_farmer.last_identification.elapsed()
162                    > self.identification_broadcast_interval * 2
163                    && known_farmer.notify_cleanup()
164                {
165                    Some(
166                        known_farmer
167                            .known_farms
168                            .iter()
169                            .map(|(farm_index, farm_id)| {
170                                (known_farmer.farmer_id, farm_index, farm_id)
171                            }),
172                    )
173                } else {
174                    None
175                }
176            })
177            .flatten()
178    }
179
180    fn remove_farm(&mut self, farm_index: FarmIndex) {
181        self.known_farmers.retain_mut(|known_farmer| {
182            if known_farmer.known_farms.contains_key(&farm_index) {
183                known_farmer.known_farms.remove(&farm_index);
184                !known_farmer.known_farms.is_empty()
185            } else {
186                true
187            }
188        });
189    }
190}
191
/// Utility function for maintaining farms by controller in a cluster environment
///
/// Subscribes to farmer identification broadcasts, asks farmers to identify themselves (with
/// `instance` passed to the NATS client for that broadcast) and then runs an event loop that:
/// * collects farm details from newly identified farmers and initializes their farms, which
///   adds their plotted sectors to `plotted_pieces`
/// * runs initialized farms until they exit or their farmer is notified for cleanup, after
///   which the farm is deleted from `plotted_pieces` and its farm index is released
/// * every `identification_broadcast_interval * 2` notifies farmers that stopped identifying
///   themselves for cleanup
///
/// The loop never finishes successfully. An error is returned if subscribing to farmer
/// identification broadcasts fails or if the subscription stream ends.
pub async fn maintain_farms(
    instance: &str,
    nats_client: &NatsClient,
    plotted_pieces: &Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
    identification_broadcast_interval: Duration,
) -> anyhow::Result<()> {
    let mut known_farmers = KnownFarmers::new(identification_broadcast_interval);

    // Farm details collection in progress, keyed by farmer ID
    let mut farmers_to_add = StreamMap::default();
    // Stream map for adding/removing farms
    let mut farms_to_add_remove = StreamMap::default();
    // Farms that are currently running
    let mut farms = FuturesUnordered::new();

    let farmer_identify_subscription = pin!(nats_client
        .subscribe_to_broadcasts::<ClusterFarmerIdentifyBroadcast>(None, None)
        .await
        .map_err(|error| anyhow!("Failed to subscribe to farmer identify broadcast: {error}"))?);

    // Request farmer to identify themselves
    if let Err(error) = nats_client
        .broadcast(&ClusterControllerFarmerIdentifyBroadcast, instance)
        .await
    {
        warn!(%error, "Failed to send farmer identification broadcast");
    }

    let mut farmer_identify_subscription = farmer_identify_subscription.fuse();
    // First pruning happens only after a full pruning interval has passed
    let mut farm_pruning_interval = tokio::time::interval_at(
        (Instant::now() + identification_broadcast_interval * 2).into(),
        identification_broadcast_interval * 2,
    );
    farm_pruning_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

    loop {
        select! {
            // A running farm has stopped, either on its own or due to close notification
            (farm_index, result) = farms.select_next_some() => {
                // Farm index is released (`Remove` result) only after the farm was deleted from
                // plotted pieces
                farms_to_add_remove.push(farm_index, Box::pin(async move {
                    let plotted_pieces = Arc::clone(plotted_pieces);

                    let delete_farm_fut = task::spawn_blocking(move || {
                        plotted_pieces.write_blocking().delete_farm(farm_index);
                    });
                    if let Err(error) = delete_farm_fut.await {
                        error!(
                            %farm_index,
                            %error,
                            "Failed to delete farm that exited"
                        );
                    }

                    FarmAddRemoveResult::Remove { farm_index }
                }));

                match result {
                    Ok(()) => {
                        info!(%farm_index, "Farm exited successfully");
                    }
                    Err(error) => {
                        error!(%farm_index, %error, "Farm exited with error");
                    }
                }
            }
            // A farmer has identified itself
            maybe_identify_message = farmer_identify_subscription.next() => {
                let Some(identify_message) = maybe_identify_message else {
                    return Err(anyhow!("Farmer identify stream ended"));
                };
                let ClusterFarmerIdentifyBroadcast {
                    farmer_id,
                } = identify_message;

                if known_farmers.refresh(farmer_id) {
                    trace!(
                        %farmer_id,
                        "Received identification for already known farmer"
                    );
                } else if farmers_to_add.add_if_not_in_progress(farmer_id, Box::pin(collect_farmer_farms(farmer_id, nats_client))) {
                    debug!(
                        %farmer_id,
                        "Received identification for new farmer, collecting farms"
                    );
                } else {
                    debug!(
                        %farmer_id,
                        "Received identification for new farmer, which is already in progress"
                    );
                }
            }
            // Farm details of a new farmer were collected (or collection failed)
            (farmer_id, maybe_farms) = farmers_to_add.select_next_some() => {
                let Ok(farms) = maybe_farms.inspect_err(|error| {
                    warn!(
                        %farmer_id,
                        %error,
                        "Failed to collect farms to add, may retry later"
                    );
                }) else {
                    continue;
                };

                let farm_add_result = known_farmers.add(farmer_id, farms);
                let FarmerAddResult {
                    close_receiver,
                    added_farms,
                } = farm_add_result;
                for (farm_index, farm_details) in added_farms {
                    let ClusterFarmerFarmDetails {
                        farm_id,
                        total_sectors_count,
                    } = farm_details;

                    let plotted_pieces = Arc::clone(plotted_pieces);
                    // Every farm gets its own receiver of the farmer's close notification
                    let close_receiver = close_receiver.resubscribe();
                    farms_to_add_remove.push(
                        farm_index,
                        Box::pin(async move {
                            match initialize_farm(
                                farm_index,
                                farm_id,
                                total_sectors_count,
                                plotted_pieces,
                                nats_client,
                            )
                            .await
                            {
                                Ok(farm) => {
                                    info!(
                                        %farmer_id,
                                        %farm_index,
                                        %farm_id,
                                        "Farm initialized successfully"
                                    );

                                    FarmAddRemoveResult::Add {
                                        close_receiver,
                                        farm,
                                    }
                                }
                                Err(error) => {
                                    warn!(
                                        %farmer_id,
                                        %farm_index,
                                        %farm_id,
                                        %error,
                                        "Failed to initialize farm"
                                    );
                                    // We should remove the farm if it failed to initialize
                                    FarmAddRemoveResult::Remove { farm_index }
                                }
                            }
                        }),
                    );
                }
            }
            // Periodic check for farmers that stopped identifying themselves
            _ = farm_pruning_interval.tick().fuse() => {
                // Iterating is what sends close notifications, the returned iterator is lazy
                for (farmer_id, farm_index, farm_id) in known_farmers.remove_expired() {
                    warn!(
                        %farmer_id,
                        %farm_index,
                        %farm_id,
                        "Farm expired, notify for cleanup"
                    );
                }
            }
            // Farm addition or removal task has finished
            (farm_index, result) = farms_to_add_remove.select_next_some() => {
                match result {
                    FarmAddRemoveResult::Add {
                        mut close_receiver,
                        farm,
                    } => {
                        // Run the farm until it exits or close notification is received,
                        // whichever happens first
                        farms.push(async move {
                            select! {
                                result = farm.run().fuse() => {
                                    (farm_index, result)
                                }
                                _ = close_receiver.recv().fuse() => {
                                    // Nothing to do
                                    (farm_index, Ok(()))
                                }
                            }
                        });
                    }
                    FarmAddRemoveResult::Remove { farm_index } => {
                        known_farmers.remove_farm(farm_index);
                    }
                }
            }
        }
    }
}
381
382/// Collect `ClusterFarmerFarmDetails` from the farmer by sending a stream request
383async fn collect_farmer_farms(
384    farmer_id: ClusterFarmerId,
385    nats_client: &NatsClient,
386) -> anyhow::Result<Vec<ClusterFarmerFarmDetails>> {
387    trace!(%farmer_id, "Requesting farmer farm details");
388    Ok(nats_client
389        .stream_request(
390            &ClusterFarmerFarmDetailsRequest,
391            Some(&farmer_id.to_string()),
392        )
393        .await
394        .inspect_err(|error| {
395            warn!(
396                %error,
397                %farmer_id,
398                "Failed to request farmer farm details"
399            )
400        })?
401        .collect()
402        .await)
403}
404
405async fn initialize_farm(
406    farm_index: FarmIndex,
407    farm_id: FarmId,
408    total_sectors_count: SectorIndex,
409    plotted_pieces: Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
410    nats_client: &NatsClient,
411) -> anyhow::Result<ClusterFarm> {
412    let farm = ClusterFarm::new(farm_id, total_sectors_count, nats_client.clone())
413        .await
414        .map_err(|error| anyhow!("Failed instantiate cluster farm {farm_id}: {error}"))?;
415
416    // Buffer sectors that are plotted while already plotted sectors are being iterated over
417    let plotted_sectors_buffer = Arc::new(Mutex::new(Vec::new()));
418    let sector_update_handler = farm.on_sector_update(Arc::new({
419        let plotted_sectors_buffer = Arc::clone(&plotted_sectors_buffer);
420
421        move |(_sector_index, sector_update)| {
422            if let SectorUpdate::Plotting(SectorPlottingDetails::Finished {
423                plotted_sector,
424                old_plotted_sector,
425                ..
426            }) = sector_update
427            {
428                plotted_sectors_buffer
429                    .lock()
430                    .push((plotted_sector.clone(), old_plotted_sector.clone()));
431            }
432        }
433    }));
434
435    // Add plotted sectors of the farm to global plotted pieces
436    let plotted_sectors = farm.plotted_sectors();
437    let mut plotted_sectors = plotted_sectors
438        .get()
439        .await
440        .map_err(|error| anyhow!("Failed to get plotted sectors for farm {farm_id}: {error}"))?;
441
442    {
443        plotted_pieces
444            .write()
445            .await
446            .add_farm(farm_index, farm.piece_reader());
447
448        while let Some(plotted_sector_result) = plotted_sectors.next().await {
449            let plotted_sector = plotted_sector_result.map_err(|error| {
450                anyhow!("Failed to get plotted sector for farm {farm_id}: {error}")
451            })?;
452
453            let mut plotted_pieces_guard = plotted_pieces.write().await;
454            plotted_pieces_guard.add_sector(farm_index, &plotted_sector);
455
456            // Drop the guard immediately to make sure other tasks are able to access the plotted pieces
457            drop(plotted_pieces_guard);
458
459            task::yield_now().await;
460        }
461    }
462
463    // Add sectors that were plotted while above iteration was happening to plotted sectors
464    // too
465    drop(sector_update_handler);
466    let plotted_sectors_buffer = mem::take(&mut *plotted_sectors_buffer.lock());
467    let add_buffered_sectors_fut = task::spawn_blocking(move || {
468        let mut plotted_pieces = plotted_pieces.write_blocking();
469
470        for (plotted_sector, old_plotted_sector) in plotted_sectors_buffer {
471            if let Some(old_plotted_sector) = old_plotted_sector {
472                plotted_pieces.delete_sector(farm_index, &old_plotted_sector);
473            }
474            // Call delete first to avoid adding duplicates
475            plotted_pieces.delete_sector(farm_index, &plotted_sector);
476            plotted_pieces.add_sector(farm_index, &plotted_sector);
477        }
478    });
479
480    add_buffered_sectors_fut
481        .await
482        .map_err(|error| anyhow!("Failed to add buffered sectors for farm {farm_id}: {error}"))?;
483
484    Ok(farm)
485}