#[cfg(test)]
mod tests;
use crate::cluster::controller::ClusterControllerFarmerIdentifyBroadcast;
use crate::cluster::farmer::{ClusterFarm, ClusterFarmerIdentifyFarmBroadcast};
use crate::cluster::nats_client::NatsClient;
use crate::farm::plotted_pieces::PlottedPieces;
use crate::farm::{Farm, FarmId, SectorPlottingDetails, SectorUpdate};
use anyhow::anyhow;
use async_lock::RwLock as AsyncRwLock;
use futures::channel::oneshot;
use futures::stream::{FusedStream, FuturesUnordered};
use futures::{select, FutureExt, Stream, StreamExt};
use parking_lot::Mutex;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};
use std::future::Future;
use std::mem;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use subspace_core_primitives::hashes::Blake3Hash;
use subspace_core_primitives::sectors::SectorIndex;
use tokio::task;
use tokio::time::MissedTickBehavior;
use tokio_stream::StreamMap;
use tracing::{error, info, trace, warn};
/// Index under which a farm is registered in [`PlottedPieces`]. Being a `u16`, it caps the
/// number of simultaneously known farms at `FarmIndex::MAX + 1`; identifications beyond that
/// are ignored with a warning.
pub type FarmIndex = u16;
/// Output of a queued add/remove future.
///
/// It is `Some((farm_index, expired_receiver, farm))` when a farm was initialized and should
/// start running. It is `None` for removals and for failed initializations.
type AddRemoveResult = Option<(FarmIndex, oneshot::Receiver<()>, ClusterFarm)>;
/// Boxed add/remove future that may borrow data for `'a`.
type AddRemoveFuture<'a, R> = Pin<Box<dyn Future<Output = R> + 'a>>;
/// Boxed stream used to drive a single [`AddRemoveFuture`] (converted into a single-item
/// stream) inside a `StreamMap`.
type AddRemoveStream<'a, R> = Pin<Box<dyn Stream<Item = R> + Unpin + 'a>>;
/// Runs add/remove futures so that those for the same farm index never overlap.
///
/// At most one future per [`FarmIndex`] is polled at a time. Further futures pushed for the
/// same index wait in a per-farm FIFO queue and are started once the running one completes.
/// Futures for different farm indices make progress concurrently.
struct FarmsAddRemoveStreamMap<'a, R> {
    /// The currently running future for each farm index, wrapped as a single-item stream.
    in_progress: StreamMap<FarmIndex, AddRemoveStream<'a, R>>,
    /// Futures waiting for the running future of the same farm index to finish. An entry only
    /// exists while `in_progress` has a stream for that index.
    farms_to_add_remove: HashMap<FarmIndex, VecDeque<AddRemoveFuture<'a, R>>>,
}
impl<R> Default for FarmsAddRemoveStreamMap<'_, R> {
fn default() -> Self {
Self {
in_progress: StreamMap::default(),
farms_to_add_remove: HashMap::default(),
}
}
}
impl<'a, R: 'a> FarmsAddRemoveStreamMap<'a, R> {
fn push(&mut self, farm_index: FarmIndex, fut: AddRemoveFuture<'a, R>) {
if self.in_progress.contains_key(&farm_index) {
let queue = self.farms_to_add_remove.entry(farm_index).or_default();
queue.push_back(fut);
} else {
self.in_progress
.insert(farm_index, Box::pin(fut.into_stream()) as _);
}
}
fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<R>> {
if let Some((farm_index, res)) = std::task::ready!(self.in_progress.poll_next_unpin(cx)) {
self.in_progress.remove(&farm_index);
self.process_farm_queue(farm_index);
Poll::Ready(Some(res))
} else {
assert!(self.farms_to_add_remove.is_empty());
Poll::Ready(None)
}
}
fn process_farm_queue(&mut self, farm_index: FarmIndex) {
if let Entry::Occupied(mut next_entry) = self.farms_to_add_remove.entry(farm_index) {
let task_queue = next_entry.get_mut();
if let Some(fut) = task_queue.pop_front() {
self.in_progress
.insert(farm_index, Box::pin(fut.into_stream()) as _);
}
if task_queue.is_empty() {
next_entry.remove();
}
}
}
}
impl<'a, R: 'a> Stream for FarmsAddRemoveStreamMap<'a, R> {
    type Item = R;

    /// Yields the output of each add/remove future as it completes. Futures for the same farm
    /// index are run one after another.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // All fields are `Unpin`, so the pin can be unwrapped safely
        Pin::into_inner(self).poll_next_entry(cx)
    }
}
impl<'a, R: 'a> FusedStream for FarmsAddRemoveStreamMap<'a, R> {
    /// Reports termination while nothing is running or queued.
    ///
    /// This is not permanent: a later `push` makes the stream active again, which lets `select!`
    /// skip it only while it is idle.
    fn is_terminated(&self) -> bool {
        let nothing_queued = self.farms_to_add_remove.is_empty();
        let nothing_running = self.in_progress.is_empty();
        nothing_running && nothing_queued
    }
}
/// Bookkeeping for a farm that has identified itself via broadcast.
#[derive(Debug)]
struct KnownFarm {
    farm_id: FarmId,
    /// Fingerprint from the most recent identification; a change triggers re-initialization.
    fingerprint: Blake3Hash,
    /// When the farm last identified itself; farms silent for too long are expired.
    last_identification: Instant,
    /// Stops the running instance of the farm. The instance stops when this sender is used,
    /// or when it is dropped, since dropping cancels the receiver.
    expired_sender: oneshot::Sender<()>,
}
/// Outcome of recording a farm identification in [`KnownFarms`].
enum KnownFarmInsertResult {
    /// A previously unknown farm was assigned a free farm index.
    Inserted {
        farm_index: FarmIndex,
        /// Resolves when the farm must be stopped.
        expired_receiver: oneshot::Receiver<()>,
    },
    /// A known farm reported a different fingerprint. It keeps its farm index but must be
    /// re-initialized; its previous expiration sender has been replaced.
    FingerprintUpdated {
        farm_index: FarmIndex,
        /// Resolves when the re-initialized farm must be stopped.
        expired_receiver: oneshot::Receiver<()>,
    },
    /// Nothing to do. Either the farm is already known with the same fingerprint, or no free
    /// farm index is left.
    NotInserted,
}
/// Registry of farms that have identified themselves, keyed by their assigned farm index.
#[derive(Debug)]
struct KnownFarms {
    /// Identification broadcast interval. Farms that have not identified themselves for more
    /// than twice this long are considered expired.
    identification_broadcast_interval: Duration,
    known_farms: HashMap<FarmIndex, KnownFarm>,
}
impl KnownFarms {
    /// Creates an empty registry. Farms are expired after twice
    /// `identification_broadcast_interval` without an identification.
    fn new(identification_broadcast_interval: Duration) -> Self {
        Self {
            identification_broadcast_interval,
            known_farms: HashMap::new(),
        }
    }

    /// Records an identification of `farm_id` with `fingerprint`.
    ///
    /// Any identification of an already known farm refreshes its last identification time.
    ///
    /// Returns:
    /// - [`KnownFarmInsertResult::NotInserted`] if the farm is known with the same fingerprint.
    ///   Also returned if every [`FarmIndex`] is already taken, in which case a warning is
    ///   logged.
    /// - [`KnownFarmInsertResult::FingerprintUpdated`] if the farm is known with a different
    ///   fingerprint. The farm keeps its index and its previous expiration sender is dropped,
    ///   which cancels the receiver held by the previous instance.
    /// - [`KnownFarmInsertResult::Inserted`] with the lowest free index for a new farm.
    fn insert_or_update(
        &mut self,
        farm_id: FarmId,
        fingerprint: Blake3Hash,
    ) -> KnownFarmInsertResult {
        let existing_farm = self
            .known_farms
            .iter_mut()
            .find(|(_farm_index, known_farm)| known_farm.farm_id == farm_id);

        if let Some((&farm_index, known_farm)) = existing_farm {
            // The farm has just identified itself, whether or not its fingerprint changed, so
            // refresh the timestamp in both cases. Otherwise a farm that comes back with a new
            // fingerprint after some downtime could be expired right after re-initialization.
            known_farm.last_identification = Instant::now();

            if known_farm.fingerprint == fingerprint {
                return KnownFarmInsertResult::NotInserted;
            }

            let (expired_sender, expired_receiver) = oneshot::channel();
            known_farm.fingerprint = fingerprint;
            // Replacing the sender drops the old one, which stops the previous instance
            known_farm.expired_sender = expired_sender;

            return KnownFarmInsertResult::FingerprintUpdated {
                farm_index,
                expired_receiver,
            };
        }

        for farm_index in FarmIndex::MIN..=FarmIndex::MAX {
            if let Entry::Vacant(entry) = self.known_farms.entry(farm_index) {
                let (expired_sender, expired_receiver) = oneshot::channel();
                entry.insert(KnownFarm {
                    farm_id,
                    fingerprint,
                    last_identification: Instant::now(),
                    expired_sender,
                });

                return KnownFarmInsertResult::Inserted {
                    farm_index,
                    expired_receiver,
                };
            }
        }

        warn!(%farm_id, max_supported_farm_index = %FarmIndex::MAX, "Too many farms, ignoring");
        KnownFarmInsertResult::NotInserted
    }

    /// Removes and yields farms that have not identified themselves for more than twice the
    /// identification broadcast interval.
    ///
    /// Entries are only removed as the returned iterator is consumed.
    fn remove_expired(&mut self) -> impl Iterator<Item = (FarmIndex, KnownFarm)> + '_ {
        self.known_farms.extract_if(|_farm_index, known_farm| {
            known_farm.last_identification.elapsed() > self.identification_broadcast_interval * 2
        })
    }

    /// Forgets the farm at `farm_index`. Its expiration sender is dropped as a result.
    fn remove(&mut self, farm_index: FarmIndex) {
        self.known_farms.remove(&farm_index);
    }
}
/// Keeps the farms registered in `plotted_pieces` in sync with the farms that identify
/// themselves over NATS.
///
/// On start it subscribes to [`ClusterFarmerIdentifyFarmBroadcast`] messages and broadcasts
/// [`ClusterControllerFarmerIdentifyBroadcast`] once. It then loops forever:
/// - Identification messages add newly discovered farms and re-initialize farms whose
///   fingerprint changed.
/// - Every twice `identification_broadcast_interval`, farms that have not identified themselves
///   for longer than that are stopped and removed.
/// - Farms whose `run()` future exits are forgotten and removed. They are added again on their
///   next identification.
///
/// All additions and removals for the same farm index are applied sequentially.
///
/// # Errors
///
/// Returns an error if subscribing to identification broadcasts fails or if the subscription
/// stream ends. It never returns `Ok`.
pub async fn maintain_farms(
    instance: &str,
    nats_client: &NatsClient,
    plotted_pieces: &Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
    identification_broadcast_interval: Duration,
) -> anyhow::Result<()> {
    let mut known_farms = KnownFarms::new(identification_broadcast_interval);

    // Futures that add or remove farms; futures for the same farm index are applied
    // sequentially. A future that added a farm resolves to
    // `Some((farm_index, expired_receiver, farm))`; every other future resolves to `None`.
    let mut farms_to_add_remove = FarmsAddRemoveStreamMap::default();
    // Farms that are initialized and running. Each one resolves to:
    // - `(farm_index, Some(result))` when `farm.run()` exits on its own;
    // - `(farm_index, None)` when it was stopped through its expiration channel, because the
    //   farm expired or was replaced after a fingerprint change.
    let mut farms = FuturesUnordered::new();

    let farmer_identify_subscription = pin!(nats_client
        .subscribe_to_broadcasts::<ClusterFarmerIdentifyFarmBroadcast>(None, None)
        .await
        .map_err(|error| anyhow!(
            "Failed to subscribe to farmer identify farm broadcast: {error}"
        ))?);

    // Ask farmers to identify their farms; a failure here is only logged
    if let Err(error) = nats_client
        .broadcast(&ClusterControllerFarmerIdentifyBroadcast, instance)
        .await
    {
        warn!(%error, "Failed to send farmer identification broadcast");
    }

    let mut farmer_identify_subscription = farmer_identify_subscription.fuse();

    // Check for expired farms every two identification intervals, starting one full period
    // from now. `Delay` avoids a burst of catch-up ticks if the loop was busy.
    let mut farm_pruning_interval = tokio::time::interval_at(
        (Instant::now() + identification_broadcast_interval * 2).into(),
        identification_broadcast_interval * 2,
    );
    farm_pruning_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

    loop {
        select! {
            (farm_index, exit_result) = farms.select_next_some() => {
                if let Some(result) = exit_result {
                    // The farm stopped on its own: forget it and delete its pieces
                    known_farms.remove(farm_index);
                    farms_to_add_remove.push(farm_index, Box::pin(async move {
                        let plotted_pieces = Arc::clone(plotted_pieces);

                        let delete_farm_fut = task::spawn_blocking(move || {
                            plotted_pieces.write_blocking().delete_farm(farm_index);
                        });
                        if let Err(error) = delete_farm_fut.await {
                            error!(
                                %farm_index,
                                %error,
                                "Failed to delete farm that exited"
                            );
                        }

                        None
                    }));

                    match result {
                        Ok(()) => {
                            info!(%farm_index, "Farm exited successfully");
                        }
                        Err(error) => {
                            error!(%farm_index, %error, "Farm exited with error");
                        }
                    }
                } else {
                    // Expiration and fingerprint replacement have already updated `known_farms`
                    // and queued deletion of the farm's pieces. `farm_index` may already belong
                    // to a replacement instance, so it must not be touched here.
                    trace!(%farm_index, "Farm stopped after expiration or replacement");
                }
            }
            maybe_identify_message = farmer_identify_subscription.next() => {
                let Some(identify_message) = maybe_identify_message else {
                    return Err(anyhow!("Farmer identify stream ended"));
                };

                process_farm_identify_message(
                    identify_message,
                    nats_client,
                    &mut known_farms,
                    &mut farms_to_add_remove,
                    plotted_pieces,
                );
            }
            _ = farm_pruning_interval.tick().fuse() => {
                for (farm_index, removed_farm) in known_farms.remove_expired() {
                    let farm_id = removed_farm.farm_id;

                    // Stop the running instance; failure means its receiver is already gone
                    if removed_farm.expired_sender.send(()).is_ok() {
                        warn!(
                            %farm_index,
                            %farm_id,
                            "Farm expired and removed"
                        );
                    } else {
                        warn!(
                            %farm_index,
                            %farm_id,
                            "Farm exited before expiration notification"
                        );
                    }

                    farms_to_add_remove.push(farm_index, Box::pin(async move {
                        let plotted_pieces = Arc::clone(plotted_pieces);

                        let delete_farm_fut = task::spawn_blocking(move || {
                            plotted_pieces.write_blocking().delete_farm(farm_index);
                        });
                        if let Err(error) = delete_farm_fut.await {
                            error!(
                                %farm_index,
                                %farm_id,
                                %error,
                                "Failed to delete farm that expired"
                            );
                        }

                        None
                    }));
                }
            }
            result = farms_to_add_remove.select_next_some() => {
                if let Some((farm_index, expired_receiver, farm)) = result {
                    farms.push(async move {
                        select! {
                            result = farm.run().fuse() => {
                                (farm_index, Some(result))
                            }
                            _ = expired_receiver.fuse() => {
                                // Fires on expiration (value sent) and on replacement (sender
                                // dropped); cleanup was already done by whoever triggered it
                                (farm_index, None)
                            }
                        }
                    });
                }
            }
        }
    }
}
/// Handles a single farm identification broadcast.
///
/// The farm is recorded in `known_farms`, and work is queued on `farms_to_add_remove` under the
/// farm's index depending on the outcome:
/// - A newly discovered farm is queued for initialization.
/// - A farm whose fingerprint changed first has its pieces deleted from `plotted_pieces`, then
///   is queued for re-initialization. Both futures run in order for the same farm index.
/// - A known farm with an unchanged fingerprint needs no further work.
///
/// The initialization future resolves to `Some((farm_index, expired_receiver, farm))` on
/// success and to `None` on failure; the deletion future always resolves to `None`.
fn process_farm_identify_message<'a>(
    identify_message: ClusterFarmerIdentifyFarmBroadcast,
    nats_client: &'a NatsClient,
    known_farms: &mut KnownFarms,
    farms_to_add_remove: &mut FarmsAddRemoveStreamMap<'a, AddRemoveResult>,
    plotted_pieces: &'a Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
) {
    let ClusterFarmerIdentifyFarmBroadcast {
        farm_id,
        total_sectors_count,
        fingerprint,
    } = identify_message;

    let (farm_index, expired_receiver, is_reinitialization) =
        match known_farms.insert_or_update(farm_id, fingerprint) {
            KnownFarmInsertResult::NotInserted => {
                trace!(
                    %farm_id,
                    "Received identification for already known farm"
                );
                return;
            }
            KnownFarmInsertResult::Inserted {
                farm_index,
                expired_receiver,
            } => {
                info!(
                    %farm_index,
                    %farm_id,
                    "Discovered new farm, initializing"
                );
                (farm_index, expired_receiver, false)
            }
            KnownFarmInsertResult::FingerprintUpdated {
                farm_index,
                expired_receiver,
            } => {
                info!(
                    %farm_index,
                    %farm_id,
                    "Farm fingerprint updated, re-initializing"
                );
                (farm_index, expired_receiver, true)
            }
        };

    if is_reinitialization {
        // Drop everything registered for the previous instance before initializing the new one
        farms_to_add_remove.push(
            farm_index,
            Box::pin(async move {
                let plotted_pieces = Arc::clone(plotted_pieces);
                let deletion = task::spawn_blocking(move || {
                    plotted_pieces.write_blocking().delete_farm(farm_index);
                })
                .await;

                if let Err(error) = deletion {
                    error!(
                        %farm_index,
                        %farm_id,
                        %error,
                        "Failed to delete farm that was replaced"
                    );
                }

                None
            }),
        );
    }

    farms_to_add_remove.push(
        farm_index,
        Box::pin(async move {
            let initialization = initialize_farm(
                farm_index,
                farm_id,
                total_sectors_count,
                Arc::clone(plotted_pieces),
                nats_client,
            )
            .await;

            let farm = match initialization {
                Ok(farm) => farm,
                Err(error) => {
                    warn!(
                        %error,
                        "Failed to initialize farm {farm_id}"
                    );
                    return None;
                }
            };

            if is_reinitialization {
                info!(
                    %farm_index,
                    %farm_id,
                    "Farm re-initialized successfully"
                );
            } else {
                info!(
                    %farm_index,
                    %farm_id,
                    "Farm initialized successfully"
                );
            }

            Some((farm_index, expired_receiver, farm))
        }),
    );
}
async fn initialize_farm(
farm_index: FarmIndex,
farm_id: FarmId,
total_sectors_count: SectorIndex,
plotted_pieces: Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
nats_client: &NatsClient,
) -> anyhow::Result<ClusterFarm> {
let farm = ClusterFarm::new(farm_id, total_sectors_count, nats_client.clone())
.await
.map_err(|error| anyhow!("Failed instantiate cluster farm {farm_id}: {error}"))?;
let plotted_sectors_buffer = Arc::new(Mutex::new(Vec::new()));
let sector_update_handler = farm.on_sector_update(Arc::new({
let plotted_sectors_buffer = Arc::clone(&plotted_sectors_buffer);
move |(_sector_index, sector_update)| {
if let SectorUpdate::Plotting(SectorPlottingDetails::Finished {
plotted_sector,
old_plotted_sector,
..
}) = sector_update
{
plotted_sectors_buffer
.lock()
.push((plotted_sector.clone(), old_plotted_sector.clone()));
}
}
}));
let plotted_sectors = farm.plotted_sectors();
let mut plotted_sectors = plotted_sectors
.get()
.await
.map_err(|error| anyhow!("Failed to get plotted sectors for farm {farm_id}: {error}"))?;
{
plotted_pieces
.write()
.await
.add_farm(farm_index, farm.piece_reader());
while let Some(plotted_sector_result) = plotted_sectors.next().await {
let plotted_sector = plotted_sector_result.map_err(|error| {
anyhow!("Failed to get plotted sector for farm {farm_id}: {error}")
})?;
let mut plotted_pieces_guard = plotted_pieces.write().await;
plotted_pieces_guard.add_sector(farm_index, &plotted_sector);
drop(plotted_pieces_guard);
task::yield_now().await;
}
}
drop(sector_update_handler);
let plotted_sectors_buffer = mem::take(&mut *plotted_sectors_buffer.lock());
let add_buffered_sectors_fut = task::spawn_blocking(move || {
let mut plotted_pieces = plotted_pieces.write_blocking();
for (plotted_sector, old_plotted_sector) in plotted_sectors_buffer {
if let Some(old_plotted_sector) = old_plotted_sector {
plotted_pieces.delete_sector(farm_index, &old_plotted_sector);
}
plotted_pieces.delete_sector(farm_index, &plotted_sector);
plotted_pieces.add_sector(farm_index, &plotted_sector);
}
});
add_buffered_sectors_fut
.await
.map_err(|error| anyhow!("Failed to add buffered sectors for farm {farm_id}: {error}"))?;
Ok(farm)
}