// subspace_farmer/node_client/rpc_node_client.rs
use crate::node_client::{NodeClient, NodeClientExt};
use async_lock::Semaphore;
use async_trait::async_trait;
use futures::{Stream, StreamExt};
use jsonrpsee::core::client::{ClientT, Error as JsonError, SubscriptionClientT};
use jsonrpsee::rpc_params;
use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
use std::pin::Pin;
use std::sync::Arc;
use subspace_core_primitives::pieces::{Piece, PieceIndex};
use subspace_core_primitives::segments::{SegmentHeader, SegmentIndex};
use subspace_rpc_primitives::{
FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
};
/// Number of permits given to the semaphore that guards `subspace_piece` requests, i.e. the
/// upper bound on piece requests a single [`RpcNodeClient`] (and its clones) keeps in flight.
const MAX_CONCURRENT_PIECE_REQUESTS: usize = 10;
/// Node client that talks to a node over a JSON-RPC WebSocket connection.
///
/// Both fields are reference-counted, so cloning this struct yields a handle that shares the
/// same connection and the same piece request limit.
#[derive(Debug, Clone)]
pub struct RpcNodeClient {
// Shared WebSocket JSON-RPC client used for every request and subscription.
client: Arc<WsClient>,
// Limits how many piece requests may be outstanding at once.
piece_request_semaphore: Arc<Semaphore>,
}
impl RpcNodeClient {
    /// Connects to the node's WebSocket RPC endpoint at `url` and wraps the connection.
    ///
    /// # Errors
    ///
    /// Returns the underlying JSON-RPC client error when the connection cannot be established.
    pub async fn new(url: &str) -> Result<Self, JsonError> {
        // The request size limit is raised to 20 * 1024 * 1024 (20 MiB if the builder counts
        // bytes, as jsonrpsee documents — confirm against the pinned version).
        let ws_client = WsClientBuilder::default()
            .max_request_size(20 * 1024 * 1024)
            .build(url)
            .await?;

        Ok(Self {
            client: Arc::new(ws_client),
            piece_request_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_PIECE_REQUESTS)),
        })
    }
}
#[async_trait]
impl NodeClient for RpcNodeClient {
    /// Fetches farmer application info with the `subspace_getFarmerAppInfo` RPC call.
    async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> {
        Ok(self
            .client
            .request("subspace_getFarmerAppInfo", rpc_params![])
            .await?)
    }

    /// Opens the `subspace_subscribeSlotInfo` subscription and returns it as a stream of
    /// [`SlotInfo`] items.
    ///
    /// Fails only if the subscription itself cannot be established.
    async fn subscribe_slot_info(
        &self,
    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> {
        let subscription = self
            .client
            .subscribe(
                "subspace_subscribeSlotInfo",
                rpc_params![],
                "subspace_unsubscribeSlotInfo",
            )
            .await?;

        // NOTE(review): items for which the subscription yields `Err` are silently skipped by
        // `.ok()`; the stream simply continues with the next item.
        Ok(Box::pin(subscription.filter_map(
            |slot_info_result| async move { slot_info_result.ok() },
        )))
    }

    /// Sends `solution_response` to the node with the `subspace_submitSolutionResponse` RPC
    /// call.
    async fn submit_solution_response(
        &self,
        solution_response: SolutionResponse,
    ) -> anyhow::Result<()> {
        Ok(self
            .client
            .request(
                "subspace_submitSolutionResponse",
                rpc_params![&solution_response],
            )
            .await?)
    }

    /// Opens the `subspace_subscribeRewardSigning` subscription and returns it as a stream of
    /// [`RewardSigningInfo`] items.
    ///
    /// Fails only if the subscription itself cannot be established.
    async fn subscribe_reward_signing(
        &self,
    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = RewardSigningInfo> + Send + 'static>>> {
        let subscription = self
            .client
            .subscribe(
                "subspace_subscribeRewardSigning",
                rpc_params![],
                "subspace_unsubscribeRewardSigning",
            )
            .await?;

        // NOTE(review): items for which the subscription yields `Err` are silently skipped by
        // `.ok()`; the stream simply continues with the next item.
        Ok(Box::pin(subscription.filter_map(
            |reward_signing_info_result| async move { reward_signing_info_result.ok() },
        )))
    }

    /// Sends `reward_signature` to the node with the `subspace_submitRewardSignature` RPC call.
    async fn submit_reward_signature(
        &self,
        reward_signature: RewardSignatureResponse,
    ) -> anyhow::Result<()> {
        Ok(self
            .client
            .request(
                "subspace_submitRewardSignature",
                rpc_params![&reward_signature],
            )
            .await?)
    }

    /// Opens the `subspace_subscribeArchivedSegmentHeader` subscription and returns it as a
    /// stream of [`SegmentHeader`] items.
    ///
    /// Fails only if the subscription itself cannot be established.
    async fn subscribe_archived_segment_headers(
        &self,
    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>> {
        let subscription = self
            .client
            .subscribe(
                "subspace_subscribeArchivedSegmentHeader",
                rpc_params![],
                "subspace_unsubscribeArchivedSegmentHeader",
            )
            .await?;

        // NOTE(review): items for which the subscription yields `Err` are silently skipped by
        // `.ok()`; the stream simply continues with the next item.
        Ok(Box::pin(subscription.filter_map(
            |archived_segment_header_result| async move { archived_segment_header_result.ok() },
        )))
    }

    /// Requests segment headers for `segment_indices` with the `subspace_segmentHeaders` RPC
    /// call and returns the node's response as is.
    async fn segment_headers(
        &self,
        segment_indices: Vec<SegmentIndex>,
    ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
        Ok(self
            .client
            .request("subspace_segmentHeaders", rpc_params![&segment_indices])
            .await?)
    }

    /// Requests the piece at `piece_index` with the `subspace_piece` RPC call.
    ///
    /// At most [`MAX_CONCURRENT_PIECE_REQUESTS`] such requests are in flight at a time; further
    /// callers wait for a semaphore permit first.
    ///
    /// # Errors
    ///
    /// Returns an error if the spawned task fails to complete (panic or runtime shutdown) or if
    /// the RPC request itself fails.
    async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
        // An owned permit is taken so that it can travel into the spawned task. The task is
        // detached: it keeps running even when this future is dropped, so holding the permit
        // here instead would release it while the request is still in flight and allow the
        // concurrency limit to be exceeded.
        let permit = self.piece_request_semaphore.acquire_arc().await;
        let client = Arc::clone(&self.client);
        // The request runs on its own task — presumably so that receiving and decoding the
        // response does not occupy the caller's task (TODO confirm the original motivation).
        let piece_fut = tokio::task::spawn(async move {
            // Released only once the request has finished, successfully or not.
            let _permit = permit;
            client
                .request("subspace_piece", rpc_params![&piece_index])
                .await
        });
        // First `?` propagates a task join failure, second `?` the RPC error.
        Ok(piece_fut.await??)
    }

    /// Acknowledges the archived segment header with `segment_index` using the
    /// `subspace_acknowledgeArchivedSegmentHeader` RPC call.
    async fn acknowledge_archived_segment_header(
        &self,
        segment_index: SegmentIndex,
    ) -> anyhow::Result<()> {
        Ok(self
            .client
            .request(
                "subspace_acknowledgeArchivedSegmentHeader",
                rpc_params![&segment_index],
            )
            .await?)
    }
}
#[async_trait]
impl NodeClientExt for RpcNodeClient {
    /// Asks the node for its last segment headers via the `subspace_lastSegmentHeaders` RPC
    /// call, passing `limit` as the only parameter, and returns the response unchanged.
    async fn last_segment_headers(&self, limit: u32) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
        let params = rpc_params![limit];
        let last_headers = self
            .client
            .request("subspace_lastSegmentHeaders", params)
            .await?;

        Ok(last_headers)
    }
}