subspace_farmer/cluster/controller/
farms.rs

//! This module exposes the implementation of farms maintenance.
//!
//! The goal is to observe farms in a cluster and keep the controller's data structures
//! about which pieces are plotted in which sectors of which farm up to date. The implementation
//! automatically handles dynamic farm addition and removal, etc.
6
7use crate::cluster::controller::ClusterControllerFarmerIdentifyBroadcast;
8use crate::cluster::controller::stream_map::StreamMap;
9use crate::cluster::farmer::{
10    ClusterFarm, ClusterFarmerFarmDetails, ClusterFarmerFarmDetailsRequest, ClusterFarmerId,
11    ClusterFarmerIdentifyBroadcast,
12};
13use crate::cluster::nats_client::NatsClient;
14use crate::farm::plotted_pieces::PlottedPieces;
15use crate::farm::{Farm, FarmId, SectorPlottingDetails, SectorUpdate};
16use anyhow::anyhow;
17use async_lock::RwLock as AsyncRwLock;
18use futures::stream::FuturesUnordered;
19use futures::{FutureExt, StreamExt, select};
20use parking_lot::Mutex;
21use std::collections::{HashMap, HashSet};
22use std::mem;
23use std::pin::pin;
24use std::sync::Arc;
25use std::time::{Duration, Instant};
26use subspace_core_primitives::sectors::SectorIndex;
27use tokio::sync::broadcast;
28use tokio::task;
29use tokio::time::MissedTickBehavior;
30use tracing::{debug, error, info, trace, warn};
31
/// Index under which the controller tracks a farm of the cluster.
///
/// Number of farms in a cluster is currently limited to 2^16
pub type FarmIndex = u16;
34
/// Outcome of a per-farm task that tells the maintenance loop whether a farm should start
/// running or should be forgotten.
enum FarmAddRemoveResult {
    /// Farm was initialized and is ready to be run
    Add {
        /// Receiver that fires when the farmer owning this farm is notified for cleanup
        close_receiver: broadcast::Receiver<()>,
        /// Initialized farm
        farm: ClusterFarm,
    },
    /// Farm failed to initialize or has exited and must be removed from known farms
    Remove {
        /// Index of the farm to remove
        farm_index: FarmIndex,
    },
}
44
/// Result of registering a farmer with [`KnownFarmers::add`].
struct FarmerAddResult<I> {
    /// Receiving side of the close channel created for the newly added farmer
    close_receiver: broadcast::Receiver<()>,
    /// Farms that were assigned a farm index (an iterator in the only place this is created)
    added_farms: I,
}
49
/// Bookkeeping for a single farmer that has identified itself to the controller.
#[derive(Debug)]
struct KnownFarmer {
    /// Identifier the farmer announced in its identify broadcast
    farmer_id: ClusterFarmerId,
    /// Moment the farmer was added or last refreshed
    last_identification: Instant,
    /// Farms of this farmer keyed by the farm index assigned by the controller
    known_farms: HashMap<FarmIndex, FarmId>,
    /// Sender used to ask the farmer's farms to stop; `None` once cleanup was requested
    close_sender: Option<broadcast::Sender<()>>,
}
57
58impl KnownFarmer {
59    fn notify_cleanup(&mut self) -> bool {
60        let Some(close_sender) = self.close_sender.take() else {
61            return false;
62        };
63        let _ = close_sender.send(());
64        true
65    }
66}
67
/// Registry of farmers (and their farms) currently known to the controller.
#[derive(Debug)]
struct KnownFarmers {
    /// Expected interval between farmer identification broadcasts; a farmer is treated as
    /// expired after more than twice this interval without identification
    identification_broadcast_interval: Duration,
    /// All farmers that were added and not yet removed
    known_farmers: Vec<KnownFarmer>,
}
73
74impl KnownFarmers {
75    fn new(identification_broadcast_interval: Duration) -> Self {
76        Self {
77            identification_broadcast_interval,
78            known_farmers: Vec::new(),
79        }
80    }
81
82    /// Return `false` if the farmer is unknown and initialization is required
83    fn refresh(&mut self, farmer_id: ClusterFarmerId) -> bool {
84        self.known_farmers.iter_mut().any(|known_farmer| {
85            if known_farmer.farmer_id == farmer_id {
86                trace!(%farmer_id, "Updating last identification for farmer");
87                known_farmer.last_identification = Instant::now();
88                true
89            } else {
90                false
91            }
92        })
93    }
94
95    fn add(
96        &mut self,
97        farmer_id: ClusterFarmerId,
98        farms: Vec<ClusterFarmerFarmDetails>,
99    ) -> FarmerAddResult<impl Iterator<Item = (FarmIndex, ClusterFarmerFarmDetails)>> {
100        let farm_indices = self.pick_farm_indices(farms.len());
101        let farm_ids_to_add = farms
102            .iter()
103            .map(|farm_details| farm_details.farm_id)
104            .collect::<HashSet<FarmId>>();
105
106        if let Some(old_farmer) = self.known_farmers.iter_mut().find(|known_farmer| {
107            known_farmer
108                .known_farms
109                .values()
110                .any(|farm_id| farm_ids_to_add.contains(farm_id))
111        }) {
112            warn!(old_farmer_id = %old_farmer.farmer_id, "Some farms are already known, notify for cleanup them first");
113            old_farmer.notify_cleanup();
114        }
115
116        let (close_sender, close_receiver) = broadcast::channel(1);
117        self.known_farmers.push(KnownFarmer {
118            farmer_id,
119            last_identification: Instant::now(),
120            known_farms: farm_indices
121                .iter()
122                .copied()
123                .zip(farms.iter().map(|farm_details| farm_details.farm_id))
124                .collect(),
125            close_sender: Some(close_sender),
126        });
127
128        FarmerAddResult {
129            close_receiver,
130            added_farms: farm_indices.into_iter().zip(farms),
131        }
132    }
133
134    fn pick_farm_indices(&self, len: usize) -> Vec<u16> {
135        let used_indices = self
136            .known_farmers
137            .iter()
138            .flat_map(|known_farmer| known_farmer.known_farms.keys())
139            .collect::<HashSet<_>>();
140
141        let mut available_indices = Vec::with_capacity(len);
142
143        for farm_index in FarmIndex::MIN..=FarmIndex::MAX {
144            if !used_indices.contains(&farm_index) {
145                if available_indices.len() < len {
146                    available_indices.push(farm_index);
147                } else {
148                    return available_indices;
149                }
150            }
151        }
152
153        warn!(max_supported_farm_index = %FarmIndex::MAX, "Too many farms");
154        available_indices
155    }
156
157    fn remove_expired(&mut self) -> impl Iterator<Item = (ClusterFarmerId, &FarmIndex, &FarmId)> {
158        self.known_farmers
159            .iter_mut()
160            .filter_map(|known_farmer| {
161                if known_farmer.last_identification.elapsed()
162                    > self.identification_broadcast_interval * 2
163                    && known_farmer.notify_cleanup()
164                {
165                    Some(
166                        known_farmer
167                            .known_farms
168                            .iter()
169                            .map(|(farm_index, farm_id)| {
170                                (known_farmer.farmer_id, farm_index, farm_id)
171                            }),
172                    )
173                } else {
174                    None
175                }
176            })
177            .flatten()
178    }
179
180    fn remove_farm(&mut self, farm_index: FarmIndex) {
181        self.known_farmers.retain_mut(|known_farmer| {
182            if known_farmer.known_farms.contains_key(&farm_index) {
183                known_farmer.known_farms.remove(&farm_index);
184                !known_farmer.known_farms.is_empty()
185            } else {
186                true
187            }
188        });
189    }
190}
191
192/// Utility function for maintaining farms by controller in a cluster environment
193pub async fn maintain_farms(
194    instance: &str,
195    nats_client: &NatsClient,
196    plotted_pieces: &Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
197    identification_broadcast_interval: Duration,
198) -> anyhow::Result<()> {
199    let mut known_farmers = KnownFarmers::new(identification_broadcast_interval);
200
201    let mut farmers_to_add = StreamMap::default();
202    // Stream map for adding/removing farms
203    let mut farms_to_add_remove = StreamMap::default();
204    let mut farms = FuturesUnordered::new();
205
206    let farmer_identify_subscription = pin!(
207        nats_client
208            .subscribe_to_broadcasts::<ClusterFarmerIdentifyBroadcast>(None, None)
209            .await
210            .map_err(|error| anyhow!(
211                "Failed to subscribe to farmer identify broadcast: {error}"
212            ))?
213    );
214
215    // Request farmer to identify themselves
216    if let Err(error) = nats_client
217        .broadcast(&ClusterControllerFarmerIdentifyBroadcast, instance)
218        .await
219    {
220        warn!(%error, "Failed to send farmer identification broadcast");
221    }
222
223    let mut farmer_identify_subscription = farmer_identify_subscription.fuse();
224    let mut farm_pruning_interval = tokio::time::interval_at(
225        (Instant::now() + identification_broadcast_interval * 2).into(),
226        identification_broadcast_interval * 2,
227    );
228    farm_pruning_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
229
230    loop {
231        select! {
232            (farm_index, result) = farms.select_next_some() => {
233                farms_to_add_remove.push(farm_index, Box::pin(async move {
234                    let plotted_pieces = Arc::clone(plotted_pieces);
235
236                    let delete_farm_fut = task::spawn_blocking(move || {
237                        plotted_pieces.write_blocking().delete_farm(farm_index);
238                    });
239                    if let Err(error) = delete_farm_fut.await {
240                        error!(
241                            %farm_index,
242                            %error,
243                            "Failed to delete farm that exited"
244                        );
245                    }
246
247                    FarmAddRemoveResult::Remove { farm_index }
248                }));
249
250                match result {
251                    Ok(()) => {
252                        info!(%farm_index, "Farm exited successfully");
253                    }
254                    Err(error) => {
255                        error!(%farm_index, %error, "Farm exited with error");
256                    }
257                }
258            }
259            maybe_identify_message = farmer_identify_subscription.next() => {
260                let Some(identify_message) = maybe_identify_message else {
261                    return Err(anyhow!("Farmer identify stream ended"));
262                };
263                let ClusterFarmerIdentifyBroadcast {
264                    farmer_id,
265                } = identify_message;
266
267                if known_farmers.refresh(farmer_id) {
268                    trace!(
269                        %farmer_id,
270                        "Received identification for already known farmer"
271                    );
272                } else if farmers_to_add.add_if_not_in_progress(farmer_id, Box::pin(collect_farmer_farms(farmer_id, nats_client))) {
273                    debug!(
274                        %farmer_id,
275                        "Received identification for new farmer, collecting farms"
276                    );
277                } else {
278                    debug!(
279                        %farmer_id,
280                        "Received identification for new farmer, which is already in progress"
281                    );
282                }
283            }
284            (farmer_id, maybe_farms) = farmers_to_add.select_next_some() => {
285                let Ok(farms) = maybe_farms.inspect_err(|error| {
286                    warn!(
287                        %farmer_id,
288                        %error,
289                        "Failed to collect farms to add, may retry later"
290                    );
291                }) else {
292                    continue;
293                };
294
295                let farm_add_result = known_farmers.add(farmer_id, farms);
296                let FarmerAddResult {
297                    close_receiver,
298                    added_farms,
299                } = farm_add_result;
300                for (farm_index, farm_details) in added_farms {
301                    let ClusterFarmerFarmDetails {
302                        farm_id,
303                        total_sectors_count,
304                    } = farm_details;
305
306                    let plotted_pieces = Arc::clone(plotted_pieces);
307                    let close_receiver = close_receiver.resubscribe();
308                    farms_to_add_remove.push(
309                        farm_index,
310                        Box::pin(async move {
311                            match initialize_farm(
312                                farm_index,
313                                farm_id,
314                                total_sectors_count,
315                                plotted_pieces,
316                                nats_client,
317                            )
318                            .await
319                            {
320                                Ok(farm) => {
321                                    info!(
322                                        %farmer_id,
323                                        %farm_index,
324                                        %farm_id,
325                                        "Farm initialized successfully"
326                                    );
327
328                                    FarmAddRemoveResult::Add {
329                                        close_receiver,
330                                        farm,
331                                    }
332                                }
333                                Err(error) => {
334                                    warn!(
335                                        %farmer_id,
336                                        %farm_index,
337                                        %farm_id,
338                                        %error,
339                                        "Failed to initialize farm"
340                                    );
341                                    // We should remove the farm if it failed to initialize
342                                    FarmAddRemoveResult::Remove { farm_index }
343                                }
344                            }
345                        }),
346                    );
347                }
348            }
349            _ = farm_pruning_interval.tick().fuse() => {
350                for (farmer_id, farm_index, farm_id) in known_farmers.remove_expired() {
351                    warn!(
352                        %farmer_id,
353                        %farm_index,
354                        %farm_id,
355                        "Farm expired, notify for cleanup"
356                    );
357                }
358            }
359            (farm_index, result) = farms_to_add_remove.select_next_some() => {
360                match result {
361                    FarmAddRemoveResult::Add {
362                        mut close_receiver,
363                        farm,
364                    } => {
365                        farms.push(async move {
366                            select! {
367                                result = farm.run().fuse() => {
368                                    (farm_index, result)
369                                }
370                                _ = close_receiver.recv().fuse() => {
371                                    // Nothing to do
372                                    (farm_index, Ok(()))
373                                }
374                            }
375                        });
376                    }
377                    FarmAddRemoveResult::Remove { farm_index } => {
378                        known_farmers.remove_farm(farm_index);
379                    }
380                }
381            }
382        }
383    }
384}
385
386/// Collect `ClusterFarmerFarmDetails` from the farmer by sending a stream request
387async fn collect_farmer_farms(
388    farmer_id: ClusterFarmerId,
389    nats_client: &NatsClient,
390) -> anyhow::Result<Vec<ClusterFarmerFarmDetails>> {
391    trace!(%farmer_id, "Requesting farmer farm details");
392    Ok(nats_client
393        .stream_request(
394            &ClusterFarmerFarmDetailsRequest,
395            Some(&farmer_id.to_string()),
396        )
397        .await
398        .inspect_err(|error| {
399            warn!(
400                %error,
401                %farmer_id,
402                "Failed to request farmer farm details"
403            )
404        })?
405        .collect()
406        .await)
407}
408
409async fn initialize_farm(
410    farm_index: FarmIndex,
411    farm_id: FarmId,
412    total_sectors_count: SectorIndex,
413    plotted_pieces: Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
414    nats_client: &NatsClient,
415) -> anyhow::Result<ClusterFarm> {
416    let farm = ClusterFarm::new(farm_id, total_sectors_count, nats_client.clone())
417        .await
418        .map_err(|error| anyhow!("Failed instantiate cluster farm {farm_id}: {error}"))?;
419
420    // Buffer sectors that are plotted while already plotted sectors are being iterated over
421    let plotted_sectors_buffer = Arc::new(Mutex::new(Vec::new()));
422    let sector_update_handler = farm.on_sector_update(Arc::new({
423        let plotted_sectors_buffer = Arc::clone(&plotted_sectors_buffer);
424
425        move |(_sector_index, sector_update)| {
426            if let SectorUpdate::Plotting(SectorPlottingDetails::Finished {
427                plotted_sector,
428                old_plotted_sector,
429                ..
430            }) = sector_update
431            {
432                plotted_sectors_buffer
433                    .lock()
434                    .push((plotted_sector.clone(), old_plotted_sector.clone()));
435            }
436        }
437    }));
438
439    // Add plotted sectors of the farm to global plotted pieces
440    let plotted_sectors = farm.plotted_sectors();
441    let mut plotted_sectors = plotted_sectors
442        .get()
443        .await
444        .map_err(|error| anyhow!("Failed to get plotted sectors for farm {farm_id}: {error}"))?;
445
446    {
447        plotted_pieces
448            .write()
449            .await
450            .add_farm(farm_index, farm.piece_reader());
451
452        while let Some(plotted_sector_result) = plotted_sectors.next().await {
453            let plotted_sector = plotted_sector_result.map_err(|error| {
454                anyhow!("Failed to get plotted sector for farm {farm_id}: {error}")
455            })?;
456
457            let mut plotted_pieces_guard = plotted_pieces.write().await;
458            plotted_pieces_guard.add_sector(farm_index, &plotted_sector);
459
460            // Drop the guard immediately to make sure other tasks are able to access the plotted pieces
461            drop(plotted_pieces_guard);
462
463            task::yield_now().await;
464        }
465    }
466
467    // Add sectors that were plotted while above iteration was happening to plotted sectors
468    // too
469    drop(sector_update_handler);
470    let plotted_sectors_buffer = mem::take(&mut *plotted_sectors_buffer.lock());
471    let add_buffered_sectors_fut = task::spawn_blocking(move || {
472        let mut plotted_pieces = plotted_pieces.write_blocking();
473
474        for (plotted_sector, old_plotted_sector) in plotted_sectors_buffer {
475            if let Some(old_plotted_sector) = old_plotted_sector {
476                plotted_pieces.delete_sector(farm_index, &old_plotted_sector);
477            }
478            // Call delete first to avoid adding duplicates
479            plotted_pieces.delete_sector(farm_index, &plotted_sector);
480            plotted_pieces.add_sector(farm_index, &plotted_sector);
481        }
482    });
483
484    add_buffered_sectors_fut
485        .await
486        .map_err(|error| anyhow!("Failed to add buffered sectors for farm {farm_id}: {error}"))?;
487
488    Ok(farm)
489}