subspace_networking/
node_runner.rs

use crate::behavior::persistent_parameters::{
    append_p2p_suffix, remove_p2p_suffix, KnownPeersRegistry, PeerAddressRemovedEvent,
};
use crate::behavior::{Behavior, Event};
use crate::constructor::temporary_bans::TemporaryBans;
use crate::constructor::DummyRecordStore;
use crate::protocols::request_response::request_response_factory::{
    Event as RequestResponseEvent, IfDisconnected,
};
use crate::shared::{Command, CreatedSubscription, PeerDiscovered, Shared};
use crate::utils::{is_global_address_or_dns, strip_peer_id, SubspaceMetrics};
use async_lock::Mutex as AsyncMutex;
use bytes::Bytes;
use event_listener_primitives::HandlerId;
use futures::channel::mpsc;
use futures::future::Fuse;
use futures::{FutureExt, StreamExt};
use libp2p::autonat::{Event as AutonatEvent, NatStatus, OutboundProbeEvent};
use libp2p::core::ConnectedPoint;
use libp2p::gossipsub::{Event as GossipsubEvent, TopicHash};
use libp2p::identify::Event as IdentifyEvent;
use libp2p::kad::{
    Behaviour as Kademlia, BootstrapOk, Event as KademliaEvent, GetClosestPeersError,
    GetClosestPeersOk, GetProvidersError, GetProvidersOk, GetRecordError, GetRecordOk,
    InboundRequest, KBucketKey, PeerRecord, ProgressStep, PutRecordOk, QueryId, QueryResult,
    Quorum, Record, RecordKey,
};
use libp2p::metrics::{Metrics, Recorder};
use libp2p::multiaddr::Protocol;
use libp2p::swarm::dial_opts::DialOpts;
use libp2p::swarm::{DialError, SwarmEvent};
use libp2p::{Multiaddr, PeerId, Swarm, TransportError};
use nohash_hasher::IntMap;
use parking_lot::Mutex;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::net::IpAddr;
use std::pin::Pin;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Weak};
use std::time::Duration;
use tokio::sync::OwnedSemaphorePermit;
use tokio::task::yield_now;
use tokio::time::Sleep;
use tracing::{debug, error, trace, warn};

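/// Senders for in-flight Kademlia queries, stored in `query_id_receivers` keyed by [`QueryId`];
/// each variant streams results of the corresponding query type back to the caller.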
enum QueryResultSender {
    Value {
        sender: mpsc::UnboundedSender<PeerRecord>,
        // Just holding onto permit while data structure is not dropped
        _permit: OwnedSemaphorePermit,
    },
    ClosestPeers {
        sender: mpsc::UnboundedSender<PeerId>,
        // Just holding onto permit while data structure is not dropped
        _permit: Option<OwnedSemaphorePermit>,
    },
    Providers {
        key: RecordKey,
        sender: mpsc::UnboundedSender<PeerId>,
        // Just holding onto permit while data structure is not dropped
        _permit: Option<OwnedSemaphorePermit>,
    },
    PutValue {
        sender: mpsc::UnboundedSender<()>,
        // Just holding onto permit while data structure is not dropped
        _permit: OwnedSemaphorePermit,
    },
    Bootstrap {
        sender: mpsc::UnboundedSender<()>,
    },
}

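/// State of the initial Kademlia bootstrap, shared so that it is executed at most once.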
#[derive(Debug, Default)]
enum BootstrapCommandState {
    #[default]
    NotStarted,
    InProgress(mpsc::UnboundedReceiver<()>),
    Finished,
}

/// Runner for the Node.
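///
/// Owns the libp2p [`Swarm`] and drives it forward, handling swarm events as well as commands
/// received from the node handle.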
#[must_use = "Node does not function properly unless its runner is driven forward"]
pub struct NodeRunner {
    /// Should non-global addresses be added to the DHT?
    allow_non_global_addresses_in_dht: bool,
    /// Whether the node is listening on some addresses
    is_listening: bool,
    /// Receiver for commands sent by the node handle.
    command_receiver: mpsc::Receiver<Command>,
    /// The libp2p swarm driven by this runner.
    swarm: Swarm<Behavior>,
    /// Weak reference to the shared state of the node.
    shared_weak: Weak<Shared>,
    /// How frequently random queries should be done using the Kademlia DHT to populate the
    /// routing table.
    next_random_query_interval: Duration,
    /// Senders for in-flight Kademlia queries, keyed by query ID.
    query_id_receivers: HashMap<QueryId, QueryResultSender>,
    /// Global subscription counter, is assigned to every (logical) subscription and is used for
    /// unsubscribing.
    next_subscription_id: usize,
    /// Topic subscription senders for logical subscriptions (multiple logical subscriptions can be
    /// present for the same physical subscription).
    topic_subscription_senders: HashMap<TopicHash, IntMap<usize, mpsc::UnboundedSender<Bytes>>>,
    random_query_timeout: Pin<Box<Fuse<Sleep>>>,
    /// Defines an interval between periodical tasks.
    periodical_tasks_interval: Pin<Box<Fuse<Sleep>>>,
    /// Manages the networking parameters like known peers and addresses
    known_peers_registry: Box<dyn KnownPeersRegistry>,
    /// Connected peers that fully support our Kademlia DHT protocol.
    connected_servers: HashSet<PeerId>,
    /// Defines the set of peers with a permanent connection (and reconnection if necessary).
    reserved_peers: HashMap<PeerId, Multiaddr>,
    /// Temporarily banned peers.
    temporary_bans: Arc<Mutex<TemporaryBans>>,
    /// Libp2p Prometheus metrics.
    libp2p_metrics: Option<Metrics>,
    /// Subspace Prometheus metrics.
    metrics: Option<SubspaceMetrics>,
    /// Mapping from a specific peer to its IP addresses
    peer_ip_addresses: HashMap<PeerId, HashSet<IpAddr>>,
    /// Defines the protocol version for network peers. Affects network partitioning.
    protocol_version: String,
    /// Addresses to bootstrap the Kademlia network
    bootstrap_addresses: Vec<Multiaddr>,
    /// Ensures that bootstrap happens at most once per `run()` invocation.
    bootstrap_command_state: Arc<AsyncMutex<BootstrapCommandState>>,
    /// Receives an event on peer address removal from the persistent storage.
    removed_addresses_rx: mpsc::UnboundedReceiver<PeerAddressRemovedEvent>,
    /// Optional storage for the [`HandlerId`] of the address removal task.
    /// We keep it so that the task is stopped along with the rest of the networking.
    _address_removal_task_handler_id: Option<HandlerId>,
}

impl fmt::Debug for NodeRunner {
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("NodeRunner").finish_non_exhaustive()
    }
}

// Helper struct for NodeRunner configuration (clippy requirement).
pub(crate) struct NodeRunnerConfig {
    pub(crate) allow_non_global_addresses_in_dht: bool,
    /// Whether node is listening on some addresses
    pub(crate) is_listening: bool,
    pub(crate) command_receiver: mpsc::Receiver<Command>,
    pub(crate) swarm: Swarm<Behavior>,
    pub(crate) shared_weak: Weak<Shared>,
    pub(crate) next_random_query_interval: Duration,
    pub(crate) known_peers_registry: Box<dyn KnownPeersRegistry>,
    pub(crate) reserved_peers: HashMap<PeerId, Multiaddr>,
    pub(crate) temporary_bans: Arc<Mutex<TemporaryBans>>,
    pub(crate) libp2p_metrics: Option<Metrics>,
    pub(crate) metrics: Option<SubspaceMetrics>,
    pub(crate) protocol_version: String,
    pub(crate) bootstrap_addresses: Vec<Multiaddr>,
}

impl NodeRunner {
    pub(crate) fn new(
        NodeRunnerConfig {
            allow_non_global_addresses_in_dht,
            is_listening,
            command_receiver,
            swarm,
            shared_weak,
            next_random_query_interval,
            mut known_peers_registry,
            reserved_peers,
            temporary_bans,
            libp2p_metrics,
            metrics,
            protocol_version,
            bootstrap_addresses,
        }: NodeRunnerConfig,
    ) -> Self {
        // Set up the address removal event exchange between the persistent params storage and Kademlia.
        let (removed_addresses_tx, removed_addresses_rx) = mpsc::unbounded();
        let mut address_removal_task_handler_id = None;
        if let Some(handler_id) = known_peers_registry.on_unreachable_address({
            Arc::new(move |event| {
                if let Err(error) = removed_addresses_tx.unbounded_send(event.clone()) {
                    debug!(?error, ?event, "Cannot send PeerAddressRemovedEvent")
                };
            })
        }) {
            address_removal_task_handler_id.replace(handler_id);
        }

        Self {
            allow_non_global_addresses_in_dht,
            is_listening,
            command_receiver,
            swarm,
            shared_weak,
            next_random_query_interval,
            query_id_receivers: HashMap::default(),
            next_subscription_id: 0,
            topic_subscription_senders: HashMap::default(),
            // We'll make the first query right away and continue at the interval.
            random_query_timeout: Box::pin(tokio::time::sleep(Duration::from_secs(0)).fuse()),
            // We'll run periodical tasks right away and continue at the interval.
            periodical_tasks_interval: Box::pin(tokio::time::sleep(Duration::from_secs(0)).fuse()),
            known_peers_registry,
            connected_servers: HashSet::new(),
            reserved_peers,
            temporary_bans,
            libp2p_metrics,
            metrics,
            peer_ip_addresses: HashMap::new(),
            protocol_version,
            bootstrap_addresses,
            bootstrap_command_state: Arc::new(AsyncMutex::new(BootstrapCommandState::default())),
            removed_addresses_rx,
            _address_removal_task_handler_id: address_removal_task_handler_id,
        }
    }

    /// Drives the main networking future forward.
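    ///
    /// Waits for listen addresses when listening, performs the Kademlia bootstrap and then
    /// processes swarm events, commands and periodical tasks until all event sources are exhausted.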
    pub async fn run(&mut self) {
        if self.is_listening {
            // Wait for listen addresses, otherwise we may get ephemeral addresses as external
            // address candidates, which we do not want
            loop {
                if self.swarm.listeners().next().is_some() {
                    break;
                }

                if let Some(swarm_event) = self.swarm.next().await {
                    self.register_event_metrics(&swarm_event);
                    self.handle_swarm_event(swarm_event).await;
                } else {
                    break;
                }
            }
        }

        self.bootstrap().await;

        loop {
            futures::select! {
                _ = &mut self.random_query_timeout => {
                    self.handle_random_query_interval();
                    // Increase interval 2x, but to at most 60 seconds.
                    self.random_query_timeout =
                        Box::pin(tokio::time::sleep(self.next_random_query_interval).fuse());
                    self.next_random_query_interval =
                        (self.next_random_query_interval * 2).min(Duration::from_secs(60));
                },
                swarm_event = self.swarm.next() => {
                    if let Some(swarm_event) = swarm_event {
                        self.register_event_metrics(&swarm_event);
                        self.handle_swarm_event(swarm_event).await;
                    } else {
                        break;
                    }
                },
                command = self.command_receiver.next() => {
                    if let Some(command) = command {
                        self.handle_command(command);
                    } else {
                        break;
                    }
                },
                _ = self.known_peers_registry.run().fuse() => {
                    trace!("Network parameters registry runner exited.")
                },
                _ = &mut self.periodical_tasks_interval => {
                    self.handle_periodical_tasks().await;

                    self.periodical_tasks_interval =
                        Box::pin(tokio::time::sleep(Duration::from_secs(5)).fuse());
                },
                event = self.removed_addresses_rx.select_next_some() => {
                    self.handle_removed_address_event(event);
                },
            }

            // Allow exiting the busy loop during graceful shutdown
            yield_now().await;
        }
    }

    /// Bootstraps Kademlia network
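    ///
    /// Adds bootstrap nodes and previously known peers to the routing table and triggers a
    /// Kademlia bootstrap; when there are no known peers, the swarm is driven here until the
    /// bootstrap query completes (at most once per runner).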
    async fn bootstrap(&mut self) {
        // Add bootstrap nodes first to make sure there is space for them in k-buckets
        for (peer_id, address) in strip_peer_id(self.bootstrap_addresses.clone()) {
            self.swarm
                .behaviour_mut()
                .kademlia
                .add_address(&peer_id, address);
        }

        let known_peers = self.known_peers_registry.all_known_peers().await;

        if !known_peers.is_empty() {
            for (peer_id, addresses) in known_peers {
                for address in addresses.clone() {
                    let address = match address.with_p2p(peer_id) {
                        Ok(address) => address,
                        Err(address) => {
                            warn!(%peer_id, %address, "Failed to add peer ID to known peer address");
                            break;
                        }
                    };
                    self.swarm
                        .behaviour_mut()
                        .kademlia
                        .add_address(&peer_id, address);
                }

                if let Err(error) = self
                    .swarm
                    .dial(DialOpts::peer_id(peer_id).addresses(addresses).build())
                {
                    warn!(%peer_id, %error, "Failed to dial peer during bootstrapping");
                }
            }

            // Do bootstrap asynchronously
            self.handle_command(Command::Bootstrap {
                result_sender: None,
            });
            return;
        }

        let bootstrap_command_state = self.bootstrap_command_state.clone();
        let mut bootstrap_command_state = bootstrap_command_state.lock().await;
        let bootstrap_command_receiver = match &mut *bootstrap_command_state {
            BootstrapCommandState::NotStarted => {
                debug!("Bootstrap started.");

                let (bootstrap_command_sender, bootstrap_command_receiver) = mpsc::unbounded();

                self.handle_command(Command::Bootstrap {
                    result_sender: Some(bootstrap_command_sender),
                });

                *bootstrap_command_state =
                    BootstrapCommandState::InProgress(bootstrap_command_receiver);
                match &mut *bootstrap_command_state {
                    BootstrapCommandState::InProgress(bootstrap_command_receiver) => {
                        bootstrap_command_receiver
                    }
                    _ => {
                        unreachable!("Was just set to that exact value");
                    }
                }
            }
            BootstrapCommandState::InProgress(bootstrap_command_receiver) => {
                bootstrap_command_receiver
            }
            BootstrapCommandState::Finished => {
                return;
            }
        };

        let mut bootstrap_step = 0;
        loop {
            futures::select! {
                swarm_event = self.swarm.next() => {
                    if let Some(swarm_event) = swarm_event {
                        self.register_event_metrics(&swarm_event);
                        self.handle_swarm_event(swarm_event).await;
                    } else {
                        break;
                    }
                },
                result = bootstrap_command_receiver.next() => {
                    if result.is_some() {
                        debug!(%bootstrap_step, "Kademlia bootstrapping...");
                        bootstrap_step += 1;
                    } else {
                        break;
                    }
                }
            }
        }

        debug!("Bootstrap finished.");
        *bootstrap_command_state = BootstrapCommandState::Finished;
    }

    /// Handles periodical tasks.
    async fn handle_periodical_tasks(&mut self) {
        // Log current connections.
        let network_info = self.swarm.network_info();
        let connections = network_info.connection_counters();

        debug!(?connections, "Current connections and limits.");

        // Renew known external addresses.
        let mut external_addresses = self.swarm.external_addresses().cloned().collect::<Vec<_>>();

        if let Some(shared) = self.shared_weak.upgrade() {
            debug!(?external_addresses, "Renew external addresses.");
            let mut addresses = shared.external_addresses.lock();
            addresses.clear();
            addresses.append(&mut external_addresses);
        }

        self.log_kademlia_stats();
    }

    fn handle_random_query_interval(&mut self) {
        let random_peer_id = PeerId::random();

        trace!("Starting random Kademlia query for {}", random_peer_id);

        self.swarm
            .behaviour_mut()
            .kademlia
            .get_closest_peers(random_peer_id);
    }

    fn handle_removed_address_event(&mut self, event: PeerAddressRemovedEvent) {
        trace!(?event, "Peer address removed event.");

        let bootstrap_node_ids = strip_peer_id(self.bootstrap_addresses.clone())
            .into_iter()
            .map(|(peer_id, _)| peer_id)
            .collect::<Vec<_>>();

        if bootstrap_node_ids.contains(&event.peer_id) {
            debug!(
                ?event,
                ?bootstrap_node_ids,
                "Skipped removing bootstrap node from Kademlia buckets."
            );

            return;
        }

        // Remove both versions of the address
        self.swarm.behaviour_mut().kademlia.remove_address(
            &event.peer_id,
            &append_p2p_suffix(event.peer_id, event.address.clone()),
        );

        self.swarm
            .behaviour_mut()
            .kademlia
            .remove_address(&event.peer_id, &remove_p2p_suffix(event.address));
    }

    fn handle_remove_listeners(&mut self, removed_listeners: &[Multiaddr]) {
        let shared = match self.shared_weak.upgrade() {
            Some(shared) => shared,
            None => {
                return;
            }
        };

        // Remove both versions of the address
        let peer_id = shared.id;
        shared.listeners.lock().retain(|old_listener| {
            !removed_listeners.contains(&append_p2p_suffix(peer_id, old_listener.clone()))
                && !removed_listeners.contains(&remove_p2p_suffix(old_listener.clone()))
        });
    }

    async fn handle_swarm_event(&mut self, swarm_event: SwarmEvent<Event>) {
        match swarm_event {
            SwarmEvent::Behaviour(Event::Identify(event)) => {
                self.handle_identify_event(event).await;
            }
            SwarmEvent::Behaviour(Event::Kademlia(event)) => {
                self.handle_kademlia_event(event).await;
            }
            SwarmEvent::Behaviour(Event::Gossipsub(event)) => {
                self.handle_gossipsub_event(event).await;
            }
            SwarmEvent::Behaviour(Event::RequestResponse(event)) => {
                self.handle_request_response_event(event).await;
            }
            SwarmEvent::Behaviour(Event::Autonat(event)) => {
                self.handle_autonat_event(event).await;
            }
            ref event @ SwarmEvent::NewListenAddr { ref address, .. } => {
                trace!(?event, "New local listener event.");

                let shared = match self.shared_weak.upgrade() {
                    Some(shared) => shared,
                    None => {
                        return;
                    }
                };
                shared.listeners.lock().push(address.clone());
                shared.handlers.new_listener.call_simple(address);
            }
            ref event @ SwarmEvent::ListenerClosed { ref addresses, .. } => {
                trace!(?event, "Local listener closed event.");
                self.handle_remove_listeners(addresses);
            }
            ref event @ SwarmEvent::ExpiredListenAddr { ref address, .. } => {
                trace!(?event, "Local listener expired event.");
                self.handle_remove_listeners(&[address.clone()]);
            }
            SwarmEvent::ConnectionEstablished {
                peer_id,
                endpoint,
                num_established,
                ..
            } => {
                // Save known addresses that were successfully dialed.
                if let ConnectedPoint::Dialer { address, .. } = &endpoint {
                    // Filter out non-global addresses when they are disabled
                    if self.allow_non_global_addresses_in_dht || is_global_address_or_dns(address) {
                        self.known_peers_registry
                            .add_known_peer(peer_id, vec![address.clone()])
                            .await;
                    }
                };

                let shared = match self.shared_weak.upgrade() {
                    Some(shared) => shared,
                    None => {
                        return;
                    }
                };

                let is_reserved_peer = self.reserved_peers.contains_key(&peer_id);
                debug!(
                    %peer_id,
                    %is_reserved_peer,
                    ?endpoint,
                    %num_established,
                    "Connection established"
                );

                let maybe_remote_ip =
                    endpoint
                        .get_remote_address()
                        .iter()
                        .find_map(|protocol| match protocol {
                            Protocol::Ip4(ip) => Some(IpAddr::V4(ip)),
                            Protocol::Ip6(ip) => Some(IpAddr::V6(ip)),
                            _ => None,
                        });
                if let Some(ip) = maybe_remote_ip {
                    self.peer_ip_addresses
                        .entry(peer_id)
                        .and_modify(|ips| {
                            ips.insert(ip);
                        })
                        .or_insert(HashSet::from([ip]));
                }

                let num_established_peer_connections = shared
                    .num_established_peer_connections
                    .fetch_add(1, Ordering::SeqCst)
                    + 1;

                shared
                    .handlers
                    .num_established_peer_connections_change
                    .call_simple(&num_established_peer_connections);

                // A new connection
                if num_established.get() == 1 {
                    shared.handlers.connected_peer.call_simple(&peer_id);
                }

                if let Some(metrics) = self.metrics.as_ref() {
                    metrics.inc_established_connections()
                }
            }
            SwarmEvent::ConnectionClosed {
                peer_id,
                num_established,
                cause,
                ..
            } => {
                let shared = match self.shared_weak.upgrade() {
                    Some(shared) => shared,
                    None => {
                        return;
                    }
                };

                debug!(
                    %peer_id,
                    ?cause,
                    %num_established,
                    "Connection closed with peer"
                );

                if num_established == 0 {
                    self.peer_ip_addresses.remove(&peer_id);
                    self.connected_servers.remove(&peer_id);
                }
                let num_established_peer_connections = shared
                    .num_established_peer_connections
                    .fetch_sub(1, Ordering::SeqCst)
                    - 1;

                shared
                    .handlers
                    .num_established_peer_connections_change
                    .call_simple(&num_established_peer_connections);

                // No more connections
                if num_established == 0 {
                    shared.handlers.disconnected_peer.call_simple(&peer_id);
                }

                if let Some(metrics) = self.metrics.as_ref() {
                    metrics.dec_established_connections()
                };
            }
            SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
                if let Some(peer_id) = &peer_id {
                    let should_ban_temporarily =
                        self.should_temporary_ban_on_dial_error(peer_id, &error);

                    trace!(%should_ban_temporarily, "Temporary bans conditions.");

                    if should_ban_temporarily {
                        self.temporary_bans.lock().create_or_extend(peer_id);
                        debug!(%peer_id, ?error, "Peer was temporarily banned.");
                    }
                }

                debug!(
                    ?peer_id,
                    ?error,
                    "SwarmEvent::OutgoingConnectionError for peer."
                );

                match error {
                    DialError::Transport(ref addresses) => {
                        for (addr, _) in addresses {
                            trace!(?error, ?peer_id, %addr, "SwarmEvent::OutgoingConnectionError (DialError::Transport) for peer.");
                            if let Some(peer_id) = peer_id {
                                self.known_peers_registry
                                    .remove_known_peer_addresses(peer_id, vec![addr.clone()])
                                    .await;
                            }
                        }
                    }
                    DialError::WrongPeerId { obtained, .. } => {
                        trace!(?error, ?peer_id, obtained_peer_id=?obtained, "SwarmEvent::WrongPeerId (DialError::WrongPeerId) for peer.");

                        if let Some(ref peer_id) = peer_id {
                            let kademlia = &mut self.swarm.behaviour_mut().kademlia;
                            let _ = kademlia.remove_peer(peer_id);
                        }
                    }
                    _ => {
                        trace!(?error, ?peer_id, "SwarmEvent::OutgoingConnectionError");
                    }
                }
            }
            SwarmEvent::NewExternalAddrCandidate { address } => {
                trace!(%address, "External address candidate");
            }
            SwarmEvent::ExternalAddrConfirmed { address } => {
                debug!(%address, "Confirmed external address");

                let connected_peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();
                self.swarm.behaviour_mut().identify.push(connected_peers);
            }
            SwarmEvent::ExternalAddrExpired { address } => {
                debug!(%address, "External address expired");

                let connected_peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();
                self.swarm.behaviour_mut().identify.push(connected_peers);
            }
            other => {
                trace!("Other swarm event: {:?}", other);
            }
        }
    }

    fn should_temporary_ban_on_dial_error(&self, peer_id: &PeerId, error: &DialError) -> bool {
        // TODO: Replace with banning of addresses rather than peer IDs if this helps
        if true {
            return false;
        }

        // Ban temporarily only peers without active connections.
        if self.swarm.is_connected(peer_id) {
            return false;
        }

        match &error {
            DialError::Transport(addresses) => {
                for (_, error) in addresses {
                    match error {
                        TransportError::MultiaddrNotSupported(_) => {
                            return true;
                        }
                        TransportError::Other(_) => {
                            // Ignore "temporary ban" errors
                            if self.temporary_bans.lock().is_banned(peer_id) {
                                return false;
                            }
                        }
                    }
                }
                // Other errors that are not related to temporary bans
                true
            }
            DialError::LocalPeerId { .. } => {
                // We don't ban ourselves
                debug!("Local peer dial attempt detected.");

                false
            }
            DialError::NoAddresses => {
                // Let's wait until we get addresses
                true
            }
            DialError::DialPeerConditionFalse(_) => {
                // These are local conditions, we don't need to ban remote peers
                false
            }
            DialError::Aborted => {
                // Seems like a transient event
                false
            }
            DialError::WrongPeerId { .. } => {
                // It's likely that peer was restarted with different identity
                false
            }
            DialError::Denied { .. } => {
                // We exceeded the connection limits or we hit a blacklisted peer
                false
            }
        }
    }

    async fn handle_identify_event(&mut self, event: IdentifyEvent) {
        let local_peer_id = *self.swarm.local_peer_id();

        if let IdentifyEvent::Received {
            peer_id, mut info, ..
        } = event
        {
            debug!(?peer_id, protocols = ?info.protocols, "IdentifyEvent::Received");

            // Check for network partition
            if info.protocol_version != self.protocol_version {
                debug!(
                    %local_peer_id,
                    %peer_id,
                    local_protocol_version = %self.protocol_version,
                    peer_protocol_version = %info.protocol_version,
                    "Peer has different protocol version, banning temporarily",
                );

                self.temporary_bans.lock().create_or_extend(&peer_id);
                // Forget about this peer until they upgrade
                let _ = self.swarm.disconnect_peer_id(peer_id);
                self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id);
                self.known_peers_registry
                    .remove_all_known_peer_addresses(peer_id);

                return;
            }

            // Remove temporary ban if there was any
            self.temporary_bans.lock().remove(&peer_id);

            if info.listen_addrs.len() > 30 {
                debug!(
                    %local_peer_id,
                    %peer_id,
                    "Node has reported more than 30 addresses; it is identified by {} and {}",
                    info.protocol_version, info.agent_version
                );
                info.listen_addrs.truncate(30);
            }

            let kademlia = &mut self.swarm.behaviour_mut().kademlia;
            let full_kademlia_support = kademlia
                .protocol_names()
                .iter()
                .all(|local_protocol| info.protocols.contains(local_protocol));

            if full_kademlia_support {
                let received_addresses = info
                    .listen_addrs
                    .into_iter()
                    .filter(|address| {
                        if self.allow_non_global_addresses_in_dht
                            || is_global_address_or_dns(address)
                        {
                            true
                        } else {
                            trace!(
                                %local_peer_id,
                                %peer_id,
                                %address,
                                "Ignoring self-reported non-global address",
                            );

                            false
                        }
                    })
                    .collect::<Vec<_>>();
                let received_address_strings = received_addresses
                    .iter()
                    .map(ToString::to_string)
                    .collect::<Vec<_>>();
                let old_addresses = kademlia
                    .kbucket(peer_id)
                    .and_then(|peers| {
                        let key = peer_id.into();
                        peers.iter().find_map(|peer| {
                            (peer.node.key == &key).then_some(
                                peer.node
                                    .value
                                    .iter()
                                    .filter(|existing_address| {
                                        let existing_address = existing_address.to_string();

                                        !received_address_strings.iter().any(|received_address| {
                                            received_address.starts_with(&existing_address)
                                                || existing_address.starts_with(received_address)
                                        })
                                    })
                                    .cloned()
                                    .collect::<Vec<_>>(),
                            )
                        })
                    })
                    .unwrap_or_default();

                for address in received_addresses {
                    debug!(
                        %local_peer_id,
                        %peer_id,
                        %address,
                        protocol_names = ?kademlia.protocol_names(),
                        "Adding self-reported address to Kademlia DHT",
                    );

                    kademlia.add_address(&peer_id, address);
                }

                for old_address in old_addresses {
                    trace!(
                        %local_peer_id,
                        %peer_id,
                        %old_address,
                        "Removing old self-reported address from Kademlia DHT",
                    );

                    kademlia.remove_address(&peer_id, &old_address);
                }

                self.connected_servers.insert(peer_id);
            } else {
                debug!(
                    %local_peer_id,
                    %peer_id,
                    peer_protocols = ?info.protocols,
                    protocol_names = ?kademlia.protocol_names(),
                    "Peer doesn't support our Kademlia DHT protocol",
                );

                kademlia.remove_peer(&peer_id);
                self.connected_servers.remove(&peer_id);
            }
        }
    }

    async fn handle_kademlia_event(&mut self, event: KademliaEvent) {
        trace!("Kademlia event: {:?}", event);

        match event {
            KademliaEvent::InboundRequest {
                request: InboundRequest::AddProvider { record, .. },
            } => {
                debug!("Unexpected AddProvider request received: {:?}", record);
            }
            KademliaEvent::UnroutablePeer { peer } => {
                debug!(%peer, "Unroutable peer detected");

                self.swarm.behaviour_mut().kademlia.remove_peer(&peer);

                if let Some(shared) = self.shared_weak.upgrade() {
                    shared
                        .handlers
                        .peer_discovered
                        .call_simple(&PeerDiscovered::UnroutablePeer { peer_id: peer });
                }
            }
            KademliaEvent::RoutablePeer { peer, address } => {
                debug!(?address, "Routable peer detected: {:?}", peer);

                if let Some(shared) = self.shared_weak.upgrade() {
                    shared
                        .handlers
                        .peer_discovered
                        .call_simple(&PeerDiscovered::RoutablePeer {
                            peer_id: peer,
                            address,
                        });
                }
            }
            KademliaEvent::PendingRoutablePeer { peer, address } => {
                debug!(?address, "Pending routable peer detected: {:?}", peer);

                if let Some(shared) = self.shared_weak.upgrade() {
                    shared
                        .handlers
                        .peer_discovered
                        .call_simple(&PeerDiscovered::RoutablePeer {
                            peer_id: peer,
                            address,
                        });
                }
            }
            KademliaEvent::OutboundQueryProgressed {
                step: ProgressStep { last, .. },
                id,
                result: QueryResult::GetClosestPeers(result),
                ..
            } => {
                let mut cancelled = false;
                if let Some(QueryResultSender::ClosestPeers { sender, .. }) =
                    self.query_id_receivers.get(&id)
                {
                    match result {
                        Ok(GetClosestPeersOk { key, peers }) => {
                            trace!(
                                "Get closest peers query for {} yielded {} results",
                                hex::encode(key),
                                peers.len(),
                            );

                            if peers.is_empty()
                                // Connected peers collection is not empty.
                                && self.swarm.connected_peers().next().is_some()
                            {
                                debug!("Random Kademlia query has yielded empty list of peers");
                            }

                            for peer in peers {
                                cancelled = Self::unbounded_send_and_cancel_on_error(
                                    &mut self.swarm.behaviour_mut().kademlia,
                                    sender,
                                    peer.peer_id,
                                    "GetClosestPeersOk",
                                    &id,
                                ) || cancelled;
                            }
                        }
                        Err(GetClosestPeersError::Timeout { key, peers }) => {
                            debug!(
                                "Get closest peers query for {} timed out with {} results",
                                hex::encode(key),
                                peers.len(),
                            );

                            for peer in peers {
                                cancelled = Self::unbounded_send_and_cancel_on_error(
                                    &mut self.swarm.behaviour_mut().kademlia,
                                    sender,
                                    peer.peer_id,
                                    "GetClosestPeersError::Timeout",
                                    &id,
                                ) || cancelled;
                            }
                        }
                    }
                }

                if last || cancelled {
                    // There will be no more progress
                    self.query_id_receivers.remove(&id);
                }
            }
            KademliaEvent::OutboundQueryProgressed {
                step: ProgressStep { last, .. },
                id,
                result: QueryResult::GetRecord(result),
                ..
            } => {
                let mut cancelled = false;
                if let Some(QueryResultSender::Value { sender, .. }) =
                    self.query_id_receivers.get(&id)
                {
                    match result {
                        Ok(GetRecordOk::FoundRecord(rec)) => {
                            trace!(
                                key = hex::encode(&rec.record.key),
                                "Get record query succeeded",
                            );

                            cancelled = Self::unbounded_send_and_cancel_on_error(
                                &mut self.swarm.behaviour_mut().kademlia,
                                sender,
                                rec,
                                "GetRecordOk",
                                &id,
                            ) || cancelled;
                        }
                        Ok(GetRecordOk::FinishedWithNoAdditionalRecord { .. }) => {
                            trace!("Get record query yielded no results");
                        }
                        Err(error) => match error {
                            GetRecordError::NotFound { key, .. } => {
                                debug!(
                                    key = hex::encode(&key),
                                    "Get record query failed with no results",
                                );
                            }
                            GetRecordError::QuorumFailed { key, records, .. } => {
                                debug!(
                                    key = hex::encode(&key),
                                    "Get record query quorum failed with {} results",
                                    records.len(),
                                );
                            }
                            GetRecordError::Timeout { key } => {
                                debug!(key = hex::encode(&key), "Get record query timed out");
                            }
                        },
                    }
                }

                if last || cancelled {
                    // There will be no more progress
                    self.query_id_receivers.remove(&id);
                }
            }
            KademliaEvent::OutboundQueryProgressed {
                step: ProgressStep { last, .. },
                id,
                result: QueryResult::GetProviders(result),
                ..
            } => {
                let mut cancelled = false;
                if let Some(QueryResultSender::Providers { key, sender, .. }) =
                    self.query_id_receivers.get(&id)
                {
                    match result {
                        Ok(GetProvidersOk::FoundProviders { key, providers }) => {
                            trace!(
                                key = hex::encode(&key),
                                "Get providers query yielded {} results",
                                providers.len(),
                            );

                            for provider in providers {
                                cancelled = Self::unbounded_send_and_cancel_on_error(
                                    &mut self.swarm.behaviour_mut().kademlia,
                                    sender,
                                    provider,
                                    "GetProvidersOk",
                                    &id,
                                ) || cancelled;
                            }
                        }
                        Ok(GetProvidersOk::FinishedWithNoAdditionalRecord { closest_peers }) => {
                            trace!(
                                key = hex::encode(key),
                                closest_peers = %closest_peers.len(),
                                "Get providers query yielded no results"
                            );
                        }
                        Err(error) => {
                            let GetProvidersError::Timeout { key, .. } = error;

                            debug!(
                                key = hex::encode(&key),
                                "Get providers query failed with no results",
                            );
                        }
                    }
                }

                if last || cancelled {
                    // There will be no more progress
                    self.query_id_receivers.remove(&id);
                }
            }
            KademliaEvent::OutboundQueryProgressed {
                step: ProgressStep { last, .. },
                id,
                result: QueryResult::PutRecord(result),
                ..
            } => {
                let mut cancelled = false;
                if let Some(QueryResultSender::PutValue { sender, .. }) =
                    self.query_id_receivers.get(&id)
                {
                    match result {
                        Ok(PutRecordOk { key, .. }) => {
                            trace!("Put record query for {} succeeded", hex::encode(&key));

                            cancelled = Self::unbounded_send_and_cancel_on_error(
                                &mut self.swarm.behaviour_mut().kademlia,
                                sender,
                                (),
                                "PutRecordOk",
                                &id,
                            ) || cancelled;
                        }
                        Err(error) => {
                            debug!(?error, "Put record query failed.");
                        }
                    }
                }

                if last || cancelled {
                    // There will be no more progress
                    self.query_id_receivers.remove(&id);
                }
            }
            KademliaEvent::OutboundQueryProgressed {
                step: ProgressStep { last, count },
                id,
                result: QueryResult::Bootstrap(result),
                stats,
            } => {
                debug!(?stats, %last, %count, ?id, ?result, "Bootstrap OutboundQueryProgressed step.");

                let mut cancelled = false;
                if let Some(QueryResultSender::Bootstrap { sender }) =
                    self.query_id_receivers.get_mut(&id)
                {
                    match result {
                        Ok(BootstrapOk {
                            peer,
                            num_remaining,
                        }) => {
                            trace!(%peer, %num_remaining, %last, "Bootstrap query step succeeded");

                            cancelled = Self::unbounded_send_and_cancel_on_error(
                                &mut self.swarm.behaviour_mut().kademlia,
                                sender,
                                (),
                                "Bootstrap",
                                &id,
                            ) || cancelled;
                        }
                        Err(error) => {
                            debug!(?error, "Bootstrap query failed.");
                        }
                    }
                }

                if last || cancelled {
                    // There will be no more progress
                    self.query_id_receivers.remove(&id);
                }
            }
            _ => {}
        }
    }

    // Returns `true` if query was cancelled
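    // If the receiver has already been dropped, the corresponding Kademlia query is finished early.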
    fn unbounded_send_and_cancel_on_error<T>(
        kademlia: &mut Kademlia<DummyRecordStore>,
        sender: &mpsc::UnboundedSender<T>,
        value: T,
        channel: &'static str,
        id: &QueryId,
    ) -> bool {
        if sender.unbounded_send(value).is_err() {
            debug!("{} channel was dropped", channel);

            // Cancel query
            if let Some(mut query) = kademlia.query_mut(id) {
                query.finish();
            }
            true
        } else {
            false
        }
    }

    async fn handle_gossipsub_event(&mut self, event: GossipsubEvent) {
        if let GossipsubEvent::Message { message, .. } = event {
            if let Some(senders) = self.topic_subscription_senders.get(&message.topic) {
                let bytes = Bytes::from(message.data);

                for sender in senders.values() {
                    // Doesn't matter if receiver is still listening for messages or not.
                    let _ = sender.unbounded_send(bytes.clone());
                }
            }
        }
    }

    async fn handle_request_response_event(&mut self, event: RequestResponseEvent) {
        // No actions on statistics events.
        trace!("Request response event: {:?}", event);
    }

    async fn handle_autonat_event(&mut self, event: AutonatEvent) {
        trace!(?event, "Autonat event received.");
        let autonat = &self.swarm.behaviour().autonat;
        debug!(
            public_address=?autonat.public_address(),
            confidence=%autonat.confidence(),
            "Current public address confidence."
        );

        match event {
            AutonatEvent::InboundProbe(_inbound_probe_event) => {
                // We do not care about this event
            }
            AutonatEvent::OutboundProbe(outbound_probe_event) => {
                match outbound_probe_event {
                    OutboundProbeEvent::Request { peer, .. } => {
                        // For an outbound probe request, add the peer to the allow list to ensure
                        // it can dial us back without hitting the global incoming connection limit
1210                        self.swarm
1211                            .behaviour_mut()
1212                            .connection_limits
1213                            // We expect a single successful dial from this peer
1214                            .add_to_incoming_allow_list(
1215                                peer,
1216                                self.peer_ip_addresses
1217                                    .get(&peer)
1218                                    .iter()
1219                                    .flat_map(|ip_addresses| ip_addresses.iter())
1220                                    .copied(),
1221                                1,
1222                            );
1223                    }
1224                    OutboundProbeEvent::Response { peer, .. } => {
1225                        self.swarm
1226                            .behaviour_mut()
1227                            .connection_limits
1228                            .remove_from_incoming_allow_list(&peer, Some(1));
1229                    }
1230                    OutboundProbeEvent::Error { peer, .. } => {
1231                        if let Some(peer) = peer {
1232                            self.swarm
1233                                .behaviour_mut()
1234                                .connection_limits
1235                                .remove_from_incoming_allow_list(&peer, Some(1));
1236                        }
1237                    }
1238                }
1239            }
1240            AutonatEvent::StatusChanged { old, new } => {
1241                debug!(?old, ?new, "Public address status changed.");
1242
1243                // TODO: Remove block once https://github.com/libp2p/rust-libp2p/issues/4863 is resolved
1244                if let (NatStatus::Public(old_address), NatStatus::Private) = (old, new.clone()) {
1245                    self.swarm.remove_external_address(&old_address);
1246                    debug!(
1247                        ?old_address,
1248                        new_status = ?new,
1249                        "Removing old external address...",
1250                    );
1251
1252                    // Trigger potential mode change manually
1253                    self.swarm.behaviour_mut().kademlia.set_mode(None);
1254                }
1255
1256                let connected_peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();
1257                self.swarm.behaviour_mut().identify.push(connected_peers);
1258            }
1259        }
1260    }
1261
1262    fn handle_command(&mut self, command: Command) {
1263        match command {
1264            Command::GetValue {
1265                key,
1266                result_sender,
1267                permit,
1268            } => {
1269                let query_id = self
1270                    .swarm
1271                    .behaviour_mut()
1272                    .kademlia
1273                    .get_record(key.to_bytes().into());
1274
1275                self.query_id_receivers.insert(
1276                    query_id,
1277                    QueryResultSender::Value {
1278                        sender: result_sender,
1279                        _permit: permit,
1280                    },
1281                );
1282            }
1283            Command::PutValue {
1284                key,
1285                value,
1286                result_sender,
1287                permit,
1288            } => {
1289                let local_peer_id = *self.swarm.local_peer_id();
1290
1291                let record = Record {
1292                    key: key.into(),
1293                    value,
1294                    publisher: Some(local_peer_id),
1295                    expires: None, // No time expiration.
1296                };
1297                let query_result = self
1298                    .swarm
1299                    .behaviour_mut()
1300                    .kademlia
1301                    .put_record(record, Quorum::One);
1302
1303                match query_result {
1304                    Ok(query_id) => {
1305                        self.query_id_receivers.insert(
1306                            query_id,
1307                            QueryResultSender::PutValue {
1308                                sender: result_sender,
1309                                _permit: permit,
1310                            },
1311                        );
1312                    }
1313                    Err(err) => {
1314                        warn!(?err, "Failed to put value.");
1315                    }
1316                }
1317            }
1318            Command::Subscribe {
1319                topic,
1320                result_sender,
1321            } => {
1322                if !self.swarm.behaviour().gossipsub.is_enabled() {
1323                    panic!("Gossipsub protocol is disabled.");
1324                }
1325
1326                let topic_hash = topic.hash();
1327                let (sender, receiver) = mpsc::unbounded();
1328
1329                // Create a subscription ID unconditionally; the code is simpler this way.
1330                let subscription_id = self.next_subscription_id;
1331                self.next_subscription_id += 1;
1332
1333                let created_subscription = CreatedSubscription {
1334                    subscription_id,
1335                    receiver,
1336                };
1337
1338                match self.topic_subscription_senders.entry(topic_hash) {
1339                    Entry::Occupied(mut entry) => {
1340                        // If the subscription already exists, just add one more sender to it.
1341                        if result_sender.send(Ok(created_subscription)).is_ok() {
1342                            entry.get_mut().insert(subscription_id, sender);
1343                        }
1344                    }
1345                    Entry::Vacant(entry) => {
1346                        // Otherwise the subscription needs to be created.
1347
1348                        if let Some(gossipsub) = self.swarm.behaviour_mut().gossipsub.as_mut() {
1349                            match gossipsub.subscribe(&topic) {
1350                                Ok(true) => {
1351                                    if result_sender.send(Ok(created_subscription)).is_ok() {
1352                                        entry
1353                                            .insert(IntMap::from_iter([(subscription_id, sender)]));
1354                                    }
1355                                }
1356                                Ok(false) => {
1357                                    panic!(
1358                                        "Logic error: topic subscription wasn't created; this \
1359                                        must never happen"
1360                                    );
1361                                }
1362                                Err(error) => {
1363                                    let _ = result_sender.send(Err(error));
1364                                }
1365                            }
1366                        }
1367                    }
1368                }
1369            }
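            // Caller-side sketch of the subscription flow above (illustrative; the exact
            // result channel type is an assumption). Once the caller gets the
            // `CreatedSubscription` back, gossipsub messages for the topic arrive on its
            // `receiver`, and `subscription_id` is kept for a later `Command::Unsubscribe`:
            //
            //     command_sender
            //         .send(Command::Subscribe { topic, result_sender })
            //         .await?;
            //     // ...then, with `subscription` being the Ok value received back on the
            //     // corresponding result channel:
            //     let CreatedSubscription { subscription_id, mut receiver } = subscription;
            //     while let Some(message) = receiver.next().await {
            //         // raw message bytes published to this topic
            //     }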
1370            Command::Unsubscribe {
1371                topic,
1372                subscription_id,
1373            } => {
1374                if !self.swarm.behaviour().gossipsub.is_enabled() {
1375                    panic!("Gossipsub protocol is disabled.");
1376                }
1377
1378                if let Entry::Occupied(mut entry) =
1379                    self.topic_subscription_senders.entry(topic.hash())
1380                {
1381                    entry.get_mut().remove(&subscription_id);
1382
1383                    // If the last sender was removed, unsubscribe.
1384                    if entry.get().is_empty() {
1385                        entry.remove_entry();
1386
1387                        if let Some(gossipsub) = self.swarm.behaviour_mut().gossipsub.as_mut() {
1388                            if !gossipsub.unsubscribe(&topic) {
1389                                warn!(
1390                                    "Can't unsubscribe from topic {topic} because the subscription doesn't exist; \
1391                                    this is a logic error in the subspace or swarm libraries"
1392                                );
1393                            }
1394                        }
1395                    }
1396                } else {
1397                    error!(
1398                        "Can't unsubscribe from topic {topic} because the subscription doesn't exist; \
1399                        this is a logic error in the subspace library"
1400                    );
1401                }
1402            }
1403            Command::Publish {
1404                topic,
1405                message,
1406                result_sender,
1407            } => {
1408                if !self.swarm.behaviour().gossipsub.is_enabled() {
1409                    panic!("Gossipsub protocol is disabled.");
1410                }
1411
1412                if let Some(gossipsub) = self.swarm.behaviour_mut().gossipsub.as_mut() {
1413                    // It doesn't matter whether the receiver is still waiting for the response.
1414                    let _ =
1415                        result_sender.send(gossipsub.publish(topic, message).map(|_message_id| ()));
1416                }
1417            }
1418            Command::GetClosestPeers {
1419                key,
1420                result_sender,
1421                permit,
1422            } => {
1423                let query_id = self.swarm.behaviour_mut().kademlia.get_closest_peers(key);
1424
1425                self.query_id_receivers.insert(
1426                    query_id,
1427                    QueryResultSender::ClosestPeers {
1428                        sender: result_sender,
1429                        _permit: permit,
1430                    },
1431                );
1432            }
1433            Command::GetClosestLocalPeers {
1434                key,
1435                source,
1436                result_sender,
1437            } => {
1438                let source = source.unwrap_or_else(|| *self.swarm.local_peer_id());
1439                let result = self
1440                    .swarm
1441                    .behaviour_mut()
1442                    .kademlia
1443                    .find_closest_local_peers(&KBucketKey::from(key), &source)
1444                    .filter(|peer| !peer.multiaddrs.is_empty())
1445                    .map(|peer| (peer.node_id, peer.multiaddrs))
1446                    .collect();
1447
1448                // It doesn't matter whether the receiver is still waiting for the response.
1449                let _ = result_sender.send(result);
1450            }
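            // Shape of the reply above (hedged): the peers closest to `key` in the local
            // routing table, filtered to those with at least one known address, effectively
            // pairs of `(PeerId, Vec<Multiaddr>)`; the concrete collection type is whatever
            // the command's result channel expects.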
1451            Command::GenericRequest {
1452                peer_id,
1453                addresses,
1454                protocol_name,
1455                request,
1456                result_sender,
1457            } => {
1458                self.swarm.behaviour_mut().request_response.send_request(
1459                    &peer_id,
1460                    protocol_name,
1461                    request,
1462                    result_sender,
1463                    IfDisconnected::TryConnect,
1464                    addresses,
1465                );
1466            }
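            // Caller-side sketch for the request-response path above (illustrative names).
            // `addresses` is a hint of known multiaddresses for `peer_id` (it may be empty),
            // and `IfDisconnected::TryConnect` makes the runner dial the peer when there is
            // no open connection; the reply or failure comes back through `result_sender`:
            //
            //     command_sender
            //         .send(Command::GenericRequest {
            //             peer_id,
            //             addresses: known_addresses,
            //             protocol_name,
            //             request: encoded_request,
            //             result_sender,
            //         })
            //         .await?;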
1467            Command::GetProviders {
1468                key,
1469                result_sender,
1470                permit,
1471            } => {
1472                let query_id = self
1473                    .swarm
1474                    .behaviour_mut()
1475                    .kademlia
1476                    .get_providers(key.clone());
1477
1478                self.query_id_receivers.insert(
1479                    query_id,
1480                    QueryResultSender::Providers {
1481                        key,
1482                        sender: result_sender,
1483                        _permit: permit,
1484                    },
1485                );
1486            }
1487            Command::BanPeer { peer_id } => {
1488                self.ban_peer(peer_id);
1489            }
1490            Command::Dial { address } => {
1491                let _ = self.swarm.dial(address);
1492            }
1493            Command::ConnectedPeers { result_sender } => {
1494                let connected_peers = self.swarm.connected_peers().cloned().collect();
1495
1496                let _ = result_sender.send(connected_peers);
1497            }
1498            Command::ConnectedServers { result_sender } => {
1499                let connected_servers = self.connected_servers.iter().cloned().collect();
1500
1501                let _ = result_sender.send(connected_servers);
1502            }
1503            Command::Bootstrap { result_sender } => {
1504                let kademlia = &mut self.swarm.behaviour_mut().kademlia;
1505
1506                match kademlia.bootstrap() {
1507                    Ok(query_id) => {
1508                        if let Some(result_sender) = result_sender {
1509                            self.query_id_receivers.insert(
1510                                query_id,
1511                                QueryResultSender::Bootstrap {
1512                                    sender: result_sender,
1513                                },
1514                            );
1515                        }
1516                    }
1517                    Err(err) => {
1518                        debug!(?err, "Bootstrap error.");
1519                    }
1520                }
1521            }
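            // Caller-side sketch for bootstrapping (illustrative). Passing a sender is
            // optional; when present, the runner reports progress through it, and draining
            // the channel until it closes tells the caller the bootstrap query has finished
            // (an assumption about the reporting granularity, which lives in the event
            // handler rather than in this arm):
            //
            //     let (bootstrap_sender, mut bootstrap_receiver) = mpsc::unbounded();
            //     command_sender
            //         .send(Command::Bootstrap { result_sender: Some(bootstrap_sender) })
            //         .await?;
            //     while bootstrap_receiver.next().await.is_some() {
            //         // a bootstrap progress notification arrived
            //     }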
1522        }
1523    }
1524
1525    fn ban_peer(&mut self, peer_id: PeerId) {
1526        // Remove any existing temporary ban before creating a permanent one
1527        self.temporary_bans.lock().remove(&peer_id);
1528
1529        debug!(?peer_id, "Banning peer on network level");
1530
1531        self.swarm.behaviour_mut().block_list.block_peer(peer_id);
1532        self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id);
1533        self.known_peers_registry
1534            .remove_all_known_peer_addresses(peer_id);
1535    }
1536
1537    fn register_event_metrics(&mut self, swarm_event: &SwarmEvent<Event>) {
1538        if let Some(ref mut metrics) = self.libp2p_metrics {
1539            match swarm_event {
1540                SwarmEvent::Behaviour(Event::Ping(ping_event)) => {
1541                    metrics.record(ping_event);
1542                }
1543                SwarmEvent::Behaviour(Event::Identify(identify_event)) => {
1544                    metrics.record(identify_event);
1545                }
1546                SwarmEvent::Behaviour(Event::Kademlia(kademlia_event)) => {
1547                    metrics.record(kademlia_event);
1548                }
1549                SwarmEvent::Behaviour(Event::Gossipsub(gossipsub_event)) => {
1550                    metrics.record(gossipsub_event);
1551                }
1552                // TODO: implement in the upstream repository
1553                // SwarmEvent::Behaviour(Event::RequestResponse(request_response_event)) => {
1554                //     metrics.record(request_response_event);
1555                // }
1556                swarm_event => {
1557                    metrics.record(swarm_event);
1558                }
1559            }
1560        }
1561    }
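    // A minimal sketch of how a `libp2p_metrics` recorder like the one used above is
    // usually constructed (assumption: the standard `libp2p::metrics` plus
    // `prometheus-client` wiring; not taken from this file):
    //
    //     use libp2p::metrics::Metrics;
    //     use prometheus_client::registry::Registry;
    //
    //     let mut registry = Registry::default();
    //     let libp2p_metrics = Metrics::new(&mut registry);
    //     // `registry` is then exposed (e.g. over HTTP) for Prometheus to scrape.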
1562
1563    fn log_kademlia_stats(&mut self) {
1564        let mut peer_counter = 0;
1565        let mut peer_with_no_address_counter = 0;
1566        for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
1567            for entry in kbucket.iter() {
1568                peer_counter += 1;
1569                if entry.node.value.len() == 0 {
1570                    peer_with_no_address_counter += 1;
1571                }
1572            }
1573        }
1574
1575        debug!(
1576            peers = %peer_counter,
1577            peers_with_no_address = %peer_with_no_address_counter,
1578            "Kademlia stats"
1579        );
1580    }
1581}
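
// A hedged usage sketch: the runner is typically moved into its own task and driven to
// completion while the rest of the application talks to it over the command channel.
// `construct()` and the async `run()` method are assumptions about this crate's public
// API made for illustration; verify against the crate before relying on them:
//
//     let (node, mut node_runner) = subspace_networking::construct(config)?;
//     tokio::spawn(async move {
//         node_runner.run().await;
//     });
//     // `node` is the cheap-to-clone handle whose calls end up in `handle_command` above.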