subspace_farmer/node_client/caching_proxy_node_client.rs

1//! Node client wrapper around another node client that caches some data for better performance and
2//! proxies other requests through
3
4use crate::node_client::{NodeClient, NodeClientExt};
5use crate::utils::AsyncJoinOnDrop;
6use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
7use async_trait::async_trait;
8use futures::{select, FutureExt, Stream, StreamExt};
9use std::pin::Pin;
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12use subspace_core_primitives::pieces::{Piece, PieceIndex};
13use subspace_core_primitives::segments::{SegmentHeader, SegmentIndex};
14use subspace_rpc_primitives::{
15    FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
16    MAX_SEGMENT_HEADERS_PER_REQUEST,
17};
18use tokio::sync::watch;
19use tokio_stream::wrappers::WatchStream;
20use tracing::{info, trace, warn};
21
/// Minimum interval between two segment header re-syncs from the node; `SegmentHeaders::sync`
/// calls within this window return immediately without contacting the node
const SEGMENT_HEADERS_SYNC_INTERVAL: Duration = Duration::from_secs(1);
/// Farmer app info requests within this window are served from the cached value instead of
/// querying the node again
const FARMER_APP_INFO_DEDUPLICATION_WINDOW: Duration = Duration::from_secs(1);
24
/// In-memory cache of segment headers downloaded from the node.
///
/// Headers are stored contiguously so that the header for segment index `i` lives at
/// `segment_headers[i]`; `push` enforces this invariant.
#[derive(Debug, Default)]
struct SegmentHeaders {
    // Contiguous list of segment headers, addressable by segment index
    segment_headers: Vec<SegmentHeader>,
    // When the cache was last synced with the node; used to rate-limit re-syncs
    last_synced: Option<Instant>,
}
30
31impl SegmentHeaders {
32    fn push(&mut self, archived_segment_header: SegmentHeader) {
33        if self.segment_headers.len() == u64::from(archived_segment_header.segment_index()) as usize
34        {
35            self.segment_headers.push(archived_segment_header);
36        }
37    }
38
39    fn get_segment_headers(&self, segment_indices: &[SegmentIndex]) -> Vec<Option<SegmentHeader>> {
40        segment_indices
41            .iter()
42            .map(|segment_index| {
43                self.segment_headers
44                    .get(u64::from(*segment_index) as usize)
45                    .copied()
46            })
47            .collect::<Vec<_>>()
48    }
49
50    fn last_segment_headers(&self, limit: u32) -> Vec<Option<SegmentHeader>> {
51        self.segment_headers
52            .iter()
53            .rev()
54            .take(limit as usize)
55            .rev()
56            .copied()
57            .map(Some)
58            .collect()
59    }
60
61    async fn sync<NC>(&mut self, client: &NC) -> anyhow::Result<()>
62    where
63        NC: NodeClient,
64    {
65        if let Some(last_synced) = &self.last_synced {
66            if last_synced.elapsed() < SEGMENT_HEADERS_SYNC_INTERVAL {
67                return Ok(());
68            }
69        }
70        self.last_synced.replace(Instant::now());
71
72        let mut segment_index_offset = SegmentIndex::from(self.segment_headers.len() as u64);
73        let segment_index_step = SegmentIndex::from(MAX_SEGMENT_HEADERS_PER_REQUEST as u64);
74
75        'outer: loop {
76            let from = segment_index_offset;
77            let to = segment_index_offset + segment_index_step;
78            trace!(%from, %to, "Requesting segment headers");
79
80            for maybe_segment_header in client
81                .segment_headers((from..to).collect::<Vec<_>>())
82                .await
83                .map_err(|error| {
84                    anyhow::anyhow!(
85                        "Failed to download segment headers {from}..{to} from node: {error}"
86                    )
87                })?
88            {
89                let Some(segment_header) = maybe_segment_header else {
90                    // Reached non-existent segment header
91                    break 'outer;
92                };
93
94                self.push(segment_header);
95            }
96
97            segment_index_offset += segment_index_step;
98        }
99
100        Ok(())
101    }
102}
103
/// Node client wrapper around another node client that caches some data for better performance and
/// proxies other requests through.
///
/// NOTE: Archived segment acknowledgement is ignored in this client, all subscriptions are
/// acknowledged implicitly and immediately.
/// NOTE: Subscription messages that are not processed in time will be skipped for performance
/// reasons!
#[derive(Debug, Clone)]
pub struct CachingProxyNodeClient<NC> {
    // Underlying node client that non-cached requests are proxied to
    inner: NC,
    // Latest slot info published by the background task; `None` until the first notification
    slot_info_receiver: watch::Receiver<Option<SlotInfo>>,
    // Latest archived segment header published by the background task
    archived_segment_headers_receiver: watch::Receiver<Option<RewardSigningInfo>>,
    // Latest reward signing info published by the background task
    reward_signing_receiver: watch::Receiver<Option<RewardSigningInfo>>,
    // Shared segment header cache, kept up to date by the background task
    segment_headers: Arc<AsyncRwLock<SegmentHeaders>>,
    // Cached farmer app info together with the time it was fetched (for deduplication)
    last_farmer_app_info: Arc<AsyncMutex<(FarmerAppInfo, Instant)>>,
    // Keeps the background proxy task alive; aborted when the last clone is dropped
    _background_task: Arc<AsyncJoinOnDrop<()>>,
}
121
122impl<NC> CachingProxyNodeClient<NC>
123where
124    NC: NodeClient + Clone,
125{
126    /// Create a new instance
127    pub async fn new(client: NC) -> anyhow::Result<Self> {
128        let mut segment_headers = SegmentHeaders::default();
129        let mut archived_segments_notifications =
130            client.subscribe_archived_segment_headers().await?;
131
132        info!("Downloading all segment headers from node...");
133        segment_headers.sync(&client).await?;
134        info!("Downloaded all segment headers from node successfully");
135
136        let segment_headers = Arc::new(AsyncRwLock::new(segment_headers));
137
138        let (slot_info_sender, slot_info_receiver) = watch::channel(None::<SlotInfo>);
139        let slot_info_proxy_fut = {
140            let mut slot_info_subscription = client.subscribe_slot_info().await?;
141
142            async move {
143                let mut last_slot_number = None;
144                while let Some(slot_info) = slot_info_subscription.next().await {
145                    if let Some(last_slot_number) = last_slot_number
146                        && last_slot_number >= slot_info.slot_number
147                    {
148                        continue;
149                    }
150                    last_slot_number.replace(slot_info.slot_number);
151
152                    if let Err(error) = slot_info_sender.send(Some(slot_info)) {
153                        warn!(%error, "Failed to proxy slot info notification");
154                        return;
155                    }
156                }
157            }
158        };
159
160        let (archived_segment_headers_sender, archived_segment_headers_receiver) =
161            watch::channel(None::<SegmentHeader>);
162        let segment_headers_maintenance_fut = {
163            let client = client.clone();
164            let segment_headers = Arc::clone(&segment_headers);
165
166            async move {
167                let mut last_archived_segment_index = None;
168                while let Some(archived_segment_header) =
169                    archived_segments_notifications.next().await
170                {
171                    let segment_index = archived_segment_header.segment_index();
172                    trace!(
173                        ?archived_segment_header,
174                        "New archived archived segment header notification"
175                    );
176
177                    while let Err(error) = client
178                        .acknowledge_archived_segment_header(segment_index)
179                        .await
180                    {
181                        warn!(
182                            %error,
183                            "Failed to acknowledge archived segment header, trying again"
184                        );
185                    }
186
187                    if let Some(last_archived_segment_index) = last_archived_segment_index
188                        && last_archived_segment_index >= segment_index
189                    {
190                        continue;
191                    }
192                    last_archived_segment_index.replace(segment_index);
193
194                    segment_headers.write().await.push(archived_segment_header);
195
196                    if let Err(error) =
197                        archived_segment_headers_sender.send(Some(archived_segment_header))
198                    {
199                        warn!(%error, "Failed to proxy archived segment header notification");
200                        return;
201                    }
202                }
203            }
204        };
205
206        let (reward_signing_sender, reward_signing_receiver) =
207            watch::channel(None::<RewardSigningInfo>);
208        let reward_signing_proxy_fut = {
209            let mut reward_signing_subscription = client.subscribe_reward_signing().await?;
210
211            async move {
212                while let Some(reward_signing_info) = reward_signing_subscription.next().await {
213                    if let Err(error) = reward_signing_sender.send(Some(reward_signing_info)) {
214                        warn!(%error, "Failed to proxy reward signing notification");
215                        return;
216                    }
217                }
218            }
219        };
220
221        let farmer_app_info = client
222            .farmer_app_info()
223            .await
224            .map_err(|error| anyhow::anyhow!("Failed to get farmer app info: {error}"))?;
225        let last_farmer_app_info = Arc::new(AsyncMutex::new((farmer_app_info, Instant::now())));
226
227        let background_task = tokio::spawn(async move {
228            select! {
229                _ = slot_info_proxy_fut.fuse() => {},
230                _ = segment_headers_maintenance_fut.fuse() => {},
231                _ = reward_signing_proxy_fut.fuse() => {},
232            }
233        });
234
235        let node_client = Self {
236            inner: client,
237            slot_info_receiver,
238            archived_segment_headers_receiver,
239            reward_signing_receiver,
240            segment_headers,
241            last_farmer_app_info,
242            _background_task: Arc::new(AsyncJoinOnDrop::new(background_task, true)),
243        };
244
245        Ok(node_client)
246    }
247}
248
249#[async_trait]
250impl<NC> NodeClient for CachingProxyNodeClient<NC>
251where
252    NC: NodeClient,
253{
254    async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> {
255        let (last_farmer_app_info, last_farmer_app_info_request) =
256            &mut *self.last_farmer_app_info.lock().await;
257
258        if last_farmer_app_info_request.elapsed() > FARMER_APP_INFO_DEDUPLICATION_WINDOW {
259            let new_last_farmer_app_info = self.inner.farmer_app_info().await?;
260
261            *last_farmer_app_info = new_last_farmer_app_info;
262            *last_farmer_app_info_request = Instant::now();
263        }
264
265        Ok(last_farmer_app_info.clone())
266    }
267
268    async fn subscribe_slot_info(
269        &self,
270    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> {
271        Ok(Box::pin(
272            WatchStream::new(self.slot_info_receiver.clone())
273                .filter_map(|maybe_slot_info| async move { maybe_slot_info }),
274        ))
275    }
276
277    async fn submit_solution_response(
278        &self,
279        solution_response: SolutionResponse,
280    ) -> anyhow::Result<()> {
281        self.inner.submit_solution_response(solution_response).await
282    }
283
284    async fn subscribe_reward_signing(
285        &self,
286    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = RewardSigningInfo> + Send + 'static>>> {
287        Ok(Box::pin(
288            WatchStream::new(self.reward_signing_receiver.clone())
289                .filter_map(|maybe_reward_signing_info| async move { maybe_reward_signing_info }),
290        ))
291    }
292
293    /// Submit a block signature
294    async fn submit_reward_signature(
295        &self,
296        reward_signature: RewardSignatureResponse,
297    ) -> anyhow::Result<()> {
298        self.inner.submit_reward_signature(reward_signature).await
299    }
300
301    async fn subscribe_archived_segment_headers(
302        &self,
303    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>> {
304        Ok(Box::pin(
305            WatchStream::new(self.archived_segment_headers_receiver.clone())
306                .filter_map(|maybe_segment_header| async move { maybe_segment_header }),
307        ))
308    }
309
310    async fn segment_headers(
311        &self,
312        segment_indices: Vec<SegmentIndex>,
313    ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
314        let retrieved_segment_headers = self
315            .segment_headers
316            .read()
317            .await
318            .get_segment_headers(&segment_indices);
319
320        if retrieved_segment_headers.iter().all(Option::is_some) {
321            Ok(retrieved_segment_headers)
322        } else {
323            // Re-sync segment headers
324            let mut segment_headers = self.segment_headers.write().await;
325            segment_headers.sync(&self.inner).await?;
326
327            Ok(segment_headers.get_segment_headers(&segment_indices))
328        }
329    }
330
331    async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
332        self.inner.piece(piece_index).await
333    }
334
335    async fn acknowledge_archived_segment_header(
336        &self,
337        _segment_index: SegmentIndex,
338    ) -> anyhow::Result<()> {
339        // Not supported
340        Ok(())
341    }
342}
343
344#[async_trait]
345impl<NC> NodeClientExt for CachingProxyNodeClient<NC>
346where
347    NC: NodeClientExt,
348{
349    async fn last_segment_headers(&self, limit: u32) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
350        Ok(self
351            .segment_headers
352            .read()
353            .await
354            .last_segment_headers(limit))
355    }
356}