domain_client_message_relayer/
lib.rs

1#![warn(rust_2018_idioms)]
2// TODO: Restore once https://github.com/rust-lang/rust/issues/122105 is resolved
3// #![deny(unused_crate_dependencies)]
4
5mod aux_schema;
6pub mod worker;
7
8use crate::aux_schema::{
9    ChannelProcessedState, get_last_processed_nonces, set_channel_inbox_response_processed_state,
10    set_channel_outbox_processed_state,
11};
12use async_channel::TrySendError;
13use cross_domain_message_gossip::{
14    ChannelDetail, Message as GossipMessage, MessageData as GossipMessageData, get_channel_state,
15};
16use parity_scale_codec::{Codec, Encode};
17use rand::seq::SliceRandom;
18use sc_client_api::{AuxStore, HeaderBackend, ProofProvider, StorageProof};
19use sc_utils::mpsc::TracingUnboundedSender;
20use sp_api::{ApiRef, ProvideRuntimeApi};
21use sp_core::H256;
22use sp_domains::{ChannelId, DomainsApi};
23use sp_messenger::messages::{
24    BlockMessagesQuery, ChainId, ChannelState, ChannelStateWithNonce, CrossDomainMessage,
25    MessageNonceWithStorageKey, MessagesWithStorageKey, Nonce, Proof,
26};
27use sp_messenger::{MessengerApi, RelayerApi};
28use sp_mmr_primitives::MmrApi;
29use sp_runtime::ArithmeticError;
30use sp_runtime::traits::{Block as BlockT, CheckedSub, Header as HeaderT, NumberFor, One};
31use sp_subspace_mmr::ConsensusChainMmrLeafProof;
32use std::cmp::max;
33use std::marker::PhantomData;
34use std::sync::Arc;
35use subspace_runtime_primitives::BlockHashFor;
36
/// Limit applied to the cache of per-channel processed state.
///
/// NOTE(review): this constant is not referenced in this file; presumably it is consumed by
/// the `aux_schema` module — confirm.
const CHANNEL_PROCESSED_STATE_CACHE_LIMIT: u32 = 5;
/// Maximum number of channel queries `fetch_messages` handles in a single call. When more
/// queries are available, a random subset of this size is processed instead.
const MAXIMUM_CHANNELS_TO_PROCESS_IN_BLOCK: usize = 15;
39
/// Stateless relayer that moves messages between domains, relying on the consensus chain as
/// the trusted third party.
///
/// The type carries no data; `Client` and `Block` only select which associated functions are
/// available.
struct Relayer<Client, Block>(PhantomData<(Client, Block)>);
42
43/// Sink used to submit all the gossip messages.
44pub type GossipMessageSink = TracingUnboundedSender<GossipMessage>;
45
46/// Relayer error types.
47#[derive(Debug)]
48pub enum Error {
49    /// Emits when storage proof construction fails.
50    ConstructStorageProof,
51    /// Emits when unsigned extrinsic construction fails.
52    FailedToConstructExtrinsic,
53    /// Emits when failed to fetch assigned messages for a given relayer.
54    FetchAssignedMessages,
55    /// Emits when failed to store the processed block number.
56    StoreRelayedBlockNumber,
57    /// Emits when unable to fetch domain_id.
58    UnableToFetchDomainId,
59    /// Emits when unable to fetch relay confirmation depth.
60    UnableToFetchRelayConfirmationDepth,
61    /// Blockchain related error.
62    BlockchainError(Box<sp_blockchain::Error>),
63    /// Arithmetic related error.
64    ArithmeticError(ArithmeticError),
65    /// Api related error.
66    ApiError(sp_api::ApiError),
67    /// Failed to submit a cross domain message
68    UnableToSubmitCrossDomainMessage(TrySendError<GossipMessage>),
69    /// Invalid ChainId
70    InvalidChainId,
71    /// Failed to generate MMR proof
72    MmrProof(sp_mmr_primitives::Error),
73    /// MMR Leaf missing
74    MmrLeafMissing,
75    /// Missing block header
76    MissingBlockHeader,
77    /// Missing block hash
78    MissingBlockHash,
79}
80
81impl From<sp_blockchain::Error> for Error {
82    #[inline]
83    fn from(err: sp_blockchain::Error) -> Self {
84        Error::BlockchainError(Box::new(err))
85    }
86}
87
88impl From<ArithmeticError> for Error {
89    #[inline]
90    fn from(err: ArithmeticError) -> Self {
91        Error::ArithmeticError(err)
92    }
93}
94
95impl From<sp_api::ApiError> for Error {
96    #[inline]
97    fn from(err: sp_api::ApiError) -> Self {
98        Error::ApiError(err)
99    }
100}
101
102type ProofOf<Block> = Proof<NumberFor<Block>, BlockHashFor<Block>, H256>;
103
104fn construct_consensus_mmr_proof<Client, Block>(
105    consensus_chain_client: &Arc<Client>,
106    dst_chain_id: ChainId,
107    (block_number, block_hash): (NumberFor<Block>, Block::Hash),
108) -> Result<ConsensusChainMmrLeafProof<NumberFor<Block>, Block::Hash, H256>, Error>
109where
110    Block: BlockT,
111    Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
112    Client::Api: MmrApi<Block, H256, NumberFor<Block>>,
113{
114    let api = consensus_chain_client.runtime_api();
115    let best_hash = consensus_chain_client.info().best_hash;
116    let best_number = consensus_chain_client.info().best_number;
117
118    let (prove_at_number, prove_at_hash) = match dst_chain_id {
119        // The consensus chain will verify the MMR proof statelessly with the MMR root
120        // stored in the runtime, we need to generate the proof with the best block
121        // with the latest MMR root in the runtime so the proof will be valid as long
122        // as the MMR root is available when verifying the proof.
123        ChainId::Consensus => (best_number, best_hash),
124        // The domain chain will verify the MMR proof with the offchain MMR leaf data
125        // in the consensus client, we need to generate the proof with the proving block
126        // (i.e. finalized block) to avoid potential consensus fork and ensure the verification
127        // result is deterministic.
128        ChainId::Domain(_) => (block_number, block_hash),
129    };
130
131    let (mut leaves, proof) = api
132        .generate_proof(best_hash, vec![block_number], Some(prove_at_number))
133        .map_err(Error::ApiError)?
134        .map_err(Error::MmrProof)?;
135    debug_assert!(leaves.len() == 1, "should always be of length 1");
136    let leaf = leaves.pop().ok_or(Error::MmrLeafMissing)?;
137
138    Ok(ConsensusChainMmrLeafProof {
139        consensus_block_number: prove_at_number,
140        consensus_block_hash: prove_at_hash,
141        opaque_mmr_leaf: leaf,
142        proof,
143    })
144}
145
146fn construct_cross_chain_message_and_submit<CNumber, CHash, Submitter, ProofConstructor>(
147    msgs: (ChainId, ChainId, ChannelId, Vec<MessageNonceWithStorageKey>),
148    proof_constructor: ProofConstructor,
149    submitter: Submitter,
150) -> Result<(), Error>
151where
152    Submitter: Fn(CrossDomainMessage<CNumber, CHash, H256>) -> Result<(), Error>,
153    ProofConstructor: Fn(&[u8], ChainId) -> Result<Proof<CNumber, CHash, H256>, Error>,
154{
155    let (src_chain_id, dst_chain_id, channel_id, msgs) = msgs;
156    for msg in msgs {
157        let proof = match proof_constructor(&msg.storage_key, dst_chain_id) {
158            Ok(proof) => proof,
159            Err(err) => {
160                tracing::error!(
161                    "Failed to construct storage proof for message: {:?} bound to chain: {:?} with error: {:?}",
162                    (channel_id, msg.nonce),
163                    dst_chain_id,
164                    err
165                );
166                continue;
167            }
168        };
169        let msg = CrossDomainMessage::from_relayer_msg_with_proof(
170            src_chain_id,
171            dst_chain_id,
172            channel_id,
173            msg,
174            proof,
175        );
176        let (dst_domain, msg_id) = (msg.dst_chain_id, (msg.channel_id, msg.nonce));
177        if let Err(err) = submitter(msg) {
178            tracing::error!(
179                ?err,
180                "Failed to submit message: {msg_id:?} to domain: {dst_domain:?}",
181            );
182        }
183    }
184
185    Ok(())
186}
187
188/// Sends an Outbox message from src_domain to dst_domain.
189fn gossip_outbox_message<Block, Client, CNumber, CHash>(
190    client: &Arc<Client>,
191    msg: CrossDomainMessage<CNumber, CHash, H256>,
192    sink: &GossipMessageSink,
193) -> Result<(), Error>
194where
195    Block: BlockT,
196    CNumber: Codec,
197    CHash: Codec,
198    Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
199    Client::Api: RelayerApi<Block, NumberFor<Block>, CNumber, CHash>,
200{
201    let best_hash = client.info().best_hash;
202    let dst_chain_id = msg.dst_chain_id;
203    let msg_id = (msg.dst_chain_id, msg.channel_id, msg.nonce);
204    let ext = client
205        .runtime_api()
206        .outbox_message_unsigned(best_hash, msg)?
207        .ok_or(Error::FailedToConstructExtrinsic)?;
208
209    tracing::trace!("Submitting Outbox message: {:?}", msg_id);
210    sink.unbounded_send(GossipMessage {
211        chain_id: dst_chain_id,
212        data: GossipMessageData::Xdm(ext.encode()),
213    })
214    .map_err(Error::UnableToSubmitCrossDomainMessage)
215}
216
217/// Sends an Inbox message response from src_domain to dst_domain
218/// Inbox message was earlier sent by dst_domain to src_domain and
219/// this message is the response of the Inbox message execution.
220fn gossip_inbox_message_response<Block, Client, CNumber, CHash>(
221    client: &Arc<Client>,
222    msg: CrossDomainMessage<CNumber, CHash, H256>,
223    sink: &GossipMessageSink,
224) -> Result<(), Error>
225where
226    Block: BlockT,
227    CNumber: Codec,
228    CHash: Codec,
229    Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
230    Client::Api: RelayerApi<Block, NumberFor<Block>, CNumber, CHash>,
231{
232    let best_hash = client.info().best_hash;
233    let dst_chain_id = msg.dst_chain_id;
234    let msg_id = (msg.dst_chain_id, msg.channel_id, msg.nonce);
235    let ext = client
236        .runtime_api()
237        .inbox_response_message_unsigned(best_hash, msg)?
238        .ok_or(Error::FailedToConstructExtrinsic)?;
239
240    tracing::trace!("Submitting Inbox response message: {:?}", msg_id);
241    sink.unbounded_send(GossipMessage {
242        chain_id: dst_chain_id,
243        data: GossipMessageData::Xdm(ext.encode()),
244    })
245    .map_err(Error::UnableToSubmitCrossDomainMessage)
246}
247
248// Fetch the XDM at the given block and filter any already relayed XDM according to the best block
249fn fetch_and_filter_messages<Client, Block, CClient, CBlock>(
250    client: &Arc<Client>,
251    fetch_message_at: Block::Hash,
252    consensus_client: &Arc<CClient>,
253    self_chain_id: ChainId,
254) -> Result<Vec<(ChainId, ChannelId, MessagesWithStorageKey)>, Error>
255where
256    CBlock: BlockT,
257    CClient: AuxStore + HeaderBackend<CBlock>,
258    Block: BlockT,
259    Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
260    Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
261{
262    fetch_messages::<_, _, Block, CBlock>(
263        &**consensus_client,
264        client,
265        fetch_message_at,
266        self_chain_id,
267    )
268}
269
270// A helper struct used when constructing XDM proof
271#[derive(Clone)]
272enum XDMProofData<CHash, DHash> {
273    Consensus(CHash),
274    Domain {
275        domain_proof: StorageProof,
276        confirmed_domain_block_hash: DHash,
277    },
278}
279
280impl<Client, Block> Relayer<Client, Block>
281where
282    Block: BlockT,
283    Client: HeaderBackend<Block> + AuxStore + ProofProvider<Block> + ProvideRuntimeApi<Block>,
284{
285    pub(crate) fn construct_and_submit_xdm<CClient, CBlock>(
286        chain_id: ChainId,
287        domain_client: &Arc<Client>,
288        consensus_chain_client: &Arc<CClient>,
289        confirmed_block_number: NumberFor<CBlock>,
290        gossip_message_sink: &GossipMessageSink,
291    ) -> Result<(), Error>
292    where
293        CBlock: BlockT,
294        CClient:
295            HeaderBackend<CBlock> + ProvideRuntimeApi<CBlock> + ProofProvider<CBlock> + AuxStore,
296        CClient::Api: DomainsApi<CBlock, Block::Header>
297            + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>
298            + MmrApi<CBlock, H256, NumberFor<CBlock>>
299            + RelayerApi<CBlock, NumberFor<CBlock>, NumberFor<CBlock>, CBlock::Hash>,
300        Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
301    {
302        // Since the block MMR leaf is included in the next block, we procees the XDM of block `confirmed_block_number - 1`
303        // and use the block `confirmed_block_number` to generate the MMR proof of block `confirmed_block_number - 1`
304        let mmr_consensus_block = (
305            confirmed_block_number,
306            consensus_chain_client
307                .hash(confirmed_block_number)?
308                .ok_or(Error::MissingBlockHash)?,
309        );
310        let (to_process_consensus_number, to_process_consensus_hash) =
311            match confirmed_block_number.checked_sub(&One::one()) {
312                None => return Ok(()),
313                Some(n) => {
314                    let h = consensus_chain_client
315                        .hash(n)?
316                        .ok_or(Error::MissingBlockHash)?;
317                    (n, h)
318                }
319            };
320
321        tracing::debug!(
322            "Checking messages to be submitted from chain: {chain_id:?} at block: ({to_process_consensus_number:?}, {to_process_consensus_hash:?})",
323        );
324
325        let consensus_chain_api = consensus_chain_client.runtime_api();
326        let (block_messages, maybe_domain_data) = match chain_id {
327            ChainId::Consensus => (
328                fetch_and_filter_messages::<_, _, _, CBlock>(
329                    consensus_chain_client,
330                    to_process_consensus_hash,
331                    consensus_chain_client,
332                    chain_id,
333                )?,
334                None,
335            ),
336            ChainId::Domain(domain_id) => {
337                let confirmed_domain_block_hash = {
338                    match consensus_chain_api
339                        .latest_confirmed_domain_block(to_process_consensus_hash, domain_id)?
340                    {
341                        Some((_, confirmed_domain_block_hash)) => confirmed_domain_block_hash,
342                        // No domain block confirmed yet so just return
343                        None => return Ok(()),
344                    }
345                };
346                (
347                    fetch_and_filter_messages::<_, _, _, CBlock>(
348                        domain_client,
349                        confirmed_domain_block_hash,
350                        consensus_chain_client,
351                        chain_id,
352                    )?,
353                    Some((domain_id, confirmed_domain_block_hash)),
354                )
355            }
356        };
357
358        // short circuit if the there are no messages to relay
359        if block_messages.is_empty() {
360            tracing::debug!("No messages from chain[{:?}]. Skipping..", chain_id);
361            return Ok(());
362        }
363
364        let xdm_proof_data = match maybe_domain_data {
365            None => XDMProofData::Consensus(to_process_consensus_hash),
366            Some((domain_id, confirmed_domain_block_hash)) => {
367                let storage_key = consensus_chain_api
368                    .confirmed_domain_block_storage_key(to_process_consensus_hash, domain_id)?;
369
370                let domain_proof = consensus_chain_client.read_proof(
371                    to_process_consensus_hash,
372                    &mut [storage_key.as_ref()].into_iter(),
373                )?;
374
375                XDMProofData::Domain {
376                    domain_proof,
377                    confirmed_domain_block_hash,
378                }
379            }
380        };
381
382        for (dst_chain_id, channel_id, messages) in block_messages {
383            tracing::debug!(
384                "Submitting messages to chain[{:?}] on Channel[{:?}] with [{:?}] Outbox messages",
385                dst_chain_id,
386                channel_id,
387                messages.outbox.len()
388            );
389            construct_cross_chain_message_and_submit::<NumberFor<CBlock>, CBlock::Hash, _, _>(
390                (chain_id, dst_chain_id, channel_id, messages.outbox),
391                |key, dst_chain_id| {
392                    Self::construct_xdm_proof(
393                        consensus_chain_client,
394                        domain_client,
395                        dst_chain_id,
396                        mmr_consensus_block,
397                        key,
398                        xdm_proof_data.clone(),
399                    )
400                },
401                |msg| gossip_outbox_message(domain_client, msg, gossip_message_sink),
402            )?;
403
404            tracing::debug!(
405                "Submitting messages to chain[{:?}] on Channel[{:?}] with [{:?}] Inbox response messages",
406                dst_chain_id,
407                channel_id,
408                messages.inbox_responses.len()
409            );
410            construct_cross_chain_message_and_submit::<NumberFor<CBlock>, CBlock::Hash, _, _>(
411                (chain_id, dst_chain_id, channel_id, messages.inbox_responses),
412                |key, dst_chain_id| {
413                    Self::construct_xdm_proof(
414                        consensus_chain_client,
415                        domain_client,
416                        dst_chain_id,
417                        mmr_consensus_block,
418                        key,
419                        xdm_proof_data.clone(),
420                    )
421                },
422                |msg| gossip_inbox_message_response(domain_client, msg, gossip_message_sink),
423            )?;
424        }
425
426        Ok(())
427    }
428
429    /// Constructs the proof for the given key using the domain backend.
430    fn construct_xdm_proof<CClient, CBlock>(
431        consensus_chain_client: &Arc<CClient>,
432        domain_client: &Arc<Client>,
433        dst_chain_id: ChainId,
434        mmr_consensus_block: (NumberFor<CBlock>, CBlock::Hash),
435        message_storage_key: &[u8],
436        xdm_proof_data: XDMProofData<CBlock::Hash, Block::Hash>,
437    ) -> Result<ProofOf<CBlock>, Error>
438    where
439        CBlock: BlockT,
440        CClient: HeaderBackend<CBlock> + ProvideRuntimeApi<CBlock> + ProofProvider<CBlock>,
441        CClient::Api: DomainsApi<CBlock, Block::Header>
442            + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>
443            + MmrApi<CBlock, H256, NumberFor<CBlock>>,
444    {
445        let consensus_chain_mmr_proof = construct_consensus_mmr_proof(
446            consensus_chain_client,
447            dst_chain_id,
448            mmr_consensus_block,
449        )?;
450
451        let proof = match xdm_proof_data {
452            XDMProofData::Consensus(at_consensus_hash) => {
453                let message_proof = consensus_chain_client
454                    .read_proof(at_consensus_hash, &mut [message_storage_key].into_iter())
455                    .map_err(|_| Error::ConstructStorageProof)?;
456
457                Proof::Consensus {
458                    consensus_chain_mmr_proof,
459                    message_proof,
460                }
461            }
462            XDMProofData::Domain {
463                domain_proof,
464                confirmed_domain_block_hash,
465            } => {
466                let message_proof = domain_client
467                    .read_proof(
468                        confirmed_domain_block_hash,
469                        &mut [message_storage_key].into_iter(),
470                    )
471                    .map_err(|_| Error::ConstructStorageProof)?;
472
473                Proof::Domain {
474                    consensus_chain_mmr_proof,
475                    domain_proof,
476                    message_proof,
477                }
478            }
479        };
480        Ok(proof)
481    }
482}
483
484fn filter_block_messages<Client, Block, CBlock>(
485    api: &ApiRef<'_, Client::Api>,
486    best_hash: Block::Hash,
487    query: BlockMessagesQuery,
488    messages: &mut MessagesWithStorageKey,
489) -> Result<(), Error>
490where
491    Block: BlockT,
492    CBlock: BlockT,
493    Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
494    Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
495{
496    let BlockMessagesQuery {
497        chain_id,
498        channel_id,
499        outbox_from,
500        inbox_responses_from,
501    } = query;
502    let maybe_outbox_nonce =
503        api.first_outbox_message_nonce_to_relay(best_hash, chain_id, channel_id, outbox_from)?;
504    let maybe_inbox_response_nonce = api.first_inbox_message_response_nonce_to_relay(
505        best_hash,
506        chain_id,
507        channel_id,
508        inbox_responses_from,
509    )?;
510
511    if let Some(nonce) = maybe_outbox_nonce {
512        messages.outbox.retain(|msg| msg.nonce >= nonce)
513    } else {
514        messages.outbox.clear()
515    }
516
517    if let Some(nonce) = maybe_inbox_response_nonce {
518        messages.inbox_responses.retain(|msg| msg.nonce >= nonce)
519    } else {
520        messages.inbox_responses.clear();
521    }
522
523    Ok(())
524}
525
526// Fetch the unprocessed XDMs at a given block
527fn fetch_messages<Backend, Client, Block, CBlock>(
528    backend: &Backend,
529    client: &Arc<Client>,
530    fetch_message_at: Block::Hash,
531    self_chain_id: ChainId,
532) -> Result<Vec<(ChainId, ChannelId, MessagesWithStorageKey)>, Error>
533where
534    Block: BlockT,
535    CBlock: BlockT,
536    Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
537    Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
538    Backend: AuxStore,
539{
540    let runtime_api = client.runtime_api();
541    let fetch_message_at_number = client
542        .number(fetch_message_at)?
543        .ok_or(Error::MissingBlockHeader)?;
544    let mut queries = runtime_api
545        .channels_and_state(fetch_message_at)?
546        .into_iter()
547        .filter_map(|(dst_chain_id, channel_id, channel_state)| {
548            get_channel_state_query(
549                backend,
550                client,
551                fetch_message_at,
552                self_chain_id,
553                dst_chain_id,
554                channel_id,
555                channel_state,
556            )
557            .ok()
558            .flatten()
559        })
560        .collect::<Vec<_>>();
561
562    // pick random 15 queries
563    let queries = if queries.len() <= MAXIMUM_CHANNELS_TO_PROCESS_IN_BLOCK {
564        queries
565    } else {
566        let mut rng = rand::thread_rng();
567        queries.shuffle(&mut rng);
568        queries.truncate(MAXIMUM_CHANNELS_TO_PROCESS_IN_BLOCK);
569        queries
570    };
571
572    let best_hash = client.info().best_hash;
573    let runtime_api_best = client.runtime_api();
574    let total_messages = queries
575        .into_iter()
576        .filter_map(|query| {
577            let BlockMessagesQuery {
578                chain_id: dst_chain_id,
579                channel_id,
580                ..
581            } = query;
582            let mut messages = runtime_api
583                .block_messages_with_query(fetch_message_at, query.clone())
584                .ok()?;
585
586            // filter messages with best hash
587            filter_block_messages::<Client, _, CBlock>(
588                &runtime_api_best,
589                best_hash,
590                query,
591                &mut messages,
592            )
593            .ok()?;
594
595            if !messages.outbox.is_empty()
596                && let Some(max_nonce) = messages.outbox.iter().map(|key| key.nonce).max()
597            {
598                set_channel_outbox_processed_state(
599                    backend,
600                    self_chain_id,
601                    dst_chain_id,
602                    ChannelProcessedState::<Block> {
603                        block_number: fetch_message_at_number,
604                        block_hash: fetch_message_at,
605                        channel_id,
606                        nonce: Some(max_nonce),
607                    },
608                )
609                .ok()?;
610            }
611
612            if !messages.inbox_responses.is_empty()
613                && let Some(max_nonce) = messages.inbox_responses.iter().map(|key| key.nonce).max()
614            {
615                set_channel_inbox_response_processed_state(
616                    backend,
617                    self_chain_id,
618                    dst_chain_id,
619                    ChannelProcessedState::<Block> {
620                        block_number: fetch_message_at_number,
621                        block_hash: fetch_message_at,
622                        channel_id,
623                        nonce: Some(max_nonce),
624                    },
625                )
626                .ok()?;
627            }
628
629            Some((dst_chain_id, channel_id, messages))
630        })
631        .collect();
632
633    Ok(total_messages)
634}
635
636fn get_channel_state_query<Backend, Client, Block>(
637    backend: &Backend,
638    client: &Arc<Client>,
639    fetch_message_at: Block::Hash,
640    self_chain_id: ChainId,
641    dst_chain_id: ChainId,
642    channel_id: ChannelId,
643    local_channel_state: ChannelStateWithNonce,
644) -> Result<Option<BlockMessagesQuery>, Error>
645where
646    Backend: AuxStore,
647    Block: BlockT,
648    Client: HeaderBackend<Block>,
649{
650    let maybe_dst_channel_state =
651        get_channel_state(backend, dst_chain_id, self_chain_id, channel_id)
652            .ok()
653            .flatten();
654
655    let last_processed_nonces = get_last_processed_nonces(
656        backend,
657        client,
658        fetch_message_at,
659        self_chain_id,
660        dst_chain_id,
661        channel_id,
662    )?;
663
664    let query = match (
665        maybe_dst_channel_state,
666        last_processed_nonces.outbox_nonce,
667        last_processed_nonces.inbox_response_nonce,
668    ) {
669        // don't have any info on channel, so assume from the beginning
670        (None, None, None) => Some(BlockMessagesQuery {
671            chain_id: dst_chain_id,
672            channel_id,
673            outbox_from: Nonce::zero(),
674            inbox_responses_from: Nonce::zero(),
675        }),
676        // don't have channel processed state, so use the dst_channel state for query
677        (Some(dst_channel_state), None, None) => {
678            should_relay_messages_to_channel(dst_chain_id, &dst_channel_state, local_channel_state)
679                .then_some(BlockMessagesQuery {
680                    chain_id: dst_chain_id,
681                    channel_id,
682                    outbox_from: dst_channel_state.next_inbox_nonce,
683                    inbox_responses_from: dst_channel_state
684                        .latest_response_received_message_nonce
685                        // pick the next inbox message response nonce or default to zero
686                        .map(|nonce| nonce.saturating_add(One::one()))
687                        .unwrap_or(Nonce::zero()),
688                })
689        }
690        // don't have dst channel state, so use the last processed channel state
691        (None, last_outbox_nonce, last_inbox_message_response_nonce) => Some(BlockMessagesQuery {
692            chain_id: dst_chain_id,
693            channel_id,
694            outbox_from: last_outbox_nonce
695                .map(|nonce| nonce.saturating_add(One::one()))
696                .unwrap_or(Nonce::zero()),
697            inbox_responses_from: last_inbox_message_response_nonce
698                .map(|nonce| nonce.saturating_add(One::one()))
699                .unwrap_or(Nonce::zero()),
700        }),
701        (Some(dst_channel_state), last_outbox_nonce, last_inbox_message_response_nonce) => {
702            should_relay_messages_to_channel(dst_chain_id, &dst_channel_state, local_channel_state)
703                .then(|| {
704                    let next_outbox_nonce = max(
705                        dst_channel_state.next_inbox_nonce,
706                        last_outbox_nonce
707                            .map(|nonce| nonce.saturating_add(One::one()))
708                            .unwrap_or(Nonce::zero()),
709                    );
710
711                    let next_inbox_response_nonce = max(
712                        dst_channel_state
713                            .latest_response_received_message_nonce
714                            .map(|nonce| nonce.saturating_add(One::one()))
715                            .unwrap_or(Nonce::zero()),
716                        last_inbox_message_response_nonce
717                            .map(|nonce| nonce.saturating_add(One::one()))
718                            .unwrap_or(Nonce::zero()),
719                    );
720
721                    BlockMessagesQuery {
722                        chain_id: dst_chain_id,
723                        channel_id,
724                        outbox_from: next_outbox_nonce,
725                        inbox_responses_from: next_inbox_response_nonce,
726                    }
727                })
728        }
729    };
730
731    tracing::debug!(
732        "From Chain[{:?}] to Chain[{:?}] and Channel[{:?}] Query: {:?}",
733        self_chain_id,
734        dst_chain_id,
735        channel_id,
736        query
737    );
738
739    Ok(query)
740}
741
742fn should_relay_messages_to_channel(
743    dst_chain_id: ChainId,
744    dst_channel_state: &ChannelDetail,
745    local_channel_state: ChannelStateWithNonce,
746) -> bool {
747    let should_process = if dst_channel_state.state == ChannelState::Closed
748        && let ChannelStateWithNonce::Closed {
749            next_outbox_nonce,
750            next_inbox_nonce,
751        } = local_channel_state
752    {
753        // if the next outbox nonce of local channel is same as
754        // next inbox nonce of dst_channel, then there are no further
755        // outbox messages to be sent from the local channel
756        let no_outbox_messages = next_outbox_nonce == dst_channel_state.next_inbox_nonce;
757
758        // if next inbox nonce of local channel is +1 of
759        // last received response nonce on dst_chain, then there are
760        // no further messages responses to be sent from the local channel
761        let no_inbox_responses_messages = dst_channel_state
762            .latest_response_received_message_nonce
763            .map(|nonce| nonce == next_inbox_nonce.saturating_sub(Nonce::one()))
764            .unwrap_or(false);
765
766        !(no_outbox_messages && no_inbox_responses_messages)
767    } else {
768        true
769    };
770
771    if !should_process {
772        tracing::debug!(
773            "Chain[{:?}] for Channel[{:?}] is closed and no messages to process",
774            dst_chain_id,
775            dst_channel_state.channel_id
776        );
777    }
778
779    should_process
780}