sc_subspace_block_relay/
utils.rs1use crate::types::RequestResponseErr;
4use futures::channel::oneshot;
5use parity_scale_codec::{Decode, Encode};
6use parking_lot::Mutex;
7use sc_network::request_responses::IfDisconnected;
8use sc_network::types::ProtocolName;
9use sc_network::{NetworkRequest, PeerId};
10use std::collections::HashMap;
11use std::sync::Arc;
12use substrate_prometheus_endpoint::{
13 register, Counter, CounterVec, Opts, PrometheusError, Registry, U64,
14};
15
16type NetworkRequestService = Arc<dyn NetworkRequest + Send + Sync + 'static>;
17
18pub struct NetworkWrapper {
23 network: Mutex<Option<NetworkRequestService>>,
24}
25
26impl Default for NetworkWrapper {
27 fn default() -> Self {
28 Self {
29 network: Mutex::new(None),
30 }
31 }
32}
33
34impl NetworkWrapper {
35 pub fn set(&self, network: NetworkRequestService) {
36 *self.network.lock() = Some(network);
37 }
38
39 pub(crate) fn network_peer_handle(
40 &self,
41 protocol_name: ProtocolName,
42 who: PeerId,
43 ) -> Result<NetworkPeerHandle, RequestResponseErr> {
44 match self.network.lock().as_ref().cloned() {
45 Some(network) => Ok(NetworkPeerHandle::new(protocol_name, who, network)),
46 None => Err(RequestResponseErr::NetworkUninitialized),
47 }
48 }
49}
50
51#[derive(Clone)]
54pub(crate) struct NetworkPeerHandle {
55 protocol_name: ProtocolName,
56 who: PeerId,
57 network: NetworkRequestService,
58}
59
60impl NetworkPeerHandle {
61 fn new(protocol_name: ProtocolName, who: PeerId, network: NetworkRequestService) -> Self {
62 Self {
63 protocol_name,
64 who,
65 network,
66 }
67 }
68
69 pub(crate) async fn request<Request, Response>(
71 &self,
72 request: Request,
73 ) -> Result<Response, RequestResponseErr>
74 where
75 Request: Encode,
76 Response: Decode,
77 {
78 let (tx, rx) = oneshot::channel();
79 self.network.start_request(
80 self.who,
81 self.protocol_name.clone(),
82 request.encode(),
83 None,
84 tx,
85 IfDisconnected::ImmediateError,
86 );
87
88 let (response_bytes, _protocol_name) = rx
89 .await
90 .map_err(|_cancelled| RequestResponseErr::Canceled)?
91 .map_err(RequestResponseErr::RequestFailure)?;
92
93 let response_len = response_bytes.len();
94 Response::decode(&mut response_bytes.as_ref())
95 .map_err(|err| RequestResponseErr::DecodeFailed { response_len, err })
96 }
97}
98
99pub(crate) struct RelayCounter(Option<Counter<U64>>);
101
102impl RelayCounter {
103 pub(crate) fn new(
105 name: &str,
106 help: &str,
107 registry: Option<&Registry>,
108 ) -> Result<Self, PrometheusError> {
109 let counter = if let Some(registry) = registry {
110 Some(register(Counter::new(name, help)?, registry)?)
111 } else {
112 None
113 };
114 Ok(Self(counter))
115 }
116
117 pub(crate) fn inc(&self) {
119 if let Some(counter) = self.0.as_ref() {
120 counter.inc()
121 }
122 }
123}
124
125pub(crate) struct RelayCounterVec(Option<CounterVec<U64>>);
127
128impl RelayCounterVec {
129 pub(crate) fn new(
131 name: &str,
132 help: &str,
133 labels: &[&str],
134 registry: Option<&Registry>,
135 ) -> Result<Self, PrometheusError> {
136 let counter_vec = if let Some(registry) = registry {
137 Some(register(
138 CounterVec::new(Opts::new(name, help), labels)?,
139 registry,
140 )?)
141 } else {
142 None
143 };
144 Ok(Self(counter_vec))
145 }
146
147 pub(crate) fn inc(&self, label: &str, label_value: &str) {
149 if let Some(counter) = self.0.as_ref() {
150 let mut labels = HashMap::new();
151 labels.insert(label, label_value);
152 counter.with(&labels).inc()
153 }
154 }
155
156 pub(crate) fn inc_by(&self, label: &str, label_value: &str, v: u64) {
158 if let Some(counter) = self.0.as_ref() {
159 let mut labels = HashMap::new();
160 labels.insert(label, label_value);
161 counter.with(&labels).inc_by(v)
162 }
163 }
164}