1use crate::behavior::persistent_parameters::{
4 KnownPeersRegistry, PeerAddressRemovedEvent, append_p2p_suffix, remove_p2p_suffix,
5};
6use crate::behavior::{Behavior, Event};
7use crate::constructor::DummyRecordStore;
8use crate::constructor::temporary_bans::TemporaryBans;
9use crate::protocols::request_response::request_response_factory::{
10 Event as RequestResponseEvent, IfDisconnected,
11};
12use crate::shared::{Command, CreatedSubscription, PeerDiscovered, Shared};
13use crate::utils::{SubspaceMetrics, is_global_address_or_dns, strip_peer_id};
14use async_lock::Mutex as AsyncMutex;
15use bytes::Bytes;
16use event_listener_primitives::HandlerId;
17use futures::channel::mpsc;
18use futures::future::Fuse;
19use futures::{FutureExt, StreamExt};
20use libp2p::autonat::{Event as AutonatEvent, NatStatus, OutboundProbeEvent};
21use libp2p::core::ConnectedPoint;
22use libp2p::gossipsub::{Event as GossipsubEvent, TopicHash};
23use libp2p::identify::Event as IdentifyEvent;
24use libp2p::kad::{
25 Behaviour as Kademlia, BootstrapOk, Event as KademliaEvent, GetClosestPeersError,
26 GetClosestPeersOk, GetProvidersError, GetProvidersOk, GetRecordError, GetRecordOk,
27 InboundRequest, KBucketKey, PeerRecord, ProgressStep, PutRecordOk, QueryId, QueryResult,
28 Quorum, Record, RecordKey,
29};
30use libp2p::metrics::{Metrics, Recorder};
31use libp2p::multiaddr::Protocol;
32use libp2p::swarm::dial_opts::DialOpts;
33use libp2p::swarm::{DialError, SwarmEvent};
34use libp2p::{Multiaddr, PeerId, Swarm, TransportError};
35use nohash_hasher::IntMap;
36use parking_lot::Mutex;
37use std::collections::hash_map::Entry;
38use std::collections::{HashMap, HashSet};
39use std::net::IpAddr;
40use std::pin::Pin;
41use std::sync::atomic::Ordering;
42use std::sync::{Arc, Weak};
43use std::time::{Duration, Instant};
44use std::{fmt, slice};
45use tokio::sync::OwnedSemaphorePermit;
46use tokio::task::yield_now;
47use tokio::time::Sleep;
48use tracing::{debug, error, info, trace, warn};
49
50const MAX_RANDOM_QUERY_INTERVAL: Duration = Duration::from_secs(60);
52
53const PERIODICAL_TASKS_INTERVAL: Duration = Duration::from_secs(5);
55
56const PEER_INFO_LOG_INTERVAL: Duration = Duration::from_secs(300);
58
59const MAX_LISTEN_ADDRESSES: usize = 30;
61
62enum QueryResultSender {
63 Value {
64 sender: mpsc::UnboundedSender<PeerRecord>,
65 _permit: OwnedSemaphorePermit,
67 },
68 ClosestPeers {
69 sender: mpsc::UnboundedSender<PeerId>,
70 _permit: Option<OwnedSemaphorePermit>,
72 },
73 Providers {
74 key: RecordKey,
75 sender: mpsc::UnboundedSender<PeerId>,
76 _permit: Option<OwnedSemaphorePermit>,
78 },
79 PutValue {
80 sender: mpsc::UnboundedSender<()>,
81 _permit: OwnedSemaphorePermit,
83 },
84 Bootstrap {
85 sender: mpsc::UnboundedSender<()>,
86 },
87}
88
89#[derive(Debug, Default)]
90enum BootstrapCommandState {
91 #[default]
92 NotStarted,
93 InProgress(mpsc::UnboundedReceiver<()>),
94 Finished,
95}
96
97#[must_use = "Node does not function properly unless its runner is driven forward"]
99pub struct NodeRunner {
100 allow_non_global_addresses_in_dht: bool,
102 is_listening: bool,
104 command_receiver: mpsc::Receiver<Command>,
105 swarm: Swarm<Behavior>,
106 shared_weak: Weak<Shared>,
107 next_random_query_interval: Duration,
109 query_id_receivers: HashMap<QueryId, QueryResultSender>,
110 next_subscription_id: usize,
113 topic_subscription_senders: HashMap<TopicHash, IntMap<usize, mpsc::UnboundedSender<Bytes>>>,
116 random_query_timeout: Pin<Box<Fuse<Sleep>>>,
117 periodical_tasks_interval: Pin<Box<Fuse<Sleep>>>,
119 last_peer_stats_info_log: Instant,
121 known_peers_registry: Box<dyn KnownPeersRegistry>,
123 connected_servers: HashSet<PeerId>,
124 reserved_peers: HashMap<PeerId, Multiaddr>,
126 temporary_bans: Arc<Mutex<TemporaryBans>>,
128 libp2p_metrics: Option<Metrics>,
130 metrics: Option<SubspaceMetrics>,
132 peer_ip_addresses: HashMap<PeerId, HashSet<IpAddr>>,
134 protocol_version: String,
136 bootstrap_addresses: Vec<Multiaddr>,
138 bootstrap_command_state: Arc<AsyncMutex<BootstrapCommandState>>,
140 removed_addresses_rx: mpsc::UnboundedReceiver<PeerAddressRemovedEvent>,
142 _address_removal_task_handler_id: Option<HandlerId>,
145}
146
147impl fmt::Debug for NodeRunner {
148 #[inline]
149 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
150 f.debug_struct("NodeRunner").finish_non_exhaustive()
151 }
152}
153
154pub(crate) struct NodeRunnerConfig {
156 pub(crate) allow_non_global_addresses_in_dht: bool,
157 pub(crate) is_listening: bool,
159 pub(crate) command_receiver: mpsc::Receiver<Command>,
160 pub(crate) swarm: Swarm<Behavior>,
161 pub(crate) shared_weak: Weak<Shared>,
162 pub(crate) next_random_query_interval: Duration,
163 pub(crate) known_peers_registry: Box<dyn KnownPeersRegistry>,
164 pub(crate) reserved_peers: HashMap<PeerId, Multiaddr>,
165 pub(crate) temporary_bans: Arc<Mutex<TemporaryBans>>,
166 pub(crate) libp2p_metrics: Option<Metrics>,
167 pub(crate) metrics: Option<SubspaceMetrics>,
168 pub(crate) protocol_version: String,
169 pub(crate) bootstrap_addresses: Vec<Multiaddr>,
170}
171
172impl NodeRunner {
173 pub(crate) fn new(
174 NodeRunnerConfig {
175 allow_non_global_addresses_in_dht,
176 is_listening,
177 command_receiver,
178 swarm,
179 shared_weak,
180 next_random_query_interval,
181 mut known_peers_registry,
182 reserved_peers,
183 temporary_bans,
184 libp2p_metrics,
185 metrics,
186 protocol_version,
187 bootstrap_addresses,
188 }: NodeRunnerConfig,
189 ) -> Self {
190 let (removed_addresses_tx, removed_addresses_rx) = mpsc::unbounded();
192 let mut address_removal_task_handler_id = None;
193 if let Some(handler_id) = known_peers_registry.on_unreachable_address({
194 Arc::new(move |event| {
195 if let Err(error) = removed_addresses_tx.unbounded_send(event.clone()) {
196 debug!(?error, ?event, "Cannot send PeerAddressRemovedEvent")
197 };
198 })
199 }) {
200 address_removal_task_handler_id.replace(handler_id);
201 }
202
203 Self {
204 allow_non_global_addresses_in_dht,
205 is_listening,
206 command_receiver,
207 swarm,
208 shared_weak,
209 next_random_query_interval,
210 query_id_receivers: HashMap::default(),
211 next_subscription_id: 0,
212 topic_subscription_senders: HashMap::default(),
213 random_query_timeout: Box::pin(tokio::time::sleep(Duration::from_secs(0)).fuse()),
215 periodical_tasks_interval: Box::pin(tokio::time::sleep(Duration::from_secs(0)).fuse()),
217 last_peer_stats_info_log: Instant::now(),
219 known_peers_registry,
220 connected_servers: HashSet::new(),
221 reserved_peers,
222 temporary_bans,
223 libp2p_metrics,
224 metrics,
225 peer_ip_addresses: HashMap::new(),
226 protocol_version,
227 bootstrap_addresses,
228 bootstrap_command_state: Arc::new(AsyncMutex::new(BootstrapCommandState::default())),
229 removed_addresses_rx,
230 _address_removal_task_handler_id: address_removal_task_handler_id,
231 }
232 }
233
234 pub async fn run(&mut self) {
236 if self.is_listening {
237 loop {
240 if self.swarm.listeners().next().is_some() {
241 break;
242 }
243
244 if let Some(swarm_event) = self.swarm.next().await {
245 self.register_event_metrics(&swarm_event);
246 self.handle_swarm_event(swarm_event).await;
247 } else {
248 break;
249 }
250 }
251 }
252
253 self.bootstrap().await;
254
255 loop {
256 futures::select! {
257 _ = &mut self.random_query_timeout => {
258 self.handle_random_query_interval();
259 self.random_query_timeout =
261 Box::pin(tokio::time::sleep(self.next_random_query_interval).fuse());
262 self.next_random_query_interval =
263 (self.next_random_query_interval * 2).min(MAX_RANDOM_QUERY_INTERVAL);
264 },
265 swarm_event = self.swarm.next() => {
266 if let Some(swarm_event) = swarm_event {
267 self.register_event_metrics(&swarm_event);
268 self.handle_swarm_event(swarm_event).await;
269 } else {
270 break;
271 }
272 },
273 command = self.command_receiver.next() => {
274 if let Some(command) = command {
275 self.handle_command(command);
276 } else {
277 break;
278 }
279 },
280 _ = self.known_peers_registry.run().fuse() => {
281 trace!("Network parameters registry runner exited.")
282 },
283 _ = &mut self.periodical_tasks_interval => {
284 self.handle_periodical_tasks().await;
285 self.periodical_tasks_interval =
286 Box::pin(tokio::time::sleep(PERIODICAL_TASKS_INTERVAL).fuse());
287 },
288 event = self.removed_addresses_rx.select_next_some() => {
289 self.handle_removed_address_event(event);
290 },
291 }
292
293 yield_now().await;
295 }
296 }
297
298 async fn bootstrap(&mut self) {
300 for (peer_id, address) in strip_peer_id(self.bootstrap_addresses.clone()) {
302 self.swarm
303 .behaviour_mut()
304 .kademlia
305 .add_address(&peer_id, address);
306 }
307
308 let known_peers = self.known_peers_registry.all_known_peers().await;
309
310 if !known_peers.is_empty() {
311 for (peer_id, addresses) in known_peers {
312 for address in addresses.clone() {
313 let address = match address.with_p2p(peer_id) {
314 Ok(address) => address,
315 Err(address) => {
316 warn!(%peer_id, %address, "Failed to add peer ID to known peer address");
317 break;
318 }
319 };
320 self.swarm
321 .behaviour_mut()
322 .kademlia
323 .add_address(&peer_id, address);
324 }
325
326 if let Err(error) = self
327 .swarm
328 .dial(DialOpts::peer_id(peer_id).addresses(addresses).build())
329 {
330 warn!(%peer_id, %error, "Failed to dial peer during bootstrapping");
331 }
332 }
333
334 self.handle_command(Command::Bootstrap {
336 result_sender: None,
337 });
338 return;
339 }
340
341 let bootstrap_command_state = self.bootstrap_command_state.clone();
342 let mut bootstrap_command_state = bootstrap_command_state.lock().await;
343 let bootstrap_command_receiver = match &mut *bootstrap_command_state {
344 BootstrapCommandState::NotStarted => {
345 debug!("Bootstrap started.");
346
347 let (bootstrap_command_sender, bootstrap_command_receiver) = mpsc::unbounded();
348
349 self.handle_command(Command::Bootstrap {
350 result_sender: Some(bootstrap_command_sender),
351 });
352
353 *bootstrap_command_state =
354 BootstrapCommandState::InProgress(bootstrap_command_receiver);
355 match &mut *bootstrap_command_state {
356 BootstrapCommandState::InProgress(bootstrap_command_receiver) => {
357 bootstrap_command_receiver
358 }
359 _ => {
360 unreachable!("Was just set to that exact value");
361 }
362 }
363 }
364 BootstrapCommandState::InProgress(bootstrap_command_receiver) => {
365 bootstrap_command_receiver
366 }
367 BootstrapCommandState::Finished => {
368 return;
369 }
370 };
371
372 let mut bootstrap_step = 0;
373 loop {
374 futures::select! {
375 swarm_event = self.swarm.next() => {
376 if let Some(swarm_event) = swarm_event {
377 self.register_event_metrics(&swarm_event);
378 self.handle_swarm_event(swarm_event).await;
379 } else {
380 break;
381 }
382 },
383 result = bootstrap_command_receiver.next() => {
384 if result.is_some() {
385 debug!(%bootstrap_step, "Kademlia bootstrapping...");
386 bootstrap_step += 1;
387 } else {
388 break;
389 }
390 }
391 }
392 }
393
394 debug!("Bootstrap finished.");
395 *bootstrap_command_state = BootstrapCommandState::Finished;
396 }
397
398 async fn handle_periodical_tasks(&mut self) {
400 let network_info = self.swarm.network_info();
402 let connections = network_info.connection_counters();
403
404 debug!(?connections, "Current DSN connections and limits.");
405
406 let mut external_addresses = self.swarm.external_addresses().cloned().collect::<Vec<_>>();
408
409 if let Some(shared) = self.shared_weak.upgrade() {
410 debug!(?external_addresses, "Renew DSN external addresses.");
411 let mut addresses = shared.external_addresses.lock();
412 addresses.clear();
413 addresses.append(&mut external_addresses);
414 }
415
416 self.log_kademlia_stats();
417 }
418
419 fn handle_random_query_interval(&mut self) {
420 let random_peer_id = PeerId::random();
421
422 trace!("Starting random Kademlia query for {}", random_peer_id);
423
424 self.swarm
425 .behaviour_mut()
426 .kademlia
427 .get_closest_peers(random_peer_id);
428 }
429
430 fn handle_removed_address_event(&mut self, event: PeerAddressRemovedEvent) {
431 trace!(?event, "Peer address removed event.");
432
433 let bootstrap_node_ids = strip_peer_id(self.bootstrap_addresses.clone())
434 .into_iter()
435 .map(|(peer_id, _)| peer_id)
436 .collect::<Vec<_>>();
437
438 if bootstrap_node_ids.contains(&event.peer_id) {
439 debug!(
440 ?event,
441 ?bootstrap_node_ids,
442 "Skipped removing bootstrap node from Kademlia buckets."
443 );
444
445 return;
446 }
447
448 self.swarm.behaviour_mut().kademlia.remove_address(
450 &event.peer_id,
451 &append_p2p_suffix(event.peer_id, event.address.clone()),
452 );
453
454 self.swarm
455 .behaviour_mut()
456 .kademlia
457 .remove_address(&event.peer_id, &remove_p2p_suffix(event.address));
458 }
459
460 fn handle_remove_listeners(&mut self, removed_listeners: &[Multiaddr]) {
461 let shared = match self.shared_weak.upgrade() {
462 Some(shared) => shared,
463 None => {
464 return;
465 }
466 };
467
468 let peer_id = shared.id;
470 shared.listeners.lock().retain(|old_listener| {
471 !removed_listeners.contains(&append_p2p_suffix(peer_id, old_listener.clone()))
472 && !removed_listeners.contains(&remove_p2p_suffix(old_listener.clone()))
473 });
474 }
475
476 async fn handle_swarm_event(&mut self, swarm_event: SwarmEvent<Event>) {
477 match swarm_event {
478 SwarmEvent::Behaviour(Event::Identify(event)) => {
479 self.handle_identify_event(event).await;
480 }
481 SwarmEvent::Behaviour(Event::Kademlia(event)) => {
482 self.handle_kademlia_event(event).await;
483 }
484 SwarmEvent::Behaviour(Event::Gossipsub(event)) => {
485 self.handle_gossipsub_event(event).await;
486 }
487 SwarmEvent::Behaviour(Event::RequestResponse(event)) => {
488 self.handle_request_response_event(event).await;
489 }
490 SwarmEvent::Behaviour(Event::Autonat(event)) => {
491 self.handle_autonat_event(event).await;
492 }
493 ref event @ SwarmEvent::NewListenAddr { ref address, .. } => {
494 trace!(?event, "New local listener event.");
495
496 let shared = match self.shared_weak.upgrade() {
497 Some(shared) => shared,
498 None => {
499 return;
500 }
501 };
502 shared.listeners.lock().push(address.clone());
503 shared.handlers.new_listener.call_simple(address);
504 }
505 ref event @ SwarmEvent::ListenerClosed { ref addresses, .. } => {
506 trace!(?event, "Local listener closed event.");
507 self.handle_remove_listeners(addresses);
508 }
509 ref event @ SwarmEvent::ExpiredListenAddr { ref address, .. } => {
510 trace!(?event, "Local listener expired event.");
511 self.handle_remove_listeners(slice::from_ref(address));
512 }
513 SwarmEvent::ConnectionEstablished {
514 peer_id,
515 endpoint,
516 num_established,
517 ..
518 } => {
519 if let ConnectedPoint::Dialer { address, .. } = &endpoint {
521 if self.allow_non_global_addresses_in_dht || is_global_address_or_dns(address) {
523 self.known_peers_registry
524 .add_known_peer(peer_id, vec![address.clone()])
525 .await;
526 }
527 };
528
529 let shared = match self.shared_weak.upgrade() {
530 Some(shared) => shared,
531 None => {
532 return;
533 }
534 };
535
536 let is_reserved_peer = self.reserved_peers.contains_key(&peer_id);
537 debug!(
538 %peer_id,
539 %is_reserved_peer,
540 ?endpoint,
541 %num_established,
542 "Connection established"
543 );
544
545 let maybe_remote_ip =
546 endpoint
547 .get_remote_address()
548 .iter()
549 .find_map(|protocol| match protocol {
550 Protocol::Ip4(ip) => Some(IpAddr::V4(ip)),
551 Protocol::Ip6(ip) => Some(IpAddr::V6(ip)),
552 _ => None,
553 });
554 if let Some(ip) = maybe_remote_ip {
555 self.peer_ip_addresses
556 .entry(peer_id)
557 .and_modify(|ips| {
558 ips.insert(ip);
559 })
560 .or_insert(HashSet::from([ip]));
561 }
562
563 let num_established_peer_connections = shared
564 .num_established_peer_connections
565 .fetch_add(1, Ordering::SeqCst)
566 + 1;
567
568 shared
569 .handlers
570 .num_established_peer_connections_change
571 .call_simple(&num_established_peer_connections);
572
573 if num_established.get() == 1 {
575 shared.handlers.connected_peer.call_simple(&peer_id);
576 }
577
578 if let Some(metrics) = self.metrics.as_ref() {
579 metrics.inc_established_connections()
580 }
581 }
582 SwarmEvent::ConnectionClosed {
583 peer_id,
584 num_established,
585 cause,
586 ..
587 } => {
588 let shared = match self.shared_weak.upgrade() {
589 Some(shared) => shared,
590 None => {
591 return;
592 }
593 };
594
595 debug!(
596 %peer_id,
597 ?cause,
598 %num_established,
599 "Connection closed with peer"
600 );
601
602 if num_established == 0 {
603 self.peer_ip_addresses.remove(&peer_id);
604 self.connected_servers.remove(&peer_id);
605 }
606 let num_established_peer_connections = shared
607 .num_established_peer_connections
608 .fetch_sub(1, Ordering::SeqCst)
609 - 1;
610
611 shared
612 .handlers
613 .num_established_peer_connections_change
614 .call_simple(&num_established_peer_connections);
615
616 if num_established == 0 {
618 shared.handlers.disconnected_peer.call_simple(&peer_id);
619 }
620
621 if let Some(metrics) = self.metrics.as_ref() {
622 metrics.dec_established_connections()
623 };
624 }
625 SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
626 if let Some(peer_id) = &peer_id {
627 let should_ban_temporarily =
628 self.should_temporary_ban_on_dial_error(peer_id, &error);
629
630 trace!(%should_ban_temporarily, "Temporary bans conditions.");
631
632 if should_ban_temporarily {
633 self.temporary_bans.lock().create_or_extend(peer_id);
634 debug!(%peer_id, ?error, "Peer was temporarily banned.");
635 }
636 }
637
638 debug!(
639 ?peer_id,
640 ?error,
641 "SwarmEvent::OutgoingConnectionError for peer."
642 );
643
644 match error {
645 DialError::Transport(ref addresses) => {
646 for (addr, _) in addresses {
647 trace!(?error, ?peer_id, %addr, "SwarmEvent::OutgoingConnectionError (DialError::Transport) for peer.");
648 if let Some(peer_id) = peer_id {
649 self.known_peers_registry
650 .remove_known_peer_addresses(peer_id, vec![addr.clone()])
651 .await;
652 }
653 }
654 }
655 DialError::WrongPeerId { obtained, .. } => {
656 trace!(?error, ?peer_id, obtained_peer_id=?obtained, "SwarmEvent::WrongPeerId (DialError::WrongPeerId) for peer.");
657
658 if let Some(ref peer_id) = peer_id {
659 let kademlia = &mut self.swarm.behaviour_mut().kademlia;
660 let _ = kademlia.remove_peer(peer_id);
661 }
662 }
663 _ => {
664 trace!(?error, ?peer_id, "SwarmEvent::OutgoingConnectionError");
665 }
666 }
667 }
668 SwarmEvent::NewExternalAddrCandidate { address } => {
669 trace!(%address, "External address candidate");
670 }
671 SwarmEvent::ExternalAddrConfirmed { address } => {
672 debug!(%address, "Confirmed external address");
673
674 let connected_peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();
675 self.swarm.behaviour_mut().identify.push(connected_peers);
676 }
677 SwarmEvent::ExternalAddrExpired { address } => {
678 debug!(%address, "External address expired");
679
680 let connected_peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();
681 self.swarm.behaviour_mut().identify.push(connected_peers);
682 }
683 other => {
684 trace!("Other swarm event: {:?}", other);
685 }
686 }
687 }
688
689 fn should_temporary_ban_on_dial_error(&self, peer_id: &PeerId, error: &DialError) -> bool {
690 if true {
692 return false;
693 }
694
695 if self.swarm.is_connected(peer_id) {
697 return false;
698 }
699
700 match &error {
701 DialError::Transport(addresses) => {
702 for (_, error) in addresses {
703 match error {
704 TransportError::MultiaddrNotSupported(_) => {
705 return true;
706 }
707 TransportError::Other(_) => {
708 if self.temporary_bans.lock().is_banned(peer_id) {
710 return false;
711 }
712 }
713 }
714 }
715 true
717 }
718 DialError::LocalPeerId { .. } => {
719 debug!("Local peer dial attempt detected.");
721
722 false
723 }
724 DialError::NoAddresses => {
725 true
727 }
728 DialError::DialPeerConditionFalse(_) => {
729 false
731 }
732 DialError::Aborted => {
733 false
735 }
736 DialError::WrongPeerId { .. } => {
737 false
739 }
740 DialError::Denied { .. } => {
741 false
743 }
744 }
745 }
746
747 async fn handle_identify_event(&mut self, event: IdentifyEvent) {
748 let local_peer_id = *self.swarm.local_peer_id();
749
750 if let IdentifyEvent::Received {
751 peer_id, mut info, ..
752 } = event
753 {
754 debug!(?peer_id, protocols = ?info.protocols, "IdentifyEvent::Received");
755
756 if info.protocol_version != self.protocol_version {
758 debug!(
759 %local_peer_id,
760 %peer_id,
761 local_protocol_version = %self.protocol_version,
762 peer_protocol_version = %info.protocol_version,
763 "Peer has different protocol version, banning temporarily",
764 );
765
766 self.temporary_bans.lock().create_or_extend(&peer_id);
767 let _ = self.swarm.disconnect_peer_id(peer_id);
769 self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id);
770 self.known_peers_registry
771 .remove_all_known_peer_addresses(peer_id);
772
773 return;
774 }
775
776 self.temporary_bans.lock().remove(&peer_id);
778
779 if info.listen_addrs.len() > MAX_LISTEN_ADDRESSES {
780 debug!(
781 %local_peer_id,
782 %peer_id,
783 "Node has reported more than {} addresses; it is identified by {} and {}",
784 MAX_LISTEN_ADDRESSES, info.protocol_version, info.agent_version
785 );
786 info.listen_addrs.truncate(MAX_LISTEN_ADDRESSES);
787 }
788
789 let kademlia = &mut self.swarm.behaviour_mut().kademlia;
790 let full_kademlia_support = kademlia
791 .protocol_names()
792 .iter()
793 .all(|local_protocol| info.protocols.contains(local_protocol));
794
795 if full_kademlia_support {
796 let received_addresses = info
797 .listen_addrs
798 .into_iter()
799 .filter(|address| {
800 if self.allow_non_global_addresses_in_dht
801 || is_global_address_or_dns(address)
802 {
803 true
804 } else {
805 trace!(
806 %local_peer_id,
807 %peer_id,
808 %address,
809 "Ignoring self-reported non-global address",
810 );
811
812 false
813 }
814 })
815 .collect::<Vec<_>>();
816 let received_address_strings = received_addresses
817 .iter()
818 .map(ToString::to_string)
819 .collect::<Vec<_>>();
820 let old_addresses = kademlia
821 .kbucket(peer_id)
822 .and_then(|peers| {
823 let key = peer_id.into();
824 peers.iter().find_map(|peer| {
825 (peer.node.key == &key).then_some(
826 peer.node
827 .value
828 .iter()
829 .filter(|existing_address| {
830 let existing_address = existing_address.to_string();
831
832 !received_address_strings.iter().any(|received_address| {
833 received_address.starts_with(&existing_address)
834 || existing_address.starts_with(received_address)
835 })
836 })
837 .cloned()
838 .collect::<Vec<_>>(),
839 )
840 })
841 })
842 .unwrap_or_default();
843
844 for address in received_addresses {
845 debug!(
846 %local_peer_id,
847 %peer_id,
848 %address,
849 protocol_names = ?kademlia.protocol_names(),
850 "Adding self-reported address to Kademlia DHT",
851 );
852
853 kademlia.add_address(&peer_id, address);
854 }
855
856 for old_address in old_addresses {
857 trace!(
858 %local_peer_id,
859 %peer_id,
860 %old_address,
861 "Removing old self-reported address from Kademlia DHT",
862 );
863
864 kademlia.remove_address(&peer_id, &old_address);
865 }
866
867 self.connected_servers.insert(peer_id);
868 } else {
869 debug!(
870 %local_peer_id,
871 %peer_id,
872 peer_protocols = ?info.protocols,
873 protocol_names = ?kademlia.protocol_names(),
874 "Peer doesn't support our Kademlia DHT protocol",
875 );
876
877 kademlia.remove_peer(&peer_id);
878 self.connected_servers.remove(&peer_id);
879 }
880 }
881 }
882
883 async fn handle_kademlia_event(&mut self, event: KademliaEvent) {
884 trace!("Kademlia event: {:?}", event);
885
886 match event {
887 KademliaEvent::InboundRequest {
888 request: InboundRequest::AddProvider { record, .. },
889 } => {
890 debug!("Unexpected AddProvider request received: {:?}", record);
891 }
892 KademliaEvent::UnroutablePeer { peer } => {
893 debug!(%peer, "Unroutable peer detected");
894
895 self.swarm.behaviour_mut().kademlia.remove_peer(&peer);
896
897 if let Some(shared) = self.shared_weak.upgrade() {
898 shared
899 .handlers
900 .peer_discovered
901 .call_simple(&PeerDiscovered::UnroutablePeer { peer_id: peer });
902 }
903 }
904 KademliaEvent::RoutablePeer { peer, address } => {
905 debug!(?address, "Routable peer detected: {:?}", peer);
906
907 if let Some(shared) = self.shared_weak.upgrade() {
908 shared
909 .handlers
910 .peer_discovered
911 .call_simple(&PeerDiscovered::RoutablePeer {
912 peer_id: peer,
913 address,
914 });
915 }
916 }
917 KademliaEvent::PendingRoutablePeer { peer, address } => {
918 debug!(?address, "Pending routable peer detected: {:?}", peer);
919
920 if let Some(shared) = self.shared_weak.upgrade() {
921 shared
922 .handlers
923 .peer_discovered
924 .call_simple(&PeerDiscovered::RoutablePeer {
925 peer_id: peer,
926 address,
927 });
928 }
929 }
930 KademliaEvent::OutboundQueryProgressed {
931 step: ProgressStep { last, .. },
932 id,
933 result: QueryResult::GetClosestPeers(result),
934 ..
935 } => {
936 let mut cancelled = false;
937 if let Some(QueryResultSender::ClosestPeers { sender, .. }) =
938 self.query_id_receivers.get(&id)
939 {
940 match result {
941 Ok(GetClosestPeersOk { key, peers }) => {
942 trace!(
943 "Get closest peers query for {} yielded {} results",
944 hex::encode(key),
945 peers.len(),
946 );
947
948 if peers.is_empty()
949 && self.swarm.connected_peers().next().is_some()
951 {
952 debug!("Random Kademlia query has yielded empty list of peers");
953 }
954
955 for peer in peers {
956 cancelled = Self::unbounded_send_and_cancel_on_error(
957 &mut self.swarm.behaviour_mut().kademlia,
958 sender,
959 peer.peer_id,
960 "GetClosestPeersOk",
961 &id,
962 ) || cancelled;
963 }
964 }
965 Err(GetClosestPeersError::Timeout { key, peers }) => {
966 debug!(
967 "Get closest peers query for {} timed out with {} results",
968 hex::encode(key),
969 peers.len(),
970 );
971
972 for peer in peers {
973 cancelled = Self::unbounded_send_and_cancel_on_error(
974 &mut self.swarm.behaviour_mut().kademlia,
975 sender,
976 peer.peer_id,
977 "GetClosestPeersError::Timeout",
978 &id,
979 ) || cancelled;
980 }
981 }
982 }
983 }
984
985 if last || cancelled {
986 self.query_id_receivers.remove(&id);
988 }
989 }
990 KademliaEvent::OutboundQueryProgressed {
991 step: ProgressStep { last, .. },
992 id,
993 result: QueryResult::GetRecord(result),
994 ..
995 } => {
996 let mut cancelled = false;
997 if let Some(QueryResultSender::Value { sender, .. }) =
998 self.query_id_receivers.get(&id)
999 {
1000 match result {
1001 Ok(GetRecordOk::FoundRecord(rec)) => {
1002 trace!(
1003 key = hex::encode(&rec.record.key),
1004 "Get record query succeeded",
1005 );
1006
1007 cancelled = Self::unbounded_send_and_cancel_on_error(
1008 &mut self.swarm.behaviour_mut().kademlia,
1009 sender,
1010 rec,
1011 "GetRecordOk",
1012 &id,
1013 ) || cancelled;
1014 }
1015 Ok(GetRecordOk::FinishedWithNoAdditionalRecord { .. }) => {
1016 trace!("Get record query yielded no results");
1017 }
1018 Err(error) => match error {
1019 GetRecordError::NotFound { key, .. } => {
1020 debug!(
1021 key = hex::encode(&key),
1022 "Get record query failed with no results",
1023 );
1024 }
1025 GetRecordError::QuorumFailed { key, records, .. } => {
1026 debug!(
1027 key = hex::encode(&key),
1028 "Get record query quorum failed with {} results",
1029 records.len(),
1030 );
1031 }
1032 GetRecordError::Timeout { key } => {
1033 debug!(key = hex::encode(&key), "Get record query timed out");
1034 }
1035 },
1036 }
1037 }
1038
1039 if last || cancelled {
1040 self.query_id_receivers.remove(&id);
1042 }
1043 }
1044 KademliaEvent::OutboundQueryProgressed {
1045 step: ProgressStep { last, .. },
1046 id,
1047 result: QueryResult::GetProviders(result),
1048 ..
1049 } => {
1050 let mut cancelled = false;
1051 if let Some(QueryResultSender::Providers { key, sender, .. }) =
1052 self.query_id_receivers.get(&id)
1053 {
1054 match result {
1055 Ok(GetProvidersOk::FoundProviders { key, providers }) => {
1056 trace!(
1057 key = hex::encode(&key),
1058 "Get providers query yielded {} results",
1059 providers.len(),
1060 );
1061
1062 for provider in providers {
1063 cancelled = Self::unbounded_send_and_cancel_on_error(
1064 &mut self.swarm.behaviour_mut().kademlia,
1065 sender,
1066 provider,
1067 "GetProvidersOk",
1068 &id,
1069 ) || cancelled;
1070 }
1071 }
1072 Ok(GetProvidersOk::FinishedWithNoAdditionalRecord { closest_peers }) => {
1073 trace!(
1074 key = hex::encode(key),
1075 closest_peers = %closest_peers.len(),
1076 "Get providers query yielded no results"
1077 );
1078 }
1079 Err(error) => {
1080 let GetProvidersError::Timeout { key, .. } = error;
1081
1082 debug!(
1083 key = hex::encode(&key),
1084 "Get providers query failed with no results",
1085 );
1086 }
1087 }
1088 }
1089
1090 if last || cancelled {
1091 self.query_id_receivers.remove(&id);
1093 }
1094 }
1095 KademliaEvent::OutboundQueryProgressed {
1096 step: ProgressStep { last, .. },
1097 id,
1098 result: QueryResult::PutRecord(result),
1099 ..
1100 } => {
1101 let mut cancelled = false;
1102 if let Some(QueryResultSender::PutValue { sender, .. }) =
1103 self.query_id_receivers.get(&id)
1104 {
1105 match result {
1106 Ok(PutRecordOk { key, .. }) => {
1107 trace!("Put record query for {} succeeded", hex::encode(&key));
1108
1109 cancelled = Self::unbounded_send_and_cancel_on_error(
1110 &mut self.swarm.behaviour_mut().kademlia,
1111 sender,
1112 (),
1113 "PutRecordOk",
1114 &id,
1115 ) || cancelled;
1116 }
1117 Err(error) => {
1118 debug!(?error, "Put record query failed.");
1119 }
1120 }
1121 }
1122
1123 if last || cancelled {
1124 self.query_id_receivers.remove(&id);
1126 }
1127 }
1128 KademliaEvent::OutboundQueryProgressed {
1129 step: ProgressStep { last, count },
1130 id,
1131 result: QueryResult::Bootstrap(result),
1132 stats,
1133 } => {
1134 debug!(?stats, %last, %count, ?id, ?result, "Bootstrap OutboundQueryProgressed step.");
1135
1136 let mut cancelled = false;
1137 if let Some(QueryResultSender::Bootstrap { sender }) =
1138 self.query_id_receivers.get_mut(&id)
1139 {
1140 match result {
1141 Ok(BootstrapOk {
1142 peer,
1143 num_remaining,
1144 }) => {
1145 trace!(%peer, %num_remaining, %last, "Bootstrap query step succeeded");
1146
1147 cancelled = Self::unbounded_send_and_cancel_on_error(
1148 &mut self.swarm.behaviour_mut().kademlia,
1149 sender,
1150 (),
1151 "Bootstrap",
1152 &id,
1153 ) || cancelled;
1154 }
1155 Err(error) => {
1156 debug!(?error, "Bootstrap query failed.");
1157 }
1158 }
1159 }
1160
1161 if last || cancelled {
1162 self.query_id_receivers.remove(&id);
1164 }
1165 }
1166 _ => {}
1167 }
1168 }
1169
1170 fn unbounded_send_and_cancel_on_error<T>(
1172 kademlia: &mut Kademlia<DummyRecordStore>,
1173 sender: &mpsc::UnboundedSender<T>,
1174 value: T,
1175 channel: &'static str,
1176 id: &QueryId,
1177 ) -> bool {
1178 if sender.unbounded_send(value).is_err() {
1179 debug!("{} channel was dropped", channel);
1180
1181 if let Some(mut query) = kademlia.query_mut(id) {
1183 query.finish();
1184 }
1185 true
1186 } else {
1187 false
1188 }
1189 }
1190
1191 async fn handle_gossipsub_event(&mut self, event: GossipsubEvent) {
1192 if let GossipsubEvent::Message { message, .. } = event
1193 && let Some(senders) = self.topic_subscription_senders.get(&message.topic)
1194 {
1195 let bytes = Bytes::from(message.data);
1196
1197 for sender in senders.values() {
1198 let _ = sender.unbounded_send(bytes.clone());
1200 }
1201 }
1202 }
1203
1204 async fn handle_request_response_event(&mut self, event: RequestResponseEvent) {
1205 trace!("Request response event: {:?}", event);
1207 }
1208
1209 async fn handle_autonat_event(&mut self, event: AutonatEvent) {
1210 trace!(?event, "Autonat event received.");
1211 let autonat = &self.swarm.behaviour().autonat;
1212 debug!(
1213 public_address=?autonat.public_address(),
1214 confidence=%autonat.confidence(),
1215 "Current public address confidence."
1216 );
1217
1218 match event {
1219 AutonatEvent::InboundProbe(_inbound_probe_event) => {
1220 }
1222 AutonatEvent::OutboundProbe(outbound_probe_event) => {
1223 match outbound_probe_event {
1224 OutboundProbeEvent::Request { peer, .. } => {
1225 self.swarm
1228 .behaviour_mut()
1229 .connection_limits
1230 .add_to_incoming_allow_list(
1232 peer,
1233 self.peer_ip_addresses
1234 .get(&peer)
1235 .iter()
1236 .flat_map(|ip_addresses| ip_addresses.iter())
1237 .copied(),
1238 1,
1239 );
1240 }
1241 OutboundProbeEvent::Response { peer, .. } => {
1242 self.swarm
1243 .behaviour_mut()
1244 .connection_limits
1245 .remove_from_incoming_allow_list(&peer, Some(1));
1246 }
1247 OutboundProbeEvent::Error { peer, .. } => {
1248 if let Some(peer) = peer {
1249 self.swarm
1250 .behaviour_mut()
1251 .connection_limits
1252 .remove_from_incoming_allow_list(&peer, Some(1));
1253 }
1254 }
1255 }
1256 }
1257 AutonatEvent::StatusChanged { old, new } => {
1258 debug!(?old, ?new, "Public address status changed.");
1259
1260 if let (NatStatus::Public(old_address), NatStatus::Private) = (old, new.clone()) {
1262 self.swarm.remove_external_address(&old_address);
1263 debug!(
1264 ?old_address,
1265 new_status = ?new,
1266 "Removing old external address...",
1267 );
1268
1269 self.swarm.behaviour_mut().kademlia.set_mode(None);
1271 }
1272
1273 let connected_peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();
1274 self.swarm.behaviour_mut().identify.push(connected_peers);
1275 }
1276 }
1277 }
1278
1279 fn handle_command(&mut self, command: Command) {
1280 match command {
1281 Command::GetValue {
1282 key,
1283 result_sender,
1284 permit,
1285 } => {
1286 let query_id = self
1287 .swarm
1288 .behaviour_mut()
1289 .kademlia
1290 .get_record(key.to_bytes().into());
1291
1292 self.query_id_receivers.insert(
1293 query_id,
1294 QueryResultSender::Value {
1295 sender: result_sender,
1296 _permit: permit,
1297 },
1298 );
1299 }
1300 Command::PutValue {
1301 key,
1302 value,
1303 result_sender,
1304 permit,
1305 } => {
1306 let local_peer_id = *self.swarm.local_peer_id();
1307
1308 let record = Record {
1309 key: key.into(),
1310 value,
1311 publisher: Some(local_peer_id),
1312 expires: None, };
1314 let query_result = self
1315 .swarm
1316 .behaviour_mut()
1317 .kademlia
1318 .put_record(record, Quorum::One);
1319
1320 match query_result {
1321 Ok(query_id) => {
1322 self.query_id_receivers.insert(
1323 query_id,
1324 QueryResultSender::PutValue {
1325 sender: result_sender,
1326 _permit: permit,
1327 },
1328 );
1329 }
1330 Err(err) => {
1331 warn!(?err, "Failed to put value.");
1332 }
1333 }
1334 }
1335 Command::Subscribe {
1336 topic,
1337 result_sender,
1338 } => {
1339 if !self.swarm.behaviour().gossipsub.is_enabled() {
1340 panic!("Gossipsub protocol is disabled.");
1341 }
1342
1343 let topic_hash = topic.hash();
1344 let (sender, receiver) = mpsc::unbounded();
1345
1346 let subscription_id = self.next_subscription_id;
1348 self.next_subscription_id += 1;
1349
1350 let created_subscription = CreatedSubscription {
1351 subscription_id,
1352 receiver,
1353 };
1354
1355 match self.topic_subscription_senders.entry(topic_hash) {
1356 Entry::Occupied(mut entry) => {
1357 if result_sender.send(Ok(created_subscription)).is_ok() {
1359 entry.get_mut().insert(subscription_id, sender);
1360 }
1361 }
1362 Entry::Vacant(entry) => {
1363 if let Some(gossipsub) = self.swarm.behaviour_mut().gossipsub.as_mut() {
1366 match gossipsub.subscribe(&topic) {
1367 Ok(true) => {
1368 if result_sender.send(Ok(created_subscription)).is_ok() {
1369 entry
1370 .insert(IntMap::from_iter([(subscription_id, sender)]));
1371 }
1372 }
1373 Ok(false) => {
1374 panic!(
1375 "Logic error, topic subscription wasn't created, this \
1376 must never happen"
1377 );
1378 }
1379 Err(error) => {
1380 let _ = result_sender.send(Err(error));
1381 }
1382 }
1383 }
1384 }
1385 }
1386 }
1387 Command::Unsubscribe {
1388 topic,
1389 subscription_id,
1390 } => {
1391 if !self.swarm.behaviour().gossipsub.is_enabled() {
1392 panic!("Gossipsub protocol is disabled.");
1393 }
1394
1395 if let Entry::Occupied(mut entry) =
1396 self.topic_subscription_senders.entry(topic.hash())
1397 {
1398 entry.get_mut().remove(&subscription_id);
1399
1400 if entry.get().is_empty() {
1402 entry.remove_entry();
1403
1404 if let Some(gossipsub) = self.swarm.behaviour_mut().gossipsub.as_mut()
1405 && !gossipsub.unsubscribe(&topic)
1406 {
1407 warn!(
1408 "Can't unsubscribe from topic {topic} because subscription doesn't exist, \
1409 this is a logic error in the subspace or swarm libraries"
1410 );
1411 }
1412 }
1413 } else {
1414 error!(
1415 "Can't unsubscribe from topic {topic} because subscription doesn't exist, \
1416 this is a logic error in the subspace library"
1417 );
1418 }
1419 }
1420 Command::Publish {
1421 topic,
1422 message,
1423 result_sender,
1424 } => {
1425 if !self.swarm.behaviour().gossipsub.is_enabled() {
1426 panic!("Gossipsub protocol is disabled.");
1427 }
1428
1429 if let Some(gossipsub) = self.swarm.behaviour_mut().gossipsub.as_mut() {
1430 let _ =
1432 result_sender.send(gossipsub.publish(topic, message).map(|_message_id| ()));
1433 }
1434 }
1435 Command::GetClosestPeers {
1436 key,
1437 result_sender,
1438 permit,
1439 } => {
1440 let query_id = self.swarm.behaviour_mut().kademlia.get_closest_peers(key);
1441
1442 self.query_id_receivers.insert(
1443 query_id,
1444 QueryResultSender::ClosestPeers {
1445 sender: result_sender,
1446 _permit: permit,
1447 },
1448 );
1449 }
1450 Command::GetClosestLocalPeers {
1451 key,
1452 source,
1453 result_sender,
1454 } => {
1455 let source = source.unwrap_or_else(|| *self.swarm.local_peer_id());
1456 let result = self
1457 .swarm
1458 .behaviour_mut()
1459 .kademlia
1460 .find_closest_local_peers(&KBucketKey::from(key), &source)
1461 .filter(|peer| !peer.multiaddrs.is_empty())
1462 .map(|peer| (peer.node_id, peer.multiaddrs))
1463 .collect();
1464
1465 let _ = result_sender.send(result);
1467 }
1468 Command::GenericRequest {
1469 peer_id,
1470 addresses,
1471 protocol_name,
1472 request,
1473 result_sender,
1474 } => {
1475 self.swarm.behaviour_mut().request_response.send_request(
1476 &peer_id,
1477 protocol_name,
1478 request,
1479 result_sender,
1480 IfDisconnected::TryConnect,
1481 addresses,
1482 );
1483 }
1484 Command::GetProviders {
1485 key,
1486 result_sender,
1487 permit,
1488 } => {
1489 let query_id = self
1490 .swarm
1491 .behaviour_mut()
1492 .kademlia
1493 .get_providers(key.clone());
1494
1495 self.query_id_receivers.insert(
1496 query_id,
1497 QueryResultSender::Providers {
1498 key,
1499 sender: result_sender,
1500 _permit: permit,
1501 },
1502 );
1503 }
1504 Command::BanPeer { peer_id } => {
1505 self.ban_peer(peer_id);
1506 }
1507 Command::Dial { address } => {
1508 let _ = self.swarm.dial(address);
1509 }
1510 Command::ConnectedPeers { result_sender } => {
1511 let connected_peers = self.swarm.connected_peers().cloned().collect();
1512
1513 let _ = result_sender.send(connected_peers);
1514 }
1515 Command::ConnectedServers { result_sender } => {
1516 let connected_servers = self.connected_servers.iter().cloned().collect();
1517
1518 let _ = result_sender.send(connected_servers);
1519 }
1520 Command::Bootstrap { result_sender } => {
1521 let kademlia = &mut self.swarm.behaviour_mut().kademlia;
1522
1523 match kademlia.bootstrap() {
1524 Ok(query_id) => {
1525 if let Some(result_sender) = result_sender {
1526 self.query_id_receivers.insert(
1527 query_id,
1528 QueryResultSender::Bootstrap {
1529 sender: result_sender,
1530 },
1531 );
1532 }
1533 }
1534 Err(err) => {
1535 debug!(?err, "Bootstrap error.");
1536 }
1537 }
1538 }
1539 }
1540 }
1541
1542 fn ban_peer(&mut self, peer_id: PeerId) {
1543 self.temporary_bans.lock().remove(&peer_id);
1545
1546 debug!(?peer_id, "Banning peer on network level");
1547
1548 self.swarm.behaviour_mut().block_list.block_peer(peer_id);
1549 self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id);
1550 self.known_peers_registry
1551 .remove_all_known_peer_addresses(peer_id);
1552 }
1553
1554 fn register_event_metrics(&mut self, swarm_event: &SwarmEvent<Event>) {
1555 if let Some(ref mut metrics) = self.libp2p_metrics {
1556 match swarm_event {
1557 SwarmEvent::Behaviour(Event::Ping(ping_event)) => {
1558 metrics.record(ping_event);
1559 }
1560 SwarmEvent::Behaviour(Event::Identify(identify_event)) => {
1561 metrics.record(identify_event);
1562 }
1563 SwarmEvent::Behaviour(Event::Kademlia(kademlia_event)) => {
1564 metrics.record(kademlia_event);
1565 }
1566 SwarmEvent::Behaviour(Event::Gossipsub(gossipsub_event)) => {
1567 metrics.record(gossipsub_event);
1568 }
1569 swarm_event => {
1574 metrics.record(swarm_event);
1575 }
1576 }
1577 }
1578 }
1579
1580 fn log_kademlia_stats(&mut self) {
1581 let mut kad_peers = 0;
1582 let mut kad_peers_with_no_address = 0;
1583
1584 for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
1585 for entry in kbucket.iter() {
1586 kad_peers += 1;
1587 if entry.node.value.len() == 0 {
1588 kad_peers_with_no_address += 1;
1589 }
1590 }
1591 }
1592
1593 let (known_peers, known_peer_addresses) = self.known_peers_registry.count_known_peers();
1594
1595 let connected_peers = self.connected_servers.len();
1596
1597 let peers_with_ip_address = self.peer_ip_addresses.len();
1598 let peer_ip_address_count = self
1599 .peer_ip_addresses
1600 .values()
1601 .map(|addresses| addresses.len())
1602 .sum::<usize>();
1603
1604 debug!(
1605 %connected_peers,
1606 %kad_peers,
1607 %kad_peers_with_no_address,
1608 %known_peers,
1609 %known_peer_addresses,
1610 %peers_with_ip_address,
1611 %peer_ip_address_count,
1612 "dsn peers",
1613 );
1614
1615 if self.last_peer_stats_info_log.elapsed() >= PEER_INFO_LOG_INTERVAL {
1616 self.last_peer_stats_info_log = Instant::now();
1617 info!("dsn: actively using {connected_peers}/{kad_peers} known peers");
1618 }
1619 }
1620}