subspace_networking/
shared.rs

1//! Data structures shared between node and node runner, facilitating exchange and creation of
2//! queries, subscriptions, various events and shared information.
3
4use crate::protocols::request_response::request_response_factory::RequestFailure;
5use crate::utils::multihash::Multihash;
6use crate::utils::rate_limiter::RateLimiter;
7use crate::utils::Handler;
8use bytes::Bytes;
9use futures::channel::{mpsc, oneshot};
10use libp2p::gossipsub::{PublishError, Sha256Topic, SubscriptionError};
11use libp2p::kad::{PeerRecord, RecordKey};
12use libp2p::{Multiaddr, PeerId};
13use parking_lot::Mutex;
14use std::sync::atomic::AtomicUsize;
15use std::sync::Arc;
16use tokio::sync::OwnedSemaphorePermit;
17
18/// Represents Kademlia events (RoutablePeer, PendingRoutablePeer, UnroutablePeer).
19#[derive(Clone, Debug)]
20pub enum PeerDiscovered {
21    /// Kademlia's unroutable peer event.
22    UnroutablePeer {
23        /// Peer ID
24        peer_id: PeerId,
25    },
26
27    /// Kademlia's routable or pending routable peer event.
28    RoutablePeer {
29        /// Peer ID
30        peer_id: PeerId,
31        /// Peer address
32        address: Multiaddr,
33    },
34}
35
36impl PeerDiscovered {
37    /// Extracts peer ID from event.
38    pub fn peer_id(&self) -> PeerId {
39        match self {
40            PeerDiscovered::UnroutablePeer { peer_id } => *peer_id,
41            PeerDiscovered::RoutablePeer { peer_id, .. } => *peer_id,
42        }
43    }
44}
45
46#[derive(Debug)]
47pub(crate) struct CreatedSubscription {
48    /// Subscription ID to be used for unsubscribing.
49    pub(crate) subscription_id: usize,
50    /// Receiver side of the channel with new messages.
51    pub(crate) receiver: mpsc::UnboundedReceiver<Bytes>,
52}
53
54#[derive(Debug)]
55pub(crate) enum Command {
56    GetValue {
57        key: Multihash,
58        result_sender: mpsc::UnboundedSender<PeerRecord>,
59        permit: OwnedSemaphorePermit,
60    },
61    PutValue {
62        key: Multihash,
63        value: Vec<u8>,
64        result_sender: mpsc::UnboundedSender<()>,
65        permit: OwnedSemaphorePermit,
66    },
67    Subscribe {
68        topic: Sha256Topic,
69        result_sender: oneshot::Sender<Result<CreatedSubscription, SubscriptionError>>,
70    },
71    Unsubscribe {
72        topic: Sha256Topic,
73        subscription_id: usize,
74    },
75    Publish {
76        topic: Sha256Topic,
77        message: Vec<u8>,
78        result_sender: oneshot::Sender<Result<(), PublishError>>,
79    },
80    GetClosestPeers {
81        key: Multihash,
82        result_sender: mpsc::UnboundedSender<PeerId>,
83        permit: Option<OwnedSemaphorePermit>,
84    },
85    GetClosestLocalPeers {
86        key: Multihash,
87        source: Option<PeerId>,
88        result_sender: oneshot::Sender<Vec<(PeerId, Vec<Multiaddr>)>>,
89    },
90    GenericRequest {
91        peer_id: PeerId,
92        addresses: Vec<Multiaddr>,
93        protocol_name: &'static str,
94        request: Vec<u8>,
95        result_sender: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
96    },
97    GetProviders {
98        key: RecordKey,
99        result_sender: mpsc::UnboundedSender<PeerId>,
100        permit: Option<OwnedSemaphorePermit>,
101    },
102    BanPeer {
103        peer_id: PeerId,
104    },
105    Dial {
106        address: Multiaddr,
107    },
108    ConnectedPeers {
109        result_sender: oneshot::Sender<Vec<PeerId>>,
110    },
111    ConnectedServers {
112        result_sender: oneshot::Sender<Vec<PeerId>>,
113    },
114    Bootstrap {
115        // No result sender means background async bootstrapping
116        result_sender: Option<mpsc::UnboundedSender<()>>,
117    },
118}
119
120#[derive(Default, Debug)]
121pub(crate) struct Handlers {
122    pub(crate) new_listener: Handler<Multiaddr>,
123    pub(crate) num_established_peer_connections_change: Handler<usize>,
124    pub(crate) connected_peer: Handler<PeerId>,
125    pub(crate) disconnected_peer: Handler<PeerId>,
126    pub(crate) peer_discovered: Handler<PeerDiscovered>,
127}
128
129#[derive(Debug)]
130pub(crate) struct Shared {
131    pub(crate) handlers: Handlers,
132    pub(crate) id: PeerId,
133    /// Addresses on which node is listening for incoming requests.
134    pub(crate) listeners: Mutex<Vec<Multiaddr>>,
135    pub(crate) external_addresses: Mutex<Vec<Multiaddr>>,
136    pub(crate) num_established_peer_connections: Arc<AtomicUsize>,
137    /// Sender end of the channel for sending commands to the swarm.
138    pub(crate) command_sender: mpsc::Sender<Command>,
139    pub(crate) rate_limiter: RateLimiter,
140}
141
142impl Shared {
143    pub(crate) fn new(
144        id: PeerId,
145        command_sender: mpsc::Sender<Command>,
146        rate_limiter: RateLimiter,
147    ) -> Self {
148        Self {
149            handlers: Handlers::default(),
150            id,
151            listeners: Mutex::default(),
152            external_addresses: Mutex::default(),
153            num_established_peer_connections: Arc::new(AtomicUsize::new(0)),
154            command_sender,
155            rate_limiter,
156        }
157    }
158}