subspace_farmer/node_client/
rpc_node_client.rs1use crate::node_client::{NodeClient, NodeClientExt};
4use async_lock::Semaphore;
5use async_trait::async_trait;
6use futures::{Stream, StreamExt};
7use jsonrpsee::core::client::{ClientT, Error as JsonError, SubscriptionClientT};
8use jsonrpsee::rpc_params;
9use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
10use std::pin::Pin;
11use std::sync::Arc;
12use subspace_core_primitives::pieces::{Piece, PieceIndex};
13use subspace_core_primitives::segments::{SegmentHeader, SegmentIndex};
14use subspace_rpc_primitives::{
15 FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
16};
17
/// Number of permits handed to the piece-request semaphore, i.e. the maximum number of `piece`
/// calls that may hold a permit at the same time.
const MAX_CONCURRENT_PIECE_REQUESTS: usize = 10;
24
25#[derive(Debug, Clone)]
30pub struct RpcNodeClient {
31 client: Arc<WsClient>,
32 piece_request_semaphore: Arc<Semaphore>,
33}
34
35impl RpcNodeClient {
36 pub async fn new(url: &str) -> Result<Self, JsonError> {
38 let client = Arc::new(
39 WsClientBuilder::default()
40 .max_request_size(20 * 1024 * 1024)
41 .build(url)
42 .await?,
43 );
44 let piece_request_semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_PIECE_REQUESTS));
45 Ok(Self {
46 client,
47 piece_request_semaphore,
48 })
49 }
50}
51
52#[async_trait]
53impl NodeClient for RpcNodeClient {
54 async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> {
55 Ok(self
56 .client
57 .request("subspace_getFarmerAppInfo", rpc_params![])
58 .await?)
59 }
60
61 async fn subscribe_slot_info(
62 &self,
63 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> {
64 let subscription = self
65 .client
66 .subscribe(
67 "subspace_subscribeSlotInfo",
68 rpc_params![],
69 "subspace_unsubscribeSlotInfo",
70 )
71 .await?;
72
73 Ok(Box::pin(subscription.filter_map(
74 |slot_info_result| async move { slot_info_result.ok() },
75 )))
76 }
77
78 async fn submit_solution_response(
79 &self,
80 solution_response: SolutionResponse,
81 ) -> anyhow::Result<()> {
82 Ok(self
83 .client
84 .request(
85 "subspace_submitSolutionResponse",
86 rpc_params![&solution_response],
87 )
88 .await?)
89 }
90
91 async fn subscribe_reward_signing(
92 &self,
93 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = RewardSigningInfo> + Send + 'static>>> {
94 let subscription = self
95 .client
96 .subscribe(
97 "subspace_subscribeRewardSigning",
98 rpc_params![],
99 "subspace_unsubscribeRewardSigning",
100 )
101 .await?;
102
103 Ok(Box::pin(subscription.filter_map(
104 |reward_signing_info_result| async move { reward_signing_info_result.ok() },
105 )))
106 }
107
108 async fn submit_reward_signature(
110 &self,
111 reward_signature: RewardSignatureResponse,
112 ) -> anyhow::Result<()> {
113 Ok(self
114 .client
115 .request(
116 "subspace_submitRewardSignature",
117 rpc_params![&reward_signature],
118 )
119 .await?)
120 }
121
122 async fn subscribe_archived_segment_headers(
123 &self,
124 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>> {
125 let subscription = self
126 .client
127 .subscribe(
128 "subspace_subscribeArchivedSegmentHeader",
129 rpc_params![],
130 "subspace_unsubscribeArchivedSegmentHeader",
131 )
132 .await?;
133
134 Ok(Box::pin(subscription.filter_map(
135 |archived_segment_header_result| async move { archived_segment_header_result.ok() },
136 )))
137 }
138
139 async fn segment_headers(
140 &self,
141 segment_indices: Vec<SegmentIndex>,
142 ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
143 Ok(self
144 .client
145 .request("subspace_segmentHeaders", rpc_params![&segment_indices])
146 .await?)
147 }
148
149 async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
150 let _permit = self.piece_request_semaphore.acquire().await;
151 let client = Arc::clone(&self.client);
152 let piece_fut = tokio::task::spawn(async move {
155 client
156 .request("subspace_piece", rpc_params![&piece_index])
157 .await
158 });
159 Ok(piece_fut.await??)
160 }
161
162 async fn acknowledge_archived_segment_header(
163 &self,
164 segment_index: SegmentIndex,
165 ) -> anyhow::Result<()> {
166 Ok(self
167 .client
168 .request(
169 "subspace_acknowledgeArchivedSegmentHeader",
170 rpc_params![&segment_index],
171 )
172 .await?)
173 }
174}
175
176#[async_trait]
177impl NodeClientExt for RpcNodeClient {
178 async fn last_segment_headers(&self, limit: u32) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
179 Ok(self
180 .client
181 .request("subspace_lastSegmentHeaders", rpc_params![limit])
182 .await?)
183 }
184}