subspace_farmer/node_client/
rpc_node_client.rs

1//! Node client implementation that connects to node via RPC (WebSockets)
2
3use crate::node_client::{NodeClient, NodeClientExt};
4use async_lock::Semaphore;
5use async_trait::async_trait;
6use futures::{Stream, StreamExt};
7use jsonrpsee::core::client::{ClientT, Error as JsonError, SubscriptionClientT};
8use jsonrpsee::rpc_params;
9use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
10use std::pin::Pin;
11use std::sync::Arc;
12use subspace_core_primitives::pieces::{Piece, PieceIndex};
13use subspace_core_primitives::segments::{SegmentHeader, SegmentIndex};
14use subspace_rpc_primitives::{
15    FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
16};
17
18/// TODO: Node is having a hard time responding for many piece requests, specifically this results
19///  in subscriptions become broken on the node: https://github.com/paritytech/jsonrpsee/issues/1409
20///  This needs to be removed after Substrate upgrade when we can take advantage of new Substrate
21///  API that will prevent subscription breakage:
22///  https://github.com/paritytech/jsonrpsee/issues/1409#issuecomment-2303914643
23const MAX_CONCURRENT_PIECE_REQUESTS: usize = 10;
24
25/// Node client implementation that connects to node via RPC (WebSockets).
26///
27/// This implementation is supposed to be used on local network and not via public Internet due to
28/// sensitive contents.
29#[derive(Debug, Clone)]
30pub struct RpcNodeClient {
31    client: Arc<WsClient>,
32    piece_request_semaphore: Arc<Semaphore>,
33}
34
35impl RpcNodeClient {
36    /// Create a new instance of [`NodeClient`].
37    pub async fn new(url: &str) -> Result<Self, JsonError> {
38        let client = Arc::new(
39            WsClientBuilder::default()
40                .max_request_size(20 * 1024 * 1024)
41                .build(url)
42                .await?,
43        );
44        let piece_request_semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_PIECE_REQUESTS));
45        Ok(Self {
46            client,
47            piece_request_semaphore,
48        })
49    }
50}
51
52#[async_trait]
53impl NodeClient for RpcNodeClient {
54    async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> {
55        Ok(self
56            .client
57            .request("subspace_getFarmerAppInfo", rpc_params![])
58            .await?)
59    }
60
61    async fn subscribe_slot_info(
62        &self,
63    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> {
64        let subscription = self
65            .client
66            .subscribe(
67                "subspace_subscribeSlotInfo",
68                rpc_params![],
69                "subspace_unsubscribeSlotInfo",
70            )
71            .await?;
72
73        Ok(Box::pin(subscription.filter_map(
74            |slot_info_result| async move { slot_info_result.ok() },
75        )))
76    }
77
78    async fn submit_solution_response(
79        &self,
80        solution_response: SolutionResponse,
81    ) -> anyhow::Result<()> {
82        Ok(self
83            .client
84            .request(
85                "subspace_submitSolutionResponse",
86                rpc_params![&solution_response],
87            )
88            .await?)
89    }
90
91    async fn subscribe_reward_signing(
92        &self,
93    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = RewardSigningInfo> + Send + 'static>>> {
94        let subscription = self
95            .client
96            .subscribe(
97                "subspace_subscribeRewardSigning",
98                rpc_params![],
99                "subspace_unsubscribeRewardSigning",
100            )
101            .await?;
102
103        Ok(Box::pin(subscription.filter_map(
104            |reward_signing_info_result| async move { reward_signing_info_result.ok() },
105        )))
106    }
107
108    /// Submit a block signature
109    async fn submit_reward_signature(
110        &self,
111        reward_signature: RewardSignatureResponse,
112    ) -> anyhow::Result<()> {
113        Ok(self
114            .client
115            .request(
116                "subspace_submitRewardSignature",
117                rpc_params![&reward_signature],
118            )
119            .await?)
120    }
121
122    async fn subscribe_archived_segment_headers(
123        &self,
124    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>> {
125        let subscription = self
126            .client
127            .subscribe(
128                "subspace_subscribeArchivedSegmentHeader",
129                rpc_params![],
130                "subspace_unsubscribeArchivedSegmentHeader",
131            )
132            .await?;
133
134        Ok(Box::pin(subscription.filter_map(
135            |archived_segment_header_result| async move { archived_segment_header_result.ok() },
136        )))
137    }
138
139    async fn segment_headers(
140        &self,
141        segment_indices: Vec<SegmentIndex>,
142    ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
143        Ok(self
144            .client
145            .request("subspace_segmentHeaders", rpc_params![&segment_indices])
146            .await?)
147    }
148
149    async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
150        let _permit = self.piece_request_semaphore.acquire().await;
151        let client = Arc::clone(&self.client);
152        // Spawn a separate task to improve concurrency due to slow-ish JSON decoding that causes
153        // issues for jsonrpsee
154        let piece_fut = tokio::task::spawn(async move {
155            client
156                .request("subspace_piece", rpc_params![&piece_index])
157                .await
158        });
159        Ok(piece_fut.await??)
160    }
161
162    async fn acknowledge_archived_segment_header(
163        &self,
164        segment_index: SegmentIndex,
165    ) -> anyhow::Result<()> {
166        Ok(self
167            .client
168            .request(
169                "subspace_acknowledgeArchivedSegmentHeader",
170                rpc_params![&segment_index],
171            )
172            .await?)
173    }
174}
175
176#[async_trait]
177impl NodeClientExt for RpcNodeClient {
178    async fn last_segment_headers(&self, limit: u32) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
179        Ok(self
180            .client
181            .request("subspace_lastSegmentHeaders", rpc_params![limit])
182            .await?)
183    }
184}