1#![warn(rust_2018_idioms)]
2mod aux_schema;
6pub mod worker;
7
8use crate::aux_schema::{
9 ChannelProcessedState, get_last_processed_nonces, set_channel_inbox_response_processed_state,
10 set_channel_outbox_processed_state,
11};
12use async_channel::TrySendError;
13use cross_domain_message_gossip::{
14 ChannelDetail, Message as GossipMessage, MessageData as GossipMessageData, get_channel_state,
15};
16use parity_scale_codec::{Codec, Encode};
17use rand::seq::SliceRandom;
18use sc_client_api::{AuxStore, HeaderBackend, ProofProvider, StorageProof};
19use sc_utils::mpsc::TracingUnboundedSender;
20use sp_api::{ApiExt, ApiRef, ProvideRuntimeApi};
21use sp_core::H256;
22use sp_domains::{ChannelId, DomainsApi};
23use sp_messenger::messages::{
24 BlockMessagesQuery, ChainId, ChannelState, ChannelStateWithNonce, CrossDomainMessage,
25 MessageNonceWithStorageKey, MessagesWithStorageKey, Nonce, Proof,
26};
27use sp_messenger::{MessengerApi, RelayerApi};
28use sp_mmr_primitives::MmrApi;
29use sp_runtime::ArithmeticError;
30use sp_runtime::traits::{Block as BlockT, CheckedSub, Header as HeaderT, NumberFor, One};
31use sp_subspace_mmr::ConsensusChainMmrLeafProof;
32use std::cmp::max;
33use std::marker::PhantomData;
34use std::sync::Arc;
35use subspace_runtime_primitives::BlockHashFor;
36
37const CHANNEL_PROCESSED_STATE_CACHE_LIMIT: u32 = 5;
38const MAXIMUM_CHANNELS_TO_PROCESS_IN_BLOCK: usize = 15;
39
40struct Relayer<Client, Block>(PhantomData<(Client, Block)>);
42
43pub type GossipMessageSink = TracingUnboundedSender<GossipMessage>;
45
46#[derive(Debug)]
48pub enum Error {
49 ConstructStorageProof,
51 FailedToConstructExtrinsic,
53 FetchAssignedMessages,
55 StoreRelayedBlockNumber,
57 UnableToFetchDomainId,
59 UnableToFetchRelayConfirmationDepth,
61 BlockchainError(Box<sp_blockchain::Error>),
63 ArithmeticError(ArithmeticError),
65 ApiError(sp_api::ApiError),
67 UnableToSubmitCrossDomainMessage(TrySendError<GossipMessage>),
69 InvalidChainId,
71 MmrProof(sp_mmr_primitives::Error),
73 MmrLeafMissing,
75 MissingBlockHeader,
77 MissingBlockHash,
79}
80
81impl From<sp_blockchain::Error> for Error {
82 #[inline]
83 fn from(err: sp_blockchain::Error) -> Self {
84 Error::BlockchainError(Box::new(err))
85 }
86}
87
88impl From<ArithmeticError> for Error {
89 #[inline]
90 fn from(err: ArithmeticError) -> Self {
91 Error::ArithmeticError(err)
92 }
93}
94
95impl From<sp_api::ApiError> for Error {
96 #[inline]
97 fn from(err: sp_api::ApiError) -> Self {
98 Error::ApiError(err)
99 }
100}
101
102type ProofOf<Block> = Proof<NumberFor<Block>, BlockHashFor<Block>, H256>;
103
104fn construct_consensus_mmr_proof<Client, Block>(
105 consensus_chain_client: &Arc<Client>,
106 dst_chain_id: ChainId,
107 (block_number, block_hash): (NumberFor<Block>, Block::Hash),
108) -> Result<ConsensusChainMmrLeafProof<NumberFor<Block>, Block::Hash, H256>, Error>
109where
110 Block: BlockT,
111 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
112 Client::Api: MmrApi<Block, H256, NumberFor<Block>>,
113{
114 let api = consensus_chain_client.runtime_api();
115 let best_hash = consensus_chain_client.info().best_hash;
116 let best_number = consensus_chain_client.info().best_number;
117
118 let (prove_at_number, prove_at_hash) = match dst_chain_id {
119 ChainId::Consensus => (best_number, best_hash),
124 ChainId::Domain(_) => (block_number, block_hash),
129 };
130
131 let (mut leaves, proof) = api
132 .generate_proof(best_hash, vec![block_number], Some(prove_at_number))
133 .map_err(Error::ApiError)?
134 .map_err(Error::MmrProof)?;
135 debug_assert!(leaves.len() == 1, "should always be of length 1");
136 let leaf = leaves.pop().ok_or(Error::MmrLeafMissing)?;
137
138 Ok(ConsensusChainMmrLeafProof {
139 consensus_block_number: prove_at_number,
140 consensus_block_hash: prove_at_hash,
141 opaque_mmr_leaf: leaf,
142 proof,
143 })
144}
145
146fn construct_cross_chain_message_and_submit<CNumber, CHash, Submitter, ProofConstructor>(
147 msgs: (ChainId, ChainId, ChannelId, Vec<MessageNonceWithStorageKey>),
148 proof_constructor: ProofConstructor,
149 submitter: Submitter,
150) -> Result<(), Error>
151where
152 Submitter: Fn(CrossDomainMessage<CNumber, CHash, H256>) -> Result<(), Error>,
153 ProofConstructor: Fn(&[u8], ChainId) -> Result<Proof<CNumber, CHash, H256>, Error>,
154{
155 let (src_chain_id, dst_chain_id, channel_id, msgs) = msgs;
156 for msg in msgs {
157 let proof = match proof_constructor(&msg.storage_key, dst_chain_id) {
158 Ok(proof) => proof,
159 Err(err) => {
160 tracing::error!(
161 "Failed to construct storage proof for message: {:?} bound to chain: {:?} with error: {:?}",
162 (channel_id, msg.nonce),
163 dst_chain_id,
164 err
165 );
166 continue;
167 }
168 };
169 let msg = CrossDomainMessage::from_relayer_msg_with_proof(
170 src_chain_id,
171 dst_chain_id,
172 channel_id,
173 msg,
174 proof,
175 );
176 let (dst_domain, msg_id) = (msg.dst_chain_id, (msg.channel_id, msg.nonce));
177 if let Err(err) = submitter(msg) {
178 tracing::error!(
179 ?err,
180 "Failed to submit message: {msg_id:?} to domain: {dst_domain:?}",
181 );
182 }
183 }
184
185 Ok(())
186}
187
188fn gossip_outbox_message<Block, Client, CNumber, CHash>(
190 client: &Arc<Client>,
191 msg: CrossDomainMessage<CNumber, CHash, H256>,
192 sink: &GossipMessageSink,
193) -> Result<(), Error>
194where
195 Block: BlockT,
196 CNumber: Codec,
197 CHash: Codec,
198 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
199 Client::Api: RelayerApi<Block, NumberFor<Block>, CNumber, CHash>,
200{
201 let best_hash = client.info().best_hash;
202 let dst_chain_id = msg.dst_chain_id;
203 let msg_id = (msg.dst_chain_id, msg.channel_id, msg.nonce);
204 let ext = client
205 .runtime_api()
206 .outbox_message_unsigned(best_hash, msg)?
207 .ok_or(Error::FailedToConstructExtrinsic)?;
208
209 tracing::trace!("Submitting Outbox message: {:?}", msg_id);
210 sink.unbounded_send(GossipMessage {
211 chain_id: dst_chain_id,
212 data: GossipMessageData::Xdm(ext.encode()),
213 })
214 .map_err(Error::UnableToSubmitCrossDomainMessage)
215}
216
217fn gossip_inbox_message_response<Block, Client, CNumber, CHash>(
221 client: &Arc<Client>,
222 msg: CrossDomainMessage<CNumber, CHash, H256>,
223 sink: &GossipMessageSink,
224) -> Result<(), Error>
225where
226 Block: BlockT,
227 CNumber: Codec,
228 CHash: Codec,
229 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
230 Client::Api: RelayerApi<Block, NumberFor<Block>, CNumber, CHash>,
231{
232 let best_hash = client.info().best_hash;
233 let dst_chain_id = msg.dst_chain_id;
234 let msg_id = (msg.dst_chain_id, msg.channel_id, msg.nonce);
235 let ext = client
236 .runtime_api()
237 .inbox_response_message_unsigned(best_hash, msg)?
238 .ok_or(Error::FailedToConstructExtrinsic)?;
239
240 tracing::trace!("Submitting Inbox response message: {:?}", msg_id);
241 sink.unbounded_send(GossipMessage {
242 chain_id: dst_chain_id,
243 data: GossipMessageData::Xdm(ext.encode()),
244 })
245 .map_err(Error::UnableToSubmitCrossDomainMessage)
246}
247
248fn fetch_and_filter_messages<Client, Block, CClient, CBlock>(
250 client: &Arc<Client>,
251 fetch_message_at: Block::Hash,
252 consensus_client: &Arc<CClient>,
253 self_chain_id: ChainId,
254) -> Result<Vec<(ChainId, ChannelId, MessagesWithStorageKey)>, Error>
255where
256 CBlock: BlockT,
257 CClient: AuxStore + HeaderBackend<CBlock>,
258 Block: BlockT,
259 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
260 Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
261{
262 if !is_relayer_api_version_available::<_, Block, CBlock>(client, 3, fetch_message_at) {
264 return Ok(vec![]);
265 }
266
267 fetch_messages::<_, _, Block, CBlock>(
268 &**consensus_client,
269 client,
270 fetch_message_at,
271 self_chain_id,
272 )
273}
274
275#[derive(Clone)]
277enum XDMProofData<CHash, DHash> {
278 Consensus(CHash),
279 Domain {
280 domain_proof: StorageProof,
281 confirmed_domain_block_hash: DHash,
282 },
283}
284
285impl<Client, Block> Relayer<Client, Block>
286where
287 Block: BlockT,
288 Client: HeaderBackend<Block> + AuxStore + ProofProvider<Block> + ProvideRuntimeApi<Block>,
289{
290 pub(crate) fn construct_and_submit_xdm<CClient, CBlock>(
291 chain_id: ChainId,
292 domain_client: &Arc<Client>,
293 consensus_chain_client: &Arc<CClient>,
294 confirmed_block_number: NumberFor<CBlock>,
295 gossip_message_sink: &GossipMessageSink,
296 ) -> Result<(), Error>
297 where
298 CBlock: BlockT,
299 CClient:
300 HeaderBackend<CBlock> + ProvideRuntimeApi<CBlock> + ProofProvider<CBlock> + AuxStore,
301 CClient::Api: DomainsApi<CBlock, Block::Header>
302 + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>
303 + MmrApi<CBlock, H256, NumberFor<CBlock>>
304 + RelayerApi<CBlock, NumberFor<CBlock>, NumberFor<CBlock>, CBlock::Hash>,
305 Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
306 {
307 let mmr_consensus_block = (
310 confirmed_block_number,
311 consensus_chain_client
312 .hash(confirmed_block_number)?
313 .ok_or(Error::MissingBlockHash)?,
314 );
315 let (to_process_consensus_number, to_process_consensus_hash) =
316 match confirmed_block_number.checked_sub(&One::one()) {
317 None => return Ok(()),
318 Some(n) => {
319 let h = consensus_chain_client
320 .hash(n)?
321 .ok_or(Error::MissingBlockHash)?;
322 (n, h)
323 }
324 };
325
326 tracing::debug!(
327 "Checking messages to be submitted from chain: {chain_id:?} at block: ({to_process_consensus_number:?}, {to_process_consensus_hash:?})",
328 );
329
330 let consensus_chain_api = consensus_chain_client.runtime_api();
331 let (block_messages, maybe_domain_data) = match chain_id {
332 ChainId::Consensus => (
333 fetch_and_filter_messages::<_, _, _, CBlock>(
334 consensus_chain_client,
335 to_process_consensus_hash,
336 consensus_chain_client,
337 chain_id,
338 )?,
339 None,
340 ),
341 ChainId::Domain(domain_id) => {
342 let confirmed_domain_block_hash = {
343 match consensus_chain_api
344 .latest_confirmed_domain_block(to_process_consensus_hash, domain_id)?
345 {
346 Some((_, confirmed_domain_block_hash)) => confirmed_domain_block_hash,
347 None => return Ok(()),
349 }
350 };
351 (
352 fetch_and_filter_messages::<_, _, _, CBlock>(
353 domain_client,
354 confirmed_domain_block_hash,
355 consensus_chain_client,
356 chain_id,
357 )?,
358 Some((domain_id, confirmed_domain_block_hash)),
359 )
360 }
361 };
362
363 if block_messages.is_empty() {
365 tracing::debug!("No messages from chain[{:?}]. Skipping..", chain_id);
366 return Ok(());
367 }
368
369 let xdm_proof_data = match maybe_domain_data {
370 None => XDMProofData::Consensus(to_process_consensus_hash),
371 Some((domain_id, confirmed_domain_block_hash)) => {
372 let storage_key = consensus_chain_api
373 .confirmed_domain_block_storage_key(to_process_consensus_hash, domain_id)?;
374
375 let domain_proof = consensus_chain_client.read_proof(
376 to_process_consensus_hash,
377 &mut [storage_key.as_ref()].into_iter(),
378 )?;
379
380 XDMProofData::Domain {
381 domain_proof,
382 confirmed_domain_block_hash,
383 }
384 }
385 };
386
387 for (dst_chain_id, channel_id, messages) in block_messages {
388 tracing::debug!(
389 "Submitting messages to chain[{:?}] on Channel[{:?}] with [{:?}] Outbox messages",
390 dst_chain_id,
391 channel_id,
392 messages.outbox.len()
393 );
394 construct_cross_chain_message_and_submit::<NumberFor<CBlock>, CBlock::Hash, _, _>(
395 (chain_id, dst_chain_id, channel_id, messages.outbox),
396 |key, dst_chain_id| {
397 Self::construct_xdm_proof(
398 consensus_chain_client,
399 domain_client,
400 dst_chain_id,
401 mmr_consensus_block,
402 key,
403 xdm_proof_data.clone(),
404 )
405 },
406 |msg| gossip_outbox_message(domain_client, msg, gossip_message_sink),
407 )?;
408
409 tracing::debug!(
410 "Submitting messages to chain[{:?}] on Channel[{:?}] with [{:?}] Inbox response messages",
411 dst_chain_id,
412 channel_id,
413 messages.inbox_responses.len()
414 );
415 construct_cross_chain_message_and_submit::<NumberFor<CBlock>, CBlock::Hash, _, _>(
416 (chain_id, dst_chain_id, channel_id, messages.inbox_responses),
417 |key, dst_chain_id| {
418 Self::construct_xdm_proof(
419 consensus_chain_client,
420 domain_client,
421 dst_chain_id,
422 mmr_consensus_block,
423 key,
424 xdm_proof_data.clone(),
425 )
426 },
427 |msg| gossip_inbox_message_response(domain_client, msg, gossip_message_sink),
428 )?;
429 }
430
431 Ok(())
432 }
433
434 fn construct_xdm_proof<CClient, CBlock>(
436 consensus_chain_client: &Arc<CClient>,
437 domain_client: &Arc<Client>,
438 dst_chain_id: ChainId,
439 mmr_consensus_block: (NumberFor<CBlock>, CBlock::Hash),
440 message_storage_key: &[u8],
441 xdm_proof_data: XDMProofData<CBlock::Hash, Block::Hash>,
442 ) -> Result<ProofOf<CBlock>, Error>
443 where
444 CBlock: BlockT,
445 CClient: HeaderBackend<CBlock> + ProvideRuntimeApi<CBlock> + ProofProvider<CBlock>,
446 CClient::Api: DomainsApi<CBlock, Block::Header>
447 + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>
448 + MmrApi<CBlock, H256, NumberFor<CBlock>>,
449 {
450 let consensus_chain_mmr_proof = construct_consensus_mmr_proof(
451 consensus_chain_client,
452 dst_chain_id,
453 mmr_consensus_block,
454 )?;
455
456 let proof = match xdm_proof_data {
457 XDMProofData::Consensus(at_consensus_hash) => {
458 let message_proof = consensus_chain_client
459 .read_proof(at_consensus_hash, &mut [message_storage_key].into_iter())
460 .map_err(|_| Error::ConstructStorageProof)?;
461
462 Proof::Consensus {
463 consensus_chain_mmr_proof,
464 message_proof,
465 }
466 }
467 XDMProofData::Domain {
468 domain_proof,
469 confirmed_domain_block_hash,
470 } => {
471 let message_proof = domain_client
472 .read_proof(
473 confirmed_domain_block_hash,
474 &mut [message_storage_key].into_iter(),
475 )
476 .map_err(|_| Error::ConstructStorageProof)?;
477
478 Proof::Domain {
479 consensus_chain_mmr_proof,
480 domain_proof,
481 message_proof,
482 }
483 }
484 };
485 Ok(proof)
486 }
487}
488
489fn filter_block_messages<Client, Block, CBlock>(
490 api: &ApiRef<'_, Client::Api>,
491 best_hash: Block::Hash,
492 query: BlockMessagesQuery,
493 messages: &mut MessagesWithStorageKey,
494) -> Result<(), Error>
495where
496 Block: BlockT,
497 CBlock: BlockT,
498 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
499 Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
500{
501 let BlockMessagesQuery {
502 chain_id,
503 channel_id,
504 outbox_from,
505 inbox_responses_from,
506 } = query;
507 let maybe_outbox_nonce =
508 api.first_outbox_message_nonce_to_relay(best_hash, chain_id, channel_id, outbox_from)?;
509 let maybe_inbox_response_nonce = api.first_inbox_message_response_nonce_to_relay(
510 best_hash,
511 chain_id,
512 channel_id,
513 inbox_responses_from,
514 )?;
515
516 if let Some(nonce) = maybe_outbox_nonce {
517 messages.outbox.retain(|msg| msg.nonce >= nonce)
518 } else {
519 messages.outbox.clear()
520 }
521
522 if let Some(nonce) = maybe_inbox_response_nonce {
523 messages.inbox_responses.retain(|msg| msg.nonce >= nonce)
524 } else {
525 messages.inbox_responses.clear();
526 }
527
528 Ok(())
529}
530
531fn fetch_messages<Backend, Client, Block, CBlock>(
533 backend: &Backend,
534 client: &Arc<Client>,
535 fetch_message_at: Block::Hash,
536 self_chain_id: ChainId,
537) -> Result<Vec<(ChainId, ChannelId, MessagesWithStorageKey)>, Error>
538where
539 Block: BlockT,
540 CBlock: BlockT,
541 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
542 Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
543 Backend: AuxStore,
544{
545 let runtime_api = client.runtime_api();
546 let fetch_message_at_number = client
547 .number(fetch_message_at)?
548 .ok_or(Error::MissingBlockHeader)?;
549 let mut queries = runtime_api
550 .channels_and_state(fetch_message_at)?
551 .into_iter()
552 .filter_map(|(dst_chain_id, channel_id, channel_state)| {
553 get_channel_state_query(
554 backend,
555 client,
556 fetch_message_at,
557 self_chain_id,
558 dst_chain_id,
559 channel_id,
560 channel_state,
561 )
562 .ok()
563 .flatten()
564 })
565 .collect::<Vec<_>>();
566
567 let queries = if queries.len() <= MAXIMUM_CHANNELS_TO_PROCESS_IN_BLOCK {
569 queries
570 } else {
571 let mut rng = rand::thread_rng();
572 queries.shuffle(&mut rng);
573 queries.truncate(MAXIMUM_CHANNELS_TO_PROCESS_IN_BLOCK);
574 queries
575 };
576
577 let best_hash = client.info().best_hash;
578 let runtime_api_best = client.runtime_api();
579 let total_messages = queries
580 .into_iter()
581 .filter_map(|query| {
582 let BlockMessagesQuery {
583 chain_id: dst_chain_id,
584 channel_id,
585 ..
586 } = query;
587 let mut messages = runtime_api
588 .block_messages_with_query(fetch_message_at, query.clone())
589 .ok()?;
590
591 filter_block_messages::<Client, _, CBlock>(
593 &runtime_api_best,
594 best_hash,
595 query,
596 &mut messages,
597 )
598 .ok()?;
599
600 if !messages.outbox.is_empty()
601 && let Some(max_nonce) = messages.outbox.iter().map(|key| key.nonce).max()
602 {
603 set_channel_outbox_processed_state(
604 backend,
605 self_chain_id,
606 dst_chain_id,
607 ChannelProcessedState::<Block> {
608 block_number: fetch_message_at_number,
609 block_hash: fetch_message_at,
610 channel_id,
611 nonce: Some(max_nonce),
612 },
613 )
614 .ok()?;
615 }
616
617 if !messages.inbox_responses.is_empty()
618 && let Some(max_nonce) = messages.inbox_responses.iter().map(|key| key.nonce).max()
619 {
620 set_channel_inbox_response_processed_state(
621 backend,
622 self_chain_id,
623 dst_chain_id,
624 ChannelProcessedState::<Block> {
625 block_number: fetch_message_at_number,
626 block_hash: fetch_message_at,
627 channel_id,
628 nonce: Some(max_nonce),
629 },
630 )
631 .ok()?;
632 }
633
634 Some((dst_chain_id, channel_id, messages))
635 })
636 .collect();
637
638 Ok(total_messages)
639}
640
641fn get_channel_state_query<Backend, Client, Block>(
642 backend: &Backend,
643 client: &Arc<Client>,
644 fetch_message_at: Block::Hash,
645 self_chain_id: ChainId,
646 dst_chain_id: ChainId,
647 channel_id: ChannelId,
648 local_channel_state: ChannelStateWithNonce,
649) -> Result<Option<BlockMessagesQuery>, Error>
650where
651 Backend: AuxStore,
652 Block: BlockT,
653 Client: HeaderBackend<Block>,
654{
655 let maybe_dst_channel_state =
656 get_channel_state(backend, dst_chain_id, self_chain_id, channel_id)
657 .ok()
658 .flatten();
659
660 let last_processed_nonces = get_last_processed_nonces(
661 backend,
662 client,
663 fetch_message_at,
664 self_chain_id,
665 dst_chain_id,
666 channel_id,
667 )?;
668
669 let query = match (
670 maybe_dst_channel_state,
671 last_processed_nonces.outbox_nonce,
672 last_processed_nonces.inbox_response_nonce,
673 ) {
674 (None, None, None) => Some(BlockMessagesQuery {
676 chain_id: dst_chain_id,
677 channel_id,
678 outbox_from: Nonce::zero(),
679 inbox_responses_from: Nonce::zero(),
680 }),
681 (Some(dst_channel_state), None, None) => {
683 should_relay_messages_to_channel(dst_chain_id, &dst_channel_state, local_channel_state)
684 .then_some(BlockMessagesQuery {
685 chain_id: dst_chain_id,
686 channel_id,
687 outbox_from: dst_channel_state.next_inbox_nonce,
688 inbox_responses_from: dst_channel_state
689 .latest_response_received_message_nonce
690 .map(|nonce| nonce.saturating_add(One::one()))
692 .unwrap_or(Nonce::zero()),
693 })
694 }
695 (None, last_outbox_nonce, last_inbox_message_response_nonce) => Some(BlockMessagesQuery {
697 chain_id: dst_chain_id,
698 channel_id,
699 outbox_from: last_outbox_nonce
700 .map(|nonce| nonce.saturating_add(One::one()))
701 .unwrap_or(Nonce::zero()),
702 inbox_responses_from: last_inbox_message_response_nonce
703 .map(|nonce| nonce.saturating_add(One::one()))
704 .unwrap_or(Nonce::zero()),
705 }),
706 (Some(dst_channel_state), last_outbox_nonce, last_inbox_message_response_nonce) => {
707 should_relay_messages_to_channel(dst_chain_id, &dst_channel_state, local_channel_state)
708 .then(|| {
709 let next_outbox_nonce = max(
710 dst_channel_state.next_inbox_nonce,
711 last_outbox_nonce
712 .map(|nonce| nonce.saturating_add(One::one()))
713 .unwrap_or(Nonce::zero()),
714 );
715
716 let next_inbox_response_nonce = max(
717 dst_channel_state
718 .latest_response_received_message_nonce
719 .map(|nonce| nonce.saturating_add(One::one()))
720 .unwrap_or(Nonce::zero()),
721 last_inbox_message_response_nonce
722 .map(|nonce| nonce.saturating_add(One::one()))
723 .unwrap_or(Nonce::zero()),
724 );
725
726 BlockMessagesQuery {
727 chain_id: dst_chain_id,
728 channel_id,
729 outbox_from: next_outbox_nonce,
730 inbox_responses_from: next_inbox_response_nonce,
731 }
732 })
733 }
734 };
735
736 tracing::debug!(
737 "From Chain[{:?}] to Chain[{:?}] and Channel[{:?}] Query: {:?}",
738 self_chain_id,
739 dst_chain_id,
740 channel_id,
741 query
742 );
743
744 Ok(query)
745}
746
747fn should_relay_messages_to_channel(
748 dst_chain_id: ChainId,
749 dst_channel_state: &ChannelDetail,
750 local_channel_state: ChannelStateWithNonce,
751) -> bool {
752 let should_process = if dst_channel_state.state == ChannelState::Closed
753 && let ChannelStateWithNonce::Closed {
754 next_outbox_nonce,
755 next_inbox_nonce,
756 } = local_channel_state
757 {
758 let no_outbox_messages = next_outbox_nonce == dst_channel_state.next_inbox_nonce;
762
763 let no_inbox_responses_messages = dst_channel_state
767 .latest_response_received_message_nonce
768 .map(|nonce| nonce == next_inbox_nonce.saturating_sub(Nonce::one()))
769 .unwrap_or(false);
770
771 !(no_outbox_messages && no_inbox_responses_messages)
772 } else {
773 true
774 };
775
776 if !should_process {
777 tracing::debug!(
778 "Chain[{:?}] for Channel[{:?}] is closed and no messages to process",
779 dst_chain_id,
780 dst_channel_state.channel_id
781 );
782 }
783
784 should_process
785}
786
787fn is_relayer_api_version_available<Client, Block, CBlock>(
788 client: &Arc<Client>,
789 version: u32,
790 block_hash: Block::Hash,
791) -> bool
792where
793 Block: BlockT,
794 CBlock: BlockT,
795 Client: ProvideRuntimeApi<Block>,
796 Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
797{
798 let relayer_api_version = client
799 .runtime_api()
800 .api_version::<dyn RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>>(
801 block_hash,
802 )
803 .ok()
804 .flatten()
805 .unwrap_or(1);
807
808 relayer_api_version >= version
809}