cross_domain_message_gossip/
gossip_worker.rs1use futures::{FutureExt, StreamExt};
2use parity_scale_codec::{Decode, Encode};
3use parking_lot::{Mutex, RwLock};
4use sc_network::config::NonDefaultSetConfig;
5use sc_network::{NetworkPeers, NotificationService, PeerId};
6use sc_network_gossip::{
7 GossipEngine, MessageIntent, Syncing as GossipSyncing, ValidationResult, Validator,
8 ValidatorContext,
9};
10use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
11use sp_api::StorageProof;
12use sp_consensus::SyncOracle;
13use sp_core::twox_256;
14use sp_messenger::messages::{ChainId, ChannelId};
15use sp_runtime::traits::{Block as BlockT, Hash as HashT, HashingFor};
16use std::collections::{BTreeMap, HashSet};
17use std::future::poll_fn;
18use std::pin::pin;
19use std::sync::Arc;
20use subspace_runtime_primitives::BlockNumber;
21
22const LOG_TARGET: &str = "cross_chain_gossip_worker";
23const PROTOCOL_NAME: &str = "/subspace/cross-chain-messages";
24
25pub struct ChainMsg {
27 pub maybe_peer: Option<PeerId>,
28 pub data: MessageData,
29}
30
31pub type ChainSink = TracingUnboundedSender<ChainMsg>;
33type MessageHash = [u8; 32];
34
35#[derive(Debug, Encode, Decode)]
37pub struct ChannelUpdate {
38 pub src_chain_id: ChainId,
40 pub channel_id: ChannelId,
42 pub block_number: BlockNumber,
44 pub storage_proof: StorageProof,
46}
47
48#[derive(Debug, Encode, Decode)]
50pub enum MessageData {
51 Xdm(Vec<u8>),
53 ChannelUpdate(ChannelUpdate),
55}
56
57#[derive(Debug, Encode, Decode)]
59pub struct Message {
60 pub chain_id: ChainId,
61 pub data: MessageData,
62}
63
64pub struct GossipWorkerBuilder {
66 gossip_msg_stream: TracingUnboundedReceiver<Message>,
67 gossip_msg_sink: TracingUnboundedSender<Message>,
68 chain_sinks: BTreeMap<ChainId, ChainSink>,
69}
70
71impl GossipWorkerBuilder {
72 #[allow(clippy::new_without_default)]
74 pub fn new() -> Self {
75 let (gossip_msg_sink, gossip_msg_stream) =
76 tracing_unbounded("cross_chain_gossip_messages", 100);
77 Self {
78 gossip_msg_stream,
79 gossip_msg_sink,
80 chain_sinks: BTreeMap::new(),
81 }
82 }
83
84 pub fn push_chain_sink(&mut self, chain_id: ChainId, sink: ChainSink) {
86 self.chain_sinks.insert(chain_id, sink);
87 }
88
89 pub fn remove_chain_sink(&mut self, chain_id: &ChainId) -> Option<ChainSink> {
91 self.chain_sinks.remove(chain_id)
92 }
93
94 pub fn gossip_msg_sink(&self) -> TracingUnboundedSender<Message> {
96 self.gossip_msg_sink.clone()
97 }
98
99 pub fn build<Block, Network, GossipSync>(
101 self,
102 network: Network,
103 notification_service: Box<dyn NotificationService>,
104 sync: Arc<GossipSync>,
105 ) -> GossipWorker<Block, Network, GossipSync>
106 where
107 Block: BlockT,
108 Network: sc_network_gossip::Network<Block> + Send + Sync + Clone + 'static,
109 GossipSync: GossipSyncing<Block> + SyncOracle + Send + 'static,
110 {
111 let Self {
112 gossip_msg_stream,
113 chain_sinks,
114 ..
115 } = self;
116
117 let gossip_validator = Arc::new(GossipValidator::new(network.clone()));
118 let gossip_engine = Arc::new(Mutex::new(GossipEngine::new(
119 network,
120 sync.clone(),
121 notification_service,
122 PROTOCOL_NAME,
123 gossip_validator.clone(),
124 None,
125 )));
126
127 GossipWorker {
128 gossip_engine,
129 gossip_validator,
130 gossip_msg_stream,
131 chain_sinks,
132 sync_oracle: sync,
133 }
134 }
135}
136
137pub struct GossipWorker<Block: BlockT, Network, SO> {
140 gossip_engine: Arc<Mutex<GossipEngine<Block>>>,
141 gossip_validator: Arc<GossipValidator<Network>>,
142 gossip_msg_stream: TracingUnboundedReceiver<Message>,
143 chain_sinks: BTreeMap<ChainId, ChainSink>,
144 sync_oracle: Arc<SO>,
145}
146
147pub fn xdm_gossip_peers_set_config() -> (NonDefaultSetConfig, Box<dyn NotificationService>) {
149 let (mut cfg, notification_service) = NonDefaultSetConfig::new(
150 PROTOCOL_NAME.into(),
151 Vec::new(),
152 5 * 1024 * 1024,
153 None,
154 Default::default(),
155 );
156 cfg.allow_non_reserved(25, 25);
157 (cfg, notification_service)
158}
159
160fn topic<Block: BlockT>() -> Block::Hash {
162 HashingFor::<Block>::hash(b"cross-chain-messages")
163}
164
165impl<Block: BlockT, Network, SO: SyncOracle> GossipWorker<Block, Network, SO> {
166 pub async fn run(mut self) {
168 let incoming_cross_chain_messages = pin!(self
169 .gossip_engine
170 .lock()
171 .messages_for(topic::<Block>())
172 .filter_map(|notification| async move {
173 Message::decode(&mut ¬ification.message[..])
174 .ok()
175 .map(|msg| (notification.sender, msg))
176 }));
177 let mut incoming_cross_chain_messages = incoming_cross_chain_messages.fuse();
178
179 loop {
180 let engine = self.gossip_engine.clone();
181 let mut gossip_engine = poll_fn(|cx| engine.lock().poll_unpin(cx)).fuse();
182
183 futures::select! {
184 cross_chain_message = incoming_cross_chain_messages.next() => {
185 if let Some((maybe_peer, msg)) = cross_chain_message {
186 tracing::debug!(target: LOG_TARGET, "Incoming cross chain message for chain from Network: {:?}", msg.chain_id);
187 self.handle_cross_chain_message(msg, maybe_peer);
188 }
189 },
190
191 msg = self.gossip_msg_stream.select_next_some() => {
192 tracing::debug!(target: LOG_TARGET, "Incoming cross chain message for chain from Relayer: {:?}", msg.chain_id);
193 self.handle_cross_chain_message(msg, None);
194 }
195
196 _ = gossip_engine => {
197 tracing::error!(target: LOG_TARGET, "Gossip engine has terminated.");
198 return;
199 }
200 }
201 }
202 }
203
204 fn handle_cross_chain_message(&mut self, msg: Message, maybe_peer: Option<PeerId>) {
205 let encoded_msg = msg.encode();
207 self.gossip_validator.note_broadcast(&encoded_msg);
208 self.gossip_engine
209 .lock()
210 .gossip_message(topic::<Block>(), encoded_msg, false);
211
212 if self.sync_oracle.is_major_syncing() {
214 return;
215 }
216
217 let Message { chain_id, data } = msg;
218 let sink = match self.chain_sinks.get(&chain_id) {
219 Some(sink) => sink,
220 None => return,
221 };
222
223 if !sink.is_closed() && sink.unbounded_send(ChainMsg { data, maybe_peer }).is_ok() {
225 return;
226 }
227
228 tracing::error!(
231 target: LOG_TARGET,
232 "Failed to send incoming chain message: {:?}",
233 chain_id
234 );
235 self.chain_sinks.remove(&chain_id);
236 }
237}
238
239#[derive(Debug)]
241struct GossipValidator<Network> {
242 network: Network,
243 should_broadcast: RwLock<HashSet<MessageHash>>,
244}
245
246impl<Network> GossipValidator<Network> {
247 fn new(network: Network) -> Self {
248 Self {
249 network,
250 should_broadcast: Default::default(),
251 }
252 }
253
254 fn note_broadcast(&self, msg: &[u8]) {
255 let msg_hash = twox_256(msg);
256 let mut msg_set = self.should_broadcast.write();
257 msg_set.insert(msg_hash);
258 }
259
260 fn should_broadcast(&self, msg: &[u8]) -> bool {
261 let msg_hash = twox_256(msg);
262 let msg_set = self.should_broadcast.read();
263 msg_set.contains(&msg_hash)
264 }
265
266 fn note_broadcasted(&self, msg: &[u8]) {
267 let msg_hash = twox_256(msg);
268 let mut msg_set = self.should_broadcast.write();
269 msg_set.remove(&msg_hash);
270 }
271}
272
273impl<Block, Network> Validator<Block> for GossipValidator<Network>
274where
275 Block: BlockT,
276 Network: NetworkPeers + Send + Sync + 'static,
277{
278 fn validate(
279 &self,
280 _context: &mut dyn ValidatorContext<Block>,
281 sender: &PeerId,
282 mut data: &[u8],
283 ) -> ValidationResult<Block::Hash> {
284 match Message::decode(&mut data) {
285 Ok(_) => ValidationResult::ProcessAndKeep(topic::<Block>()),
286 Err(_) => {
287 self.network.report_peer(*sender, rep::GOSSIP_NOT_DECODABLE);
288 ValidationResult::Discard
289 }
290 }
291 }
292
293 fn message_expired<'a>(&'a self) -> Box<dyn FnMut(Block::Hash, &[u8]) -> bool + 'a> {
294 Box::new(move |_topic, data| !self.should_broadcast(data))
295 }
296
297 fn message_allowed<'a>(
298 &'a self,
299 ) -> Box<dyn FnMut(&PeerId, MessageIntent, &Block::Hash, &[u8]) -> bool + 'a> {
300 Box::new(move |_who, _intent, _topic, data| {
301 let should_broadcast = self.should_broadcast(data);
302 if should_broadcast {
303 self.note_broadcasted(data)
304 }
305
306 should_broadcast
307 })
308 }
309}
310
311pub(crate) mod rep {
312 use sc_network::ReputationChange;
313
314 pub(crate) const GOSSIP_NOT_DECODABLE: ReputationChange =
316 ReputationChange::new_fatal("Cross chain message: not decodable");
317
318 pub(crate) const NOT_XDM: ReputationChange =
320 ReputationChange::new_fatal("Cross chain message: not XDM");
321}