use crate::cluster::controller::ClusterControllerFarmerIdentifyBroadcast;
use crate::cluster::nats_client::{
GenericBroadcast, GenericRequest, GenericStreamRequest, NatsClient,
};
use crate::farm::{
Farm, FarmError, FarmId, FarmingNotification, HandlerFn, HandlerId, PieceReader,
PlottedSectors, SectorUpdate,
};
use crate::utils::AsyncJoinOnDrop;
use anyhow::anyhow;
use async_trait::async_trait;
use event_listener_primitives::Bag;
use futures::channel::mpsc;
use futures::stream::FuturesUnordered;
use futures::{select, stream, FutureExt, Stream, StreamExt};
use parity_scale_codec::{Decode, Encode};
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::time::{Duration, Instant};
use subspace_core_primitives::hashes::{blake3_hash_list, Blake3Hash};
use subspace_core_primitives::pieces::{Piece, PieceOffset};
use subspace_core_primitives::sectors::SectorIndex;
use subspace_farmer_components::plotting::PlottedSector;
use subspace_rpc_primitives::SolutionResponse;
use tokio::time::MissedTickBehavior;
use tracing::{debug, error, info_span, trace, warn, Instrument};
const BROADCAST_NOTIFICATIONS_BUFFER: usize = 1000;
const MIN_FARMER_IDENTIFICATION_INTERVAL: Duration = Duration::from_secs(1);
type Handler<A> = Bag<HandlerFn<A>, A>;
#[derive(Debug, Clone, Encode, Decode)]
pub struct ClusterFarmerIdentifyFarmBroadcast {
pub farm_id: FarmId,
pub total_sectors_count: SectorIndex,
pub fingerprint: Blake3Hash,
}
impl GenericBroadcast for ClusterFarmerIdentifyFarmBroadcast {
const SUBJECT: &'static str = "subspace.farmer.*.identify";
}
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterFarmerSectorUpdateBroadcast {
farm_id: FarmId,
sector_index: SectorIndex,
sector_update: SectorUpdate,
}
impl GenericBroadcast for ClusterFarmerSectorUpdateBroadcast {
const SUBJECT: &'static str = "subspace.farmer.*.sector-update";
}
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterFarmerFarmingNotificationBroadcast {
farm_id: FarmId,
farming_notification: FarmingNotification,
}
impl GenericBroadcast for ClusterFarmerFarmingNotificationBroadcast {
const SUBJECT: &'static str = "subspace.farmer.*.farming-notification";
}
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterFarmerSolutionBroadcast {
farm_id: FarmId,
solution_response: SolutionResponse,
}
impl GenericBroadcast for ClusterFarmerSolutionBroadcast {
const SUBJECT: &'static str = "subspace.farmer.*.solution-response";
}
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterFarmerReadPieceRequest {
sector_index: SectorIndex,
piece_offset: PieceOffset,
}
impl GenericRequest for ClusterFarmerReadPieceRequest {
const SUBJECT: &'static str = "subspace.farmer.*.farm.read-piece";
type Response = Result<Option<Piece>, String>;
}
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterFarmerPlottedSectorsRequest;
impl GenericStreamRequest for ClusterFarmerPlottedSectorsRequest {
const SUBJECT: &'static str = "subspace.farmer.*.farm.plotted-sectors";
type Response = Result<PlottedSector, String>;
}
#[derive(Debug)]
struct ClusterPlottedSectors {
farm_id_string: String,
nats_client: NatsClient,
}
#[async_trait]
impl PlottedSectors for ClusterPlottedSectors {
async fn get(
&self,
) -> Result<
Box<dyn Stream<Item = Result<PlottedSector, FarmError>> + Unpin + Send + '_>,
FarmError,
> {
Ok(Box::new(
self.nats_client
.stream_request(
&ClusterFarmerPlottedSectorsRequest,
Some(&self.farm_id_string),
)
.await?
.map(|response| response.map_err(FarmError::from)),
))
}
}
#[derive(Debug)]
struct ClusterPieceReader {
farm_id_string: String,
nats_client: NatsClient,
}
#[async_trait]
impl PieceReader for ClusterPieceReader {
async fn read_piece(
&self,
sector_index: SectorIndex,
piece_offset: PieceOffset,
) -> Result<Option<Piece>, FarmError> {
Ok(self
.nats_client
.request(
&ClusterFarmerReadPieceRequest {
sector_index,
piece_offset,
},
Some(&self.farm_id_string),
)
.await??)
}
}
#[derive(Default, Debug)]
struct Handlers {
sector_update: Handler<(SectorIndex, SectorUpdate)>,
farming_notification: Handler<FarmingNotification>,
solution: Handler<SolutionResponse>,
}
#[derive(Debug)]
pub struct ClusterFarm {
farm_id: FarmId,
farm_id_string: String,
total_sectors_count: SectorIndex,
nats_client: NatsClient,
handlers: Arc<Handlers>,
background_tasks: AsyncJoinOnDrop<()>,
}
#[async_trait(?Send)]
impl Farm for ClusterFarm {
fn id(&self) -> &FarmId {
&self.farm_id
}
fn total_sectors_count(&self) -> SectorIndex {
self.total_sectors_count
}
fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static> {
Arc::new(ClusterPlottedSectors {
farm_id_string: self.farm_id_string.clone(),
nats_client: self.nats_client.clone(),
})
}
fn piece_reader(&self) -> Arc<dyn PieceReader + 'static> {
Arc::new(ClusterPieceReader {
farm_id_string: self.farm_id_string.clone(),
nats_client: self.nats_client.clone(),
})
}
fn on_sector_update(
&self,
callback: HandlerFn<(SectorIndex, SectorUpdate)>,
) -> Box<dyn HandlerId> {
Box::new(self.handlers.sector_update.add(callback))
}
fn on_farming_notification(
&self,
callback: HandlerFn<FarmingNotification>,
) -> Box<dyn HandlerId> {
Box::new(self.handlers.farming_notification.add(callback))
}
fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> Box<dyn HandlerId> {
Box::new(self.handlers.solution.add(callback))
}
fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
Box::pin((*self).run())
}
}
impl ClusterFarm {
pub async fn new(
farm_id: FarmId,
total_sectors_count: SectorIndex,
nats_client: NatsClient,
) -> anyhow::Result<Self> {
let farm_id_string = farm_id.to_string();
let sector_updates_subscription = nats_client
.subscribe_to_broadcasts::<ClusterFarmerSectorUpdateBroadcast>(
Some(&farm_id_string),
None,
)
.await
.map_err(|error| anyhow!("Failed to subscribe to sector updates broadcast: {error}"))?;
let farming_notifications_subscription = nats_client
.subscribe_to_broadcasts::<ClusterFarmerFarmingNotificationBroadcast>(
Some(&farm_id_string),
None,
)
.await
.map_err(|error| {
anyhow!("Failed to subscribe to farming notifications broadcast: {error}")
})?;
let solution_subscription = nats_client
.subscribe_to_broadcasts::<ClusterFarmerSolutionBroadcast>(Some(&farm_id_string), None)
.await
.map_err(|error| {
anyhow!("Failed to subscribe to solution responses broadcast: {error}")
})?;
let handlers = Arc::<Handlers>::default();
let background_tasks = {
let handlers = Arc::clone(&handlers);
async move {
let mut sector_updates_subscription = pin!(sector_updates_subscription);
let mut farming_notifications_subscription =
pin!(farming_notifications_subscription);
let mut solution_subscription = pin!(solution_subscription);
let sector_updates_fut = async {
while let Some(ClusterFarmerSectorUpdateBroadcast {
sector_index,
sector_update,
..
}) = sector_updates_subscription.next().await
{
handlers
.sector_update
.call_simple(&(sector_index, sector_update));
}
};
let farming_notifications_fut = async {
while let Some(ClusterFarmerFarmingNotificationBroadcast {
farming_notification,
..
}) = farming_notifications_subscription.next().await
{
handlers
.farming_notification
.call_simple(&farming_notification);
}
};
let solutions_fut = async {
while let Some(ClusterFarmerSolutionBroadcast {
solution_response, ..
}) = solution_subscription.next().await
{
handlers.solution.call_simple(&solution_response);
}
};
select! {
_ = sector_updates_fut.fuse() => {}
_ = farming_notifications_fut.fuse() => {}
_ = solutions_fut.fuse() => {}
}
}
};
Ok(Self {
farm_id,
farm_id_string,
total_sectors_count,
nats_client,
handlers,
background_tasks: AsyncJoinOnDrop::new(tokio::spawn(background_tasks), true),
})
}
pub async fn run(self) -> anyhow::Result<()> {
Ok(self.background_tasks.await?)
}
}
#[derive(Debug)]
struct FarmDetails {
farm_id: FarmId,
farm_id_string: String,
total_sectors_count: SectorIndex,
piece_reader: Arc<dyn PieceReader + 'static>,
plotted_sectors: Arc<dyn PlottedSectors + 'static>,
_background_tasks: Option<AsyncJoinOnDrop<()>>,
}
pub fn farmer_service<F>(
nats_client: NatsClient,
farms: &[F],
identification_broadcast_interval: Duration,
primary_instance: bool,
) -> impl Future<Output = anyhow::Result<()>> + Send + 'static
where
F: Farm,
{
let farms_details = farms
.iter()
.map(|farm| {
let farm_id = *farm.id();
let nats_client = nats_client.clone();
let background_tasks = if primary_instance {
let (sector_updates_sender, mut sector_updates_receiver) =
mpsc::channel(BROADCAST_NOTIFICATIONS_BUFFER);
let (farming_notifications_sender, mut farming_notifications_receiver) =
mpsc::channel(BROADCAST_NOTIFICATIONS_BUFFER);
let (solutions_sender, mut solutions_receiver) =
mpsc::channel(BROADCAST_NOTIFICATIONS_BUFFER);
let sector_updates_handler_id =
farm.on_sector_update(Arc::new(move |(sector_index, sector_update)| {
if let Err(error) = sector_updates_sender.clone().try_send(
ClusterFarmerSectorUpdateBroadcast {
farm_id,
sector_index: *sector_index,
sector_update: sector_update.clone(),
},
) {
warn!(%farm_id, %error, "Failed to send sector update notification");
}
}));
let farming_notifications_handler_id =
farm.on_farming_notification(Arc::new(move |farming_notification| {
if let Err(error) = farming_notifications_sender.clone().try_send(
ClusterFarmerFarmingNotificationBroadcast {
farm_id,
farming_notification: farming_notification.clone(),
},
) {
warn!(%farm_id, %error, "Failed to send farming notification");
}
}));
let solutions_handler_id = farm.on_solution(Arc::new(move |solution_response| {
if let Err(error) =
solutions_sender
.clone()
.try_send(ClusterFarmerSolutionBroadcast {
farm_id,
solution_response: solution_response.clone(),
})
{
warn!(%farm_id, %error, "Failed to send solution notification");
}
}));
Some(AsyncJoinOnDrop::new(
tokio::spawn(async move {
let farm_id_string = farm_id.to_string();
let sector_updates_fut = async {
while let Some(broadcast) = sector_updates_receiver.next().await {
if let Err(error) =
nats_client.broadcast(&broadcast, &farm_id_string).await
{
warn!(%farm_id, %error, "Failed to broadcast sector update");
}
}
};
let farming_notifications_fut = async {
while let Some(broadcast) = farming_notifications_receiver.next().await
{
if let Err(error) =
nats_client.broadcast(&broadcast, &farm_id_string).await
{
warn!(
%farm_id,
%error,
"Failed to broadcast farming notification"
);
}
}
};
let solutions_fut = async {
while let Some(broadcast) = solutions_receiver.next().await {
if let Err(error) =
nats_client.broadcast(&broadcast, &farm_id_string).await
{
warn!(%farm_id, %error, "Failed to broadcast solution");
}
}
};
select! {
_ = sector_updates_fut.fuse() => {}
_ = farming_notifications_fut.fuse() => {}
_ = solutions_fut.fuse() => {}
}
drop(sector_updates_handler_id);
drop(farming_notifications_handler_id);
drop(solutions_handler_id);
}),
true,
))
} else {
None
};
FarmDetails {
farm_id,
farm_id_string: farm_id.to_string(),
total_sectors_count: farm.total_sectors_count(),
piece_reader: farm.piece_reader(),
plotted_sectors: farm.plotted_sectors(),
_background_tasks: background_tasks,
}
})
.collect::<Vec<_>>();
async move {
if primary_instance {
select! {
result = identify_responder(&nats_client, &farms_details, identification_broadcast_interval).fuse() => {
result
},
result = plotted_sectors_responder(&nats_client, &farms_details).fuse() => {
result
},
result = read_piece_responder(&nats_client, &farms_details).fuse() => {
result
},
}
} else {
select! {
result = plotted_sectors_responder(&nats_client, &farms_details).fuse() => {
result
},
result = read_piece_responder(&nats_client, &farms_details).fuse() => {
result
},
}
}
}
}
async fn identify_responder(
nats_client: &NatsClient,
farms_details: &[FarmDetails],
identification_broadcast_interval: Duration,
) -> anyhow::Result<()> {
let mut subscription = nats_client
.subscribe_to_broadcasts::<ClusterControllerFarmerIdentifyBroadcast>(None, None)
.await
.map_err(|error| {
anyhow!("Failed to subscribe to farmer identify broadcast requests: {error}")
})?
.fuse();
let mut interval = tokio::time::interval(identification_broadcast_interval);
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
let mut last_identification = Instant::now();
loop {
select! {
maybe_message = subscription.next() => {
let Some(message) = maybe_message else {
debug!("Identify broadcast stream ended");
break;
};
trace!(?message, "Farmer received identify broadcast message");
if last_identification.elapsed() < MIN_FARMER_IDENTIFICATION_INTERVAL {
continue;
}
last_identification = Instant::now();
send_identify_broadcast(nats_client, farms_details).await;
interval.reset();
}
_ = interval.tick().fuse() => {
last_identification = Instant::now();
trace!("Farmer self-identification");
send_identify_broadcast(nats_client, farms_details).await;
}
}
}
Ok(())
}
async fn send_identify_broadcast(nats_client: &NatsClient, farms_details: &[FarmDetails]) {
farms_details
.iter()
.map(|farm_details| async move {
if let Err(error) = nats_client
.broadcast(
&ClusterFarmerIdentifyFarmBroadcast {
farm_id: farm_details.farm_id,
total_sectors_count: farm_details.total_sectors_count,
fingerprint: blake3_hash_list(&[
&farm_details.farm_id.encode(),
&farm_details.total_sectors_count.to_le_bytes(),
]),
},
&farm_details.farm_id_string,
)
.await
{
warn!(
farm_id = %farm_details.farm_id,
%error,
"Failed to send farmer identify notification"
);
}
})
.collect::<FuturesUnordered<_>>()
.collect::<Vec<_>>()
.await;
}
async fn plotted_sectors_responder(
nats_client: &NatsClient,
farms_details: &[FarmDetails],
) -> anyhow::Result<()> {
farms_details
.iter()
.map(|farm_details| async move {
nats_client
.stream_request_responder::<_, _, Pin<Box<dyn Stream<Item = _> + Send>>, _>(
Some(&farm_details.farm_id_string),
Some(farm_details.farm_id_string.clone()),
|_request: ClusterFarmerPlottedSectorsRequest| async move {
Some(match farm_details.plotted_sectors.get().await {
Ok(plotted_sectors) => {
Box::pin(plotted_sectors.map(|maybe_plotted_sector| {
maybe_plotted_sector.map_err(|error| error.to_string())
})) as _
}
Err(error) => {
error!(
%error,
farm_id = %farm_details.farm_id,
"Failed to get plotted sectors"
);
Box::pin(stream::once(async move {
Err(format!("Failed to get plotted sectors: {error}"))
})) as _
}
})
},
)
.instrument(info_span!("", cache_id = %farm_details.farm_id))
.await
})
.collect::<FuturesUnordered<_>>()
.next()
.await
.ok_or_else(|| anyhow!("No farms"))?
}
async fn read_piece_responder(
nats_client: &NatsClient,
farms_details: &[FarmDetails],
) -> anyhow::Result<()> {
farms_details
.iter()
.map(|farm_details| async move {
nats_client
.request_responder(
Some(farm_details.farm_id_string.as_str()),
Some(farm_details.farm_id_string.clone()),
|request: ClusterFarmerReadPieceRequest| async move {
Some(
farm_details
.piece_reader
.read_piece(request.sector_index, request.piece_offset)
.await
.map_err(|error| error.to_string()),
)
},
)
.instrument(info_span!("", cache_id = %farm_details.farm_id))
.await
})
.collect::<FuturesUnordered<_>>()
.next()
.await
.ok_or_else(|| anyhow!("No farms"))?
}