subspace_farmer/node_client/
caching_proxy_node_client.rs1use crate::node_client::{NodeClient, NodeClientExt};
5use crate::utils::AsyncJoinOnDrop;
6use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
7use async_trait::async_trait;
8use futures::{select, FutureExt, Stream, StreamExt};
9use std::pin::Pin;
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12use subspace_core_primitives::pieces::{Piece, PieceIndex};
13use subspace_core_primitives::segments::{SegmentHeader, SegmentIndex};
14use subspace_rpc_primitives::{
15 FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
16 MAX_SEGMENT_HEADERS_PER_REQUEST,
17};
18use tokio::sync::watch;
19use tokio_stream::wrappers::WatchStream;
20use tracing::{info, trace, warn};
21
/// Minimum time between two segment header syncs: `SegmentHeaders::sync` returns early, without
/// contacting the node, when it is called again sooner than this after the previous attempt.
22const SEGMENT_HEADERS_SYNC_INTERVAL: Duration = Duration::from_secs(1);
/// Maximum age of the cached farmer app info: `farmer_app_info` answers from the cache while the
/// last stored response is not older than this, and asks the wrapped client otherwise.
23const FARMER_APP_INFO_DEDUPLICATION_WINDOW: Duration = Duration::from_secs(1);
24
/// In-memory cache of segment headers together with the time of the last sync attempt.
25#[derive(Debug, Default)]
26struct SegmentHeaders {
 /// Cached segment headers. `push` only appends a header whose segment index equals the current
 /// length, so the position of a header in this vector is its segment index.
27 segment_headers: Vec<SegmentHeader>,
 /// When `sync` last got past its rate limit check, `None` if that has never happened.
28 last_synced: Option<Instant>,
29}
30
31impl SegmentHeaders {
32 fn push(&mut self, archived_segment_header: SegmentHeader) {
33 if self.segment_headers.len() == u64::from(archived_segment_header.segment_index()) as usize
34 {
35 self.segment_headers.push(archived_segment_header);
36 }
37 }
38
39 fn get_segment_headers(&self, segment_indices: &[SegmentIndex]) -> Vec<Option<SegmentHeader>> {
40 segment_indices
41 .iter()
42 .map(|segment_index| {
43 self.segment_headers
44 .get(u64::from(*segment_index) as usize)
45 .copied()
46 })
47 .collect::<Vec<_>>()
48 }
49
50 fn last_segment_headers(&self, limit: u32) -> Vec<Option<SegmentHeader>> {
51 self.segment_headers
52 .iter()
53 .rev()
54 .take(limit as usize)
55 .rev()
56 .copied()
57 .map(Some)
58 .collect()
59 }
60
61 async fn sync<NC>(&mut self, client: &NC) -> anyhow::Result<()>
62 where
63 NC: NodeClient,
64 {
65 if let Some(last_synced) = &self.last_synced {
66 if last_synced.elapsed() < SEGMENT_HEADERS_SYNC_INTERVAL {
67 return Ok(());
68 }
69 }
70 self.last_synced.replace(Instant::now());
71
72 let mut segment_index_offset = SegmentIndex::from(self.segment_headers.len() as u64);
73 let segment_index_step = SegmentIndex::from(MAX_SEGMENT_HEADERS_PER_REQUEST as u64);
74
75 'outer: loop {
76 let from = segment_index_offset;
77 let to = segment_index_offset + segment_index_step;
78 trace!(%from, %to, "Requesting segment headers");
79
80 for maybe_segment_header in client
81 .segment_headers((from..to).collect::<Vec<_>>())
82 .await
83 .map_err(|error| {
84 anyhow::anyhow!(
85 "Failed to download segment headers {from}..{to} from node: {error}"
86 )
87 })?
88 {
89 let Some(segment_header) = maybe_segment_header else {
90 break 'outer;
92 };
93
94 self.push(segment_header);
95 }
96
97 segment_index_offset += segment_index_step;
98 }
99
100 Ok(())
101 }
102}
103
/// Node client that wraps another node client `NC`: segment headers and farmer app info are
/// served from a local cache, notification subscriptions are served from `watch` channels fed by
/// a background task, and the remaining requests are forwarded to the wrapped client.
104#[derive(Debug, Clone)]
112pub struct CachingProxyNodeClient<NC> {
 /// Wrapped node client that requests are forwarded to
113 inner: NC,
 /// Latest slot info proxied from the wrapped client, `None` until the first one arrives
114 slot_info_receiver: watch::Receiver<Option<SlotInfo>>,
 /// Latest archived segment header proxied from the wrapped client, `None` until the first one
 /// arrives
115 archived_segment_headers_receiver: watch::Receiver<Option<SegmentHeader>>,
 /// Latest reward signing info proxied from the wrapped client, `None` until the first one
 /// arrives
116 reward_signing_receiver: watch::Receiver<Option<RewardSigningInfo>>,
 /// Segment headers cache shared between clones of this client and the background task
117 segment_headers: Arc<AsyncRwLock<SegmentHeaders>>,
 /// Last farmer app info received from the wrapped client and the time it was stored
118 last_farmer_app_info: Arc<AsyncMutex<(FarmerAppInfo, Instant)>>,
 /// Handle of the background task that feeds the channels and the cache, shared between clones.
 /// NOTE(review): presumably the task is stopped once the last clone is dropped, confirm against
 /// `AsyncJoinOnDrop`.
119 _background_task: Arc<AsyncJoinOnDrop<()>>,
120}
121
122impl<NC> CachingProxyNodeClient<NC>
123where
124 NC: NodeClient + Clone,
125{
126 pub async fn new(client: NC) -> anyhow::Result<Self> {
128 let mut segment_headers = SegmentHeaders::default();
129 let mut archived_segments_notifications =
130 client.subscribe_archived_segment_headers().await?;
131
132 info!("Downloading all segment headers from node...");
133 segment_headers.sync(&client).await?;
134 info!("Downloaded all segment headers from node successfully");
135
136 let segment_headers = Arc::new(AsyncRwLock::new(segment_headers));
137
138 let (slot_info_sender, slot_info_receiver) = watch::channel(None::<SlotInfo>);
139 let slot_info_proxy_fut = {
140 let mut slot_info_subscription = client.subscribe_slot_info().await?;
141
142 async move {
143 let mut last_slot_number = None;
144 while let Some(slot_info) = slot_info_subscription.next().await {
145 if let Some(last_slot_number) = last_slot_number
146 && last_slot_number >= slot_info.slot_number
147 {
148 continue;
149 }
150 last_slot_number.replace(slot_info.slot_number);
151
152 if let Err(error) = slot_info_sender.send(Some(slot_info)) {
153 warn!(%error, "Failed to proxy slot info notification");
154 return;
155 }
156 }
157 }
158 };
159
160 let (archived_segment_headers_sender, archived_segment_headers_receiver) =
161 watch::channel(None::<SegmentHeader>);
162 let segment_headers_maintenance_fut = {
163 let client = client.clone();
164 let segment_headers = Arc::clone(&segment_headers);
165
166 async move {
167 let mut last_archived_segment_index = None;
168 while let Some(archived_segment_header) =
169 archived_segments_notifications.next().await
170 {
171 let segment_index = archived_segment_header.segment_index();
172 trace!(
173 ?archived_segment_header,
174 "New archived archived segment header notification"
175 );
176
177 while let Err(error) = client
178 .acknowledge_archived_segment_header(segment_index)
179 .await
180 {
181 warn!(
182 %error,
183 "Failed to acknowledge archived segment header, trying again"
184 );
185 }
186
187 if let Some(last_archived_segment_index) = last_archived_segment_index
188 && last_archived_segment_index >= segment_index
189 {
190 continue;
191 }
192 last_archived_segment_index.replace(segment_index);
193
194 segment_headers.write().await.push(archived_segment_header);
195
196 if let Err(error) =
197 archived_segment_headers_sender.send(Some(archived_segment_header))
198 {
199 warn!(%error, "Failed to proxy archived segment header notification");
200 return;
201 }
202 }
203 }
204 };
205
206 let (reward_signing_sender, reward_signing_receiver) =
207 watch::channel(None::<RewardSigningInfo>);
208 let reward_signing_proxy_fut = {
209 let mut reward_signing_subscription = client.subscribe_reward_signing().await?;
210
211 async move {
212 while let Some(reward_signing_info) = reward_signing_subscription.next().await {
213 if let Err(error) = reward_signing_sender.send(Some(reward_signing_info)) {
214 warn!(%error, "Failed to proxy reward signing notification");
215 return;
216 }
217 }
218 }
219 };
220
221 let farmer_app_info = client
222 .farmer_app_info()
223 .await
224 .map_err(|error| anyhow::anyhow!("Failed to get farmer app info: {error}"))?;
225 let last_farmer_app_info = Arc::new(AsyncMutex::new((farmer_app_info, Instant::now())));
226
227 let background_task = tokio::spawn(async move {
228 select! {
229 _ = slot_info_proxy_fut.fuse() => {},
230 _ = segment_headers_maintenance_fut.fuse() => {},
231 _ = reward_signing_proxy_fut.fuse() => {},
232 }
233 });
234
235 let node_client = Self {
236 inner: client,
237 slot_info_receiver,
238 archived_segment_headers_receiver,
239 reward_signing_receiver,
240 segment_headers,
241 last_farmer_app_info,
242 _background_task: Arc::new(AsyncJoinOnDrop::new(background_task, true)),
243 };
244
245 Ok(node_client)
246 }
247}
248
249#[async_trait]
250impl<NC> NodeClient for CachingProxyNodeClient<NC>
251where
252 NC: NodeClient,
253{
254 async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> {
255 let (last_farmer_app_info, last_farmer_app_info_request) =
256 &mut *self.last_farmer_app_info.lock().await;
257
258 if last_farmer_app_info_request.elapsed() > FARMER_APP_INFO_DEDUPLICATION_WINDOW {
259 let new_last_farmer_app_info = self.inner.farmer_app_info().await?;
260
261 *last_farmer_app_info = new_last_farmer_app_info;
262 *last_farmer_app_info_request = Instant::now();
263 }
264
265 Ok(last_farmer_app_info.clone())
266 }
267
268 async fn subscribe_slot_info(
269 &self,
270 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> {
271 Ok(Box::pin(
272 WatchStream::new(self.slot_info_receiver.clone())
273 .filter_map(|maybe_slot_info| async move { maybe_slot_info }),
274 ))
275 }
276
277 async fn submit_solution_response(
278 &self,
279 solution_response: SolutionResponse,
280 ) -> anyhow::Result<()> {
281 self.inner.submit_solution_response(solution_response).await
282 }
283
284 async fn subscribe_reward_signing(
285 &self,
286 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = RewardSigningInfo> + Send + 'static>>> {
287 Ok(Box::pin(
288 WatchStream::new(self.reward_signing_receiver.clone())
289 .filter_map(|maybe_reward_signing_info| async move { maybe_reward_signing_info }),
290 ))
291 }
292
293 async fn submit_reward_signature(
295 &self,
296 reward_signature: RewardSignatureResponse,
297 ) -> anyhow::Result<()> {
298 self.inner.submit_reward_signature(reward_signature).await
299 }
300
301 async fn subscribe_archived_segment_headers(
302 &self,
303 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>> {
304 Ok(Box::pin(
305 WatchStream::new(self.archived_segment_headers_receiver.clone())
306 .filter_map(|maybe_segment_header| async move { maybe_segment_header }),
307 ))
308 }
309
310 async fn segment_headers(
311 &self,
312 segment_indices: Vec<SegmentIndex>,
313 ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
314 let retrieved_segment_headers = self
315 .segment_headers
316 .read()
317 .await
318 .get_segment_headers(&segment_indices);
319
320 if retrieved_segment_headers.iter().all(Option::is_some) {
321 Ok(retrieved_segment_headers)
322 } else {
323 let mut segment_headers = self.segment_headers.write().await;
325 segment_headers.sync(&self.inner).await?;
326
327 Ok(segment_headers.get_segment_headers(&segment_indices))
328 }
329 }
330
331 async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
332 self.inner.piece(piece_index).await
333 }
334
335 async fn acknowledge_archived_segment_header(
336 &self,
337 _segment_index: SegmentIndex,
338 ) -> anyhow::Result<()> {
339 Ok(())
341 }
342}
343
344#[async_trait]
345impl<NC> NodeClientExt for CachingProxyNodeClient<NC>
346where
347 NC: NodeClientExt,
348{
349 async fn last_segment_headers(&self, limit: u32) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
350 Ok(self
351 .segment_headers
352 .read()
353 .await
354 .last_segment_headers(limit))
355 }
356}