1#![warn(rust_2018_idioms)]
2mod aux_schema;
6pub mod worker;
7
8use crate::aux_schema::{
9 ChannelProcessedState, get_last_processed_nonces, set_channel_inbox_response_processed_state,
10 set_channel_outbox_processed_state,
11};
12use async_channel::TrySendError;
13use cross_domain_message_gossip::{
14 ChannelDetail, Message as GossipMessage, MessageData as GossipMessageData, get_channel_state,
15};
16use parity_scale_codec::{Codec, Encode};
17use rand::seq::SliceRandom;
18use sc_client_api::{AuxStore, HeaderBackend, ProofProvider, StorageProof};
19use sc_utils::mpsc::TracingUnboundedSender;
20use sp_api::{ApiRef, ProvideRuntimeApi};
21use sp_core::H256;
22use sp_domains::{ChannelId, DomainsApi};
23use sp_messenger::messages::{
24 BlockMessagesQuery, ChainId, ChannelState, ChannelStateWithNonce, CrossDomainMessage,
25 MessageNonceWithStorageKey, MessagesWithStorageKey, Nonce, Proof,
26};
27use sp_messenger::{MessengerApi, RelayerApi};
28use sp_mmr_primitives::MmrApi;
29use sp_runtime::ArithmeticError;
30use sp_runtime::traits::{Block as BlockT, CheckedSub, Header as HeaderT, NumberFor, One};
31use sp_subspace_mmr::ConsensusChainMmrLeafProof;
32use std::cmp::max;
33use std::marker::PhantomData;
34use std::sync::Arc;
35use subspace_runtime_primitives::BlockHashFor;
36
/// Limit for the channel processed state cache.
///
/// NOTE(review): not referenced in the visible part of this file; presumably consumed by
/// `aux_schema` — confirm.
const CHANNEL_PROCESSED_STATE_CACHE_LIMIT: u32 = 5;
/// Upper bound on the number of channel queries `fetch_messages` processes per call. When more
/// queries are available, a random subset of this size is kept.
const MAXIMUM_CHANNELS_TO_PROCESS_IN_BLOCK: usize = 15;
39
/// Zero-sized marker type whose associated functions implement relaying for a particular
/// `Client`/`Block` pair.
struct Relayer<Client, Block>(PhantomData<(Client, Block)>);
42
43pub type GossipMessageSink = TracingUnboundedSender<GossipMessage>;
45
46#[derive(Debug)]
48pub enum Error {
49 ConstructStorageProof,
51 FailedToConstructExtrinsic,
53 FetchAssignedMessages,
55 StoreRelayedBlockNumber,
57 UnableToFetchDomainId,
59 UnableToFetchRelayConfirmationDepth,
61 BlockchainError(Box<sp_blockchain::Error>),
63 ArithmeticError(ArithmeticError),
65 ApiError(sp_api::ApiError),
67 UnableToSubmitCrossDomainMessage(TrySendError<GossipMessage>),
69 InvalidChainId,
71 MmrProof(sp_mmr_primitives::Error),
73 MmrLeafMissing,
75 MissingBlockHeader,
77 MissingBlockHash,
79}
80
81impl From<sp_blockchain::Error> for Error {
82 #[inline]
83 fn from(err: sp_blockchain::Error) -> Self {
84 Error::BlockchainError(Box::new(err))
85 }
86}
87
88impl From<ArithmeticError> for Error {
89 #[inline]
90 fn from(err: ArithmeticError) -> Self {
91 Error::ArithmeticError(err)
92 }
93}
94
95impl From<sp_api::ApiError> for Error {
96 #[inline]
97 fn from(err: sp_api::ApiError) -> Self {
98 Error::ApiError(err)
99 }
100}
101
102type ProofOf<Block> = Proof<NumberFor<Block>, BlockHashFor<Block>, H256>;
103
104fn construct_consensus_mmr_proof<Client, Block>(
105 consensus_chain_client: &Arc<Client>,
106 dst_chain_id: ChainId,
107 (block_number, block_hash): (NumberFor<Block>, Block::Hash),
108) -> Result<ConsensusChainMmrLeafProof<NumberFor<Block>, Block::Hash, H256>, Error>
109where
110 Block: BlockT,
111 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
112 Client::Api: MmrApi<Block, H256, NumberFor<Block>>,
113{
114 let api = consensus_chain_client.runtime_api();
115 let best_hash = consensus_chain_client.info().best_hash;
116 let best_number = consensus_chain_client.info().best_number;
117
118 let (prove_at_number, prove_at_hash) = match dst_chain_id {
119 ChainId::Consensus => (best_number, best_hash),
124 ChainId::Domain(_) => (block_number, block_hash),
129 };
130
131 let (mut leaves, proof) = api
132 .generate_proof(best_hash, vec![block_number], Some(prove_at_number))
133 .map_err(Error::ApiError)?
134 .map_err(Error::MmrProof)?;
135 debug_assert!(leaves.len() == 1, "should always be of length 1");
136 let leaf = leaves.pop().ok_or(Error::MmrLeafMissing)?;
137
138 Ok(ConsensusChainMmrLeafProof {
139 consensus_block_number: prove_at_number,
140 consensus_block_hash: prove_at_hash,
141 opaque_mmr_leaf: leaf,
142 proof,
143 })
144}
145
146fn construct_cross_chain_message_and_submit<CNumber, CHash, Submitter, ProofConstructor>(
147 msgs: (ChainId, ChainId, ChannelId, Vec<MessageNonceWithStorageKey>),
148 proof_constructor: ProofConstructor,
149 submitter: Submitter,
150) -> Result<(), Error>
151where
152 Submitter: Fn(CrossDomainMessage<CNumber, CHash, H256>) -> Result<(), Error>,
153 ProofConstructor: Fn(&[u8], ChainId) -> Result<Proof<CNumber, CHash, H256>, Error>,
154{
155 let (src_chain_id, dst_chain_id, channel_id, msgs) = msgs;
156 for msg in msgs {
157 let proof = match proof_constructor(&msg.storage_key, dst_chain_id) {
158 Ok(proof) => proof,
159 Err(err) => {
160 tracing::error!(
161 "Failed to construct storage proof for message: {:?} bound to chain: {:?} with error: {:?}",
162 (channel_id, msg.nonce),
163 dst_chain_id,
164 err
165 );
166 continue;
167 }
168 };
169 let msg = CrossDomainMessage::from_relayer_msg_with_proof(
170 src_chain_id,
171 dst_chain_id,
172 channel_id,
173 msg,
174 proof,
175 );
176 let (dst_domain, msg_id) = (msg.dst_chain_id, (msg.channel_id, msg.nonce));
177 if let Err(err) = submitter(msg) {
178 tracing::error!(
179 ?err,
180 "Failed to submit message: {msg_id:?} to domain: {dst_domain:?}",
181 );
182 }
183 }
184
185 Ok(())
186}
187
188fn gossip_outbox_message<Block, Client, CNumber, CHash>(
190 client: &Arc<Client>,
191 msg: CrossDomainMessage<CNumber, CHash, H256>,
192 sink: &GossipMessageSink,
193) -> Result<(), Error>
194where
195 Block: BlockT,
196 CNumber: Codec,
197 CHash: Codec,
198 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
199 Client::Api: RelayerApi<Block, NumberFor<Block>, CNumber, CHash>,
200{
201 let best_hash = client.info().best_hash;
202 let dst_chain_id = msg.dst_chain_id;
203 let msg_id = (msg.dst_chain_id, msg.channel_id, msg.nonce);
204 let ext = client
205 .runtime_api()
206 .outbox_message_unsigned(best_hash, msg)?
207 .ok_or(Error::FailedToConstructExtrinsic)?;
208
209 tracing::trace!("Submitting Outbox message: {:?}", msg_id);
210 sink.unbounded_send(GossipMessage {
211 chain_id: dst_chain_id,
212 data: GossipMessageData::Xdm(ext.encode()),
213 })
214 .map_err(Error::UnableToSubmitCrossDomainMessage)
215}
216
217fn gossip_inbox_message_response<Block, Client, CNumber, CHash>(
221 client: &Arc<Client>,
222 msg: CrossDomainMessage<CNumber, CHash, H256>,
223 sink: &GossipMessageSink,
224) -> Result<(), Error>
225where
226 Block: BlockT,
227 CNumber: Codec,
228 CHash: Codec,
229 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
230 Client::Api: RelayerApi<Block, NumberFor<Block>, CNumber, CHash>,
231{
232 let best_hash = client.info().best_hash;
233 let dst_chain_id = msg.dst_chain_id;
234 let msg_id = (msg.dst_chain_id, msg.channel_id, msg.nonce);
235 let ext = client
236 .runtime_api()
237 .inbox_response_message_unsigned(best_hash, msg)?
238 .ok_or(Error::FailedToConstructExtrinsic)?;
239
240 tracing::trace!("Submitting Inbox response message: {:?}", msg_id);
241 sink.unbounded_send(GossipMessage {
242 chain_id: dst_chain_id,
243 data: GossipMessageData::Xdm(ext.encode()),
244 })
245 .map_err(Error::UnableToSubmitCrossDomainMessage)
246}
247
248fn fetch_and_filter_messages<Client, Block, CClient, CBlock>(
250 client: &Arc<Client>,
251 fetch_message_at: Block::Hash,
252 consensus_client: &Arc<CClient>,
253 self_chain_id: ChainId,
254) -> Result<Vec<(ChainId, ChannelId, MessagesWithStorageKey)>, Error>
255where
256 CBlock: BlockT,
257 CClient: AuxStore + HeaderBackend<CBlock>,
258 Block: BlockT,
259 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
260 Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
261{
262 fetch_messages::<_, _, Block, CBlock>(
263 &**consensus_client,
264 client,
265 fetch_message_at,
266 self_chain_id,
267 )
268}
269
270#[derive(Clone)]
272enum XDMProofData<CHash, DHash> {
273 Consensus(CHash),
274 Domain {
275 domain_proof: StorageProof,
276 confirmed_domain_block_hash: DHash,
277 },
278}
279
280impl<Client, Block> Relayer<Client, Block>
281where
282 Block: BlockT,
283 Client: HeaderBackend<Block> + AuxStore + ProofProvider<Block> + ProvideRuntimeApi<Block>,
284{
285 pub(crate) fn construct_and_submit_xdm<CClient, CBlock>(
286 chain_id: ChainId,
287 domain_client: &Arc<Client>,
288 consensus_chain_client: &Arc<CClient>,
289 confirmed_block_number: NumberFor<CBlock>,
290 gossip_message_sink: &GossipMessageSink,
291 ) -> Result<(), Error>
292 where
293 CBlock: BlockT,
294 CClient:
295 HeaderBackend<CBlock> + ProvideRuntimeApi<CBlock> + ProofProvider<CBlock> + AuxStore,
296 CClient::Api: DomainsApi<CBlock, Block::Header>
297 + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>
298 + MmrApi<CBlock, H256, NumberFor<CBlock>>
299 + RelayerApi<CBlock, NumberFor<CBlock>, NumberFor<CBlock>, CBlock::Hash>,
300 Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
301 {
302 let mmr_consensus_block = (
305 confirmed_block_number,
306 consensus_chain_client
307 .hash(confirmed_block_number)?
308 .ok_or(Error::MissingBlockHash)?,
309 );
310 let (to_process_consensus_number, to_process_consensus_hash) =
311 match confirmed_block_number.checked_sub(&One::one()) {
312 None => return Ok(()),
313 Some(n) => {
314 let h = consensus_chain_client
315 .hash(n)?
316 .ok_or(Error::MissingBlockHash)?;
317 (n, h)
318 }
319 };
320
321 tracing::debug!(
322 "Checking messages to be submitted from chain: {chain_id:?} at block: ({to_process_consensus_number:?}, {to_process_consensus_hash:?})",
323 );
324
325 let consensus_chain_api = consensus_chain_client.runtime_api();
326 let (block_messages, maybe_domain_data) = match chain_id {
327 ChainId::Consensus => (
328 fetch_and_filter_messages::<_, _, _, CBlock>(
329 consensus_chain_client,
330 to_process_consensus_hash,
331 consensus_chain_client,
332 chain_id,
333 )?,
334 None,
335 ),
336 ChainId::Domain(domain_id) => {
337 let confirmed_domain_block_hash = {
338 match consensus_chain_api
339 .latest_confirmed_domain_block(to_process_consensus_hash, domain_id)?
340 {
341 Some((_, confirmed_domain_block_hash)) => confirmed_domain_block_hash,
342 None => return Ok(()),
344 }
345 };
346 (
347 fetch_and_filter_messages::<_, _, _, CBlock>(
348 domain_client,
349 confirmed_domain_block_hash,
350 consensus_chain_client,
351 chain_id,
352 )?,
353 Some((domain_id, confirmed_domain_block_hash)),
354 )
355 }
356 };
357
358 if block_messages.is_empty() {
360 tracing::debug!("No messages from chain[{:?}]. Skipping..", chain_id);
361 return Ok(());
362 }
363
364 let xdm_proof_data = match maybe_domain_data {
365 None => XDMProofData::Consensus(to_process_consensus_hash),
366 Some((domain_id, confirmed_domain_block_hash)) => {
367 let storage_key = consensus_chain_api
368 .confirmed_domain_block_storage_key(to_process_consensus_hash, domain_id)?;
369
370 let domain_proof = consensus_chain_client.read_proof(
371 to_process_consensus_hash,
372 &mut [storage_key.as_ref()].into_iter(),
373 )?;
374
375 XDMProofData::Domain {
376 domain_proof,
377 confirmed_domain_block_hash,
378 }
379 }
380 };
381
382 for (dst_chain_id, channel_id, messages) in block_messages {
383 tracing::debug!(
384 "Submitting messages to chain[{:?}] on Channel[{:?}] with [{:?}] Outbox messages",
385 dst_chain_id,
386 channel_id,
387 messages.outbox.len()
388 );
389 construct_cross_chain_message_and_submit::<NumberFor<CBlock>, CBlock::Hash, _, _>(
390 (chain_id, dst_chain_id, channel_id, messages.outbox),
391 |key, dst_chain_id| {
392 Self::construct_xdm_proof(
393 consensus_chain_client,
394 domain_client,
395 dst_chain_id,
396 mmr_consensus_block,
397 key,
398 xdm_proof_data.clone(),
399 )
400 },
401 |msg| gossip_outbox_message(domain_client, msg, gossip_message_sink),
402 )?;
403
404 tracing::debug!(
405 "Submitting messages to chain[{:?}] on Channel[{:?}] with [{:?}] Inbox response messages",
406 dst_chain_id,
407 channel_id,
408 messages.inbox_responses.len()
409 );
410 construct_cross_chain_message_and_submit::<NumberFor<CBlock>, CBlock::Hash, _, _>(
411 (chain_id, dst_chain_id, channel_id, messages.inbox_responses),
412 |key, dst_chain_id| {
413 Self::construct_xdm_proof(
414 consensus_chain_client,
415 domain_client,
416 dst_chain_id,
417 mmr_consensus_block,
418 key,
419 xdm_proof_data.clone(),
420 )
421 },
422 |msg| gossip_inbox_message_response(domain_client, msg, gossip_message_sink),
423 )?;
424 }
425
426 Ok(())
427 }
428
429 fn construct_xdm_proof<CClient, CBlock>(
431 consensus_chain_client: &Arc<CClient>,
432 domain_client: &Arc<Client>,
433 dst_chain_id: ChainId,
434 mmr_consensus_block: (NumberFor<CBlock>, CBlock::Hash),
435 message_storage_key: &[u8],
436 xdm_proof_data: XDMProofData<CBlock::Hash, Block::Hash>,
437 ) -> Result<ProofOf<CBlock>, Error>
438 where
439 CBlock: BlockT,
440 CClient: HeaderBackend<CBlock> + ProvideRuntimeApi<CBlock> + ProofProvider<CBlock>,
441 CClient::Api: DomainsApi<CBlock, Block::Header>
442 + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>
443 + MmrApi<CBlock, H256, NumberFor<CBlock>>,
444 {
445 let consensus_chain_mmr_proof = construct_consensus_mmr_proof(
446 consensus_chain_client,
447 dst_chain_id,
448 mmr_consensus_block,
449 )?;
450
451 let proof = match xdm_proof_data {
452 XDMProofData::Consensus(at_consensus_hash) => {
453 let message_proof = consensus_chain_client
454 .read_proof(at_consensus_hash, &mut [message_storage_key].into_iter())
455 .map_err(|_| Error::ConstructStorageProof)?;
456
457 Proof::Consensus {
458 consensus_chain_mmr_proof,
459 message_proof,
460 }
461 }
462 XDMProofData::Domain {
463 domain_proof,
464 confirmed_domain_block_hash,
465 } => {
466 let message_proof = domain_client
467 .read_proof(
468 confirmed_domain_block_hash,
469 &mut [message_storage_key].into_iter(),
470 )
471 .map_err(|_| Error::ConstructStorageProof)?;
472
473 Proof::Domain {
474 consensus_chain_mmr_proof,
475 domain_proof,
476 message_proof,
477 }
478 }
479 };
480 Ok(proof)
481 }
482}
483
484fn filter_block_messages<Client, Block, CBlock>(
485 api: &ApiRef<'_, Client::Api>,
486 best_hash: Block::Hash,
487 query: BlockMessagesQuery,
488 messages: &mut MessagesWithStorageKey,
489) -> Result<(), Error>
490where
491 Block: BlockT,
492 CBlock: BlockT,
493 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
494 Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
495{
496 let BlockMessagesQuery {
497 chain_id,
498 channel_id,
499 outbox_from,
500 inbox_responses_from,
501 } = query;
502 let maybe_outbox_nonce =
503 api.first_outbox_message_nonce_to_relay(best_hash, chain_id, channel_id, outbox_from)?;
504 let maybe_inbox_response_nonce = api.first_inbox_message_response_nonce_to_relay(
505 best_hash,
506 chain_id,
507 channel_id,
508 inbox_responses_from,
509 )?;
510
511 if let Some(nonce) = maybe_outbox_nonce {
512 messages.outbox.retain(|msg| msg.nonce >= nonce)
513 } else {
514 messages.outbox.clear()
515 }
516
517 if let Some(nonce) = maybe_inbox_response_nonce {
518 messages.inbox_responses.retain(|msg| msg.nonce >= nonce)
519 } else {
520 messages.inbox_responses.clear();
521 }
522
523 Ok(())
524}
525
526fn fetch_messages<Backend, Client, Block, CBlock>(
528 backend: &Backend,
529 client: &Arc<Client>,
530 fetch_message_at: Block::Hash,
531 self_chain_id: ChainId,
532) -> Result<Vec<(ChainId, ChannelId, MessagesWithStorageKey)>, Error>
533where
534 Block: BlockT,
535 CBlock: BlockT,
536 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
537 Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
538 Backend: AuxStore,
539{
540 let runtime_api = client.runtime_api();
541 let fetch_message_at_number = client
542 .number(fetch_message_at)?
543 .ok_or(Error::MissingBlockHeader)?;
544 let mut queries = runtime_api
545 .channels_and_state(fetch_message_at)?
546 .into_iter()
547 .filter_map(|(dst_chain_id, channel_id, channel_state)| {
548 get_channel_state_query(
549 backend,
550 client,
551 fetch_message_at,
552 self_chain_id,
553 dst_chain_id,
554 channel_id,
555 channel_state,
556 )
557 .ok()
558 .flatten()
559 })
560 .collect::<Vec<_>>();
561
562 let queries = if queries.len() <= MAXIMUM_CHANNELS_TO_PROCESS_IN_BLOCK {
564 queries
565 } else {
566 let mut rng = rand::thread_rng();
567 queries.shuffle(&mut rng);
568 queries.truncate(MAXIMUM_CHANNELS_TO_PROCESS_IN_BLOCK);
569 queries
570 };
571
572 let best_hash = client.info().best_hash;
573 let runtime_api_best = client.runtime_api();
574 let total_messages = queries
575 .into_iter()
576 .filter_map(|query| {
577 let BlockMessagesQuery {
578 chain_id: dst_chain_id,
579 channel_id,
580 ..
581 } = query;
582 let mut messages = runtime_api
583 .block_messages_with_query(fetch_message_at, query.clone())
584 .ok()?;
585
586 filter_block_messages::<Client, _, CBlock>(
588 &runtime_api_best,
589 best_hash,
590 query,
591 &mut messages,
592 )
593 .ok()?;
594
595 if !messages.outbox.is_empty()
596 && let Some(max_nonce) = messages.outbox.iter().map(|key| key.nonce).max()
597 {
598 set_channel_outbox_processed_state(
599 backend,
600 self_chain_id,
601 dst_chain_id,
602 ChannelProcessedState::<Block> {
603 block_number: fetch_message_at_number,
604 block_hash: fetch_message_at,
605 channel_id,
606 nonce: Some(max_nonce),
607 },
608 )
609 .ok()?;
610 }
611
612 if !messages.inbox_responses.is_empty()
613 && let Some(max_nonce) = messages.inbox_responses.iter().map(|key| key.nonce).max()
614 {
615 set_channel_inbox_response_processed_state(
616 backend,
617 self_chain_id,
618 dst_chain_id,
619 ChannelProcessedState::<Block> {
620 block_number: fetch_message_at_number,
621 block_hash: fetch_message_at,
622 channel_id,
623 nonce: Some(max_nonce),
624 },
625 )
626 .ok()?;
627 }
628
629 Some((dst_chain_id, channel_id, messages))
630 })
631 .collect();
632
633 Ok(total_messages)
634}
635
636fn get_channel_state_query<Backend, Client, Block>(
637 backend: &Backend,
638 client: &Arc<Client>,
639 fetch_message_at: Block::Hash,
640 self_chain_id: ChainId,
641 dst_chain_id: ChainId,
642 channel_id: ChannelId,
643 local_channel_state: ChannelStateWithNonce,
644) -> Result<Option<BlockMessagesQuery>, Error>
645where
646 Backend: AuxStore,
647 Block: BlockT,
648 Client: HeaderBackend<Block>,
649{
650 let maybe_dst_channel_state =
651 get_channel_state(backend, dst_chain_id, self_chain_id, channel_id)
652 .ok()
653 .flatten();
654
655 let last_processed_nonces = get_last_processed_nonces(
656 backend,
657 client,
658 fetch_message_at,
659 self_chain_id,
660 dst_chain_id,
661 channel_id,
662 )?;
663
664 let query = match (
665 maybe_dst_channel_state,
666 last_processed_nonces.outbox_nonce,
667 last_processed_nonces.inbox_response_nonce,
668 ) {
669 (None, None, None) => Some(BlockMessagesQuery {
671 chain_id: dst_chain_id,
672 channel_id,
673 outbox_from: Nonce::zero(),
674 inbox_responses_from: Nonce::zero(),
675 }),
676 (Some(dst_channel_state), None, None) => {
678 should_relay_messages_to_channel(dst_chain_id, &dst_channel_state, local_channel_state)
679 .then_some(BlockMessagesQuery {
680 chain_id: dst_chain_id,
681 channel_id,
682 outbox_from: dst_channel_state.next_inbox_nonce,
683 inbox_responses_from: dst_channel_state
684 .latest_response_received_message_nonce
685 .map(|nonce| nonce.saturating_add(One::one()))
687 .unwrap_or(Nonce::zero()),
688 })
689 }
690 (None, last_outbox_nonce, last_inbox_message_response_nonce) => Some(BlockMessagesQuery {
692 chain_id: dst_chain_id,
693 channel_id,
694 outbox_from: last_outbox_nonce
695 .map(|nonce| nonce.saturating_add(One::one()))
696 .unwrap_or(Nonce::zero()),
697 inbox_responses_from: last_inbox_message_response_nonce
698 .map(|nonce| nonce.saturating_add(One::one()))
699 .unwrap_or(Nonce::zero()),
700 }),
701 (Some(dst_channel_state), last_outbox_nonce, last_inbox_message_response_nonce) => {
702 should_relay_messages_to_channel(dst_chain_id, &dst_channel_state, local_channel_state)
703 .then(|| {
704 let next_outbox_nonce = max(
705 dst_channel_state.next_inbox_nonce,
706 last_outbox_nonce
707 .map(|nonce| nonce.saturating_add(One::one()))
708 .unwrap_or(Nonce::zero()),
709 );
710
711 let next_inbox_response_nonce = max(
712 dst_channel_state
713 .latest_response_received_message_nonce
714 .map(|nonce| nonce.saturating_add(One::one()))
715 .unwrap_or(Nonce::zero()),
716 last_inbox_message_response_nonce
717 .map(|nonce| nonce.saturating_add(One::one()))
718 .unwrap_or(Nonce::zero()),
719 );
720
721 BlockMessagesQuery {
722 chain_id: dst_chain_id,
723 channel_id,
724 outbox_from: next_outbox_nonce,
725 inbox_responses_from: next_inbox_response_nonce,
726 }
727 })
728 }
729 };
730
731 tracing::debug!(
732 "From Chain[{:?}] to Chain[{:?}] and Channel[{:?}] Query: {:?}",
733 self_chain_id,
734 dst_chain_id,
735 channel_id,
736 query
737 );
738
739 Ok(query)
740}
741
742fn should_relay_messages_to_channel(
743 dst_chain_id: ChainId,
744 dst_channel_state: &ChannelDetail,
745 local_channel_state: ChannelStateWithNonce,
746) -> bool {
747 let should_process = if dst_channel_state.state == ChannelState::Closed
748 && let ChannelStateWithNonce::Closed {
749 next_outbox_nonce,
750 next_inbox_nonce,
751 } = local_channel_state
752 {
753 let no_outbox_messages = next_outbox_nonce == dst_channel_state.next_inbox_nonce;
757
758 let no_inbox_responses_messages = dst_channel_state
762 .latest_response_received_message_nonce
763 .map(|nonce| nonce == next_inbox_nonce.saturating_sub(Nonce::one()))
764 .unwrap_or(false);
765
766 !(no_outbox_messages && no_inbox_responses_messages)
767 } else {
768 true
769 };
770
771 if !should_process {
772 tracing::debug!(
773 "Chain[{:?}] for Channel[{:?}] is closed and no messages to process",
774 dst_chain_id,
775 dst_channel_state.channel_id
776 );
777 }
778
779 should_process
780}