subspace_networking/behavior/
persistent_parameters.rs

1use crate::utils::{AsyncJoinOnDrop, Handler, HandlerFn};
2use async_trait::async_trait;
3use event_listener_primitives::HandlerId;
4use fs2::FileExt;
5use futures::FutureExt;
6use futures::future::{Fuse, pending};
7use libp2p::multiaddr::Protocol;
8use libp2p::{Multiaddr, PeerId};
9use memmap2::{MmapMut, MmapOptions};
10use parity_scale_codec::{Compact, CompactLen, Decode, Encode};
11use parking_lot::Mutex;
12use schnellru::{ByLength, LruMap};
13use std::collections::HashSet;
14use std::fs::OpenOptions;
15use std::io::{Read, Seek, SeekFrom};
16use std::path::Path;
17use std::pin::Pin;
18use std::str::FromStr;
19use std::sync::Arc;
20use std::time::{Duration, SystemTime};
21use std::{io, mem};
22use subspace_core_primitives::hashes::{Blake3Hash, blake3_hash};
23use thiserror::Error;
24use tokio::time::{Sleep, sleep};
25use tracing::{debug, error, trace, warn};
26
/// Time of a dial failure recorded for an address, `None` when no failure is recorded.
type FailureTime = Option<SystemTime>;

/// Default capacity of the LRU cache holding known peers.
const KNOWN_PEERS_CACHE_SIZE: u32 = 100;
/// Capacity of the LRU cache holding addresses of a single peer ID.
const ADDRESSES_CACHE_SIZE: u32 = 30;
/// Delay (in seconds) between attempts to persist known peers.
const DATA_FLUSH_DURATION_SECS: u64 = 5;
/// Default period a failing address is kept in the known peers cache before it is removed.
const REMOVE_KNOWN_PEERS_GRACE_PERIOD: Duration = Duration::from_secs(24 * 60 * 60);
/// Default period an address has to be failing before its removal is reported for Kademlia DHT.
const REMOVE_KNOWN_PEERS_GRACE_PERIOD_FOR_KADEMLIA: Duration = Duration::from_secs(60 * 60);
/// Default age after which known peers stored on disk are considered stale.
const STALE_KNOWN_PEERS_TIMEOUT: Duration = Duration::from_secs(24 * 60 * 60);
42
43/// Defines the event triggered when the peer address is removed from the permanent storage.
44#[derive(Debug, Clone)]
45pub struct PeerAddressRemovedEvent {
46    /// Peer ID
47    pub peer_id: PeerId,
48    /// Peer address
49    pub address: Multiaddr,
50}
51
52#[derive(Debug, Encode, Decode)]
53struct EncodableKnownPeerAddress {
54    multiaddr: Vec<u8>,
55    /// Failure time as Unix timestamp in seconds
56    failure_time: Option<u64>,
57}
58
59#[derive(Debug, Encode, Decode)]
60struct EncodableKnownPeers {
61    cache_size: u32,
62    timestamp: u64,
63    // Each entry is a tuple of peer ID + list of multiaddresses with corresponding failure time
64    known_peers: Vec<(Vec<u8>, Vec<EncodableKnownPeerAddress>)>,
65}
66
67impl EncodableKnownPeers {
68    fn into_cache(mut self) -> LruMap<PeerId, LruMap<Multiaddr, FailureTime>> {
69        let mut peers_cache = LruMap::new(ByLength::new(self.cache_size));
70
71        // Sort peers with the oldest expiration date first
72        self.known_peers
73            .sort_by_cached_key(|(_peer_id, addresses)| {
74                addresses.iter().fold(0u64, |acc, address| {
75                    acc.max(address.failure_time.unwrap_or(u64::MAX))
76                })
77            });
78
79        // Iterate over known peers with most recent failure time (or no failire time) first
80        'peers: for (peer_id, addresses) in self.known_peers.into_iter().rev() {
81            let mut peer_cache =
82                LruMap::<Multiaddr, FailureTime>::new(ByLength::new(ADDRESSES_CACHE_SIZE));
83
84            let peer_id = match PeerId::from_bytes(&peer_id) {
85                Ok(peer_id) => peer_id,
86                Err(error) => {
87                    debug!(%error, "Failed to decode known peer ID, skipping peer entry");
88                    continue;
89                }
90            };
91            for address in addresses {
92                let multiaddr = match Multiaddr::try_from(address.multiaddr) {
93                    Ok(multiaddr) => multiaddr,
94                    Err(error) => {
95                        debug!(
96                            %error,
97                            "Failed to decode known peer multiaddress, skipping peer entry"
98                        );
99                        continue 'peers;
100                    }
101                };
102
103                peer_cache.insert(
104                    multiaddr,
105                    address.failure_time.map(|failure_time| {
106                        SystemTime::UNIX_EPOCH + Duration::from_secs(failure_time)
107                    }),
108                );
109            }
110
111            peers_cache.insert(peer_id, peer_cache);
112        }
113
114        peers_cache
115    }
116
117    fn from_cache(cache: &LruMap<PeerId, LruMap<Multiaddr, FailureTime>>, cache_size: u32) -> Self {
118        let single_peer_encoded_address_size =
119            KnownPeersManager::single_peer_encoded_address_size();
120        Self {
121            cache_size,
122            timestamp: SystemTime::now()
123                .duration_since(SystemTime::UNIX_EPOCH)
124                .expect("Never before Unix epoch; qed")
125                .as_secs(),
126            known_peers: cache
127                .iter()
128                .map(|(peer_id, addresses)| {
129                    (
130                        peer_id.to_bytes(),
131                        addresses
132                            .iter()
133                            .filter_map(|(multiaddr, failure_time)| {
134                                let multiaddr_bytes = multiaddr.to_vec();
135
136                                if multiaddr_bytes.encoded_size() > single_peer_encoded_address_size
137                                {
138                                    // Skip unexpectedly large multiaddresses
139                                    debug!(
140                                        encoded_multiaddress_size = %multiaddr_bytes.encoded_size(),
141                                        limit = %single_peer_encoded_address_size,
142                                        ?multiaddr,
143                                        "Unexpectedly large multiaddress"
144                                    );
145                                    return None;
146                                }
147
148                                Some(EncodableKnownPeerAddress {
149                                    multiaddr: multiaddr_bytes,
150                                    failure_time: failure_time.map(|failure_time| {
151                                        failure_time
152                                            .duration_since(SystemTime::UNIX_EPOCH)
153                                            .expect("Never before Unix epoch; qed")
154                                            .as_secs()
155                                    }),
156                                })
157                            })
158                            .collect(),
159                    )
160                })
161                .collect(),
162        }
163    }
164}
165
166/// A/b slots with known peers where we write serialized known peers in one after another
167struct KnownPeersSlots {
168    a: MmapMut,
169    b: MmapMut,
170}
171
172impl KnownPeersSlots {
173    fn write_to_inactive_slot(&mut self, encodable_known_peers: &EncodableKnownPeers) {
174        let known_peers_bytes = encodable_known_peers.encode();
175        let (encoded_bytes, remaining_bytes) = self.a.split_at_mut(known_peers_bytes.len());
176        encoded_bytes.copy_from_slice(&known_peers_bytes);
177        // Write checksum
178        remaining_bytes[..Blake3Hash::SIZE]
179            .copy_from_slice(blake3_hash(&known_peers_bytes).as_ref());
180        if let Err(error) = self.a.flush() {
181            warn!(%error, "Failed to flush known peers to disk");
182        }
183
184        // Swap slots such that we write into the opposite each time
185        mem::swap(&mut self.a, &mut self.b);
186    }
187}
188
189/// Defines operations with the networking parameters.
190#[async_trait]
191pub trait KnownPeersRegistry: Send + Sync {
192    /// Registers a peer ID and associated addresses
193    async fn add_known_peer(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>);
194
195    /// Unregisters associated addresses for peer ID.
196    async fn remove_known_peer_addresses(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>);
197
198    /// Unregisters associated addresses for peer ID.
199    fn remove_all_known_peer_addresses(&mut self, peer_id: PeerId);
200
201    /// Returns all known peers and their addresses without P2P suffix at the end
202    async fn all_known_peers(&mut self) -> Vec<(PeerId, Vec<Multiaddr>)>;
203
204    /// Returns the number of known peers, and the number of addresses known for those peers.
205    fn count_known_peers(&mut self) -> (usize, usize);
206
207    /// Drive async work in the persistence provider
208    async fn run(&mut self);
209
210    /// Triggers when we removed the peer address from the permanent storage. Returns optional
211    /// event HandlerId. Option enables stub implementation. One of the usages is to notify
212    /// Kademlia about the expired(unreachable) address when it check for how long address was
213    /// unreachable.
214    fn on_unreachable_address(
215        &mut self,
216        handler: HandlerFn<PeerAddressRemovedEvent>,
217    ) -> Option<HandlerId>;
218}
219
/// No-op implementation of [`KnownPeersRegistry`] that stores nothing.
#[derive(Default, Clone)]
pub(crate) struct StubNetworkingParametersManager;
223
224impl StubNetworkingParametersManager {
225    /// Returns an instance of `StubNetworkingParametersManager` as the `Box` reference.
226    pub fn boxed(self) -> Box<dyn KnownPeersRegistry> {
227        Box::new(self)
228    }
229}
230
231#[async_trait]
232impl KnownPeersRegistry for StubNetworkingParametersManager {
233    async fn add_known_peer(&mut self, _: PeerId, _: Vec<Multiaddr>) {}
234
235    async fn remove_known_peer_addresses(&mut self, _peer_id: PeerId, _addresses: Vec<Multiaddr>) {}
236
237    fn remove_all_known_peer_addresses(&mut self, _peer_id: PeerId) {}
238
239    async fn all_known_peers(&mut self) -> Vec<(PeerId, Vec<Multiaddr>)> {
240        Vec::new()
241    }
242
243    fn count_known_peers(&mut self) -> (usize, usize) {
244        (0, 0)
245    }
246
247    async fn run(&mut self) {
248        // Never resolves
249        futures::future::pending().await
250    }
251
252    fn on_unreachable_address(
253        &mut self,
254        _handler: HandlerFn<PeerAddressRemovedEvent>,
255    ) -> Option<HandlerId> {
256        None
257    }
258}
259
260/// Configuration for [`KnownPeersManager`].
261#[derive(Debug, Clone)]
262pub struct KnownPeersManagerConfig {
263    /// Defines whether we return known peers in [`KnownPeersRegistry::all_known_peers()`]
264    pub enable_known_peers_source: bool,
265    /// Defines cache size.
266    pub cache_size: u32,
267    /// Peer ID list to filter on address adding.
268    pub ignore_peer_list: HashSet<PeerId>,
269    /// Defines whether we enable cache persistence.
270    pub path: Option<Box<Path>>,
271    /// Defines interval before the next peer address removes entry from the cache.
272    pub failed_address_cache_removal_interval: Duration,
273    /// Defines interval before the next peer address removal triggers [`PeerAddressRemovedEvent`].
274    pub failed_address_kademlia_removal_interval: Duration,
275    /// Amount of time after which stored known peers contents is assumed to be stale.
276    pub stale_known_peers_timeout: Duration,
277}
278
279impl Default for KnownPeersManagerConfig {
280    fn default() -> Self {
281        Self {
282            enable_known_peers_source: true,
283            cache_size: KNOWN_PEERS_CACHE_SIZE,
284            ignore_peer_list: Default::default(),
285            path: None,
286            failed_address_cache_removal_interval: REMOVE_KNOWN_PEERS_GRACE_PERIOD,
287            failed_address_kademlia_removal_interval: REMOVE_KNOWN_PEERS_GRACE_PERIOD_FOR_KADEMLIA,
288            stale_known_peers_timeout: STALE_KNOWN_PEERS_TIMEOUT,
289        }
290    }
291}
292
293/// Networking parameters persistence errors.
294#[derive(Debug, Error)]
295pub enum KnownPeersManagerPersistenceError {
296    /// I/O error.
297    #[error("I/O error: {0}")]
298    Io(#[from] io::Error),
299    /// Can't preallocate known peers file, probably not enough space on disk
300    #[error("Can't preallocate known peers file, probably not enough space on disk: {0}")]
301    CantPreallocateKnownPeersFile(io::Error),
302}
303
304/// Handles networking parameters. It manages network parameters set and its persistence.
305pub struct KnownPeersManager {
306    /// Defines whether the cache requires saving to DB
307    cache_need_saving: bool,
308    /// LRU cache for the known peers and their addresses
309    known_peers: LruMap<PeerId, LruMap<Multiaddr, FailureTime>>,
310    /// Period between networking parameters saves.
311    networking_parameters_save_delay: Pin<Box<Fuse<Sleep>>>,
312    /// Slots backed by file that store known peers
313    known_peers_slots: Option<Arc<Mutex<KnownPeersSlots>>>,
314    /// Event handler triggered when we decide to remove address from the storage.
315    address_removed: Handler<PeerAddressRemovedEvent>,
316    /// Defines configuration.
317    config: KnownPeersManagerConfig,
318}
319
320impl Drop for KnownPeersManager {
321    fn drop(&mut self) {
322        if self.cache_need_saving
323            && let Some(known_peers_slots) = &self.known_peers_slots
324        {
325            known_peers_slots
326                .lock()
327                .write_to_inactive_slot(&EncodableKnownPeers::from_cache(
328                    &self.known_peers,
329                    self.config.cache_size,
330                ));
331        }
332    }
333}
334
335impl KnownPeersManager {
336    fn init_file(
337        path: &Path,
338        cache_size: u32,
339    ) -> Result<
340        (Option<EncodableKnownPeers>, Arc<Mutex<KnownPeersSlots>>),
341        KnownPeersManagerPersistenceError,
342    > {
343        let mut file = OpenOptions::new()
344            .read(true)
345            .write(true)
346            .create(true)
347            .truncate(false)
348            .open(path)?;
349
350        let known_addresses_size = Self::known_addresses_size(cache_size);
351        let file_size = Self::file_size(cache_size);
352        // Try reading existing encoded known peers from file
353        let mut maybe_newest_known_addresses = None::<EncodableKnownPeers>;
354
355        {
356            let mut file_contents = Vec::with_capacity(file_size);
357            file.read_to_end(&mut file_contents)?;
358            if !file_contents.is_empty() {
359                for known_addresses_bytes in file_contents.chunks_exact(file_contents.len() / 2) {
360                    let known_addresses =
361                        match EncodableKnownPeers::decode(&mut &*known_addresses_bytes) {
362                            Ok(known_addresses) => known_addresses,
363                            Err(error) => {
364                                debug!(%error, "Failed to decode encodable known peers");
365                                continue;
366                            }
367                        };
368
369                    let (encoded_bytes, remaining_bytes) =
370                        known_addresses_bytes.split_at(known_addresses.encoded_size());
371                    if remaining_bytes.len() < Blake3Hash::SIZE {
372                        debug!(
373                            remaining_bytes = %remaining_bytes.len(),
374                            "Not enough bytes to decode checksum, file was likely corrupted"
375                        );
376                        continue;
377                    }
378
379                    // Verify checksum
380                    let actual_checksum = blake3_hash(encoded_bytes);
381                    let expected_checksum = &remaining_bytes[..Blake3Hash::SIZE];
382                    if *actual_checksum != *expected_checksum {
383                        debug!(
384                            encoded_bytes_len = %encoded_bytes.len(),
385                            actual_checksum = %hex::encode(actual_checksum),
386                            expected_checksum = %hex::encode(expected_checksum),
387                            "Hash doesn't match, possible disk corruption or file was just \
388                            created, ignoring"
389                        );
390                        continue;
391                    }
392
393                    match &mut maybe_newest_known_addresses {
394                        Some(newest_known_addresses) => {
395                            if newest_known_addresses.timestamp < known_addresses.timestamp {
396                                *newest_known_addresses = known_addresses;
397                            }
398                        }
399                        None => {
400                            maybe_newest_known_addresses.replace(known_addresses);
401                        }
402                    }
403                }
404            }
405        }
406
407        // *2 because we have a/b parts of the file
408        let file_resized = if file.seek(SeekFrom::End(0))? != file_size as u64 {
409            // Allocating the whole file (`set_len` below can create a sparse file, which will cause
410            // writes to fail later)
411            file.allocate(file_size as u64)
412                .map_err(KnownPeersManagerPersistenceError::CantPreallocateKnownPeersFile)?;
413            // Truncating file (if necessary)
414            file.set_len(file_size as u64)?;
415            true
416        } else {
417            false
418        };
419
420        let mut a_mmap = unsafe {
421            MmapOptions::new()
422                .len(known_addresses_size)
423                .map_mut(&file)?
424        };
425        let mut b_mmap = unsafe {
426            MmapOptions::new()
427                .offset(known_addresses_size as u64)
428                .len(known_addresses_size)
429                .map_mut(&file)?
430        };
431
432        if file_resized {
433            // File might have been resized, write current known addresses into it
434            if let Some(newest_known_addresses) = &maybe_newest_known_addresses {
435                let bytes = newest_known_addresses.encode();
436                a_mmap[..bytes.len()].copy_from_slice(&bytes);
437                a_mmap.flush()?;
438                b_mmap[..bytes.len()].copy_from_slice(&bytes);
439                b_mmap.flush()?;
440            }
441        }
442
443        let known_peers_slots = Arc::new(Mutex::new(KnownPeersSlots {
444            a: a_mmap,
445            b: b_mmap,
446        }));
447
448        Ok((maybe_newest_known_addresses, known_peers_slots))
449    }
450
451    /// Object constructor.
452    pub fn new(config: KnownPeersManagerConfig) -> Result<Self, KnownPeersManagerPersistenceError> {
453        let (maybe_newest_known_addresses, known_peers_slots) = if let Some(path) = &config.path {
454            Self::init_file(path, config.cache_size)
455                .map(|(known_addresses, slots)| (known_addresses, Some(slots)))?
456        } else {
457            (None, None)
458        };
459
460        let known_peers = maybe_newest_known_addresses
461            .filter(|newest_known_addresses| {
462                let time_since_unix_epoch = SystemTime::now()
463                    .duration_since(SystemTime::UNIX_EPOCH)
464                    .expect("Never before Unix epoch; qed");
465                let known_peers_age = time_since_unix_epoch
466                    .saturating_sub(Duration::from_secs(newest_known_addresses.timestamp));
467
468                known_peers_age <= config.stale_known_peers_timeout
469            })
470            .map(EncodableKnownPeers::into_cache)
471            .unwrap_or_else(|| LruMap::new(ByLength::new(config.cache_size)));
472
473        Ok(Self {
474            cache_need_saving: false,
475            known_peers,
476            networking_parameters_save_delay: Self::default_delay(),
477            known_peers_slots,
478            address_removed: Default::default(),
479            config,
480        })
481    }
482
483    /// Size of the backing file on disk
484    pub fn file_size(cache_size: u32) -> usize {
485        // *2 because we have a/b parts of the file
486        Self::known_addresses_size(cache_size) * 2
487    }
488
489    /// Creates a reference to the `NetworkingParametersRegistry` trait implementation.
490    pub fn boxed(self) -> Box<dyn KnownPeersRegistry> {
491        Box::new(self)
492    }
493
494    // Create default delay for networking parameters.
495    fn default_delay() -> Pin<Box<Fuse<Sleep>>> {
496        Box::pin(sleep(Duration::from_secs(DATA_FLUSH_DURATION_SECS)).fuse())
497    }
498
499    fn single_peer_encoded_address_size() -> usize {
500        let multiaddr = Multiaddr::from_str(
501            "/ip4/127.0.0.1/tcp/1234/p2p/12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp",
502        )
503        .expect("Valid multiaddr; qed");
504        // Use multiaddr size that is 3x larger than typical, should be enough for most practical
505        // cases
506        multiaddr.to_vec().encoded_size() * 3
507    }
508
509    /// Size of single peer known addresses, this is an estimate and in some pathological cases peer
510    /// will have to be rejected if encoding exceeds this length.
511    fn single_peer_encoded_size() -> usize {
512        // Peer ID encoding + compact encoding of the length of list of addresses + (length of a
513        // single peer address entry + optional failure time) * number of entries
514        PeerId::random().to_bytes().encoded_size()
515            + Compact::compact_len(&(ADDRESSES_CACHE_SIZE))
516            + (Self::single_peer_encoded_address_size() + Some(0u64).encoded_size())
517                * ADDRESSES_CACHE_SIZE as usize
518    }
519
520    /// Size of known addresses and accompanying metadata.
521    ///
522    /// NOTE: This is max size that needs to be allocated on disk for successful write of a single
523    /// `known_addresses` copy, the actual written data can occupy only a part of this size
524    fn known_addresses_size(cache_size: u32) -> usize {
525        // Timestamp (when was written) + compact encoding of the length of peer records + peer
526        // records + checksum
527        mem::size_of::<u64>()
528            + Compact::compact_len(&(cache_size))
529            + Self::single_peer_encoded_size() * cache_size as usize
530            + Blake3Hash::SIZE
531    }
532
533    fn persistent_enabled(&self) -> bool {
534        self.config.path.is_some()
535    }
536
537    #[cfg(test)]
538    pub(crate) fn contains_address(&self, peer_id: &PeerId, address: &Multiaddr) -> bool {
539        self.known_peers
540            .peek(peer_id)
541            .map(|addresses| addresses.peek(address).is_some())
542            .unwrap_or_default()
543    }
544}
545
546#[async_trait]
547impl KnownPeersRegistry for KnownPeersManager {
548    async fn add_known_peer(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>) {
549        if self.config.ignore_peer_list.contains(&peer_id) {
550            debug!(
551                %peer_id,
552                addr_num=addresses.len(),
553                "Adding new peer addresses canceled (ignore list): {:?}",
554                addresses
555            );
556
557            return;
558        }
559
560        debug!(
561            %peer_id,
562            addr_num=addresses.len(),
563            "Add new peer addresses to the networking parameters registry: {:?}",
564            addresses
565        );
566
567        addresses
568            .iter()
569            .filter(|addr| {
570                // filter Memory addresses
571                !addr
572                    .into_iter()
573                    .any(|protocol| matches!(protocol, Protocol::Memory(..)))
574            })
575            .cloned()
576            .map(remove_p2p_suffix)
577            .for_each(|addr| {
578                // Add new address cache if it doesn't exist previously.
579                self.known_peers
580                    .get_or_insert(peer_id, || LruMap::new(ByLength::new(ADDRESSES_CACHE_SIZE)));
581
582                if let Some(addresses) = self.known_peers.get(&peer_id) {
583                    let previous_entry = addresses.peek(&addr).cloned().flatten();
584                    addresses.insert(addr, None);
585                    if let Some(previous_entry) = previous_entry {
586                        trace!(%peer_id, "Address cache entry replaced: {:?}", previous_entry);
587                    }
588                }
589            });
590
591        self.cache_need_saving = true;
592    }
593
594    async fn remove_known_peer_addresses(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>) {
595        trace!(%peer_id, "Remove peer addresses from the networking parameters registry: {:?}", addresses);
596
597        let removed_addresses = remove_known_peer_addresses_internal(
598            &mut self.known_peers,
599            peer_id,
600            addresses,
601            self.config.failed_address_cache_removal_interval,
602            self.config.failed_address_kademlia_removal_interval,
603        );
604
605        for event in removed_addresses {
606            self.address_removed.call_simple(&event);
607        }
608
609        self.cache_need_saving = true;
610    }
611
612    fn remove_all_known_peer_addresses(&mut self, peer_id: PeerId) {
613        trace!(%peer_id, "Remove all peer addresses from the networking parameters registry");
614
615        self.known_peers.remove(&peer_id);
616
617        self.cache_need_saving = true;
618    }
619
620    async fn all_known_peers(&mut self) -> Vec<(PeerId, Vec<Multiaddr>)> {
621        if !self.config.enable_known_peers_source {
622            return Vec::new();
623        }
624
625        self.known_peers
626            .iter()
627            .map(|(&peer_id, addresses)| {
628                (
629                    peer_id,
630                    addresses
631                        .iter()
632                        .map(|(addr, _failure_time)| addr.clone())
633                        .collect(),
634                )
635            })
636            .collect()
637    }
638
639    fn count_known_peers(&mut self) -> (usize, usize) {
640        if !self.config.enable_known_peers_source {
641            return (0, 0);
642        }
643
644        (
645            self.known_peers.len(),
646            self.known_peers
647                .iter()
648                .map(|(_peer, addresses)| addresses.len())
649                .sum(),
650        )
651    }
652
653    async fn run(&mut self) {
654        if !self.persistent_enabled() {
655            pending().await
656        }
657
658        loop {
659            (&mut self.networking_parameters_save_delay).await;
660
661            if let Some(known_peers_slots) = &self.known_peers_slots
662                && self.cache_need_saving
663            {
664                let known_peers =
665                    EncodableKnownPeers::from_cache(&self.known_peers, self.config.cache_size);
666                let known_peers_slots = Arc::clone(known_peers_slots);
667                let write_known_peers_fut =
668                    AsyncJoinOnDrop::new(tokio::task::spawn_blocking(move || {
669                        known_peers_slots
670                            .lock()
671                            .write_to_inactive_slot(&known_peers);
672                    }));
673
674                if let Err(error) = write_known_peers_fut.await {
675                    error!(%error, "Failed to write known peers");
676                }
677
678                self.cache_need_saving = false;
679            }
680            self.networking_parameters_save_delay = KnownPeersManager::default_delay();
681        }
682    }
683
684    fn on_unreachable_address(
685        &mut self,
686        handler: HandlerFn<PeerAddressRemovedEvent>,
687    ) -> Option<HandlerId> {
688        let handler_id = self.address_removed.add(handler);
689
690        Some(handler_id)
691    }
692}
693
694/// Removes a P2p protocol suffix from the multiaddress if any.
695pub(crate) fn remove_p2p_suffix(mut address: Multiaddr) -> Multiaddr {
696    let last_protocol = address.pop();
697
698    if let Some(Protocol::P2p(_)) = &last_protocol {
699        return address;
700    }
701
702    if let Some(protocol) = last_protocol {
703        address.push(protocol)
704    }
705
706    address
707}
708
709/// Appends a P2p protocol suffix to the multiaddress if required.
710pub(crate) fn append_p2p_suffix(peer_id: PeerId, mut address: Multiaddr) -> Multiaddr {
711    let last_protocol = address.pop();
712
713    if let Some(protocol) = last_protocol
714        && !matches!(protocol, Protocol::P2p(..))
715    {
716        address.push(protocol)
717    }
718    address.push(Protocol::P2p(peer_id));
719
720    address
721}
722
723// Testable implementation of the `remove_known_peer_addresses`
724pub(super) fn remove_known_peer_addresses_internal(
725    known_peers: &mut LruMap<PeerId, LruMap<Multiaddr, FailureTime>>,
726    peer_id: PeerId,
727    addresses: Vec<Multiaddr>,
728    expired_address_duration_persistent_storage: Duration,
729    expired_address_duration_kademlia: Duration,
730) -> Vec<PeerAddressRemovedEvent> {
731    let mut address_removed_events = Vec::new();
732    let now = SystemTime::now();
733
734    addresses
735        .into_iter()
736        .map(remove_p2p_suffix)
737        .for_each(|addr| {
738            // if peer_id is present in the cache
739            if let Some(addresses) = known_peers.peek_mut(&peer_id) {
740                let last_address = addresses.peek(&addr).is_some() && addresses.len() == 1;
741                // Get mutable reference to first_failed_time for the address without updating
742                // the item's position in the cache
743                if let Some(first_failed_time) = addresses.peek_mut(&addr) {
744                    // if we failed previously with this address
745                    if let Some(time) = first_failed_time {
746                        // if we failed first time more than an hour ago (for Kademlia)
747                        if *time + expired_address_duration_kademlia < now {
748                            let address_removed = PeerAddressRemovedEvent {
749                                peer_id,
750                                address: addr.clone(),
751                            };
752
753                            address_removed_events.push(address_removed);
754
755                            trace!(%peer_id, "Address was marked for removal from Kademlia: {:?}", addr);
756                        }
757
758                        // if we failed first time more than a day ago (for persistent cache)
759                        if *time + expired_address_duration_persistent_storage < now {
760                            // Remove a failed address
761                            addresses.remove(&addr);
762
763                            // If the last address for peer
764                            if last_address {
765                                known_peers.remove(&peer_id);
766
767                                trace!(%peer_id, "Peer removed from the cache");
768                            }
769
770                            trace!(%peer_id, "Address removed from the persistent cache: {:?}", addr);
771                        } else {
772                            trace!(
773                                %peer_id, "Saving failed connection attempt to a peer: {:?}",
774                                addr
775                            );
776                        }
777                    } else {
778                        // Set failure time
779                        first_failed_time.replace(now);
780
781                        trace!(%peer_id, "Address marked for removal from the cache: {:?}", addr);
782                    }
783                }
784            }
785        });
786
787    address_removed_events
788}