// cross_domain_message_gossip/gossip_worker.rs
use futures::{FutureExt, StreamExt};
2use parity_scale_codec::{Decode, Encode};
3use parking_lot::{Mutex, RwLock};
4use sc_network::config::NonDefaultSetConfig;
5use sc_network::{NetworkPeers, NotificationService, PeerId};
6use sc_network_gossip::{
7 GossipEngine, MessageIntent, Syncing as GossipSyncing, ValidationResult, Validator,
8 ValidatorContext,
9};
10use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender, tracing_unbounded};
11use sp_api::StorageProof;
12use sp_consensus::SyncOracle;
13use sp_core::twox_256;
14use sp_messenger::messages::{ChainId, ChannelId};
15use sp_runtime::traits::{Block as BlockT, Hash as HashT, HashingFor};
16use std::collections::{BTreeMap, HashSet};
17use std::future::poll_fn;
18use std::pin::pin;
19use std::sync::Arc;
20use subspace_runtime_primitives::BlockNumber;
21
/// Protocol name shared by the gossip engine and the notification peer-set
/// configuration for cross-chain messages.
const PROTOCOL_NAME: &str = "/subspace/cross-chain-messages";
23
24pub struct ChainMsg {
26 pub maybe_peer: Option<PeerId>,
27 pub data: MessageData,
28}
29
30pub type ChainSink = TracingUnboundedSender<ChainMsg>;
32type MessageHash = [u8; 32];
33
34#[derive(Debug, Encode, Decode)]
36pub struct ChannelUpdate {
37 pub src_chain_id: ChainId,
39 pub channel_id: ChannelId,
41 pub block_number: BlockNumber,
43 pub storage_proof: StorageProof,
45}
46
47#[derive(Debug, Encode, Decode)]
49pub enum MessageData {
50 Xdm(Vec<u8>),
52 ChannelUpdate(ChannelUpdate),
54}
55
56#[derive(Debug, Encode, Decode)]
58pub struct Message {
59 pub chain_id: ChainId,
60 pub data: MessageData,
61}
62
63pub struct GossipWorkerBuilder {
65 gossip_msg_stream: TracingUnboundedReceiver<Message>,
66 gossip_msg_sink: TracingUnboundedSender<Message>,
67 chain_sinks: BTreeMap<ChainId, ChainSink>,
68}
69
70impl GossipWorkerBuilder {
71 #[allow(clippy::new_without_default)]
73 pub fn new() -> Self {
74 let (gossip_msg_sink, gossip_msg_stream) =
75 tracing_unbounded("cross_chain_gossip_messages", 100);
76 Self {
77 gossip_msg_stream,
78 gossip_msg_sink,
79 chain_sinks: BTreeMap::new(),
80 }
81 }
82
83 pub fn push_chain_sink(&mut self, chain_id: ChainId, sink: ChainSink) {
85 self.chain_sinks.insert(chain_id, sink);
86 }
87
88 pub fn remove_chain_sink(&mut self, chain_id: &ChainId) -> Option<ChainSink> {
90 self.chain_sinks.remove(chain_id)
91 }
92
93 pub fn gossip_msg_sink(&self) -> TracingUnboundedSender<Message> {
95 self.gossip_msg_sink.clone()
96 }
97
98 pub fn build<Block, Network, GossipSync>(
100 self,
101 network: Network,
102 notification_service: Box<dyn NotificationService>,
103 sync: Arc<GossipSync>,
104 ) -> GossipWorker<Block, Network, GossipSync>
105 where
106 Block: BlockT,
107 Network: sc_network_gossip::Network<Block> + Send + Sync + Clone + 'static,
108 GossipSync: GossipSyncing<Block> + SyncOracle + Send + 'static,
109 {
110 let Self {
111 gossip_msg_stream,
112 chain_sinks,
113 ..
114 } = self;
115
116 let gossip_validator = Arc::new(GossipValidator::new(network.clone()));
117 let gossip_engine = Arc::new(Mutex::new(GossipEngine::new(
118 network,
119 sync.clone(),
120 notification_service,
121 PROTOCOL_NAME,
122 gossip_validator.clone(),
123 None,
124 )));
125
126 GossipWorker {
127 gossip_engine,
128 gossip_validator,
129 gossip_msg_stream,
130 chain_sinks,
131 sync_oracle: sync,
132 }
133 }
134}
135
136pub struct GossipWorker<Block: BlockT, Network, SO> {
139 gossip_engine: Arc<Mutex<GossipEngine<Block>>>,
140 gossip_validator: Arc<GossipValidator<Network>>,
141 gossip_msg_stream: TracingUnboundedReceiver<Message>,
142 chain_sinks: BTreeMap<ChainId, ChainSink>,
143 sync_oracle: Arc<SO>,
144}
145
146pub fn xdm_gossip_peers_set_config() -> (NonDefaultSetConfig, Box<dyn NotificationService>) {
148 let (mut cfg, notification_service) = NonDefaultSetConfig::new(
149 PROTOCOL_NAME.into(),
150 Vec::new(),
151 5 * 1024 * 1024,
152 None,
153 Default::default(),
154 );
155 cfg.allow_non_reserved(25, 25);
156 (cfg, notification_service)
157}
158
159fn topic<Block: BlockT>() -> Block::Hash {
161 HashingFor::<Block>::hash(b"cross-chain-messages")
162}
163
164impl<Block: BlockT, Network, SO: SyncOracle> GossipWorker<Block, Network, SO> {
165 pub async fn run(mut self) {
167 let incoming_cross_chain_messages = pin!(
168 self.gossip_engine
169 .lock()
170 .messages_for(topic::<Block>())
171 .filter_map(|notification| async move {
172 Message::decode(&mut ¬ification.message[..])
173 .ok()
174 .map(|msg| (notification.sender, msg))
175 })
176 );
177 let mut incoming_cross_chain_messages = incoming_cross_chain_messages.fuse();
178
179 loop {
180 let engine = self.gossip_engine.clone();
181 let mut gossip_engine = poll_fn(|cx| engine.lock().poll_unpin(cx)).fuse();
182
183 futures::select! {
184 cross_chain_message = incoming_cross_chain_messages.next() => {
185 if let Some((maybe_peer, msg)) = cross_chain_message {
186 tracing::debug!("Incoming cross chain message for chain from Network: {:?}", msg.chain_id);
187 self.handle_cross_chain_message(msg, maybe_peer);
188 }
189 },
190
191 msg = self.gossip_msg_stream.select_next_some() => {
192 tracing::debug!("Incoming cross chain message for chain from Relayer: {:?}", msg.chain_id);
193 self.handle_cross_chain_message(msg, None);
194 }
195
196 _ = gossip_engine => {
197 tracing::error!("Gossip engine has terminated.");
198 return;
199 }
200 }
201 }
202 }
203
204 fn handle_cross_chain_message(&mut self, msg: Message, maybe_peer: Option<PeerId>) {
205 let encoded_msg = msg.encode();
207 self.gossip_validator.note_broadcast(&encoded_msg);
208 self.gossip_engine
209 .lock()
210 .gossip_message(topic::<Block>(), encoded_msg, false);
211
212 if self.sync_oracle.is_major_syncing() {
214 return;
215 }
216
217 let Message { chain_id, data } = msg;
218 let sink = match self.chain_sinks.get(&chain_id) {
219 Some(sink) => sink,
220 None => return,
221 };
222
223 if !sink.is_closed() && sink.unbounded_send(ChainMsg { data, maybe_peer }).is_ok() {
225 return;
226 }
227
228 tracing::error!("Failed to send incoming chain message: {:?}", chain_id);
231 self.chain_sinks.remove(&chain_id);
232 }
233}
234
235#[derive(Debug)]
237struct GossipValidator<Network> {
238 network: Network,
239 should_broadcast: RwLock<HashSet<MessageHash>>,
240}
241
242impl<Network> GossipValidator<Network> {
243 fn new(network: Network) -> Self {
244 Self {
245 network,
246 should_broadcast: Default::default(),
247 }
248 }
249
250 fn note_broadcast(&self, msg: &[u8]) {
251 let msg_hash = twox_256(msg);
252 let mut msg_set = self.should_broadcast.write();
253 msg_set.insert(msg_hash);
254 }
255
256 fn should_broadcast(&self, msg: &[u8]) -> bool {
257 let msg_hash = twox_256(msg);
258 let msg_set = self.should_broadcast.read();
259 msg_set.contains(&msg_hash)
260 }
261
262 fn note_broadcasted(&self, msg: &[u8]) {
263 let msg_hash = twox_256(msg);
264 let mut msg_set = self.should_broadcast.write();
265 msg_set.remove(&msg_hash);
266 }
267}
268
269impl<Block, Network> Validator<Block> for GossipValidator<Network>
270where
271 Block: BlockT,
272 Network: NetworkPeers + Send + Sync + 'static,
273{
274 fn validate(
275 &self,
276 _context: &mut dyn ValidatorContext<Block>,
277 sender: &PeerId,
278 mut data: &[u8],
279 ) -> ValidationResult<Block::Hash> {
280 match Message::decode(&mut data) {
281 Ok(_) => ValidationResult::ProcessAndKeep(topic::<Block>()),
282 Err(_) => {
283 self.network.report_peer(*sender, rep::GOSSIP_NOT_DECODABLE);
284 ValidationResult::Discard
285 }
286 }
287 }
288
289 fn message_expired<'a>(&'a self) -> Box<dyn FnMut(Block::Hash, &[u8]) -> bool + 'a> {
290 Box::new(move |_topic, data| !self.should_broadcast(data))
291 }
292
293 fn message_allowed<'a>(
294 &'a self,
295 ) -> Box<dyn FnMut(&PeerId, MessageIntent, &Block::Hash, &[u8]) -> bool + 'a> {
296 Box::new(move |_who, _intent, _topic, data| {
297 let should_broadcast = self.should_broadcast(data);
298 if should_broadcast {
299 self.note_broadcasted(data)
300 }
301
302 should_broadcast
303 })
304 }
305}
306
307pub(crate) mod rep {
308 use sc_network::ReputationChange;
309
310 pub(crate) const GOSSIP_NOT_DECODABLE: ReputationChange =
312 ReputationChange::new_fatal("Cross chain message: not decodable");
313
314 pub(crate) const NOT_XDM: ReputationChange =
316 ReputationChange::new_fatal("Cross chain message: not XDM");
317}