cross_domain_message_gossip/
gossip_worker.rs

1use futures::{FutureExt, StreamExt};
2use parity_scale_codec::{Decode, Encode};
3use parking_lot::{Mutex, RwLock};
4use sc_network::config::NonDefaultSetConfig;
5use sc_network::{NetworkPeers, NotificationService, PeerId};
6use sc_network_gossip::{
7    GossipEngine, MessageIntent, Syncing as GossipSyncing, ValidationResult, Validator,
8    ValidatorContext,
9};
10use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender, tracing_unbounded};
11use sp_api::StorageProof;
12use sp_consensus::SyncOracle;
13use sp_core::twox_256;
14use sp_messenger::messages::{ChainId, ChannelId};
15use sp_runtime::traits::{Block as BlockT, Hash as HashT, HashingFor};
16use std::collections::{BTreeMap, HashSet};
17use std::future::poll_fn;
18use std::pin::pin;
19use std::sync::Arc;
20use subspace_runtime_primitives::BlockNumber;
21
22const PROTOCOL_NAME: &str = "/subspace/cross-chain-messages";
23
24/// Encoded message with sender info if available.
25pub struct ChainMsg {
26    pub maybe_peer: Option<PeerId>,
27    pub data: MessageData,
28}
29
30/// Unbounded sender to send encoded message to listeners.
31pub type ChainSink = TracingUnboundedSender<ChainMsg>;
32type MessageHash = [u8; 32];
33
34/// Channel update message.
35#[derive(Debug, Encode, Decode)]
36pub struct ChannelUpdate {
37    /// Message is coming from src_chain.
38    pub src_chain_id: ChainId,
39    /// Channel id.
40    pub channel_id: ChannelId,
41    /// Block number at which storage proof was generated.
42    pub block_number: BlockNumber,
43    /// Storage proof of the channel on src_chain.
44    pub storage_proof: StorageProof,
45}
46
47/// A type of cross chain message
48#[derive(Debug, Encode, Decode)]
49pub enum MessageData {
50    /// Encoded XDM message
51    Xdm(Vec<u8>),
52    /// Encoded channel update message.
53    ChannelUpdate(ChannelUpdate),
54}
55
56/// A cross chain message with encoded data.
57#[derive(Debug, Encode, Decode)]
58pub struct Message {
59    pub chain_id: ChainId,
60    pub data: MessageData,
61}
62
63/// Gossip worker builder
64pub struct GossipWorkerBuilder {
65    gossip_msg_stream: TracingUnboundedReceiver<Message>,
66    gossip_msg_sink: TracingUnboundedSender<Message>,
67    chain_sinks: BTreeMap<ChainId, ChainSink>,
68}
69
70impl GossipWorkerBuilder {
71    /// Construct a gossip worker builder
72    #[allow(clippy::new_without_default)]
73    pub fn new() -> Self {
74        let (gossip_msg_sink, gossip_msg_stream) =
75            tracing_unbounded("cross_chain_gossip_messages", 100);
76        Self {
77            gossip_msg_stream,
78            gossip_msg_sink,
79            chain_sinks: BTreeMap::new(),
80        }
81    }
82
83    /// Collect the chain sink that will be used by the gossip message worker later.
84    pub fn push_chain_sink(&mut self, chain_id: ChainId, sink: ChainSink) {
85        self.chain_sinks.insert(chain_id, sink);
86    }
87
88    // Remove the chain sink
89    pub fn remove_chain_sink(&mut self, chain_id: &ChainId) -> Option<ChainSink> {
90        self.chain_sinks.remove(chain_id)
91    }
92
93    /// Get the gossip message sink
94    pub fn gossip_msg_sink(&self) -> TracingUnboundedSender<Message> {
95        self.gossip_msg_sink.clone()
96    }
97
98    /// Build gossip worker
99    pub fn build<Block, Network, GossipSync>(
100        self,
101        network: Network,
102        notification_service: Box<dyn NotificationService>,
103        sync: Arc<GossipSync>,
104    ) -> GossipWorker<Block, Network, GossipSync>
105    where
106        Block: BlockT,
107        Network: sc_network_gossip::Network<Block> + Send + Sync + Clone + 'static,
108        GossipSync: GossipSyncing<Block> + SyncOracle + Send + 'static,
109    {
110        let Self {
111            gossip_msg_stream,
112            chain_sinks,
113            ..
114        } = self;
115
116        let gossip_validator = Arc::new(GossipValidator::new(network.clone()));
117        let gossip_engine = Arc::new(Mutex::new(GossipEngine::new(
118            network,
119            sync.clone(),
120            notification_service,
121            PROTOCOL_NAME,
122            gossip_validator.clone(),
123            None,
124        )));
125
126        GossipWorker {
127            gossip_engine,
128            gossip_validator,
129            gossip_msg_stream,
130            chain_sinks,
131            sync_oracle: sync,
132        }
133    }
134}
135
136/// Gossip worker to gossip incoming and outgoing messages to other peers.
137/// Also, streams the decoded extrinsics to destination chain tx pool if available.
138pub struct GossipWorker<Block: BlockT, Network, SO> {
139    gossip_engine: Arc<Mutex<GossipEngine<Block>>>,
140    gossip_validator: Arc<GossipValidator<Network>>,
141    gossip_msg_stream: TracingUnboundedReceiver<Message>,
142    chain_sinks: BTreeMap<ChainId, ChainSink>,
143    sync_oracle: Arc<SO>,
144}
145
146/// Returns the network configuration for cross chain message gossip.
147pub fn xdm_gossip_peers_set_config() -> (NonDefaultSetConfig, Box<dyn NotificationService>) {
148    let (mut cfg, notification_service) = NonDefaultSetConfig::new(
149        PROTOCOL_NAME.into(),
150        Vec::new(),
151        5 * 1024 * 1024,
152        None,
153        Default::default(),
154    );
155    cfg.allow_non_reserved(25, 25);
156    (cfg, notification_service)
157}
158
159/// Cross chain message topic.
160fn topic<Block: BlockT>() -> Block::Hash {
161    HashingFor::<Block>::hash(b"cross-chain-messages")
162}
163
164impl<Block: BlockT, Network, SO: SyncOracle> GossipWorker<Block, Network, SO> {
165    /// Starts the Gossip message worker.
166    pub async fn run(mut self) {
167        let incoming_cross_chain_messages = pin!(
168            self.gossip_engine
169                .lock()
170                .messages_for(topic::<Block>())
171                .filter_map(|notification| async move {
172                    Message::decode(&mut &notification.message[..])
173                        .ok()
174                        .map(|msg| (notification.sender, msg))
175                })
176        );
177        let mut incoming_cross_chain_messages = incoming_cross_chain_messages.fuse();
178
179        loop {
180            let engine = self.gossip_engine.clone();
181            let mut gossip_engine = poll_fn(|cx| engine.lock().poll_unpin(cx)).fuse();
182
183            futures::select! {
184                cross_chain_message = incoming_cross_chain_messages.next() => {
185                    if let Some((maybe_peer, msg)) = cross_chain_message {
186                        tracing::debug!("Incoming cross chain message for chain from Network: {:?}", msg.chain_id);
187                        self.handle_cross_chain_message(msg, maybe_peer);
188                    }
189                },
190
191                msg = self.gossip_msg_stream.select_next_some() => {
192                    tracing::debug!("Incoming cross chain message for chain from Relayer: {:?}", msg.chain_id);
193                    self.handle_cross_chain_message(msg, None);
194                }
195
196                _ = gossip_engine => {
197                    tracing::error!("Gossip engine has terminated.");
198                    return;
199                }
200            }
201        }
202    }
203
204    fn handle_cross_chain_message(&mut self, msg: Message, maybe_peer: Option<PeerId>) {
205        // mark and rebroadcast message
206        let encoded_msg = msg.encode();
207        self.gossip_validator.note_broadcast(&encoded_msg);
208        self.gossip_engine
209            .lock()
210            .gossip_message(topic::<Block>(), encoded_msg, false);
211
212        // Skip sending the message since the node unable to verify the message before synced
213        if self.sync_oracle.is_major_syncing() {
214            return;
215        }
216
217        let Message { chain_id, data } = msg;
218        let sink = match self.chain_sinks.get(&chain_id) {
219            Some(sink) => sink,
220            None => return,
221        };
222
223        // send the message to the open and ready channel
224        if !sink.is_closed() && sink.unbounded_send(ChainMsg { data, maybe_peer }).is_ok() {
225            return;
226        }
227
228        // sink is either closed or failed to send unbounded message
229        // consider it closed and remove the sink.
230        tracing::error!("Failed to send incoming chain message: {:?}", chain_id);
231        self.chain_sinks.remove(&chain_id);
232    }
233}
234
235/// Gossip validator to retain or clean up Gossiped messages.
236#[derive(Debug)]
237struct GossipValidator<Network> {
238    network: Network,
239    should_broadcast: RwLock<HashSet<MessageHash>>,
240}
241
242impl<Network> GossipValidator<Network> {
243    fn new(network: Network) -> Self {
244        Self {
245            network,
246            should_broadcast: Default::default(),
247        }
248    }
249
250    fn note_broadcast(&self, msg: &[u8]) {
251        let msg_hash = twox_256(msg);
252        let mut msg_set = self.should_broadcast.write();
253        msg_set.insert(msg_hash);
254    }
255
256    fn should_broadcast(&self, msg: &[u8]) -> bool {
257        let msg_hash = twox_256(msg);
258        let msg_set = self.should_broadcast.read();
259        msg_set.contains(&msg_hash)
260    }
261
262    fn note_broadcasted(&self, msg: &[u8]) {
263        let msg_hash = twox_256(msg);
264        let mut msg_set = self.should_broadcast.write();
265        msg_set.remove(&msg_hash);
266    }
267}
268
269impl<Block, Network> Validator<Block> for GossipValidator<Network>
270where
271    Block: BlockT,
272    Network: NetworkPeers + Send + Sync + 'static,
273{
274    fn validate(
275        &self,
276        _context: &mut dyn ValidatorContext<Block>,
277        sender: &PeerId,
278        mut data: &[u8],
279    ) -> ValidationResult<Block::Hash> {
280        match Message::decode(&mut data) {
281            Ok(_) => ValidationResult::ProcessAndKeep(topic::<Block>()),
282            Err(_) => {
283                self.network.report_peer(*sender, rep::GOSSIP_NOT_DECODABLE);
284                ValidationResult::Discard
285            }
286        }
287    }
288
289    fn message_expired<'a>(&'a self) -> Box<dyn FnMut(Block::Hash, &[u8]) -> bool + 'a> {
290        Box::new(move |_topic, data| !self.should_broadcast(data))
291    }
292
293    fn message_allowed<'a>(
294        &'a self,
295    ) -> Box<dyn FnMut(&PeerId, MessageIntent, &Block::Hash, &[u8]) -> bool + 'a> {
296        Box::new(move |_who, _intent, _topic, data| {
297            let should_broadcast = self.should_broadcast(data);
298            if should_broadcast {
299                self.note_broadcasted(data)
300            }
301
302            should_broadcast
303        })
304    }
305}
306
307pub(crate) mod rep {
308    use sc_network::ReputationChange;
309
310    /// Reputation change when a peer sends us a gossip message that can't be decoded.
311    pub(crate) const GOSSIP_NOT_DECODABLE: ReputationChange =
312        ReputationChange::new_fatal("Cross chain message: not decodable");
313
314    /// Reputation change when a peer sends us a non XDM message
315    pub(crate) const NOT_XDM: ReputationChange =
316        ReputationChange::new_fatal("Cross chain message: not XDM");
317}