use crate::protocols::request_response::request_response_factory::RequestFailure;
use crate::utils::multihash::Multihash;
use crate::utils::rate_limiter::RateLimiter;
use crate::utils::Handler;
use bytes::Bytes;
use futures::channel::{mpsc, oneshot};
use libp2p::gossipsub::{PublishError, Sha256Topic, SubscriptionError};
use libp2p::kad::{PeerRecord, RecordKey};
use libp2p::{Multiaddr, PeerId};
use parking_lot::Mutex;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use tokio::sync::OwnedSemaphorePermit;
/// Describes a discovered peer, either with or without an address attached.
///
/// The variant names suggest the distinction is whether the peer can be
/// routed to; the exact criteria are decided where these values are
/// constructed and are not visible in this definition.
#[derive(Clone, Debug)]
pub enum PeerDiscovered {
/// A discovered peer for which only the ID is carried (no address field).
UnroutablePeer {
/// ID of the discovered peer.
peer_id: PeerId,
},
/// A discovered peer that comes with an address.
RoutablePeer {
/// ID of the discovered peer.
peer_id: PeerId,
/// Address associated with the peer.
address: Multiaddr,
},
}
impl PeerDiscovered {
pub fn peer_id(&self) -> PeerId {
match self {
PeerDiscovered::UnroutablePeer { peer_id } => *peer_id,
PeerDiscovered::RoutablePeer { peer_id, .. } => *peer_id,
}
}
}
/// Payload returned on the success path of [`Command::Subscribe`]: an
/// identifier for the subscription plus the receiving end of its channel.
#[derive(Debug)]
pub(crate) struct CreatedSubscription {
/// Identifier of this subscription; the same kind of value is carried by
/// `Command::Unsubscribe::subscription_id`.
pub(crate) subscription_id: usize,
/// Receiving half of an unbounded channel of `Bytes` items (presumably the
/// messages published on the subscribed topic — confirm at the sender side).
pub(crate) receiver: mpsc::UnboundedReceiver<Bytes>,
}
/// Commands that can be sent through an `mpsc::Sender<Command>` (see
/// `Shared::command_sender`).
///
/// Most variants carry a `result_sender` channel through which the outcome is
/// reported back. The code that handles each command lives outside this
/// definition: the notes below describe the payload, and anything said about
/// what the handler does is inferred from names and should be confirmed there.
#[derive(Debug)]
pub(crate) enum Command {
/// Look up a value by key.
GetValue {
/// Key to look up.
key: Multihash,
/// Unbounded channel on which `PeerRecord` results are delivered.
result_sender: mpsc::UnboundedSender<PeerRecord>,
/// Semaphore permit travelling with the command (presumably held while
/// the request is in flight to bound concurrency — confirm in handler).
permit: OwnedSemaphorePermit,
},
/// Store a value under a key.
PutValue {
/// Key to store the value under.
key: Multihash,
/// Raw bytes of the value.
value: Vec<u8>,
/// Unbounded channel on which `()` notifications are delivered.
result_sender: mpsc::UnboundedSender<()>,
/// Semaphore permit travelling with the command (presumably held while
/// the request is in flight to bound concurrency — confirm in handler).
permit: OwnedSemaphorePermit,
},
/// Subscribe to a gossipsub topic.
Subscribe {
/// Topic to subscribe to.
topic: Sha256Topic,
/// One-shot channel receiving either the created subscription or a
/// `SubscriptionError`.
result_sender: oneshot::Sender<Result<CreatedSubscription, SubscriptionError>>,
},
/// Cancel a subscription on a gossipsub topic. Carries no result channel.
Unsubscribe {
/// Topic the subscription belongs to.
topic: Sha256Topic,
/// Identifier of the subscription to cancel (see
/// `CreatedSubscription::subscription_id`).
subscription_id: usize,
},
/// Publish a message on a gossipsub topic.
Publish {
/// Topic to publish on.
topic: Sha256Topic,
/// Raw bytes of the message.
message: Vec<u8>,
/// One-shot channel receiving `Ok(())` or a `PublishError`.
result_sender: oneshot::Sender<Result<(), PublishError>>,
},
/// Find the peers closest to a key.
GetClosestPeers {
/// Key to search around.
key: Multihash,
/// Unbounded channel on which found `PeerId`s are delivered.
result_sender: mpsc::UnboundedSender<PeerId>,
/// Optional semaphore permit; unlike `GetValue`/`PutValue`, this command
/// may be sent without one.
permit: Option<OwnedSemaphorePermit>,
},
/// Find the closest peers to a key, answered in a single one-shot reply
/// (by name, from locally known peers only — confirm in handler).
GetClosestLocalPeers {
/// Key to search around.
key: Multihash,
/// Optional peer on whose behalf the lookup is made
/// (NOTE(review): exact use is not visible here — confirm in handler).
source: Option<PeerId>,
/// One-shot channel receiving the peers together with their addresses.
result_sender: oneshot::Sender<Vec<(PeerId, Vec<Multiaddr>)>>,
},
/// Send a request for a given protocol to a specific peer.
GenericRequest {
/// Peer the request is addressed to.
peer_id: PeerId,
/// Addresses supplied along with the peer ID (presumably used for
/// dialing the peer — confirm in handler).
addresses: Vec<Multiaddr>,
/// Name of the protocol the request belongs to.
protocol_name: &'static str,
/// Raw request bytes.
request: Vec<u8>,
/// One-shot channel receiving the raw response bytes or a `RequestFailure`.
result_sender: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
},
/// Find providers for a record key.
GetProviders {
/// Record key to find providers for.
key: RecordKey,
/// Unbounded channel on which provider `PeerId`s are delivered.
result_sender: mpsc::UnboundedSender<PeerId>,
/// Optional semaphore permit travelling with the command.
permit: Option<OwnedSemaphorePermit>,
},
/// Ban a peer. Carries no result channel.
BanPeer {
/// Peer to ban.
peer_id: PeerId,
},
/// Dial an address. Carries no result channel.
Dial {
/// Address to dial.
address: Multiaddr,
},
/// Query the list of connected peers.
ConnectedPeers {
/// One-shot channel receiving the peer IDs.
result_sender: oneshot::Sender<Vec<PeerId>>,
},
/// Query the list of connected servers (NOTE(review): what distinguishes a
/// "server" from any connected peer is not visible here).
ConnectedServers {
/// One-shot channel receiving the peer IDs.
result_sender: oneshot::Sender<Vec<PeerId>>,
},
/// Trigger a bootstrap.
Bootstrap {
/// Optional unbounded channel for `()` notifications; `None` when the
/// sender is not interested in them.
result_sender: Option<mpsc::UnboundedSender<()>>,
},
}
/// Collection of `Handler`s, one per kind of event, each parameterised by the
/// value type it carries.
///
/// `Handler` is defined in `crate::utils`; when each handler is invoked is
/// decided elsewhere, so the per-field notes below are read off the field
/// names and types only.
#[derive(Default, Debug)]
pub(crate) struct Handlers {
/// Handler carrying a `Multiaddr` (by name: a new listener address).
pub(crate) new_listener: Handler<Multiaddr>,
/// Handler carrying a `usize` (by name: the updated number of established
/// peer connections).
pub(crate) num_established_peer_connections_change: Handler<usize>,
/// Handler carrying the `PeerId` of a peer (by name: one that connected).
pub(crate) connected_peer: Handler<PeerId>,
/// Handler carrying the `PeerId` of a peer (by name: one that disconnected).
pub(crate) disconnected_peer: Handler<PeerId>,
/// Handler carrying a `PeerDiscovered` value.
pub(crate) peer_discovered: Handler<PeerDiscovered>,
}
/// State bundle holding the event handlers, address lists, a connection
/// counter, the command channel and the rate limiter.
///
/// Mutable parts are wrapped in `Mutex`/atomics, which suggests the struct is
/// accessed through shared references (NOTE(review): who shares it is not
/// visible in this file section).
#[derive(Debug)]
pub(crate) struct Shared {
/// Event handlers; created with `Handlers::default()` in `Shared::new`.
pub(crate) handlers: Handlers,
/// Peer ID supplied at construction (presumably the local node's ID —
/// confirm against the caller of `Shared::new`).
pub(crate) id: PeerId,
/// Mutex-protected list of listener addresses; starts out empty.
pub(crate) listeners: Mutex<Vec<Multiaddr>>,
/// Mutex-protected list of external addresses; starts out empty.
pub(crate) external_addresses: Mutex<Vec<Multiaddr>>,
/// Reference-counted atomic counter of established peer connections;
/// starts at zero.
pub(crate) num_established_peer_connections: Arc<AtomicUsize>,
/// Sending half of the bounded channel used to submit `Command`s.
pub(crate) command_sender: mpsc::Sender<Command>,
/// Rate limiter supplied at construction.
pub(crate) rate_limiter: RateLimiter,
}
impl Shared {
    /// Builds a fresh `Shared` from the given peer ID, command channel sender
    /// and rate limiter.
    ///
    /// Everything else starts in its empty state: default handlers, empty
    /// listener and external address lists, and a connection counter of zero.
    pub(crate) fn new(
        id: PeerId,
        command_sender: mpsc::Sender<Command>,
        rate_limiter: RateLimiter,
    ) -> Self {
        let handlers = Handlers::default();
        // Both address lists begin empty.
        let listeners = Mutex::new(Vec::new());
        let external_addresses = Mutex::new(Vec::new());
        // No connections are established at construction time.
        let num_established_peer_connections = Arc::new(AtomicUsize::new(0));

        Self {
            handlers,
            id,
            listeners,
            external_addresses,
            num_established_peer_connections,
            command_sender,
            rate_limiter,
        }
    }
}