subspace_farmer/cluster/controller/
farms.rs1use crate::cluster::controller::ClusterControllerFarmerIdentifyBroadcast;
8use crate::cluster::controller::stream_map::StreamMap;
9use crate::cluster::farmer::{
10 ClusterFarm, ClusterFarmerFarmDetails, ClusterFarmerFarmDetailsRequest, ClusterFarmerId,
11 ClusterFarmerIdentifyBroadcast,
12};
13use crate::cluster::nats_client::NatsClient;
14use crate::farm::plotted_pieces::PlottedPieces;
15use crate::farm::{Farm, FarmId, SectorPlottingDetails, SectorUpdate};
16use anyhow::anyhow;
17use async_lock::RwLock as AsyncRwLock;
18use futures::stream::FuturesUnordered;
19use futures::{FutureExt, StreamExt, select};
20use parking_lot::Mutex;
21use std::collections::{HashMap, HashSet};
22use std::mem;
23use std::pin::pin;
24use std::sync::Arc;
25use std::time::{Duration, Instant};
26use subspace_core_primitives::sectors::SectorIndex;
27use tokio::sync::broadcast;
28use tokio::task;
29use tokio::time::MissedTickBehavior;
30use tracing::{debug, error, info, trace, warn};
31
/// Index the controller assigns to a farm it tracks.
///
/// Being a `u16`, it bounds the number of simultaneously tracked farms to the number of distinct
/// `u16` values.
pub type FarmIndex = u16;
34
35enum FarmAddRemoveResult {
36 Add {
37 close_receiver: broadcast::Receiver<()>,
38 farm: ClusterFarm,
39 },
40 Remove {
41 farm_index: FarmIndex,
42 },
43}
44
45struct FarmerAddResult<I> {
46 close_receiver: broadcast::Receiver<()>,
47 added_farms: I,
48}
49
50#[derive(Debug)]
51struct KnownFarmer {
52 farmer_id: ClusterFarmerId,
53 last_identification: Instant,
54 known_farms: HashMap<FarmIndex, FarmId>,
55 close_sender: Option<broadcast::Sender<()>>,
56}
57
58impl KnownFarmer {
59 fn notify_cleanup(&mut self) -> bool {
60 let Some(close_sender) = self.close_sender.take() else {
61 return false;
62 };
63 let _ = close_sender.send(());
64 true
65 }
66}
67
68#[derive(Debug)]
69struct KnownFarmers {
70 identification_broadcast_interval: Duration,
71 known_farmers: Vec<KnownFarmer>,
72}
73
74impl KnownFarmers {
75 fn new(identification_broadcast_interval: Duration) -> Self {
76 Self {
77 identification_broadcast_interval,
78 known_farmers: Vec::new(),
79 }
80 }
81
82 fn refresh(&mut self, farmer_id: ClusterFarmerId) -> bool {
84 self.known_farmers.iter_mut().any(|known_farmer| {
85 if known_farmer.farmer_id == farmer_id {
86 trace!(%farmer_id, "Updating last identification for farmer");
87 known_farmer.last_identification = Instant::now();
88 true
89 } else {
90 false
91 }
92 })
93 }
94
95 fn add(
96 &mut self,
97 farmer_id: ClusterFarmerId,
98 farms: Vec<ClusterFarmerFarmDetails>,
99 ) -> FarmerAddResult<impl Iterator<Item = (FarmIndex, ClusterFarmerFarmDetails)>> {
100 let farm_indices = self.pick_farm_indices(farms.len());
101 let farm_ids_to_add = farms
102 .iter()
103 .map(|farm_details| farm_details.farm_id)
104 .collect::<HashSet<FarmId>>();
105
106 if let Some(old_farmer) = self.known_farmers.iter_mut().find(|known_farmer| {
107 known_farmer
108 .known_farms
109 .values()
110 .any(|farm_id| farm_ids_to_add.contains(farm_id))
111 }) {
112 warn!(old_farmer_id = %old_farmer.farmer_id, "Some farms are already known, notify for cleanup them first");
113 old_farmer.notify_cleanup();
114 }
115
116 let (close_sender, close_receiver) = broadcast::channel(1);
117 self.known_farmers.push(KnownFarmer {
118 farmer_id,
119 last_identification: Instant::now(),
120 known_farms: farm_indices
121 .iter()
122 .copied()
123 .zip(farms.iter().map(|farm_details| farm_details.farm_id))
124 .collect(),
125 close_sender: Some(close_sender),
126 });
127
128 FarmerAddResult {
129 close_receiver,
130 added_farms: farm_indices.into_iter().zip(farms),
131 }
132 }
133
134 fn pick_farm_indices(&self, len: usize) -> Vec<u16> {
135 let used_indices = self
136 .known_farmers
137 .iter()
138 .flat_map(|known_farmer| known_farmer.known_farms.keys())
139 .collect::<HashSet<_>>();
140
141 let mut available_indices = Vec::with_capacity(len);
142
143 for farm_index in FarmIndex::MIN..=FarmIndex::MAX {
144 if !used_indices.contains(&farm_index) {
145 if available_indices.len() < len {
146 available_indices.push(farm_index);
147 } else {
148 return available_indices;
149 }
150 }
151 }
152
153 warn!(max_supported_farm_index = %FarmIndex::MAX, "Too many farms");
154 available_indices
155 }
156
157 fn remove_expired(&mut self) -> impl Iterator<Item = (ClusterFarmerId, &FarmIndex, &FarmId)> {
158 self.known_farmers
159 .iter_mut()
160 .filter_map(|known_farmer| {
161 if known_farmer.last_identification.elapsed()
162 > self.identification_broadcast_interval * 2
163 && known_farmer.notify_cleanup()
164 {
165 Some(
166 known_farmer
167 .known_farms
168 .iter()
169 .map(|(farm_index, farm_id)| {
170 (known_farmer.farmer_id, farm_index, farm_id)
171 }),
172 )
173 } else {
174 None
175 }
176 })
177 .flatten()
178 }
179
180 fn remove_farm(&mut self, farm_index: FarmIndex) {
181 self.known_farmers.retain_mut(|known_farmer| {
182 if known_farmer.known_farms.contains_key(&farm_index) {
183 known_farmer.known_farms.remove(&farm_index);
184 !known_farmer.known_farms.is_empty()
185 } else {
186 true
187 }
188 });
189 }
190}
191
192pub async fn maintain_farms(
194 instance: &str,
195 nats_client: &NatsClient,
196 plotted_pieces: &Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
197 identification_broadcast_interval: Duration,
198) -> anyhow::Result<()> {
199 let mut known_farmers = KnownFarmers::new(identification_broadcast_interval);
200
201 let mut farmers_to_add = StreamMap::default();
202 let mut farms_to_add_remove = StreamMap::default();
204 let mut farms = FuturesUnordered::new();
205
206 let farmer_identify_subscription = pin!(
207 nats_client
208 .subscribe_to_broadcasts::<ClusterFarmerIdentifyBroadcast>(None, None)
209 .await
210 .map_err(|error| anyhow!(
211 "Failed to subscribe to farmer identify broadcast: {error}"
212 ))?
213 );
214
215 if let Err(error) = nats_client
217 .broadcast(&ClusterControllerFarmerIdentifyBroadcast, instance)
218 .await
219 {
220 warn!(%error, "Failed to send farmer identification broadcast");
221 }
222
223 let mut farmer_identify_subscription = farmer_identify_subscription.fuse();
224 let mut farm_pruning_interval = tokio::time::interval_at(
225 (Instant::now() + identification_broadcast_interval * 2).into(),
226 identification_broadcast_interval * 2,
227 );
228 farm_pruning_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
229
230 loop {
231 select! {
232 (farm_index, result) = farms.select_next_some() => {
233 farms_to_add_remove.push(farm_index, Box::pin(async move {
234 let plotted_pieces = Arc::clone(plotted_pieces);
235
236 let delete_farm_fut = task::spawn_blocking(move || {
237 plotted_pieces.write_blocking().delete_farm(farm_index);
238 });
239 if let Err(error) = delete_farm_fut.await {
240 error!(
241 %farm_index,
242 %error,
243 "Failed to delete farm that exited"
244 );
245 }
246
247 FarmAddRemoveResult::Remove { farm_index }
248 }));
249
250 match result {
251 Ok(()) => {
252 info!(%farm_index, "Farm exited successfully");
253 }
254 Err(error) => {
255 error!(%farm_index, %error, "Farm exited with error");
256 }
257 }
258 }
259 maybe_identify_message = farmer_identify_subscription.next() => {
260 let Some(identify_message) = maybe_identify_message else {
261 return Err(anyhow!("Farmer identify stream ended"));
262 };
263 let ClusterFarmerIdentifyBroadcast {
264 farmer_id,
265 } = identify_message;
266
267 if known_farmers.refresh(farmer_id) {
268 trace!(
269 %farmer_id,
270 "Received identification for already known farmer"
271 );
272 } else if farmers_to_add.add_if_not_in_progress(farmer_id, Box::pin(collect_farmer_farms(farmer_id, nats_client))) {
273 debug!(
274 %farmer_id,
275 "Received identification for new farmer, collecting farms"
276 );
277 } else {
278 debug!(
279 %farmer_id,
280 "Received identification for new farmer, which is already in progress"
281 );
282 }
283 }
284 (farmer_id, maybe_farms) = farmers_to_add.select_next_some() => {
285 let Ok(farms) = maybe_farms.inspect_err(|error| {
286 warn!(
287 %farmer_id,
288 %error,
289 "Failed to collect farms to add, may retry later"
290 );
291 }) else {
292 continue;
293 };
294
295 let farm_add_result = known_farmers.add(farmer_id, farms);
296 let FarmerAddResult {
297 close_receiver,
298 added_farms,
299 } = farm_add_result;
300 for (farm_index, farm_details) in added_farms {
301 let ClusterFarmerFarmDetails {
302 farm_id,
303 total_sectors_count,
304 } = farm_details;
305
306 let plotted_pieces = Arc::clone(plotted_pieces);
307 let close_receiver = close_receiver.resubscribe();
308 farms_to_add_remove.push(
309 farm_index,
310 Box::pin(async move {
311 match initialize_farm(
312 farm_index,
313 farm_id,
314 total_sectors_count,
315 plotted_pieces,
316 nats_client,
317 )
318 .await
319 {
320 Ok(farm) => {
321 info!(
322 %farmer_id,
323 %farm_index,
324 %farm_id,
325 "Farm initialized successfully"
326 );
327
328 FarmAddRemoveResult::Add {
329 close_receiver,
330 farm,
331 }
332 }
333 Err(error) => {
334 warn!(
335 %farmer_id,
336 %farm_index,
337 %farm_id,
338 %error,
339 "Failed to initialize farm"
340 );
341 FarmAddRemoveResult::Remove { farm_index }
343 }
344 }
345 }),
346 );
347 }
348 }
349 _ = farm_pruning_interval.tick().fuse() => {
350 for (farmer_id, farm_index, farm_id) in known_farmers.remove_expired() {
351 warn!(
352 %farmer_id,
353 %farm_index,
354 %farm_id,
355 "Farm expired, notify for cleanup"
356 );
357 }
358 }
359 (farm_index, result) = farms_to_add_remove.select_next_some() => {
360 match result {
361 FarmAddRemoveResult::Add {
362 mut close_receiver,
363 farm,
364 } => {
365 farms.push(async move {
366 select! {
367 result = farm.run().fuse() => {
368 (farm_index, result)
369 }
370 _ = close_receiver.recv().fuse() => {
371 (farm_index, Ok(()))
373 }
374 }
375 });
376 }
377 FarmAddRemoveResult::Remove { farm_index } => {
378 known_farmers.remove_farm(farm_index);
379 }
380 }
381 }
382 }
383 }
384}
385
386async fn collect_farmer_farms(
388 farmer_id: ClusterFarmerId,
389 nats_client: &NatsClient,
390) -> anyhow::Result<Vec<ClusterFarmerFarmDetails>> {
391 trace!(%farmer_id, "Requesting farmer farm details");
392 Ok(nats_client
393 .stream_request(
394 &ClusterFarmerFarmDetailsRequest,
395 Some(&farmer_id.to_string()),
396 )
397 .await
398 .inspect_err(|error| {
399 warn!(
400 %error,
401 %farmer_id,
402 "Failed to request farmer farm details"
403 )
404 })?
405 .collect()
406 .await)
407}
408
409async fn initialize_farm(
410 farm_index: FarmIndex,
411 farm_id: FarmId,
412 total_sectors_count: SectorIndex,
413 plotted_pieces: Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
414 nats_client: &NatsClient,
415) -> anyhow::Result<ClusterFarm> {
416 let farm = ClusterFarm::new(farm_id, total_sectors_count, nats_client.clone())
417 .await
418 .map_err(|error| anyhow!("Failed instantiate cluster farm {farm_id}: {error}"))?;
419
420 let plotted_sectors_buffer = Arc::new(Mutex::new(Vec::new()));
422 let sector_update_handler = farm.on_sector_update(Arc::new({
423 let plotted_sectors_buffer = Arc::clone(&plotted_sectors_buffer);
424
425 move |(_sector_index, sector_update)| {
426 if let SectorUpdate::Plotting(SectorPlottingDetails::Finished {
427 plotted_sector,
428 old_plotted_sector,
429 ..
430 }) = sector_update
431 {
432 plotted_sectors_buffer
433 .lock()
434 .push((plotted_sector.clone(), old_plotted_sector.clone()));
435 }
436 }
437 }));
438
439 let plotted_sectors = farm.plotted_sectors();
441 let mut plotted_sectors = plotted_sectors
442 .get()
443 .await
444 .map_err(|error| anyhow!("Failed to get plotted sectors for farm {farm_id}: {error}"))?;
445
446 {
447 plotted_pieces
448 .write()
449 .await
450 .add_farm(farm_index, farm.piece_reader());
451
452 while let Some(plotted_sector_result) = plotted_sectors.next().await {
453 let plotted_sector = plotted_sector_result.map_err(|error| {
454 anyhow!("Failed to get plotted sector for farm {farm_id}: {error}")
455 })?;
456
457 let mut plotted_pieces_guard = plotted_pieces.write().await;
458 plotted_pieces_guard.add_sector(farm_index, &plotted_sector);
459
460 drop(plotted_pieces_guard);
462
463 task::yield_now().await;
464 }
465 }
466
467 drop(sector_update_handler);
470 let plotted_sectors_buffer = mem::take(&mut *plotted_sectors_buffer.lock());
471 let add_buffered_sectors_fut = task::spawn_blocking(move || {
472 let mut plotted_pieces = plotted_pieces.write_blocking();
473
474 for (plotted_sector, old_plotted_sector) in plotted_sectors_buffer {
475 if let Some(old_plotted_sector) = old_plotted_sector {
476 plotted_pieces.delete_sector(farm_index, &old_plotted_sector);
477 }
478 plotted_pieces.delete_sector(farm_index, &plotted_sector);
480 plotted_pieces.add_sector(farm_index, &plotted_sector);
481 }
482 });
483
484 add_buffered_sectors_fut
485 .await
486 .map_err(|error| anyhow!("Failed to add buffered sectors for farm {farm_id}: {error}"))?;
487
488 Ok(farm)
489}