subspace_networking/constructor/
transport.rs

1use crate::constructor::temporary_bans::TemporaryBans;
2use libp2p::core::multiaddr::{Multiaddr, Protocol};
3use libp2p::core::muxing::StreamMuxerBox;
4use libp2p::core::transport::{Boxed, DialOpts, ListenerId, TransportError, TransportEvent};
5use libp2p::core::Transport;
6use libp2p::dns::tokio::Transport as TokioTransport;
7use libp2p::tcp::tokio::Transport as TokioTcpTransport;
8use libp2p::tcp::Config as GenTcpConfig;
9use libp2p::yamux::Config as YamuxConfig;
10use libp2p::{core, identity, noise, PeerId};
11use parking_lot::Mutex;
12use std::io;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::task::{Context, Poll};
16use std::time::Duration;
17use tracing::debug;
18
19// Builds the transport stack that LibP2P will communicate over along with a relay client.
20pub(super) fn build_transport(
21    allow_non_global_addresses_in_dht: bool,
22    keypair: &identity::Keypair,
23    temporary_bans: Arc<Mutex<TemporaryBans>>,
24    timeout: Duration,
25    yamux_config: YamuxConfig,
26) -> io::Result<Boxed<(PeerId, StreamMuxerBox)>> {
27    let wrapped_tcp = {
28        let tcp_config = GenTcpConfig::default().nodelay(true);
29
30        CustomTransportWrapper::new(
31            TokioTcpTransport::new(tcp_config),
32            allow_non_global_addresses_in_dht,
33            temporary_bans,
34        )
35    };
36
37    let tcp_upgraded = {
38        let noise =
39            noise::Config::new(keypair).expect("Signing libp2p-noise static DH keypair failed.");
40
41        wrapped_tcp
42            .upgrade(core::upgrade::Version::V1Lazy)
43            .authenticate(noise)
44            .multiplex(yamux_config)
45            .timeout(timeout)
46            .boxed()
47    };
48
49    Ok(TokioTransport::system(tcp_upgraded)?.boxed())
50}
51
52#[derive(Debug, Clone)]
53struct CustomTransportWrapper<T> {
54    base_transport: T,
55    allow_non_global_addresses: bool,
56    temporary_bans: Arc<Mutex<TemporaryBans>>,
57}
58
59impl<T> CustomTransportWrapper<T> {
60    fn new(
61        base_transport: T,
62        allow_non_global_addresses: bool,
63        temporary_bans: Arc<Mutex<TemporaryBans>>,
64    ) -> Self {
65        CustomTransportWrapper {
66            base_transport,
67            allow_non_global_addresses,
68            temporary_bans,
69        }
70    }
71}
72
73impl<T> Transport for CustomTransportWrapper<T>
74where
75    T: Transport + Unpin,
76    T::Error: From<io::Error>,
77{
78    type Output = T::Output;
79    type Error = T::Error;
80    type ListenerUpgrade = T::ListenerUpgrade;
81    type Dial = T::Dial;
82
83    fn listen_on(
84        &mut self,
85        id: ListenerId,
86        addr: Multiaddr,
87    ) -> Result<(), TransportError<Self::Error>> {
88        self.base_transport.listen_on(id, addr)
89    }
90
91    fn remove_listener(&mut self, id: ListenerId) -> bool {
92        self.base_transport.remove_listener(id)
93    }
94
95    fn dial(
96        &mut self,
97        addr: Multiaddr,
98        opts: DialOpts,
99    ) -> Result<Self::Dial, TransportError<Self::Error>> {
100        let mut addr_iter = addr.iter();
101
102        match addr_iter.next() {
103            Some(Protocol::Ip4(a)) => {
104                if !(self.allow_non_global_addresses || a.is_global()) {
105                    debug!(?a, "Not dialing non global IP address.",);
106                    return Err(TransportError::MultiaddrNotSupported(addr));
107                }
108            }
109            Some(Protocol::Ip6(a)) => {
110                if !(self.allow_non_global_addresses || a.is_global()) {
111                    debug!(?a, "Not dialing non global IP address.");
112                    return Err(TransportError::MultiaddrNotSupported(addr));
113                }
114            }
115            _ => {
116                // TODO: This will not catch DNS records pointing to private addresses
117            }
118        }
119
120        {
121            let temporary_bans = self.temporary_bans.lock();
122            for protocol in addr_iter {
123                if let Protocol::P2p(peer_id) = protocol {
124                    if temporary_bans.is_banned(&peer_id) {
125                        let error = io::Error::other("Peer is temporarily banned");
126                        return Err(TransportError::Other(error.into()));
127                    }
128                }
129            }
130        }
131
132        self.base_transport.dial(addr, opts)
133    }
134
135    fn poll(
136        mut self: Pin<&mut Self>,
137        cx: &mut Context<'_>,
138    ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
139        Pin::new(&mut self.base_transport).poll(cx)
140    }
141}