subspace_farmer/node_client/
caching_proxy_node_client.rs1use crate::node_client::{NodeClient, NodeClientExt};
5use async_lock::{
6 Mutex as AsyncMutex, RwLock as AsyncRwLock,
7 RwLockUpgradableReadGuardArc as AsyncRwLockUpgradableReadGuard,
8 RwLockWriteGuardArc as AsyncRwLockWriteGuard,
9};
10use async_trait::async_trait;
11use futures::{FutureExt, Stream, StreamExt, select};
12use std::pin::Pin;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15use subspace_core_primitives::pieces::{Piece, PieceIndex};
16use subspace_core_primitives::segments::{SegmentHeader, SegmentIndex};
17use subspace_process::AsyncJoinOnDrop;
18use subspace_rpc_primitives::{
19 FarmerAppInfo, MAX_SEGMENT_HEADERS_PER_REQUEST, RewardSignatureResponse, RewardSigningInfo,
20 SlotInfo, SolutionResponse,
21};
22use tokio::sync::watch;
23use tokio_stream::wrappers::WatchStream;
24use tracing::{info, trace, warn};
25
26const SEGMENT_HEADERS_SYNC_INTERVAL: Duration = Duration::from_secs(1);
27const FARMER_APP_INFO_DEDUPLICATION_WINDOW: Duration = Duration::from_secs(1);
28
29#[derive(Debug, Default)]
30struct SegmentHeaders {
31 segment_headers: Vec<SegmentHeader>,
32 last_synced: Option<Instant>,
33}
34
35impl SegmentHeaders {
36 fn push(&mut self, archived_segment_header: SegmentHeader) {
39 if self.segment_headers.len() == u64::from(archived_segment_header.segment_index()) as usize
40 {
41 self.segment_headers.push(archived_segment_header);
42 }
43 }
44
45 fn get_segment_headers(&self, segment_indices: &[SegmentIndex]) -> Vec<Option<SegmentHeader>> {
49 segment_indices
50 .iter()
51 .map(|segment_index| {
52 self.segment_headers
53 .get(u64::from(*segment_index) as usize)
54 .copied()
55 })
56 .collect::<Vec<_>>()
57 }
58
59 fn last_segment_headers(&self, limit: u32) -> Vec<Option<SegmentHeader>> {
61 self.segment_headers
62 .iter()
63 .rev()
64 .take(limit as usize)
65 .rev()
66 .copied()
67 .map(Some)
68 .collect()
69 }
70
71 async fn request_uncached_headers<NC>(&self, client: &NC) -> anyhow::Result<Vec<SegmentHeader>>
78 where
79 NC: NodeClient,
80 {
81 if let Some(last_synced) = &self.last_synced
83 && last_synced.elapsed() < SEGMENT_HEADERS_SYNC_INTERVAL
84 {
85 return Ok(Vec::new());
86 }
87
88 let mut extra_segment_headers = Vec::new();
89 let mut segment_index_offset = SegmentIndex::from(self.segment_headers.len() as u64);
90 let segment_index_step = SegmentIndex::from(MAX_SEGMENT_HEADERS_PER_REQUEST as u64);
91
92 'outer: loop {
93 let from = segment_index_offset;
94 let to = segment_index_offset + segment_index_step;
95 trace!(%from, %to, "Requesting segment headers");
96
97 for maybe_segment_header in client
98 .segment_headers((from..to).collect::<Vec<_>>())
99 .await
100 .map_err(|error| {
101 anyhow::anyhow!(
102 "Failed to download segment headers {from}..{to} from node: {error}"
103 )
104 })?
105 {
106 let Some(segment_header) = maybe_segment_header else {
107 break 'outer;
109 };
110
111 extra_segment_headers.push(segment_header);
112 }
113
114 segment_index_offset += segment_index_step;
115 }
116
117 Ok(extra_segment_headers)
118 }
119
120 fn write_cache(&mut self, extra_segment_headers: Vec<SegmentHeader>) {
122 self.segment_headers.extend(extra_segment_headers);
123 self.last_synced.replace(Instant::now());
124 }
125}
126
127#[derive(Debug, Clone)]
135pub struct CachingProxyNodeClient<NC> {
136 inner: NC,
137 slot_info_receiver: watch::Receiver<Option<SlotInfo>>,
138 archived_segment_headers_receiver: watch::Receiver<Option<SegmentHeader>>,
139 reward_signing_receiver: watch::Receiver<Option<RewardSigningInfo>>,
140 segment_headers: Arc<AsyncRwLock<SegmentHeaders>>,
141 last_farmer_app_info: Arc<AsyncMutex<(FarmerAppInfo, Instant)>>,
142 _background_task: Arc<AsyncJoinOnDrop<()>>,
143}
144
145impl<NC> CachingProxyNodeClient<NC>
146where
147 NC: NodeClient + Clone,
148{
149 pub async fn new(client: NC) -> anyhow::Result<Self> {
151 let mut segment_headers = SegmentHeaders::default();
152 let mut archived_segments_notifications =
153 client.subscribe_archived_segment_headers().await?;
154
155 info!("Downloading all segment headers from node...");
156 let headers = segment_headers.request_uncached_headers(&client).await?;
158 segment_headers.write_cache(headers);
159 info!("Downloaded all segment headers from node successfully");
160
161 let segment_headers = Arc::new(AsyncRwLock::new(segment_headers));
162
163 let (slot_info_sender, slot_info_receiver) = watch::channel(None::<SlotInfo>);
164 let slot_info_proxy_fut = {
165 let mut slot_info_subscription = client.subscribe_slot_info().await?;
166
167 async move {
168 let mut last_slot_number = None;
169 while let Some(slot_info) = slot_info_subscription.next().await {
170 if let Some(last_slot_number) = last_slot_number
171 && last_slot_number >= slot_info.slot_number
172 {
173 continue;
174 }
175 last_slot_number.replace(slot_info.slot_number);
176
177 if let Err(error) = slot_info_sender.send(Some(slot_info)) {
178 warn!(%error, "Failed to proxy slot info notification");
179 return;
180 }
181 }
182 }
183 };
184
185 let (archived_segment_headers_sender, archived_segment_headers_receiver) =
186 watch::channel(None::<SegmentHeader>);
187 let segment_headers_maintenance_fut = {
188 let client = client.clone();
189 let segment_headers = Arc::clone(&segment_headers);
190
191 async move {
192 let mut last_archived_segment_index = None;
193 while let Some(archived_segment_header) =
194 archived_segments_notifications.next().await
195 {
196 let segment_index = archived_segment_header.segment_index();
197 trace!(
198 ?archived_segment_header,
199 "New archived archived segment header notification"
200 );
201
202 while let Err(error) = client
203 .acknowledge_archived_segment_header(segment_index)
204 .await
205 {
206 warn!(
207 %error,
208 "Failed to acknowledge archived segment header, trying again"
209 );
210 }
211
212 if let Some(last_archived_segment_index) = last_archived_segment_index
213 && last_archived_segment_index >= segment_index
214 {
215 continue;
216 }
217 last_archived_segment_index.replace(segment_index);
218
219 segment_headers.write().await.push(archived_segment_header);
220
221 if let Err(error) =
222 archived_segment_headers_sender.send(Some(archived_segment_header))
223 {
224 warn!(%error, "Failed to proxy archived segment header notification");
225 return;
226 }
227 }
228 }
229 };
230
231 let (reward_signing_sender, reward_signing_receiver) =
232 watch::channel(None::<RewardSigningInfo>);
233 let reward_signing_proxy_fut = {
234 let mut reward_signing_subscription = client.subscribe_reward_signing().await?;
235
236 async move {
237 while let Some(reward_signing_info) = reward_signing_subscription.next().await {
238 if let Err(error) = reward_signing_sender.send(Some(reward_signing_info)) {
239 warn!(%error, "Failed to proxy reward signing notification");
240 return;
241 }
242 }
243 }
244 };
245
246 let farmer_app_info = client
247 .farmer_app_info()
248 .await
249 .map_err(|error| anyhow::anyhow!("Failed to get farmer app info: {error}"))?;
250 let last_farmer_app_info = Arc::new(AsyncMutex::new((farmer_app_info, Instant::now())));
251
252 let background_task = tokio::spawn(async move {
253 select! {
254 _ = slot_info_proxy_fut.fuse() => {},
255 _ = segment_headers_maintenance_fut.fuse() => {},
256 _ = reward_signing_proxy_fut.fuse() => {},
257 }
258 });
259
260 let node_client = Self {
261 inner: client,
262 slot_info_receiver,
263 archived_segment_headers_receiver,
264 reward_signing_receiver,
265 segment_headers,
266 last_farmer_app_info,
267 _background_task: Arc::new(AsyncJoinOnDrop::new(background_task, true)),
268 };
269
270 Ok(node_client)
271 }
272}
273
274#[async_trait]
275impl<NC> NodeClient for CachingProxyNodeClient<NC>
276where
277 NC: NodeClient,
278{
279 async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> {
280 let (last_farmer_app_info, last_farmer_app_info_request) =
281 &mut *self.last_farmer_app_info.lock().await;
282
283 if last_farmer_app_info_request.elapsed() > FARMER_APP_INFO_DEDUPLICATION_WINDOW {
284 let new_last_farmer_app_info = self.inner.farmer_app_info().await?;
285
286 *last_farmer_app_info = new_last_farmer_app_info;
287 *last_farmer_app_info_request = Instant::now();
288 }
289
290 Ok(last_farmer_app_info.clone())
291 }
292
293 async fn subscribe_slot_info(
294 &self,
295 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> {
296 Ok(Box::pin(
297 WatchStream::new(self.slot_info_receiver.clone())
298 .filter_map(|maybe_slot_info| async move { maybe_slot_info }),
299 ))
300 }
301
302 async fn submit_solution_response(
303 &self,
304 solution_response: SolutionResponse,
305 ) -> anyhow::Result<()> {
306 self.inner.submit_solution_response(solution_response).await
307 }
308
309 async fn subscribe_reward_signing(
310 &self,
311 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = RewardSigningInfo> + Send + 'static>>> {
312 Ok(Box::pin(
313 WatchStream::new(self.reward_signing_receiver.clone())
314 .filter_map(|maybe_reward_signing_info| async move { maybe_reward_signing_info }),
315 ))
316 }
317
318 async fn submit_reward_signature(
320 &self,
321 reward_signature: RewardSignatureResponse,
322 ) -> anyhow::Result<()> {
323 self.inner.submit_reward_signature(reward_signature).await
324 }
325
326 async fn subscribe_archived_segment_headers(
327 &self,
328 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>> {
329 Ok(Box::pin(
330 WatchStream::new(self.archived_segment_headers_receiver.clone())
331 .filter_map(|maybe_segment_header| async move { maybe_segment_header }),
332 ))
333 }
334
335 async fn segment_headers(
340 &self,
341 segment_indices: Vec<SegmentIndex>,
342 ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
343 let retrieved_segment_headers = self
344 .segment_headers
345 .read()
346 .await
347 .get_segment_headers(&segment_indices);
348
349 if retrieved_segment_headers.iter().all(Option::is_some) {
350 Ok(retrieved_segment_headers)
351 } else {
352 let segment_headers = self.segment_headers.upgradable_read_arc().await;
359
360 let retrieved_segment_headers = segment_headers.get_segment_headers(&segment_indices);
363 if retrieved_segment_headers.iter().all(Option::is_some) {
364 return Ok(retrieved_segment_headers);
365 }
366
367 let extra_segment_headers = segment_headers
369 .request_uncached_headers(&self.inner)
370 .await?;
371
372 if extra_segment_headers.is_empty() {
373 return Ok(retrieved_segment_headers);
376 }
377
378 let mut segment_headers =
380 AsyncRwLockUpgradableReadGuard::upgrade(segment_headers).await;
381 segment_headers.write_cache(extra_segment_headers);
382
383 Ok(AsyncRwLockWriteGuard::downgrade(segment_headers)
386 .get_segment_headers(&segment_indices))
387 }
388 }
389
390 async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
391 self.inner.piece(piece_index).await
392 }
393
394 async fn acknowledge_archived_segment_header(
395 &self,
396 _segment_index: SegmentIndex,
397 ) -> anyhow::Result<()> {
398 Ok(())
400 }
401}
402
403#[async_trait]
404impl<NC> NodeClientExt for CachingProxyNodeClient<NC>
405where
406 NC: NodeClientExt,
407{
408 async fn cached_segment_headers(
409 &self,
410 segment_indices: Vec<SegmentIndex>,
411 ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
412 Ok(self
415 .segment_headers
416 .read()
417 .await
418 .get_segment_headers(&segment_indices))
419 }
420
421 async fn last_segment_headers(&self, limit: u32) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
422 Ok(self
423 .segment_headers
424 .read()
425 .await
426 .last_segment_headers(limit))
427 }
428}