subspace_farmer/cluster/controller/
caches.rs1use crate::cluster::cache::{
9 ClusterCacheDetailsRequest, ClusterCacheId, ClusterCacheIdentifyBroadcast, ClusterPieceCache,
10 ClusterPieceCacheDetails,
11};
12use crate::cluster::controller::ClusterControllerCacheIdentifyBroadcast;
13use crate::cluster::controller::stream_map::StreamMap;
14use crate::cluster::nats_client::NatsClient;
15use crate::farm::PieceCache;
16use crate::farmer_cache::FarmerCache;
17use anyhow::anyhow;
18use futures::channel::oneshot;
19use futures::future::FusedFuture;
20use futures::{FutureExt, StreamExt, select};
21use parking_lot::Mutex;
22use std::future::{Future, ready};
23use std::pin::{Pin, pin};
24use std::sync::Arc;
25use std::time::{Duration, Instant};
26use tokio::time::MissedTickBehavior;
27use tracing::{debug, info, trace, warn};
28
29const SCHEDULE_REINITIALIZATION_DELAY: Duration = Duration::from_secs(3);
30
/// A cluster cache instance that has recently identified itself, together with the piece caches
/// it exposes
#[derive(Debug)]
struct KnownCache {
    // Unique identifier of the cache instance within the cluster
    cluster_cache_id: ClusterCacheId,
    // When the most recent identification broadcast from this cache was received; used for
    // expiration in `KnownCaches::remove_expired()`
    last_identification: Instant,
    // Piece caches provided by this cluster cache instance
    piece_caches: Vec<Arc<ClusterPieceCache>>,
}
37
/// Registry of all currently known cluster caches
#[derive(Debug)]
struct KnownCaches {
    // Interval at which caches are expected to broadcast identification; a cache is considered
    // expired once it has been silent for twice this interval (see `remove_expired()`)
    identification_broadcast_interval: Duration,
    // Currently known caches in discovery order
    known_caches: Vec<KnownCache>,
}
43
44impl KnownCaches {
45 fn new(identification_broadcast_interval: Duration) -> Self {
46 Self {
47 identification_broadcast_interval,
48 known_caches: Vec::new(),
49 }
50 }
51
52 fn get_all(&self) -> Vec<Arc<dyn PieceCache>> {
53 self.known_caches
54 .iter()
55 .flat_map(|known_cache| {
56 known_cache
57 .piece_caches
58 .iter()
59 .map(|piece_cache| Arc::clone(piece_cache) as Arc<_>)
60 })
61 .collect()
62 }
63
64 #[must_use]
66 fn refresh(&mut self, cluster_cache_id: ClusterCacheId) -> bool {
67 self.known_caches.iter_mut().any(|known_cache| {
68 if known_cache.cluster_cache_id == cluster_cache_id {
69 trace!(%cluster_cache_id, "Updating last identification for cache");
70 known_cache.last_identification = Instant::now();
71 true
72 } else {
73 false
74 }
75 })
76 }
77
78 fn add_cache(
80 &mut self,
81 cluster_cache_id: ClusterCacheId,
82 piece_caches: Vec<Arc<ClusterPieceCache>>,
83 ) {
84 self.known_caches.push(KnownCache {
85 cluster_cache_id,
86 last_identification: Instant::now(),
87 piece_caches,
88 });
89 }
90
91 fn remove_expired(&mut self) -> impl Iterator<Item = KnownCache> + '_ {
92 self.known_caches.extract_if(.., |known_cache| {
93 known_cache.last_identification.elapsed() > self.identification_broadcast_interval * 2
94 })
95 }
96}
97
/// Maintains the set of known caches in the cluster for the given cache group.
///
/// Subscribes to cache identification broadcasts, requests piece cache details from newly
/// discovered caches, prunes caches that stop identifying themselves and (re-)initializes
/// `farmer_cache` with the piece caches of all known caches whenever the set changes.
/// Reinitialization is debounced by [`SCHEDULE_REINITIALIZATION_DELAY`] so that several caches
/// appearing in quick succession trigger a single reinitialization.
///
/// Runs forever unless the identification broadcast stream ends, which is reported as an error.
pub async fn maintain_caches(
    cache_group: &str,
    nats_client: &NatsClient,
    farmer_cache: &FarmerCache,
    identification_broadcast_interval: Duration,
) -> anyhow::Result<()> {
    let mut known_caches = KnownCaches::new(identification_broadcast_interval);

    // In-flight piece cache collection requests, keyed by cluster cache ID so at most one
    // collection per cache runs at a time
    let mut piece_caches_to_add = StreamMap::default();

    // When set, the time at which the farmer cache should be reinitialized (debounce deadline)
    let mut scheduled_reinitialization_for = None;
    // Farmer cache reinitialization is slow, so it runs in the background concurrently with other
    // work; starts out as an already-terminated future
    let mut cache_reinitialization =
        (Box::pin(ready(())) as Pin<Box<dyn Future<Output = ()>>>).fuse();

    let cache_identify_subscription = pin!(
        nats_client
            .subscribe_to_broadcasts::<ClusterCacheIdentifyBroadcast>(Some(cache_group), None)
            .await
            .map_err(|error| anyhow!("Failed to subscribe to cache identify broadcast: {error}"))?
    );

    // Request cache identifications up-front to reduce the time to wait for them to identify on
    // their own schedule; failure here is non-fatal since caches will identify periodically anyway
    if let Err(error) = nats_client
        .broadcast(&ClusterControllerCacheIdentifyBroadcast, cache_group)
        .await
    {
        warn!(%error, "Failed to send cache identification broadcast");
    }

    let mut cache_identify_subscription = cache_identify_subscription.fuse();
    // Prune at twice the identification interval, matching the expiration threshold in
    // `KnownCaches::remove_expired()`
    let mut cache_pruning_interval = tokio::time::interval_at(
        (Instant::now() + identification_broadcast_interval * 2).into(),
        identification_broadcast_interval * 2,
    );
    cache_pruning_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

    loop {
        // Start a new reinitialization only when the previous one has finished and the debounce
        // deadline has passed
        if cache_reinitialization.is_terminated()
            && let Some(time) = scheduled_reinitialization_for
            && time <= Instant::now()
        {
            scheduled_reinitialization_for.take();

            let new_piece_caches = known_caches.get_all();
            let new_cache_reinitialization = async {
                let (sync_finish_sender, sync_finish_receiver) = oneshot::channel::<()>();
                let sync_finish_sender = Mutex::new(Some(sync_finish_sender));

                // Keep the handler registered for the lifetime of this future; it fires the
                // oneshot exactly once, when sync progress reaches 100%
                let _handler_id = farmer_cache.on_sync_progress(Arc::new(move |&progress| {
                    if progress == 100.0
                        && let Some(sync_finish_sender) = sync_finish_sender.lock().take()
                    {
                        let _ = sync_finish_sender.send(());
                    }
                }));

                farmer_cache
                    .replace_backing_caches(new_piece_caches, Vec::new())
                    .await;

                // Wait for piece cache sync to finish before considering reinitialization done;
                // a dropped sender simply completes the wait
                let _ = sync_finish_receiver.await;
            };

            cache_reinitialization =
                (Box::pin(new_cache_reinitialization) as Pin<Box<dyn Future<Output = ()>>>).fuse();
        }

        select! {
            maybe_identify_message = cache_identify_subscription.next() => {
                let Some(identify_message) = maybe_identify_message else {
                    return Err(anyhow!("Cache identify stream ended"));
                };

                let ClusterCacheIdentifyBroadcast { cluster_cache_id } = identify_message;

                // Known cache: just refresh its expiration timer. Unknown cache: kick off piece
                // cache collection unless one is already in flight for this ID.
                if known_caches.refresh(cluster_cache_id) {
                    trace!(
                        %cluster_cache_id,
                        "Received identification for already known cache"
                    );
                } else if piece_caches_to_add.add_if_not_in_progress(cluster_cache_id, Box::pin(collect_piece_caches(cluster_cache_id, nats_client))) {
                    debug!(
                        %cluster_cache_id,
                        "Received identification for new cache, collecting piece caches"
                    );
                } else {
                    debug!(
                        %cluster_cache_id,
                        "Received identification for new cache, which is already in progress"
                    );
                }
            }
            (cluster_cache_id, maybe_piece_caches) = piece_caches_to_add.select_next_some() => {
                // On failure only log: the cache keeps broadcasting identifications, so collection
                // will be retried on a later identify message
                let Ok(piece_caches) = maybe_piece_caches.inspect_err(|error| {
                    warn!(
                        %cluster_cache_id,
                        %error,
                        "Failed to collect piece caches to add, may retry later"
                    )
                }) else {
                    continue;
                };

                info!(
                    %cluster_cache_id,
                    "New cache discovered, scheduling reinitialization"
                );
                scheduled_reinitialization_for.replace(Instant::now() + SCHEDULE_REINITIALIZATION_DELAY);

                known_caches.add_cache(cluster_cache_id, piece_caches);
            }
            _ = cache_pruning_interval.tick().fuse() => {
                let mut reinit = false;
                for removed_cache in known_caches.remove_expired() {
                    reinit = true;

                    warn!(
                        cluster_cache_id = %removed_cache.cluster_cache_id,
                        "Cache expired and removed, scheduling reinitialization"
                    );
                }

                if reinit {
                    scheduled_reinitialization_for.replace(
                        Instant::now() + SCHEDULE_REINITIALIZATION_DELAY,
                    );
                }
            }
            // Drive the background reinitialization future to completion
            _ = cache_reinitialization => {
            }
        }
    }
}
236
237async fn collect_piece_caches(
240 cluster_cache_id: ClusterCacheId,
241 nats_client: &NatsClient,
242) -> anyhow::Result<Vec<Arc<ClusterPieceCache>>> {
243 let piece_caches = nats_client
244 .stream_request(
245 &ClusterCacheDetailsRequest,
246 Some(&cluster_cache_id.to_string()),
247 )
248 .await
249 .inspect_err(|error| {
250 warn!(
251 %error,
252 %cluster_cache_id,
253 "Failed to request cache details"
254 )
255 })?
256 .map(
257 |ClusterPieceCacheDetails {
258 piece_cache_id,
259 max_num_elements,
260 }| {
261 debug!(
262 %cluster_cache_id,
263 %piece_cache_id,
264 %max_num_elements,
265 "Discovered new piece cache"
266 );
267 Arc::new(ClusterPieceCache::new(
268 piece_cache_id,
269 max_num_elements,
270 nats_client.clone(),
271 ))
272 },
273 )
274 .collect()
275 .await;
276
277 Ok(piece_caches)
278}