// sc_consensus_subspace/notification.rs
use parking_lot::Mutex;
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use std::fmt;
use std::sync::Arc;
/// Receiving half of a per-subscriber channel, as returned by `subscribe`.
type NotificationStream<T> = TracingUnboundedReceiver<T>;
/// List of per-subscriber sending halves, shared behind a reference-counted mutex between the
/// notification sender and the notification stream factory.
type SharedNotificationSenders<T> = Arc<Mutex<Vec<TracingUnboundedSender<T>>>>;
/// Sending side of a notification channel.
///
/// Holds the shared list of subscriber senders; cloning this handle clones the `Arc`, so every
/// clone operates on the same subscriber list.
#[derive(Clone)]
pub(crate) struct SubspaceNotificationSender<T: Clone + Send + Sync + fmt::Debug + 'static> {
// Senders for every currently registered subscriber.
subscribers: SharedNotificationSenders<T>,
}
impl<T: Clone + Send + Sync + fmt::Debug + 'static> SubspaceNotificationSender<T> {
    /// Wraps an already shared subscriber list in a sender handle.
    fn new(subscribers: SharedNotificationSenders<T>) -> Self {
        Self { subscribers }
    }

    /// Delivers a notification to every live subscriber.
    ///
    /// `get_value` is invoked at most once, and only if at least one subscriber is still open
    /// after pruning, so callers can defer building the value. Each remaining subscriber receives
    /// its own clone of the value; subscribers whose send fails are removed from the list.
    pub(crate) fn notify<F>(&self, get_value: F)
    where
        F: FnOnce() -> T,
    {
        // The lock is held for the whole call, including the `get_value` invocation.
        let mut senders = self.subscribers.lock();

        // Drop subscriptions that are already closed before deciding whether to build the value.
        senders.retain(|sender| !sender.is_closed());
        if senders.is_empty() {
            return;
        }

        let notification = get_value();
        // Sending doubles as a second prune: a failed send removes the subscriber.
        senders.retain(|sender| {
            let delivery = sender.unbounded_send(notification.clone());
            delivery.is_ok()
        });
    }
}
/// Subscription side of a notification channel.
///
/// Holds the stream name and the shared list of subscriber senders; cloning this handle clones
/// the `Arc`, so every clone registers subscribers in the same list.
#[derive(Clone)]
pub struct SubspaceNotificationStream<T: Clone + Send + Sync + fmt::Debug + 'static> {
// Name passed to `tracing_unbounded` for every channel created by `subscribe`.
stream_name: &'static str,
// Senders for every currently registered subscriber.
subscribers: SharedNotificationSenders<T>,
}
impl<T: Clone + Send + Sync + fmt::Debug + 'static> SubspaceNotificationStream<T> {
    /// Builds a stream handle from a name and an already shared subscriber list.
    fn new(stream_name: &'static str, subscribers: SharedNotificationSenders<T>) -> Self {
        Self { stream_name, subscribers }
    }

    /// Registers a new subscriber and returns the receiving end of its channel.
    ///
    /// A fresh channel is created on every call; its sending half is appended to the shared
    /// subscriber list and the receiving half is handed back to the caller.
    pub fn subscribe(&self) -> NotificationStream<T> {
        // NOTE(review): 100 is presumably the queue-size warning threshold of
        // `tracing_unbounded` - confirm against the `sc_utils` documentation.
        let (tx, rx) = tracing_unbounded(self.stream_name, 100);
        {
            let mut senders = self.subscribers.lock();
            senders.push(tx);
        }
        rx
    }
}
/// Creates a linked notification sender and notification stream.
///
/// Both returned handles share one initially empty subscriber list, so subscribers registered
/// through the stream are the ones the sender delivers to. `stream_name` is stored in the stream
/// and used to name the channels it creates.
pub(crate) fn channel<T>(
    stream_name: &'static str,
) -> (SubspaceNotificationSender<T>, SubspaceNotificationStream<T>)
where
    T: Clone + Send + Sync + fmt::Debug + 'static,
{
    let shared_senders = Arc::new(Mutex::new(Vec::new()));
    let notification_sender = SubspaceNotificationSender::new(Arc::clone(&shared_senders));
    let notification_stream = SubspaceNotificationStream::new(stream_name, shared_senders);
    (notification_sender, notification_stream)
}