subspace_networking/protocols/
reserved_peers.rs1mod handler;
2#[cfg(test)]
3mod tests;
4
5use futures::FutureExt;
6use futures_timer::Delay;
7use handler::Handler;
8use libp2p::core::transport::PortUse;
9use libp2p::core::{Endpoint, Multiaddr};
10use libp2p::swarm::behaviour::{ConnectionEstablished, FromSwarm};
11use libp2p::swarm::dial_opts::DialOpts;
12use libp2p::swarm::{
13 ConnectionClosed, ConnectionDenied, ConnectionId, DialFailure, NetworkBehaviour, THandler,
14 THandlerInEvent, THandlerOutEvent, ToSwarm,
15};
16use libp2p::PeerId;
17use std::collections::HashMap;
18use std::task::{Context, Poll, Waker};
19use std::time::Duration;
20use tracing::{debug, trace};
21
22use crate::utils::strip_peer_id;
23
24#[derive(Debug)]
55pub struct Behaviour {
56 config: Config,
58 reserved_peers_state: HashMap<PeerId, ReservedPeerState>,
61 dialing_delay: Delay,
63 waker: Option<Waker>,
65}
66
67#[derive(Debug, Clone)]
69pub struct Config {
70 pub reserved_peers: Vec<Multiaddr>,
72 pub dialing_interval: Duration,
74}
75
/// Connection status of a reserved peer as tracked by the behaviour.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ConnectionStatus {
    /// No connection is established and no dial is in flight. This is the initial status,
    /// and the one restored after the last connection closes or a pending dial fails.
    NotConnected,
    /// A dial request for the peer has been handed to the swarm and has not resolved yet.
    PendingConnection,
    /// The swarm reported an established connection to the peer.
    Connected,
}
86
87#[derive(Debug, Clone)]
89struct ReservedPeerState {
90 connection_status: ConnectionStatus,
91 peer_id: PeerId,
92 address: Multiaddr,
93}
94
/// Event type the reserved-peers behaviour declares as its `NetworkBehaviour::ToSwarm`.
///
/// It is a unit struct and carries no data.
#[derive(Clone, Debug)]
pub struct Event;
100
101impl Behaviour {
102 pub fn new(config: Config) -> Self {
104 debug!(
105 reserved_peers=?config.reserved_peers,
106 "Reserved peers protocol initialization...."
107 );
108
109 let peer_addresses = strip_peer_id(config.reserved_peers.clone());
110 let dialing_delay = Delay::new(config.dialing_interval);
111
112 let reserved_peers_state = peer_addresses
113 .into_iter()
114 .map(|(peer_id, address)| {
115 (
116 peer_id,
117 ReservedPeerState {
118 peer_id,
119 address,
120 connection_status: ConnectionStatus::NotConnected,
121 },
122 )
123 })
124 .collect();
125
126 Self {
127 config,
128 reserved_peers_state,
129 waker: None,
130 dialing_delay,
131 }
132 }
133
134 fn new_reserved_peers_handler(&self, peer_id: &PeerId) -> Handler {
136 Handler::new(self.reserved_peers_state.contains_key(peer_id))
137 }
138
139 fn wake(&self) {
140 if let Some(waker) = &self.waker {
141 waker.wake_by_ref()
142 }
143 }
144}
145
146impl NetworkBehaviour for Behaviour {
147 type ConnectionHandler = Handler;
148 type ToSwarm = Event;
149
150 fn handle_established_inbound_connection(
151 &mut self,
152 _: ConnectionId,
153 peer_id: PeerId,
154 _: &Multiaddr,
155 _: &Multiaddr,
156 ) -> Result<THandler<Self>, ConnectionDenied> {
157 Ok(self.new_reserved_peers_handler(&peer_id))
158 }
159
160 fn handle_established_outbound_connection(
161 &mut self,
162 _: ConnectionId,
163 peer_id: PeerId,
164 _: &Multiaddr,
165 _: Endpoint,
166 _: PortUse,
167 ) -> Result<THandler<Self>, ConnectionDenied> {
168 Ok(self.new_reserved_peers_handler(&peer_id))
169 }
170
171 fn on_swarm_event(&mut self, event: FromSwarm) {
172 match event {
173 FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, .. }) => {
174 if let Some(state) = self.reserved_peers_state.get_mut(&peer_id) {
175 state.connection_status = ConnectionStatus::Connected;
176
177 debug!(peer_id=%state.peer_id, "Reserved peer connected.");
178 self.wake();
179 }
180 }
181 FromSwarm::ConnectionClosed(ConnectionClosed {
182 peer_id,
183 remaining_established,
184 ..
185 }) => {
186 if let Some(state) = self.reserved_peers_state.get_mut(&peer_id) {
187 if remaining_established == 0 {
188 state.connection_status = ConnectionStatus::NotConnected;
189
190 debug!(%state.peer_id, "Reserved peer disconnected.");
191 self.wake();
192 }
193 }
194 }
195 FromSwarm::DialFailure(DialFailure {
196 peer_id: Some(peer_id),
197 ..
198 }) => {
199 if let Some(state) = self.reserved_peers_state.get_mut(&peer_id) {
200 if state.connection_status == ConnectionStatus::PendingConnection {
201 state.connection_status = ConnectionStatus::NotConnected;
202 };
203
204 debug!(peer_id=%state.peer_id, "Reserved peer dialing failed.");
205 self.wake();
206 }
207 }
208 _ => {}
209 }
210 }
211
212 fn on_connection_handler_event(
213 &mut self,
214 _: PeerId,
215 _: ConnectionId,
216 _: THandlerOutEvent<Self>,
217 ) {
218 }
219
220 fn poll(
221 &mut self,
222 cx: &mut Context<'_>,
223 ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
224 match self.dialing_delay.poll_unpin(cx) {
226 Poll::Pending => {}
227 Poll::Ready(()) => {
228 self.dialing_delay.reset(self.config.dialing_interval);
229
230 for (_, state) in self.reserved_peers_state.iter_mut() {
231 trace!(?state, "Reserved peer state.");
232
233 if let ConnectionStatus::NotConnected = state.connection_status {
234 state.connection_status = ConnectionStatus::PendingConnection;
235
236 debug!(peer_id=%state.peer_id, "Dialing the reserved peer....");
237
238 let dial_opts =
239 DialOpts::peer_id(state.peer_id).addresses(vec![state.address.clone()]);
240
241 return Poll::Ready(ToSwarm::Dial {
242 opts: dial_opts.build(),
243 });
244 }
245 }
246 }
247 }
248
249 self.waker.replace(cx.waker().clone());
250 Poll::Pending
251 }
252}