subspace_farmer/cluster/controller/
farms.rs1use crate::cluster::controller::stream_map::StreamMap;
8use crate::cluster::controller::ClusterControllerFarmerIdentifyBroadcast;
9use crate::cluster::farmer::{
10 ClusterFarm, ClusterFarmerFarmDetails, ClusterFarmerFarmDetailsRequest, ClusterFarmerId,
11 ClusterFarmerIdentifyBroadcast,
12};
13use crate::cluster::nats_client::NatsClient;
14use crate::farm::plotted_pieces::PlottedPieces;
15use crate::farm::{Farm, FarmId, SectorPlottingDetails, SectorUpdate};
16use anyhow::anyhow;
17use async_lock::RwLock as AsyncRwLock;
18use futures::stream::FuturesUnordered;
19use futures::{select, FutureExt, StreamExt};
20use parking_lot::Mutex;
21use std::collections::{HashMap, HashSet};
22use std::mem;
23use std::pin::pin;
24use std::sync::Arc;
25use std::time::{Duration, Instant};
26use subspace_core_primitives::sectors::SectorIndex;
27use tokio::sync::broadcast;
28use tokio::task;
29use tokio::time::MissedTickBehavior;
30use tracing::{debug, error, info, trace, warn};
31
/// Index the controller assigns to each farm it tracks; used as the farm key of
/// `PlottedPieces`.
pub type FarmIndex = u16;

35enum FarmAddRemoveResult {
36 Add {
37 close_receiver: broadcast::Receiver<()>,
38 farm: ClusterFarm,
39 },
40 Remove {
41 farm_index: FarmIndex,
42 },
43}
44
45struct FarmerAddResult<I> {
46 close_receiver: broadcast::Receiver<()>,
47 added_farms: I,
48}
49
50#[derive(Debug)]
51struct KnownFarmer {
52 farmer_id: ClusterFarmerId,
53 last_identification: Instant,
54 known_farms: HashMap<FarmIndex, FarmId>,
55 close_sender: Option<broadcast::Sender<()>>,
56}
57
58impl KnownFarmer {
59 fn notify_cleanup(&mut self) -> bool {
60 let Some(close_sender) = self.close_sender.take() else {
61 return false;
62 };
63 let _ = close_sender.send(());
64 true
65 }
66}
67
68#[derive(Debug)]
69struct KnownFarmers {
70 identification_broadcast_interval: Duration,
71 known_farmers: Vec<KnownFarmer>,
72}
73
74impl KnownFarmers {
75 fn new(identification_broadcast_interval: Duration) -> Self {
76 Self {
77 identification_broadcast_interval,
78 known_farmers: Vec::new(),
79 }
80 }
81
82 fn refresh(&mut self, farmer_id: ClusterFarmerId) -> bool {
84 self.known_farmers.iter_mut().any(|known_farmer| {
85 if known_farmer.farmer_id == farmer_id {
86 trace!(%farmer_id, "Updating last identification for farmer");
87 known_farmer.last_identification = Instant::now();
88 true
89 } else {
90 false
91 }
92 })
93 }
94
95 fn add(
96 &mut self,
97 farmer_id: ClusterFarmerId,
98 farms: Vec<ClusterFarmerFarmDetails>,
99 ) -> FarmerAddResult<impl Iterator<Item = (FarmIndex, ClusterFarmerFarmDetails)>> {
100 let farm_indices = self.pick_farm_indices(farms.len());
101 let farm_ids_to_add = farms
102 .iter()
103 .map(|farm_details| farm_details.farm_id)
104 .collect::<HashSet<FarmId>>();
105
106 if let Some(old_farmer) = self.known_farmers.iter_mut().find(|known_farmer| {
107 known_farmer
108 .known_farms
109 .values()
110 .any(|farm_id| farm_ids_to_add.contains(farm_id))
111 }) {
112 warn!(old_farmer_id = %old_farmer.farmer_id, "Some farms are already known, notify for cleanup them first");
113 old_farmer.notify_cleanup();
114 }
115
116 let (close_sender, close_receiver) = broadcast::channel(1);
117 self.known_farmers.push(KnownFarmer {
118 farmer_id,
119 last_identification: Instant::now(),
120 known_farms: farm_indices
121 .iter()
122 .copied()
123 .zip(farms.iter().map(|farm_details| farm_details.farm_id))
124 .collect(),
125 close_sender: Some(close_sender),
126 });
127
128 FarmerAddResult {
129 close_receiver,
130 added_farms: farm_indices.into_iter().zip(farms),
131 }
132 }
133
134 fn pick_farm_indices(&self, len: usize) -> Vec<u16> {
135 let used_indices = self
136 .known_farmers
137 .iter()
138 .flat_map(|known_farmer| known_farmer.known_farms.keys())
139 .collect::<HashSet<_>>();
140
141 let mut available_indices = Vec::with_capacity(len);
142
143 for farm_index in FarmIndex::MIN..=FarmIndex::MAX {
144 if !used_indices.contains(&farm_index) {
145 if available_indices.len() < len {
146 available_indices.push(farm_index);
147 } else {
148 return available_indices;
149 }
150 }
151 }
152
153 warn!(max_supported_farm_index = %FarmIndex::MAX, "Too many farms");
154 available_indices
155 }
156
157 fn remove_expired(&mut self) -> impl Iterator<Item = (ClusterFarmerId, &FarmIndex, &FarmId)> {
158 self.known_farmers
159 .iter_mut()
160 .filter_map(|known_farmer| {
161 if known_farmer.last_identification.elapsed()
162 > self.identification_broadcast_interval * 2
163 && known_farmer.notify_cleanup()
164 {
165 Some(
166 known_farmer
167 .known_farms
168 .iter()
169 .map(|(farm_index, farm_id)| {
170 (known_farmer.farmer_id, farm_index, farm_id)
171 }),
172 )
173 } else {
174 None
175 }
176 })
177 .flatten()
178 }
179
180 fn remove_farm(&mut self, farm_index: FarmIndex) {
181 self.known_farmers.retain_mut(|known_farmer| {
182 if known_farmer.known_farms.contains_key(&farm_index) {
183 known_farmer.known_farms.remove(&farm_index);
184 !known_farmer.known_farms.is_empty()
185 } else {
186 true
187 }
188 });
189 }
190}
191
192pub async fn maintain_farms(
194 instance: &str,
195 nats_client: &NatsClient,
196 plotted_pieces: &Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
197 identification_broadcast_interval: Duration,
198) -> anyhow::Result<()> {
199 let mut known_farmers = KnownFarmers::new(identification_broadcast_interval);
200
201 let mut farmers_to_add = StreamMap::default();
202 let mut farms_to_add_remove = StreamMap::default();
204 let mut farms = FuturesUnordered::new();
205
206 let farmer_identify_subscription = pin!(nats_client
207 .subscribe_to_broadcasts::<ClusterFarmerIdentifyBroadcast>(None, None)
208 .await
209 .map_err(|error| anyhow!("Failed to subscribe to farmer identify broadcast: {error}"))?);
210
211 if let Err(error) = nats_client
213 .broadcast(&ClusterControllerFarmerIdentifyBroadcast, instance)
214 .await
215 {
216 warn!(%error, "Failed to send farmer identification broadcast");
217 }
218
219 let mut farmer_identify_subscription = farmer_identify_subscription.fuse();
220 let mut farm_pruning_interval = tokio::time::interval_at(
221 (Instant::now() + identification_broadcast_interval * 2).into(),
222 identification_broadcast_interval * 2,
223 );
224 farm_pruning_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
225
226 loop {
227 select! {
228 (farm_index, result) = farms.select_next_some() => {
229 farms_to_add_remove.push(farm_index, Box::pin(async move {
230 let plotted_pieces = Arc::clone(plotted_pieces);
231
232 let delete_farm_fut = task::spawn_blocking(move || {
233 plotted_pieces.write_blocking().delete_farm(farm_index);
234 });
235 if let Err(error) = delete_farm_fut.await {
236 error!(
237 %farm_index,
238 %error,
239 "Failed to delete farm that exited"
240 );
241 }
242
243 FarmAddRemoveResult::Remove { farm_index }
244 }));
245
246 match result {
247 Ok(()) => {
248 info!(%farm_index, "Farm exited successfully");
249 }
250 Err(error) => {
251 error!(%farm_index, %error, "Farm exited with error");
252 }
253 }
254 }
255 maybe_identify_message = farmer_identify_subscription.next() => {
256 let Some(identify_message) = maybe_identify_message else {
257 return Err(anyhow!("Farmer identify stream ended"));
258 };
259 let ClusterFarmerIdentifyBroadcast {
260 farmer_id,
261 } = identify_message;
262
263 if known_farmers.refresh(farmer_id) {
264 trace!(
265 %farmer_id,
266 "Received identification for already known farmer"
267 );
268 } else if farmers_to_add.add_if_not_in_progress(farmer_id, Box::pin(collect_farmer_farms(farmer_id, nats_client))) {
269 debug!(
270 %farmer_id,
271 "Received identification for new farmer, collecting farms"
272 );
273 } else {
274 debug!(
275 %farmer_id,
276 "Received identification for new farmer, which is already in progress"
277 );
278 }
279 }
280 (farmer_id, maybe_farms) = farmers_to_add.select_next_some() => {
281 let Ok(farms) = maybe_farms.inspect_err(|error| {
282 warn!(
283 %farmer_id,
284 %error,
285 "Failed to collect farms to add, may retry later"
286 );
287 }) else {
288 continue;
289 };
290
291 let farm_add_result = known_farmers.add(farmer_id, farms);
292 let FarmerAddResult {
293 close_receiver,
294 added_farms,
295 } = farm_add_result;
296 for (farm_index, farm_details) in added_farms {
297 let ClusterFarmerFarmDetails {
298 farm_id,
299 total_sectors_count,
300 } = farm_details;
301
302 let plotted_pieces = Arc::clone(plotted_pieces);
303 let close_receiver = close_receiver.resubscribe();
304 farms_to_add_remove.push(
305 farm_index,
306 Box::pin(async move {
307 match initialize_farm(
308 farm_index,
309 farm_id,
310 total_sectors_count,
311 plotted_pieces,
312 nats_client,
313 )
314 .await
315 {
316 Ok(farm) => {
317 info!(
318 %farmer_id,
319 %farm_index,
320 %farm_id,
321 "Farm initialized successfully"
322 );
323
324 FarmAddRemoveResult::Add {
325 close_receiver,
326 farm,
327 }
328 }
329 Err(error) => {
330 warn!(
331 %farmer_id,
332 %farm_index,
333 %farm_id,
334 %error,
335 "Failed to initialize farm"
336 );
337 FarmAddRemoveResult::Remove { farm_index }
339 }
340 }
341 }),
342 );
343 }
344 }
345 _ = farm_pruning_interval.tick().fuse() => {
346 for (farmer_id, farm_index, farm_id) in known_farmers.remove_expired() {
347 warn!(
348 %farmer_id,
349 %farm_index,
350 %farm_id,
351 "Farm expired, notify for cleanup"
352 );
353 }
354 }
355 (farm_index, result) = farms_to_add_remove.select_next_some() => {
356 match result {
357 FarmAddRemoveResult::Add {
358 mut close_receiver,
359 farm,
360 } => {
361 farms.push(async move {
362 select! {
363 result = farm.run().fuse() => {
364 (farm_index, result)
365 }
366 _ = close_receiver.recv().fuse() => {
367 (farm_index, Ok(()))
369 }
370 }
371 });
372 }
373 FarmAddRemoveResult::Remove { farm_index } => {
374 known_farmers.remove_farm(farm_index);
375 }
376 }
377 }
378 }
379 }
380}
381
382async fn collect_farmer_farms(
384 farmer_id: ClusterFarmerId,
385 nats_client: &NatsClient,
386) -> anyhow::Result<Vec<ClusterFarmerFarmDetails>> {
387 trace!(%farmer_id, "Requesting farmer farm details");
388 Ok(nats_client
389 .stream_request(
390 &ClusterFarmerFarmDetailsRequest,
391 Some(&farmer_id.to_string()),
392 )
393 .await
394 .inspect_err(|error| {
395 warn!(
396 %error,
397 %farmer_id,
398 "Failed to request farmer farm details"
399 )
400 })?
401 .collect()
402 .await)
403}
404
405async fn initialize_farm(
406 farm_index: FarmIndex,
407 farm_id: FarmId,
408 total_sectors_count: SectorIndex,
409 plotted_pieces: Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
410 nats_client: &NatsClient,
411) -> anyhow::Result<ClusterFarm> {
412 let farm = ClusterFarm::new(farm_id, total_sectors_count, nats_client.clone())
413 .await
414 .map_err(|error| anyhow!("Failed instantiate cluster farm {farm_id}: {error}"))?;
415
416 let plotted_sectors_buffer = Arc::new(Mutex::new(Vec::new()));
418 let sector_update_handler = farm.on_sector_update(Arc::new({
419 let plotted_sectors_buffer = Arc::clone(&plotted_sectors_buffer);
420
421 move |(_sector_index, sector_update)| {
422 if let SectorUpdate::Plotting(SectorPlottingDetails::Finished {
423 plotted_sector,
424 old_plotted_sector,
425 ..
426 }) = sector_update
427 {
428 plotted_sectors_buffer
429 .lock()
430 .push((plotted_sector.clone(), old_plotted_sector.clone()));
431 }
432 }
433 }));
434
435 let plotted_sectors = farm.plotted_sectors();
437 let mut plotted_sectors = plotted_sectors
438 .get()
439 .await
440 .map_err(|error| anyhow!("Failed to get plotted sectors for farm {farm_id}: {error}"))?;
441
442 {
443 plotted_pieces
444 .write()
445 .await
446 .add_farm(farm_index, farm.piece_reader());
447
448 while let Some(plotted_sector_result) = plotted_sectors.next().await {
449 let plotted_sector = plotted_sector_result.map_err(|error| {
450 anyhow!("Failed to get plotted sector for farm {farm_id}: {error}")
451 })?;
452
453 let mut plotted_pieces_guard = plotted_pieces.write().await;
454 plotted_pieces_guard.add_sector(farm_index, &plotted_sector);
455
456 drop(plotted_pieces_guard);
458
459 task::yield_now().await;
460 }
461 }
462
463 drop(sector_update_handler);
466 let plotted_sectors_buffer = mem::take(&mut *plotted_sectors_buffer.lock());
467 let add_buffered_sectors_fut = task::spawn_blocking(move || {
468 let mut plotted_pieces = plotted_pieces.write_blocking();
469
470 for (plotted_sector, old_plotted_sector) in plotted_sectors_buffer {
471 if let Some(old_plotted_sector) = old_plotted_sector {
472 plotted_pieces.delete_sector(farm_index, &old_plotted_sector);
473 }
474 plotted_pieces.delete_sector(farm_index, &plotted_sector);
476 plotted_pieces.add_sector(farm_index, &plotted_sector);
477 }
478 });
479
480 add_buffered_sectors_fut
481 .await
482 .map_err(|error| anyhow!("Failed to add buffered sectors for farm {farm_id}: {error}"))?;
483
484 Ok(farm)
485}