subspace_networking/protocols/reserved_peers.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252
mod handler;
#[cfg(test)]
mod tests;
use futures::FutureExt;
use futures_timer::Delay;
use handler::Handler;
use libp2p::core::transport::PortUse;
use libp2p::core::{Endpoint, Multiaddr};
use libp2p::swarm::behaviour::{ConnectionEstablished, FromSwarm};
use libp2p::swarm::dial_opts::DialOpts;
use libp2p::swarm::{
ConnectionClosed, ConnectionDenied, ConnectionId, DialFailure, NetworkBehaviour, THandler,
THandlerInEvent, THandlerOutEvent, ToSwarm,
};
use libp2p::PeerId;
use std::collections::HashMap;
use std::task::{Context, Poll, Waker};
use std::time::Duration;
use tracing::{debug, trace};
use crate::utils::strip_peer_id;
/// `Behaviour` controls and maintains the state of connections to a predefined set of peers.
///
/// The `Behaviour` struct is part of our custom protocol that aims to maintain persistent
/// connections to a predefined set of peers. It encapsulates the logic of managing the connections,
/// dialing, and handling various states of these connections.
///
/// ## How it works
///
/// Each `ReservedPeerState` can be in one of the following states, represented by the
/// `ConnectionStatus` enum:
/// 1. `NotConnected`: This state indicates that the peer is currently not connected.
/// No per-peer retry time is stored; the peer becomes a dial candidate on a later tick of
/// the shared dialing timer.
/// 2. `PendingConnection`: This state means that a connection attempt to the peer is currently
/// in progress.
/// 3. `Connected`: This state signals that the peer is currently connected.
///
/// The protocol attempts to establish a connection to a `NotConnected` peer each time the
/// dialing timer fires. The timer period is the configurable dialing interval, and at most one
/// peer is dialed per tick, which prevents multiple simultaneous connection attempts to offline
/// peers. This delay not only conserves resources, but also reduces the amount of log output.
///
/// ## Comments
///
/// The protocol will establish one or two connections between each pair of reserved peers.
///
/// IMPORTANT NOTE: For the maintenance of a persistent connection, both peers should have each
/// other in their `reserved peers set`. This is necessary because if only one peer has the other
/// in its `reserved peers set`, regular connection attempts will occur, but these connections will
/// be dismissed on the other side due to the `KeepAlive` policy.
///
#[derive(Debug)]
pub struct Behaviour {
/// Protocol configuration.
config: Config,
/// A mapping from `PeerId` to `ReservedPeerState`, where each `ReservedPeerState`
/// represents the current state of the connection to a reserved peer.
reserved_peers_state: HashMap<PeerId, ReservedPeerState>,
/// Timer pacing the dialing attempts. It is armed with `Config::dialing_interval` and re-armed
/// with the same interval every time it fires.
dialing_delay: Delay,
/// Waker captured the last time `poll` returned `Poll::Pending`. It is used to request a
/// re-poll of the behaviour after a reserved peer's connection status changes.
waker: Option<Waker>,
}
/// Reserved peers protocol configuration.
#[derive(Debug, Clone)]
pub struct Config {
/// Predefined set of reserved peers with addresses.
///
/// NOTE(review): each entry is split into a `(PeerId, Multiaddr)` pair by `strip_peer_id`,
/// so presumably every address must carry the peer ID (e.g. a trailing `/p2p/<peer id>`
/// component) - confirm against `strip_peer_id`, which is defined elsewhere.
pub reserved_peers: Vec<Multiaddr>,
/// Interval between new dialing attempts. This is the period of the dialing timer; a single
/// not-connected reserved peer is dialed each time the timer fires.
pub dialing_interval: Duration,
}
/// Reserved peer connection status.
///
/// Every reserved peer starts as `NotConnected`. The status moves to `PendingConnection` when
/// the behaviour issues a dial, to `Connected` when the swarm reports an established connection,
/// and back to `NotConnected` when the last connection closes or a pending dial fails.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionStatus {
/// Reserved peer is not connected and is eligible for dialing on a dialing timer tick.
NotConnected,
/// Reserved peer dialing is in progress.
PendingConnection,
/// Reserved peer is connected (at least one established connection remains).
Connected,
}
/// Tracks the connection state of a single reserved peer.
#[derive(Debug, Clone)]
struct ReservedPeerState {
// Current position of this peer in the connection life cycle.
connection_status: ConnectionStatus,
// Identity of the reserved peer; also the key under which this state is stored.
peer_id: PeerId,
// Address used when dialing the peer.
address: Multiaddr,
}
/// Reserved peer connection events.
///
/// Currently the reserved peers behaviour doesn't produce any events, so this is a unit
/// placeholder. In the future it could be used to report reserved peer state changes to the
/// swarm.
#[derive(Debug, Clone)]
pub struct Event;
impl Behaviour {
/// Creates a new `Behaviour` with a predefined set of reserved peers.
pub fn new(config: Config) -> Self {
debug!(
reserved_peers=?config.reserved_peers,
"Reserved peers protocol initialization...."
);
let peer_addresses = strip_peer_id(config.reserved_peers.clone());
let dialing_delay = Delay::new(config.dialing_interval);
let reserved_peers_state = peer_addresses
.into_iter()
.map(|(peer_id, address)| {
(
peer_id,
ReservedPeerState {
peer_id,
address,
connection_status: ConnectionStatus::NotConnected,
},
)
})
.collect();
Self {
config,
reserved_peers_state,
waker: None,
dialing_delay,
}
}
/// Create a connection handler for the reserved peers protocol.
fn new_reserved_peers_handler(&self, peer_id: &PeerId) -> Handler {
Handler::new(self.reserved_peers_state.contains_key(peer_id))
}
fn wake(&self) {
if let Some(waker) = &self.waker {
waker.wake_by_ref()
}
}
}
impl NetworkBehaviour for Behaviour {
    type ConnectionHandler = Handler;
    type ToSwarm = Event;

    /// Provides the handler for an established inbound connection. The connection is never
    /// denied; the handler is only told whether the remote peer is a reserved one.
    fn handle_established_inbound_connection(
        &mut self,
        _: ConnectionId,
        peer_id: PeerId,
        _: &Multiaddr,
        _: &Multiaddr,
    ) -> Result<THandler<Self>, ConnectionDenied> {
        Ok(self.new_reserved_peers_handler(&peer_id))
    }

    /// Provides the handler for an established outbound connection. The connection is never
    /// denied; the handler is only told whether the remote peer is a reserved one.
    fn handle_established_outbound_connection(
        &mut self,
        _: ConnectionId,
        peer_id: PeerId,
        _: &Multiaddr,
        _: Endpoint,
        _: PortUse,
    ) -> Result<THandler<Self>, ConnectionDenied> {
        Ok(self.new_reserved_peers_handler(&peer_id))
    }

    /// Updates the connection status of reserved peers from swarm notifications.
    ///
    /// Events concerning peers outside the reserved set, and all other event kinds, are
    /// ignored. After handling an event for a reserved peer the stored waker is woken so that
    /// `poll` runs again.
    fn on_swarm_event(&mut self, event: FromSwarm) {
        match event {
            FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, .. }) => {
                if let Some(state) = self.reserved_peers_state.get_mut(&peer_id) {
                    state.connection_status = ConnectionStatus::Connected;

                    debug!(peer_id=%state.peer_id, "Reserved peer connected.");
                    self.wake();
                }
            }
            FromSwarm::ConnectionClosed(ConnectionClosed {
                peer_id,
                remaining_established,
                ..
            }) => {
                if let Some(state) = self.reserved_peers_state.get_mut(&peer_id) {
                    // The peer only counts as disconnected once its last connection is gone.
                    if remaining_established == 0 {
                        state.connection_status = ConnectionStatus::NotConnected;

                        debug!(peer_id=%state.peer_id, "Reserved peer disconnected.");
                        self.wake();
                    }
                }
            }
            FromSwarm::DialFailure(DialFailure {
                peer_id: Some(peer_id),
                ..
            }) => {
                if let Some(state) = self.reserved_peers_state.get_mut(&peer_id) {
                    // A failed dial must not downgrade a peer that is already `Connected`
                    // through another connection, so only a pending attempt is rolled back.
                    if state.connection_status == ConnectionStatus::PendingConnection {
                        state.connection_status = ConnectionStatus::NotConnected;
                    }

                    debug!(peer_id=%state.peer_id, "Reserved peer dialing failed.");
                    self.wake();
                }
            }
            _ => {}
        }
    }

    /// Events emitted by the connection handler are ignored.
    fn on_connection_handler_event(
        &mut self,
        _: PeerId,
        _: ConnectionId,
        _: THandlerOutEvent<Self>,
    ) {
    }

    /// Drives the dialing schedule.
    ///
    /// Each time the dialing timer fires it is re-armed with the configured interval and the
    /// first reserved peer found in the `NotConnected` state is marked `PendingConnection` and
    /// dialed (at most one peer per tick). In every other case `Poll::Pending` is returned and
    /// the task waker is stored for `wake`.
    fn poll(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
        // Schedule new peer dialing.
        if self.dialing_delay.poll_unpin(cx).is_ready() {
            self.dialing_delay.reset(self.config.dialing_interval);

            for state in self.reserved_peers_state.values_mut() {
                trace!(?state, "Reserved peer state.");

                if state.connection_status == ConnectionStatus::NotConnected {
                    state.connection_status = ConnectionStatus::PendingConnection;

                    debug!(peer_id=%state.peer_id, "Dialing the reserved peer....");

                    let opts = DialOpts::peer_id(state.peer_id)
                        .addresses(vec![state.address.clone()])
                        .build();

                    return Poll::Ready(ToSwarm::Dial { opts });
                }
            }

            // Nothing to dial on this tick. The timer was just re-armed, but the task waker is
            // only handed to it when it is polled; without this extra poll the next tick would
            // not wake the task and dialing would depend on unrelated swarm activity.
            if self.dialing_delay.poll_unpin(cx).is_ready() {
                // The re-armed timer has already elapsed: request an immediate re-poll so the
                // tick is handled by the regular path above.
                cx.waker().wake_by_ref();
            }
        }

        self.waker.replace(cx.waker().clone());
        Poll::Pending
    }
}