subspace_networking/
shared.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
//! Data structures shared between node and node runner, facilitating exchange and creation of
//! queries, subscriptions, various events and shared information.

use crate::protocols::request_response::request_response_factory::RequestFailure;
use crate::utils::multihash::Multihash;
use crate::utils::rate_limiter::RateLimiter;
use crate::utils::Handler;
use bytes::Bytes;
use futures::channel::{mpsc, oneshot};
use libp2p::gossipsub::{PublishError, Sha256Topic, SubscriptionError};
use libp2p::kad::{PeerRecord, RecordKey};
use libp2p::{Multiaddr, PeerId};
use parking_lot::Mutex;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use tokio::sync::OwnedSemaphorePermit;

/// Peer discovery event derived from Kademlia events (RoutablePeer, PendingRoutablePeer,
/// UnroutablePeer).
///
/// Routable and pending routable peers are both represented by
/// [`PeerDiscovered::RoutablePeer`], which additionally carries an address; an unroutable peer
/// is identified by its peer ID only.
#[derive(Clone, Debug)]
pub enum PeerDiscovered {
    /// Kademlia's unroutable peer event.
    UnroutablePeer {
        /// ID of the peer.
        peer_id: PeerId,
    },

    /// Kademlia's routable or pending routable peer event.
    RoutablePeer {
        /// ID of the peer.
        peer_id: PeerId,
        /// Address of the peer.
        address: Multiaddr,
    },
}

impl PeerDiscovered {
    /// Extracts peer ID from event.
    pub fn peer_id(&self) -> PeerId {
        match self {
            PeerDiscovered::UnroutablePeer { peer_id } => *peer_id,
            PeerDiscovered::RoutablePeer { peer_id, .. } => *peer_id,
        }
    }
}

/// Successful outcome of a [`Command::Subscribe`] request: an identifier for the subscription
/// together with the channel on which its messages arrive.
#[derive(Debug)]
pub(crate) struct CreatedSubscription {
    /// Subscription ID to be used for unsubscribing.
    pub(crate) subscription_id: usize,
    /// Receiver side of the channel with new messages.
    pub(crate) receiver: mpsc::UnboundedReceiver<Bytes>,
}

/// Commands carried by the channel whose sending half is `Shared::command_sender`.
///
/// Variants that produce a result carry a `result_sender`: a `oneshot` sender for a single reply
/// or an unbounded `mpsc` sender for a stream of items. Some variants also carry a semaphore
/// `permit` alongside the request.
///
/// NOTE(review): the code that handles these commands is not part of this file, so the variant
/// descriptions below are derived from names and payload types only — confirm against the
/// command handler before relying on them.
#[derive(Debug)]
pub(crate) enum Command {
    /// Request for the value(s) stored under `key`.
    GetValue {
        /// Key to look up.
        key: Multihash,
        /// Channel for the resulting [`PeerRecord`]s.
        result_sender: mpsc::UnboundedSender<PeerRecord>,
        /// Semaphore permit travelling with the request.
        permit: OwnedSemaphorePermit,
    },
    /// Request to store `value` under `key`.
    PutValue {
        /// Key to store the value under.
        key: Multihash,
        /// Raw value bytes.
        value: Vec<u8>,
        /// Channel for `()` notifications about the operation (presumably progress or
        /// completion — confirm).
        result_sender: mpsc::UnboundedSender<()>,
        /// Semaphore permit travelling with the request.
        permit: OwnedSemaphorePermit,
    },
    /// Request to subscribe to a gossipsub `topic`.
    Subscribe {
        /// Topic to subscribe to.
        topic: Sha256Topic,
        /// Channel for the created subscription or the subscription error.
        result_sender: oneshot::Sender<Result<CreatedSubscription, SubscriptionError>>,
    },
    /// Request to drop a previously created subscription; carries no result channel.
    Unsubscribe {
        /// Topic the subscription belongs to.
        topic: Sha256Topic,
        /// ID of the subscription, see [`CreatedSubscription::subscription_id`].
        subscription_id: usize,
    },
    /// Request to publish `message` on a gossipsub `topic`.
    Publish {
        /// Topic to publish on.
        topic: Sha256Topic,
        /// Raw message bytes.
        message: Vec<u8>,
        /// Channel for the outcome of publishing.
        result_sender: oneshot::Sender<Result<(), PublishError>>,
    },
    /// Request for the peers closest to `key`.
    GetClosestPeers {
        /// Key to find the closest peers for.
        key: Multihash,
        /// Channel for the IDs of the peers found.
        result_sender: mpsc::UnboundedSender<PeerId>,
        /// Optional semaphore permit travelling with the request.
        permit: Option<OwnedSemaphorePermit>,
    },
    /// Request for the closest peers to `key` among locally known ones, answered with a single
    /// list of peers and their addresses.
    GetClosestLocalPeers {
        /// Key to find the closest peers for.
        key: Multihash,
        /// Optional peer ID (NOTE(review): its role is not visible in this file — confirm).
        source: Option<PeerId>,
        /// Channel for the list of peers together with their addresses.
        result_sender: oneshot::Sender<Vec<(PeerId, Vec<Multiaddr>)>>,
    },
    /// Request/response exchange with a specific peer over a named protocol.
    GenericRequest {
        /// Peer the request is addressed to.
        peer_id: PeerId,
        /// Addresses supplied for the peer (presumably used for dialing — confirm).
        addresses: Vec<Multiaddr>,
        /// Name of the request-response protocol.
        protocol_name: &'static str,
        /// Raw request bytes.
        request: Vec<u8>,
        /// Channel for the raw response bytes or the request failure.
        result_sender: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
    },
    /// Request for the providers of `key`.
    GetProviders {
        /// Record key to find providers for.
        key: RecordKey,
        /// Channel for the IDs of the providers found.
        result_sender: mpsc::UnboundedSender<PeerId>,
        /// Optional semaphore permit travelling with the request.
        permit: Option<OwnedSemaphorePermit>,
    },
    /// Request to ban a peer; carries no result channel.
    BanPeer {
        /// Peer to ban.
        peer_id: PeerId,
    },
    /// Request to dial an address; carries no result channel.
    Dial {
        /// Address to dial.
        address: Multiaddr,
    },
    /// Request for the list of connected peers.
    ConnectedPeers {
        /// Channel for the IDs of the connected peers.
        result_sender: oneshot::Sender<Vec<PeerId>>,
    },
    /// Request to bootstrap.
    Bootstrap {
        /// Optional channel for `()` notifications about bootstrapping. No result sender means
        /// background async bootstrapping.
        result_sender: Option<mpsc::UnboundedSender<()>>,
    },
}

/// Set of event handlers stored in [`Shared`], one per kind of event.
///
/// NOTE(review): `Handler` is defined in `crate::utils`; its type parameter is presumably the
/// argument handed to registered callbacks — confirm there.
#[derive(Default, Debug)]
pub(crate) struct Handlers {
    /// Handler for a new listener address.
    pub(crate) new_listener: Handler<Multiaddr>,
    /// Handler for changes in the number of established peer connections (the `usize` is
    /// presumably the new count — confirm).
    pub(crate) num_established_peer_connections_change: Handler<usize>,
    /// Handler for a peer becoming connected, identified by its peer ID.
    pub(crate) connected_peer: Handler<PeerId>,
    /// Handler for a peer becoming disconnected, identified by its peer ID.
    pub(crate) disconnected_peer: Handler<PeerId>,
    /// Handler for peer discovery events, see [`PeerDiscovered`].
    pub(crate) peer_discovered: Handler<PeerDiscovered>,
}

/// State shared between node and node runner.
///
/// Mutable collections are wrapped in a [`Mutex`] and the connection counter is an
/// [`AtomicUsize`] behind an [`Arc`], so they can be updated through a shared reference.
#[derive(Debug)]
pub(crate) struct Shared {
    /// Event handlers.
    pub(crate) handlers: Handlers,
    /// Peer ID passed to [`Shared::new`] (presumably the local node's own ID — confirm).
    pub(crate) id: PeerId,
    /// Addresses on which node is listening for incoming requests.
    pub(crate) listeners: Mutex<Vec<Multiaddr>>,
    /// External addresses (NOTE(review): presumably those of the local node — confirm).
    pub(crate) external_addresses: Mutex<Vec<Multiaddr>>,
    /// Counter of established peer connections; starts at zero.
    pub(crate) num_established_peer_connections: Arc<AtomicUsize>,
    /// Sender end of the channel for sending commands to the swarm.
    pub(crate) command_sender: mpsc::Sender<Command>,
    /// Rate limiter passed to [`Shared::new`].
    pub(crate) rate_limiter: RateLimiter,
}

impl Shared {
    /// Builds the shared state around the given peer ID, command channel sender and rate
    /// limiter.
    ///
    /// Everything not supplied by the caller starts out in its initial state: default-constructed
    /// handlers, empty lists of listeners and external addresses, and a connection counter of
    /// zero.
    pub(crate) fn new(
        id: PeerId,
        command_sender: mpsc::Sender<Command>,
        rate_limiter: RateLimiter,
    ) -> Self {
        let handlers = Handlers::default();
        let listeners = Mutex::new(Vec::new());
        let external_addresses = Mutex::new(Vec::new());
        let num_established_peer_connections = Arc::new(AtomicUsize::new(0));

        Self {
            handlers,
            id,
            listeners,
            external_addresses,
            num_established_peer_connections,
            command_sender,
            rate_limiter,
        }
    }
}