cross_domain_message_gossip/
gossip_worker.rs

1use futures::{FutureExt, StreamExt};
2use parity_scale_codec::{Decode, Encode};
3use parking_lot::{Mutex, RwLock};
4use sc_network::config::NonDefaultSetConfig;
5use sc_network::{NetworkPeers, NotificationService, PeerId};
6use sc_network_gossip::{
7    GossipEngine, MessageIntent, Syncing as GossipSyncing, ValidationResult, Validator,
8    ValidatorContext,
9};
10use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
11use sp_api::StorageProof;
12use sp_consensus::SyncOracle;
13use sp_core::twox_256;
14use sp_messenger::messages::{ChainId, ChannelId};
15use sp_runtime::traits::{Block as BlockT, Hash as HashT, HashingFor};
16use std::collections::{BTreeMap, HashSet};
17use std::future::poll_fn;
18use std::pin::pin;
19use std::sync::Arc;
20use subspace_runtime_primitives::BlockNumber;
21
22const LOG_TARGET: &str = "cross_chain_gossip_worker";
23const PROTOCOL_NAME: &str = "/subspace/cross-chain-messages";
24
25/// Encoded message with sender info if available.
26pub struct ChainMsg {
27    pub maybe_peer: Option<PeerId>,
28    pub data: MessageData,
29}
30
31/// Unbounded sender to send encoded message to listeners.
32pub type ChainSink = TracingUnboundedSender<ChainMsg>;
33type MessageHash = [u8; 32];
34
35/// Channel update message.
36#[derive(Debug, Encode, Decode)]
37pub struct ChannelUpdate {
38    /// Message is coming from src_chain.
39    pub src_chain_id: ChainId,
40    /// Channel id.
41    pub channel_id: ChannelId,
42    /// Block number at which storage proof was generated.
43    pub block_number: BlockNumber,
44    /// Storage proof of the channel on src_chain.
45    pub storage_proof: StorageProof,
46}
47
48/// A type of cross chain message
49#[derive(Debug, Encode, Decode)]
50pub enum MessageData {
51    /// Encoded XDM message
52    Xdm(Vec<u8>),
53    /// Encoded channel update message.
54    ChannelUpdate(ChannelUpdate),
55}
56
57/// A cross chain message with encoded data.
58#[derive(Debug, Encode, Decode)]
59pub struct Message {
60    pub chain_id: ChainId,
61    pub data: MessageData,
62}
63
64/// Gossip worker builder
65pub struct GossipWorkerBuilder {
66    gossip_msg_stream: TracingUnboundedReceiver<Message>,
67    gossip_msg_sink: TracingUnboundedSender<Message>,
68    chain_sinks: BTreeMap<ChainId, ChainSink>,
69}
70
71impl GossipWorkerBuilder {
72    /// Construct a gossip worker builder
73    #[allow(clippy::new_without_default)]
74    pub fn new() -> Self {
75        let (gossip_msg_sink, gossip_msg_stream) =
76            tracing_unbounded("cross_chain_gossip_messages", 100);
77        Self {
78            gossip_msg_stream,
79            gossip_msg_sink,
80            chain_sinks: BTreeMap::new(),
81        }
82    }
83
84    /// Collect the chain sink that will be used by the gossip message worker later.
85    pub fn push_chain_sink(&mut self, chain_id: ChainId, sink: ChainSink) {
86        self.chain_sinks.insert(chain_id, sink);
87    }
88
89    // Remove the chain sink
90    pub fn remove_chain_sink(&mut self, chain_id: &ChainId) -> Option<ChainSink> {
91        self.chain_sinks.remove(chain_id)
92    }
93
94    /// Get the gossip message sink
95    pub fn gossip_msg_sink(&self) -> TracingUnboundedSender<Message> {
96        self.gossip_msg_sink.clone()
97    }
98
99    /// Build gossip worker
100    pub fn build<Block, Network, GossipSync>(
101        self,
102        network: Network,
103        notification_service: Box<dyn NotificationService>,
104        sync: Arc<GossipSync>,
105    ) -> GossipWorker<Block, Network, GossipSync>
106    where
107        Block: BlockT,
108        Network: sc_network_gossip::Network<Block> + Send + Sync + Clone + 'static,
109        GossipSync: GossipSyncing<Block> + SyncOracle + Send + 'static,
110    {
111        let Self {
112            gossip_msg_stream,
113            chain_sinks,
114            ..
115        } = self;
116
117        let gossip_validator = Arc::new(GossipValidator::new(network.clone()));
118        let gossip_engine = Arc::new(Mutex::new(GossipEngine::new(
119            network,
120            sync.clone(),
121            notification_service,
122            PROTOCOL_NAME,
123            gossip_validator.clone(),
124            None,
125        )));
126
127        GossipWorker {
128            gossip_engine,
129            gossip_validator,
130            gossip_msg_stream,
131            chain_sinks,
132            sync_oracle: sync,
133        }
134    }
135}
136
137/// Gossip worker to gossip incoming and outgoing messages to other peers.
138/// Also, streams the decoded extrinsics to destination chain tx pool if available.
139pub struct GossipWorker<Block: BlockT, Network, SO> {
140    gossip_engine: Arc<Mutex<GossipEngine<Block>>>,
141    gossip_validator: Arc<GossipValidator<Network>>,
142    gossip_msg_stream: TracingUnboundedReceiver<Message>,
143    chain_sinks: BTreeMap<ChainId, ChainSink>,
144    sync_oracle: Arc<SO>,
145}
146
147/// Returns the network configuration for cross chain message gossip.
148pub fn xdm_gossip_peers_set_config() -> (NonDefaultSetConfig, Box<dyn NotificationService>) {
149    let (mut cfg, notification_service) = NonDefaultSetConfig::new(
150        PROTOCOL_NAME.into(),
151        Vec::new(),
152        5 * 1024 * 1024,
153        None,
154        Default::default(),
155    );
156    cfg.allow_non_reserved(25, 25);
157    (cfg, notification_service)
158}
159
160/// Cross chain message topic.
161fn topic<Block: BlockT>() -> Block::Hash {
162    HashingFor::<Block>::hash(b"cross-chain-messages")
163}
164
165impl<Block: BlockT, Network, SO: SyncOracle> GossipWorker<Block, Network, SO> {
166    /// Starts the Gossip message worker.
167    pub async fn run(mut self) {
168        let incoming_cross_chain_messages = pin!(self
169            .gossip_engine
170            .lock()
171            .messages_for(topic::<Block>())
172            .filter_map(|notification| async move {
173                Message::decode(&mut &notification.message[..])
174                    .ok()
175                    .map(|msg| (notification.sender, msg))
176            }));
177        let mut incoming_cross_chain_messages = incoming_cross_chain_messages.fuse();
178
179        loop {
180            let engine = self.gossip_engine.clone();
181            let mut gossip_engine = poll_fn(|cx| engine.lock().poll_unpin(cx)).fuse();
182
183            futures::select! {
184                cross_chain_message = incoming_cross_chain_messages.next() => {
185                    if let Some((maybe_peer, msg)) = cross_chain_message {
186                        tracing::debug!(target: LOG_TARGET, "Incoming cross chain message for chain from Network: {:?}", msg.chain_id);
187                        self.handle_cross_chain_message(msg, maybe_peer);
188                    }
189                },
190
191                msg = self.gossip_msg_stream.select_next_some() => {
192                    tracing::debug!(target: LOG_TARGET, "Incoming cross chain message for chain from Relayer: {:?}", msg.chain_id);
193                    self.handle_cross_chain_message(msg, None);
194                }
195
196                _ = gossip_engine => {
197                    tracing::error!(target: LOG_TARGET, "Gossip engine has terminated.");
198                    return;
199                }
200            }
201        }
202    }
203
204    fn handle_cross_chain_message(&mut self, msg: Message, maybe_peer: Option<PeerId>) {
205        // mark and rebroadcast message
206        let encoded_msg = msg.encode();
207        self.gossip_validator.note_broadcast(&encoded_msg);
208        self.gossip_engine
209            .lock()
210            .gossip_message(topic::<Block>(), encoded_msg, false);
211
212        // Skip sending the message since the node unable to verify the message before synced
213        if self.sync_oracle.is_major_syncing() {
214            return;
215        }
216
217        let Message { chain_id, data } = msg;
218        let sink = match self.chain_sinks.get(&chain_id) {
219            Some(sink) => sink,
220            None => return,
221        };
222
223        // send the message to the open and ready channel
224        if !sink.is_closed() && sink.unbounded_send(ChainMsg { data, maybe_peer }).is_ok() {
225            return;
226        }
227
228        // sink is either closed or failed to send unbounded message
229        // consider it closed and remove the sink.
230        tracing::error!(
231            target: LOG_TARGET,
232            "Failed to send incoming chain message: {:?}",
233            chain_id
234        );
235        self.chain_sinks.remove(&chain_id);
236    }
237}
238
239/// Gossip validator to retain or clean up Gossiped messages.
240#[derive(Debug)]
241struct GossipValidator<Network> {
242    network: Network,
243    should_broadcast: RwLock<HashSet<MessageHash>>,
244}
245
246impl<Network> GossipValidator<Network> {
247    fn new(network: Network) -> Self {
248        Self {
249            network,
250            should_broadcast: Default::default(),
251        }
252    }
253
254    fn note_broadcast(&self, msg: &[u8]) {
255        let msg_hash = twox_256(msg);
256        let mut msg_set = self.should_broadcast.write();
257        msg_set.insert(msg_hash);
258    }
259
260    fn should_broadcast(&self, msg: &[u8]) -> bool {
261        let msg_hash = twox_256(msg);
262        let msg_set = self.should_broadcast.read();
263        msg_set.contains(&msg_hash)
264    }
265
266    fn note_broadcasted(&self, msg: &[u8]) {
267        let msg_hash = twox_256(msg);
268        let mut msg_set = self.should_broadcast.write();
269        msg_set.remove(&msg_hash);
270    }
271}
272
273impl<Block, Network> Validator<Block> for GossipValidator<Network>
274where
275    Block: BlockT,
276    Network: NetworkPeers + Send + Sync + 'static,
277{
278    fn validate(
279        &self,
280        _context: &mut dyn ValidatorContext<Block>,
281        sender: &PeerId,
282        mut data: &[u8],
283    ) -> ValidationResult<Block::Hash> {
284        match Message::decode(&mut data) {
285            Ok(_) => ValidationResult::ProcessAndKeep(topic::<Block>()),
286            Err(_) => {
287                self.network.report_peer(*sender, rep::GOSSIP_NOT_DECODABLE);
288                ValidationResult::Discard
289            }
290        }
291    }
292
293    fn message_expired<'a>(&'a self) -> Box<dyn FnMut(Block::Hash, &[u8]) -> bool + 'a> {
294        Box::new(move |_topic, data| !self.should_broadcast(data))
295    }
296
297    fn message_allowed<'a>(
298        &'a self,
299    ) -> Box<dyn FnMut(&PeerId, MessageIntent, &Block::Hash, &[u8]) -> bool + 'a> {
300        Box::new(move |_who, _intent, _topic, data| {
301            let should_broadcast = self.should_broadcast(data);
302            if should_broadcast {
303                self.note_broadcasted(data)
304            }
305
306            should_broadcast
307        })
308    }
309}
310
311pub(crate) mod rep {
312    use sc_network::ReputationChange;
313
314    /// Reputation change when a peer sends us a gossip message that can't be decoded.
315    pub(crate) const GOSSIP_NOT_DECODABLE: ReputationChange =
316        ReputationChange::new_fatal("Cross chain message: not decodable");
317
318    /// Reputation change when a peer sends us a non XDM message
319    pub(crate) const NOT_XDM: ReputationChange =
320        ReputationChange::new_fatal("Cross chain message: not XDM");
321}