subspace_farmer/cluster/controller/
caches.rs

//! This module exposes the implementation of caches maintenance.
//!
//! The goal is to observe caches in a particular cache group and keep controller's data structures
//! about which pieces are stored where up to date. Implementation automatically handles dynamic
//! cache addition and removal, tries to reduce the number of reinitializations that result in
//! potential piece cache sync, etc.
7
8use crate::cluster::cache::{
9    ClusterCacheDetailsRequest, ClusterCacheId, ClusterCacheIdentifyBroadcast, ClusterPieceCache,
10    ClusterPieceCacheDetails,
11};
12use crate::cluster::controller::stream_map::StreamMap;
13use crate::cluster::controller::ClusterControllerCacheIdentifyBroadcast;
14use crate::cluster::nats_client::NatsClient;
15use crate::farm::PieceCache;
16use crate::farmer_cache::FarmerCache;
17use anyhow::anyhow;
18use futures::channel::oneshot;
19use futures::future::FusedFuture;
20use futures::{select, FutureExt, StreamExt};
21use parking_lot::Mutex;
22use std::future::{ready, Future};
23use std::pin::{pin, Pin};
24use std::sync::Arc;
25use std::time::{Duration, Instant};
26use tokio::time::MissedTickBehavior;
27use tracing::{debug, info, trace, warn};
28
29const SCHEDULE_REINITIALIZATION_DELAY: Duration = Duration::from_secs(3);
30
/// State tracked for a single cluster cache instance that has identified itself
#[derive(Debug)]
struct KnownCache {
    /// Unique ID of the cluster cache instance
    cluster_cache_id: ClusterCacheId,
    /// When the most recent identification broadcast from this cache was received
    last_identification: Instant,
    /// Piece caches exposed by this cluster cache instance
    piece_caches: Vec<Arc<ClusterPieceCache>>,
}
37
/// Registry of currently known cluster caches with expiration tracking
#[derive(Debug)]
struct KnownCaches {
    /// Caches that do not re-identify within twice this interval are considered expired
    identification_broadcast_interval: Duration,
    /// Currently known cluster caches
    known_caches: Vec<KnownCache>,
}
43
44impl KnownCaches {
45    fn new(identification_broadcast_interval: Duration) -> Self {
46        Self {
47            identification_broadcast_interval,
48            known_caches: Vec::new(),
49        }
50    }
51
52    fn get_all(&self) -> Vec<Arc<dyn PieceCache>> {
53        self.known_caches
54            .iter()
55            .flat_map(|known_cache| {
56                known_cache
57                    .piece_caches
58                    .iter()
59                    .map(|piece_cache| Arc::clone(piece_cache) as Arc<_>)
60            })
61            .collect()
62    }
63
64    /// Return `false` if cluster cache is unknown and reinitialization is required
65    #[must_use]
66    fn refresh(&mut self, cluster_cache_id: ClusterCacheId) -> bool {
67        self.known_caches.iter_mut().any(|known_cache| {
68            if known_cache.cluster_cache_id == cluster_cache_id {
69                trace!(%cluster_cache_id, "Updating last identification for cache");
70                known_cache.last_identification = Instant::now();
71                true
72            } else {
73                false
74            }
75        })
76    }
77
78    /// Return `true` if cluster cache reinitialization is required
79    fn add_cache(
80        &mut self,
81        cluster_cache_id: ClusterCacheId,
82        piece_caches: Vec<Arc<ClusterPieceCache>>,
83    ) {
84        self.known_caches.push(KnownCache {
85            cluster_cache_id,
86            last_identification: Instant::now(),
87            piece_caches,
88        });
89    }
90
91    fn remove_expired(&mut self) -> impl Iterator<Item = KnownCache> + '_ {
92        self.known_caches.extract_if(.., |known_cache| {
93            known_cache.last_identification.elapsed() > self.identification_broadcast_interval * 2
94        })
95    }
96}
97
/// Utility function for maintaining caches by controller in a cluster environment
///
/// Subscribes to cache identification broadcasts in `cache_group`, collects piece cache details
/// from newly discovered caches, prunes caches that stopped identifying themselves, and replaces
/// the backing caches of `farmer_cache` (after a short debounce delay) whenever the set of known
/// caches changes. Runs until the identify subscription stream ends, which is returned as an
/// error.
pub async fn maintain_caches(
    cache_group: &str,
    nats_client: &NatsClient,
    farmer_cache: &FarmerCache,
    identification_broadcast_interval: Duration,
) -> anyhow::Result<()> {
    let mut known_caches = KnownCaches::new(identification_broadcast_interval);

    // In-flight piece cache collection requests, keyed by cluster cache ID so the same cache is
    // not queried twice concurrently
    let mut piece_caches_to_add = StreamMap::default();

    // When set, a farmer cache reinitialization is scheduled to start at this instant
    let mut scheduled_reinitialization_for = None;
    // Reinitialization future; starts as an already-completed future so the first scheduled
    // reinitialization can begin immediately
    let mut cache_reinitialization =
        (Box::pin(ready(())) as Pin<Box<dyn Future<Output = ()>>>).fuse();

    let cache_identify_subscription = pin!(nats_client
        .subscribe_to_broadcasts::<ClusterCacheIdentifyBroadcast>(Some(cache_group), None)
        .await
        .map_err(|error| anyhow!("Failed to subscribe to cache identify broadcast: {error}"))?);

    // Request caches to identify themselves
    if let Err(error) = nats_client
        .broadcast(&ClusterControllerCacheIdentifyBroadcast, cache_group)
        .await
    {
        warn!(%error, "Failed to send cache identification broadcast");
    }

    let mut cache_identify_subscription = cache_identify_subscription.fuse();
    // Prune caches that missed two identification intervals; first tick is delayed so caches get
    // a chance to identify at least once
    let mut cache_pruning_interval = tokio::time::interval_at(
        (Instant::now() + identification_broadcast_interval * 2).into(),
        identification_broadcast_interval * 2,
    );
    cache_pruning_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

    loop {
        // Start a scheduled reinitialization only once the previous one has finished and the
        // debounce delay has elapsed
        if cache_reinitialization.is_terminated()
            && let Some(time) = scheduled_reinitialization_for
            && time <= Instant::now()
        {
            scheduled_reinitialization_for.take();

            let new_piece_caches = known_caches.get_all();
            let new_cache_reinitialization = async {
                let (sync_finish_sender, sync_finish_receiver) = oneshot::channel::<()>();
                // Mutex<Option<…>> so the `Fn` progress handler can send at most once
                let sync_finish_sender = Mutex::new(Some(sync_finish_sender));

                let _handler_id = farmer_cache.on_sync_progress(Arc::new(move |&progress| {
                    if progress == 100.0 {
                        if let Some(sync_finish_sender) = sync_finish_sender.lock().take() {
                            // Result doesn't matter
                            let _ = sync_finish_sender.send(());
                        }
                    }
                }));

                farmer_cache
                    .replace_backing_caches(new_piece_caches, Vec::new())
                    .await;

                // Wait for piece cache sync to finish before potentially starting a new one, result
                // doesn't matter
                let _ = sync_finish_receiver.await;
            };

            cache_reinitialization =
                (Box::pin(new_cache_reinitialization) as Pin<Box<dyn Future<Output = ()>>>).fuse();
        }

        select! {
            maybe_identify_message = cache_identify_subscription.next() => {
                let Some(identify_message) = maybe_identify_message else {
                    return Err(anyhow!("Cache identify stream ended"));
                };

                let ClusterCacheIdentifyBroadcast { cluster_cache_id } = identify_message;

                if known_caches.refresh(cluster_cache_id) {
                    trace!(
                        %cluster_cache_id,
                        "Received identification for already known cache"
                    );
                } else if piece_caches_to_add.add_if_not_in_progress(cluster_cache_id, Box::pin(collect_piece_caches(cluster_cache_id, nats_client))) {
                    debug!(
                        %cluster_cache_id,
                        "Received identification for new cache, collecting piece caches"
                    );
                } else {
                    debug!(
                        %cluster_cache_id,
                        "Received identification for new cache, which is already in progress"
                    );
                }
            }
            (cluster_cache_id, maybe_piece_caches) = piece_caches_to_add.select_next_some() => {
                let Ok(piece_caches) = maybe_piece_caches.inspect_err(|error| {
                    warn!(
                        %cluster_cache_id,
                        %error,
                        "Failed to collect piece caches to add, may retry later"
                    )
                }) else {
                    // Collection failed; the cache stays unknown, a later identification
                    // broadcast will trigger a retry
                    continue;
                };

                info!(
                    %cluster_cache_id,
                    "New cache discovered, scheduling reinitialization"
                );
                scheduled_reinitialization_for.replace(Instant::now() + SCHEDULE_REINITIALIZATION_DELAY);

                known_caches.add_cache(cluster_cache_id, piece_caches);
            }
            _ = cache_pruning_interval.tick().fuse() => {
                let mut reinit = false;
                for removed_cache in known_caches.remove_expired() {
                    reinit = true;

                    warn!(
                        cluster_cache_id = %removed_cache.cluster_cache_id,
                        "Cache expired and removed, scheduling reinitialization"
                    );
                }

                if reinit {
                    scheduled_reinitialization_for.replace(
                        Instant::now() + SCHEDULE_REINITIALIZATION_DELAY,
                    );
                }
            }
            _ = cache_reinitialization => {
                // Nothing left to do
            }
        }
    }
}
234
235/// Collect piece caches from the cache and convert them to `ClusterPieceCache` by sending a stream
236/// request, then construct a `KnownCache` instance.
237async fn collect_piece_caches(
238    cluster_cache_id: ClusterCacheId,
239    nats_client: &NatsClient,
240) -> anyhow::Result<Vec<Arc<ClusterPieceCache>>> {
241    let piece_caches = nats_client
242        .stream_request(
243            &ClusterCacheDetailsRequest,
244            Some(&cluster_cache_id.to_string()),
245        )
246        .await
247        .inspect_err(|error| {
248            warn!(
249                %error,
250                %cluster_cache_id,
251                "Failed to request cache details"
252            )
253        })?
254        .map(
255            |ClusterPieceCacheDetails {
256                 piece_cache_id,
257                 max_num_elements,
258             }| {
259                debug!(
260                    %cluster_cache_id,
261                    %piece_cache_id,
262                    %max_num_elements,
263                    "Discovered new piece cache"
264                );
265                Arc::new(ClusterPieceCache::new(
266                    piece_cache_id,
267                    max_num_elements,
268                    nats_client.clone(),
269                ))
270            },
271        )
272        .collect()
273        .await;
274
275    Ok(piece_caches)
276}