sc_consensus_subspace/
notification.rs

1//! Utility module for handling Subspace client notifications.
2
3use parking_lot::Mutex;
4use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
5use std::fmt;
6use std::sync::Arc;
7
8// Stream of notifications returned when subscribing.
9type NotificationStream<T> = TracingUnboundedReceiver<T>;
10
11// Collection of channel sending endpoints shared with the receiver side so they can register
12// themselves.
13type SharedNotificationSenders<T> = Arc<Mutex<Vec<TracingUnboundedSender<T>>>>;
14
15/// The sending half of the Subspace notification channel(s).
16#[derive(Clone)]
17pub(crate) struct SubspaceNotificationSender<T: Clone + Send + Sync + fmt::Debug + 'static> {
18    subscribers: SharedNotificationSenders<T>,
19}
20
21impl<T: Clone + Send + Sync + fmt::Debug + 'static> SubspaceNotificationSender<T> {
22    /// The `subscribers` should be shared with a corresponding `SharedNotificationSenders`.
23    fn new(subscribers: SharedNotificationSenders<T>) -> Self {
24        Self { subscribers }
25    }
26
27    /// Send out a notification to all subscribers.
28    pub(crate) fn notify<F>(&self, get_value: F)
29    where
30        F: FnOnce() -> T,
31    {
32        let mut subscribers = self.subscribers.lock();
33
34        // do an initial prune on closed subscriptions
35        subscribers.retain(|subscriber| !subscriber.is_closed());
36
37        if !subscribers.is_empty() {
38            let value = get_value();
39            subscribers.retain(|subscriber| subscriber.unbounded_send(value.clone()).is_ok());
40        }
41    }
42}
43
44/// The receiving half of the Subspace notification channel.
45#[derive(Clone)]
46pub struct SubspaceNotificationStream<T: Clone + Send + Sync + fmt::Debug + 'static> {
47    stream_name: &'static str,
48    subscribers: SharedNotificationSenders<T>,
49}
50
51impl<T: Clone + Send + Sync + fmt::Debug + 'static> SubspaceNotificationStream<T> {
52    /// Create a new receiver of notifications.
53    ///
54    /// The `subscribers` should be shared with a corresponding `SubspaceNotificationSender`.
55    fn new(stream_name: &'static str, subscribers: SharedNotificationSenders<T>) -> Self {
56        Self {
57            stream_name,
58            subscribers,
59        }
60    }
61
62    /// Subscribe to a channel through which notifications are sent.
63    pub fn subscribe(&self) -> NotificationStream<T> {
64        let (sender, receiver) = tracing_unbounded(self.stream_name, 100);
65        self.subscribers.lock().push(sender);
66        receiver
67    }
68}
69
70/// Creates a new pair of receiver and sender of notifications.
71pub(crate) fn channel<T>(
72    stream_name: &'static str,
73) -> (SubspaceNotificationSender<T>, SubspaceNotificationStream<T>)
74where
75    T: Clone + Send + Sync + fmt::Debug + 'static,
76{
77    let subscribers = Arc::new(Mutex::new(Vec::new()));
78    let receiver = SubspaceNotificationStream::new(stream_name, subscribers.clone());
79    let sender = SubspaceNotificationSender::new(subscribers);
80    (sender, receiver)
81}