1use crate::protocols::request_response::request_response_factory::RequestFailure;
5use crate::utils::multihash::Multihash;
6use crate::utils::rate_limiter::RateLimiter;
7use crate::utils::Handler;
8use bytes::Bytes;
9use futures::channel::{mpsc, oneshot};
10use libp2p::gossipsub::{PublishError, Sha256Topic, SubscriptionError};
11use libp2p::kad::{PeerRecord, RecordKey};
12use libp2p::{Multiaddr, PeerId};
13use parking_lot::Mutex;
14use std::sync::atomic::AtomicUsize;
15use std::sync::Arc;
16use tokio::sync::OwnedSemaphorePermit;
17
18#[derive(Clone, Debug)]
20pub enum PeerDiscovered {
21 UnroutablePeer {
23 peer_id: PeerId,
25 },
26
27 RoutablePeer {
29 peer_id: PeerId,
31 address: Multiaddr,
33 },
34}
35
36impl PeerDiscovered {
37 pub fn peer_id(&self) -> PeerId {
39 match self {
40 PeerDiscovered::UnroutablePeer { peer_id } => *peer_id,
41 PeerDiscovered::RoutablePeer { peer_id, .. } => *peer_id,
42 }
43 }
44}
45
46#[derive(Debug)]
47pub(crate) struct CreatedSubscription {
48 pub(crate) subscription_id: usize,
50 pub(crate) receiver: mpsc::UnboundedReceiver<Bytes>,
52}
53
54#[derive(Debug)]
55pub(crate) enum Command {
56 GetValue {
57 key: Multihash,
58 result_sender: mpsc::UnboundedSender<PeerRecord>,
59 permit: OwnedSemaphorePermit,
60 },
61 PutValue {
62 key: Multihash,
63 value: Vec<u8>,
64 result_sender: mpsc::UnboundedSender<()>,
65 permit: OwnedSemaphorePermit,
66 },
67 Subscribe {
68 topic: Sha256Topic,
69 result_sender: oneshot::Sender<Result<CreatedSubscription, SubscriptionError>>,
70 },
71 Unsubscribe {
72 topic: Sha256Topic,
73 subscription_id: usize,
74 },
75 Publish {
76 topic: Sha256Topic,
77 message: Vec<u8>,
78 result_sender: oneshot::Sender<Result<(), PublishError>>,
79 },
80 GetClosestPeers {
81 key: Multihash,
82 result_sender: mpsc::UnboundedSender<PeerId>,
83 permit: Option<OwnedSemaphorePermit>,
84 },
85 GetClosestLocalPeers {
86 key: Multihash,
87 source: Option<PeerId>,
88 result_sender: oneshot::Sender<Vec<(PeerId, Vec<Multiaddr>)>>,
89 },
90 GenericRequest {
91 peer_id: PeerId,
92 addresses: Vec<Multiaddr>,
93 protocol_name: &'static str,
94 request: Vec<u8>,
95 result_sender: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
96 },
97 GetProviders {
98 key: RecordKey,
99 result_sender: mpsc::UnboundedSender<PeerId>,
100 permit: Option<OwnedSemaphorePermit>,
101 },
102 BanPeer {
103 peer_id: PeerId,
104 },
105 Dial {
106 address: Multiaddr,
107 },
108 ConnectedPeers {
109 result_sender: oneshot::Sender<Vec<PeerId>>,
110 },
111 ConnectedServers {
112 result_sender: oneshot::Sender<Vec<PeerId>>,
113 },
114 Bootstrap {
115 result_sender: Option<mpsc::UnboundedSender<()>>,
117 },
118}
119
120#[derive(Default, Debug)]
121pub(crate) struct Handlers {
122 pub(crate) new_listener: Handler<Multiaddr>,
123 pub(crate) num_established_peer_connections_change: Handler<usize>,
124 pub(crate) connected_peer: Handler<PeerId>,
125 pub(crate) disconnected_peer: Handler<PeerId>,
126 pub(crate) peer_discovered: Handler<PeerDiscovered>,
127}
128
129#[derive(Debug)]
130pub(crate) struct Shared {
131 pub(crate) handlers: Handlers,
132 pub(crate) id: PeerId,
133 pub(crate) listeners: Mutex<Vec<Multiaddr>>,
135 pub(crate) external_addresses: Mutex<Vec<Multiaddr>>,
136 pub(crate) num_established_peer_connections: Arc<AtomicUsize>,
137 pub(crate) command_sender: mpsc::Sender<Command>,
139 pub(crate) rate_limiter: RateLimiter,
140}
141
142impl Shared {
143 pub(crate) fn new(
144 id: PeerId,
145 command_sender: mpsc::Sender<Command>,
146 rate_limiter: RateLimiter,
147 ) -> Self {
148 Self {
149 handlers: Handlers::default(),
150 id,
151 listeners: Mutex::default(),
152 external_addresses: Mutex::default(),
153 num_established_peer_connections: Arc::new(AtomicUsize::new(0)),
154 command_sender,
155 rate_limiter,
156 }
157 }
158}