sc_subspace_block_relay/
utils.rs

1//! Common utils.
2
3use crate::types::RequestResponseErr;
4use futures::channel::oneshot;
5use parity_scale_codec::{Decode, Encode};
6use parking_lot::Mutex;
7use sc_network::request_responses::IfDisconnected;
8use sc_network::types::ProtocolName;
9use sc_network::{NetworkRequest, PeerId};
10use std::collections::HashMap;
11use std::sync::Arc;
12use substrate_prometheus_endpoint::{
13    register, Counter, CounterVec, Opts, PrometheusError, Registry, U64,
14};
15
16type NetworkRequestService = Arc<dyn NetworkRequest + Send + Sync + 'static>;
17
18/// Wrapper to work around the circular dependency in substrate.
19///
20/// `build_network()` requires the block relay to be passed in, which internally needs the network
21/// handle. `set()` is used to fill in the network after the network is created.
22pub struct NetworkWrapper {
23    network: Mutex<Option<NetworkRequestService>>,
24}
25
26impl Default for NetworkWrapper {
27    fn default() -> Self {
28        Self {
29            network: Mutex::new(None),
30        }
31    }
32}
33
34impl NetworkWrapper {
35    pub fn set(&self, network: NetworkRequestService) {
36        *self.network.lock() = Some(network);
37    }
38
39    pub(crate) fn network_peer_handle(
40        &self,
41        protocol_name: ProtocolName,
42        who: PeerId,
43    ) -> Result<NetworkPeerHandle, RequestResponseErr> {
44        match self.network.lock().as_ref().cloned() {
45            Some(network) => Ok(NetworkPeerHandle::new(protocol_name, who, network)),
46            None => Err(RequestResponseErr::NetworkUninitialized),
47        }
48    }
49}
50
51/// Network handle that allows making requests to specific peer and protocol.
52/// `Request` is the format of the request message sent on the wire.
53#[derive(Clone)]
54pub(crate) struct NetworkPeerHandle {
55    protocol_name: ProtocolName,
56    who: PeerId,
57    network: NetworkRequestService,
58}
59
60impl NetworkPeerHandle {
61    fn new(protocol_name: ProtocolName, who: PeerId, network: NetworkRequestService) -> Self {
62        Self {
63            protocol_name,
64            who,
65            network,
66        }
67    }
68
69    /// Performs the request
70    pub(crate) async fn request<Request, Response>(
71        &self,
72        request: Request,
73    ) -> Result<Response, RequestResponseErr>
74    where
75        Request: Encode,
76        Response: Decode,
77    {
78        let (tx, rx) = oneshot::channel();
79        self.network.start_request(
80            self.who,
81            self.protocol_name.clone(),
82            request.encode(),
83            None,
84            tx,
85            IfDisconnected::ImmediateError,
86        );
87
88        let (response_bytes, _protocol_name) = rx
89            .await
90            .map_err(|_cancelled| RequestResponseErr::Canceled)?
91            .map_err(RequestResponseErr::RequestFailure)?;
92
93        let response_len = response_bytes.len();
94        Response::decode(&mut response_bytes.as_ref())
95            .map_err(|err| RequestResponseErr::DecodeFailed { response_len, err })
96    }
97}
98
99/// Convenience wrapper around prometheus counter, which can be optional.
100pub(crate) struct RelayCounter(Option<Counter<U64>>);
101
102impl RelayCounter {
103    /// Creates the counter.
104    pub(crate) fn new(
105        name: &str,
106        help: &str,
107        registry: Option<&Registry>,
108    ) -> Result<Self, PrometheusError> {
109        let counter = if let Some(registry) = registry {
110            Some(register(Counter::new(name, help)?, registry)?)
111        } else {
112            None
113        };
114        Ok(Self(counter))
115    }
116
117    /// Increments the counter.
118    pub(crate) fn inc(&self) {
119        if let Some(counter) = self.0.as_ref() {
120            counter.inc()
121        }
122    }
123}
124
125/// Convenience wrapper around prometheus counter vec, which can be optional.
126pub(crate) struct RelayCounterVec(Option<CounterVec<U64>>);
127
128impl RelayCounterVec {
129    /// Creates the counter vec.
130    pub(crate) fn new(
131        name: &str,
132        help: &str,
133        labels: &[&str],
134        registry: Option<&Registry>,
135    ) -> Result<Self, PrometheusError> {
136        let counter_vec = if let Some(registry) = registry {
137            Some(register(
138                CounterVec::new(Opts::new(name, help), labels)?,
139                registry,
140            )?)
141        } else {
142            None
143        };
144        Ok(Self(counter_vec))
145    }
146
147    /// Increments the counter.
148    pub(crate) fn inc(&self, label: &str, label_value: &str) {
149        if let Some(counter) = self.0.as_ref() {
150            let mut labels = HashMap::new();
151            labels.insert(label, label_value);
152            counter.with(&labels).inc()
153        }
154    }
155
156    /// Increments the counter by specified value.
157    pub(crate) fn inc_by(&self, label: &str, label_value: &str, v: u64) {
158        if let Some(counter) = self.0.as_ref() {
159            let mut labels = HashMap::new();
160            labels.insert(label, label_value);
161            counter.with(&labels).inc_by(v)
162        }
163    }
164}