subspace_networking/constructor/
transport.rs1use crate::constructor::temporary_bans::TemporaryBans;
2use libp2p::core::multiaddr::{Multiaddr, Protocol};
3use libp2p::core::muxing::StreamMuxerBox;
4use libp2p::core::transport::{Boxed, DialOpts, ListenerId, TransportError, TransportEvent};
5use libp2p::core::Transport;
6use libp2p::dns::tokio::Transport as TokioTransport;
7use libp2p::tcp::tokio::Transport as TokioTcpTransport;
8use libp2p::tcp::Config as GenTcpConfig;
9use libp2p::yamux::Config as YamuxConfig;
10use libp2p::{core, identity, noise, PeerId};
11use parking_lot::Mutex;
12use std::io;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::task::{Context, Poll};
16use std::time::Duration;
17use tracing::debug;
18
19pub(super) fn build_transport(
21 allow_non_global_addresses_in_dht: bool,
22 keypair: &identity::Keypair,
23 temporary_bans: Arc<Mutex<TemporaryBans>>,
24 timeout: Duration,
25 yamux_config: YamuxConfig,
26) -> io::Result<Boxed<(PeerId, StreamMuxerBox)>> {
27 let wrapped_tcp = {
28 let tcp_config = GenTcpConfig::default().nodelay(true);
29
30 CustomTransportWrapper::new(
31 TokioTcpTransport::new(tcp_config),
32 allow_non_global_addresses_in_dht,
33 temporary_bans,
34 )
35 };
36
37 let tcp_upgraded = {
38 let noise =
39 noise::Config::new(keypair).expect("Signing libp2p-noise static DH keypair failed.");
40
41 wrapped_tcp
42 .upgrade(core::upgrade::Version::V1Lazy)
43 .authenticate(noise)
44 .multiplex(yamux_config)
45 .timeout(timeout)
46 .boxed()
47 };
48
49 Ok(TokioTransport::system(tcp_upgraded)?.boxed())
50}
51
52#[derive(Debug, Clone)]
53struct CustomTransportWrapper<T> {
54 base_transport: T,
55 allow_non_global_addresses: bool,
56 temporary_bans: Arc<Mutex<TemporaryBans>>,
57}
58
59impl<T> CustomTransportWrapper<T> {
60 fn new(
61 base_transport: T,
62 allow_non_global_addresses: bool,
63 temporary_bans: Arc<Mutex<TemporaryBans>>,
64 ) -> Self {
65 CustomTransportWrapper {
66 base_transport,
67 allow_non_global_addresses,
68 temporary_bans,
69 }
70 }
71}
72
73impl<T> Transport for CustomTransportWrapper<T>
74where
75 T: Transport + Unpin,
76 T::Error: From<io::Error>,
77{
78 type Output = T::Output;
79 type Error = T::Error;
80 type ListenerUpgrade = T::ListenerUpgrade;
81 type Dial = T::Dial;
82
83 fn listen_on(
84 &mut self,
85 id: ListenerId,
86 addr: Multiaddr,
87 ) -> Result<(), TransportError<Self::Error>> {
88 self.base_transport.listen_on(id, addr)
89 }
90
91 fn remove_listener(&mut self, id: ListenerId) -> bool {
92 self.base_transport.remove_listener(id)
93 }
94
95 fn dial(
96 &mut self,
97 addr: Multiaddr,
98 opts: DialOpts,
99 ) -> Result<Self::Dial, TransportError<Self::Error>> {
100 let mut addr_iter = addr.iter();
101
102 match addr_iter.next() {
103 Some(Protocol::Ip4(a)) => {
104 if !(self.allow_non_global_addresses || a.is_global()) {
105 debug!(?a, "Not dialing non global IP address.",);
106 return Err(TransportError::MultiaddrNotSupported(addr));
107 }
108 }
109 Some(Protocol::Ip6(a)) => {
110 if !(self.allow_non_global_addresses || a.is_global()) {
111 debug!(?a, "Not dialing non global IP address.");
112 return Err(TransportError::MultiaddrNotSupported(addr));
113 }
114 }
115 _ => {
116 }
118 }
119
120 {
121 let temporary_bans = self.temporary_bans.lock();
122 for protocol in addr_iter {
123 if let Protocol::P2p(peer_id) = protocol {
124 if temporary_bans.is_banned(&peer_id) {
125 let error = io::Error::other("Peer is temporarily banned");
126 return Err(TransportError::Other(error.into()));
127 }
128 }
129 }
130 }
131
132 self.base_transport.dial(addr, opts)
133 }
134
135 fn poll(
136 mut self: Pin<&mut Self>,
137 cx: &mut Context<'_>,
138 ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
139 Pin::new(&mut self.base_transport).poll(cx)
140 }
141}