1use crate::cluster::controller::ClusterControllerCacheIdentifyBroadcast;
11use crate::cluster::nats_client::{
12 GenericBroadcast, GenericRequest, GenericStreamRequest, NatsClient,
13};
14use crate::farm::{FarmError, PieceCache, PieceCacheId, PieceCacheOffset};
15use anyhow::anyhow;
16use async_trait::async_trait;
17use derive_more::{Display, From};
18use futures::stream::FuturesUnordered;
19use futures::{select, stream, FutureExt, Stream, StreamExt};
20use parity_scale_codec::{Decode, Encode, EncodeLike, Input, Output};
21use std::collections::BTreeSet;
22use std::pin::Pin;
23use std::task::Poll;
24use std::time::{Duration, Instant};
25use subspace_core_primitives::pieces::{Piece, PieceIndex};
26use tokio::time::MissedTickBehavior;
27use tracing::{debug, error, info, info_span, trace, warn, Instrument};
28use ulid::Ulid;
29
/// Minimum time that must pass since the previous identification before an identify request
/// coming from a controller triggers another identification broadcast; requests that arrive
/// sooner are ignored.
const MIN_CACHE_IDENTIFICATION_INTERVAL: Duration = Duration::from_millis(1_000);
31
32#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Display, From)]
34pub enum ClusterCacheId {
35 Ulid(Ulid),
37}
38
39impl Encode for ClusterCacheId {
40 #[inline]
41 fn size_hint(&self) -> usize {
42 1_usize
43 + match self {
44 ClusterCacheId::Ulid(ulid) => 0_usize.saturating_add(Encode::size_hint(&ulid.0)),
45 }
46 }
47
48 #[inline]
49 fn encode_to<O: Output + ?Sized>(&self, output: &mut O) {
50 match self {
51 ClusterCacheId::Ulid(ulid) => {
52 output.push_byte(0);
53 Encode::encode_to(&ulid.0, output);
54 }
55 }
56 }
57}
58
59impl EncodeLike for ClusterCacheId {}
60
61impl Decode for ClusterCacheId {
62 #[inline]
63 fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
64 match input.read_byte().map_err(|e| {
65 e.chain("Could not decode `ClusterCacheId`, failed to read variant byte")
66 })? {
67 0 => u128::decode(input)
68 .map(|ulid| ClusterCacheId::Ulid(Ulid(ulid)))
69 .map_err(|e| e.chain("Could not decode `ClusterCacheId::Ulid.0`")),
70 _ => Err("Could not decode `ClusterCacheId`, variant doesn't exist".into()),
71 }
72 }
73}
74
75#[allow(clippy::new_without_default)]
76impl ClusterCacheId {
77 #[inline]
79 pub fn new() -> Self {
80 Self::Ulid(Ulid::new())
81 }
82}
83
84#[derive(Debug, Clone, Encode, Decode)]
86pub struct ClusterCacheIdentifyBroadcast {
87 pub cluster_cache_id: ClusterCacheId,
89}
90
91impl GenericBroadcast for ClusterCacheIdentifyBroadcast {
92 const SUBJECT: &'static str = "subspace.cache.*.cache-identify";
94}
95
96#[derive(Debug, Clone, Encode, Decode)]
98pub struct ClusterCacheDetailsRequest;
99
100impl GenericStreamRequest for ClusterCacheDetailsRequest {
101 const SUBJECT: &'static str = "subspace.cache.*.details";
103 type Response = ClusterPieceCacheDetails;
104}
105
106#[derive(Debug, Clone, Encode, Decode)]
108pub struct ClusterPieceCacheDetails {
109 pub piece_cache_id: PieceCacheId,
111 pub max_num_elements: u32,
113}
114
115#[derive(Debug, Clone, Encode, Decode)]
117struct ClusterCacheWritePieceRequest {
118 offset: PieceCacheOffset,
119 piece_index: PieceIndex,
120 piece: Piece,
121}
122
123impl GenericRequest for ClusterCacheWritePieceRequest {
124 const SUBJECT: &'static str = "subspace.cache.*.write-piece";
126 type Response = Result<(), String>;
127}
128
129#[derive(Debug, Clone, Encode, Decode)]
131struct ClusterCacheReadPieceIndexRequest {
132 offset: PieceCacheOffset,
133}
134
135impl GenericRequest for ClusterCacheReadPieceIndexRequest {
136 const SUBJECT: &'static str = "subspace.cache.*.read-piece-index";
138 type Response = Result<Option<PieceIndex>, String>;
139}
140
141#[derive(Debug, Clone, Encode, Decode)]
143pub(super) struct ClusterCacheReadPieceRequest {
144 pub(super) offset: PieceCacheOffset,
145}
146
147impl GenericRequest for ClusterCacheReadPieceRequest {
148 const SUBJECT: &'static str = "subspace.cache.*.read-piece";
150 type Response = Result<Option<(PieceIndex, Piece)>, String>;
151}
152
153#[derive(Debug, Clone, Encode, Decode)]
155pub(super) struct ClusterCacheReadPiecesRequest {
156 pub(super) offsets: Vec<PieceCacheOffset>,
157}
158
159impl GenericStreamRequest for ClusterCacheReadPiecesRequest {
160 const SUBJECT: &'static str = "subspace.cache.*.read-pieces";
162 type Response = Result<(PieceCacheOffset, Option<(PieceIndex, Piece)>), String>;
163}
164
165#[derive(Debug, Clone, Encode, Decode)]
167struct ClusterCacheContentsRequest;
168
169impl GenericStreamRequest for ClusterCacheContentsRequest {
170 const SUBJECT: &'static str = "subspace.cache.*.contents";
172 type Response = Result<(PieceCacheOffset, Option<PieceIndex>), String>;
173}
174
175#[derive(Debug)]
177pub struct ClusterPieceCache {
178 piece_cache_id: PieceCacheId,
179 piece_cache_id_string: String,
180 max_num_elements: u32,
181 nats_client: NatsClient,
182}
183
184#[async_trait]
185impl PieceCache for ClusterPieceCache {
186 fn id(&self) -> &PieceCacheId {
187 &self.piece_cache_id
188 }
189
190 #[inline]
191 fn max_num_elements(&self) -> u32 {
192 self.max_num_elements
193 }
194
195 async fn contents(
196 &self,
197 ) -> Result<
198 Box<
199 dyn Stream<Item = Result<(PieceCacheOffset, Option<PieceIndex>), FarmError>>
200 + Unpin
201 + Send
202 + '_,
203 >,
204 FarmError,
205 > {
206 Ok(Box::new(
207 self.nats_client
208 .stream_request(
209 &ClusterCacheContentsRequest,
210 Some(&self.piece_cache_id_string),
211 )
212 .await?
213 .map(|response| response.map_err(FarmError::from)),
214 ))
215 }
216
217 async fn write_piece(
218 &self,
219 offset: PieceCacheOffset,
220 piece_index: PieceIndex,
221 piece: &Piece,
222 ) -> Result<(), FarmError> {
223 Ok(self
224 .nats_client
225 .request(
226 &ClusterCacheWritePieceRequest {
227 offset,
228 piece_index,
229 piece: piece.clone(),
230 },
231 Some(&self.piece_cache_id_string),
232 )
233 .await??)
234 }
235
236 async fn read_piece_index(
237 &self,
238 offset: PieceCacheOffset,
239 ) -> Result<Option<PieceIndex>, FarmError> {
240 Ok(self
241 .nats_client
242 .request(
243 &ClusterCacheReadPieceIndexRequest { offset },
244 Some(&self.piece_cache_id_string),
245 )
246 .await??)
247 }
248
249 async fn read_piece(
250 &self,
251 offset: PieceCacheOffset,
252 ) -> Result<Option<(PieceIndex, Piece)>, FarmError> {
253 Ok(self
254 .nats_client
255 .request(
256 &ClusterCacheReadPieceRequest { offset },
257 Some(&self.piece_cache_id_string),
258 )
259 .await??)
260 }
261
262 async fn read_pieces(
263 &self,
264 offsets: Box<dyn Iterator<Item = PieceCacheOffset> + Send>,
265 ) -> Result<
266 Box<
267 dyn Stream<Item = Result<(PieceCacheOffset, Option<(PieceIndex, Piece)>), FarmError>>
268 + Send
269 + Unpin
270 + '_,
271 >,
272 FarmError,
273 > {
274 let offsets = offsets.collect::<Vec<_>>();
275 let mut offsets_set = BTreeSet::from_iter(offsets.iter().copied());
276 let mut stream = self
277 .nats_client
278 .stream_request(
279 &ClusterCacheReadPiecesRequest { offsets },
280 Some(&self.piece_cache_id_string),
281 )
282 .await?
283 .map(|response| response.map_err(FarmError::from))
284 .fuse();
285 Ok(Box::new(stream::poll_fn(move |cx| {
286 if !stream.is_done() {
287 match stream.poll_next_unpin(cx) {
288 Poll::Ready(Some(response)) => {
289 return Poll::Ready(Some(response.inspect(|(offset, _)| {
290 offsets_set.remove(offset);
291 })));
292 }
293 Poll::Ready(None) => {
294 }
296 Poll::Pending => {
297 return Poll::Pending;
298 }
299 }
300 }
301
302 match offsets_set.pop_first() {
305 Some(offset) => Poll::Ready(Some(Ok((offset, None)))),
306 None => Poll::Ready(None),
307 }
308 })))
309 }
310}
311
312impl ClusterPieceCache {
313 #[inline]
316 pub fn new(
317 piece_cache_id: PieceCacheId,
318 max_num_elements: u32,
319 nats_client: NatsClient,
320 ) -> ClusterPieceCache {
321 Self {
322 piece_cache_id,
323 piece_cache_id_string: piece_cache_id.to_string(),
324 max_num_elements,
325 nats_client,
326 }
327 }
328}
329
330#[derive(Debug)]
331struct CacheDetails<'a, C> {
332 piece_cache_id: PieceCacheId,
333 piece_cache_id_string: String,
334 cache: &'a C,
335}
336
337pub async fn cache_service<C>(
340 nats_client: NatsClient,
341 caches: &[C],
342 cache_group: &str,
343 identification_broadcast_interval: Duration,
344 primary_instance: bool,
345) -> anyhow::Result<()>
346where
347 C: PieceCache,
348{
349 let cluster_cache_id = ClusterCacheId::new();
350 let cluster_cache_id_string = cluster_cache_id.to_string();
351
352 let caches_details = caches
353 .iter()
354 .map(|cache| {
355 let piece_cache_id = *cache.id();
356
357 if primary_instance {
358 info!(%piece_cache_id, max_num_elements = %cache.max_num_elements(), "Created piece cache");
359 }
360
361 CacheDetails {
362 piece_cache_id,
363 piece_cache_id_string: piece_cache_id.to_string(),
364 cache,
365 }
366 })
367 .collect::<Vec<_>>();
368
369 if primary_instance {
370 select! {
371 result = identify_responder(
372 &nats_client,
373 cluster_cache_id,
374 cache_group,
375 identification_broadcast_interval
376 ).fuse() => {
377 result
378 },
379 result = piece_cache_details_responder(
380 &nats_client,
381 &cluster_cache_id_string,
382 &caches_details
383 ).fuse() => {
384 result
385 },
386 result = write_piece_responder(&nats_client, &caches_details).fuse() => {
387 result
388 },
389 result = read_piece_index_responder(&nats_client, &caches_details).fuse() => {
390 result
391 },
392 result = read_piece_responder(&nats_client, &caches_details).fuse() => {
393 result
394 },
395 result = read_pieces_responder(&nats_client, &caches_details).fuse() => {
396 result
397 },
398 result = contents_responder(&nats_client, &caches_details).fuse() => {
399 result
400 },
401 }
402 } else {
403 select! {
404 result = write_piece_responder(&nats_client, &caches_details).fuse() => {
405 result
406 },
407 result = read_piece_index_responder(&nats_client, &caches_details).fuse() => {
408 result
409 },
410 result = read_piece_responder(&nats_client, &caches_details).fuse() => {
411 result
412 },
413 result = read_pieces_responder(&nats_client, &caches_details).fuse() => {
414 result
415 },
416 result = contents_responder(&nats_client, &caches_details).fuse() => {
417 result
418 },
419 }
420 }
421}
422
423async fn identify_responder(
429 nats_client: &NatsClient,
430 cluster_cache_id: ClusterCacheId,
431 cache_group: &str,
432 identification_broadcast_interval: Duration,
433) -> anyhow::Result<()> {
434 let mut subscription = nats_client
435 .subscribe_to_broadcasts::<ClusterControllerCacheIdentifyBroadcast>(Some(cache_group), None)
436 .await
437 .map_err(|error| {
438 anyhow!("Failed to subscribe to cache identify broadcast requests: {error}")
439 })?
440 .fuse();
441
442 let mut interval = tokio::time::interval(identification_broadcast_interval);
444 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
445
446 let mut last_identification = Instant::now();
447
448 loop {
449 select! {
450 maybe_message = subscription.next() => {
451 let Some(message) = maybe_message else {
452 debug!("Identify broadcast stream ended");
453 break;
454 };
455
456 trace!(?message, "Cache received identify broadcast message");
457
458 if last_identification.elapsed() < MIN_CACHE_IDENTIFICATION_INTERVAL {
459 continue;
461 }
462
463 last_identification = Instant::now();
464 send_identify_broadcast(nats_client, cluster_cache_id, cache_group).await;
465 interval.reset();
466 }
467 _ = interval.tick().fuse() => {
468 last_identification = Instant::now();
469 trace!("Cache self-identification");
470
471 send_identify_broadcast(nats_client, cluster_cache_id, cache_group).await;
472 }
473 }
474 }
475
476 Ok(())
477}
478
479async fn send_identify_broadcast(
480 nats_client: &NatsClient,
481 cluster_cache_id: ClusterCacheId,
482 cache_group: &str,
483) {
484 if let Err(error) = nats_client
485 .broadcast(
486 &ClusterCacheIdentifyBroadcast { cluster_cache_id },
487 cache_group,
488 )
489 .await
490 {
491 warn!(%cluster_cache_id, %error, "Failed to send cache identify notification");
492 }
493}
494
495async fn piece_cache_details_responder<C>(
496 nats_client: &NatsClient,
497 cluster_cache_id_string: &str,
498 caches_details: &[CacheDetails<'_, C>],
499) -> anyhow::Result<()>
500where
501 C: PieceCache,
502{
503 nats_client
504 .stream_request_responder(
505 Some(cluster_cache_id_string),
506 Some(cluster_cache_id_string.to_string()),
507 |_request: ClusterCacheDetailsRequest| async {
508 Some(stream::iter(caches_details.iter().map(|cache_details| {
509 ClusterPieceCacheDetails {
510 piece_cache_id: cache_details.piece_cache_id,
511 max_num_elements: cache_details.cache.max_num_elements(),
512 }
513 })))
514 },
515 )
516 .await
517}
518
519async fn write_piece_responder<C>(
520 nats_client: &NatsClient,
521 caches_details: &[CacheDetails<'_, C>],
522) -> anyhow::Result<()>
523where
524 C: PieceCache,
525{
526 caches_details
527 .iter()
528 .map(|cache_details| async move {
529 nats_client
530 .request_responder(
531 Some(cache_details.piece_cache_id_string.as_str()),
532 Some(cache_details.piece_cache_id_string.clone()),
533 |request: ClusterCacheWritePieceRequest| async move {
534 Some(
535 cache_details
536 .cache
537 .write_piece(request.offset, request.piece_index, &request.piece)
538 .await
539 .map_err(|error| error.to_string()),
540 )
541 },
542 )
543 .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id))
544 .await
545 })
546 .collect::<FuturesUnordered<_>>()
547 .next()
548 .await
549 .ok_or_else(|| anyhow!("No caches"))?
550}
551
552async fn read_piece_index_responder<C>(
553 nats_client: &NatsClient,
554 caches_details: &[CacheDetails<'_, C>],
555) -> anyhow::Result<()>
556where
557 C: PieceCache,
558{
559 caches_details
560 .iter()
561 .map(|cache_details| async move {
562 nats_client
563 .request_responder(
564 Some(cache_details.piece_cache_id_string.as_str()),
565 Some(cache_details.piece_cache_id_string.clone()),
566 |request: ClusterCacheReadPieceIndexRequest| async move {
567 Some(
568 cache_details
569 .cache
570 .read_piece_index(request.offset)
571 .await
572 .map_err(|error| error.to_string()),
573 )
574 },
575 )
576 .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id))
577 .await
578 })
579 .collect::<FuturesUnordered<_>>()
580 .next()
581 .await
582 .ok_or_else(|| anyhow!("No caches"))?
583}
584
585async fn read_piece_responder<C>(
586 nats_client: &NatsClient,
587 caches_details: &[CacheDetails<'_, C>],
588) -> anyhow::Result<()>
589where
590 C: PieceCache,
591{
592 caches_details
593 .iter()
594 .map(|cache_details| async move {
595 nats_client
596 .request_responder(
597 Some(cache_details.piece_cache_id_string.as_str()),
598 Some(cache_details.piece_cache_id_string.clone()),
599 |request: ClusterCacheReadPieceRequest| async move {
600 Some(
601 cache_details
602 .cache
603 .read_piece(request.offset)
604 .await
605 .map_err(|error| error.to_string()),
606 )
607 },
608 )
609 .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id))
610 .await
611 })
612 .collect::<FuturesUnordered<_>>()
613 .next()
614 .await
615 .ok_or_else(|| anyhow!("No caches"))?
616}
617
618async fn read_pieces_responder<C>(
619 nats_client: &NatsClient,
620 caches_details: &[CacheDetails<'_, C>],
621) -> anyhow::Result<()>
622where
623 C: PieceCache,
624{
625 caches_details
626 .iter()
627 .map(|cache_details| async move {
628 nats_client
629 .stream_request_responder::<_, _, Pin<Box<dyn Stream<Item = _> + Send>>, _>(
630 Some(cache_details.piece_cache_id_string.as_str()),
631 Some(cache_details.piece_cache_id_string.clone()),
632 |ClusterCacheReadPiecesRequest { offsets }| async move {
633 Some(
634 match cache_details
635 .cache
636 .read_pieces(Box::new(offsets.into_iter()))
637 .await
638 {
639 Ok(contents) => Box::pin(contents.map(|maybe_cache_element| {
640 maybe_cache_element.map_err(|error| error.to_string())
641 })) as _,
642 Err(error) => {
643 error!(%error, "Failed to read pieces");
644
645 Box::pin(stream::once(async move {
646 Err(format!("Failed to read pieces: {error}"))
647 })) as _
648 }
649 },
650 )
651 },
652 )
653 .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id))
654 .await
655 })
656 .collect::<FuturesUnordered<_>>()
657 .next()
658 .await
659 .ok_or_else(|| anyhow!("No caches"))?
660}
661
662async fn contents_responder<C>(
663 nats_client: &NatsClient,
664 caches_details: &[CacheDetails<'_, C>],
665) -> anyhow::Result<()>
666where
667 C: PieceCache,
668{
669 caches_details
670 .iter()
671 .map(|cache_details| async move {
672 nats_client
673 .stream_request_responder::<_, _, Pin<Box<dyn Stream<Item = _> + Send>>, _>(
674 Some(cache_details.piece_cache_id_string.as_str()),
675 Some(cache_details.piece_cache_id_string.clone()),
676 |_request: ClusterCacheContentsRequest| async move {
677 Some(match cache_details.cache.contents().await {
678 Ok(contents) => Box::pin(contents.map(|maybe_cache_element| {
679 maybe_cache_element.map_err(|error| error.to_string())
680 })) as _,
681 Err(error) => {
682 error!(%error, "Failed to get contents");
683
684 Box::pin(stream::once(async move {
685 Err(format!("Failed to get contents: {error}"))
686 })) as _
687 }
688 })
689 },
690 )
691 .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id))
692 .await
693 })
694 .collect::<FuturesUnordered<_>>()
695 .next()
696 .await
697 .ok_or_else(|| anyhow!("No caches"))?
698}