subspace_farmer/node_client/caching_proxy_node_client.rs

//! Node client wrapper around another node client that caches some data for better performance and
//! proxies other requests through.

use crate::node_client::{NodeClient, NodeClientExt};
use async_lock::{
    Mutex as AsyncMutex, RwLock as AsyncRwLock,
    RwLockUpgradableReadGuardArc as AsyncRwLockUpgradableReadGuard,
    RwLockWriteGuardArc as AsyncRwLockWriteGuard,
};
use async_trait::async_trait;
use futures::{FutureExt, Stream, StreamExt, select};
use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, Instant};
use subspace_core_primitives::pieces::{Piece, PieceIndex};
use subspace_core_primitives::segments::{SegmentHeader, SegmentIndex};
use subspace_process::AsyncJoinOnDrop;
use subspace_rpc_primitives::{
    FarmerAppInfo, MAX_SEGMENT_HEADERS_PER_REQUEST, RewardSignatureResponse, RewardSigningInfo,
    SlotInfo, SolutionResponse,
};
use tokio::sync::watch;
use tokio_stream::wrappers::WatchStream;
use tracing::{info, trace, warn};

/// The minimum interval between segment header cache syncs with the node.
const SEGMENT_HEADERS_SYNC_INTERVAL: Duration = Duration::from_secs(1);
/// How long a cached farmer app info response is served before the node is queried again.
const FARMER_APP_INFO_DEDUPLICATION_WINDOW: Duration = Duration::from_secs(1);

#[derive(Debug, Default)]
struct SegmentHeaders {
    /// Segment headers stored densely, such that `segment_headers[n]` is the header for
    /// segment index `n`.
    segment_headers: Vec<SegmentHeader>,
    /// When the cache was last synced with the node, used to rate-limit syncs.
    last_synced: Option<Instant>,
}

impl SegmentHeaders {
    /// Push a new segment header to the cache, if it is the next expected segment header.
    /// Otherwise, skip the push.
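    ///
    /// Because the cache is a dense `Vec` indexed by segment index, out-of-order pushes are
    /// dropped. A minimal sketch, where `header_n` stands for a hypothetical header with
    /// segment index `n`:
    ///
    /// ```ignore
    /// let mut cache = SegmentHeaders::default();
    /// cache.push(header_0); // accepted: segment index 0 == current length 0
    /// cache.push(header_2); // skipped: segment index 2 != current length 1
    /// cache.push(header_1); // accepted: segment index 1 == current length 1
    /// ```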
    fn push(&mut self, archived_segment_header: SegmentHeader) {
        if self.segment_headers.len() == u64::from(archived_segment_header.segment_index()) as usize
        {
            self.segment_headers.push(archived_segment_header);
        }
    }

    /// Get cached segment headers for the given segment indices.
    ///
    /// Returns `None` for segment indices that are not in the cache.
    fn get_segment_headers(&self, segment_indices: &[SegmentIndex]) -> Vec<Option<SegmentHeader>> {
        segment_indices
            .iter()
            .map(|segment_index| {
                self.segment_headers
                    .get(u64::from(*segment_index) as usize)
                    .copied()
            })
            .collect::<Vec<_>>()
    }

    /// Get the last `limit` segment headers from the cache, in ascending segment index order.
    fn last_segment_headers(&self, limit: u32) -> Vec<Option<SegmentHeader>> {
        self.segment_headers
            .iter()
            .rev()
            .take(limit as usize)
            .rev()
            .copied()
            .map(Some)
            .collect()
    }

    /// Get uncached headers from the node, if we're not rate-limited.
    /// This only requires a read lock.
    ///
    /// Returns any extra segment headers if the download succeeds, or an error if it fails.
    /// The caller must write the returned segment headers to the cache and reset the sync
    /// rate-limit timer.
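    ///
    /// Headers are requested in batches of [`MAX_SEGMENT_HEADERS_PER_REQUEST`], starting at
    /// the first uncached segment index, until the node returns a missing (`None`) header.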
    async fn request_uncached_headers<NC>(&self, client: &NC) -> anyhow::Result<Vec<SegmentHeader>>
    where
        NC: NodeClient,
    {
        // Skip the sync if we're still within the sync rate limit.
        if let Some(last_synced) = &self.last_synced
            && last_synced.elapsed() < SEGMENT_HEADERS_SYNC_INTERVAL
        {
            return Ok(Vec::new());
        }

        let mut extra_segment_headers = Vec::new();
        let mut segment_index_offset = SegmentIndex::from(self.segment_headers.len() as u64);
        let segment_index_step = SegmentIndex::from(MAX_SEGMENT_HEADERS_PER_REQUEST as u64);

        'outer: loop {
            let from = segment_index_offset;
            let to = segment_index_offset + segment_index_step;
            trace!(%from, %to, "Requesting segment headers");

            for maybe_segment_header in client
                .segment_headers((from..to).collect::<Vec<_>>())
                .await
                .map_err(|error| {
                    anyhow::anyhow!(
                        "Failed to download segment headers {from}..{to} from node: {error}"
                    )
                })?
            {
                let Some(segment_header) = maybe_segment_header else {
                    // Reached a non-existent segment header, nothing more to download
                    break 'outer;
                };

                extra_segment_headers.push(segment_header);
            }

            segment_index_offset += segment_index_step;
        }

        Ok(extra_segment_headers)
    }

    /// Write the sync results to the cache, and reset the sync rate-limit timer.
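    ///
    /// Typically paired with [`Self::request_uncached_headers`]; a minimal sketch, with
    /// locking elided:
    ///
    /// ```ignore
    /// let extra_segment_headers = segment_headers.request_uncached_headers(&client).await?;
    /// segment_headers.write_cache(extra_segment_headers);
    /// ```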
    fn write_cache(&mut self, extra_segment_headers: Vec<SegmentHeader>) {
        for segment_header in extra_segment_headers {
            self.push(segment_header);
        }
        self.last_synced.replace(Instant::now());
    }
}

/// Node client wrapper around another node client that caches some data for better performance and
/// proxies other requests through.
///
/// NOTE: Archived segment acknowledgement is ignored in this client; all subscriptions are
/// acknowledged implicitly and immediately.
/// NOTE: Subscription messages that are not processed in time will be skipped for performance
/// reasons!
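///
/// Subscriptions are backed by [`tokio::sync::watch`] channels, which retain only the most
/// recently sent value, so a slow subscriber sees the latest message rather than a backlog.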
#[derive(Debug, Clone)]
pub struct CachingProxyNodeClient<NC> {
    inner: NC,
    slot_info_receiver: watch::Receiver<Option<SlotInfo>>,
    archived_segment_headers_receiver: watch::Receiver<Option<SegmentHeader>>,
    reward_signing_receiver: watch::Receiver<Option<RewardSigningInfo>>,
    segment_headers: Arc<AsyncRwLock<SegmentHeaders>>,
    last_farmer_app_info: Arc<AsyncMutex<(FarmerAppInfo, Instant)>>,
    _background_task: Arc<AsyncJoinOnDrop<()>>,
}

impl<NC> CachingProxyNodeClient<NC>
where
    NC: NodeClient + Clone,
{
    /// Create a new instance, downloading all existing segment headers from the node and
    /// spawning a background task that keeps the cache and subscriptions up to date.
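    ///
    /// A minimal usage sketch, where `connect_to_node` is a hypothetical constructor for
    /// any inner client implementing `NodeClient + Clone`:
    ///
    /// ```ignore
    /// let rpc_client = connect_to_node("ws://127.0.0.1:9944").await?;
    /// let node_client = CachingProxyNodeClient::new(rpc_client).await?;
    /// let farmer_app_info = node_client.farmer_app_info().await?;
    /// ```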
    pub async fn new(client: NC) -> anyhow::Result<Self> {
        let mut segment_headers = SegmentHeaders::default();
        let mut archived_segments_notifications =
            client.subscribe_archived_segment_headers().await?;

        info!("Downloading all segment headers from node...");
        // No locking is needed, we are the first and only instance right now.
        let headers = segment_headers.request_uncached_headers(&client).await?;
        segment_headers.write_cache(headers);
        info!("Downloaded all segment headers from node successfully");

        let segment_headers = Arc::new(AsyncRwLock::new(segment_headers));

        let (slot_info_sender, slot_info_receiver) = watch::channel(None::<SlotInfo>);
        let slot_info_proxy_fut = {
            let mut slot_info_subscription = client.subscribe_slot_info().await?;

            async move {
                let mut last_slot_number = None;
                while let Some(slot_info) = slot_info_subscription.next().await {
                    // Skip slot infos that are stale or duplicated
                    if let Some(last_slot_number) = last_slot_number
                        && last_slot_number >= slot_info.slot_number
                    {
                        continue;
                    }
                    last_slot_number.replace(slot_info.slot_number);

                    if let Err(error) = slot_info_sender.send(Some(slot_info)) {
                        warn!(%error, "Failed to proxy slot info notification");
                        return;
                    }
                }
            }
        };

        let (archived_segment_headers_sender, archived_segment_headers_receiver) =
            watch::channel(None::<SegmentHeader>);
        let segment_headers_maintenance_fut = {
            let client = client.clone();
            let segment_headers = Arc::clone(&segment_headers);

            async move {
                let mut last_archived_segment_index = None;
                while let Some(archived_segment_header) =
                    archived_segments_notifications.next().await
                {
                    let segment_index = archived_segment_header.segment_index();
                    trace!(
                        ?archived_segment_header,
                        "New archived segment header notification"
                    );

                    while let Err(error) = client
                        .acknowledge_archived_segment_header(segment_index)
                        .await
                    {
                        warn!(
                            %error,
                            "Failed to acknowledge archived segment header, trying again"
                        );
                    }

                    // Skip segment headers that are stale or duplicated
                    if let Some(last_archived_segment_index) = last_archived_segment_index
                        && last_archived_segment_index >= segment_index
                    {
                        continue;
                    }
                    last_archived_segment_index.replace(segment_index);

                    segment_headers.write().await.push(archived_segment_header);

                    if let Err(error) =
                        archived_segment_headers_sender.send(Some(archived_segment_header))
                    {
                        warn!(%error, "Failed to proxy archived segment header notification");
                        return;
                    }
                }
            }
        };

        let (reward_signing_sender, reward_signing_receiver) =
            watch::channel(None::<RewardSigningInfo>);
        let reward_signing_proxy_fut = {
            let mut reward_signing_subscription = client.subscribe_reward_signing().await?;

            async move {
                while let Some(reward_signing_info) = reward_signing_subscription.next().await {
                    if let Err(error) = reward_signing_sender.send(Some(reward_signing_info)) {
                        warn!(%error, "Failed to proxy reward signing notification");
                        return;
                    }
                }
            }
        };

        let farmer_app_info = client
            .farmer_app_info()
            .await
            .map_err(|error| anyhow::anyhow!("Failed to get farmer app info: {error}"))?;
        let last_farmer_app_info = Arc::new(AsyncMutex::new((farmer_app_info, Instant::now())));

        // If any of the proxy futures exits, the entire background task ends.
        let background_task = tokio::spawn(async move {
            select! {
                _ = slot_info_proxy_fut.fuse() => {},
                _ = segment_headers_maintenance_fut.fuse() => {},
                _ = reward_signing_proxy_fut.fuse() => {},
            }
        });

        let node_client = Self {
            inner: client,
            slot_info_receiver,
            archived_segment_headers_receiver,
            reward_signing_receiver,
            segment_headers,
            last_farmer_app_info,
            _background_task: Arc::new(AsyncJoinOnDrop::new(background_task, true)),
        };

        Ok(node_client)
    }
}

#[async_trait]
impl<NC> NodeClient for CachingProxyNodeClient<NC>
where
    NC: NodeClient,
{
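    /// Returns the cached farmer app info, querying the node again only when the cached
    /// value is older than [`FARMER_APP_INFO_DEDUPLICATION_WINDOW`].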
    async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> {
        let (last_farmer_app_info, last_farmer_app_info_request) =
            &mut *self.last_farmer_app_info.lock().await;

        if last_farmer_app_info_request.elapsed() > FARMER_APP_INFO_DEDUPLICATION_WINDOW {
            let new_last_farmer_app_info = self.inner.farmer_app_info().await?;

            *last_farmer_app_info = new_last_farmer_app_info;
            *last_farmer_app_info_request = Instant::now();
        }

        Ok(last_farmer_app_info.clone())
    }
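
    /// Subscribes to slot info notifications via the shared watch channel.
    ///
    /// The channel's initial `None` placeholder is filtered out, so subscribers only see
    /// real slot info messages.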
    async fn subscribe_slot_info(
        &self,
    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> {
        Ok(Box::pin(
            WatchStream::new(self.slot_info_receiver.clone())
                .filter_map(|maybe_slot_info| async move { maybe_slot_info }),
        ))
    }

    async fn submit_solution_response(
        &self,
        solution_response: SolutionResponse,
    ) -> anyhow::Result<()> {
        self.inner.submit_solution_response(solution_response).await
    }

    async fn subscribe_reward_signing(
        &self,
    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = RewardSigningInfo> + Send + 'static>>> {
        Ok(Box::pin(
            WatchStream::new(self.reward_signing_receiver.clone())
                .filter_map(|maybe_reward_signing_info| async move { maybe_reward_signing_info }),
        ))
    }

    /// Submit a reward signature for a block
    async fn submit_reward_signature(
        &self,
        reward_signature: RewardSignatureResponse,
    ) -> anyhow::Result<()> {
        self.inner.submit_reward_signature(reward_signature).await
    }

    async fn subscribe_archived_segment_headers(
        &self,
    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>> {
        Ok(Box::pin(
            WatchStream::new(self.archived_segment_headers_receiver.clone())
                .filter_map(|maybe_segment_header| async move { maybe_segment_header }),
        ))
    }

    /// Gets segment headers for the given segment indices, updating the cache from the node if
    /// needed.
    ///
    /// Returns `None` for segment indices that are not in the cache.
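    ///
    /// Lock strategy, summarized: an optimistic read lock first, then an upgradable read
    /// lock for the rate-limited node sync, upgraded to a write lock only when new headers
    /// actually have to be cached.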
    async fn segment_headers(
        &self,
        segment_indices: Vec<SegmentIndex>,
    ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
        let retrieved_segment_headers = self
            .segment_headers
            .read()
            .await
            .get_segment_headers(&segment_indices);

        if retrieved_segment_headers.iter().all(Option::is_some) {
            Ok(retrieved_segment_headers)
        } else {
            // We might be missing a requested segment header.
            // Sync the cache with the node, applying a rate limit, and return cached segment
            // headers.

            // If we took a write lock here, a queue of writers could starve all the readers,
            // even if those writers would be rate-limited. So we take an upgradable read lock
            // for the rate limit check.
            let segment_headers = self.segment_headers.upgradable_read_arc().await;

            // Try again after acquiring the upgradable read lock, in case another caller already
            // synced the headers.
            let retrieved_segment_headers = segment_headers.get_segment_headers(&segment_indices);
            if retrieved_segment_headers.iter().all(Option::is_some) {
                return Ok(retrieved_segment_headers);
            }

            // Try to sync the cache with the node.
            let extra_segment_headers = segment_headers
                .request_uncached_headers(&self.inner)
                .await?;

            if extra_segment_headers.is_empty() {
                // No extra segment headers on the node, or we are rate-limited.
                // So just return what we have in the cache.
                return Ok(retrieved_segment_headers);
            }

            // We need to update the cached segment headers, so take the write lock.
            let mut segment_headers =
                AsyncRwLockUpgradableReadGuard::upgrade(segment_headers).await;
            segment_headers.write_cache(extra_segment_headers);

            // Downgrade the write lock to a read lock to get the updated segment headers for the
            // query.
            Ok(AsyncRwLockWriteGuard::downgrade(segment_headers)
                .get_segment_headers(&segment_indices))
        }
    }

    async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
        self.inner.piece(piece_index).await
    }

    async fn acknowledge_archived_segment_header(
        &self,
        _segment_index: SegmentIndex,
    ) -> anyhow::Result<()> {
        // Not supported: the background task acknowledges all archived segment headers
        // implicitly and immediately.
        Ok(())
    }
}

#[async_trait]
impl<NC> NodeClientExt for CachingProxyNodeClient<NC>
where
    NC: NodeClientExt,
{
    async fn cached_segment_headers(
        &self,
        segment_indices: Vec<SegmentIndex>,
    ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
        // To avoid remote denial of service, we don't update the cache here, because this
        // method is called from network code.
        Ok(self
            .segment_headers
            .read()
            .await
            .get_segment_headers(&segment_indices))
    }

    async fn last_segment_headers(&self, limit: u32) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
        Ok(self
            .segment_headers
            .read()
            .await
            .last_segment_headers(limit))
    }
}