subspace_networking/behavior/
persistent_parameters.rs

1use crate::utils::{AsyncJoinOnDrop, Handler, HandlerFn};
2use async_trait::async_trait;
3use event_listener_primitives::HandlerId;
4use fs2::FileExt;
5use futures::future::{pending, Fuse};
6use futures::FutureExt;
7use libp2p::multiaddr::Protocol;
8use libp2p::{Multiaddr, PeerId};
9use memmap2::{MmapMut, MmapOptions};
10use parity_scale_codec::{Compact, CompactLen, Decode, Encode};
11use parking_lot::Mutex;
12use schnellru::{ByLength, LruMap};
13use std::collections::HashSet;
14use std::fs::OpenOptions;
15use std::io::{Read, Seek, SeekFrom};
16use std::path::Path;
17use std::pin::Pin;
18use std::str::FromStr;
19use std::sync::Arc;
20use std::time::{Duration, SystemTime};
21use std::{io, mem};
22use subspace_core_primitives::hashes::{blake3_hash, Blake3Hash};
23use thiserror::Error;
24use tokio::time::{sleep, Sleep};
25use tracing::{debug, error, trace, warn};
26
/// Defines optional time for address dial failure
type FailureTime = Option<SystemTime>;

/// Size of the LRU cache for peers.
const KNOWN_PEERS_CACHE_SIZE: u32 = 100;
/// Size of the LRU cache for addresses of a single peer ID.
const ADDRESSES_CACHE_SIZE: u32 = 30;
/// Pause duration (in seconds) between network parameters saves.
const DATA_FLUSH_DURATION_SECS: u64 = 5;
/// Defines an expiration period for the peer marked for the removal.
const REMOVE_KNOWN_PEERS_GRACE_PERIOD: Duration = Duration::from_secs(24 * 3600);
/// Defines an expiration period for the peer marked for the removal for Kademlia DHT.
const REMOVE_KNOWN_PEERS_GRACE_PERIOD_FOR_KADEMLIA: Duration = Duration::from_secs(3600);
/// Default amount of time after which known peers stored on disk are assumed to be stale.
const STALE_KNOWN_PEERS_TIMEOUT: Duration = Duration::from_secs(24 * 3600);
42
/// Defines the event triggered when the peer address is removed from the permanent storage.
///
/// Handlers registered through [`KnownPeersRegistry::on_unreachable_address`] receive this event.
#[derive(Debug, Clone)]
pub struct PeerAddressRemovedEvent {
    /// Peer ID
    pub peer_id: PeerId,
    /// Peer address
    pub address: Multiaddr,
}
51
/// Serializable form of a single known peer address together with its failure time.
#[derive(Debug, Encode, Decode)]
struct EncodableKnownPeerAddress {
    /// Raw bytes of the multiaddress
    multiaddr: Vec<u8>,
    /// Failure time as Unix timestamp in seconds
    failure_time: Option<u64>,
}
58
/// Serializable snapshot of the known peers cache that is written into the backing file.
#[derive(Debug, Encode, Decode)]
struct EncodableKnownPeers {
    /// Capacity of the peers cache the snapshot was created with
    cache_size: u32,
    /// Time when the snapshot was created, as Unix timestamp in seconds
    timestamp: u64,
    // Each entry is a tuple of peer ID + list of multiaddresses with corresponding failure time
    known_peers: Vec<(Vec<u8>, Vec<EncodableKnownPeerAddress>)>,
}
66
67impl EncodableKnownPeers {
68    fn into_cache(mut self) -> LruMap<PeerId, LruMap<Multiaddr, FailureTime>> {
69        let mut peers_cache = LruMap::new(ByLength::new(self.cache_size));
70
71        // Sort peers with the oldest expiration date first
72        self.known_peers
73            .sort_by_cached_key(|(_peer_id, addresses)| {
74                addresses.iter().fold(0u64, |acc, address| {
75                    acc.max(address.failure_time.unwrap_or(u64::MAX))
76                })
77            });
78
79        // Iterate over known peers with most recent failure time (or no failire time) first
80        'peers: for (peer_id, addresses) in self.known_peers.into_iter().rev() {
81            let mut peer_cache =
82                LruMap::<Multiaddr, FailureTime>::new(ByLength::new(ADDRESSES_CACHE_SIZE));
83
84            let peer_id = match PeerId::from_bytes(&peer_id) {
85                Ok(peer_id) => peer_id,
86                Err(error) => {
87                    debug!(%error, "Failed to decode known peer ID, skipping peer entry");
88                    continue;
89                }
90            };
91            for address in addresses {
92                let multiaddr = match Multiaddr::try_from(address.multiaddr) {
93                    Ok(multiaddr) => multiaddr,
94                    Err(error) => {
95                        debug!(
96                            %error,
97                            "Failed to decode known peer multiaddress, skipping peer entry"
98                        );
99                        continue 'peers;
100                    }
101                };
102
103                peer_cache.insert(
104                    multiaddr,
105                    address.failure_time.map(|failure_time| {
106                        SystemTime::UNIX_EPOCH + Duration::from_secs(failure_time)
107                    }),
108                );
109            }
110
111            peers_cache.insert(peer_id, peer_cache);
112        }
113
114        peers_cache
115    }
116
117    fn from_cache(cache: &LruMap<PeerId, LruMap<Multiaddr, FailureTime>>, cache_size: u32) -> Self {
118        let single_peer_encoded_address_size =
119            KnownPeersManager::single_peer_encoded_address_size();
120        Self {
121            cache_size,
122            timestamp: SystemTime::now()
123                .duration_since(SystemTime::UNIX_EPOCH)
124                .expect("Never before Unix epoch; qed")
125                .as_secs(),
126            known_peers: cache
127                .iter()
128                .map(|(peer_id, addresses)| {
129                    (
130                        peer_id.to_bytes(),
131                        addresses
132                            .iter()
133                            .filter_map(|(multiaddr, failure_time)| {
134                                let multiaddr_bytes = multiaddr.to_vec();
135
136                                if multiaddr_bytes.encoded_size() > single_peer_encoded_address_size
137                                {
138                                    // Skip unexpectedly large multiaddresses
139                                    debug!(
140                                        encoded_multiaddress_size = %multiaddr_bytes.encoded_size(),
141                                        limit = %single_peer_encoded_address_size,
142                                        ?multiaddr,
143                                        "Unexpectedly large multiaddress"
144                                    );
145                                    return None;
146                                }
147
148                                Some(EncodableKnownPeerAddress {
149                                    multiaddr: multiaddr_bytes,
150                                    failure_time: failure_time.map(|failure_time| {
151                                        failure_time
152                                            .duration_since(SystemTime::UNIX_EPOCH)
153                                            .expect("Never before Unix epoch; qed")
154                                            .as_secs()
155                                    }),
156                                })
157                            })
158                            .collect(),
159                    )
160                })
161                .collect(),
162        }
163    }
164}
165
/// A/b slots with known peers where we write serialized known peers in one after another
struct KnownPeersSlots {
    /// Slot that the next write goes into (slots are swapped after every write)
    a: MmapMut,
    /// Slot that was written most recently (once at least one write happened)
    b: MmapMut,
}
171
172impl KnownPeersSlots {
173    fn write_to_inactive_slot(&mut self, encodable_known_peers: &EncodableKnownPeers) {
174        let known_peers_bytes = encodable_known_peers.encode();
175        let (encoded_bytes, remaining_bytes) = self.a.split_at_mut(known_peers_bytes.len());
176        encoded_bytes.copy_from_slice(&known_peers_bytes);
177        // Write checksum
178        remaining_bytes[..Blake3Hash::SIZE]
179            .copy_from_slice(blake3_hash(&known_peers_bytes).as_ref());
180        if let Err(error) = self.a.flush() {
181            warn!(%error, "Failed to flush known peers to disk");
182        }
183
184        // Swap slots such that we write into the opposite each time
185        mem::swap(&mut self.a, &mut self.b);
186    }
187}
188
/// Defines operations with the networking parameters.
#[async_trait]
pub trait KnownPeersRegistry: Send + Sync {
    /// Registers a peer ID and associated addresses
    async fn add_known_peer(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>);

    /// Unregisters the given addresses for peer ID.
    async fn remove_known_peer_addresses(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>);

    /// Unregisters all addresses for peer ID.
    fn remove_all_known_peer_addresses(&mut self, peer_id: PeerId);

    /// Returns all known peers and their addresses without P2P suffix at the end
    async fn all_known_peers(&mut self) -> Vec<(PeerId, Vec<Multiaddr>)>;

    /// Drive async work in the persistence provider
    async fn run(&mut self);

    /// Triggers when we removed the peer address from the permanent storage. Returns optional
    /// event HandlerId. Option enables stub implementation. One of the usages is to notify
    /// Kademlia about the expired (unreachable) address when it checks for how long the address
    /// was unreachable.
    fn on_unreachable_address(
        &mut self,
        handler: HandlerFn<PeerAddressRemovedEvent>,
    ) -> Option<HandlerId>;
}
216
/// Networking manager implementation with NOOP implementation.
///
/// Stores nothing: every [`KnownPeersRegistry`] method is a no-op.
#[derive(Clone, Default)]
pub(crate) struct StubNetworkingParametersManager;
220
impl StubNetworkingParametersManager {
    /// Consumes the instance and returns it as a boxed [`KnownPeersRegistry`] trait object.
    pub fn boxed(self) -> Box<dyn KnownPeersRegistry> {
        Box::new(self)
    }
}
227
228#[async_trait]
229impl KnownPeersRegistry for StubNetworkingParametersManager {
230    async fn add_known_peer(&mut self, _: PeerId, _: Vec<Multiaddr>) {}
231
232    async fn remove_known_peer_addresses(&mut self, _peer_id: PeerId, _addresses: Vec<Multiaddr>) {}
233
234    fn remove_all_known_peer_addresses(&mut self, _peer_id: PeerId) {}
235
236    async fn all_known_peers(&mut self) -> Vec<(PeerId, Vec<Multiaddr>)> {
237        Vec::new()
238    }
239
240    async fn run(&mut self) {
241        // Never resolves
242        futures::future::pending().await
243    }
244
245    fn on_unreachable_address(
246        &mut self,
247        _handler: HandlerFn<PeerAddressRemovedEvent>,
248    ) -> Option<HandlerId> {
249        None
250    }
251}
252
/// Configuration for [`KnownPeersManager`].
#[derive(Debug, Clone)]
pub struct KnownPeersManagerConfig {
    /// Defines whether we return known peers in [`KnownPeersRegistry::all_known_peers()`]
    pub enable_known_peers_source: bool,
    /// Defines cache size (maximum number of peers kept in the cache).
    pub cache_size: u32,
    /// Peer ID list to filter on address adding.
    pub ignore_peer_list: HashSet<PeerId>,
    /// Path to the file used for cache persistence, `None` disables persistence.
    pub path: Option<Box<Path>>,
    /// Defines interval before the next peer address removes entry from the cache.
    pub failed_address_cache_removal_interval: Duration,
    /// Defines interval before the next peer address removal triggers [`PeerAddressRemovedEvent`].
    pub failed_address_kademlia_removal_interval: Duration,
    /// Amount of time after which stored known peers contents is assumed to be stale.
    pub stale_known_peers_timeout: Duration,
}
271
272impl Default for KnownPeersManagerConfig {
273    fn default() -> Self {
274        Self {
275            enable_known_peers_source: true,
276            cache_size: KNOWN_PEERS_CACHE_SIZE,
277            ignore_peer_list: Default::default(),
278            path: None,
279            failed_address_cache_removal_interval: REMOVE_KNOWN_PEERS_GRACE_PERIOD,
280            failed_address_kademlia_removal_interval: REMOVE_KNOWN_PEERS_GRACE_PERIOD_FOR_KADEMLIA,
281            stale_known_peers_timeout: STALE_KNOWN_PEERS_TIMEOUT,
282        }
283    }
284}
285
/// Networking parameters persistence errors.
///
/// Returned by [`KnownPeersManager::new()`] when the backing file can't be prepared.
#[derive(Debug, Error)]
pub enum KnownPeersManagerPersistenceError {
    /// I/O error.
    #[error("I/O error: {0}")]
    Io(#[from] io::Error),
    /// Can't preallocate known peers file, probably not enough space on disk
    #[error("Can't preallocate known peers file, probably not enough space on disk: {0}")]
    CantPreallocateKnownPeersFile(io::Error),
}
296
/// Handles networking parameters. It manages network parameters set and its persistence.
pub struct KnownPeersManager {
    /// Defines whether the cache has changes that were not written to the backing file yet
    cache_need_saving: bool,
    /// LRU cache for the known peers and their addresses
    known_peers: LruMap<PeerId, LruMap<Multiaddr, FailureTime>>,
    /// Period between networking parameters saves.
    networking_parameters_save_delay: Pin<Box<Fuse<Sleep>>>,
    /// Slots backed by file that store known peers (`None` when persistence is disabled)
    known_peers_slots: Option<Arc<Mutex<KnownPeersSlots>>>,
    /// Event handler triggered when we decide to remove address from the storage.
    address_removed: Handler<PeerAddressRemovedEvent>,
    /// Defines configuration.
    config: KnownPeersManagerConfig,
}
312
313impl Drop for KnownPeersManager {
314    fn drop(&mut self) {
315        if self.cache_need_saving {
316            if let Some(known_peers_slots) = &self.known_peers_slots {
317                known_peers_slots
318                    .lock()
319                    .write_to_inactive_slot(&EncodableKnownPeers::from_cache(
320                        &self.known_peers,
321                        self.config.cache_size,
322                    ));
323            }
324        }
325    }
326}
327
328impl KnownPeersManager {
329    fn init_file(
330        path: &Path,
331        cache_size: u32,
332    ) -> Result<
333        (Option<EncodableKnownPeers>, Arc<Mutex<KnownPeersSlots>>),
334        KnownPeersManagerPersistenceError,
335    > {
336        let mut file = OpenOptions::new()
337            .read(true)
338            .write(true)
339            .create(true)
340            .truncate(false)
341            .open(path)?;
342
343        let known_addresses_size = Self::known_addresses_size(cache_size);
344        let file_size = Self::file_size(cache_size);
345        // Try reading existing encoded known peers from file
346        let mut maybe_newest_known_addresses = None::<EncodableKnownPeers>;
347
348        {
349            let mut file_contents = Vec::with_capacity(file_size);
350            file.read_to_end(&mut file_contents)?;
351            if !file_contents.is_empty() {
352                for known_addresses_bytes in file_contents.chunks_exact(file_contents.len() / 2) {
353                    let known_addresses =
354                        match EncodableKnownPeers::decode(&mut &*known_addresses_bytes) {
355                            Ok(known_addresses) => known_addresses,
356                            Err(error) => {
357                                debug!(%error, "Failed to decode encodable known peers");
358                                continue;
359                            }
360                        };
361
362                    let (encoded_bytes, remaining_bytes) =
363                        known_addresses_bytes.split_at(known_addresses.encoded_size());
364                    if remaining_bytes.len() < Blake3Hash::SIZE {
365                        debug!(
366                            remaining_bytes = %remaining_bytes.len(),
367                            "Not enough bytes to decode checksum, file was likely corrupted"
368                        );
369                        continue;
370                    }
371
372                    // Verify checksum
373                    let actual_checksum = blake3_hash(encoded_bytes);
374                    let expected_checksum = &remaining_bytes[..Blake3Hash::SIZE];
375                    if *actual_checksum != *expected_checksum {
376                        debug!(
377                            encoded_bytes_len = %encoded_bytes.len(),
378                            actual_checksum = %hex::encode(actual_checksum),
379                            expected_checksum = %hex::encode(expected_checksum),
380                            "Hash doesn't match, possible disk corruption or file was just \
381                            created, ignoring"
382                        );
383                        continue;
384                    }
385
386                    match &mut maybe_newest_known_addresses {
387                        Some(newest_known_addresses) => {
388                            if newest_known_addresses.timestamp < known_addresses.timestamp {
389                                *newest_known_addresses = known_addresses;
390                            }
391                        }
392                        None => {
393                            maybe_newest_known_addresses.replace(known_addresses);
394                        }
395                    }
396                }
397            }
398        }
399
400        // *2 because we have a/b parts of the file
401        let file_resized = if file.seek(SeekFrom::End(0))? != file_size as u64 {
402            // Allocating the whole file (`set_len` below can create a sparse file, which will cause
403            // writes to fail later)
404            file.allocate(file_size as u64)
405                .map_err(KnownPeersManagerPersistenceError::CantPreallocateKnownPeersFile)?;
406            // Truncating file (if necessary)
407            file.set_len(file_size as u64)?;
408            true
409        } else {
410            false
411        };
412
413        let mut a_mmap = unsafe {
414            MmapOptions::new()
415                .len(known_addresses_size)
416                .map_mut(&file)?
417        };
418        let mut b_mmap = unsafe {
419            MmapOptions::new()
420                .offset(known_addresses_size as u64)
421                .len(known_addresses_size)
422                .map_mut(&file)?
423        };
424
425        if file_resized {
426            // File might have been resized, write current known addresses into it
427            if let Some(newest_known_addresses) = &maybe_newest_known_addresses {
428                let bytes = newest_known_addresses.encode();
429                a_mmap[..bytes.len()].copy_from_slice(&bytes);
430                a_mmap.flush()?;
431                b_mmap[..bytes.len()].copy_from_slice(&bytes);
432                b_mmap.flush()?;
433            }
434        }
435
436        let known_peers_slots = Arc::new(Mutex::new(KnownPeersSlots {
437            a: a_mmap,
438            b: b_mmap,
439        }));
440
441        Ok((maybe_newest_known_addresses, known_peers_slots))
442    }
443
444    /// Object constructor.
445    pub fn new(config: KnownPeersManagerConfig) -> Result<Self, KnownPeersManagerPersistenceError> {
446        let (maybe_newest_known_addresses, known_peers_slots) = if let Some(path) = &config.path {
447            Self::init_file(path, config.cache_size)
448                .map(|(known_addresses, slots)| (known_addresses, Some(slots)))?
449        } else {
450            (None, None)
451        };
452
453        let known_peers = maybe_newest_known_addresses
454            .filter(|newest_known_addresses| {
455                let time_since_unix_epoch = SystemTime::now()
456                    .duration_since(SystemTime::UNIX_EPOCH)
457                    .expect("Never before Unix epoch; qed");
458                let known_peers_age = time_since_unix_epoch
459                    .saturating_sub(Duration::from_secs(newest_known_addresses.timestamp));
460
461                known_peers_age <= config.stale_known_peers_timeout
462            })
463            .map(EncodableKnownPeers::into_cache)
464            .unwrap_or_else(|| LruMap::new(ByLength::new(config.cache_size)));
465
466        Ok(Self {
467            cache_need_saving: false,
468            known_peers,
469            networking_parameters_save_delay: Self::default_delay(),
470            known_peers_slots,
471            address_removed: Default::default(),
472            config,
473        })
474    }
475
476    /// Size of the backing file on disk
477    pub fn file_size(cache_size: u32) -> usize {
478        // *2 because we have a/b parts of the file
479        Self::known_addresses_size(cache_size) * 2
480    }
481
482    /// Creates a reference to the `NetworkingParametersRegistry` trait implementation.
483    pub fn boxed(self) -> Box<dyn KnownPeersRegistry> {
484        Box::new(self)
485    }
486
487    // Create default delay for networking parameters.
488    fn default_delay() -> Pin<Box<Fuse<Sleep>>> {
489        Box::pin(sleep(Duration::from_secs(DATA_FLUSH_DURATION_SECS)).fuse())
490    }
491
492    fn single_peer_encoded_address_size() -> usize {
493        let multiaddr = Multiaddr::from_str(
494            "/ip4/127.0.0.1/tcp/1234/p2p/12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp",
495        )
496        .expect("Valid multiaddr; qed");
497        // Use multiaddr size that is 3x larger than typical, should be enough for most practical
498        // cases
499        multiaddr.to_vec().encoded_size() * 3
500    }
501
502    /// Size of single peer known addresses, this is an estimate and in some pathological cases peer
503    /// will have to be rejected if encoding exceeds this length.
504    fn single_peer_encoded_size() -> usize {
505        // Peer ID encoding + compact encoding of the length of list of addresses + (length of a
506        // single peer address entry + optional failure time) * number of entries
507        PeerId::random().to_bytes().encoded_size()
508            + Compact::compact_len(&(ADDRESSES_CACHE_SIZE))
509            + (Self::single_peer_encoded_address_size() + Some(0u64).encoded_size())
510                * ADDRESSES_CACHE_SIZE as usize
511    }
512
513    /// Size of known addresses and accompanying metadata.
514    ///
515    /// NOTE: This is max size that needs to be allocated on disk for successful write of a single
516    /// `known_addresses` copy, the actual written data can occupy only a part of this size
517    fn known_addresses_size(cache_size: u32) -> usize {
518        // Timestamp (when was written) + compact encoding of the length of peer records + peer
519        // records + checksum
520        mem::size_of::<u64>()
521            + Compact::compact_len(&(cache_size))
522            + Self::single_peer_encoded_size() * cache_size as usize
523            + Blake3Hash::SIZE
524    }
525
526    fn persistent_enabled(&self) -> bool {
527        self.config.path.is_some()
528    }
529
530    #[cfg(test)]
531    pub(crate) fn contains_address(&self, peer_id: &PeerId, address: &Multiaddr) -> bool {
532        self.known_peers
533            .peek(peer_id)
534            .map(|addresses| addresses.peek(address).is_some())
535            .unwrap_or_default()
536    }
537}
538
539#[async_trait]
540impl KnownPeersRegistry for KnownPeersManager {
541    async fn add_known_peer(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>) {
542        if self.config.ignore_peer_list.contains(&peer_id) {
543            debug!(
544                %peer_id,
545                addr_num=addresses.len(),
546                "Adding new peer addresses canceled (ignore list): {:?}",
547                addresses
548            );
549
550            return;
551        }
552
553        debug!(
554            %peer_id,
555            addr_num=addresses.len(),
556            "Add new peer addresses to the networking parameters registry: {:?}",
557            addresses
558        );
559
560        addresses
561            .iter()
562            .filter(|addr| {
563                // filter Memory addresses
564                !addr
565                    .into_iter()
566                    .any(|protocol| matches!(protocol, Protocol::Memory(..)))
567            })
568            .cloned()
569            .map(remove_p2p_suffix)
570            .for_each(|addr| {
571                // Add new address cache if it doesn't exist previously.
572                self.known_peers
573                    .get_or_insert(peer_id, || LruMap::new(ByLength::new(ADDRESSES_CACHE_SIZE)));
574
575                if let Some(addresses) = self.known_peers.get(&peer_id) {
576                    let previous_entry = addresses.peek(&addr).cloned().flatten();
577                    addresses.insert(addr, None);
578                    if let Some(previous_entry) = previous_entry {
579                        trace!(%peer_id, "Address cache entry replaced: {:?}", previous_entry);
580                    }
581                }
582            });
583
584        self.cache_need_saving = true;
585    }
586
587    async fn remove_known_peer_addresses(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>) {
588        trace!(%peer_id, "Remove peer addresses from the networking parameters registry: {:?}", addresses);
589
590        let removed_addresses = remove_known_peer_addresses_internal(
591            &mut self.known_peers,
592            peer_id,
593            addresses,
594            self.config.failed_address_cache_removal_interval,
595            self.config.failed_address_kademlia_removal_interval,
596        );
597
598        for event in removed_addresses {
599            self.address_removed.call_simple(&event);
600        }
601
602        self.cache_need_saving = true;
603    }
604
605    fn remove_all_known_peer_addresses(&mut self, peer_id: PeerId) {
606        trace!(%peer_id, "Remove all peer addresses from the networking parameters registry");
607
608        self.known_peers.remove(&peer_id);
609
610        self.cache_need_saving = true;
611    }
612
613    async fn all_known_peers(&mut self) -> Vec<(PeerId, Vec<Multiaddr>)> {
614        if !self.config.enable_known_peers_source {
615            return Vec::new();
616        }
617
618        self.known_peers
619            .iter()
620            .map(|(&peer_id, addresses)| {
621                (
622                    peer_id,
623                    addresses
624                        .iter()
625                        .map(|(addr, _failure_time)| addr.clone())
626                        .collect(),
627                )
628            })
629            .collect()
630    }
631
632    async fn run(&mut self) {
633        if !self.persistent_enabled() {
634            pending().await
635        }
636
637        loop {
638            (&mut self.networking_parameters_save_delay).await;
639
640            if let Some(known_peers_slots) = &self.known_peers_slots {
641                if self.cache_need_saving {
642                    let known_peers =
643                        EncodableKnownPeers::from_cache(&self.known_peers, self.config.cache_size);
644                    let known_peers_slots = Arc::clone(known_peers_slots);
645                    let write_known_peers_fut =
646                        AsyncJoinOnDrop::new(tokio::task::spawn_blocking(move || {
647                            known_peers_slots
648                                .lock()
649                                .write_to_inactive_slot(&known_peers);
650                        }));
651
652                    if let Err(error) = write_known_peers_fut.await {
653                        error!(%error, "Failed to write known peers");
654                    }
655
656                    self.cache_need_saving = false;
657                }
658            }
659            self.networking_parameters_save_delay = KnownPeersManager::default_delay();
660        }
661    }
662
663    fn on_unreachable_address(
664        &mut self,
665        handler: HandlerFn<PeerAddressRemovedEvent>,
666    ) -> Option<HandlerId> {
667        let handler_id = self.address_removed.add(handler);
668
669        Some(handler_id)
670    }
671}
672
673/// Removes a P2p protocol suffix from the multiaddress if any.
674pub(crate) fn remove_p2p_suffix(mut address: Multiaddr) -> Multiaddr {
675    let last_protocol = address.pop();
676
677    if let Some(Protocol::P2p(_)) = &last_protocol {
678        return address;
679    }
680
681    if let Some(protocol) = last_protocol {
682        address.push(protocol)
683    }
684
685    address
686}
687
688/// Appends a P2p protocol suffix to the multiaddress if required.
689pub(crate) fn append_p2p_suffix(peer_id: PeerId, mut address: Multiaddr) -> Multiaddr {
690    let last_protocol = address.pop();
691
692    if let Some(protocol) = last_protocol {
693        if !matches!(protocol, Protocol::P2p(..)) {
694            address.push(protocol)
695        }
696    }
697    address.push(Protocol::P2p(peer_id));
698
699    address
700}
701
702// Testable implementation of the `remove_known_peer_addresses`
703pub(super) fn remove_known_peer_addresses_internal(
704    known_peers: &mut LruMap<PeerId, LruMap<Multiaddr, FailureTime>>,
705    peer_id: PeerId,
706    addresses: Vec<Multiaddr>,
707    expired_address_duration_persistent_storage: Duration,
708    expired_address_duration_kademlia: Duration,
709) -> Vec<PeerAddressRemovedEvent> {
710    let mut address_removed_events = Vec::new();
711    let now = SystemTime::now();
712
713    addresses
714        .into_iter()
715        .map(remove_p2p_suffix)
716        .for_each(|addr| {
717            // if peer_id is present in the cache
718            if let Some(addresses) = known_peers.peek_mut(&peer_id) {
719                let last_address = addresses.peek(&addr).is_some() && addresses.len() == 1;
720                // Get mutable reference to first_failed_time for the address without updating
721                // the item's position in the cache
722                if let Some(first_failed_time) = addresses.peek_mut(&addr) {
723                    // if we failed previously with this address
724                    if let Some(time) = first_failed_time {
725                        // if we failed first time more than an hour ago (for Kademlia)
726                        if *time + expired_address_duration_kademlia < now {
727                            let address_removed = PeerAddressRemovedEvent {
728                                peer_id,
729                                address: addr.clone(),
730                            };
731
732                            address_removed_events.push(address_removed);
733
734                            trace!(%peer_id, "Address was marked for removal from Kademlia: {:?}", addr);
735                        }
736
737                        // if we failed first time more than a day ago (for persistent cache)
738                        if *time + expired_address_duration_persistent_storage < now {
739                            // Remove a failed address
740                            addresses.remove(&addr);
741
742                            // If the last address for peer
743                            if last_address {
744                                known_peers.remove(&peer_id);
745
746                                trace!(%peer_id, "Peer removed from the cache");
747                            }
748
749                            trace!(%peer_id, "Address removed from the persistent cache: {:?}", addr);
750                        } else {
751                            trace!(
752                                %peer_id, "Saving failed connection attempt to a peer: {:?}",
753                                addr
754                            );
755                        }
756                    } else {
757                        // Set failure time
758                        first_failed_time.replace(now);
759
760                        trace!(%peer_id, "Address marked for removal from the cache: {:?}", addr);
761                    }
762                }
763            }
764        });
765
766    address_removed_events
767}