subspace_networking/
utils.rs

1//! Miscellaneous utilities for networking.
2
3pub(crate) mod key_with_distance;
4pub mod multihash;
5pub mod piece_provider;
6pub(crate) mod rate_limiter;
7
8use event_listener_primitives::Bag;
9use futures::future::{Fuse, FusedFuture, FutureExt};
10use libp2p::multiaddr::Protocol;
11use libp2p::{Multiaddr, PeerId};
12use prometheus_client::metrics::gauge::Gauge;
13use prometheus_client::registry::Registry;
14use std::future::Future;
15use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18use tokio::runtime::Handle;
19use tokio::task;
20use tracing::warn;
21
22const NETWORKING_REGISTRY_PREFIX: &str = "subspace";
23
24/// Metrics for Subspace networking
25pub struct SubspaceMetrics {
26    established_connections: Gauge,
27}
28
29impl SubspaceMetrics {
30    /// Constructor
31    pub fn new(registry: &mut Registry) -> Self {
32        let sub_registry = registry.sub_registry_with_prefix(NETWORKING_REGISTRY_PREFIX);
33
34        let gauge = Gauge::default();
35        sub_registry.register(
36            "established_connections",
37            "The current number of established connections",
38            gauge.clone(),
39        );
40
41        Self {
42            established_connections: gauge,
43        }
44    }
45
46    pub(crate) fn inc_established_connections(&self) {
47        self.established_connections.inc();
48    }
49
50    pub(crate) fn dec_established_connections(&self) {
51        self.established_connections.dec();
52    }
53}
54
55/// Joins async join handle on drop
56pub(crate) struct AsyncJoinOnDrop<T>(Option<Fuse<task::JoinHandle<T>>>);
57
58impl<T> Drop for AsyncJoinOnDrop<T> {
59    fn drop(&mut self) {
60        let handle = self.0.take().expect("Always called exactly once; qed");
61        if !handle.is_terminated() {
62            task::block_in_place(move || {
63                let _ = Handle::current().block_on(handle);
64            });
65        }
66    }
67}
68
69impl<T> AsyncJoinOnDrop<T> {
70    // Create new instance
71    pub(crate) fn new(handle: task::JoinHandle<T>) -> Self {
72        Self(Some(handle.fuse()))
73    }
74}
75
76impl<T> Future for AsyncJoinOnDrop<T> {
77    type Output = Result<T, task::JoinError>;
78
79    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
80        Pin::new(self.0.as_mut().expect("Only dropped in Drop impl; qed")).poll(cx)
81    }
82}
83
84/// This test is successful only for global IP addresses and DNS names.
85pub(crate) fn is_global_address_or_dns(addr: &Multiaddr) -> bool {
86    match addr.iter().next() {
87        Some(Protocol::Ip4(ip)) => ip.is_global(),
88        Some(Protocol::Ip6(ip)) => ip.is_global(),
89        Some(Protocol::Dns(_)) | Some(Protocol::Dns4(_)) | Some(Protocol::Dns6(_)) => true,
90        _ => false,
91    }
92}
93
94/// Convenience alias for peer ID and its multiaddresses.
95pub type PeerAddress = (PeerId, Multiaddr);
96
97/// Helper function. Converts multiaddresses to a tuple with peer ID removing the peer Id suffix.
98/// It logs incorrect multiaddresses.
99pub fn strip_peer_id(addresses: Vec<Multiaddr>) -> Vec<PeerAddress> {
100    addresses
101        .into_iter()
102        .filter_map(|multiaddr| {
103            let mut modified_multiaddr = multiaddr.clone();
104
105            let peer_id: Option<PeerId> = modified_multiaddr.pop().and_then(|protocol| {
106                if let Protocol::P2p(peer_id) = protocol {
107                    Some(peer_id)
108                } else {
109                    None
110                }
111            });
112
113            if let Some(peer_id) = peer_id {
114                Some((peer_id, modified_multiaddr))
115            } else {
116                warn!(%multiaddr, "Incorrect multiaddr provided.");
117
118                None
119            }
120        })
121        .collect()
122}
123
124pub(crate) type HandlerFn<A> = Arc<dyn Fn(&A) + Send + Sync + 'static>;
125pub(crate) type Handler<A> = Bag<HandlerFn<A>, A>;