subspace_networking/
utils.rs1pub(crate) mod key_with_distance;
4pub mod multihash;
5pub mod piece_provider;
6pub(crate) mod rate_limiter;
7
8use event_listener_primitives::Bag;
9use futures::future::{Fuse, FusedFuture, FutureExt};
10use libp2p::multiaddr::Protocol;
11use libp2p::{Multiaddr, PeerId};
12use prometheus_client::metrics::gauge::Gauge;
13use prometheus_client::registry::Registry;
14use std::future::Future;
15use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18use tokio::runtime::Handle;
19use tokio::task;
20use tracing::warn;
21
22const NETWORKING_REGISTRY_PREFIX: &str = "subspace";
23
24pub struct SubspaceMetrics {
26 established_connections: Gauge,
27}
28
29impl SubspaceMetrics {
30 pub fn new(registry: &mut Registry) -> Self {
32 let sub_registry = registry.sub_registry_with_prefix(NETWORKING_REGISTRY_PREFIX);
33
34 let gauge = Gauge::default();
35 sub_registry.register(
36 "established_connections",
37 "The current number of established connections",
38 gauge.clone(),
39 );
40
41 Self {
42 established_connections: gauge,
43 }
44 }
45
46 pub(crate) fn inc_established_connections(&self) {
47 self.established_connections.inc();
48 }
49
50 pub(crate) fn dec_established_connections(&self) {
51 self.established_connections.dec();
52 }
53}
54
55pub(crate) struct AsyncJoinOnDrop<T>(Option<Fuse<task::JoinHandle<T>>>);
57
58impl<T> Drop for AsyncJoinOnDrop<T> {
59 fn drop(&mut self) {
60 let handle = self.0.take().expect("Always called exactly once; qed");
61 if !handle.is_terminated() {
62 task::block_in_place(move || {
63 let _ = Handle::current().block_on(handle);
64 });
65 }
66 }
67}
68
69impl<T> AsyncJoinOnDrop<T> {
70 pub(crate) fn new(handle: task::JoinHandle<T>) -> Self {
72 Self(Some(handle.fuse()))
73 }
74}
75
76impl<T> Future for AsyncJoinOnDrop<T> {
77 type Output = Result<T, task::JoinError>;
78
79 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
80 Pin::new(self.0.as_mut().expect("Only dropped in Drop impl; qed")).poll(cx)
81 }
82}
83
84pub(crate) fn is_global_address_or_dns(addr: &Multiaddr) -> bool {
86 match addr.iter().next() {
87 Some(Protocol::Ip4(ip)) => ip.is_global(),
88 Some(Protocol::Ip6(ip)) => ip.is_global(),
89 Some(Protocol::Dns(_)) | Some(Protocol::Dns4(_)) | Some(Protocol::Dns6(_)) => true,
90 _ => false,
91 }
92}
93
94pub type PeerAddress = (PeerId, Multiaddr);
96
97pub fn strip_peer_id(addresses: Vec<Multiaddr>) -> Vec<PeerAddress> {
100 addresses
101 .into_iter()
102 .filter_map(|multiaddr| {
103 let mut modified_multiaddr = multiaddr.clone();
104
105 let peer_id: Option<PeerId> = modified_multiaddr.pop().and_then(|protocol| {
106 if let Protocol::P2p(peer_id) = protocol {
107 Some(peer_id)
108 } else {
109 None
110 }
111 });
112
113 if let Some(peer_id) = peer_id {
114 Some((peer_id, modified_multiaddr))
115 } else {
116 warn!(%multiaddr, "Incorrect multiaddr provided.");
117
118 None
119 }
120 })
121 .collect()
122}
123
124pub(crate) type HandlerFn<A> = Arc<dyn Fn(&A) + Send + Sync + 'static>;
125pub(crate) type Handler<A> = Bag<HandlerFn<A>, A>;