subspace_farmer/cluster/controller/
caches.rs

//! This module exposes the implementation of caches maintenance.
2//!
3//! The goal is to observe caches in a particular cache group and keep controller's data structures
4//! about which pieces are stored where up to date. Implementation automatically handles dynamic
5//! cache addition and removal, tries to reduce number of reinitializations that result in potential
6//! piece cache sync, etc.
7
8use crate::cluster::cache::{
9    ClusterCacheDetailsRequest, ClusterCacheId, ClusterCacheIdentifyBroadcast, ClusterPieceCache,
10    ClusterPieceCacheDetails,
11};
12use crate::cluster::controller::ClusterControllerCacheIdentifyBroadcast;
13use crate::cluster::controller::stream_map::StreamMap;
14use crate::cluster::nats_client::NatsClient;
15use crate::farm::PieceCache;
16use crate::farmer_cache::FarmerCache;
17use anyhow::anyhow;
18use futures::channel::oneshot;
19use futures::future::FusedFuture;
20use futures::{FutureExt, StreamExt, select};
21use parking_lot::Mutex;
22use std::future::{Future, ready};
23use std::pin::{Pin, pin};
24use std::sync::Arc;
25use std::time::{Duration, Instant};
26use tokio::time::MissedTickBehavior;
27use tracing::{debug, info, trace, warn};
28
29const SCHEDULE_REINITIALIZATION_DELAY: Duration = Duration::from_secs(3);
30
/// A single cluster cache known to the controller, together with the piece caches it exposes
#[derive(Debug)]
struct KnownCache {
    // Unique identifier of the cluster cache instance
    cluster_cache_id: ClusterCacheId,
    // When this cache last identified itself; used by expiration logic in `KnownCaches`
    last_identification: Instant,
    // Piece caches reported by this cluster cache
    piece_caches: Vec<Arc<ClusterPieceCache>>,
}
37
/// Collection of cluster caches currently known to the controller
#[derive(Debug)]
struct KnownCaches {
    // Interval at which caches are expected to identify themselves; caches not seen for more
    // than twice this interval are considered expired
    identification_broadcast_interval: Duration,
    // Currently known caches
    known_caches: Vec<KnownCache>,
}
43
44impl KnownCaches {
45    fn new(identification_broadcast_interval: Duration) -> Self {
46        Self {
47            identification_broadcast_interval,
48            known_caches: Vec::new(),
49        }
50    }
51
52    fn get_all(&self) -> Vec<Arc<dyn PieceCache>> {
53        self.known_caches
54            .iter()
55            .flat_map(|known_cache| {
56                known_cache
57                    .piece_caches
58                    .iter()
59                    .map(|piece_cache| Arc::clone(piece_cache) as Arc<_>)
60            })
61            .collect()
62    }
63
64    /// Return `false` if cluster cache is unknown and reinitialization is required
65    #[must_use]
66    fn refresh(&mut self, cluster_cache_id: ClusterCacheId) -> bool {
67        self.known_caches.iter_mut().any(|known_cache| {
68            if known_cache.cluster_cache_id == cluster_cache_id {
69                trace!(%cluster_cache_id, "Updating last identification for cache");
70                known_cache.last_identification = Instant::now();
71                true
72            } else {
73                false
74            }
75        })
76    }
77
78    /// Return `true` if cluster cache reinitialization is required
79    fn add_cache(
80        &mut self,
81        cluster_cache_id: ClusterCacheId,
82        piece_caches: Vec<Arc<ClusterPieceCache>>,
83    ) {
84        self.known_caches.push(KnownCache {
85            cluster_cache_id,
86            last_identification: Instant::now(),
87            piece_caches,
88        });
89    }
90
91    fn remove_expired(&mut self) -> impl Iterator<Item = KnownCache> + '_ {
92        self.known_caches.extract_if(.., |known_cache| {
93            known_cache.last_identification.elapsed() > self.identification_broadcast_interval * 2
94        })
95    }
96}
97
/// Utility function for maintaining caches by controller in a cluster environment
///
/// Subscribes to cache identification broadcasts in `cache_group` and collects piece cache
/// details from every newly discovered cluster cache. Whenever the set of known caches changes
/// (a new cache is added or a known one expires), reinitialization of `farmer_cache`'s backing
/// caches is scheduled after a short delay so that multiple changes coalesce into a single
/// reinitialization. Runs until the identify subscription ends, which is returned as an error.
pub async fn maintain_caches(
    cache_group: &str,
    nats_client: &NatsClient,
    farmer_cache: &FarmerCache,
    identification_broadcast_interval: Duration,
) -> anyhow::Result<()> {
    let mut known_caches = KnownCaches::new(identification_broadcast_interval);

    // In-flight piece cache collection requests, keyed by cluster cache ID, so that the same
    // cache is not collected twice concurrently
    let mut piece_caches_to_add = StreamMap::default();

    // When `Some`, farmer cache reinitialization is scheduled for that instant
    let mut scheduled_reinitialization_for = None;
    // Starts as an already-resolved future so the `is_terminated()` check below can fire
    // immediately on the first scheduled reinitialization
    let mut cache_reinitialization =
        (Box::pin(ready(())) as Pin<Box<dyn Future<Output = ()>>>).fuse();

    let cache_identify_subscription = pin!(
        nats_client
            .subscribe_to_broadcasts::<ClusterCacheIdentifyBroadcast>(Some(cache_group), None)
            .await
            .map_err(|error| anyhow!("Failed to subscribe to cache identify broadcast: {error}"))?
    );

    // Request cache to identify themselves
    if let Err(error) = nats_client
        .broadcast(&ClusterControllerCacheIdentifyBroadcast, cache_group)
        .await
    {
        warn!(%error, "Failed to send cache identification broadcast");
    }

    let mut cache_identify_subscription = cache_identify_subscription.fuse();
    // Prune caches that have not re-identified themselves within two identification intervals;
    // first tick is also delayed by that much to give caches time to identify at startup
    let mut cache_pruning_interval = tokio::time::interval_at(
        (Instant::now() + identification_broadcast_interval * 2).into(),
        identification_broadcast_interval * 2,
    );
    cache_pruning_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

    loop {
        // Kick off a scheduled reinitialization, but only once the previous one has finished
        if cache_reinitialization.is_terminated()
            && let Some(time) = scheduled_reinitialization_for
            && time <= Instant::now()
        {
            scheduled_reinitialization_for.take();

            let new_piece_caches = known_caches.get_all();
            let new_cache_reinitialization = async {
                let (sync_finish_sender, sync_finish_receiver) = oneshot::channel::<()>();
                let sync_finish_sender = Mutex::new(Some(sync_finish_sender));

                // Notify the channel once sync progress reaches 100%; handler is dropped (and
                // thus unregistered) when this future completes
                let _handler_id = farmer_cache.on_sync_progress(Arc::new(move |&progress| {
                    if progress == 100.0
                        && let Some(sync_finish_sender) = sync_finish_sender.lock().take()
                    {
                        // Result doesn't matter
                        let _ = sync_finish_sender.send(());
                    }
                }));

                farmer_cache
                    .replace_backing_caches(new_piece_caches, Vec::new())
                    .await;

                // Wait for piece cache sync to finish before potentially starting a new one,
                // result doesn't matter
                let _ = sync_finish_receiver.await;
            };

            cache_reinitialization =
                (Box::pin(new_cache_reinitialization) as Pin<Box<dyn Future<Output = ()>>>).fuse();
        }

        select! {
            // A cache identified itself: refresh it if known, otherwise start collecting its
            // piece cache details (unless collection is already in progress)
            maybe_identify_message = cache_identify_subscription.next() => {
                let Some(identify_message) = maybe_identify_message else {
                    return Err(anyhow!("Cache identify stream ended"));
                };

                let ClusterCacheIdentifyBroadcast { cluster_cache_id } = identify_message;

                if known_caches.refresh(cluster_cache_id) {
                    trace!(
                        %cluster_cache_id,
                        "Received identification for already known cache"
                    );
                } else if piece_caches_to_add.add_if_not_in_progress(cluster_cache_id, Box::pin(collect_piece_caches(cluster_cache_id, nats_client))) {
                    debug!(
                        %cluster_cache_id,
                        "Received identification for new cache, collecting piece caches"
                    );
                } else {
                    debug!(
                        %cluster_cache_id,
                        "Received identification for new cache, which is already in progress"
                    );
                }
            }
            // Piece cache collection for a new cache finished (successfully or not)
            (cluster_cache_id, maybe_piece_caches) = piece_caches_to_add.select_next_some() => {
                let Ok(piece_caches) = maybe_piece_caches.inspect_err(|error| {
                    warn!(
                        %cluster_cache_id,
                        %error,
                        "Failed to collect piece caches to add, may retry later"
                    )
                }) else {
                    // Collection failed; the cache may identify itself again later and retry
                    continue;
                };

                info!(
                    %cluster_cache_id,
                    "New cache discovered, scheduling reinitialization"
                );
                scheduled_reinitialization_for.replace(Instant::now() + SCHEDULE_REINITIALIZATION_DELAY);

                known_caches.add_cache(cluster_cache_id, piece_caches);
            }
            // Periodically remove caches that stopped identifying themselves
            _ = cache_pruning_interval.tick().fuse() => {
                let mut reinit = false;
                for removed_cache in known_caches.remove_expired() {
                    reinit = true;

                    warn!(
                        cluster_cache_id = %removed_cache.cluster_cache_id,
                        "Cache expired and removed, scheduling reinitialization"
                    );
                }

                if reinit {
                    scheduled_reinitialization_for.replace(
                        Instant::now() + SCHEDULE_REINITIALIZATION_DELAY,
                    );
                }
            }
            _ = cache_reinitialization => {
                // Nothing left to do
            }
        }
    }
}
236
237/// Collect piece caches from the cache and convert them to `ClusterPieceCache` by sending a stream
238/// request, then construct a `KnownCache` instance.
239async fn collect_piece_caches(
240    cluster_cache_id: ClusterCacheId,
241    nats_client: &NatsClient,
242) -> anyhow::Result<Vec<Arc<ClusterPieceCache>>> {
243    let piece_caches = nats_client
244        .stream_request(
245            &ClusterCacheDetailsRequest,
246            Some(&cluster_cache_id.to_string()),
247        )
248        .await
249        .inspect_err(|error| {
250            warn!(
251                %error,
252                %cluster_cache_id,
253                "Failed to request cache details"
254            )
255        })?
256        .map(
257            |ClusterPieceCacheDetails {
258                 piece_cache_id,
259                 max_num_elements,
260             }| {
261                debug!(
262                    %cluster_cache_id,
263                    %piece_cache_id,
264                    %max_num_elements,
265                    "Discovered new piece cache"
266                );
267                Arc::new(ClusterPieceCache::new(
268                    piece_cache_id,
269                    max_num_elements,
270                    nats_client.clone(),
271                ))
272            },
273        )
274        .collect()
275        .await;
276
277    Ok(piece_caches)
278}