subspace_networking/behavior/
persistent_parameters.rs

1use crate::utils::{Handler, HandlerFn};
2use async_trait::async_trait;
3use event_listener_primitives::HandlerId;
4use fs2::FileExt;
5use futures::FutureExt;
6use futures::future::{Fuse, pending};
7use libp2p::multiaddr::Protocol;
8use libp2p::{Multiaddr, PeerId};
9use memmap2::{MmapMut, MmapOptions};
10use parity_scale_codec::{Compact, CompactLen, Decode, Encode};
11use parking_lot::Mutex;
12use schnellru::{ByLength, LruMap};
13use std::collections::HashSet;
14use std::fs::OpenOptions;
15use std::io::{Read, Seek, SeekFrom};
16use std::path::Path;
17use std::pin::Pin;
18use std::str::FromStr;
19use std::sync::Arc;
20use std::time::{Duration, SystemTime};
21use std::{io, mem};
22use subspace_core_primitives::hashes::{Blake3Hash, blake3_hash};
23use subspace_process::AsyncJoinOnDrop;
24use thiserror::Error;
25use tokio::time::{Sleep, sleep};
26use tracing::{debug, error, trace, warn};
27
/// Time of the first recorded dial failure for an address, `None` if no failure is recorded
type FailureTime = Option<SystemTime>;

/// Size of the LRU cache for peers.
const KNOWN_PEERS_CACHE_SIZE: u32 = 100;
/// Size of the LRU cache for addresses of a single peer ID.
const ADDRESSES_CACHE_SIZE: u32 = 30;
/// Pause duration (in seconds) between network parameters saves.
const DATA_FLUSH_DURATION_SECS: u64 = 5;
/// Defines an expiration period for the peer address marked for the removal from the cache.
const REMOVE_KNOWN_PEERS_GRACE_PERIOD: Duration = Duration::from_secs(24 * 3600);
/// Defines an expiration period for the peer address marked for the removal for Kademlia DHT.
const REMOVE_KNOWN_PEERS_GRACE_PERIOD_FOR_KADEMLIA: Duration = Duration::from_secs(3600);
/// Default amount of time after which stored known peers contents is assumed to be stale.
const STALE_KNOWN_PEERS_TIMEOUT: Duration = Duration::from_secs(24 * 3600);
43
/// Defines the event triggered when the peer address is removed from the permanent storage.
///
/// Handlers for this event are registered with [`KnownPeersRegistry::on_unreachable_address()`].
#[derive(Debug, Clone)]
pub struct PeerAddressRemovedEvent {
    /// Peer ID the address belongs to
    pub peer_id: PeerId,
    /// Peer address (stored without the trailing P2P protocol suffix)
    pub address: Multiaddr,
}
52
/// Encodable representation of a single known peer address.
///
/// NOTE: Field order is part of the encoded format and must not be changed.
#[derive(Debug, Encode, Decode)]
struct EncodableKnownPeerAddress {
    /// Multiaddress in its binary form
    multiaddr: Vec<u8>,
    /// Failure time as Unix timestamp in seconds
    failure_time: Option<u64>,
}
59
/// Encodable snapshot of the known peers cache that is written to (and read from) disk.
///
/// NOTE: Field order is part of the encoded format and must not be changed.
#[derive(Debug, Encode, Decode)]
struct EncodableKnownPeers {
    /// Capacity used for the peers LRU cache when the snapshot is turned back into a cache
    cache_size: u32,
    /// When the snapshot was created, Unix timestamp in seconds
    timestamp: u64,
    // Each entry is a tuple of peer ID + list of multiaddresses with corresponding failure time
    known_peers: Vec<(Vec<u8>, Vec<EncodableKnownPeerAddress>)>,
}
67
68impl EncodableKnownPeers {
69    fn into_cache(mut self) -> LruMap<PeerId, LruMap<Multiaddr, FailureTime>> {
70        let mut peers_cache = LruMap::new(ByLength::new(self.cache_size));
71
72        // Sort peers with the oldest expiration date first
73        self.known_peers
74            .sort_by_cached_key(|(_peer_id, addresses)| {
75                addresses.iter().fold(0u64, |acc, address| {
76                    acc.max(address.failure_time.unwrap_or(u64::MAX))
77                })
78            });
79
80        // Iterate over known peers with most recent failure time (or no failire time) first
81        'peers: for (peer_id, addresses) in self.known_peers.into_iter().rev() {
82            let mut peer_cache =
83                LruMap::<Multiaddr, FailureTime>::new(ByLength::new(ADDRESSES_CACHE_SIZE));
84
85            let peer_id = match PeerId::from_bytes(&peer_id) {
86                Ok(peer_id) => peer_id,
87                Err(error) => {
88                    debug!(%error, "Failed to decode known peer ID, skipping peer entry");
89                    continue;
90                }
91            };
92            for address in addresses {
93                let multiaddr = match Multiaddr::try_from(address.multiaddr) {
94                    Ok(multiaddr) => multiaddr,
95                    Err(error) => {
96                        debug!(
97                            %error,
98                            "Failed to decode known peer multiaddress, skipping peer entry"
99                        );
100                        continue 'peers;
101                    }
102                };
103
104                peer_cache.insert(
105                    multiaddr,
106                    address.failure_time.map(|failure_time| {
107                        SystemTime::UNIX_EPOCH + Duration::from_secs(failure_time)
108                    }),
109                );
110            }
111
112            peers_cache.insert(peer_id, peer_cache);
113        }
114
115        peers_cache
116    }
117
118    fn from_cache(cache: &LruMap<PeerId, LruMap<Multiaddr, FailureTime>>, cache_size: u32) -> Self {
119        let single_peer_encoded_address_size =
120            KnownPeersManager::single_peer_encoded_address_size();
121        Self {
122            cache_size,
123            timestamp: SystemTime::now()
124                .duration_since(SystemTime::UNIX_EPOCH)
125                .expect("Never before Unix epoch; qed")
126                .as_secs(),
127            known_peers: cache
128                .iter()
129                .map(|(peer_id, addresses)| {
130                    (
131                        peer_id.to_bytes(),
132                        addresses
133                            .iter()
134                            .filter_map(|(multiaddr, failure_time)| {
135                                let multiaddr_bytes = multiaddr.to_vec();
136
137                                if multiaddr_bytes.encoded_size() > single_peer_encoded_address_size
138                                {
139                                    // Skip unexpectedly large multiaddresses
140                                    debug!(
141                                        encoded_multiaddress_size = %multiaddr_bytes.encoded_size(),
142                                        limit = %single_peer_encoded_address_size,
143                                        ?multiaddr,
144                                        "Unexpectedly large multiaddress"
145                                    );
146                                    return None;
147                                }
148
149                                Some(EncodableKnownPeerAddress {
150                                    multiaddr: multiaddr_bytes,
151                                    failure_time: failure_time.map(|failure_time| {
152                                        failure_time
153                                            .duration_since(SystemTime::UNIX_EPOCH)
154                                            .expect("Never before Unix epoch; qed")
155                                            .as_secs()
156                                    }),
157                                })
158                            })
159                            .collect(),
160                    )
161                })
162                .collect(),
163        }
164    }
165}
166
/// A/b slots with known peers where we write serialized known peers in one after another
struct KnownPeersSlots {
    /// Slot that receives the next write
    a: MmapMut,
    /// The other slot, swapped with `a` after every write
    b: MmapMut,
}
172
173impl KnownPeersSlots {
174    fn write_to_inactive_slot(&mut self, encodable_known_peers: &EncodableKnownPeers) {
175        let known_peers_bytes = encodable_known_peers.encode();
176        let (encoded_bytes, remaining_bytes) = self.a.split_at_mut(known_peers_bytes.len());
177        encoded_bytes.copy_from_slice(&known_peers_bytes);
178        // Write checksum
179        remaining_bytes[..Blake3Hash::SIZE]
180            .copy_from_slice(blake3_hash(&known_peers_bytes).as_ref());
181        if let Err(error) = self.a.flush() {
182            warn!(%error, "Failed to flush known peers to disk");
183        }
184
185        // Swap slots such that we write into the opposite each time
186        mem::swap(&mut self.a, &mut self.b);
187    }
188}
189
/// Defines operations with the networking parameters.
#[async_trait]
pub trait KnownPeersRegistry: Send + Sync {
    /// Registers a peer ID and associated addresses
    async fn add_known_peer(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>);

    /// Unregisters the given addresses for peer ID.
    async fn remove_known_peer_addresses(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>);

    /// Unregisters all addresses known for peer ID.
    fn remove_all_known_peer_addresses(&mut self, peer_id: PeerId);

    /// Returns all known peers and their addresses without P2P suffix at the end
    async fn all_known_peers(&mut self) -> Vec<(PeerId, Vec<Multiaddr>)>;

    /// Returns the number of known peers, and the number of addresses known for those peers.
    fn count_known_peers(&mut self) -> (usize, usize);

    /// Drive async work in the persistence provider
    async fn run(&mut self);

    /// Registers a handler that is triggered when we removed the peer address from the permanent
    /// storage. Returns optional event `HandlerId`, `None` is what stub implementations return.
    /// One of the usages is to notify Kademlia about the expired (unreachable) address when it
    /// checks for how long the address was unreachable.
    fn on_unreachable_address(
        &mut self,
        handler: HandlerFn<PeerAddressRemovedEvent>,
    ) -> Option<HandlerId>;
}
220
/// NOOP implementation of [`KnownPeersRegistry`]: it stores nothing and reports no known peers.
#[derive(Clone, Default)]
pub(crate) struct StubNetworkingParametersManager;
224
225impl StubNetworkingParametersManager {
226    /// Returns an instance of `StubNetworkingParametersManager` as the `Box` reference.
227    pub fn boxed(self) -> Box<dyn KnownPeersRegistry> {
228        Box::new(self)
229    }
230}
231
232#[async_trait]
233impl KnownPeersRegistry for StubNetworkingParametersManager {
234    async fn add_known_peer(&mut self, _: PeerId, _: Vec<Multiaddr>) {}
235
236    async fn remove_known_peer_addresses(&mut self, _peer_id: PeerId, _addresses: Vec<Multiaddr>) {}
237
238    fn remove_all_known_peer_addresses(&mut self, _peer_id: PeerId) {}
239
240    async fn all_known_peers(&mut self) -> Vec<(PeerId, Vec<Multiaddr>)> {
241        Vec::new()
242    }
243
244    fn count_known_peers(&mut self) -> (usize, usize) {
245        (0, 0)
246    }
247
248    async fn run(&mut self) {
249        // Never resolves
250        futures::future::pending().await
251    }
252
253    fn on_unreachable_address(
254        &mut self,
255        _handler: HandlerFn<PeerAddressRemovedEvent>,
256    ) -> Option<HandlerId> {
257        None
258    }
259}
260
/// Configuration for [`KnownPeersManager`].
#[derive(Debug, Clone)]
pub struct KnownPeersManagerConfig {
    /// Defines whether we return known peers in [`KnownPeersRegistry::all_known_peers()`] (and
    /// count them in [`KnownPeersRegistry::count_known_peers()`])
    pub enable_known_peers_source: bool,
    /// Defines cache size (number of peers).
    pub cache_size: u32,
    /// Peer IDs for which adding addresses is ignored.
    pub ignore_peer_list: HashSet<PeerId>,
    /// Path to the file used for cache persistence, `None` disables persistence.
    pub path: Option<Box<Path>>,
    /// Defines how long after the first recorded failure of an address the next reported failure
    /// removes the address from the cache.
    pub failed_address_cache_removal_interval: Duration,
    /// Defines how long after the first recorded failure of an address the next reported failure
    /// triggers [`PeerAddressRemovedEvent`].
    pub failed_address_kademlia_removal_interval: Duration,
    /// Amount of time after which stored known peers contents is assumed to be stale.
    pub stale_known_peers_timeout: Duration,
}
279
280impl Default for KnownPeersManagerConfig {
281    fn default() -> Self {
282        Self {
283            enable_known_peers_source: true,
284            cache_size: KNOWN_PEERS_CACHE_SIZE,
285            ignore_peer_list: Default::default(),
286            path: None,
287            failed_address_cache_removal_interval: REMOVE_KNOWN_PEERS_GRACE_PERIOD,
288            failed_address_kademlia_removal_interval: REMOVE_KNOWN_PEERS_GRACE_PERIOD_FOR_KADEMLIA,
289            stale_known_peers_timeout: STALE_KNOWN_PEERS_TIMEOUT,
290        }
291    }
292}
293
/// Networking parameters persistence errors.
#[derive(Debug, Error)]
pub enum KnownPeersManagerPersistenceError {
    /// I/O error (opening, reading, resizing, mapping or flushing the known peers file).
    #[error("I/O error: {0}")]
    Io(#[from] io::Error),
    /// Can't preallocate known peers file, probably not enough space on disk
    #[error("Can't preallocate known peers file, probably not enough space on disk: {0}")]
    CantPreallocateKnownPeersFile(io::Error),
}
304
/// Handles networking parameters. It manages network parameters set and its persistence.
pub struct KnownPeersManager {
    /// Defines whether the cache was modified and requires saving to the backing file
    cache_need_saving: bool,
    /// LRU cache for the known peers and their addresses
    known_peers: LruMap<PeerId, LruMap<Multiaddr, FailureTime>>,
    /// Delay before the next networking parameters save, re-armed after every save attempt.
    networking_parameters_save_delay: Pin<Box<Fuse<Sleep>>>,
    /// Slots backed by file that store known peers, `None` when persistence is disabled
    known_peers_slots: Option<Arc<Mutex<KnownPeersSlots>>>,
    /// Event handler triggered when we decide to remove address from the storage.
    address_removed: Handler<PeerAddressRemovedEvent>,
    /// Defines configuration.
    config: KnownPeersManagerConfig,
}
320
321impl Drop for KnownPeersManager {
322    fn drop(&mut self) {
323        if self.cache_need_saving
324            && let Some(known_peers_slots) = &self.known_peers_slots
325        {
326            known_peers_slots
327                .lock()
328                .write_to_inactive_slot(&EncodableKnownPeers::from_cache(
329                    &self.known_peers,
330                    self.config.cache_size,
331                ));
332        }
333    }
334}
335
336impl KnownPeersManager {
337    fn init_file(
338        path: &Path,
339        cache_size: u32,
340    ) -> Result<
341        (Option<EncodableKnownPeers>, Arc<Mutex<KnownPeersSlots>>),
342        KnownPeersManagerPersistenceError,
343    > {
344        let mut file = OpenOptions::new()
345            .read(true)
346            .write(true)
347            .create(true)
348            .truncate(false)
349            .open(path)?;
350
351        let known_addresses_size = Self::known_addresses_size(cache_size);
352        let file_size = Self::file_size(cache_size);
353        // Try reading existing encoded known peers from file
354        let mut maybe_newest_known_addresses = None::<EncodableKnownPeers>;
355
356        {
357            let mut file_contents = Vec::with_capacity(file_size);
358            file.read_to_end(&mut file_contents)?;
359            if !file_contents.is_empty() {
360                for known_addresses_bytes in file_contents.chunks_exact(file_contents.len() / 2) {
361                    let known_addresses =
362                        match EncodableKnownPeers::decode(&mut &*known_addresses_bytes) {
363                            Ok(known_addresses) => known_addresses,
364                            Err(error) => {
365                                debug!(%error, "Failed to decode encodable known peers");
366                                continue;
367                            }
368                        };
369
370                    let (encoded_bytes, remaining_bytes) =
371                        known_addresses_bytes.split_at(known_addresses.encoded_size());
372                    if remaining_bytes.len() < Blake3Hash::SIZE {
373                        debug!(
374                            remaining_bytes = %remaining_bytes.len(),
375                            "Not enough bytes to decode checksum, file was likely corrupted"
376                        );
377                        continue;
378                    }
379
380                    // Verify checksum
381                    let actual_checksum = blake3_hash(encoded_bytes);
382                    let expected_checksum = &remaining_bytes[..Blake3Hash::SIZE];
383                    if *actual_checksum != *expected_checksum {
384                        debug!(
385                            encoded_bytes_len = %encoded_bytes.len(),
386                            actual_checksum = %hex::encode(actual_checksum),
387                            expected_checksum = %hex::encode(expected_checksum),
388                            "Hash doesn't match, possible disk corruption or file was just \
389                            created, ignoring"
390                        );
391                        continue;
392                    }
393
394                    match &mut maybe_newest_known_addresses {
395                        Some(newest_known_addresses) => {
396                            if newest_known_addresses.timestamp < known_addresses.timestamp {
397                                *newest_known_addresses = known_addresses;
398                            }
399                        }
400                        None => {
401                            maybe_newest_known_addresses.replace(known_addresses);
402                        }
403                    }
404                }
405            }
406        }
407
408        // *2 because we have a/b parts of the file
409        let file_resized = if file.seek(SeekFrom::End(0))? != file_size as u64 {
410            // Allocating the whole file (`set_len` below can create a sparse file, which will cause
411            // writes to fail later)
412            file.allocate(file_size as u64)
413                .map_err(KnownPeersManagerPersistenceError::CantPreallocateKnownPeersFile)?;
414            // Truncating file (if necessary)
415            file.set_len(file_size as u64)?;
416            true
417        } else {
418            false
419        };
420
421        let mut a_mmap = unsafe {
422            MmapOptions::new()
423                .len(known_addresses_size)
424                .map_mut(&file)?
425        };
426        let mut b_mmap = unsafe {
427            MmapOptions::new()
428                .offset(known_addresses_size as u64)
429                .len(known_addresses_size)
430                .map_mut(&file)?
431        };
432
433        if file_resized {
434            // File might have been resized, write current known addresses into it
435            if let Some(newest_known_addresses) = &maybe_newest_known_addresses {
436                let bytes = newest_known_addresses.encode();
437                a_mmap[..bytes.len()].copy_from_slice(&bytes);
438                a_mmap.flush()?;
439                b_mmap[..bytes.len()].copy_from_slice(&bytes);
440                b_mmap.flush()?;
441            }
442        }
443
444        let known_peers_slots = Arc::new(Mutex::new(KnownPeersSlots {
445            a: a_mmap,
446            b: b_mmap,
447        }));
448
449        Ok((maybe_newest_known_addresses, known_peers_slots))
450    }
451
452    /// Object constructor.
453    pub fn new(config: KnownPeersManagerConfig) -> Result<Self, KnownPeersManagerPersistenceError> {
454        let (maybe_newest_known_addresses, known_peers_slots) = if let Some(path) = &config.path {
455            Self::init_file(path, config.cache_size)
456                .map(|(known_addresses, slots)| (known_addresses, Some(slots)))?
457        } else {
458            (None, None)
459        };
460
461        let known_peers = maybe_newest_known_addresses
462            .filter(|newest_known_addresses| {
463                let time_since_unix_epoch = SystemTime::now()
464                    .duration_since(SystemTime::UNIX_EPOCH)
465                    .expect("Never before Unix epoch; qed");
466                let known_peers_age = time_since_unix_epoch
467                    .saturating_sub(Duration::from_secs(newest_known_addresses.timestamp));
468
469                known_peers_age <= config.stale_known_peers_timeout
470            })
471            .map(EncodableKnownPeers::into_cache)
472            .unwrap_or_else(|| LruMap::new(ByLength::new(config.cache_size)));
473
474        Ok(Self {
475            cache_need_saving: false,
476            known_peers,
477            networking_parameters_save_delay: Self::default_delay(),
478            known_peers_slots,
479            address_removed: Default::default(),
480            config,
481        })
482    }
483
484    /// Size of the backing file on disk
485    pub fn file_size(cache_size: u32) -> usize {
486        // *2 because we have a/b parts of the file
487        Self::known_addresses_size(cache_size) * 2
488    }
489
490    /// Creates a reference to the `NetworkingParametersRegistry` trait implementation.
491    pub fn boxed(self) -> Box<dyn KnownPeersRegistry> {
492        Box::new(self)
493    }
494
495    // Create default delay for networking parameters.
496    fn default_delay() -> Pin<Box<Fuse<Sleep>>> {
497        Box::pin(sleep(Duration::from_secs(DATA_FLUSH_DURATION_SECS)).fuse())
498    }
499
500    fn single_peer_encoded_address_size() -> usize {
501        let multiaddr = Multiaddr::from_str(
502            "/ip4/127.0.0.1/tcp/1234/p2p/12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp",
503        )
504        .expect("Valid multiaddr; qed");
505        // Use multiaddr size that is 3x larger than typical, should be enough for most practical
506        // cases
507        multiaddr.to_vec().encoded_size() * 3
508    }
509
510    /// Size of single peer known addresses, this is an estimate and in some pathological cases peer
511    /// will have to be rejected if encoding exceeds this length.
512    fn single_peer_encoded_size() -> usize {
513        // Peer ID encoding + compact encoding of the length of list of addresses + (length of a
514        // single peer address entry + optional failure time) * number of entries
515        PeerId::random().to_bytes().encoded_size()
516            + Compact::compact_len(&(ADDRESSES_CACHE_SIZE))
517            + (Self::single_peer_encoded_address_size() + Some(0u64).encoded_size())
518                * ADDRESSES_CACHE_SIZE as usize
519    }
520
521    /// Size of known addresses and accompanying metadata.
522    ///
523    /// NOTE: This is max size that needs to be allocated on disk for successful write of a single
524    /// `known_addresses` copy, the actual written data can occupy only a part of this size
525    fn known_addresses_size(cache_size: u32) -> usize {
526        // Timestamp (when was written) + compact encoding of the length of peer records + peer
527        // records + checksum
528        mem::size_of::<u64>()
529            + Compact::compact_len(&(cache_size))
530            + Self::single_peer_encoded_size() * cache_size as usize
531            + Blake3Hash::SIZE
532    }
533
534    fn persistent_enabled(&self) -> bool {
535        self.config.path.is_some()
536    }
537
538    #[cfg(test)]
539    pub(crate) fn contains_address(&self, peer_id: &PeerId, address: &Multiaddr) -> bool {
540        self.known_peers
541            .peek(peer_id)
542            .map(|addresses| addresses.peek(address).is_some())
543            .unwrap_or_default()
544    }
545}
546
547#[async_trait]
548impl KnownPeersRegistry for KnownPeersManager {
549    async fn add_known_peer(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>) {
550        if self.config.ignore_peer_list.contains(&peer_id) {
551            debug!(
552                %peer_id,
553                addr_num=addresses.len(),
554                "Adding new peer addresses canceled (ignore list): {:?}",
555                addresses
556            );
557
558            return;
559        }
560
561        debug!(
562            %peer_id,
563            addr_num=addresses.len(),
564            "Add new peer addresses to the networking parameters registry: {:?}",
565            addresses
566        );
567
568        addresses
569            .iter()
570            .filter(|addr| {
571                // filter Memory addresses
572                !addr
573                    .into_iter()
574                    .any(|protocol| matches!(protocol, Protocol::Memory(..)))
575            })
576            .cloned()
577            .map(remove_p2p_suffix)
578            .for_each(|addr| {
579                // Add new address cache if it doesn't exist previously.
580                self.known_peers
581                    .get_or_insert(peer_id, || LruMap::new(ByLength::new(ADDRESSES_CACHE_SIZE)));
582
583                if let Some(addresses) = self.known_peers.get(&peer_id) {
584                    let previous_entry = addresses.peek(&addr).cloned().flatten();
585                    addresses.insert(addr, None);
586                    if let Some(previous_entry) = previous_entry {
587                        trace!(%peer_id, "Address cache entry replaced: {:?}", previous_entry);
588                    }
589                }
590            });
591
592        self.cache_need_saving = true;
593    }
594
595    async fn remove_known_peer_addresses(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>) {
596        trace!(%peer_id, "Remove peer addresses from the networking parameters registry: {:?}", addresses);
597
598        let removed_addresses = remove_known_peer_addresses_internal(
599            &mut self.known_peers,
600            peer_id,
601            addresses,
602            self.config.failed_address_cache_removal_interval,
603            self.config.failed_address_kademlia_removal_interval,
604        );
605
606        for event in removed_addresses {
607            self.address_removed.call_simple(&event);
608        }
609
610        self.cache_need_saving = true;
611    }
612
613    fn remove_all_known_peer_addresses(&mut self, peer_id: PeerId) {
614        trace!(%peer_id, "Remove all peer addresses from the networking parameters registry");
615
616        self.known_peers.remove(&peer_id);
617
618        self.cache_need_saving = true;
619    }
620
621    async fn all_known_peers(&mut self) -> Vec<(PeerId, Vec<Multiaddr>)> {
622        if !self.config.enable_known_peers_source {
623            return Vec::new();
624        }
625
626        self.known_peers
627            .iter()
628            .map(|(&peer_id, addresses)| {
629                (
630                    peer_id,
631                    addresses
632                        .iter()
633                        .map(|(addr, _failure_time)| addr.clone())
634                        .collect(),
635                )
636            })
637            .collect()
638    }
639
640    fn count_known_peers(&mut self) -> (usize, usize) {
641        if !self.config.enable_known_peers_source {
642            return (0, 0);
643        }
644
645        (
646            self.known_peers.len(),
647            self.known_peers
648                .iter()
649                .map(|(_peer, addresses)| addresses.len())
650                .sum(),
651        )
652    }
653
654    async fn run(&mut self) {
655        if !self.persistent_enabled() {
656            pending().await
657        }
658
659        loop {
660            (&mut self.networking_parameters_save_delay).await;
661
662            if let Some(known_peers_slots) = &self.known_peers_slots
663                && self.cache_need_saving
664            {
665                let known_peers =
666                    EncodableKnownPeers::from_cache(&self.known_peers, self.config.cache_size);
667                let known_peers_slots = Arc::clone(known_peers_slots);
668                let write_known_peers_fut = AsyncJoinOnDrop::new(
669                    tokio::task::spawn_blocking(move || {
670                        known_peers_slots
671                            .lock()
672                            .write_to_inactive_slot(&known_peers);
673                    }),
674                    // Abort has no effect on spawn_blocking tasks
675                    false,
676                );
677
678                if let Err(error) = write_known_peers_fut.await {
679                    error!(%error, "Failed to write known peers");
680                }
681
682                self.cache_need_saving = false;
683            }
684            self.networking_parameters_save_delay = KnownPeersManager::default_delay();
685        }
686    }
687
688    fn on_unreachable_address(
689        &mut self,
690        handler: HandlerFn<PeerAddressRemovedEvent>,
691    ) -> Option<HandlerId> {
692        let handler_id = self.address_removed.add(handler);
693
694        Some(handler_id)
695    }
696}
697
698/// Removes a P2p protocol suffix from the multiaddress if any.
699pub(crate) fn remove_p2p_suffix(mut address: Multiaddr) -> Multiaddr {
700    let last_protocol = address.pop();
701
702    if let Some(Protocol::P2p(_)) = &last_protocol {
703        return address;
704    }
705
706    if let Some(protocol) = last_protocol {
707        address.push(protocol)
708    }
709
710    address
711}
712
713/// Appends a P2p protocol suffix to the multiaddress if required.
714pub(crate) fn append_p2p_suffix(peer_id: PeerId, mut address: Multiaddr) -> Multiaddr {
715    let last_protocol = address.pop();
716
717    if let Some(protocol) = last_protocol
718        && !matches!(protocol, Protocol::P2p(..))
719    {
720        address.push(protocol)
721    }
722    address.push(Protocol::P2p(peer_id));
723
724    address
725}
726
727// Testable implementation of the `remove_known_peer_addresses`
728pub(super) fn remove_known_peer_addresses_internal(
729    known_peers: &mut LruMap<PeerId, LruMap<Multiaddr, FailureTime>>,
730    peer_id: PeerId,
731    addresses: Vec<Multiaddr>,
732    expired_address_duration_persistent_storage: Duration,
733    expired_address_duration_kademlia: Duration,
734) -> Vec<PeerAddressRemovedEvent> {
735    let mut address_removed_events = Vec::new();
736    let now = SystemTime::now();
737
738    addresses
739        .into_iter()
740        .map(remove_p2p_suffix)
741        .for_each(|addr| {
742            // if peer_id is present in the cache
743            if let Some(addresses) = known_peers.peek_mut(&peer_id) {
744                let last_address = addresses.peek(&addr).is_some() && addresses.len() == 1;
745                // Get mutable reference to first_failed_time for the address without updating
746                // the item's position in the cache
747                if let Some(first_failed_time) = addresses.peek_mut(&addr) {
748                    // if we failed previously with this address
749                    if let Some(time) = first_failed_time {
750                        // if we failed first time more than an hour ago (for Kademlia)
751                        if *time + expired_address_duration_kademlia < now {
752                            let address_removed = PeerAddressRemovedEvent {
753                                peer_id,
754                                address: addr.clone(),
755                            };
756
757                            address_removed_events.push(address_removed);
758
759                            trace!(%peer_id, "Address was marked for removal from Kademlia: {:?}", addr);
760                        }
761
762                        // if we failed first time more than a day ago (for persistent cache)
763                        if *time + expired_address_duration_persistent_storage < now {
764                            // Remove a failed address
765                            addresses.remove(&addr);
766
767                            // If the last address for peer
768                            if last_address {
769                                known_peers.remove(&peer_id);
770
771                                trace!(%peer_id, "Peer removed from the cache");
772                            }
773
774                            trace!(%peer_id, "Address removed from the persistent cache: {:?}", addr);
775                        } else {
776                            trace!(
777                                %peer_id, "Saving failed connection attempt to a peer: {:?}",
778                                addr
779                            );
780                        }
781                    } else {
782                        // Set failure time
783                        first_failed_time.replace(now);
784
785                        trace!(%peer_id, "Address marked for removal from the cache: {:?}", addr);
786                    }
787                }
788            }
789        });
790
791    address_removed_events
792}