subspace_farmer/cluster/
cache.rs

1//! Farming cluster cache
2//!
3//! Cache is responsible for caching pieces within allocated space to accelerate plotting and serve
4//! pieces in response to DSN requests.
5//!
6//! This module exposes some data structures for NATS communication, custom piece cache
7//! implementation designed to work with cluster cache and a service function to drive the backend
8//! part of the cache.
9
10use crate::cluster::controller::ClusterControllerCacheIdentifyBroadcast;
11use crate::cluster::nats_client::{
12    GenericBroadcast, GenericRequest, GenericStreamRequest, NatsClient,
13};
14use crate::farm::{FarmError, PieceCache, PieceCacheId, PieceCacheOffset};
15use anyhow::anyhow;
16use async_trait::async_trait;
17use derive_more::{Display, From};
18use futures::stream::FuturesUnordered;
19use futures::{select, stream, FutureExt, Stream, StreamExt};
20use parity_scale_codec::{Decode, Encode, EncodeLike, Input, Output};
21use std::collections::BTreeSet;
22use std::pin::Pin;
23use std::task::Poll;
24use std::time::{Duration, Instant};
25use subspace_core_primitives::pieces::{Piece, PieceIndex};
26use tokio::time::MissedTickBehavior;
27use tracing::{debug, error, info, info_span, trace, warn, Instrument};
28use ulid::Ulid;
29
/// Minimum time that must pass since the last identification before another one is sent in
/// response to an identify request
const MIN_CACHE_IDENTIFICATION_INTERVAL: Duration = Duration::from_millis(1_000);
31
32/// An identifier for a cluster cache, can be used for in logs, thread names, etc.
33#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Display, From)]
34pub enum ClusterCacheId {
35    /// Cache ID
36    Ulid(Ulid),
37}
38
39impl Encode for ClusterCacheId {
40    #[inline]
41    fn size_hint(&self) -> usize {
42        1_usize
43            + match self {
44                ClusterCacheId::Ulid(ulid) => 0_usize.saturating_add(Encode::size_hint(&ulid.0)),
45            }
46    }
47
48    #[inline]
49    fn encode_to<O: Output + ?Sized>(&self, output: &mut O) {
50        match self {
51            ClusterCacheId::Ulid(ulid) => {
52                output.push_byte(0);
53                Encode::encode_to(&ulid.0, output);
54            }
55        }
56    }
57}
58
59impl EncodeLike for ClusterCacheId {}
60
61impl Decode for ClusterCacheId {
62    #[inline]
63    fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
64        match input.read_byte().map_err(|e| {
65            e.chain("Could not decode `ClusterCacheId`, failed to read variant byte")
66        })? {
67            0 => u128::decode(input)
68                .map(|ulid| ClusterCacheId::Ulid(Ulid(ulid)))
69                .map_err(|e| e.chain("Could not decode `ClusterCacheId::Ulid.0`")),
70            _ => Err("Could not decode `ClusterCacheId`, variant doesn't exist".into()),
71        }
72    }
73}
74
75#[allow(clippy::new_without_default)]
76impl ClusterCacheId {
77    /// Creates new ID
78    #[inline]
79    pub fn new() -> Self {
80        Self::Ulid(Ulid::new())
81    }
82}
83
84/// Broadcast with identification details by caches
85#[derive(Debug, Clone, Encode, Decode)]
86pub struct ClusterCacheIdentifyBroadcast {
87    /// Cache ID
88    pub cluster_cache_id: ClusterCacheId,
89}
90
91impl GenericBroadcast for ClusterCacheIdentifyBroadcast {
92    /// `*` here stands for cache group
93    const SUBJECT: &'static str = "subspace.cache.*.cache-identify";
94}
95
96/// Request cache details from cache
97#[derive(Debug, Clone, Encode, Decode)]
98pub struct ClusterCacheDetailsRequest;
99
100impl GenericStreamRequest for ClusterCacheDetailsRequest {
101    /// `*` here stands for piece cache ID
102    const SUBJECT: &'static str = "subspace.cache.*.details";
103    type Response = ClusterPieceCacheDetails;
104}
105
106/// Cache details
107#[derive(Debug, Clone, Encode, Decode)]
108pub struct ClusterPieceCacheDetails {
109    /// Piece Cache ID
110    pub piece_cache_id: PieceCacheId,
111    /// Max number of elements in this cache
112    pub max_num_elements: u32,
113}
114
115/// Write piece into cache
116#[derive(Debug, Clone, Encode, Decode)]
117struct ClusterCacheWritePieceRequest {
118    offset: PieceCacheOffset,
119    piece_index: PieceIndex,
120    piece: Piece,
121}
122
123impl GenericRequest for ClusterCacheWritePieceRequest {
124    /// `*` here stands for piece cache ID
125    const SUBJECT: &'static str = "subspace.cache.*.write-piece";
126    type Response = Result<(), String>;
127}
128
129/// Read piece index from cache
130#[derive(Debug, Clone, Encode, Decode)]
131struct ClusterCacheReadPieceIndexRequest {
132    offset: PieceCacheOffset,
133}
134
135impl GenericRequest for ClusterCacheReadPieceIndexRequest {
136    /// `*` here stands for piece cache ID
137    const SUBJECT: &'static str = "subspace.cache.*.read-piece-index";
138    type Response = Result<Option<PieceIndex>, String>;
139}
140
141/// Read piece from cache
142#[derive(Debug, Clone, Encode, Decode)]
143pub(super) struct ClusterCacheReadPieceRequest {
144    pub(super) offset: PieceCacheOffset,
145}
146
147impl GenericRequest for ClusterCacheReadPieceRequest {
148    /// `*` here stands for piece cache ID
149    const SUBJECT: &'static str = "subspace.cache.*.read-piece";
150    type Response = Result<Option<(PieceIndex, Piece)>, String>;
151}
152
153/// Read piece from cache
154#[derive(Debug, Clone, Encode, Decode)]
155pub(super) struct ClusterCacheReadPiecesRequest {
156    pub(super) offsets: Vec<PieceCacheOffset>,
157}
158
159impl GenericStreamRequest for ClusterCacheReadPiecesRequest {
160    /// `*` here stands for piece cache ID
161    const SUBJECT: &'static str = "subspace.cache.*.read-pieces";
162    type Response = Result<(PieceCacheOffset, Option<(PieceIndex, Piece)>), String>;
163}
164
165/// Collect plotted pieces from farmer
166#[derive(Debug, Clone, Encode, Decode)]
167struct ClusterCacheContentsRequest;
168
169impl GenericStreamRequest for ClusterCacheContentsRequest {
170    /// `*` here stands for piece cache ID
171    const SUBJECT: &'static str = "subspace.cache.*.contents";
172    type Response = Result<(PieceCacheOffset, Option<PieceIndex>), String>;
173}
174
175/// Cluster cache implementation
176#[derive(Debug)]
177pub struct ClusterPieceCache {
178    piece_cache_id: PieceCacheId,
179    piece_cache_id_string: String,
180    max_num_elements: u32,
181    nats_client: NatsClient,
182}
183
184#[async_trait]
185impl PieceCache for ClusterPieceCache {
186    fn id(&self) -> &PieceCacheId {
187        &self.piece_cache_id
188    }
189
190    #[inline]
191    fn max_num_elements(&self) -> u32 {
192        self.max_num_elements
193    }
194
195    async fn contents(
196        &self,
197    ) -> Result<
198        Box<
199            dyn Stream<Item = Result<(PieceCacheOffset, Option<PieceIndex>), FarmError>>
200                + Unpin
201                + Send
202                + '_,
203        >,
204        FarmError,
205    > {
206        Ok(Box::new(
207            self.nats_client
208                .stream_request(
209                    &ClusterCacheContentsRequest,
210                    Some(&self.piece_cache_id_string),
211                )
212                .await?
213                .map(|response| response.map_err(FarmError::from)),
214        ))
215    }
216
217    async fn write_piece(
218        &self,
219        offset: PieceCacheOffset,
220        piece_index: PieceIndex,
221        piece: &Piece,
222    ) -> Result<(), FarmError> {
223        Ok(self
224            .nats_client
225            .request(
226                &ClusterCacheWritePieceRequest {
227                    offset,
228                    piece_index,
229                    piece: piece.clone(),
230                },
231                Some(&self.piece_cache_id_string),
232            )
233            .await??)
234    }
235
236    async fn read_piece_index(
237        &self,
238        offset: PieceCacheOffset,
239    ) -> Result<Option<PieceIndex>, FarmError> {
240        Ok(self
241            .nats_client
242            .request(
243                &ClusterCacheReadPieceIndexRequest { offset },
244                Some(&self.piece_cache_id_string),
245            )
246            .await??)
247    }
248
249    async fn read_piece(
250        &self,
251        offset: PieceCacheOffset,
252    ) -> Result<Option<(PieceIndex, Piece)>, FarmError> {
253        Ok(self
254            .nats_client
255            .request(
256                &ClusterCacheReadPieceRequest { offset },
257                Some(&self.piece_cache_id_string),
258            )
259            .await??)
260    }
261
262    async fn read_pieces(
263        &self,
264        offsets: Box<dyn Iterator<Item = PieceCacheOffset> + Send>,
265    ) -> Result<
266        Box<
267            dyn Stream<Item = Result<(PieceCacheOffset, Option<(PieceIndex, Piece)>), FarmError>>
268                + Send
269                + Unpin
270                + '_,
271        >,
272        FarmError,
273    > {
274        let offsets = offsets.collect::<Vec<_>>();
275        let mut offsets_set = BTreeSet::from_iter(offsets.iter().copied());
276        let mut stream = self
277            .nats_client
278            .stream_request(
279                &ClusterCacheReadPiecesRequest { offsets },
280                Some(&self.piece_cache_id_string),
281            )
282            .await?
283            .map(|response| response.map_err(FarmError::from))
284            .fuse();
285        Ok(Box::new(stream::poll_fn(move |cx| {
286            if !stream.is_done() {
287                match stream.poll_next_unpin(cx) {
288                    Poll::Ready(Some(response)) => {
289                        return Poll::Ready(Some(response.inspect(|(offset, _)| {
290                            offsets_set.remove(offset);
291                        })));
292                    }
293                    Poll::Ready(None) => {
294                        // Handled as a general case below
295                    }
296                    Poll::Pending => {
297                        return Poll::Pending;
298                    }
299                }
300            }
301
302            // Uphold invariant of the trait that some result should be returned for every unique
303            // provided offset
304            match offsets_set.pop_first() {
305                Some(offset) => Poll::Ready(Some(Ok((offset, None)))),
306                None => Poll::Ready(None),
307            }
308        })))
309    }
310}
311
312impl ClusterPieceCache {
313    /// Create new instance using information from previously received
314    /// [`ClusterCacheIdentifyBroadcast`]
315    #[inline]
316    pub fn new(
317        piece_cache_id: PieceCacheId,
318        max_num_elements: u32,
319        nats_client: NatsClient,
320    ) -> ClusterPieceCache {
321        Self {
322            piece_cache_id,
323            piece_cache_id_string: piece_cache_id.to_string(),
324            max_num_elements,
325            nats_client,
326        }
327    }
328}
329
330#[derive(Debug)]
331struct CacheDetails<'a, C> {
332    piece_cache_id: PieceCacheId,
333    piece_cache_id_string: String,
334    cache: &'a C,
335}
336
337/// Create cache service for specified caches that will be processing incoming requests and send
338/// periodic identify notifications
339pub async fn cache_service<C>(
340    nats_client: NatsClient,
341    caches: &[C],
342    cache_group: &str,
343    identification_broadcast_interval: Duration,
344    primary_instance: bool,
345) -> anyhow::Result<()>
346where
347    C: PieceCache,
348{
349    let cluster_cache_id = ClusterCacheId::new();
350    let cluster_cache_id_string = cluster_cache_id.to_string();
351
352    let caches_details = caches
353        .iter()
354        .map(|cache| {
355            let piece_cache_id = *cache.id();
356
357            if primary_instance {
358                info!(%piece_cache_id, max_num_elements = %cache.max_num_elements(), "Created piece cache");
359            }
360
361            CacheDetails {
362                piece_cache_id,
363                piece_cache_id_string: piece_cache_id.to_string(),
364                cache,
365            }
366        })
367        .collect::<Vec<_>>();
368
369    if primary_instance {
370        select! {
371            result = identify_responder(
372                &nats_client,
373                cluster_cache_id,
374                cache_group,
375                identification_broadcast_interval
376            ).fuse() => {
377                result
378            },
379            result = piece_cache_details_responder(
380                &nats_client,
381                &cluster_cache_id_string,
382                &caches_details
383            ).fuse() => {
384                result
385            },
386            result = write_piece_responder(&nats_client, &caches_details).fuse() => {
387                result
388            },
389            result = read_piece_index_responder(&nats_client, &caches_details).fuse() => {
390                result
391            },
392            result = read_piece_responder(&nats_client, &caches_details).fuse() => {
393                result
394            },
395            result = read_pieces_responder(&nats_client, &caches_details).fuse() => {
396                result
397            },
398            result = contents_responder(&nats_client, &caches_details).fuse() => {
399                result
400            },
401        }
402    } else {
403        select! {
404            result = write_piece_responder(&nats_client, &caches_details).fuse() => {
405                result
406            },
407            result = read_piece_index_responder(&nats_client, &caches_details).fuse() => {
408                result
409            },
410            result = read_piece_responder(&nats_client, &caches_details).fuse() => {
411                result
412            },
413            result = read_pieces_responder(&nats_client, &caches_details).fuse() => {
414                result
415            },
416            result = contents_responder(&nats_client, &caches_details).fuse() => {
417                result
418            },
419        }
420    }
421}
422
423/// Listen for cache identification broadcast from controller and publish identification
424/// broadcast in response, also send periodic notifications reminding that cache exists.
425///
426/// Implementation is using concurrency with multiple tokio tasks, but can be started multiple times
427/// per controller instance in order to parallelize more work across threads if needed.
428async fn identify_responder(
429    nats_client: &NatsClient,
430    cluster_cache_id: ClusterCacheId,
431    cache_group: &str,
432    identification_broadcast_interval: Duration,
433) -> anyhow::Result<()> {
434    let mut subscription = nats_client
435        .subscribe_to_broadcasts::<ClusterControllerCacheIdentifyBroadcast>(Some(cache_group), None)
436        .await
437        .map_err(|error| {
438            anyhow!("Failed to subscribe to cache identify broadcast requests: {error}")
439        })?
440        .fuse();
441
442    // Also send periodic updates in addition to the subscription response
443    let mut interval = tokio::time::interval(identification_broadcast_interval);
444    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
445
446    let mut last_identification = Instant::now();
447
448    loop {
449        select! {
450            maybe_message = subscription.next() => {
451                let Some(message) = maybe_message else {
452                    debug!("Identify broadcast stream ended");
453                    break;
454                };
455
456                trace!(?message, "Cache received identify broadcast message");
457
458                if last_identification.elapsed() < MIN_CACHE_IDENTIFICATION_INTERVAL {
459                    // Skip too frequent identification requests
460                    continue;
461                }
462
463                last_identification = Instant::now();
464                send_identify_broadcast(nats_client, cluster_cache_id, cache_group).await;
465                interval.reset();
466            }
467            _ = interval.tick().fuse() => {
468                last_identification = Instant::now();
469                trace!("Cache self-identification");
470
471                send_identify_broadcast(nats_client, cluster_cache_id, cache_group).await;
472            }
473        }
474    }
475
476    Ok(())
477}
478
479async fn send_identify_broadcast(
480    nats_client: &NatsClient,
481    cluster_cache_id: ClusterCacheId,
482    cache_group: &str,
483) {
484    if let Err(error) = nats_client
485        .broadcast(
486            &ClusterCacheIdentifyBroadcast { cluster_cache_id },
487            cache_group,
488        )
489        .await
490    {
491        warn!(%cluster_cache_id, %error, "Failed to send cache identify notification");
492    }
493}
494
495async fn piece_cache_details_responder<C>(
496    nats_client: &NatsClient,
497    cluster_cache_id_string: &str,
498    caches_details: &[CacheDetails<'_, C>],
499) -> anyhow::Result<()>
500where
501    C: PieceCache,
502{
503    nats_client
504        .stream_request_responder(
505            Some(cluster_cache_id_string),
506            Some(cluster_cache_id_string.to_string()),
507            |_request: ClusterCacheDetailsRequest| async {
508                Some(stream::iter(caches_details.iter().map(|cache_details| {
509                    ClusterPieceCacheDetails {
510                        piece_cache_id: cache_details.piece_cache_id,
511                        max_num_elements: cache_details.cache.max_num_elements(),
512                    }
513                })))
514            },
515        )
516        .await
517}
518
519async fn write_piece_responder<C>(
520    nats_client: &NatsClient,
521    caches_details: &[CacheDetails<'_, C>],
522) -> anyhow::Result<()>
523where
524    C: PieceCache,
525{
526    caches_details
527        .iter()
528        .map(|cache_details| async move {
529            nats_client
530                .request_responder(
531                    Some(cache_details.piece_cache_id_string.as_str()),
532                    Some(cache_details.piece_cache_id_string.clone()),
533                    |request: ClusterCacheWritePieceRequest| async move {
534                        Some(
535                            cache_details
536                                .cache
537                                .write_piece(request.offset, request.piece_index, &request.piece)
538                                .await
539                                .map_err(|error| error.to_string()),
540                        )
541                    },
542                )
543                .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id))
544                .await
545        })
546        .collect::<FuturesUnordered<_>>()
547        .next()
548        .await
549        .ok_or_else(|| anyhow!("No caches"))?
550}
551
552async fn read_piece_index_responder<C>(
553    nats_client: &NatsClient,
554    caches_details: &[CacheDetails<'_, C>],
555) -> anyhow::Result<()>
556where
557    C: PieceCache,
558{
559    caches_details
560        .iter()
561        .map(|cache_details| async move {
562            nats_client
563                .request_responder(
564                    Some(cache_details.piece_cache_id_string.as_str()),
565                    Some(cache_details.piece_cache_id_string.clone()),
566                    |request: ClusterCacheReadPieceIndexRequest| async move {
567                        Some(
568                            cache_details
569                                .cache
570                                .read_piece_index(request.offset)
571                                .await
572                                .map_err(|error| error.to_string()),
573                        )
574                    },
575                )
576                .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id))
577                .await
578        })
579        .collect::<FuturesUnordered<_>>()
580        .next()
581        .await
582        .ok_or_else(|| anyhow!("No caches"))?
583}
584
585async fn read_piece_responder<C>(
586    nats_client: &NatsClient,
587    caches_details: &[CacheDetails<'_, C>],
588) -> anyhow::Result<()>
589where
590    C: PieceCache,
591{
592    caches_details
593        .iter()
594        .map(|cache_details| async move {
595            nats_client
596                .request_responder(
597                    Some(cache_details.piece_cache_id_string.as_str()),
598                    Some(cache_details.piece_cache_id_string.clone()),
599                    |request: ClusterCacheReadPieceRequest| async move {
600                        Some(
601                            cache_details
602                                .cache
603                                .read_piece(request.offset)
604                                .await
605                                .map_err(|error| error.to_string()),
606                        )
607                    },
608                )
609                .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id))
610                .await
611        })
612        .collect::<FuturesUnordered<_>>()
613        .next()
614        .await
615        .ok_or_else(|| anyhow!("No caches"))?
616}
617
618async fn read_pieces_responder<C>(
619    nats_client: &NatsClient,
620    caches_details: &[CacheDetails<'_, C>],
621) -> anyhow::Result<()>
622where
623    C: PieceCache,
624{
625    caches_details
626        .iter()
627        .map(|cache_details| async move {
628            nats_client
629                .stream_request_responder::<_, _, Pin<Box<dyn Stream<Item = _> + Send>>, _>(
630                    Some(cache_details.piece_cache_id_string.as_str()),
631                    Some(cache_details.piece_cache_id_string.clone()),
632                    |ClusterCacheReadPiecesRequest { offsets }| async move {
633                        Some(
634                            match cache_details
635                                .cache
636                                .read_pieces(Box::new(offsets.into_iter()))
637                                .await
638                            {
639                                Ok(contents) => Box::pin(contents.map(|maybe_cache_element| {
640                                    maybe_cache_element.map_err(|error| error.to_string())
641                                })) as _,
642                                Err(error) => {
643                                    error!(%error, "Failed to read pieces");
644
645                                    Box::pin(stream::once(async move {
646                                        Err(format!("Failed to read pieces: {error}"))
647                                    })) as _
648                                }
649                            },
650                        )
651                    },
652                )
653                .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id))
654                .await
655        })
656        .collect::<FuturesUnordered<_>>()
657        .next()
658        .await
659        .ok_or_else(|| anyhow!("No caches"))?
660}
661
662async fn contents_responder<C>(
663    nats_client: &NatsClient,
664    caches_details: &[CacheDetails<'_, C>],
665) -> anyhow::Result<()>
666where
667    C: PieceCache,
668{
669    caches_details
670        .iter()
671        .map(|cache_details| async move {
672            nats_client
673                .stream_request_responder::<_, _, Pin<Box<dyn Stream<Item = _> + Send>>, _>(
674                    Some(cache_details.piece_cache_id_string.as_str()),
675                    Some(cache_details.piece_cache_id_string.clone()),
676                    |_request: ClusterCacheContentsRequest| async move {
677                        Some(match cache_details.cache.contents().await {
678                            Ok(contents) => Box::pin(contents.map(|maybe_cache_element| {
679                                maybe_cache_element.map_err(|error| error.to_string())
680                            })) as _,
681                            Err(error) => {
682                                error!(%error, "Failed to get contents");
683
684                                Box::pin(stream::once(async move {
685                                    Err(format!("Failed to get contents: {error}"))
686                                })) as _
687                            }
688                        })
689                    },
690                )
691                .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id))
692                .await
693        })
694        .collect::<FuturesUnordered<_>>()
695        .next()
696        .await
697        .ok_or_else(|| anyhow!("No caches"))?
698}