// sc_subspace_block_relay/utils.rs
use crate::types::RequestResponseErr;
use codec::{Decode, Encode};
use futures::channel::oneshot;
use parking_lot::Mutex;
use sc_network::request_responses::IfDisconnected;
use sc_network::types::ProtocolName;
use sc_network::{NetworkRequest, PeerId};
use std::collections::HashMap;
use std::sync::Arc;
use substrate_prometheus_endpoint::{
register, Counter, CounterVec, Opts, PrometheusError, Registry, U64,
};
/// Reference-counted, `Send + Sync` trait object implementing [`NetworkRequest`].
type NetworkRequestService = Arc<dyn NetworkRequest + Send + Sync + 'static>;
/// Mutex-protected slot for an optional network request service.
///
/// The slot is empty when built through `Default` and is filled by `set`.
pub struct NetworkWrapper {
// `None` until a network service has been stored.
network: Mutex<Option<NetworkRequestService>>,
}
impl Default for NetworkWrapper {
    /// Builds a wrapper whose network slot is still empty.
    fn default() -> Self {
        let network = Mutex::new(None);
        Self { network }
    }
}
impl NetworkWrapper {
    /// Stores `network` in the wrapper, replacing any previously stored service.
    pub fn set(&self, network: NetworkRequestService) {
        *self.network.lock() = Some(network);
    }

    /// Creates a handle for sending `protocol_name` requests to the peer `who`.
    ///
    /// # Errors
    ///
    /// Returns [`RequestResponseErr::NetworkUninitialized`] when no network service
    /// has been stored in the wrapper yet.
    pub(crate) fn network_peer_handle(
        &self,
        protocol_name: ProtocolName,
        who: PeerId,
    ) -> Result<NetworkPeerHandle, RequestResponseErr> {
        // Clone the service in its own statement so the mutex guard is released here.
        // A guard created inside a `match` scrutinee would stay alive until the end of
        // the whole `match`, keeping the lock held while the handle is being built.
        let network = self.network.lock().as_ref().cloned();

        network
            .map(|network| NetworkPeerHandle::new(protocol_name, who, network))
            .ok_or(RequestResponseErr::NetworkUninitialized)
    }
}
/// Bundles a protocol name, a peer id and a network service; `request` uses all
/// three to send a request to that peer.
#[derive(Clone)]
pub(crate) struct NetworkPeerHandle {
// Protocol name passed to every request started through this handle.
protocol_name: ProtocolName,
// Peer the requests are addressed to.
who: PeerId,
// Service used to start the requests.
network: NetworkRequestService,
}
impl NetworkPeerHandle {
    /// Assembles a handle from its three parts.
    fn new(protocol_name: ProtocolName, who: PeerId, network: NetworkRequestService) -> Self {
        Self {
            network,
            who,
            protocol_name,
        }
    }

    /// Encodes `request`, sends it to the peer and decodes the reply as `Response`.
    ///
    /// The request is started with `IfDisconnected::ImmediateError` and without a
    /// fallback request.
    ///
    /// # Errors
    ///
    /// * [`RequestResponseErr::Canceled`] if the response channel is dropped before
    ///   a result arrives.
    /// * [`RequestResponseErr::RequestFailure`] if the network reports a failure.
    /// * [`RequestResponseErr::DecodeFailed`] if the response bytes do not decode
    ///   as `Response`; the error carries the length of the received payload.
    pub(crate) async fn request<Request, Response>(
        &self,
        request: Request,
    ) -> Result<Response, RequestResponseErr>
    where
        Request: Encode,
        Response: Decode,
    {
        let payload = request.encode();
        let (response_sender, response_receiver) = oneshot::channel();

        self.network.start_request(
            self.who,
            self.protocol_name.clone(),
            payload,
            None,
            response_sender,
            IfDisconnected::ImmediateError,
        );

        // Outer layer: was the channel dropped? Inner layer: did the request fail?
        let outcome = match response_receiver.await {
            Ok(outcome) => outcome,
            Err(_cancelled) => return Err(RequestResponseErr::Canceled),
        };
        let (response_bytes, _protocol_name) = match outcome {
            Ok(reply) => reply,
            Err(failure) => return Err(RequestResponseErr::RequestFailure(failure)),
        };

        let response_len = response_bytes.len();
        let mut input: &[u8] = response_bytes.as_ref();
        match Response::decode(&mut input) {
            Ok(response) => Ok(response),
            Err(err) => Err(RequestResponseErr::DecodeFailed { response_len, err }),
        }
    }
}
/// Optional Prometheus counter; holds `None` when it was created without a registry,
/// in which case `inc` does nothing.
pub(crate) struct RelayCounter(Option<Counter<U64>>);
impl RelayCounter {
    /// Creates the counter and, when `registry` is given, registers it there.
    ///
    /// Without a registry no metric is created at all and the result wraps `None`.
    ///
    /// # Errors
    ///
    /// Propagates any error from constructing or registering the counter.
    pub(crate) fn new(
        name: &str,
        help: &str,
        registry: Option<&Registry>,
    ) -> Result<Self, PrometheusError> {
        let registry = match registry {
            Some(registry) => registry,
            None => return Ok(Self(None)),
        };

        let unregistered = Counter::new(name, help)?;
        let registered = register(unregistered, registry)?;
        Ok(Self(Some(registered)))
    }

    /// Increments the counter by one; a no-op when no counter is held.
    pub(crate) fn inc(&self) {
        if let Some(metric) = &self.0 {
            metric.inc();
        }
    }
}
/// Optional Prometheus counter vector; holds `None` when it was created without a
/// registry, in which case `inc` and `inc_by` do nothing.
pub(crate) struct RelayCounterVec(Option<CounterVec<U64>>);
impl RelayCounterVec {
    /// Creates the counter vector with the given `labels` and, when `registry` is
    /// given, registers it there.
    ///
    /// Without a registry no metric is created at all and the result wraps `None`.
    ///
    /// # Errors
    ///
    /// Propagates any error from constructing or registering the counter vector.
    pub(crate) fn new(
        name: &str,
        help: &str,
        labels: &[&str],
        registry: Option<&Registry>,
    ) -> Result<Self, PrometheusError> {
        let registry = match registry {
            Some(registry) => registry,
            None => return Ok(Self(None)),
        };

        let unregistered = CounterVec::new(Opts::new(name, help), labels)?;
        let registered = register(unregistered, registry)?;
        Ok(Self(Some(registered)))
    }

    /// Increments by one the counter selected by the single `label`/`label_value`
    /// pair; a no-op when no counter vector is held.
    pub(crate) fn inc(&self, label: &str, label_value: &str) {
        if let Some(metric_vec) = &self.0 {
            let selector = HashMap::from([(label, label_value)]);
            metric_vec.with(&selector).inc();
        }
    }

    /// Adds `v` to the counter selected by the single `label`/`label_value` pair;
    /// a no-op when no counter vector is held.
    pub(crate) fn inc_by(&self, label: &str, label_value: &str, v: u64) {
        if let Some(metric_vec) = &self.0 {
            let selector = HashMap::from([(label, label_value)]);
            metric_vec.with(&selector).inc_by(v);
        }
    }
}