// subspace_farmer/cluster/controller/caches.rs

use crate::cluster::cache::{
9 ClusterCacheDetailsRequest, ClusterCacheId, ClusterCacheIdentifyBroadcast, ClusterPieceCache,
10 ClusterPieceCacheDetails,
11};
12use crate::cluster::controller::stream_map::StreamMap;
13use crate::cluster::controller::ClusterControllerCacheIdentifyBroadcast;
14use crate::cluster::nats_client::NatsClient;
15use crate::farm::PieceCache;
16use crate::farmer_cache::FarmerCache;
17use anyhow::anyhow;
18use futures::channel::oneshot;
19use futures::future::FusedFuture;
20use futures::{select, FutureExt, StreamExt};
21use parking_lot::Mutex;
22use std::future::{ready, Future};
23use std::pin::{pin, Pin};
24use std::sync::Arc;
25use std::time::{Duration, Instant};
26use tokio::time::MissedTickBehavior;
27use tracing::{debug, info, trace, warn};
28
/// Debounce delay before farmer cache reinitialization, so that several cache additions/removals
/// happening in quick succession are coalesced into a single reinitialization
const SCHEDULE_REINITIALIZATION_DELAY: Duration = Duration::from_secs(3);
30
/// A cache instance in the cluster that has identified itself recently enough to be considered
/// alive
#[derive(Debug)]
struct KnownCache {
    // Unique identifier of the cache instance within the cluster
    cluster_cache_id: ClusterCacheId,
    // When the last identification broadcast was received from this cache
    last_identification: Instant,
    // Piece caches offered by this cache instance
    piece_caches: Vec<Arc<ClusterPieceCache>>,
}
37
/// Registry of currently known caches; entries expire when a cache stops sending identification
/// broadcasts (see `remove_expired`)
#[derive(Debug)]
struct KnownCaches {
    // Expected interval between identification broadcasts; entries older than twice this
    // interval are treated as expired
    identification_broadcast_interval: Duration,
    // Caches that identified themselves recently enough
    known_caches: Vec<KnownCache>,
}
43
44impl KnownCaches {
45 fn new(identification_broadcast_interval: Duration) -> Self {
46 Self {
47 identification_broadcast_interval,
48 known_caches: Vec::new(),
49 }
50 }
51
52 fn get_all(&self) -> Vec<Arc<dyn PieceCache>> {
53 self.known_caches
54 .iter()
55 .flat_map(|known_cache| {
56 known_cache
57 .piece_caches
58 .iter()
59 .map(|piece_cache| Arc::clone(piece_cache) as Arc<_>)
60 })
61 .collect()
62 }
63
64 #[must_use]
66 fn refresh(&mut self, cluster_cache_id: ClusterCacheId) -> bool {
67 self.known_caches.iter_mut().any(|known_cache| {
68 if known_cache.cluster_cache_id == cluster_cache_id {
69 trace!(%cluster_cache_id, "Updating last identification for cache");
70 known_cache.last_identification = Instant::now();
71 true
72 } else {
73 false
74 }
75 })
76 }
77
78 fn add_cache(
80 &mut self,
81 cluster_cache_id: ClusterCacheId,
82 piece_caches: Vec<Arc<ClusterPieceCache>>,
83 ) {
84 self.known_caches.push(KnownCache {
85 cluster_cache_id,
86 last_identification: Instant::now(),
87 piece_caches,
88 });
89 }
90
91 fn remove_expired(&mut self) -> impl Iterator<Item = KnownCache> + '_ {
92 self.known_caches.extract_if(.., |known_cache| {
93 known_cache.last_identification.elapsed() > self.identification_broadcast_interval * 2
94 })
95 }
96}
97
/// Maintains the set of caches in the cluster and keeps `farmer_cache` in sync with it.
///
/// Subscribes to cache identification broadcasts for `cache_group`, collects piece caches of
/// newly identified caches, prunes caches that stop identifying themselves, and schedules a
/// debounced reinitialization of the farmer cache whenever the set of caches changes.
///
/// Runs forever; returns an error only if the identify broadcast subscription fails or ends.
pub async fn maintain_caches(
    cache_group: &str,
    nats_client: &NatsClient,
    farmer_cache: &FarmerCache,
    identification_broadcast_interval: Duration,
) -> anyhow::Result<()> {
    let mut known_caches = KnownCaches::new(identification_broadcast_interval);

    // In-flight piece cache collection futures keyed by cluster cache ID, so the same cache is
    // not queried twice concurrently
    let mut piece_caches_to_add = StreamMap::default();

    // When set, a farmer cache reinitialization is due at this instant (debounced by
    // `SCHEDULE_REINITIALIZATION_DELAY`)
    let mut scheduled_reinitialization_for = None;
    // Starts out as an already-completed future; replaced with a real reinitialization future
    // whenever reinitialization becomes due
    let mut cache_reinitialization =
        (Box::pin(ready(())) as Pin<Box<dyn Future<Output = ()>>>).fuse();

    // Subscribe BEFORE broadcasting the identify request so no response can be missed
    let cache_identify_subscription = pin!(nats_client
        .subscribe_to_broadcasts::<ClusterCacheIdentifyBroadcast>(Some(cache_group), None)
        .await
        .map_err(|error| anyhow!("Failed to subscribe to cache identify broadcast: {error}"))?);

    // Ask all caches in the group to identify themselves; best-effort, since caches also
    // identify periodically on their own
    if let Err(error) = nats_client
        .broadcast(&ClusterControllerCacheIdentifyBroadcast, cache_group)
        .await
    {
        warn!(%error, "Failed to send cache identification broadcast");
    }

    let mut cache_identify_subscription = cache_identify_subscription.fuse();
    // Prune at twice the identification interval so a single missed broadcast does not expire
    // a cache
    let mut cache_pruning_interval = tokio::time::interval_at(
        (Instant::now() + identification_broadcast_interval * 2).into(),
        identification_broadcast_interval * 2,
    );
    cache_pruning_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

    loop {
        // Kick off a new reinitialization only if the previous one has finished and the
        // debounce deadline has passed
        if cache_reinitialization.is_terminated()
            && let Some(time) = scheduled_reinitialization_for
            && time <= Instant::now()
        {
            scheduled_reinitialization_for.take();

            let new_piece_caches = known_caches.get_all();
            let new_cache_reinitialization = async {
                let (sync_finish_sender, sync_finish_receiver) = oneshot::channel::<()>();
                let sync_finish_sender = Mutex::new(Some(sync_finish_sender));

                // Fire the oneshot exactly once, when the farmer cache reports 100% sync
                // progress; handler is dropped with `_handler_id` when this future completes
                let _handler_id = farmer_cache.on_sync_progress(Arc::new(move |&progress| {
                    if progress == 100.0 {
                        if let Some(sync_finish_sender) = sync_finish_sender.lock().take() {
                            let _ = sync_finish_sender.send(());
                        }
                    }
                }));

                farmer_cache
                    .replace_backing_caches(new_piece_caches, Vec::new())
                    .await;

                // Wait for piece cache sync to finish before this future resolves (errors mean
                // the sender side is gone, which is fine to ignore here)
                let _ = sync_finish_receiver.await;
            };

            cache_reinitialization =
                (Box::pin(new_cache_reinitialization) as Pin<Box<dyn Future<Output = ()>>>).fuse();
        }

        select! {
            maybe_identify_message = cache_identify_subscription.next() => {
                let Some(identify_message) = maybe_identify_message else {
                    return Err(anyhow!("Cache identify stream ended"));
                };

                let ClusterCacheIdentifyBroadcast { cluster_cache_id } = identify_message;

                if known_caches.refresh(cluster_cache_id) {
                    trace!(
                        %cluster_cache_id,
                        "Received identification for already known cache"
                    );
                } else if piece_caches_to_add.add_if_not_in_progress(cluster_cache_id, Box::pin(collect_piece_caches(cluster_cache_id, nats_client))) {
                    debug!(
                        %cluster_cache_id,
                        "Received identification for new cache, collecting piece caches"
                    );
                } else {
                    debug!(
                        %cluster_cache_id,
                        "Received identification for new cache, which is already in progress"
                    );
                }
            }
            (cluster_cache_id, maybe_piece_caches) = piece_caches_to_add.select_next_some() => {
                let Ok(piece_caches) = maybe_piece_caches.inspect_err(|error| {
                    warn!(
                        %cluster_cache_id,
                        %error,
                        "Failed to collect piece caches to add, may retry later"
                    )
                }) else {
                    // Collection failed: the cache stays unknown, so the next identification
                    // broadcast from it will trigger a retry
                    continue;
                };

                info!(
                    %cluster_cache_id,
                    "New cache discovered, scheduling reinitialization"
                );
                scheduled_reinitialization_for.replace(Instant::now() + SCHEDULE_REINITIALIZATION_DELAY);

                known_caches.add_cache(cluster_cache_id, piece_caches);
            }
            _ = cache_pruning_interval.tick().fuse() => {
                let mut reinit = false;
                for removed_cache in known_caches.remove_expired() {
                    reinit = true;

                    warn!(
                        cluster_cache_id = %removed_cache.cluster_cache_id,
                        "Cache expired and removed, scheduling reinitialization"
                    );
                }

                if reinit {
                    scheduled_reinitialization_for.replace(
                        Instant::now() + SCHEDULE_REINITIALIZATION_DELAY,
                    );
                }
            }
            _ = cache_reinitialization => {
                // Reinitialization finished; loop around and wait for further events
            }
        }
    }
}
234
235async fn collect_piece_caches(
238 cluster_cache_id: ClusterCacheId,
239 nats_client: &NatsClient,
240) -> anyhow::Result<Vec<Arc<ClusterPieceCache>>> {
241 let piece_caches = nats_client
242 .stream_request(
243 &ClusterCacheDetailsRequest,
244 Some(&cluster_cache_id.to_string()),
245 )
246 .await
247 .inspect_err(|error| {
248 warn!(
249 %error,
250 %cluster_cache_id,
251 "Failed to request cache details"
252 )
253 })?
254 .map(
255 |ClusterPieceCacheDetails {
256 piece_cache_id,
257 max_num_elements,
258 }| {
259 debug!(
260 %cluster_cache_id,
261 %piece_cache_id,
262 %max_num_elements,
263 "Discovered new piece cache"
264 );
265 Arc::new(ClusterPieceCache::new(
266 piece_cache_id,
267 max_num_elements,
268 nats_client.clone(),
269 ))
270 },
271 )
272 .collect()
273 .await;
274
275 Ok(piece_caches)
276}