subspace_networking/protocols/
reserved_peers.rs

1mod handler;
2#[cfg(test)]
3mod tests;
4
5use futures::FutureExt;
6use futures_timer::Delay;
7use handler::Handler;
8use libp2p::core::transport::PortUse;
9use libp2p::core::{Endpoint, Multiaddr};
10use libp2p::swarm::behaviour::{ConnectionEstablished, FromSwarm};
11use libp2p::swarm::dial_opts::DialOpts;
12use libp2p::swarm::{
13    ConnectionClosed, ConnectionDenied, ConnectionId, DialFailure, NetworkBehaviour, THandler,
14    THandlerInEvent, THandlerOutEvent, ToSwarm,
15};
16use libp2p::PeerId;
17use std::collections::HashMap;
18use std::task::{Context, Poll, Waker};
19use std::time::Duration;
20use tracing::{debug, trace};
21
22use crate::utils::strip_peer_id;
23
24/// `Behaviour` controls and maintains the state of connections to a predefined set of peers.
25///
26/// The `Behaviour` struct is part of our custom protocol that aims to maintain persistent
27/// connections to a predefined set of peers. It encapsulates the logic of managing the connections,
28/// dialing, and handling various states of these connections.
29///
30/// ## How it works
31///
32/// Each `ReservedPeerState` can be in one of the following states, represented by the
33/// `ConnectionStatus` enum:
34/// 1. `NotConnected`: This state indicates that the peer is currently not connected.
35///    The time for the next connection attempt is scheduled and can be queried.
36/// 2. `PendingConnection`: This state means that a connection attempt to the peer is currently
37///    in progress.
38/// 3. `Connected`: This state signals that the peer is currently connected.
39///
40/// The protocol will attempt to establish a connection to a `NotConnected` peer after a set delay,
41/// specified by configurable dialing interval, to prevent multiple simultaneous connection attempts
42/// to offline peers. This delay not only conserves resources, but also reduces the amount of
43/// log output.
44///
45/// ## Comments
46///
47/// The protocol will establish one or two connections between each pair of reserved peers.
48///
49/// IMPORTANT NOTE: For the maintenance of a persistent connection, both peers should have each
50/// other in their `reserved peers set`. This is necessary because if only one peer has the other
51/// in its `reserved peers set`, regular connection attempts will occur, but these connections will
52/// be dismissed on the other side due to the `KeepAlive` policy.
53///
54#[derive(Debug)]
55pub struct Behaviour {
56    /// Protocol configuration.
57    config: Config,
58    /// A mapping from `PeerId` to `ReservedPeerState`, where each `ReservedPeerState`
59    /// represents the current state of the connection to a reserved peer.
60    reserved_peers_state: HashMap<PeerId, ReservedPeerState>,
61    /// Delay between dialing attempts.
62    dialing_delay: Delay,
63    /// Future waker.
64    waker: Option<Waker>,
65}
66
67/// Reserved peers protocol configuration.
68#[derive(Debug, Clone)]
69pub struct Config {
70    /// Predefined set of reserved peers with addresses.
71    pub reserved_peers: Vec<Multiaddr>,
72    /// Interval between new dialing attempts.
73    pub dialing_interval: Duration,
74}
75
76/// Reserved peer connection status.
77#[derive(Debug, Clone, Copy, PartialEq, Eq)]
78pub enum ConnectionStatus {
79    /// Reserved peer is not connected.
80    NotConnected,
81    /// Reserved peer dialing is in progress.
82    PendingConnection,
83    /// Reserved peer is connected.
84    Connected,
85}
86
87/// Defines the state of a reserved peer connection state.
88#[derive(Debug, Clone)]
89struct ReservedPeerState {
90    connection_status: ConnectionStatus,
91    peer_id: PeerId,
92    address: Multiaddr,
93}
94
95/// Reserved peer connection events.
96/// Initially the "reserved peers behaviour" doesn't produce events. However, we could pass
97/// reserved peer state changes to the swarm using this struct in the future.
98#[derive(Debug, Clone)]
99pub struct Event;
100
101impl Behaviour {
102    /// Creates a new `Behaviour` with a predefined set of reserved peers.
103    pub fn new(config: Config) -> Self {
104        debug!(
105            reserved_peers=?config.reserved_peers,
106            "Reserved peers protocol initialization...."
107        );
108
109        let peer_addresses = strip_peer_id(config.reserved_peers.clone());
110        let dialing_delay = Delay::new(config.dialing_interval);
111
112        let reserved_peers_state = peer_addresses
113            .into_iter()
114            .map(|(peer_id, address)| {
115                (
116                    peer_id,
117                    ReservedPeerState {
118                        peer_id,
119                        address,
120                        connection_status: ConnectionStatus::NotConnected,
121                    },
122                )
123            })
124            .collect();
125
126        Self {
127            config,
128            reserved_peers_state,
129            waker: None,
130            dialing_delay,
131        }
132    }
133
134    /// Create a connection handler for the reserved peers protocol.
135    fn new_reserved_peers_handler(&self, peer_id: &PeerId) -> Handler {
136        Handler::new(self.reserved_peers_state.contains_key(peer_id))
137    }
138
139    fn wake(&self) {
140        if let Some(waker) = &self.waker {
141            waker.wake_by_ref()
142        }
143    }
144}
145
146impl NetworkBehaviour for Behaviour {
147    type ConnectionHandler = Handler;
148    type ToSwarm = Event;
149
150    fn handle_established_inbound_connection(
151        &mut self,
152        _: ConnectionId,
153        peer_id: PeerId,
154        _: &Multiaddr,
155        _: &Multiaddr,
156    ) -> Result<THandler<Self>, ConnectionDenied> {
157        Ok(self.new_reserved_peers_handler(&peer_id))
158    }
159
160    fn handle_established_outbound_connection(
161        &mut self,
162        _: ConnectionId,
163        peer_id: PeerId,
164        _: &Multiaddr,
165        _: Endpoint,
166        _: PortUse,
167    ) -> Result<THandler<Self>, ConnectionDenied> {
168        Ok(self.new_reserved_peers_handler(&peer_id))
169    }
170
171    fn on_swarm_event(&mut self, event: FromSwarm) {
172        match event {
173            FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, .. }) => {
174                if let Some(state) = self.reserved_peers_state.get_mut(&peer_id) {
175                    state.connection_status = ConnectionStatus::Connected;
176
177                    debug!(peer_id=%state.peer_id, "Reserved peer connected.");
178                    self.wake();
179                }
180            }
181            FromSwarm::ConnectionClosed(ConnectionClosed {
182                peer_id,
183                remaining_established,
184                ..
185            }) => {
186                if let Some(state) = self.reserved_peers_state.get_mut(&peer_id) {
187                    if remaining_established == 0 {
188                        state.connection_status = ConnectionStatus::NotConnected;
189
190                        debug!(%state.peer_id, "Reserved peer disconnected.");
191                        self.wake();
192                    }
193                }
194            }
195            FromSwarm::DialFailure(DialFailure {
196                peer_id: Some(peer_id),
197                ..
198            }) => {
199                if let Some(state) = self.reserved_peers_state.get_mut(&peer_id) {
200                    if state.connection_status == ConnectionStatus::PendingConnection {
201                        state.connection_status = ConnectionStatus::NotConnected;
202                    };
203
204                    debug!(peer_id=%state.peer_id, "Reserved peer dialing failed.");
205                    self.wake();
206                }
207            }
208            _ => {}
209        }
210    }
211
212    fn on_connection_handler_event(
213        &mut self,
214        _: PeerId,
215        _: ConnectionId,
216        _: THandlerOutEvent<Self>,
217    ) {
218    }
219
220    fn poll(
221        &mut self,
222        cx: &mut Context<'_>,
223    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
224        // Schedule new peer dialing.
225        match self.dialing_delay.poll_unpin(cx) {
226            Poll::Pending => {}
227            Poll::Ready(()) => {
228                self.dialing_delay.reset(self.config.dialing_interval);
229
230                for (_, state) in self.reserved_peers_state.iter_mut() {
231                    trace!(?state, "Reserved peer state.");
232
233                    if let ConnectionStatus::NotConnected = state.connection_status {
234                        state.connection_status = ConnectionStatus::PendingConnection;
235
236                        debug!(peer_id=%state.peer_id, "Dialing the reserved peer....");
237
238                        let dial_opts =
239                            DialOpts::peer_id(state.peer_id).addresses(vec![state.address.clone()]);
240
241                        return Poll::Ready(ToSwarm::Dial {
242                            opts: dial_opts.build(),
243                        });
244                    }
245                }
246            }
247        }
248
249        self.waker.replace(cx.waker().clone());
250        Poll::Pending
251    }
252}