subspace_networking/
node_runner.rs

//! DSN node behaviour implementation.

use crate::behavior::persistent_parameters::{
    KnownPeersRegistry, PeerAddressRemovedEvent, append_p2p_suffix, remove_p2p_suffix,
};
use crate::behavior::{Behavior, Event};
use crate::constructor::DummyRecordStore;
use crate::constructor::temporary_bans::TemporaryBans;
use crate::protocols::request_response::request_response_factory::{
    Event as RequestResponseEvent, IfDisconnected,
};
use crate::shared::{Command, CreatedSubscription, PeerDiscovered, Shared};
use crate::utils::{SubspaceMetrics, is_global_address_or_dns, strip_peer_id};
use async_lock::Mutex as AsyncMutex;
use bytes::Bytes;
use event_listener_primitives::HandlerId;
use futures::channel::mpsc;
use futures::future::Fuse;
use futures::{FutureExt, StreamExt};
use libp2p::autonat::{Event as AutonatEvent, NatStatus, OutboundProbeEvent};
use libp2p::core::ConnectedPoint;
use libp2p::gossipsub::{Event as GossipsubEvent, TopicHash};
use libp2p::identify::Event as IdentifyEvent;
use libp2p::kad::{
    Behaviour as Kademlia, BootstrapOk, Event as KademliaEvent, GetClosestPeersError,
    GetClosestPeersOk, GetProvidersError, GetProvidersOk, GetRecordError, GetRecordOk,
    InboundRequest, KBucketKey, PeerRecord, ProgressStep, PutRecordOk, QueryId, QueryResult,
    Quorum, Record, RecordKey,
};
use libp2p::metrics::{Metrics, Recorder};
use libp2p::multiaddr::Protocol;
use libp2p::swarm::dial_opts::DialOpts;
use libp2p::swarm::{DialError, SwarmEvent};
use libp2p::{Multiaddr, PeerId, Swarm, TransportError};
use nohash_hasher::IntMap;
use parking_lot::Mutex;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::net::IpAddr;
use std::pin::Pin;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};
use std::{fmt, slice};
use tokio::sync::OwnedSemaphorePermit;
use tokio::task::yield_now;
use tokio::time::Sleep;
use tracing::{debug, error, info, trace, warn};

/// The maximum time between random Kademlia peer DHT queries.
const MAX_RANDOM_QUERY_INTERVAL: Duration = Duration::from_secs(60);

/// The time between external address refreshes and peer stats debug logging.
const PERIODICAL_TASKS_INTERVAL: Duration = Duration::from_secs(5);

/// The time between each peer stats info log.
const PEER_INFO_LOG_INTERVAL: Duration = Duration::from_secs(300);

/// The maximum number of OS-reported listener addresses we will use.
const MAX_LISTEN_ADDRESSES: usize = 30;

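/// Sender half used to stream results of an in-flight Kademlia query back to the caller.
/// Holding the `OwnedSemaphorePermit` for the lifetime of the query bounds the number of
/// concurrent queries; the permit is released when the entry is dropped.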
enum QueryResultSender {
    Value {
        sender: mpsc::UnboundedSender<PeerRecord>,
        // Just holding onto permit while data structure is not dropped
        _permit: OwnedSemaphorePermit,
    },
    ClosestPeers {
        sender: mpsc::UnboundedSender<PeerId>,
        // Just holding onto permit while data structure is not dropped
        _permit: Option<OwnedSemaphorePermit>,
    },
    Providers {
        key: RecordKey,
        sender: mpsc::UnboundedSender<PeerId>,
        // Just holding onto permit while data structure is not dropped
        _permit: Option<OwnedSemaphorePermit>,
    },
    PutValue {
        sender: mpsc::UnboundedSender<()>,
        // Just holding onto permit while data structure is not dropped
        _permit: OwnedSemaphorePermit,
    },
    Bootstrap {
        sender: mpsc::UnboundedSender<()>,
    },
}

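/// State machine tracking the one-time Kademlia bootstrap driven from [`NodeRunner::run()`].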
#[derive(Debug, Default)]
enum BootstrapCommandState {
    #[default]
    NotStarted,
    InProgress(mpsc::UnboundedReceiver<()>),
    Finished,
}

/// Runner for the Node.
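///
/// A minimal usage sketch (assuming the crate's `construct` helper from `crate::constructor`,
/// which pairs a `Node` handle with its `NodeRunner`):
///
/// ```ignore
/// let (node, mut node_runner) = subspace_networking::construct(config)?;
/// // The runner must be polled for the node to make progress.
/// tokio::spawn(async move { node_runner.run().await });
/// ```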
#[must_use = "Node does not function properly unless its runner is driven forward"]
pub struct NodeRunner {
    /// Should non-global addresses be added to the DHT?
    allow_non_global_addresses_in_dht: bool,
    /// Whether node is listening on some addresses
    is_listening: bool,
    command_receiver: mpsc::Receiver<Command>,
    swarm: Swarm<Behavior>,
    shared_weak: Weak<Shared>,
    /// How frequently random Kademlia DHT queries are run to populate the routing table
    /// (doubles after each query, up to [`MAX_RANDOM_QUERY_INTERVAL`]).
    next_random_query_interval: Duration,
    query_id_receivers: HashMap<QueryId, QueryResultSender>,
    /// Global subscription counter; a unique ID is assigned to every (logical) subscription and
    /// is used for unsubscribing.
    next_subscription_id: usize,
    /// Topic subscription senders for logical subscriptions (multiple logical subscriptions can be
    /// present for the same physical subscription).
    topic_subscription_senders: HashMap<TopicHash, IntMap<usize, mpsc::UnboundedSender<Bytes>>>,
    random_query_timeout: Pin<Box<Fuse<Sleep>>>,
    /// Defines an interval between periodical tasks.
    periodical_tasks_interval: Pin<Box<Fuse<Sleep>>>,
    /// The last time we logged info-level peer stats.
    last_peer_stats_info_log: Instant,
    /// Manages the networking parameters like known peers and addresses
    known_peers_registry: Box<dyn KnownPeersRegistry>,
    connected_servers: HashSet<PeerId>,
    /// Defines a set of peers with a permanent connection (and reconnection if necessary).
    reserved_peers: HashMap<PeerId, Multiaddr>,
    /// Temporarily banned peers.
    temporary_bans: Arc<Mutex<TemporaryBans>>,
    /// Libp2p Prometheus metrics.
    libp2p_metrics: Option<Metrics>,
    /// Subspace Prometheus metrics.
    metrics: Option<SubspaceMetrics>,
    /// Mapping from a specific peer to its IP addresses
    peer_ip_addresses: HashMap<PeerId, HashSet<IpAddr>>,
    /// Defines protocol version for the network peers. Affects network partition.
    protocol_version: String,
    /// Addresses to bootstrap Kademlia network
    bootstrap_addresses: Vec<Multiaddr>,
    /// Ensures a single bootstrap per run() invocation.
    bootstrap_command_state: Arc<AsyncMutex<BootstrapCommandState>>,
    /// Receives an event on peer address removal from the persistent storage.
    removed_addresses_rx: mpsc::UnboundedReceiver<PeerAddressRemovedEvent>,
    /// Optional storage for the [`HandlerId`] of the address removal task.
    /// We keep it to stop the task along with the rest of the networking.
    _address_removal_task_handler_id: Option<HandlerId>,
}

impl fmt::Debug for NodeRunner {
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("NodeRunner").finish_non_exhaustive()
    }
}

// Helper struct for NodeRunner configuration (extracted to satisfy clippy's
// too-many-arguments lint).
pub(crate) struct NodeRunnerConfig {
    pub(crate) allow_non_global_addresses_in_dht: bool,
    /// Whether node is listening on some addresses
    pub(crate) is_listening: bool,
    pub(crate) command_receiver: mpsc::Receiver<Command>,
    pub(crate) swarm: Swarm<Behavior>,
    pub(crate) shared_weak: Weak<Shared>,
    pub(crate) next_random_query_interval: Duration,
    pub(crate) known_peers_registry: Box<dyn KnownPeersRegistry>,
    pub(crate) reserved_peers: HashMap<PeerId, Multiaddr>,
    pub(crate) temporary_bans: Arc<Mutex<TemporaryBans>>,
    pub(crate) libp2p_metrics: Option<Metrics>,
    pub(crate) metrics: Option<SubspaceMetrics>,
    pub(crate) protocol_version: String,
    pub(crate) bootstrap_addresses: Vec<Multiaddr>,
}

impl NodeRunner {
    pub(crate) fn new(
        NodeRunnerConfig {
            allow_non_global_addresses_in_dht,
            is_listening,
            command_receiver,
            swarm,
            shared_weak,
            next_random_query_interval,
            mut known_peers_registry,
            reserved_peers,
            temporary_bans,
            libp2p_metrics,
            metrics,
            protocol_version,
            bootstrap_addresses,
        }: NodeRunnerConfig,
    ) -> Self {
        // Setup the address removal events exchange between persistent params storage and Kademlia.
        let (removed_addresses_tx, removed_addresses_rx) = mpsc::unbounded();
        let mut address_removal_task_handler_id = None;
        if let Some(handler_id) = known_peers_registry.on_unreachable_address({
            Arc::new(move |event| {
                if let Err(error) = removed_addresses_tx.unbounded_send(event.clone()) {
                    debug!(?error, ?event, "Cannot send PeerAddressRemovedEvent")
                };
            })
        }) {
            address_removal_task_handler_id.replace(handler_id);
        }

        Self {
            allow_non_global_addresses_in_dht,
            is_listening,
            command_receiver,
            swarm,
            shared_weak,
            next_random_query_interval,
            query_id_receivers: HashMap::default(),
            next_subscription_id: 0,
            topic_subscription_senders: HashMap::default(),
            // We'll make the first query right away and continue at the interval.
            random_query_timeout: Box::pin(tokio::time::sleep(Duration::from_secs(0)).fuse()),
            // We'll run the first periodical tasks right away and continue at the interval.
            periodical_tasks_interval: Box::pin(tokio::time::sleep(Duration::from_secs(0)).fuse()),
            // There's not much point logging immediately, we can wait until we've bootstrapped.
            last_peer_stats_info_log: Instant::now(),
            known_peers_registry,
            connected_servers: HashSet::new(),
            reserved_peers,
            temporary_bans,
            libp2p_metrics,
            metrics,
            peer_ip_addresses: HashMap::new(),
            protocol_version,
            bootstrap_addresses,
            bootstrap_command_state: Arc::new(AsyncMutex::new(BootstrapCommandState::default())),
            removed_addresses_rx,
            _address_removal_task_handler_id: address_removal_task_handler_id,
        }
    }

    /// Drives the main networking future forward.
    pub async fn run(&mut self) {
        if self.is_listening {
            // Wait for listen addresses, otherwise we will get ephemeral addresses in external
            // address candidates that we do not want.
            loop {
                if self.swarm.listeners().next().is_some() {
                    break;
                }

                if let Some(swarm_event) = self.swarm.next().await {
                    self.register_event_metrics(&swarm_event);
                    self.handle_swarm_event(swarm_event).await;
                } else {
                    break;
                }
            }
        }

        self.bootstrap().await;

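        // Main event loop: polls the swarm, the command channel, the known-peers registry,
        // the random-query and periodical-tasks timers, and address-removal events until
        // either the swarm or the command channel terminates.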
        loop {
            futures::select! {
                _ = &mut self.random_query_timeout => {
                    self.handle_random_query_interval();
                    // Increase interval 2x, but limit it to MAX_RANDOM_QUERY_INTERVAL.
                    self.random_query_timeout =
                        Box::pin(tokio::time::sleep(self.next_random_query_interval).fuse());
                    self.next_random_query_interval =
                        (self.next_random_query_interval * 2).min(MAX_RANDOM_QUERY_INTERVAL);
                },
                swarm_event = self.swarm.next() => {
                    if let Some(swarm_event) = swarm_event {
                        self.register_event_metrics(&swarm_event);
                        self.handle_swarm_event(swarm_event).await;
                    } else {
                        break;
                    }
                },
                command = self.command_receiver.next() => {
                    if let Some(command) = command {
                        self.handle_command(command);
                    } else {
                        break;
                    }
                },
                _ = self.known_peers_registry.run().fuse() => {
                    trace!("Network parameters registry runner exited.")
                },
                _ = &mut self.periodical_tasks_interval => {
                    self.handle_periodical_tasks().await;
                    self.periodical_tasks_interval =
                        Box::pin(tokio::time::sleep(PERIODICAL_TASKS_INTERVAL).fuse());
                },
                event = self.removed_addresses_rx.select_next_some() => {
                    self.handle_removed_address_event(event);
                },
            }

            // Allow exiting the busy loop during graceful shutdown.
            yield_now().await;
        }
    }

    /// Bootstraps Kademlia network
    async fn bootstrap(&mut self) {
        // Add bootstrap nodes first to make sure there is space for them in k-buckets
        for (peer_id, address) in strip_peer_id(self.bootstrap_addresses.clone()) {
            self.swarm
                .behaviour_mut()
                .kademlia
                .add_address(&peer_id, address);
        }

        let known_peers = self.known_peers_registry.all_known_peers().await;

        if !known_peers.is_empty() {
            for (peer_id, addresses) in known_peers {
                for address in addresses.clone() {
                    let address = match address.with_p2p(peer_id) {
                        Ok(address) => address,
                        Err(address) => {
                            warn!(%peer_id, %address, "Failed to add peer ID to known peer address");
                            break;
                        }
                    };
                    self.swarm
                        .behaviour_mut()
                        .kademlia
                        .add_address(&peer_id, address);
                }

                if let Err(error) = self
                    .swarm
                    .dial(DialOpts::peer_id(peer_id).addresses(addresses).build())
                {
                    warn!(%peer_id, %error, "Failed to dial peer during bootstrapping");
                }
            }

            // Do bootstrap asynchronously
            self.handle_command(Command::Bootstrap {
                result_sender: None,
            });
            return;
        }

        let bootstrap_command_state = self.bootstrap_command_state.clone();
        let mut bootstrap_command_state = bootstrap_command_state.lock().await;
        let bootstrap_command_receiver = match &mut *bootstrap_command_state {
            BootstrapCommandState::NotStarted => {
                debug!("Bootstrap started.");

                let (bootstrap_command_sender, bootstrap_command_receiver) = mpsc::unbounded();

                self.handle_command(Command::Bootstrap {
                    result_sender: Some(bootstrap_command_sender),
                });

                *bootstrap_command_state =
                    BootstrapCommandState::InProgress(bootstrap_command_receiver);
                match &mut *bootstrap_command_state {
                    BootstrapCommandState::InProgress(bootstrap_command_receiver) => {
                        bootstrap_command_receiver
                    }
                    _ => {
                        unreachable!("Was just set to that exact value");
                    }
                }
            }
            BootstrapCommandState::InProgress(bootstrap_command_receiver) => {
                bootstrap_command_receiver
            }
            BootstrapCommandState::Finished => {
                return;
            }
        };

        let mut bootstrap_step = 0;
        loop {
            futures::select! {
                swarm_event = self.swarm.next() => {
                    if let Some(swarm_event) = swarm_event {
                        self.register_event_metrics(&swarm_event);
                        self.handle_swarm_event(swarm_event).await;
                    } else {
                        break;
                    }
                },
                result = bootstrap_command_receiver.next() => {
                    if result.is_some() {
                        debug!(%bootstrap_step, "Kademlia bootstrapping...");
                        bootstrap_step += 1;
                    } else {
                        break;
                    }
                }
            }
        }

        debug!("Bootstrap finished.");
        *bootstrap_command_state = BootstrapCommandState::Finished;
    }

    /// Handles periodical tasks.
    async fn handle_periodical_tasks(&mut self) {
        // Log current connections.
        let network_info = self.swarm.network_info();
        let connections = network_info.connection_counters();

        debug!(?connections, "Current DSN connections and limits.");

        // Renew known external addresses.
        let mut external_addresses = self.swarm.external_addresses().cloned().collect::<Vec<_>>();

        if let Some(shared) = self.shared_weak.upgrade() {
            debug!(?external_addresses, "Renew DSN external addresses.");
            let mut addresses = shared.external_addresses.lock();
            addresses.clear();
            addresses.append(&mut external_addresses);
        }

        self.log_kademlia_stats();
    }

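    /// Starts a Kademlia `get_closest_peers` walk towards a random peer ID; peers discovered
    /// along the walk refresh the routing table.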
    fn handle_random_query_interval(&mut self) {
        let random_peer_id = PeerId::random();

        trace!("Starting random Kademlia query for {}", random_peer_id);

        self.swarm
            .behaviour_mut()
            .kademlia
            .get_closest_peers(random_peer_id);
    }

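    /// Removes an address reported as unreachable by the persistent known-peers storage from
    /// the Kademlia buckets, unless it belongs to one of the configured bootstrap nodes.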
    fn handle_removed_address_event(&mut self, event: PeerAddressRemovedEvent) {
        trace!(?event, "Peer address removed event.");

        let bootstrap_node_ids = strip_peer_id(self.bootstrap_addresses.clone())
            .into_iter()
            .map(|(peer_id, _)| peer_id)
            .collect::<Vec<_>>();

        if bootstrap_node_ids.contains(&event.peer_id) {
            debug!(
                ?event,
                ?bootstrap_node_ids,
                "Skipped removing bootstrap node from Kademlia buckets."
            );

            return;
        }

        // Remove both versions of the address
        self.swarm.behaviour_mut().kademlia.remove_address(
            &event.peer_id,
            &append_p2p_suffix(event.peer_id, event.address.clone()),
        );

        self.swarm
            .behaviour_mut()
            .kademlia
            .remove_address(&event.peer_id, &remove_p2p_suffix(event.address));
    }

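    /// Drops closed or expired listeners from the shared listeners list, matching both the
    /// `/p2p`-suffixed and the plain form of each removed address.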
    fn handle_remove_listeners(&mut self, removed_listeners: &[Multiaddr]) {
        let shared = match self.shared_weak.upgrade() {
            Some(shared) => shared,
            None => {
                return;
            }
        };

        // Remove both versions of the address
        let peer_id = shared.id;
        shared.listeners.lock().retain(|old_listener| {
            !removed_listeners.contains(&append_p2p_suffix(peer_id, old_listener.clone()))
                && !removed_listeners.contains(&remove_p2p_suffix(old_listener.clone()))
        });
    }

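    /// Dispatches a single swarm event: behaviour events go to the dedicated handlers below,
    /// while connection and listener events update shared state, metrics and known peers.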
    async fn handle_swarm_event(&mut self, swarm_event: SwarmEvent<Event>) {
        match swarm_event {
            SwarmEvent::Behaviour(Event::Identify(event)) => {
                self.handle_identify_event(event).await;
            }
            SwarmEvent::Behaviour(Event::Kademlia(event)) => {
                self.handle_kademlia_event(event).await;
            }
            SwarmEvent::Behaviour(Event::Gossipsub(event)) => {
                self.handle_gossipsub_event(event).await;
            }
            SwarmEvent::Behaviour(Event::RequestResponse(event)) => {
                self.handle_request_response_event(event).await;
            }
            SwarmEvent::Behaviour(Event::Autonat(event)) => {
                self.handle_autonat_event(event).await;
            }
            ref event @ SwarmEvent::NewListenAddr { ref address, .. } => {
                trace!(?event, "New local listener event.");

                let shared = match self.shared_weak.upgrade() {
                    Some(shared) => shared,
                    None => {
                        return;
                    }
                };
                shared.listeners.lock().push(address.clone());
                shared.handlers.new_listener.call_simple(address);
            }
            ref event @ SwarmEvent::ListenerClosed { ref addresses, .. } => {
                trace!(?event, "Local listener closed event.");
                self.handle_remove_listeners(addresses);
            }
            ref event @ SwarmEvent::ExpiredListenAddr { ref address, .. } => {
                trace!(?event, "Local listener expired event.");
                self.handle_remove_listeners(slice::from_ref(address));
            }
            SwarmEvent::ConnectionEstablished {
                peer_id,
                endpoint,
                num_established,
                ..
            } => {
                // Save known addresses that were successfully dialed.
                if let ConnectedPoint::Dialer { address, .. } = &endpoint {
                    // Filter out non-global addresses when they are not allowed in the DHT.
                    if self.allow_non_global_addresses_in_dht || is_global_address_or_dns(address) {
                        self.known_peers_registry
                            .add_known_peer(peer_id, vec![address.clone()])
                            .await;
                    }
                };

                let shared = match self.shared_weak.upgrade() {
                    Some(shared) => shared,
                    None => {
                        return;
                    }
                };

                let is_reserved_peer = self.reserved_peers.contains_key(&peer_id);
                debug!(
                    %peer_id,
                    %is_reserved_peer,
                    ?endpoint,
                    %num_established,
                    "Connection established"
                );

                let maybe_remote_ip =
                    endpoint
                        .get_remote_address()
                        .iter()
                        .find_map(|protocol| match protocol {
                            Protocol::Ip4(ip) => Some(IpAddr::V4(ip)),
                            Protocol::Ip6(ip) => Some(IpAddr::V6(ip)),
                            _ => None,
                        });
                if let Some(ip) = maybe_remote_ip {
                    self.peer_ip_addresses
                        .entry(peer_id)
                        .and_modify(|ips| {
                            ips.insert(ip);
                        })
                        .or_insert(HashSet::from([ip]));
                }

                let num_established_peer_connections = shared
                    .num_established_peer_connections
                    .fetch_add(1, Ordering::SeqCst)
                    + 1;

                shared
                    .handlers
                    .num_established_peer_connections_change
                    .call_simple(&num_established_peer_connections);

                // A new connection
                if num_established.get() == 1 {
                    shared.handlers.connected_peer.call_simple(&peer_id);
                }

                if let Some(metrics) = self.metrics.as_ref() {
                    metrics.inc_established_connections()
                }
            }
            SwarmEvent::ConnectionClosed {
                peer_id,
                num_established,
                cause,
                ..
            } => {
                let shared = match self.shared_weak.upgrade() {
                    Some(shared) => shared,
                    None => {
                        return;
                    }
                };

                debug!(
                    %peer_id,
                    ?cause,
                    %num_established,
                    "Connection closed with peer"
                );

                if num_established == 0 {
                    self.peer_ip_addresses.remove(&peer_id);
                    self.connected_servers.remove(&peer_id);
                }
                let num_established_peer_connections = shared
                    .num_established_peer_connections
                    .fetch_sub(1, Ordering::SeqCst)
                    - 1;

                shared
                    .handlers
                    .num_established_peer_connections_change
                    .call_simple(&num_established_peer_connections);

                // No more connections
                if num_established == 0 {
                    shared.handlers.disconnected_peer.call_simple(&peer_id);
                }

                if let Some(metrics) = self.metrics.as_ref() {
                    metrics.dec_established_connections()
                };
            }
            SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
                if let Some(peer_id) = &peer_id {
                    let should_ban_temporarily =
                        self.should_temporary_ban_on_dial_error(peer_id, &error);

                    trace!(%should_ban_temporarily, "Temporary bans conditions.");

                    if should_ban_temporarily {
                        self.temporary_bans.lock().create_or_extend(peer_id);
                        debug!(%peer_id, ?error, "Peer was temporarily banned.");
                    }
                }

                debug!(
                    ?peer_id,
                    ?error,
                    "SwarmEvent::OutgoingConnectionError for peer."
                );

                match error {
                    DialError::Transport(ref addresses) => {
                        for (addr, _) in addresses {
                            trace!(?error, ?peer_id, %addr, "SwarmEvent::OutgoingConnectionError (DialError::Transport) for peer.");
                            if let Some(peer_id) = peer_id {
                                self.known_peers_registry
                                    .remove_known_peer_addresses(peer_id, vec![addr.clone()])
                                    .await;
                            }
                        }
                    }
                    DialError::WrongPeerId { obtained, .. } => {
                        trace!(?error, ?peer_id, obtained_peer_id=?obtained, "SwarmEvent::OutgoingConnectionError (DialError::WrongPeerId) for peer.");

                        if let Some(ref peer_id) = peer_id {
                            let kademlia = &mut self.swarm.behaviour_mut().kademlia;
                            let _ = kademlia.remove_peer(peer_id);
                        }
                    }
                    _ => {
                        trace!(?error, ?peer_id, "SwarmEvent::OutgoingConnectionError");
                    }
                }
            }
            SwarmEvent::NewExternalAddrCandidate { address } => {
                trace!(%address, "External address candidate");
            }
            SwarmEvent::ExternalAddrConfirmed { address } => {
                debug!(%address, "Confirmed external address");

                let connected_peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();
                self.swarm.behaviour_mut().identify.push(connected_peers);
            }
            SwarmEvent::ExternalAddrExpired { address } => {
                debug!(%address, "External address expired");

                let connected_peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();
                self.swarm.behaviour_mut().identify.push(connected_peers);
            }
            other => {
                trace!("Other swarm event: {:?}", other);
            }
        }
    }

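    /// Decides whether a failed dial should temporarily ban the peer.
    ///
    /// Note: the unconditional early return below currently disables banning entirely (see the
    /// TODO); the match that follows documents the intended per-error policy.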
    fn should_temporary_ban_on_dial_error(&self, peer_id: &PeerId, error: &DialError) -> bool {
        // TODO: Replace with banning of addresses rather than peer IDs if this helps
        if true {
            return false;
        }

        // Ban temporarily only peers without active connections.
        if self.swarm.is_connected(peer_id) {
            return false;
        }

        match &error {
            DialError::Transport(addresses) => {
                for (_, error) in addresses {
                    match error {
                        TransportError::MultiaddrNotSupported(_) => {
                            return true;
                        }
                        TransportError::Other(_) => {
                            // Ignore "temporary ban" errors
                            if self.temporary_bans.lock().is_banned(peer_id) {
                                return false;
                            }
                        }
                    }
                }
                // Other errors that are not related to temporary bans
                true
            }
            DialError::LocalPeerId { .. } => {
                // We don't ban ourselves
                debug!("Local peer dial attempt detected.");

                false
            }
            DialError::NoAddresses => {
                // Let's wait until we get addresses
                true
            }
            DialError::DialPeerConditionFalse(_) => {
                // These are local conditions, we don't need to ban remote peers
                false
            }
            DialError::Aborted => {
                // Seems like a transient event
                false
            }
            DialError::WrongPeerId { .. } => {
                // It's likely that the peer was restarted with a different identity
                false
            }
            DialError::Denied { .. } => {
                // We exceeded the connection limits or we hit a blacklisted peer
                false
            }
        }
    }

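    /// Processes identify results: peers with a mismatched protocol version are temporarily
    /// banned and disconnected, while for compatible peers the self-reported listen addresses
    /// are added to (and stale ones removed from) the Kademlia DHT, provided the peer supports
    /// our Kademlia protocol.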
    async fn handle_identify_event(&mut self, event: IdentifyEvent) {
        let local_peer_id = *self.swarm.local_peer_id();

        if let IdentifyEvent::Received {
            peer_id, mut info, ..
        } = event
        {
            debug!(?peer_id, protocols = ?info.protocols, "IdentifyEvent::Received");

            // Check for network partition
            if info.protocol_version != self.protocol_version {
                debug!(
                    %local_peer_id,
                    %peer_id,
                    local_protocol_version = %self.protocol_version,
                    peer_protocol_version = %info.protocol_version,
                    "Peer has different protocol version, banning temporarily",
                );

                self.temporary_bans.lock().create_or_extend(&peer_id);
                // Forget about this peer until they upgrade
                let _ = self.swarm.disconnect_peer_id(peer_id);
                self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id);
                self.known_peers_registry
                    .remove_all_known_peer_addresses(peer_id);

                return;
            }

            // Remove temporary ban if there was any
            self.temporary_bans.lock().remove(&peer_id);

            if info.listen_addrs.len() > MAX_LISTEN_ADDRESSES {
                debug!(
                    %local_peer_id,
                    %peer_id,
                    "Node has reported more than {} addresses; it is identified by {} and {}",
                    MAX_LISTEN_ADDRESSES, info.protocol_version, info.agent_version
                );
                info.listen_addrs.truncate(MAX_LISTEN_ADDRESSES);
            }

            let kademlia = &mut self.swarm.behaviour_mut().kademlia;
            let full_kademlia_support = kademlia
                .protocol_names()
                .iter()
                .all(|local_protocol| info.protocols.contains(local_protocol));

            if full_kademlia_support {
                let received_addresses = info
                    .listen_addrs
                    .into_iter()
                    .filter(|address| {
                        if self.allow_non_global_addresses_in_dht
                            || is_global_address_or_dns(address)
                        {
                            true
                        } else {
                            trace!(
                                %local_peer_id,
                                %peer_id,
                                %address,
                                "Ignoring self-reported non-global address",
                            );

                            false
                        }
                    })
                    .collect::<Vec<_>>();
                let received_address_strings = received_addresses
                    .iter()
                    .map(ToString::to_string)
                    .collect::<Vec<_>>();
                let old_addresses = kademlia
                    .kbucket(peer_id)
                    .and_then(|peers| {
                        let key = peer_id.into();
                        peers.iter().find_map(|peer| {
                            (peer.node.key == &key).then_some(
                                peer.node
                                    .value
                                    .iter()
                                    .filter(|existing_address| {
                                        let existing_address = existing_address.to_string();

                                        !received_address_strings.iter().any(|received_address| {
                                            received_address.starts_with(&existing_address)
                                                || existing_address.starts_with(received_address)
                                        })
                                    })
                                    .cloned()
                                    .collect::<Vec<_>>(),
                            )
                        })
                    })
                    .unwrap_or_default();

                for address in received_addresses {
                    debug!(
                        %local_peer_id,
                        %peer_id,
                        %address,
                        protocol_names = ?kademlia.protocol_names(),
                        "Adding self-reported address to Kademlia DHT",
                    );

                    kademlia.add_address(&peer_id, address);
                }

                for old_address in old_addresses {
                    trace!(
                        %local_peer_id,
                        %peer_id,
                        %old_address,
                        "Removing old self-reported address from Kademlia DHT",
                    );

                    kademlia.remove_address(&peer_id, &old_address);
                }

                self.connected_servers.insert(peer_id);
            } else {
                debug!(
                    %local_peer_id,
                    %peer_id,
                    peer_protocols = ?info.protocols,
                    protocol_names = ?kademlia.protocol_names(),
                    "Peer doesn't support our Kademlia DHT protocol",
                );

                kademlia.remove_peer(&peer_id);
                self.connected_servers.remove(&peer_id);
            }
        }
    }

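    /// Routes Kademlia events and query progress to the result channels registered in
    /// `query_id_receivers`; an entry is removed once its query reports the last progress step
    /// or the receiver is dropped (in which case the query is cancelled).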
    async fn handle_kademlia_event(&mut self, event: KademliaEvent) {
        trace!("Kademlia event: {:?}", event);

        match event {
            KademliaEvent::InboundRequest {
                request: InboundRequest::AddProvider { record, .. },
            } => {
                debug!("Unexpected AddProvider request received: {:?}", record);
            }
            KademliaEvent::UnroutablePeer { peer } => {
                debug!(%peer, "Unroutable peer detected");

                self.swarm.behaviour_mut().kademlia.remove_peer(&peer);

                if let Some(shared) = self.shared_weak.upgrade() {
                    shared
                        .handlers
                        .peer_discovered
                        .call_simple(&PeerDiscovered::UnroutablePeer { peer_id: peer });
                }
            }
            KademliaEvent::RoutablePeer { peer, address } => {
                debug!(?address, "Routable peer detected: {:?}", peer);

                if let Some(shared) = self.shared_weak.upgrade() {
                    shared
                        .handlers
                        .peer_discovered
                        .call_simple(&PeerDiscovered::RoutablePeer {
                            peer_id: peer,
                            address,
                        });
                }
            }
            KademliaEvent::PendingRoutablePeer { peer, address } => {
                debug!(?address, "Pending routable peer detected: {:?}", peer);

                if let Some(shared) = self.shared_weak.upgrade() {
                    shared
                        .handlers
                        .peer_discovered
                        .call_simple(&PeerDiscovered::RoutablePeer {
                            peer_id: peer,
                            address,
                        });
                }
            }
            KademliaEvent::OutboundQueryProgressed {
                step: ProgressStep { last, .. },
                id,
                result: QueryResult::GetClosestPeers(result),
                ..
            } => {
                let mut cancelled = false;
                if let Some(QueryResultSender::ClosestPeers { sender, .. }) =
                    self.query_id_receivers.get(&id)
                {
                    match result {
                        Ok(GetClosestPeersOk { key, peers }) => {
                            trace!(
                                "Get closest peers query for {} yielded {} results",
                                hex::encode(key),
                                peers.len(),
                            );

                            if peers.is_empty()
                                // Connected peers collection is not empty.
                                && self.swarm.connected_peers().next().is_some()
                            {
                                debug!("Random Kademlia query has yielded empty list of peers");
                            }

                            for peer in peers {
                                cancelled = Self::unbounded_send_and_cancel_on_error(
                                    &mut self.swarm.behaviour_mut().kademlia,
                                    sender,
                                    peer.peer_id,
                                    "GetClosestPeersOk",
                                    &id,
                                ) || cancelled;
                            }
                        }
                        Err(GetClosestPeersError::Timeout { key, peers }) => {
                            debug!(
                                "Get closest peers query for {} timed out with {} results",
                                hex::encode(key),
                                peers.len(),
                            );

                            for peer in peers {
                                cancelled = Self::unbounded_send_and_cancel_on_error(
                                    &mut self.swarm.behaviour_mut().kademlia,
                                    sender,
                                    peer.peer_id,
                                    "GetClosestPeersError::Timeout",
                                    &id,
                                ) || cancelled;
                            }
                        }
                    }
                }

                if last || cancelled {
                    // There will be no more progress
                    self.query_id_receivers.remove(&id);
                }
            }
            KademliaEvent::OutboundQueryProgressed {
                step: ProgressStep { last, .. },
                id,
                result: QueryResult::GetRecord(result),
                ..
            } => {
                let mut cancelled = false;
                if let Some(QueryResultSender::Value { sender, .. }) =
                    self.query_id_receivers.get(&id)
                {
                    match result {
                        Ok(GetRecordOk::FoundRecord(rec)) => {
                            trace!(
                                key = hex::encode(&rec.record.key),
                                "Get record query succeeded",
                            );

                            cancelled = Self::unbounded_send_and_cancel_on_error(
                                &mut self.swarm.behaviour_mut().kademlia,
                                sender,
                                rec,
                                "GetRecordOk",
                                &id,
                            ) || cancelled;
                        }
                        Ok(GetRecordOk::FinishedWithNoAdditionalRecord { .. }) => {
                            trace!("Get record query yielded no results");
                        }
                        Err(error) => match error {
                            GetRecordError::NotFound { key, .. } => {
                                debug!(
                                    key = hex::encode(&key),
                                    "Get record query failed with no results",
                                );
                            }
                            GetRecordError::QuorumFailed { key, records, .. } => {
                                debug!(
                                    key = hex::encode(&key),
                                    "Get record query quorum failed with {} results",
                                    records.len(),
                                );
                            }
                            GetRecordError::Timeout { key } => {
                                debug!(key = hex::encode(&key), "Get record query timed out");
                            }
                        },
                    }
                }

                if last || cancelled {
                    // There will be no more progress
                    self.query_id_receivers.remove(&id);
                }
            }
            KademliaEvent::OutboundQueryProgressed {
                step: ProgressStep { last, .. },
                id,
                result: QueryResult::GetProviders(result),
                ..
            } => {
                let mut cancelled = false;
                if let Some(QueryResultSender::Providers { key, sender, .. }) =
                    self.query_id_receivers.get(&id)
                {
                    match result {
                        Ok(GetProvidersOk::FoundProviders { key, providers }) => {
                            trace!(
                                key = hex::encode(&key),
                                "Get providers query yielded {} results",
                                providers.len(),
                            );

                            for provider in providers {
                                cancelled = Self::unbounded_send_and_cancel_on_error(
                                    &mut self.swarm.behaviour_mut().kademlia,
                                    sender,
                                    provider,
                                    "GetProvidersOk",
                                    &id,
                                ) || cancelled;
                            }
                        }
                        Ok(GetProvidersOk::FinishedWithNoAdditionalRecord { closest_peers }) => {
                            trace!(
                                key = hex::encode(key),
                                closest_peers = %closest_peers.len(),
                                "Get providers query yielded no results"
                            );
                        }
                        Err(error) => {
                            let GetProvidersError::Timeout { key, .. } = error;

                            debug!(
                                key = hex::encode(&key),
                                "Get providers query failed with no results",
                            );
                        }
                    }
                }

                if last || cancelled {
                    // There will be no more progress
                    self.query_id_receivers.remove(&id);
                }
            }
            KademliaEvent::OutboundQueryProgressed {
                step: ProgressStep { last, .. },
                id,
                result: QueryResult::PutRecord(result),
                ..
            } => {
                let mut cancelled = false;
                if let Some(QueryResultSender::PutValue { sender, .. }) =
                    self.query_id_receivers.get(&id)
                {
                    match result {
                        Ok(PutRecordOk { key, .. }) => {
                            trace!("Put record query for {} succeeded", hex::encode(&key));

                            cancelled = Self::unbounded_send_and_cancel_on_error(
                                &mut self.swarm.behaviour_mut().kademlia,
                                sender,
                                (),
                                "PutRecordOk",
                                &id,
                            ) || cancelled;
                        }
                        Err(error) => {
                            debug!(?error, "Put record query failed.");
                        }
                    }
                }

                if last || cancelled {
                    // There will be no more progress
                    self.query_id_receivers.remove(&id);
                }
            }
            KademliaEvent::OutboundQueryProgressed {
                step: ProgressStep { last, count },
                id,
                result: QueryResult::Bootstrap(result),
                stats,
            } => {
                debug!(?stats, %last, %count, ?id, ?result, "Bootstrap OutboundQueryProgressed step.");

                let mut cancelled = false;
                if let Some(QueryResultSender::Bootstrap { sender }) =
                    self.query_id_receivers.get_mut(&id)
                {
                    match result {
                        Ok(BootstrapOk {
                            peer,
                            num_remaining,
                        }) => {
                            trace!(%peer, %num_remaining, %last, "Bootstrap query step succeeded");

                            cancelled = Self::unbounded_send_and_cancel_on_error(
                                &mut self.swarm.behaviour_mut().kademlia,
                                sender,
                                (),
                                "Bootstrap",
                                &id,
                            ) || cancelled;
                        }
                        Err(error) => {
                            debug!(?error, "Bootstrap query failed.");
                        }
                    }
                }

                if last || cancelled {
                    // There will be no more progress
                    self.query_id_receivers.remove(&id);
                }
            }
            _ => {}
        }
    }

    // Returns `true` if the query was cancelled
    fn unbounded_send_and_cancel_on_error<T>(
        kademlia: &mut Kademlia<DummyRecordStore>,
        sender: &mpsc::UnboundedSender<T>,
        value: T,
        channel: &'static str,
        id: &QueryId,
    ) -> bool {
        if sender.unbounded_send(value).is_err() {
            debug!("{} channel was dropped", channel);

            // Cancel query
            if let Some(mut query) = kademlia.query_mut(id) {
                query.finish();
            }
            true
        } else {
            false
        }
    }

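    /// Fans a received gossipsub message out to every logical subscriber of its topic; send
    /// errors are ignored because a dropped receiver simply means the subscriber went away.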
    async fn handle_gossipsub_event(&mut self, event: GossipsubEvent) {
        if let GossipsubEvent::Message { message, .. } = event
            && let Some(senders) = self.topic_subscription_senders.get(&message.topic)
        {
            let bytes = Bytes::from(message.data);

            for sender in senders.values() {
                // Doesn't matter if receiver is still listening for messages or not.
                let _ = sender.unbounded_send(bytes.clone());
            }
        }
    }

    async fn handle_request_response_event(&mut self, event: RequestResponseEvent) {
        // No actions on statistics events.
        trace!("Request response event: {:?}", event);
    }

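    /// Handles AutoNAT events. Peers we ask to probe us are temporarily added to the incoming
    /// allow list (for a single dial-back) so they don't hit the global connection limit, and
    /// are removed again once the probe completes or fails.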
    async fn handle_autonat_event(&mut self, event: AutonatEvent) {
        trace!(?event, "Autonat event received.");
        let autonat = &self.swarm.behaviour().autonat;
        debug!(
            public_address=?autonat.public_address(),
            confidence=%autonat.confidence(),
            "Current public address confidence."
        );

        match event {
            AutonatEvent::InboundProbe(_inbound_probe_event) => {
                // We do not care about this event
            }
            AutonatEvent::OutboundProbe(outbound_probe_event) => {
                match outbound_probe_event {
                    OutboundProbeEvent::Request { peer, .. } => {
                        // For an outbound probe request, add the peer to the allow list to ensure
                        // it can dial us back without hitting the global incoming connection limit
                        self.swarm
                            .behaviour_mut()
                            .connection_limits
                            // We expect a single successful dial from this peer
                            .add_to_incoming_allow_list(
                                peer,
                                self.peer_ip_addresses
                                    .get(&peer)
                                    .iter()
                                    .flat_map(|ip_addresses| ip_addresses.iter())
                                    .copied(),
                                1,
                            );
                    }
                    OutboundProbeEvent::Response { peer, .. } => {
                        self.swarm
                            .behaviour_mut()
                            .connection_limits
                            .remove_from_incoming_allow_list(&peer, Some(1));
                    }
                    OutboundProbeEvent::Error { peer, .. } => {
                        if let Some(peer) = peer {
                            self.swarm
                                .behaviour_mut()
                                .connection_limits
                                .remove_from_incoming_allow_list(&peer, Some(1));
                        }
                    }
                }
            }
            AutonatEvent::StatusChanged { old, new } => {
                debug!(?old, ?new, "Public address status changed.");

                // TODO: Remove block once https://github.com/libp2p/rust-libp2p/issues/4863 is resolved
                if let (NatStatus::Public(old_address), NatStatus::Private) = (old, new.clone()) {
                    self.swarm.remove_external_address(&old_address);
                    debug!(
                        ?old_address,
                        new_status = ?new,
                        "Removing old external address...",
                    );

                    // Trigger potential mode change manually
                    self.swarm.behaviour_mut().kademlia.set_mode(None);
                }

                let connected_peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();
                self.swarm.behaviour_mut().identify.push(connected_peers);
            }
        }
    }

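    /// Executes a single command from the command channel. Most commands start a Kademlia
    /// query and register its [`QueryId`] in `query_id_receivers` so that results can be
    /// streamed back to the caller.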
1279    fn handle_command(&mut self, command: Command) {
1280        match command {
1281            Command::GetValue {
1282                key,
1283                result_sender,
1284                permit,
1285            } => {
1286                let query_id = self
1287                    .swarm
1288                    .behaviour_mut()
1289                    .kademlia
1290                    .get_record(key.to_bytes().into());
1291
1292                self.query_id_receivers.insert(
1293                    query_id,
1294                    QueryResultSender::Value {
1295                        sender: result_sender,
1296                        _permit: permit,
1297                    },
1298                );
1299            }
            Command::PutValue {
                key,
                value,
                result_sender,
                permit,
            } => {
                let local_peer_id = *self.swarm.local_peer_id();

                let record = Record {
                    key: key.into(),
                    value,
                    publisher: Some(local_peer_id),
                    expires: None, // No time expiration.
                };
                let query_result = self
                    .swarm
                    .behaviour_mut()
                    .kademlia
                    .put_record(record, Quorum::One);

                match query_result {
                    Ok(query_id) => {
                        self.query_id_receivers.insert(
                            query_id,
                            QueryResultSender::PutValue {
                                sender: result_sender,
                                _permit: permit,
                            },
                        );
                    }
                    Err(err) => {
                        warn!(?err, "Failed to put value.");
                    }
                }
            }
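            // All local subscribers to a topic share a single gossipsub
            // subscription: the first one actually subscribes, later ones just get
            // an additional sender registered under a fresh subscription ID.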
            Command::Subscribe {
                topic,
                result_sender,
            } => {
                if !self.swarm.behaviour().gossipsub.is_enabled() {
                    panic!("Gossipsub protocol is disabled.");
                }

                let topic_hash = topic.hash();
                let (sender, receiver) = mpsc::unbounded();

                // Unconditionally create a subscription ID; the code is simpler this way.
                let subscription_id = self.next_subscription_id;
                self.next_subscription_id += 1;

                let created_subscription = CreatedSubscription {
                    subscription_id,
                    receiver,
                };

                match self.topic_subscription_senders.entry(topic_hash) {
                    Entry::Occupied(mut entry) => {
                        // If the subscription already exists, just add one more sender to it.
                        if result_sender.send(Ok(created_subscription)).is_ok() {
                            entry.get_mut().insert(subscription_id, sender);
                        }
                    }
                    Entry::Vacant(entry) => {
                        // Otherwise, the subscription needs to be created.

                        if let Some(gossipsub) = self.swarm.behaviour_mut().gossipsub.as_mut() {
                            match gossipsub.subscribe(&topic) {
                                Ok(true) => {
                                    if result_sender.send(Ok(created_subscription)).is_ok() {
                                        entry
                                            .insert(IntMap::from_iter([(subscription_id, sender)]));
                                    }
                                }
                                Ok(false) => {
                                    panic!(
                                        "Logic error, topic subscription wasn't created, this \
                                        must never happen"
                                    );
                                }
                                Err(error) => {
                                    let _ = result_sender.send(Err(error));
                                }
                            }
                        }
                    }
                }
            }
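            // Mirror of `Subscribe`: drop this subscription's sender and only
            // unsubscribe from gossipsub once no local subscribers remain.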
            Command::Unsubscribe {
                topic,
                subscription_id,
            } => {
                if !self.swarm.behaviour().gossipsub.is_enabled() {
                    panic!("Gossipsub protocol is disabled.");
                }

                if let Entry::Occupied(mut entry) =
                    self.topic_subscription_senders.entry(topic.hash())
                {
                    entry.get_mut().remove(&subscription_id);

                    // If the last sender was removed, unsubscribe.
                    if entry.get().is_empty() {
                        entry.remove_entry();

                        if let Some(gossipsub) = self.swarm.behaviour_mut().gossipsub.as_mut()
                            && !gossipsub.unsubscribe(&topic)
                        {
                            warn!(
                                "Can't unsubscribe from topic {topic} because subscription doesn't exist, \
                                    this is a logic error in the subspace or swarm libraries"
                            );
                        }
                    }
                } else {
                    error!(
                        "Can't unsubscribe from topic {topic} because subscription doesn't exist, \
                        this is a logic error in the subspace library"
                    );
                }
            }
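            // `publish` returns `Result<MessageId, PublishError>`; the message ID
            // is of no interest to callers, so only success or failure is forwarded.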
            Command::Publish {
                topic,
                message,
                result_sender,
            } => {
                if !self.swarm.behaviour().gossipsub.is_enabled() {
                    panic!("Gossipsub protocol is disabled.");
                }

                if let Some(gossipsub) = self.swarm.behaviour_mut().gossipsub.as_mut() {
                    // It doesn't matter whether the receiver is still waiting for the response.
                    let _ =
                        result_sender.send(gossipsub.publish(topic, message).map(|_message_id| ()));
                }
            }
            Command::GetClosestPeers {
                key,
                result_sender,
                permit,
            } => {
                let query_id = self.swarm.behaviour_mut().kademlia.get_closest_peers(key);

                self.query_id_receivers.insert(
                    query_id,
                    QueryResultSender::ClosestPeers {
                        sender: result_sender,
                        _permit: permit,
                    },
                );
            }
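            // Unlike `GetClosestPeers` above, this variant is answered purely from
            // the local routing table, so no network query is started and the
            // result is sent back immediately.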
            Command::GetClosestLocalPeers {
                key,
                source,
                result_sender,
            } => {
                let source = source.unwrap_or_else(|| *self.swarm.local_peer_id());
                let result = self
                    .swarm
                    .behaviour_mut()
                    .kademlia
                    .find_closest_local_peers(&KBucketKey::from(key), &source)
                    .filter(|peer| !peer.multiaddrs.is_empty())
                    .map(|peer| (peer.node_id, peer.multiaddrs))
                    .collect();

                // It doesn't matter whether the receiver is still waiting for the response.
                let _ = result_sender.send(result);
            }
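            // Request/response protocols are dispatched by `protocol_name`;
            // `IfDisconnected::TryConnect` dials the peer first (using the provided
            // addresses) if there is no open connection yet.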
            Command::GenericRequest {
                peer_id,
                addresses,
                protocol_name,
                request,
                result_sender,
            } => {
                self.swarm.behaviour_mut().request_response.send_request(
                    &peer_id,
                    protocol_name,
                    request,
                    result_sender,
                    IfDisconnected::TryConnect,
                    addresses,
                );
            }
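            // Same bookkeeping as the other Kademlia queries; the key is kept
            // alongside the sender, presumably so the handler for provider results
            // also knows which key was queried.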
            Command::GetProviders {
                key,
                result_sender,
                permit,
            } => {
                let query_id = self
                    .swarm
                    .behaviour_mut()
                    .kademlia
                    .get_providers(key.clone());

                self.query_id_receivers.insert(
                    query_id,
                    QueryResultSender::Providers {
                        key,
                        sender: result_sender,
                        _permit: permit,
                    },
                );
            }
            Command::BanPeer { peer_id } => {
                self.ban_peer(peer_id);
            }
            Command::Dial { address } => {
                let _ = self.swarm.dial(address);
            }
            Command::ConnectedPeers { result_sender } => {
                let connected_peers = self.swarm.connected_peers().cloned().collect();

                let _ = result_sender.send(connected_peers);
            }
            Command::ConnectedServers { result_sender } => {
                let connected_servers = self.connected_servers.iter().cloned().collect();

                let _ = result_sender.send(connected_servers);
            }
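            // Bootstrap walks the DHT starting from our own peer ID to populate the
            // routing table; `result_sender` is optional so the query can also be
            // triggered without anyone waiting for completion.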
            Command::Bootstrap { result_sender } => {
                let kademlia = &mut self.swarm.behaviour_mut().kademlia;

                match kademlia.bootstrap() {
                    Ok(query_id) => {
                        if let Some(result_sender) = result_sender {
                            self.query_id_receivers.insert(
                                query_id,
                                QueryResultSender::Bootstrap {
                                    sender: result_sender,
                                },
                            );
                        }
                    }
                    Err(err) => {
                        debug!(?err, "Bootstrap error.");
                    }
                }
            }
        }
    }

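    /// Permanently bans a peer: blocks it at the swarm level, drops it from the
    /// Kademlia routing table and forgets all of its known addresses.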
    fn ban_peer(&mut self, peer_id: PeerId) {
        // Remove the temporary ban, if any, before creating a permanent one
        self.temporary_bans.lock().remove(&peer_id);

        debug!(?peer_id, "Banning peer on network level");

        self.swarm.behaviour_mut().block_list.block_peer(peer_id);
        self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id);
        self.known_peers_registry
            .remove_all_known_peer_addresses(peer_id);
    }

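    /// Feeds the swarm event into libp2p metrics, if metrics collection is enabled.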
    fn register_event_metrics(&mut self, swarm_event: &SwarmEvent<Event>) {
        if let Some(ref mut metrics) = self.libp2p_metrics {
            match swarm_event {
                SwarmEvent::Behaviour(Event::Ping(ping_event)) => {
                    metrics.record(ping_event);
                }
                SwarmEvent::Behaviour(Event::Identify(identify_event)) => {
                    metrics.record(identify_event);
                }
                SwarmEvent::Behaviour(Event::Kademlia(kademlia_event)) => {
                    metrics.record(kademlia_event);
                }
                SwarmEvent::Behaviour(Event::Gossipsub(gossipsub_event)) => {
                    metrics.record(gossipsub_event);
                }
                // TODO: implement in the upstream repository
                // SwarmEvent::Behaviour(Event::RequestResponse(request_response_event)) => {
                //     self.metrics.record(request_response_event);
                // }
                swarm_event => {
                    metrics.record(swarm_event);
                }
            }
        }
    }

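    /// Logs aggregate peer statistics: a `debug!` line on every periodic tick and
    /// a rate-limited `info!` summary at most once per `PEER_INFO_LOG_INTERVAL`.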
    fn log_kademlia_stats(&mut self) {
        let mut kad_peers = 0;
        let mut kad_peers_with_no_address = 0;

        for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
            for entry in kbucket.iter() {
                kad_peers += 1;
                if entry.node.value.len() == 0 {
                    kad_peers_with_no_address += 1;
                }
            }
        }

        let (known_peers, known_peer_addresses) = self.known_peers_registry.count_known_peers();

        let connected_peers = self.connected_servers.len();

        let peers_with_ip_address = self.peer_ip_addresses.len();
        let peer_ip_address_count = self
            .peer_ip_addresses
            .values()
            .map(|addresses| addresses.len())
            .sum::<usize>();

        debug!(
            %connected_peers,
            %kad_peers,
            %kad_peers_with_no_address,
            %known_peers,
            %known_peer_addresses,
            %peers_with_ip_address,
            %peer_ip_address_count,
            "dsn peers",
        );

        if self.last_peer_stats_info_log.elapsed() >= PEER_INFO_LOG_INTERVAL {
            self.last_peer_stats_info_log = Instant::now();
            info!("dsn: actively using {connected_peers}/{kad_peers} known peers");
        }
    }
}