1use crate::behavior::persistent_parameters::{
2 append_p2p_suffix, remove_p2p_suffix, KnownPeersRegistry, PeerAddressRemovedEvent,
3};
4use crate::behavior::{Behavior, Event};
5use crate::constructor::temporary_bans::TemporaryBans;
6use crate::constructor::DummyRecordStore;
7use crate::protocols::request_response::request_response_factory::{
8 Event as RequestResponseEvent, IfDisconnected,
9};
10use crate::shared::{Command, CreatedSubscription, PeerDiscovered, Shared};
11use crate::utils::{is_global_address_or_dns, strip_peer_id, SubspaceMetrics};
12use async_lock::Mutex as AsyncMutex;
13use bytes::Bytes;
14use event_listener_primitives::HandlerId;
15use futures::channel::mpsc;
16use futures::future::Fuse;
17use futures::{FutureExt, StreamExt};
18use libp2p::autonat::{Event as AutonatEvent, NatStatus, OutboundProbeEvent};
19use libp2p::core::ConnectedPoint;
20use libp2p::gossipsub::{Event as GossipsubEvent, TopicHash};
21use libp2p::identify::Event as IdentifyEvent;
22use libp2p::kad::{
23 Behaviour as Kademlia, BootstrapOk, Event as KademliaEvent, GetClosestPeersError,
24 GetClosestPeersOk, GetProvidersError, GetProvidersOk, GetRecordError, GetRecordOk,
25 InboundRequest, KBucketKey, PeerRecord, ProgressStep, PutRecordOk, QueryId, QueryResult,
26 Quorum, Record, RecordKey,
27};
28use libp2p::metrics::{Metrics, Recorder};
29use libp2p::multiaddr::Protocol;
30use libp2p::swarm::dial_opts::DialOpts;
31use libp2p::swarm::{DialError, SwarmEvent};
32use libp2p::{Multiaddr, PeerId, Swarm, TransportError};
33use nohash_hasher::IntMap;
34use parking_lot::Mutex;
35use std::collections::hash_map::Entry;
36use std::collections::{HashMap, HashSet};
37use std::fmt;
38use std::net::IpAddr;
39use std::pin::Pin;
40use std::sync::atomic::Ordering;
41use std::sync::{Arc, Weak};
42use std::time::Duration;
43use tokio::sync::OwnedSemaphorePermit;
44use tokio::task::yield_now;
45use tokio::time::Sleep;
46use tracing::{debug, error, trace, warn};
47
/// Bookkeeping entry stored per in-flight Kademlia query (keyed by `QueryId` in
/// `NodeRunner::query_id_receivers`).
///
/// Each variant carries the channel on which results of that kind of query are forwarded.
/// The `_permit` fields are never read; they are only held for as long as the entry exists.
enum QueryResultSender {
    /// Results of a `get_record` query.
    Value {
        sender: mpsc::UnboundedSender<PeerRecord>,
        // NOTE(review): presumably limits the number of concurrent queries — confirm at the
        // place where the permit is acquired.
        _permit: OwnedSemaphorePermit,
    },
    /// Peer IDs found by a `get_closest_peers` query.
    ClosestPeers {
        sender: mpsc::UnboundedSender<PeerId>,
        _permit: Option<OwnedSemaphorePermit>,
    },
    /// Provider peer IDs found by a `get_providers` query for `key`.
    Providers {
        key: RecordKey,
        sender: mpsc::UnboundedSender<PeerId>,
        _permit: Option<OwnedSemaphorePermit>,
    },
    /// Acknowledgement of a successful `put_record` query.
    PutValue {
        sender: mpsc::UnboundedSender<()>,
        _permit: OwnedSemaphorePermit,
    },
    /// One message per successful step of a Kademlia bootstrap query.
    Bootstrap {
        sender: mpsc::UnboundedSender<()>,
    },
}
74
/// Progress of the blocking bootstrap performed by `NodeRunner::bootstrap`.
#[derive(Debug, Default)]
enum BootstrapCommandState {
    /// No bootstrap command has been issued yet.
    #[default]
    NotStarted,
    /// A bootstrap command was issued; the receiver yields one item per reported bootstrap step
    /// and ends when the sending side is dropped.
    InProgress(mpsc::UnboundedReceiver<()>),
    /// Bootstrap has completed; further calls return immediately.
    Finished,
}
82
/// Event loop that owns the libp2p swarm and drives it forward.
///
/// The runner does nothing on its own: [`NodeRunner::run`] has to be polled for commands, swarm
/// events and timers to be processed.
#[must_use = "Node does not function properly unless its runner is driven forward"]
pub struct NodeRunner {
    /// When `false`, only addresses accepted by `is_global_address_or_dns` are stored as known
    /// peer addresses or added to Kademlia from Identify.
    allow_non_global_addresses_in_dht: bool,
    /// When `true`, `run` waits for the first listener before bootstrapping.
    is_listening: bool,
    /// Commands to be executed by the runner.
    command_receiver: mpsc::Receiver<Command>,
    swarm: Swarm<Behavior>,
    /// Weak handle to the shared state; handlers bail out once it can no longer be upgraded.
    shared_weak: Weak<Shared>,
    /// Delay used for the next random Kademlia query; doubled after each query up to 60 seconds.
    next_random_query_interval: Duration,
    /// Result channels of in-flight Kademlia queries.
    query_id_receivers: HashMap<QueryId, QueryResultSender>,
    // NOTE(review): not used in the visible part of the file — presumably the ID handed to the
    // next topic subscription.
    next_subscription_id: usize,
    /// Per topic: subscription ID -> sender that receives the topic's gossipsub messages.
    topic_subscription_senders: HashMap<TopicHash, IntMap<usize, mpsc::UnboundedSender<Bytes>>>,
    /// Timer that triggers the next random Kademlia query.
    random_query_timeout: Pin<Box<Fuse<Sleep>>>,
    /// Timer that triggers `handle_periodical_tasks` (re-armed with 5 seconds).
    periodical_tasks_interval: Pin<Box<Fuse<Sleep>>>,
    /// Persistent registry of known peer addresses.
    known_peers_registry: Box<dyn KnownPeersRegistry>,
    /// Connected peers whose Identify info advertised all of our Kademlia protocols.
    connected_servers: HashSet<PeerId>,
    reserved_peers: HashMap<PeerId, Multiaddr>,
    /// Temporarily banned peers (shared behind a mutex).
    temporary_bans: Arc<Mutex<TemporaryBans>>,
    // NOTE(review): consumed outside the visible part of the file (likely in
    // `register_event_metrics`).
    libp2p_metrics: Option<Metrics>,
    /// Optional metrics; the established-connections counter is updated on connect/disconnect.
    metrics: Option<SubspaceMetrics>,
    /// Remote IP addresses seen per connected peer; cleared when the last connection closes.
    peer_ip_addresses: HashMap<PeerId, HashSet<IpAddr>>,
    /// Local protocol version; peers reporting a different one via Identify are banned.
    protocol_version: String,
    /// Bootstrap node addresses (with peer ID suffix) added to Kademlia on startup.
    bootstrap_addresses: Vec<Multiaddr>,
    /// State of the blocking bootstrap procedure.
    bootstrap_command_state: Arc<AsyncMutex<BootstrapCommandState>>,
    /// Receives address-removal events forwarded from the known peers registry callback.
    removed_addresses_rx: mpsc::UnboundedReceiver<PeerAddressRemovedEvent>,
    // Never read; stored so the handler ID returned by `on_unreachable_address` lives as long as
    // the runner. NOTE(review): presumably dropping it unregisters the callback — confirm.
    _address_removal_task_handler_id: Option<HandlerId>,
}
130
131impl fmt::Debug for NodeRunner {
132 #[inline]
133 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134 f.debug_struct("NodeRunner").finish_non_exhaustive()
135 }
136}
137
/// Everything `NodeRunner::new` needs; each field is moved into the runner field of the same
/// name.
pub(crate) struct NodeRunnerConfig {
    /// Whether non-global addresses may be recorded and added to the DHT.
    pub(crate) allow_non_global_addresses_in_dht: bool,
    /// Whether the runner should wait for a listener before bootstrapping.
    pub(crate) is_listening: bool,
    /// Receiving side of the command channel.
    pub(crate) command_receiver: mpsc::Receiver<Command>,
    pub(crate) swarm: Swarm<Behavior>,
    pub(crate) shared_weak: Weak<Shared>,
    /// Initial delay between random Kademlia queries.
    pub(crate) next_random_query_interval: Duration,
    pub(crate) known_peers_registry: Box<dyn KnownPeersRegistry>,
    pub(crate) reserved_peers: HashMap<PeerId, Multiaddr>,
    pub(crate) temporary_bans: Arc<Mutex<TemporaryBans>>,
    pub(crate) libp2p_metrics: Option<Metrics>,
    pub(crate) metrics: Option<SubspaceMetrics>,
    /// Protocol version expected from remote peers in Identify.
    pub(crate) protocol_version: String,
    pub(crate) bootstrap_addresses: Vec<Multiaddr>,
}
155
156impl NodeRunner {
157 pub(crate) fn new(
158 NodeRunnerConfig {
159 allow_non_global_addresses_in_dht,
160 is_listening,
161 command_receiver,
162 swarm,
163 shared_weak,
164 next_random_query_interval,
165 mut known_peers_registry,
166 reserved_peers,
167 temporary_bans,
168 libp2p_metrics,
169 metrics,
170 protocol_version,
171 bootstrap_addresses,
172 }: NodeRunnerConfig,
173 ) -> Self {
174 let (removed_addresses_tx, removed_addresses_rx) = mpsc::unbounded();
176 let mut address_removal_task_handler_id = None;
177 if let Some(handler_id) = known_peers_registry.on_unreachable_address({
178 Arc::new(move |event| {
179 if let Err(error) = removed_addresses_tx.unbounded_send(event.clone()) {
180 debug!(?error, ?event, "Cannot send PeerAddressRemovedEvent")
181 };
182 })
183 }) {
184 address_removal_task_handler_id.replace(handler_id);
185 }
186
187 Self {
188 allow_non_global_addresses_in_dht,
189 is_listening,
190 command_receiver,
191 swarm,
192 shared_weak,
193 next_random_query_interval,
194 query_id_receivers: HashMap::default(),
195 next_subscription_id: 0,
196 topic_subscription_senders: HashMap::default(),
197 random_query_timeout: Box::pin(tokio::time::sleep(Duration::from_secs(0)).fuse()),
199 periodical_tasks_interval: Box::pin(tokio::time::sleep(Duration::from_secs(0)).fuse()),
201 known_peers_registry,
202 connected_servers: HashSet::new(),
203 reserved_peers,
204 temporary_bans,
205 libp2p_metrics,
206 metrics,
207 peer_ip_addresses: HashMap::new(),
208 protocol_version,
209 bootstrap_addresses,
210 bootstrap_command_state: Arc::new(AsyncMutex::new(BootstrapCommandState::default())),
211 removed_addresses_rx,
212 _address_removal_task_handler_id: address_removal_task_handler_id,
213 }
214 }
215
216 pub async fn run(&mut self) {
218 if self.is_listening {
219 loop {
222 if self.swarm.listeners().next().is_some() {
223 break;
224 }
225
226 if let Some(swarm_event) = self.swarm.next().await {
227 self.register_event_metrics(&swarm_event);
228 self.handle_swarm_event(swarm_event).await;
229 } else {
230 break;
231 }
232 }
233 }
234
235 self.bootstrap().await;
236
237 loop {
238 futures::select! {
239 _ = &mut self.random_query_timeout => {
240 self.handle_random_query_interval();
241 self.random_query_timeout =
243 Box::pin(tokio::time::sleep(self.next_random_query_interval).fuse());
244 self.next_random_query_interval =
245 (self.next_random_query_interval * 2).min(Duration::from_secs(60));
246 },
247 swarm_event = self.swarm.next() => {
248 if let Some(swarm_event) = swarm_event {
249 self.register_event_metrics(&swarm_event);
250 self.handle_swarm_event(swarm_event).await;
251 } else {
252 break;
253 }
254 },
255 command = self.command_receiver.next() => {
256 if let Some(command) = command {
257 self.handle_command(command);
258 } else {
259 break;
260 }
261 },
262 _ = self.known_peers_registry.run().fuse() => {
263 trace!("Network parameters registry runner exited.")
264 },
265 _ = &mut self.periodical_tasks_interval => {
266 self.handle_periodical_tasks().await;
267
268 self.periodical_tasks_interval =
269 Box::pin(tokio::time::sleep(Duration::from_secs(5)).fuse());
270 },
271 event = self.removed_addresses_rx.select_next_some() => {
272 self.handle_removed_address_event(event);
273 },
274 }
275
276 yield_now().await;
278 }
279 }
280
281 async fn bootstrap(&mut self) {
283 for (peer_id, address) in strip_peer_id(self.bootstrap_addresses.clone()) {
285 self.swarm
286 .behaviour_mut()
287 .kademlia
288 .add_address(&peer_id, address);
289 }
290
291 let known_peers = self.known_peers_registry.all_known_peers().await;
292
293 if !known_peers.is_empty() {
294 for (peer_id, addresses) in known_peers {
295 for address in addresses.clone() {
296 let address = match address.with_p2p(peer_id) {
297 Ok(address) => address,
298 Err(address) => {
299 warn!(%peer_id, %address, "Failed to add peer ID to known peer address");
300 break;
301 }
302 };
303 self.swarm
304 .behaviour_mut()
305 .kademlia
306 .add_address(&peer_id, address);
307 }
308
309 if let Err(error) = self
310 .swarm
311 .dial(DialOpts::peer_id(peer_id).addresses(addresses).build())
312 {
313 warn!(%peer_id, %error, "Failed to dial peer during bootstrapping");
314 }
315 }
316
317 self.handle_command(Command::Bootstrap {
319 result_sender: None,
320 });
321 return;
322 }
323
324 let bootstrap_command_state = self.bootstrap_command_state.clone();
325 let mut bootstrap_command_state = bootstrap_command_state.lock().await;
326 let bootstrap_command_receiver = match &mut *bootstrap_command_state {
327 BootstrapCommandState::NotStarted => {
328 debug!("Bootstrap started.");
329
330 let (bootstrap_command_sender, bootstrap_command_receiver) = mpsc::unbounded();
331
332 self.handle_command(Command::Bootstrap {
333 result_sender: Some(bootstrap_command_sender),
334 });
335
336 *bootstrap_command_state =
337 BootstrapCommandState::InProgress(bootstrap_command_receiver);
338 match &mut *bootstrap_command_state {
339 BootstrapCommandState::InProgress(bootstrap_command_receiver) => {
340 bootstrap_command_receiver
341 }
342 _ => {
343 unreachable!("Was just set to that exact value");
344 }
345 }
346 }
347 BootstrapCommandState::InProgress(bootstrap_command_receiver) => {
348 bootstrap_command_receiver
349 }
350 BootstrapCommandState::Finished => {
351 return;
352 }
353 };
354
355 let mut bootstrap_step = 0;
356 loop {
357 futures::select! {
358 swarm_event = self.swarm.next() => {
359 if let Some(swarm_event) = swarm_event {
360 self.register_event_metrics(&swarm_event);
361 self.handle_swarm_event(swarm_event).await;
362 } else {
363 break;
364 }
365 },
366 result = bootstrap_command_receiver.next() => {
367 if result.is_some() {
368 debug!(%bootstrap_step, "Kademlia bootstrapping...");
369 bootstrap_step += 1;
370 } else {
371 break;
372 }
373 }
374 }
375 }
376
377 debug!("Bootstrap finished.");
378 *bootstrap_command_state = BootstrapCommandState::Finished;
379 }
380
381 async fn handle_periodical_tasks(&mut self) {
383 let network_info = self.swarm.network_info();
385 let connections = network_info.connection_counters();
386
387 debug!(?connections, "Current connections and limits.");
388
389 let mut external_addresses = self.swarm.external_addresses().cloned().collect::<Vec<_>>();
391
392 if let Some(shared) = self.shared_weak.upgrade() {
393 debug!(?external_addresses, "Renew external addresses.");
394 let mut addresses = shared.external_addresses.lock();
395 addresses.clear();
396 addresses.append(&mut external_addresses);
397 }
398
399 self.log_kademlia_stats();
400 }
401
402 fn handle_random_query_interval(&mut self) {
403 let random_peer_id = PeerId::random();
404
405 trace!("Starting random Kademlia query for {}", random_peer_id);
406
407 self.swarm
408 .behaviour_mut()
409 .kademlia
410 .get_closest_peers(random_peer_id);
411 }
412
413 fn handle_removed_address_event(&mut self, event: PeerAddressRemovedEvent) {
414 trace!(?event, "Peer address removed event.");
415
416 let bootstrap_node_ids = strip_peer_id(self.bootstrap_addresses.clone())
417 .into_iter()
418 .map(|(peer_id, _)| peer_id)
419 .collect::<Vec<_>>();
420
421 if bootstrap_node_ids.contains(&event.peer_id) {
422 debug!(
423 ?event,
424 ?bootstrap_node_ids,
425 "Skipped removing bootstrap node from Kademlia buckets."
426 );
427
428 return;
429 }
430
431 self.swarm.behaviour_mut().kademlia.remove_address(
433 &event.peer_id,
434 &append_p2p_suffix(event.peer_id, event.address.clone()),
435 );
436
437 self.swarm
438 .behaviour_mut()
439 .kademlia
440 .remove_address(&event.peer_id, &remove_p2p_suffix(event.address));
441 }
442
443 fn handle_remove_listeners(&mut self, removed_listeners: &[Multiaddr]) {
444 let shared = match self.shared_weak.upgrade() {
445 Some(shared) => shared,
446 None => {
447 return;
448 }
449 };
450
451 let peer_id = shared.id;
453 shared.listeners.lock().retain(|old_listener| {
454 !removed_listeners.contains(&append_p2p_suffix(peer_id, old_listener.clone()))
455 && !removed_listeners.contains(&remove_p2p_suffix(old_listener.clone()))
456 });
457 }
458
459 async fn handle_swarm_event(&mut self, swarm_event: SwarmEvent<Event>) {
460 match swarm_event {
461 SwarmEvent::Behaviour(Event::Identify(event)) => {
462 self.handle_identify_event(event).await;
463 }
464 SwarmEvent::Behaviour(Event::Kademlia(event)) => {
465 self.handle_kademlia_event(event).await;
466 }
467 SwarmEvent::Behaviour(Event::Gossipsub(event)) => {
468 self.handle_gossipsub_event(event).await;
469 }
470 SwarmEvent::Behaviour(Event::RequestResponse(event)) => {
471 self.handle_request_response_event(event).await;
472 }
473 SwarmEvent::Behaviour(Event::Autonat(event)) => {
474 self.handle_autonat_event(event).await;
475 }
476 ref event @ SwarmEvent::NewListenAddr { ref address, .. } => {
477 trace!(?event, "New local listener event.");
478
479 let shared = match self.shared_weak.upgrade() {
480 Some(shared) => shared,
481 None => {
482 return;
483 }
484 };
485 shared.listeners.lock().push(address.clone());
486 shared.handlers.new_listener.call_simple(address);
487 }
488 ref event @ SwarmEvent::ListenerClosed { ref addresses, .. } => {
489 trace!(?event, "Local listener closed event.");
490 self.handle_remove_listeners(addresses);
491 }
492 ref event @ SwarmEvent::ExpiredListenAddr { ref address, .. } => {
493 trace!(?event, "Local listener expired event.");
494 self.handle_remove_listeners(&[address.clone()]);
495 }
496 SwarmEvent::ConnectionEstablished {
497 peer_id,
498 endpoint,
499 num_established,
500 ..
501 } => {
502 if let ConnectedPoint::Dialer { address, .. } = &endpoint {
504 if self.allow_non_global_addresses_in_dht || is_global_address_or_dns(address) {
506 self.known_peers_registry
507 .add_known_peer(peer_id, vec![address.clone()])
508 .await;
509 }
510 };
511
512 let shared = match self.shared_weak.upgrade() {
513 Some(shared) => shared,
514 None => {
515 return;
516 }
517 };
518
519 let is_reserved_peer = self.reserved_peers.contains_key(&peer_id);
520 debug!(
521 %peer_id,
522 %is_reserved_peer,
523 ?endpoint,
524 %num_established,
525 "Connection established"
526 );
527
528 let maybe_remote_ip =
529 endpoint
530 .get_remote_address()
531 .iter()
532 .find_map(|protocol| match protocol {
533 Protocol::Ip4(ip) => Some(IpAddr::V4(ip)),
534 Protocol::Ip6(ip) => Some(IpAddr::V6(ip)),
535 _ => None,
536 });
537 if let Some(ip) = maybe_remote_ip {
538 self.peer_ip_addresses
539 .entry(peer_id)
540 .and_modify(|ips| {
541 ips.insert(ip);
542 })
543 .or_insert(HashSet::from([ip]));
544 }
545
546 let num_established_peer_connections = shared
547 .num_established_peer_connections
548 .fetch_add(1, Ordering::SeqCst)
549 + 1;
550
551 shared
552 .handlers
553 .num_established_peer_connections_change
554 .call_simple(&num_established_peer_connections);
555
556 if num_established.get() == 1 {
558 shared.handlers.connected_peer.call_simple(&peer_id);
559 }
560
561 if let Some(metrics) = self.metrics.as_ref() {
562 metrics.inc_established_connections()
563 }
564 }
565 SwarmEvent::ConnectionClosed {
566 peer_id,
567 num_established,
568 cause,
569 ..
570 } => {
571 let shared = match self.shared_weak.upgrade() {
572 Some(shared) => shared,
573 None => {
574 return;
575 }
576 };
577
578 debug!(
579 %peer_id,
580 ?cause,
581 %num_established,
582 "Connection closed with peer"
583 );
584
585 if num_established == 0 {
586 self.peer_ip_addresses.remove(&peer_id);
587 self.connected_servers.remove(&peer_id);
588 }
589 let num_established_peer_connections = shared
590 .num_established_peer_connections
591 .fetch_sub(1, Ordering::SeqCst)
592 - 1;
593
594 shared
595 .handlers
596 .num_established_peer_connections_change
597 .call_simple(&num_established_peer_connections);
598
599 if num_established == 0 {
601 shared.handlers.disconnected_peer.call_simple(&peer_id);
602 }
603
604 if let Some(metrics) = self.metrics.as_ref() {
605 metrics.dec_established_connections()
606 };
607 }
608 SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
609 if let Some(peer_id) = &peer_id {
610 let should_ban_temporarily =
611 self.should_temporary_ban_on_dial_error(peer_id, &error);
612
613 trace!(%should_ban_temporarily, "Temporary bans conditions.");
614
615 if should_ban_temporarily {
616 self.temporary_bans.lock().create_or_extend(peer_id);
617 debug!(%peer_id, ?error, "Peer was temporarily banned.");
618 }
619 }
620
621 debug!(
622 ?peer_id,
623 ?error,
624 "SwarmEvent::OutgoingConnectionError for peer."
625 );
626
627 match error {
628 DialError::Transport(ref addresses) => {
629 for (addr, _) in addresses {
630 trace!(?error, ?peer_id, %addr, "SwarmEvent::OutgoingConnectionError (DialError::Transport) for peer.");
631 if let Some(peer_id) = peer_id {
632 self.known_peers_registry
633 .remove_known_peer_addresses(peer_id, vec![addr.clone()])
634 .await;
635 }
636 }
637 }
638 DialError::WrongPeerId { obtained, .. } => {
639 trace!(?error, ?peer_id, obtained_peer_id=?obtained, "SwarmEvent::WrongPeerId (DialError::WrongPeerId) for peer.");
640
641 if let Some(ref peer_id) = peer_id {
642 let kademlia = &mut self.swarm.behaviour_mut().kademlia;
643 let _ = kademlia.remove_peer(peer_id);
644 }
645 }
646 _ => {
647 trace!(?error, ?peer_id, "SwarmEvent::OutgoingConnectionError");
648 }
649 }
650 }
651 SwarmEvent::NewExternalAddrCandidate { address } => {
652 trace!(%address, "External address candidate");
653 }
654 SwarmEvent::ExternalAddrConfirmed { address } => {
655 debug!(%address, "Confirmed external address");
656
657 let connected_peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();
658 self.swarm.behaviour_mut().identify.push(connected_peers);
659 }
660 SwarmEvent::ExternalAddrExpired { address } => {
661 debug!(%address, "External address expired");
662
663 let connected_peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();
664 self.swarm.behaviour_mut().identify.push(connected_peers);
665 }
666 other => {
667 trace!("Other swarm event: {:?}", other);
668 }
669 }
670 }
671
672 fn should_temporary_ban_on_dial_error(&self, peer_id: &PeerId, error: &DialError) -> bool {
673 if true {
675 return false;
676 }
677
678 if self.swarm.is_connected(peer_id) {
680 return false;
681 }
682
683 match &error {
684 DialError::Transport(addresses) => {
685 for (_, error) in addresses {
686 match error {
687 TransportError::MultiaddrNotSupported(_) => {
688 return true;
689 }
690 TransportError::Other(_) => {
691 if self.temporary_bans.lock().is_banned(peer_id) {
693 return false;
694 }
695 }
696 }
697 }
698 true
700 }
701 DialError::LocalPeerId { .. } => {
702 debug!("Local peer dial attempt detected.");
704
705 false
706 }
707 DialError::NoAddresses => {
708 true
710 }
711 DialError::DialPeerConditionFalse(_) => {
712 false
714 }
715 DialError::Aborted => {
716 false
718 }
719 DialError::WrongPeerId { .. } => {
720 false
722 }
723 DialError::Denied { .. } => {
724 false
726 }
727 }
728 }
729
730 async fn handle_identify_event(&mut self, event: IdentifyEvent) {
731 let local_peer_id = *self.swarm.local_peer_id();
732
733 if let IdentifyEvent::Received {
734 peer_id, mut info, ..
735 } = event
736 {
737 debug!(?peer_id, protocols = ?info.protocols, "IdentifyEvent::Received");
738
739 if info.protocol_version != self.protocol_version {
741 debug!(
742 %local_peer_id,
743 %peer_id,
744 local_protocol_version = %self.protocol_version,
745 peer_protocol_version = %info.protocol_version,
746 "Peer has different protocol version, banning temporarily",
747 );
748
749 self.temporary_bans.lock().create_or_extend(&peer_id);
750 let _ = self.swarm.disconnect_peer_id(peer_id);
752 self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id);
753 self.known_peers_registry
754 .remove_all_known_peer_addresses(peer_id);
755
756 return;
757 }
758
759 self.temporary_bans.lock().remove(&peer_id);
761
762 if info.listen_addrs.len() > 30 {
763 debug!(
764 %local_peer_id,
765 %peer_id,
766 "Node has reported more than 30 addresses; it is identified by {} and {}",
767 info.protocol_version, info.agent_version
768 );
769 info.listen_addrs.truncate(30);
770 }
771
772 let kademlia = &mut self.swarm.behaviour_mut().kademlia;
773 let full_kademlia_support = kademlia
774 .protocol_names()
775 .iter()
776 .all(|local_protocol| info.protocols.contains(local_protocol));
777
778 if full_kademlia_support {
779 let received_addresses = info
780 .listen_addrs
781 .into_iter()
782 .filter(|address| {
783 if self.allow_non_global_addresses_in_dht
784 || is_global_address_or_dns(address)
785 {
786 true
787 } else {
788 trace!(
789 %local_peer_id,
790 %peer_id,
791 %address,
792 "Ignoring self-reported non-global address",
793 );
794
795 false
796 }
797 })
798 .collect::<Vec<_>>();
799 let received_address_strings = received_addresses
800 .iter()
801 .map(ToString::to_string)
802 .collect::<Vec<_>>();
803 let old_addresses = kademlia
804 .kbucket(peer_id)
805 .and_then(|peers| {
806 let key = peer_id.into();
807 peers.iter().find_map(|peer| {
808 (peer.node.key == &key).then_some(
809 peer.node
810 .value
811 .iter()
812 .filter(|existing_address| {
813 let existing_address = existing_address.to_string();
814
815 !received_address_strings.iter().any(|received_address| {
816 received_address.starts_with(&existing_address)
817 || existing_address.starts_with(received_address)
818 })
819 })
820 .cloned()
821 .collect::<Vec<_>>(),
822 )
823 })
824 })
825 .unwrap_or_default();
826
827 for address in received_addresses {
828 debug!(
829 %local_peer_id,
830 %peer_id,
831 %address,
832 protocol_names = ?kademlia.protocol_names(),
833 "Adding self-reported address to Kademlia DHT",
834 );
835
836 kademlia.add_address(&peer_id, address);
837 }
838
839 for old_address in old_addresses {
840 trace!(
841 %local_peer_id,
842 %peer_id,
843 %old_address,
844 "Removing old self-reported address from Kademlia DHT",
845 );
846
847 kademlia.remove_address(&peer_id, &old_address);
848 }
849
850 self.connected_servers.insert(peer_id);
851 } else {
852 debug!(
853 %local_peer_id,
854 %peer_id,
855 peer_protocols = ?info.protocols,
856 protocol_names = ?kademlia.protocol_names(),
857 "Peer doesn't support our Kademlia DHT protocol",
858 );
859
860 kademlia.remove_peer(&peer_id);
861 self.connected_servers.remove(&peer_id);
862 }
863 }
864 }
865
866 async fn handle_kademlia_event(&mut self, event: KademliaEvent) {
867 trace!("Kademlia event: {:?}", event);
868
869 match event {
870 KademliaEvent::InboundRequest {
871 request: InboundRequest::AddProvider { record, .. },
872 } => {
873 debug!("Unexpected AddProvider request received: {:?}", record);
874 }
875 KademliaEvent::UnroutablePeer { peer } => {
876 debug!(%peer, "Unroutable peer detected");
877
878 self.swarm.behaviour_mut().kademlia.remove_peer(&peer);
879
880 if let Some(shared) = self.shared_weak.upgrade() {
881 shared
882 .handlers
883 .peer_discovered
884 .call_simple(&PeerDiscovered::UnroutablePeer { peer_id: peer });
885 }
886 }
887 KademliaEvent::RoutablePeer { peer, address } => {
888 debug!(?address, "Routable peer detected: {:?}", peer);
889
890 if let Some(shared) = self.shared_weak.upgrade() {
891 shared
892 .handlers
893 .peer_discovered
894 .call_simple(&PeerDiscovered::RoutablePeer {
895 peer_id: peer,
896 address,
897 });
898 }
899 }
900 KademliaEvent::PendingRoutablePeer { peer, address } => {
901 debug!(?address, "Pending routable peer detected: {:?}", peer);
902
903 if let Some(shared) = self.shared_weak.upgrade() {
904 shared
905 .handlers
906 .peer_discovered
907 .call_simple(&PeerDiscovered::RoutablePeer {
908 peer_id: peer,
909 address,
910 });
911 }
912 }
913 KademliaEvent::OutboundQueryProgressed {
914 step: ProgressStep { last, .. },
915 id,
916 result: QueryResult::GetClosestPeers(result),
917 ..
918 } => {
919 let mut cancelled = false;
920 if let Some(QueryResultSender::ClosestPeers { sender, .. }) =
921 self.query_id_receivers.get(&id)
922 {
923 match result {
924 Ok(GetClosestPeersOk { key, peers }) => {
925 trace!(
926 "Get closest peers query for {} yielded {} results",
927 hex::encode(key),
928 peers.len(),
929 );
930
931 if peers.is_empty()
932 && self.swarm.connected_peers().next().is_some()
934 {
935 debug!("Random Kademlia query has yielded empty list of peers");
936 }
937
938 for peer in peers {
939 cancelled = Self::unbounded_send_and_cancel_on_error(
940 &mut self.swarm.behaviour_mut().kademlia,
941 sender,
942 peer.peer_id,
943 "GetClosestPeersOk",
944 &id,
945 ) || cancelled;
946 }
947 }
948 Err(GetClosestPeersError::Timeout { key, peers }) => {
949 debug!(
950 "Get closest peers query for {} timed out with {} results",
951 hex::encode(key),
952 peers.len(),
953 );
954
955 for peer in peers {
956 cancelled = Self::unbounded_send_and_cancel_on_error(
957 &mut self.swarm.behaviour_mut().kademlia,
958 sender,
959 peer.peer_id,
960 "GetClosestPeersError::Timeout",
961 &id,
962 ) || cancelled;
963 }
964 }
965 }
966 }
967
968 if last || cancelled {
969 self.query_id_receivers.remove(&id);
971 }
972 }
973 KademliaEvent::OutboundQueryProgressed {
974 step: ProgressStep { last, .. },
975 id,
976 result: QueryResult::GetRecord(result),
977 ..
978 } => {
979 let mut cancelled = false;
980 if let Some(QueryResultSender::Value { sender, .. }) =
981 self.query_id_receivers.get(&id)
982 {
983 match result {
984 Ok(GetRecordOk::FoundRecord(rec)) => {
985 trace!(
986 key = hex::encode(&rec.record.key),
987 "Get record query succeeded",
988 );
989
990 cancelled = Self::unbounded_send_and_cancel_on_error(
991 &mut self.swarm.behaviour_mut().kademlia,
992 sender,
993 rec,
994 "GetRecordOk",
995 &id,
996 ) || cancelled;
997 }
998 Ok(GetRecordOk::FinishedWithNoAdditionalRecord { .. }) => {
999 trace!("Get record query yielded no results");
1000 }
1001 Err(error) => match error {
1002 GetRecordError::NotFound { key, .. } => {
1003 debug!(
1004 key = hex::encode(&key),
1005 "Get record query failed with no results",
1006 );
1007 }
1008 GetRecordError::QuorumFailed { key, records, .. } => {
1009 debug!(
1010 key = hex::encode(&key),
1011 "Get record query quorum failed with {} results",
1012 records.len(),
1013 );
1014 }
1015 GetRecordError::Timeout { key } => {
1016 debug!(key = hex::encode(&key), "Get record query timed out");
1017 }
1018 },
1019 }
1020 }
1021
1022 if last || cancelled {
1023 self.query_id_receivers.remove(&id);
1025 }
1026 }
1027 KademliaEvent::OutboundQueryProgressed {
1028 step: ProgressStep { last, .. },
1029 id,
1030 result: QueryResult::GetProviders(result),
1031 ..
1032 } => {
1033 let mut cancelled = false;
1034 if let Some(QueryResultSender::Providers { key, sender, .. }) =
1035 self.query_id_receivers.get(&id)
1036 {
1037 match result {
1038 Ok(GetProvidersOk::FoundProviders { key, providers }) => {
1039 trace!(
1040 key = hex::encode(&key),
1041 "Get providers query yielded {} results",
1042 providers.len(),
1043 );
1044
1045 for provider in providers {
1046 cancelled = Self::unbounded_send_and_cancel_on_error(
1047 &mut self.swarm.behaviour_mut().kademlia,
1048 sender,
1049 provider,
1050 "GetProvidersOk",
1051 &id,
1052 ) || cancelled;
1053 }
1054 }
1055 Ok(GetProvidersOk::FinishedWithNoAdditionalRecord { closest_peers }) => {
1056 trace!(
1057 key = hex::encode(key),
1058 closest_peers = %closest_peers.len(),
1059 "Get providers query yielded no results"
1060 );
1061 }
1062 Err(error) => {
1063 let GetProvidersError::Timeout { key, .. } = error;
1064
1065 debug!(
1066 key = hex::encode(&key),
1067 "Get providers query failed with no results",
1068 );
1069 }
1070 }
1071 }
1072
1073 if last || cancelled {
1074 self.query_id_receivers.remove(&id);
1076 }
1077 }
1078 KademliaEvent::OutboundQueryProgressed {
1079 step: ProgressStep { last, .. },
1080 id,
1081 result: QueryResult::PutRecord(result),
1082 ..
1083 } => {
1084 let mut cancelled = false;
1085 if let Some(QueryResultSender::PutValue { sender, .. }) =
1086 self.query_id_receivers.get(&id)
1087 {
1088 match result {
1089 Ok(PutRecordOk { key, .. }) => {
1090 trace!("Put record query for {} succeeded", hex::encode(&key));
1091
1092 cancelled = Self::unbounded_send_and_cancel_on_error(
1093 &mut self.swarm.behaviour_mut().kademlia,
1094 sender,
1095 (),
1096 "PutRecordOk",
1097 &id,
1098 ) || cancelled;
1099 }
1100 Err(error) => {
1101 debug!(?error, "Put record query failed.");
1102 }
1103 }
1104 }
1105
1106 if last || cancelled {
1107 self.query_id_receivers.remove(&id);
1109 }
1110 }
1111 KademliaEvent::OutboundQueryProgressed {
1112 step: ProgressStep { last, count },
1113 id,
1114 result: QueryResult::Bootstrap(result),
1115 stats,
1116 } => {
1117 debug!(?stats, %last, %count, ?id, ?result, "Bootstrap OutboundQueryProgressed step.");
1118
1119 let mut cancelled = false;
1120 if let Some(QueryResultSender::Bootstrap { sender }) =
1121 self.query_id_receivers.get_mut(&id)
1122 {
1123 match result {
1124 Ok(BootstrapOk {
1125 peer,
1126 num_remaining,
1127 }) => {
1128 trace!(%peer, %num_remaining, %last, "Bootstrap query step succeeded");
1129
1130 cancelled = Self::unbounded_send_and_cancel_on_error(
1131 &mut self.swarm.behaviour_mut().kademlia,
1132 sender,
1133 (),
1134 "Bootstrap",
1135 &id,
1136 ) || cancelled;
1137 }
1138 Err(error) => {
1139 debug!(?error, "Bootstrap query failed.");
1140 }
1141 }
1142 }
1143
1144 if last || cancelled {
1145 self.query_id_receivers.remove(&id);
1147 }
1148 }
1149 _ => {}
1150 }
1151 }
1152
1153 fn unbounded_send_and_cancel_on_error<T>(
1155 kademlia: &mut Kademlia<DummyRecordStore>,
1156 sender: &mpsc::UnboundedSender<T>,
1157 value: T,
1158 channel: &'static str,
1159 id: &QueryId,
1160 ) -> bool {
1161 if sender.unbounded_send(value).is_err() {
1162 debug!("{} channel was dropped", channel);
1163
1164 if let Some(mut query) = kademlia.query_mut(id) {
1166 query.finish();
1167 }
1168 true
1169 } else {
1170 false
1171 }
1172 }
1173
1174 async fn handle_gossipsub_event(&mut self, event: GossipsubEvent) {
1175 if let GossipsubEvent::Message { message, .. } = event {
1176 if let Some(senders) = self.topic_subscription_senders.get(&message.topic) {
1177 let bytes = Bytes::from(message.data);
1178
1179 for sender in senders.values() {
1180 let _ = sender.unbounded_send(bytes.clone());
1182 }
1183 }
1184 }
1185 }
1186
1187 async fn handle_request_response_event(&mut self, event: RequestResponseEvent) {
1188 trace!("Request response event: {:?}", event);
1190 }
1191
1192 async fn handle_autonat_event(&mut self, event: AutonatEvent) {
1193 trace!(?event, "Autonat event received.");
1194 let autonat = &self.swarm.behaviour().autonat;
1195 debug!(
1196 public_address=?autonat.public_address(),
1197 confidence=%autonat.confidence(),
1198 "Current public address confidence."
1199 );
1200
1201 match event {
1202 AutonatEvent::InboundProbe(_inbound_probe_event) => {
1203 }
1205 AutonatEvent::OutboundProbe(outbound_probe_event) => {
1206 match outbound_probe_event {
1207 OutboundProbeEvent::Request { peer, .. } => {
1208 self.swarm
1211 .behaviour_mut()
1212 .connection_limits
1213 .add_to_incoming_allow_list(
1215 peer,
1216 self.peer_ip_addresses
1217 .get(&peer)
1218 .iter()
1219 .flat_map(|ip_addresses| ip_addresses.iter())
1220 .copied(),
1221 1,
1222 );
1223 }
1224 OutboundProbeEvent::Response { peer, .. } => {
1225 self.swarm
1226 .behaviour_mut()
1227 .connection_limits
1228 .remove_from_incoming_allow_list(&peer, Some(1));
1229 }
1230 OutboundProbeEvent::Error { peer, .. } => {
1231 if let Some(peer) = peer {
1232 self.swarm
1233 .behaviour_mut()
1234 .connection_limits
1235 .remove_from_incoming_allow_list(&peer, Some(1));
1236 }
1237 }
1238 }
1239 }
1240 AutonatEvent::StatusChanged { old, new } => {
1241 debug!(?old, ?new, "Public address status changed.");
1242
1243 if let (NatStatus::Public(old_address), NatStatus::Private) = (old, new.clone()) {
1245 self.swarm.remove_external_address(&old_address);
1246 debug!(
1247 ?old_address,
1248 new_status = ?new,
1249 "Removing old external address...",
1250 );
1251
1252 self.swarm.behaviour_mut().kademlia.set_mode(None);
1254 }
1255
1256 let connected_peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();
1257 self.swarm.behaviour_mut().identify.push(connected_peers);
1258 }
1259 }
1260 }
1261
1262 fn handle_command(&mut self, command: Command) {
1263 match command {
1264 Command::GetValue {
1265 key,
1266 result_sender,
1267 permit,
1268 } => {
1269 let query_id = self
1270 .swarm
1271 .behaviour_mut()
1272 .kademlia
1273 .get_record(key.to_bytes().into());
1274
1275 self.query_id_receivers.insert(
1276 query_id,
1277 QueryResultSender::Value {
1278 sender: result_sender,
1279 _permit: permit,
1280 },
1281 );
1282 }
1283 Command::PutValue {
1284 key,
1285 value,
1286 result_sender,
1287 permit,
1288 } => {
1289 let local_peer_id = *self.swarm.local_peer_id();
1290
1291 let record = Record {
1292 key: key.into(),
1293 value,
1294 publisher: Some(local_peer_id),
1295 expires: None, };
1297 let query_result = self
1298 .swarm
1299 .behaviour_mut()
1300 .kademlia
1301 .put_record(record, Quorum::One);
1302
1303 match query_result {
1304 Ok(query_id) => {
1305 self.query_id_receivers.insert(
1306 query_id,
1307 QueryResultSender::PutValue {
1308 sender: result_sender,
1309 _permit: permit,
1310 },
1311 );
1312 }
1313 Err(err) => {
1314 warn!(?err, "Failed to put value.");
1315 }
1316 }
1317 }
1318 Command::Subscribe {
1319 topic,
1320 result_sender,
1321 } => {
1322 if !self.swarm.behaviour().gossipsub.is_enabled() {
1323 panic!("Gossipsub protocol is disabled.");
1324 }
1325
1326 let topic_hash = topic.hash();
1327 let (sender, receiver) = mpsc::unbounded();
1328
1329 let subscription_id = self.next_subscription_id;
1331 self.next_subscription_id += 1;
1332
1333 let created_subscription = CreatedSubscription {
1334 subscription_id,
1335 receiver,
1336 };
1337
1338 match self.topic_subscription_senders.entry(topic_hash) {
1339 Entry::Occupied(mut entry) => {
1340 if result_sender.send(Ok(created_subscription)).is_ok() {
1342 entry.get_mut().insert(subscription_id, sender);
1343 }
1344 }
1345 Entry::Vacant(entry) => {
1346 if let Some(gossipsub) = self.swarm.behaviour_mut().gossipsub.as_mut() {
1349 match gossipsub.subscribe(&topic) {
1350 Ok(true) => {
1351 if result_sender.send(Ok(created_subscription)).is_ok() {
1352 entry
1353 .insert(IntMap::from_iter([(subscription_id, sender)]));
1354 }
1355 }
1356 Ok(false) => {
1357 panic!(
1358 "Logic error, topic subscription wasn't created, this \
1359 must never happen"
1360 );
1361 }
1362 Err(error) => {
1363 let _ = result_sender.send(Err(error));
1364 }
1365 }
1366 }
1367 }
1368 }
1369 }
1370 Command::Unsubscribe {
1371 topic,
1372 subscription_id,
1373 } => {
1374 if !self.swarm.behaviour().gossipsub.is_enabled() {
1375 panic!("Gossipsub protocol is disabled.");
1376 }
1377
1378 if let Entry::Occupied(mut entry) =
1379 self.topic_subscription_senders.entry(topic.hash())
1380 {
1381 entry.get_mut().remove(&subscription_id);
1382
1383 if entry.get().is_empty() {
1385 entry.remove_entry();
1386
1387 if let Some(gossipsub) = self.swarm.behaviour_mut().gossipsub.as_mut() {
1388 if !gossipsub.unsubscribe(&topic) {
1389 warn!(
1390 "Can't unsubscribe from topic {topic} because subscription doesn't exist, \
1391 this is a logic error in the subspace or swarm libraries"
1392 );
1393 }
1394 }
1395 }
1396 } else {
1397 error!(
1398 "Can't unsubscribe from topic {topic} because subscription doesn't exist, \
1399 this is a logic error in the subspace library"
1400 );
1401 }
1402 }
1403 Command::Publish {
1404 topic,
1405 message,
1406 result_sender,
1407 } => {
1408 if !self.swarm.behaviour().gossipsub.is_enabled() {
1409 panic!("Gossipsub protocol is disabled.");
1410 }
1411
1412 if let Some(gossipsub) = self.swarm.behaviour_mut().gossipsub.as_mut() {
1413 let _ =
1415 result_sender.send(gossipsub.publish(topic, message).map(|_message_id| ()));
1416 }
1417 }
1418 Command::GetClosestPeers {
1419 key,
1420 result_sender,
1421 permit,
1422 } => {
1423 let query_id = self.swarm.behaviour_mut().kademlia.get_closest_peers(key);
1424
1425 self.query_id_receivers.insert(
1426 query_id,
1427 QueryResultSender::ClosestPeers {
1428 sender: result_sender,
1429 _permit: permit,
1430 },
1431 );
1432 }
1433 Command::GetClosestLocalPeers {
1434 key,
1435 source,
1436 result_sender,
1437 } => {
1438 let source = source.unwrap_or_else(|| *self.swarm.local_peer_id());
1439 let result = self
1440 .swarm
1441 .behaviour_mut()
1442 .kademlia
1443 .find_closest_local_peers(&KBucketKey::from(key), &source)
1444 .filter(|peer| !peer.multiaddrs.is_empty())
1445 .map(|peer| (peer.node_id, peer.multiaddrs))
1446 .collect();
1447
1448 let _ = result_sender.send(result);
1450 }
1451 Command::GenericRequest {
1452 peer_id,
1453 addresses,
1454 protocol_name,
1455 request,
1456 result_sender,
1457 } => {
1458 self.swarm.behaviour_mut().request_response.send_request(
1459 &peer_id,
1460 protocol_name,
1461 request,
1462 result_sender,
1463 IfDisconnected::TryConnect,
1464 addresses,
1465 );
1466 }
1467 Command::GetProviders {
1468 key,
1469 result_sender,
1470 permit,
1471 } => {
1472 let query_id = self
1473 .swarm
1474 .behaviour_mut()
1475 .kademlia
1476 .get_providers(key.clone());
1477
1478 self.query_id_receivers.insert(
1479 query_id,
1480 QueryResultSender::Providers {
1481 key,
1482 sender: result_sender,
1483 _permit: permit,
1484 },
1485 );
1486 }
1487 Command::BanPeer { peer_id } => {
1488 self.ban_peer(peer_id);
1489 }
1490 Command::Dial { address } => {
1491 let _ = self.swarm.dial(address);
1492 }
1493 Command::ConnectedPeers { result_sender } => {
1494 let connected_peers = self.swarm.connected_peers().cloned().collect();
1495
1496 let _ = result_sender.send(connected_peers);
1497 }
1498 Command::ConnectedServers { result_sender } => {
1499 let connected_servers = self.connected_servers.iter().cloned().collect();
1500
1501 let _ = result_sender.send(connected_servers);
1502 }
1503 Command::Bootstrap { result_sender } => {
1504 let kademlia = &mut self.swarm.behaviour_mut().kademlia;
1505
1506 match kademlia.bootstrap() {
1507 Ok(query_id) => {
1508 if let Some(result_sender) = result_sender {
1509 self.query_id_receivers.insert(
1510 query_id,
1511 QueryResultSender::Bootstrap {
1512 sender: result_sender,
1513 },
1514 );
1515 }
1516 }
1517 Err(err) => {
1518 debug!(?err, "Bootstrap error.");
1519 }
1520 }
1521 }
1522 }
1523 }
1524
1525 fn ban_peer(&mut self, peer_id: PeerId) {
1526 self.temporary_bans.lock().remove(&peer_id);
1528
1529 debug!(?peer_id, "Banning peer on network level");
1530
1531 self.swarm.behaviour_mut().block_list.block_peer(peer_id);
1532 self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id);
1533 self.known_peers_registry
1534 .remove_all_known_peer_addresses(peer_id);
1535 }
1536
1537 fn register_event_metrics(&mut self, swarm_event: &SwarmEvent<Event>) {
1538 if let Some(ref mut metrics) = self.libp2p_metrics {
1539 match swarm_event {
1540 SwarmEvent::Behaviour(Event::Ping(ping_event)) => {
1541 metrics.record(ping_event);
1542 }
1543 SwarmEvent::Behaviour(Event::Identify(identify_event)) => {
1544 metrics.record(identify_event);
1545 }
1546 SwarmEvent::Behaviour(Event::Kademlia(kademlia_event)) => {
1547 metrics.record(kademlia_event);
1548 }
1549 SwarmEvent::Behaviour(Event::Gossipsub(gossipsub_event)) => {
1550 metrics.record(gossipsub_event);
1551 }
1552 swarm_event => {
1557 metrics.record(swarm_event);
1558 }
1559 }
1560 }
1561 }
1562
1563 fn log_kademlia_stats(&mut self) {
1564 let mut peer_counter = 0;
1565 let mut peer_with_no_address_counter = 0;
1566 for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
1567 for entry in kbucket.iter() {
1568 peer_counter += 1;
1569 if entry.node.value.len() == 0 {
1570 peer_with_no_address_counter += 1;
1571 }
1572 }
1573 }
1574
1575 debug!(
1576 peers = %peer_counter,
1577 peers_with_no_address = %peer_with_no_address_counter,
1578 "Kademlia stats"
1579 );
1580 }
1581}