sc_consensus_subspace/
notification.rs1use parking_lot::Mutex;
4use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
5use std::fmt;
6use std::sync::Arc;
7
8type NotificationStream<T> = TracingUnboundedReceiver<T>;
10
11type SharedNotificationSenders<T> = Arc<Mutex<Vec<TracingUnboundedSender<T>>>>;
14
15#[derive(Clone)]
17pub(crate) struct SubspaceNotificationSender<T: Clone + Send + Sync + fmt::Debug + 'static> {
18 subscribers: SharedNotificationSenders<T>,
19}
20
21impl<T: Clone + Send + Sync + fmt::Debug + 'static> SubspaceNotificationSender<T> {
22 fn new(subscribers: SharedNotificationSenders<T>) -> Self {
24 Self { subscribers }
25 }
26
27 pub(crate) fn notify<F>(&self, get_value: F)
29 where
30 F: FnOnce() -> T,
31 {
32 let mut subscribers = self.subscribers.lock();
33
34 subscribers.retain(|subscriber| !subscriber.is_closed());
36
37 if !subscribers.is_empty() {
38 let value = get_value();
39 subscribers.retain(|subscriber| subscriber.unbounded_send(value.clone()).is_ok());
40 }
41 }
42}
43
44#[derive(Clone)]
46pub struct SubspaceNotificationStream<T: Clone + Send + Sync + fmt::Debug + 'static> {
47 stream_name: &'static str,
48 subscribers: SharedNotificationSenders<T>,
49}
50
51impl<T: Clone + Send + Sync + fmt::Debug + 'static> SubspaceNotificationStream<T> {
52 fn new(stream_name: &'static str, subscribers: SharedNotificationSenders<T>) -> Self {
56 Self {
57 stream_name,
58 subscribers,
59 }
60 }
61
62 pub fn subscribe(&self) -> NotificationStream<T> {
64 let (sender, receiver) = tracing_unbounded(self.stream_name, 100);
65 self.subscribers.lock().push(sender);
66 receiver
67 }
68}
69
70pub(crate) fn channel<T>(
72 stream_name: &'static str,
73) -> (SubspaceNotificationSender<T>, SubspaceNotificationStream<T>)
74where
75 T: Clone + Send + Sync + fmt::Debug + 'static,
76{
77 let subscribers = Arc::new(Mutex::new(Vec::new()));
78 let receiver = SubspaceNotificationStream::new(stream_name, subscribers.clone());
79 let sender = SubspaceNotificationSender::new(subscribers);
80 (sender, receiver)
81}