1#![feature(let_chains)]
2#![warn(rust_2018_idioms)]
3pub mod worker;
7
8use async_channel::TrySendError;
9use cross_domain_message_gossip::{
10 can_allow_xdm_submission, get_channel_state, get_xdm_processed_block_number,
11 set_xdm_message_processed_at, BlockId, Message as GossipMessage,
12 MessageData as GossipMessageData, RELAYER_PREFIX,
13};
14use parity_scale_codec::{Codec, Encode};
15use sc_client_api::{AuxStore, HeaderBackend, ProofProvider, StorageProof};
16use sc_utils::mpsc::TracingUnboundedSender;
17use sp_api::{ApiRef, ProvideRuntimeApi};
18use sp_core::{H256, U256};
19use sp_domains::DomainsApi;
20use sp_messenger::messages::{
21 BlockMessageWithStorageKey, BlockMessagesWithStorageKey, ChainId, CrossDomainMessage, Proof,
22};
23use sp_messenger::{MessengerApi, RelayerApi, XdmId, MAX_FUTURE_ALLOWED_NONCES};
24use sp_mmr_primitives::MmrApi;
25use sp_runtime::traits::{Block as BlockT, CheckedSub, Header as HeaderT, NumberFor, One};
26use sp_runtime::ArithmeticError;
27use sp_subspace_mmr::ConsensusChainMmrLeafProof;
28use std::marker::PhantomData;
29use std::sync::Arc;
30use subspace_runtime_primitives::BlockHashFor;
31use tracing::log;
32
33const LOG_TARGET: &str = "message::relayer";
35
36struct Relayer<Client, Block>(PhantomData<(Client, Block)>);
38
39pub type GossipMessageSink = TracingUnboundedSender<GossipMessage>;
41
42#[derive(Debug)]
44pub enum Error {
45 ConstructStorageProof,
47 FailedToConstructExtrinsic,
49 FetchAssignedMessages,
51 StoreRelayedBlockNumber,
53 UnableToFetchDomainId,
55 UnableToFetchRelayConfirmationDepth,
57 BlockchainError(Box<sp_blockchain::Error>),
59 ArithmeticError(ArithmeticError),
61 ApiError(sp_api::ApiError),
63 UnableToSubmitCrossDomainMessage(TrySendError<GossipMessage>),
65 InvalidChainId,
67 MmrProof(sp_mmr_primitives::Error),
69 MmrLeafMissing,
71 MissingBlockHeader,
73 MissingBlockHash,
75}
76
77impl From<sp_blockchain::Error> for Error {
78 #[inline]
79 fn from(err: sp_blockchain::Error) -> Self {
80 Error::BlockchainError(Box::new(err))
81 }
82}
83
84impl From<ArithmeticError> for Error {
85 #[inline]
86 fn from(err: ArithmeticError) -> Self {
87 Error::ArithmeticError(err)
88 }
89}
90
91impl From<sp_api::ApiError> for Error {
92 #[inline]
93 fn from(err: sp_api::ApiError) -> Self {
94 Error::ApiError(err)
95 }
96}
97
98type ProofOf<Block> = Proof<NumberFor<Block>, BlockHashFor<Block>, H256>;
99
100fn construct_consensus_mmr_proof<Client, Block>(
101 consensus_chain_client: &Arc<Client>,
102 dst_chain_id: ChainId,
103 (block_number, block_hash): (NumberFor<Block>, Block::Hash),
104) -> Result<ConsensusChainMmrLeafProof<NumberFor<Block>, Block::Hash, H256>, Error>
105where
106 Block: BlockT,
107 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
108 Client::Api: MmrApi<Block, H256, NumberFor<Block>>,
109{
110 let api = consensus_chain_client.runtime_api();
111 let best_hash = consensus_chain_client.info().best_hash;
112 let best_number = consensus_chain_client.info().best_number;
113
114 let (prove_at_number, prove_at_hash) = match dst_chain_id {
115 ChainId::Consensus => (best_number, best_hash),
120 ChainId::Domain(_) => (block_number, block_hash),
125 };
126
127 let (mut leaves, proof) = api
128 .generate_proof(best_hash, vec![block_number], Some(prove_at_number))
129 .map_err(Error::ApiError)?
130 .map_err(Error::MmrProof)?;
131 debug_assert!(leaves.len() == 1, "should always be of length 1");
132 let leaf = leaves.pop().ok_or(Error::MmrLeafMissing)?;
133
134 Ok(ConsensusChainMmrLeafProof {
135 consensus_block_number: prove_at_number,
136 consensus_block_hash: prove_at_hash,
137 opaque_mmr_leaf: leaf,
138 proof,
139 })
140}
141
142fn construct_cross_chain_message_and_submit<CNumber, CHash, Submitter, ProofConstructor>(
143 msgs: Vec<BlockMessageWithStorageKey>,
144 proof_constructor: ProofConstructor,
145 submitter: Submitter,
146) -> Result<(), Error>
147where
148 Submitter: Fn(CrossDomainMessage<CNumber, CHash, H256>) -> Result<(), Error>,
149 ProofConstructor: Fn(&[u8], ChainId) -> Result<Proof<CNumber, CHash, H256>, Error>,
150{
151 for msg in msgs {
152 let proof = match proof_constructor(&msg.storage_key, msg.dst_chain_id) {
153 Ok(proof) => proof,
154 Err(err) => {
155 tracing::error!(
156 target: LOG_TARGET,
157 "Failed to construct storage proof for message: {:?} bound to chain: {:?} with error: {:?}",
158 (msg.channel_id, msg.nonce),
159 msg.dst_chain_id,
160 err
161 );
162 continue;
163 }
164 };
165 let msg = CrossDomainMessage::from_relayer_msg_with_proof(msg, proof);
166 let (dst_domain, msg_id) = (msg.dst_chain_id, (msg.channel_id, msg.nonce));
167 if let Err(err) = submitter(msg) {
168 tracing::error!(
169 target: LOG_TARGET,
170 ?err,
171 "Failed to submit message: {msg_id:?} to domain: {dst_domain:?}",
172 );
173 }
174 }
175
176 Ok(())
177}
178
179fn gossip_outbox_message<Block, Client, CNumber, CHash>(
181 client: &Arc<Client>,
182 msg: CrossDomainMessage<CNumber, CHash, H256>,
183 sink: &GossipMessageSink,
184) -> Result<(), Error>
185where
186 Block: BlockT,
187 CNumber: Codec,
188 CHash: Codec,
189 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
190 Client::Api: RelayerApi<Block, NumberFor<Block>, CNumber, CHash>,
191{
192 let best_hash = client.info().best_hash;
193 let dst_chain_id = msg.dst_chain_id;
194 let ext = client
195 .runtime_api()
196 .outbox_message_unsigned(best_hash, msg)?
197 .ok_or(Error::FailedToConstructExtrinsic)?;
198
199 sink.unbounded_send(GossipMessage {
200 chain_id: dst_chain_id,
201 data: GossipMessageData::Xdm(ext.encode()),
202 })
203 .map_err(Error::UnableToSubmitCrossDomainMessage)
204}
205
206fn gossip_inbox_message_response<Block, Client, CNumber, CHash>(
210 client: &Arc<Client>,
211 msg: CrossDomainMessage<CNumber, CHash, H256>,
212 sink: &GossipMessageSink,
213) -> Result<(), Error>
214where
215 Block: BlockT,
216 CNumber: Codec,
217 CHash: Codec,
218 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
219 Client::Api: RelayerApi<Block, NumberFor<Block>, CNumber, CHash>,
220{
221 let best_hash = client.info().best_hash;
222 let dst_chain_id = msg.dst_chain_id;
223 let ext = client
224 .runtime_api()
225 .inbox_response_message_unsigned(best_hash, msg)?
226 .ok_or(Error::FailedToConstructExtrinsic)?;
227
228 sink.unbounded_send(GossipMessage {
229 chain_id: dst_chain_id,
230 data: GossipMessageData::Xdm(ext.encode()),
231 })
232 .map_err(Error::UnableToSubmitCrossDomainMessage)
233}
234
235fn check_and_update_recent_xdm_submission<Backend, Client, Block>(
236 backend: &Backend,
237 client: &Arc<Client>,
238 xdm_id: XdmId,
239 msg: &BlockMessageWithStorageKey,
240) -> bool
241where
242 Backend: AuxStore,
243 Block: BlockT,
244 Client: HeaderBackend<Block>,
245{
246 let prefix = (RELAYER_PREFIX, msg.dst_chain_id, msg.src_chain_id).encode();
247 let current_block_id: BlockId<Block> = client.info().into();
248 if let Ok(maybe_submitted_block_id) =
249 get_xdm_processed_block_number::<_, Block>(backend, &prefix, xdm_id)
250 {
251 if !can_allow_xdm_submission(
252 client,
253 xdm_id,
254 maybe_submitted_block_id,
255 current_block_id.clone(),
256 None,
257 ) {
258 log::debug!(
259 target: LOG_TARGET,
260 "Skipping already submitted message relay from {:?}: {:?}",
261 msg.src_chain_id,
262 xdm_id
263 );
264 return false;
265 }
266 }
267
268 if let Err(err) = set_xdm_message_processed_at(backend, &prefix, xdm_id, current_block_id) {
269 log::error!(
270 target: LOG_TARGET,
271 "Failed to store submitted message from {:?} to {:?}: {:?}",
272 msg.src_chain_id,
273 xdm_id,
274 err
275 );
276 }
277
278 true
279}
280
281fn should_relay_outbox_message<Backend, Client, Block, CBlock>(
282 backend: &Backend,
283 client: &Arc<Client>,
284 api: &ApiRef<'_, Client::Api>,
285 best_hash: Block::Hash,
286 msg: &BlockMessageWithStorageKey,
287) -> bool
288where
289 Backend: AuxStore,
290 Block: BlockT,
291 CBlock: BlockT,
292 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
293 Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
294{
295 let id = msg.id();
296 match api.should_relay_outbox_message(best_hash, msg.dst_chain_id, id) {
297 Ok(true) => (),
298 Ok(false) => return false,
299 Err(err) => {
300 tracing::error!(
301 target: LOG_TARGET,
302 ?err,
303 "Failed to fetch validity of outbox message {id:?} for domain {0:?}",
304 msg.dst_chain_id
305 );
306 return false;
307 }
308 };
309
310 if let Some(dst_channel_state) =
311 get_channel_state(backend, msg.dst_chain_id, msg.src_chain_id, msg.channel_id)
312 .ok()
313 .flatten()
314 {
315 let max_messages_nonce_allowed = dst_channel_state
319 .next_inbox_nonce
320 .saturating_add(MAX_FUTURE_ALLOWED_NONCES.into());
321 let relay_message = msg.nonce >= dst_channel_state.next_inbox_nonce
322 && msg.nonce <= max_messages_nonce_allowed;
323 if !relay_message {
324 log::debug!(
325 target: LOG_TARGET,
326 "Skipping Outbox message relay from {:?} to {:?} of XDM[{:?}, {:?}]. Max Nonce allowed: {:?}",
327 msg.src_chain_id,
328 msg.dst_chain_id,
329 msg.channel_id,
330 msg.nonce,
331 max_messages_nonce_allowed
332 );
333 return false;
334 }
335 }
336
337 let xdm_id = XdmId::RelayMessage((msg.dst_chain_id, msg.channel_id, msg.nonce));
338 check_and_update_recent_xdm_submission(backend, client, xdm_id, msg)
339}
340
341fn should_relay_inbox_responses_message<Backend, Client, Block, CBlock>(
342 backend: &Backend,
343 client: &Arc<Client>,
344 api: &ApiRef<'_, Client::Api>,
345 best_hash: Block::Hash,
346 msg: &BlockMessageWithStorageKey,
347) -> bool
348where
349 Backend: AuxStore,
350 Block: BlockT,
351 CBlock: BlockT,
352 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
353 Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
354{
355 let id = msg.id();
356 match api.should_relay_inbox_message_response(best_hash, msg.dst_chain_id, id) {
357 Ok(true) => (),
358 Ok(false) => return false,
359 Err(err) => {
360 tracing::error!(
361 target: LOG_TARGET,
362 ?err,
363 "Failed to fetch validity of inbox message response {id:?} for domain {0:?}",
364 msg.dst_chain_id
365 );
366 return false;
367 }
368 };
369
370 if let Some(dst_channel_state) =
371 get_channel_state(backend, msg.dst_chain_id, msg.src_chain_id, msg.channel_id)
372 .ok()
373 .flatten()
374 {
375 let next_nonce = if let Some(dst_chain_outbox_response_nonce) =
376 dst_channel_state.latest_response_received_message_nonce
377 {
378 dst_chain_outbox_response_nonce.saturating_add(U256::one())
381 } else {
382 U256::zero()
385 };
386 let max_msg_nonce_allowed = next_nonce.saturating_add(MAX_FUTURE_ALLOWED_NONCES.into());
388 let relay_message = msg.nonce >= next_nonce && msg.nonce <= max_msg_nonce_allowed;
389 if !relay_message {
390 log::debug!(
391 target: LOG_TARGET,
392 "Skipping Inbox response message relay from {:?} to {:?} of XDM[{:?}, {:?}]. Max nonce allowed: {:?}",
393 msg.src_chain_id,
394 msg.dst_chain_id,
395 msg.channel_id,
396 msg.nonce,
397 max_msg_nonce_allowed
398 );
399 return false;
400 }
401 }
402
403 let xdm_id = XdmId::RelayResponseMessage((msg.dst_chain_id, msg.channel_id, msg.nonce));
404 check_and_update_recent_xdm_submission(backend, client, xdm_id, msg)
405}
406
407fn fetch_and_filter_messages<Client, Block, CClient, CBlock>(
409 client: &Arc<Client>,
410 fetch_message_at: Block::Hash,
411 consensus_client: &Arc<CClient>,
412) -> Result<BlockMessagesWithStorageKey, Error>
413where
414 CBlock: BlockT,
415 CClient: AuxStore + HeaderBackend<CBlock>,
416 Block: BlockT,
417 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
418 Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
419{
420 let mut msgs = client
421 .runtime_api()
422 .block_messages(fetch_message_at)
423 .map_err(|_| Error::FetchAssignedMessages)?;
424
425 let api = client.runtime_api();
426 let best_hash = client.info().best_hash;
427 msgs.outbox.retain(|msg| {
428 should_relay_outbox_message::<_, _, _, CBlock>(
429 &**consensus_client,
430 client,
431 &api,
432 best_hash,
433 msg,
434 )
435 });
436
437 msgs.inbox_responses.retain(|msg| {
438 should_relay_inbox_responses_message::<_, _, _, CBlock>(
439 &**consensus_client,
440 client,
441 &api,
442 best_hash,
443 msg,
444 )
445 });
446
447 Ok(msgs)
448}
449
450#[derive(Clone)]
452enum XDMProofData<CHash, DHash> {
453 Consensus(CHash),
454 Domain {
455 domain_proof: StorageProof,
456 confirmed_domain_block_hash: DHash,
457 },
458}
459
460impl<Client, Block> Relayer<Client, Block>
461where
462 Block: BlockT,
463 Client: HeaderBackend<Block> + AuxStore + ProofProvider<Block> + ProvideRuntimeApi<Block>,
464{
465 pub(crate) fn construct_and_submit_xdm<CClient, CBlock>(
466 chain_id: ChainId,
467 domain_client: &Arc<Client>,
468 consensus_chain_client: &Arc<CClient>,
469 confirmed_block_number: NumberFor<CBlock>,
470 gossip_message_sink: &GossipMessageSink,
471 ) -> Result<(), Error>
472 where
473 CBlock: BlockT,
474 CClient:
475 HeaderBackend<CBlock> + ProvideRuntimeApi<CBlock> + ProofProvider<CBlock> + AuxStore,
476 CClient::Api: DomainsApi<CBlock, Block::Header>
477 + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>
478 + MmrApi<CBlock, H256, NumberFor<CBlock>>
479 + RelayerApi<CBlock, NumberFor<CBlock>, NumberFor<CBlock>, CBlock::Hash>,
480 Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
481 {
482 let mmr_consensus_block = (
485 confirmed_block_number,
486 consensus_chain_client
487 .hash(confirmed_block_number)?
488 .ok_or(Error::MissingBlockHash)?,
489 );
490 let (to_process_consensus_number, to_process_consensus_hash) =
491 match confirmed_block_number.checked_sub(&One::one()) {
492 None => return Ok(()),
493 Some(n) => {
494 let h = consensus_chain_client
495 .hash(n)?
496 .ok_or(Error::MissingBlockHash)?;
497 (n, h)
498 }
499 };
500
501 tracing::debug!(
502 target: LOG_TARGET,
503 "Checking messages to be submitted from chain: {chain_id:?} at block: ({to_process_consensus_number:?}, {to_process_consensus_hash:?})",
504 );
505
506 let consensus_chain_api = consensus_chain_client.runtime_api();
507 let (block_messages, maybe_domain_data) = match chain_id {
508 ChainId::Consensus => (
509 fetch_and_filter_messages::<_, _, _, CBlock>(
510 consensus_chain_client,
511 to_process_consensus_hash,
512 consensus_chain_client,
513 )?,
514 None,
515 ),
516 ChainId::Domain(domain_id) => {
517 let confirmed_domain_block_hash = {
518 match consensus_chain_api
519 .latest_confirmed_domain_block(to_process_consensus_hash, domain_id)?
520 {
521 Some((_, confirmed_domain_block_hash)) => confirmed_domain_block_hash,
522 None => return Ok(()),
524 }
525 };
526 (
527 fetch_and_filter_messages::<_, _, _, CBlock>(
528 domain_client,
529 confirmed_domain_block_hash,
530 consensus_chain_client,
531 )?,
532 Some((domain_id, confirmed_domain_block_hash)),
533 )
534 }
535 };
536
537 if block_messages.is_empty() {
539 return Ok(());
540 }
541
542 let xdm_proof_data = match maybe_domain_data {
543 None => XDMProofData::Consensus(to_process_consensus_hash),
544 Some((domain_id, confirmed_domain_block_hash)) => {
545 let storage_key = consensus_chain_api
546 .confirmed_domain_block_storage_key(to_process_consensus_hash, domain_id)?;
547
548 let domain_proof = consensus_chain_client.read_proof(
549 to_process_consensus_hash,
550 &mut [storage_key.as_ref()].into_iter(),
551 )?;
552
553 XDMProofData::Domain {
554 domain_proof,
555 confirmed_domain_block_hash,
556 }
557 }
558 };
559
560 construct_cross_chain_message_and_submit::<NumberFor<CBlock>, CBlock::Hash, _, _>(
561 block_messages.outbox,
562 |key, dst_chain_id| {
563 Self::construct_xdm_proof(
564 consensus_chain_client,
565 domain_client,
566 dst_chain_id,
567 mmr_consensus_block,
568 key,
569 xdm_proof_data.clone(),
570 )
571 },
572 |msg| gossip_outbox_message(domain_client, msg, gossip_message_sink),
573 )?;
574
575 construct_cross_chain_message_and_submit::<NumberFor<CBlock>, CBlock::Hash, _, _>(
576 block_messages.inbox_responses,
577 |key, dst_chain_id| {
578 Self::construct_xdm_proof(
579 consensus_chain_client,
580 domain_client,
581 dst_chain_id,
582 mmr_consensus_block,
583 key,
584 xdm_proof_data.clone(),
585 )
586 },
587 |msg| gossip_inbox_message_response(domain_client, msg, gossip_message_sink),
588 )?;
589
590 Ok(())
591 }
592
593 fn construct_xdm_proof<CClient, CBlock>(
595 consensus_chain_client: &Arc<CClient>,
596 domain_client: &Arc<Client>,
597 dst_chain_id: ChainId,
598 mmr_consensus_block: (NumberFor<CBlock>, CBlock::Hash),
599 message_storage_key: &[u8],
600 xdm_proof_data: XDMProofData<CBlock::Hash, Block::Hash>,
601 ) -> Result<ProofOf<CBlock>, Error>
602 where
603 CBlock: BlockT,
604 CClient: HeaderBackend<CBlock> + ProvideRuntimeApi<CBlock> + ProofProvider<CBlock>,
605 CClient::Api: DomainsApi<CBlock, Block::Header>
606 + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>
607 + MmrApi<CBlock, H256, NumberFor<CBlock>>,
608 {
609 let consensus_chain_mmr_proof = construct_consensus_mmr_proof(
610 consensus_chain_client,
611 dst_chain_id,
612 mmr_consensus_block,
613 )?;
614
615 let proof = match xdm_proof_data {
616 XDMProofData::Consensus(at_consensus_hash) => {
617 let message_proof = consensus_chain_client
618 .read_proof(at_consensus_hash, &mut [message_storage_key].into_iter())
619 .map_err(|_| Error::ConstructStorageProof)?;
620
621 Proof::Consensus {
622 consensus_chain_mmr_proof,
623 message_proof,
624 }
625 }
626 XDMProofData::Domain {
627 domain_proof,
628 confirmed_domain_block_hash,
629 } => {
630 let message_proof = domain_client
631 .read_proof(
632 confirmed_domain_block_hash,
633 &mut [message_storage_key].into_iter(),
634 )
635 .map_err(|_| Error::ConstructStorageProof)?;
636
637 Proof::Domain {
638 consensus_chain_mmr_proof,
639 domain_proof,
640 message_proof,
641 }
642 }
643 };
644 Ok(proof)
645 }
646}