subspace_farmer/node_client/
caching_proxy_node_client.rs1use crate::node_client::{NodeClient, NodeClientExt};
5use async_lock::{
6 Mutex as AsyncMutex, RwLock as AsyncRwLock,
7 RwLockUpgradableReadGuardArc as AsyncRwLockUpgradableReadGuard,
8 RwLockWriteGuardArc as AsyncRwLockWriteGuard,
9};
10use async_trait::async_trait;
11use futures::{FutureExt, Stream, StreamExt, select};
12use std::pin::Pin;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15use subspace_core_primitives::pieces::{Piece, PieceIndex};
16use subspace_core_primitives::segments::{SegmentHeader, SegmentIndex};
17use subspace_process::AsyncJoinOnDrop;
18use subspace_rpc_primitives::{
19 FarmerAppInfo, MAX_SEGMENT_HEADERS_PER_REQUEST, RewardSignatureResponse, RewardSigningInfo,
20 SlotInfo, SolutionResponse,
21};
22use tokio::sync::watch;
23use tokio_stream::wrappers::WatchStream;
24use tracing::{info, trace, warn};
25
26const SEGMENT_HEADERS_SYNC_INTERVAL: Duration = Duration::from_secs(1);
27const FARMER_APP_INFO_DEDUPLICATION_WINDOW: Duration = Duration::from_secs(1);
28
29#[derive(Debug, Default)]
30struct SegmentHeaders {
31 segment_headers: Vec<SegmentHeader>,
32 last_synced: Option<Instant>,
33}
34
35impl SegmentHeaders {
36 fn push(&mut self, archived_segment_header: SegmentHeader) {
39 if self.segment_headers.len() == u64::from(archived_segment_header.segment_index()) as usize
40 {
41 self.segment_headers.push(archived_segment_header);
42 }
43 }
44
45 fn get_segment_headers(&self, segment_indices: &[SegmentIndex]) -> Vec<Option<SegmentHeader>> {
49 segment_indices
50 .iter()
51 .map(|segment_index| {
52 self.segment_headers
53 .get(u64::from(*segment_index) as usize)
54 .copied()
55 })
56 .collect::<Vec<_>>()
57 }
58
59 fn last_segment_headers(&self, limit: u32) -> Vec<Option<SegmentHeader>> {
61 self.segment_headers
62 .iter()
63 .rev()
64 .take(limit as usize)
65 .rev()
66 .copied()
67 .map(Some)
68 .collect()
69 }
70
71 async fn request_uncached_headers<NC>(&self, client: &NC) -> anyhow::Result<Vec<SegmentHeader>>
78 where
79 NC: NodeClient,
80 {
81 if let Some(last_synced) = &self.last_synced
83 && last_synced.elapsed() < SEGMENT_HEADERS_SYNC_INTERVAL
84 {
85 return Ok(Vec::new());
86 }
87
88 let mut extra_segment_headers = Vec::new();
89 let mut segment_index_offset = SegmentIndex::from(self.segment_headers.len() as u64);
90 let segment_index_step = SegmentIndex::from(MAX_SEGMENT_HEADERS_PER_REQUEST as u64);
91
92 'outer: loop {
93 let from = segment_index_offset;
94 let to = segment_index_offset + segment_index_step;
95 trace!(%from, %to, "Requesting segment headers");
96
97 for maybe_segment_header in client
98 .segment_headers((from..to).collect::<Vec<_>>())
99 .await
100 .map_err(|error| {
101 anyhow::anyhow!(
102 "Failed to download segment headers {from}..{to} from node: {error}"
103 )
104 })?
105 {
106 let Some(segment_header) = maybe_segment_header else {
107 break 'outer;
109 };
110
111 extra_segment_headers.push(segment_header);
112 }
113
114 segment_index_offset += segment_index_step;
115 }
116
117 Ok(extra_segment_headers)
118 }
119
120 fn write_cache(&mut self, extra_segment_headers: Vec<SegmentHeader>) {
122 for segment_header in extra_segment_headers {
123 self.push(segment_header);
124 }
125 self.last_synced.replace(Instant::now());
126 }
127}
128
129#[derive(Debug, Clone)]
137pub struct CachingProxyNodeClient<NC> {
138 inner: NC,
139 slot_info_receiver: watch::Receiver<Option<SlotInfo>>,
140 archived_segment_headers_receiver: watch::Receiver<Option<SegmentHeader>>,
141 reward_signing_receiver: watch::Receiver<Option<RewardSigningInfo>>,
142 segment_headers: Arc<AsyncRwLock<SegmentHeaders>>,
143 last_farmer_app_info: Arc<AsyncMutex<(FarmerAppInfo, Instant)>>,
144 _background_task: Arc<AsyncJoinOnDrop<()>>,
145}
146
147impl<NC> CachingProxyNodeClient<NC>
148where
149 NC: NodeClient + Clone,
150{
151 pub async fn new(client: NC) -> anyhow::Result<Self> {
153 let mut segment_headers = SegmentHeaders::default();
154 let mut archived_segments_notifications =
155 client.subscribe_archived_segment_headers().await?;
156
157 info!("Downloading all segment headers from node...");
158 let headers = segment_headers.request_uncached_headers(&client).await?;
160 segment_headers.write_cache(headers);
161 info!("Downloaded all segment headers from node successfully");
162
163 let segment_headers = Arc::new(AsyncRwLock::new(segment_headers));
164
165 let (slot_info_sender, slot_info_receiver) = watch::channel(None::<SlotInfo>);
166 let slot_info_proxy_fut = {
167 let mut slot_info_subscription = client.subscribe_slot_info().await?;
168
169 async move {
170 let mut last_slot_number = None;
171 while let Some(slot_info) = slot_info_subscription.next().await {
172 if let Some(last_slot_number) = last_slot_number
173 && last_slot_number >= slot_info.slot_number
174 {
175 continue;
176 }
177 last_slot_number.replace(slot_info.slot_number);
178
179 if let Err(error) = slot_info_sender.send(Some(slot_info)) {
180 warn!(%error, "Failed to proxy slot info notification");
181 return;
182 }
183 }
184 }
185 };
186
187 let (archived_segment_headers_sender, archived_segment_headers_receiver) =
188 watch::channel(None::<SegmentHeader>);
189 let segment_headers_maintenance_fut = {
190 let client = client.clone();
191 let segment_headers = Arc::clone(&segment_headers);
192
193 async move {
194 let mut last_archived_segment_index = None;
195 while let Some(archived_segment_header) =
196 archived_segments_notifications.next().await
197 {
198 let segment_index = archived_segment_header.segment_index();
199 trace!(
200 ?archived_segment_header,
201 "New archived archived segment header notification"
202 );
203
204 while let Err(error) = client
205 .acknowledge_archived_segment_header(segment_index)
206 .await
207 {
208 warn!(
209 %error,
210 "Failed to acknowledge archived segment header, trying again"
211 );
212 }
213
214 if let Some(last_archived_segment_index) = last_archived_segment_index
215 && last_archived_segment_index >= segment_index
216 {
217 continue;
218 }
219 last_archived_segment_index.replace(segment_index);
220
221 segment_headers.write().await.push(archived_segment_header);
222
223 if let Err(error) =
224 archived_segment_headers_sender.send(Some(archived_segment_header))
225 {
226 warn!(%error, "Failed to proxy archived segment header notification");
227 return;
228 }
229 }
230 }
231 };
232
233 let (reward_signing_sender, reward_signing_receiver) =
234 watch::channel(None::<RewardSigningInfo>);
235 let reward_signing_proxy_fut = {
236 let mut reward_signing_subscription = client.subscribe_reward_signing().await?;
237
238 async move {
239 while let Some(reward_signing_info) = reward_signing_subscription.next().await {
240 if let Err(error) = reward_signing_sender.send(Some(reward_signing_info)) {
241 warn!(%error, "Failed to proxy reward signing notification");
242 return;
243 }
244 }
245 }
246 };
247
248 let farmer_app_info = client
249 .farmer_app_info()
250 .await
251 .map_err(|error| anyhow::anyhow!("Failed to get farmer app info: {error}"))?;
252 let last_farmer_app_info = Arc::new(AsyncMutex::new((farmer_app_info, Instant::now())));
253
254 let background_task = tokio::spawn(async move {
255 select! {
256 _ = slot_info_proxy_fut.fuse() => {},
257 _ = segment_headers_maintenance_fut.fuse() => {},
258 _ = reward_signing_proxy_fut.fuse() => {},
259 }
260 });
261
262 let node_client = Self {
263 inner: client,
264 slot_info_receiver,
265 archived_segment_headers_receiver,
266 reward_signing_receiver,
267 segment_headers,
268 last_farmer_app_info,
269 _background_task: Arc::new(AsyncJoinOnDrop::new(background_task, true)),
270 };
271
272 Ok(node_client)
273 }
274}
275
276#[async_trait]
277impl<NC> NodeClient for CachingProxyNodeClient<NC>
278where
279 NC: NodeClient,
280{
281 async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> {
282 let (last_farmer_app_info, last_farmer_app_info_request) =
283 &mut *self.last_farmer_app_info.lock().await;
284
285 if last_farmer_app_info_request.elapsed() > FARMER_APP_INFO_DEDUPLICATION_WINDOW {
286 let new_last_farmer_app_info = self.inner.farmer_app_info().await?;
287
288 *last_farmer_app_info = new_last_farmer_app_info;
289 *last_farmer_app_info_request = Instant::now();
290 }
291
292 Ok(last_farmer_app_info.clone())
293 }
294
295 async fn subscribe_slot_info(
296 &self,
297 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> {
298 Ok(Box::pin(
299 WatchStream::new(self.slot_info_receiver.clone())
300 .filter_map(|maybe_slot_info| async move { maybe_slot_info }),
301 ))
302 }
303
304 async fn submit_solution_response(
305 &self,
306 solution_response: SolutionResponse,
307 ) -> anyhow::Result<()> {
308 self.inner.submit_solution_response(solution_response).await
309 }
310
311 async fn subscribe_reward_signing(
312 &self,
313 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = RewardSigningInfo> + Send + 'static>>> {
314 Ok(Box::pin(
315 WatchStream::new(self.reward_signing_receiver.clone())
316 .filter_map(|maybe_reward_signing_info| async move { maybe_reward_signing_info }),
317 ))
318 }
319
320 async fn submit_reward_signature(
322 &self,
323 reward_signature: RewardSignatureResponse,
324 ) -> anyhow::Result<()> {
325 self.inner.submit_reward_signature(reward_signature).await
326 }
327
328 async fn subscribe_archived_segment_headers(
329 &self,
330 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>> {
331 Ok(Box::pin(
332 WatchStream::new(self.archived_segment_headers_receiver.clone())
333 .filter_map(|maybe_segment_header| async move { maybe_segment_header }),
334 ))
335 }
336
337 async fn segment_headers(
342 &self,
343 segment_indices: Vec<SegmentIndex>,
344 ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
345 let retrieved_segment_headers = self
346 .segment_headers
347 .read()
348 .await
349 .get_segment_headers(&segment_indices);
350
351 if retrieved_segment_headers.iter().all(Option::is_some) {
352 Ok(retrieved_segment_headers)
353 } else {
354 let segment_headers = self.segment_headers.upgradable_read_arc().await;
361
362 let retrieved_segment_headers = segment_headers.get_segment_headers(&segment_indices);
365 if retrieved_segment_headers.iter().all(Option::is_some) {
366 return Ok(retrieved_segment_headers);
367 }
368
369 let extra_segment_headers = segment_headers
371 .request_uncached_headers(&self.inner)
372 .await?;
373
374 if extra_segment_headers.is_empty() {
375 return Ok(retrieved_segment_headers);
378 }
379
380 let mut segment_headers =
382 AsyncRwLockUpgradableReadGuard::upgrade(segment_headers).await;
383 segment_headers.write_cache(extra_segment_headers);
384
385 Ok(AsyncRwLockWriteGuard::downgrade(segment_headers)
388 .get_segment_headers(&segment_indices))
389 }
390 }
391
392 async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
393 self.inner.piece(piece_index).await
394 }
395
396 async fn acknowledge_archived_segment_header(
397 &self,
398 _segment_index: SegmentIndex,
399 ) -> anyhow::Result<()> {
400 Ok(())
402 }
403}
404
405#[async_trait]
406impl<NC> NodeClientExt for CachingProxyNodeClient<NC>
407where
408 NC: NodeClientExt,
409{
410 async fn cached_segment_headers(
411 &self,
412 segment_indices: Vec<SegmentIndex>,
413 ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
414 Ok(self
417 .segment_headers
418 .read()
419 .await
420 .get_segment_headers(&segment_indices))
421 }
422
423 async fn last_segment_headers(&self, limit: u32) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
424 Ok(self
425 .segment_headers
426 .read()
427 .await
428 .last_segment_headers(limit))
429 }
430}