domain_client_message_relayer/
lib.rs

1#![warn(rust_2018_idioms)]
2// TODO: Restore once https://github.com/rust-lang/rust/issues/122105 is resolved
3// #![deny(unused_crate_dependencies)]
4
5mod aux_schema;
6pub mod worker;
7
8use crate::aux_schema::{
9    ChannelProcessedState, get_last_processed_nonces, set_channel_inbox_response_processed_state,
10    set_channel_outbox_processed_state,
11};
12use async_channel::TrySendError;
13use cross_domain_message_gossip::{
14    ChannelDetail, Message as GossipMessage, MessageData as GossipMessageData, get_channel_state,
15};
16use parity_scale_codec::{Codec, Encode};
17use rand::seq::SliceRandom;
18use sc_client_api::{AuxStore, HeaderBackend, ProofProvider, StorageProof};
19use sc_utils::mpsc::TracingUnboundedSender;
20use sp_api::{ApiExt, ApiRef, ProvideRuntimeApi};
21use sp_core::H256;
22use sp_domains::{ChannelId, DomainsApi};
23use sp_messenger::messages::{
24    BlockMessagesQuery, ChainId, ChannelState, ChannelStateWithNonce, CrossDomainMessage,
25    MessageNonceWithStorageKey, MessagesWithStorageKey, Nonce, Proof,
26};
27use sp_messenger::{MessengerApi, RelayerApi};
28use sp_mmr_primitives::MmrApi;
29use sp_runtime::ArithmeticError;
30use sp_runtime::traits::{Block as BlockT, CheckedSub, Header as HeaderT, NumberFor, One};
31use sp_subspace_mmr::ConsensusChainMmrLeafProof;
32use std::cmp::max;
33use std::marker::PhantomData;
34use std::sync::Arc;
35use subspace_runtime_primitives::BlockHashFor;
36
// NOTE(review): not referenced in this file; presumably bounds the number of cached channel
// processed states kept by `aux_schema` — confirm against its users.
const CHANNEL_PROCESSED_STATE_CACHE_LIMIT: u32 = 5;
/// Maximum number of channel queries `fetch_messages` handles in one call; when more are
/// available, a random subset of this size is picked.
const MAXIMUM_CHANNELS_TO_PROCESS_IN_BLOCK: usize = 15;
39
/// Relayer relays messages between domains using consensus chain as trusted third party.
///
/// The type holds no data: it only carries the `Client` and `Block` type parameters for its
/// associated functions.
struct Relayer<Client, Block>(PhantomData<(Client, Block)>);
42
/// Sink used to submit all the gossip messages.
///
/// Constructed cross domain message extrinsics are sent through it as [`GossipMessage`]s.
pub type GossipMessageSink = TracingUnboundedSender<GossipMessage>;
45
/// Relayer error types.
#[derive(Debug)]
pub enum Error {
    /// Emits when storage proof construction fails, i.e. reading the message storage proof
    /// from the client returned an error.
    ConstructStorageProof,
    /// Emits when unsigned extrinsic construction fails, i.e. the runtime returned no extrinsic
    /// for the given cross domain message.
    FailedToConstructExtrinsic,
    /// Emits when failed to fetch assigned messages for a given relayer.
    FetchAssignedMessages,
    /// Emits when failed to store the processed block number.
    StoreRelayedBlockNumber,
    /// Emits when unable to fetch domain_id.
    UnableToFetchDomainId,
    /// Emits when unable to fetch relay confirmation depth.
    UnableToFetchRelayConfirmationDepth,
    /// Blockchain related error.
    BlockchainError(Box<sp_blockchain::Error>),
    /// Arithmetic related error.
    ArithmeticError(ArithmeticError),
    /// Api related error.
    ApiError(sp_api::ApiError),
    /// Failed to submit a cross domain message to the gossip message sink.
    UnableToSubmitCrossDomainMessage(TrySendError<GossipMessage>),
    /// Invalid ChainId
    InvalidChainId,
    /// Failed to generate MMR proof
    MmrProof(sp_mmr_primitives::Error),
    /// MMR Leaf missing, i.e. proof generation returned no leaf.
    MmrLeafMissing,
    /// Missing block header, i.e. the block number of a given hash could not be found.
    MissingBlockHeader,
    /// Missing block hash, i.e. the hash of a given block number could not be found.
    MissingBlockHash,
}
80
81impl From<sp_blockchain::Error> for Error {
82    #[inline]
83    fn from(err: sp_blockchain::Error) -> Self {
84        Error::BlockchainError(Box::new(err))
85    }
86}
87
88impl From<ArithmeticError> for Error {
89    #[inline]
90    fn from(err: ArithmeticError) -> Self {
91        Error::ArithmeticError(err)
92    }
93}
94
95impl From<sp_api::ApiError> for Error {
96    #[inline]
97    fn from(err: sp_api::ApiError) -> Self {
98        Error::ApiError(err)
99    }
100}
101
/// XDM [`Proof`] parameterized by the block number and block hash types of `Block`, with `H256`
/// as the MMR hash.
type ProofOf<Block> = Proof<NumberFor<Block>, BlockHashFor<Block>, H256>;
103
104fn construct_consensus_mmr_proof<Client, Block>(
105    consensus_chain_client: &Arc<Client>,
106    dst_chain_id: ChainId,
107    (block_number, block_hash): (NumberFor<Block>, Block::Hash),
108) -> Result<ConsensusChainMmrLeafProof<NumberFor<Block>, Block::Hash, H256>, Error>
109where
110    Block: BlockT,
111    Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
112    Client::Api: MmrApi<Block, H256, NumberFor<Block>>,
113{
114    let api = consensus_chain_client.runtime_api();
115    let best_hash = consensus_chain_client.info().best_hash;
116    let best_number = consensus_chain_client.info().best_number;
117
118    let (prove_at_number, prove_at_hash) = match dst_chain_id {
119        // The consensus chain will verify the MMR proof statelessly with the MMR root
120        // stored in the runtime, we need to generate the proof with the best block
121        // with the latest MMR root in the runtime so the proof will be valid as long
122        // as the MMR root is available when verifying the proof.
123        ChainId::Consensus => (best_number, best_hash),
124        // The domain chain will verify the MMR proof with the offchain MMR leaf data
125        // in the consensus client, we need to generate the proof with the proving block
126        // (i.e. finalized block) to avoid potential consensus fork and ensure the verification
127        // result is deterministic.
128        ChainId::Domain(_) => (block_number, block_hash),
129    };
130
131    let (mut leaves, proof) = api
132        .generate_proof(best_hash, vec![block_number], Some(prove_at_number))
133        .map_err(Error::ApiError)?
134        .map_err(Error::MmrProof)?;
135    debug_assert!(leaves.len() == 1, "should always be of length 1");
136    let leaf = leaves.pop().ok_or(Error::MmrLeafMissing)?;
137
138    Ok(ConsensusChainMmrLeafProof {
139        consensus_block_number: prove_at_number,
140        consensus_block_hash: prove_at_hash,
141        opaque_mmr_leaf: leaf,
142        proof,
143    })
144}
145
146fn construct_cross_chain_message_and_submit<CNumber, CHash, Submitter, ProofConstructor>(
147    msgs: (ChainId, ChainId, ChannelId, Vec<MessageNonceWithStorageKey>),
148    proof_constructor: ProofConstructor,
149    submitter: Submitter,
150) -> Result<(), Error>
151where
152    Submitter: Fn(CrossDomainMessage<CNumber, CHash, H256>) -> Result<(), Error>,
153    ProofConstructor: Fn(&[u8], ChainId) -> Result<Proof<CNumber, CHash, H256>, Error>,
154{
155    let (src_chain_id, dst_chain_id, channel_id, msgs) = msgs;
156    for msg in msgs {
157        let proof = match proof_constructor(&msg.storage_key, dst_chain_id) {
158            Ok(proof) => proof,
159            Err(err) => {
160                tracing::error!(
161                    "Failed to construct storage proof for message: {:?} bound to chain: {:?} with error: {:?}",
162                    (channel_id, msg.nonce),
163                    dst_chain_id,
164                    err
165                );
166                continue;
167            }
168        };
169        let msg = CrossDomainMessage::from_relayer_msg_with_proof(
170            src_chain_id,
171            dst_chain_id,
172            channel_id,
173            msg,
174            proof,
175        );
176        let (dst_domain, msg_id) = (msg.dst_chain_id, (msg.channel_id, msg.nonce));
177        if let Err(err) = submitter(msg) {
178            tracing::error!(
179                ?err,
180                "Failed to submit message: {msg_id:?} to domain: {dst_domain:?}",
181            );
182        }
183    }
184
185    Ok(())
186}
187
188/// Sends an Outbox message from src_domain to dst_domain.
189fn gossip_outbox_message<Block, Client, CNumber, CHash>(
190    client: &Arc<Client>,
191    msg: CrossDomainMessage<CNumber, CHash, H256>,
192    sink: &GossipMessageSink,
193) -> Result<(), Error>
194where
195    Block: BlockT,
196    CNumber: Codec,
197    CHash: Codec,
198    Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
199    Client::Api: RelayerApi<Block, NumberFor<Block>, CNumber, CHash>,
200{
201    let best_hash = client.info().best_hash;
202    let dst_chain_id = msg.dst_chain_id;
203    let msg_id = (msg.dst_chain_id, msg.channel_id, msg.nonce);
204    let ext = client
205        .runtime_api()
206        .outbox_message_unsigned(best_hash, msg)?
207        .ok_or(Error::FailedToConstructExtrinsic)?;
208
209    tracing::trace!("Submitting Outbox message: {:?}", msg_id);
210    sink.unbounded_send(GossipMessage {
211        chain_id: dst_chain_id,
212        data: GossipMessageData::Xdm(ext.encode()),
213    })
214    .map_err(Error::UnableToSubmitCrossDomainMessage)
215}
216
217/// Sends an Inbox message response from src_domain to dst_domain
218/// Inbox message was earlier sent by dst_domain to src_domain and
219/// this message is the response of the Inbox message execution.
220fn gossip_inbox_message_response<Block, Client, CNumber, CHash>(
221    client: &Arc<Client>,
222    msg: CrossDomainMessage<CNumber, CHash, H256>,
223    sink: &GossipMessageSink,
224) -> Result<(), Error>
225where
226    Block: BlockT,
227    CNumber: Codec,
228    CHash: Codec,
229    Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
230    Client::Api: RelayerApi<Block, NumberFor<Block>, CNumber, CHash>,
231{
232    let best_hash = client.info().best_hash;
233    let dst_chain_id = msg.dst_chain_id;
234    let msg_id = (msg.dst_chain_id, msg.channel_id, msg.nonce);
235    let ext = client
236        .runtime_api()
237        .inbox_response_message_unsigned(best_hash, msg)?
238        .ok_or(Error::FailedToConstructExtrinsic)?;
239
240    tracing::trace!("Submitting Inbox response message: {:?}", msg_id);
241    sink.unbounded_send(GossipMessage {
242        chain_id: dst_chain_id,
243        data: GossipMessageData::Xdm(ext.encode()),
244    })
245    .map_err(Error::UnableToSubmitCrossDomainMessage)
246}
247
248// Fetch the XDM at the given block and filter any already relayed XDM according to the best block
249fn fetch_and_filter_messages<Client, Block, CClient, CBlock>(
250    client: &Arc<Client>,
251    fetch_message_at: Block::Hash,
252    consensus_client: &Arc<CClient>,
253    self_chain_id: ChainId,
254) -> Result<Vec<(ChainId, ChannelId, MessagesWithStorageKey)>, Error>
255where
256    CBlock: BlockT,
257    CClient: AuxStore + HeaderBackend<CBlock>,
258    Block: BlockT,
259    Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
260    Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
261{
262    // return no messages for previous relayer version
263    if !is_relayer_api_version_available::<_, Block, CBlock>(client, 3, fetch_message_at) {
264        return Ok(vec![]);
265    }
266
267    fetch_messages::<_, _, Block, CBlock>(
268        &**consensus_client,
269        client,
270        fetch_message_at,
271        self_chain_id,
272    )
273}
274
/// A helper type used when constructing an XDM proof, carrying the data that is shared by all
/// messages of the source chain.
#[derive(Clone)]
enum XDMProofData<CHash, DHash> {
    /// The source is the consensus chain: hash of the consensus block at which the message
    /// storage proof is read.
    Consensus(CHash),
    /// The source is a domain.
    Domain {
        /// Storage proof read from the consensus chain for the confirmed domain block
        /// storage key.
        domain_proof: StorageProof,
        /// Hash of the confirmed domain block at which the message storage proof is read.
        confirmed_domain_block_hash: DHash,
    },
}
284
285impl<Client, Block> Relayer<Client, Block>
286where
287    Block: BlockT,
288    Client: HeaderBackend<Block> + AuxStore + ProofProvider<Block> + ProvideRuntimeApi<Block>,
289{
290    pub(crate) fn construct_and_submit_xdm<CClient, CBlock>(
291        chain_id: ChainId,
292        domain_client: &Arc<Client>,
293        consensus_chain_client: &Arc<CClient>,
294        confirmed_block_number: NumberFor<CBlock>,
295        gossip_message_sink: &GossipMessageSink,
296    ) -> Result<(), Error>
297    where
298        CBlock: BlockT,
299        CClient:
300            HeaderBackend<CBlock> + ProvideRuntimeApi<CBlock> + ProofProvider<CBlock> + AuxStore,
301        CClient::Api: DomainsApi<CBlock, Block::Header>
302            + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>
303            + MmrApi<CBlock, H256, NumberFor<CBlock>>
304            + RelayerApi<CBlock, NumberFor<CBlock>, NumberFor<CBlock>, CBlock::Hash>,
305        Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
306    {
307        // Since the block MMR leaf is included in the next block, we procees the XDM of block `confirmed_block_number - 1`
308        // and use the block `confirmed_block_number` to generate the MMR proof of block `confirmed_block_number - 1`
309        let mmr_consensus_block = (
310            confirmed_block_number,
311            consensus_chain_client
312                .hash(confirmed_block_number)?
313                .ok_or(Error::MissingBlockHash)?,
314        );
315        let (to_process_consensus_number, to_process_consensus_hash) =
316            match confirmed_block_number.checked_sub(&One::one()) {
317                None => return Ok(()),
318                Some(n) => {
319                    let h = consensus_chain_client
320                        .hash(n)?
321                        .ok_or(Error::MissingBlockHash)?;
322                    (n, h)
323                }
324            };
325
326        tracing::debug!(
327            "Checking messages to be submitted from chain: {chain_id:?} at block: ({to_process_consensus_number:?}, {to_process_consensus_hash:?})",
328        );
329
330        let consensus_chain_api = consensus_chain_client.runtime_api();
331        let (block_messages, maybe_domain_data) = match chain_id {
332            ChainId::Consensus => (
333                fetch_and_filter_messages::<_, _, _, CBlock>(
334                    consensus_chain_client,
335                    to_process_consensus_hash,
336                    consensus_chain_client,
337                    chain_id,
338                )?,
339                None,
340            ),
341            ChainId::Domain(domain_id) => {
342                let confirmed_domain_block_hash = {
343                    match consensus_chain_api
344                        .latest_confirmed_domain_block(to_process_consensus_hash, domain_id)?
345                    {
346                        Some((_, confirmed_domain_block_hash)) => confirmed_domain_block_hash,
347                        // No domain block confirmed yet so just return
348                        None => return Ok(()),
349                    }
350                };
351                (
352                    fetch_and_filter_messages::<_, _, _, CBlock>(
353                        domain_client,
354                        confirmed_domain_block_hash,
355                        consensus_chain_client,
356                        chain_id,
357                    )?,
358                    Some((domain_id, confirmed_domain_block_hash)),
359                )
360            }
361        };
362
363        // short circuit if the there are no messages to relay
364        if block_messages.is_empty() {
365            tracing::debug!("No messages from chain[{:?}]. Skipping..", chain_id);
366            return Ok(());
367        }
368
369        let xdm_proof_data = match maybe_domain_data {
370            None => XDMProofData::Consensus(to_process_consensus_hash),
371            Some((domain_id, confirmed_domain_block_hash)) => {
372                let storage_key = consensus_chain_api
373                    .confirmed_domain_block_storage_key(to_process_consensus_hash, domain_id)?;
374
375                let domain_proof = consensus_chain_client.read_proof(
376                    to_process_consensus_hash,
377                    &mut [storage_key.as_ref()].into_iter(),
378                )?;
379
380                XDMProofData::Domain {
381                    domain_proof,
382                    confirmed_domain_block_hash,
383                }
384            }
385        };
386
387        for (dst_chain_id, channel_id, messages) in block_messages {
388            tracing::debug!(
389                "Submitting messages to chain[{:?}] on Channel[{:?}] with [{:?}] Outbox messages",
390                dst_chain_id,
391                channel_id,
392                messages.outbox.len()
393            );
394            construct_cross_chain_message_and_submit::<NumberFor<CBlock>, CBlock::Hash, _, _>(
395                (chain_id, dst_chain_id, channel_id, messages.outbox),
396                |key, dst_chain_id| {
397                    Self::construct_xdm_proof(
398                        consensus_chain_client,
399                        domain_client,
400                        dst_chain_id,
401                        mmr_consensus_block,
402                        key,
403                        xdm_proof_data.clone(),
404                    )
405                },
406                |msg| gossip_outbox_message(domain_client, msg, gossip_message_sink),
407            )?;
408
409            tracing::debug!(
410                "Submitting messages to chain[{:?}] on Channel[{:?}] with [{:?}] Inbox response messages",
411                dst_chain_id,
412                channel_id,
413                messages.inbox_responses.len()
414            );
415            construct_cross_chain_message_and_submit::<NumberFor<CBlock>, CBlock::Hash, _, _>(
416                (chain_id, dst_chain_id, channel_id, messages.inbox_responses),
417                |key, dst_chain_id| {
418                    Self::construct_xdm_proof(
419                        consensus_chain_client,
420                        domain_client,
421                        dst_chain_id,
422                        mmr_consensus_block,
423                        key,
424                        xdm_proof_data.clone(),
425                    )
426                },
427                |msg| gossip_inbox_message_response(domain_client, msg, gossip_message_sink),
428            )?;
429        }
430
431        Ok(())
432    }
433
434    /// Constructs the proof for the given key using the domain backend.
435    fn construct_xdm_proof<CClient, CBlock>(
436        consensus_chain_client: &Arc<CClient>,
437        domain_client: &Arc<Client>,
438        dst_chain_id: ChainId,
439        mmr_consensus_block: (NumberFor<CBlock>, CBlock::Hash),
440        message_storage_key: &[u8],
441        xdm_proof_data: XDMProofData<CBlock::Hash, Block::Hash>,
442    ) -> Result<ProofOf<CBlock>, Error>
443    where
444        CBlock: BlockT,
445        CClient: HeaderBackend<CBlock> + ProvideRuntimeApi<CBlock> + ProofProvider<CBlock>,
446        CClient::Api: DomainsApi<CBlock, Block::Header>
447            + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>
448            + MmrApi<CBlock, H256, NumberFor<CBlock>>,
449    {
450        let consensus_chain_mmr_proof = construct_consensus_mmr_proof(
451            consensus_chain_client,
452            dst_chain_id,
453            mmr_consensus_block,
454        )?;
455
456        let proof = match xdm_proof_data {
457            XDMProofData::Consensus(at_consensus_hash) => {
458                let message_proof = consensus_chain_client
459                    .read_proof(at_consensus_hash, &mut [message_storage_key].into_iter())
460                    .map_err(|_| Error::ConstructStorageProof)?;
461
462                Proof::Consensus {
463                    consensus_chain_mmr_proof,
464                    message_proof,
465                }
466            }
467            XDMProofData::Domain {
468                domain_proof,
469                confirmed_domain_block_hash,
470            } => {
471                let message_proof = domain_client
472                    .read_proof(
473                        confirmed_domain_block_hash,
474                        &mut [message_storage_key].into_iter(),
475                    )
476                    .map_err(|_| Error::ConstructStorageProof)?;
477
478                Proof::Domain {
479                    consensus_chain_mmr_proof,
480                    domain_proof,
481                    message_proof,
482                }
483            }
484        };
485        Ok(proof)
486    }
487}
488
489fn filter_block_messages<Client, Block, CBlock>(
490    api: &ApiRef<'_, Client::Api>,
491    best_hash: Block::Hash,
492    query: BlockMessagesQuery,
493    messages: &mut MessagesWithStorageKey,
494) -> Result<(), Error>
495where
496    Block: BlockT,
497    CBlock: BlockT,
498    Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
499    Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
500{
501    let BlockMessagesQuery {
502        chain_id,
503        channel_id,
504        outbox_from,
505        inbox_responses_from,
506    } = query;
507    let maybe_outbox_nonce =
508        api.first_outbox_message_nonce_to_relay(best_hash, chain_id, channel_id, outbox_from)?;
509    let maybe_inbox_response_nonce = api.first_inbox_message_response_nonce_to_relay(
510        best_hash,
511        chain_id,
512        channel_id,
513        inbox_responses_from,
514    )?;
515
516    if let Some(nonce) = maybe_outbox_nonce {
517        messages.outbox.retain(|msg| msg.nonce >= nonce)
518    } else {
519        messages.outbox.clear()
520    }
521
522    if let Some(nonce) = maybe_inbox_response_nonce {
523        messages.inbox_responses.retain(|msg| msg.nonce >= nonce)
524    } else {
525        messages.inbox_responses.clear();
526    }
527
528    Ok(())
529}
530
531// Fetch the unprocessed XDMs at a given block
532fn fetch_messages<Backend, Client, Block, CBlock>(
533    backend: &Backend,
534    client: &Arc<Client>,
535    fetch_message_at: Block::Hash,
536    self_chain_id: ChainId,
537) -> Result<Vec<(ChainId, ChannelId, MessagesWithStorageKey)>, Error>
538where
539    Block: BlockT,
540    CBlock: BlockT,
541    Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
542    Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
543    Backend: AuxStore,
544{
545    let runtime_api = client.runtime_api();
546    let fetch_message_at_number = client
547        .number(fetch_message_at)?
548        .ok_or(Error::MissingBlockHeader)?;
549    let mut queries = runtime_api
550        .channels_and_state(fetch_message_at)?
551        .into_iter()
552        .filter_map(|(dst_chain_id, channel_id, channel_state)| {
553            get_channel_state_query(
554                backend,
555                client,
556                fetch_message_at,
557                self_chain_id,
558                dst_chain_id,
559                channel_id,
560                channel_state,
561            )
562            .ok()
563            .flatten()
564        })
565        .collect::<Vec<_>>();
566
567    // pick random 15 queries
568    let queries = if queries.len() <= MAXIMUM_CHANNELS_TO_PROCESS_IN_BLOCK {
569        queries
570    } else {
571        let mut rng = rand::thread_rng();
572        queries.shuffle(&mut rng);
573        queries.truncate(MAXIMUM_CHANNELS_TO_PROCESS_IN_BLOCK);
574        queries
575    };
576
577    let best_hash = client.info().best_hash;
578    let runtime_api_best = client.runtime_api();
579    let total_messages = queries
580        .into_iter()
581        .filter_map(|query| {
582            let BlockMessagesQuery {
583                chain_id: dst_chain_id,
584                channel_id,
585                ..
586            } = query;
587            let mut messages = runtime_api
588                .block_messages_with_query(fetch_message_at, query.clone())
589                .ok()?;
590
591            // filter messages with best hash
592            filter_block_messages::<Client, _, CBlock>(
593                &runtime_api_best,
594                best_hash,
595                query,
596                &mut messages,
597            )
598            .ok()?;
599
600            if !messages.outbox.is_empty()
601                && let Some(max_nonce) = messages.outbox.iter().map(|key| key.nonce).max()
602            {
603                set_channel_outbox_processed_state(
604                    backend,
605                    self_chain_id,
606                    dst_chain_id,
607                    ChannelProcessedState::<Block> {
608                        block_number: fetch_message_at_number,
609                        block_hash: fetch_message_at,
610                        channel_id,
611                        nonce: Some(max_nonce),
612                    },
613                )
614                .ok()?;
615            }
616
617            if !messages.inbox_responses.is_empty()
618                && let Some(max_nonce) = messages.inbox_responses.iter().map(|key| key.nonce).max()
619            {
620                set_channel_inbox_response_processed_state(
621                    backend,
622                    self_chain_id,
623                    dst_chain_id,
624                    ChannelProcessedState::<Block> {
625                        block_number: fetch_message_at_number,
626                        block_hash: fetch_message_at,
627                        channel_id,
628                        nonce: Some(max_nonce),
629                    },
630                )
631                .ok()?;
632            }
633
634            Some((dst_chain_id, channel_id, messages))
635        })
636        .collect();
637
638    Ok(total_messages)
639}
640
641fn get_channel_state_query<Backend, Client, Block>(
642    backend: &Backend,
643    client: &Arc<Client>,
644    fetch_message_at: Block::Hash,
645    self_chain_id: ChainId,
646    dst_chain_id: ChainId,
647    channel_id: ChannelId,
648    local_channel_state: ChannelStateWithNonce,
649) -> Result<Option<BlockMessagesQuery>, Error>
650where
651    Backend: AuxStore,
652    Block: BlockT,
653    Client: HeaderBackend<Block>,
654{
655    let maybe_dst_channel_state =
656        get_channel_state(backend, dst_chain_id, self_chain_id, channel_id)
657            .ok()
658            .flatten();
659
660    let last_processed_nonces = get_last_processed_nonces(
661        backend,
662        client,
663        fetch_message_at,
664        self_chain_id,
665        dst_chain_id,
666        channel_id,
667    )?;
668
669    let query = match (
670        maybe_dst_channel_state,
671        last_processed_nonces.outbox_nonce,
672        last_processed_nonces.inbox_response_nonce,
673    ) {
674        // don't have any info on channel, so assume from the beginning
675        (None, None, None) => Some(BlockMessagesQuery {
676            chain_id: dst_chain_id,
677            channel_id,
678            outbox_from: Nonce::zero(),
679            inbox_responses_from: Nonce::zero(),
680        }),
681        // don't have channel processed state, so use the dst_channel state for query
682        (Some(dst_channel_state), None, None) => {
683            should_relay_messages_to_channel(dst_chain_id, &dst_channel_state, local_channel_state)
684                .then_some(BlockMessagesQuery {
685                    chain_id: dst_chain_id,
686                    channel_id,
687                    outbox_from: dst_channel_state.next_inbox_nonce,
688                    inbox_responses_from: dst_channel_state
689                        .latest_response_received_message_nonce
690                        // pick the next inbox message response nonce or default to zero
691                        .map(|nonce| nonce.saturating_add(One::one()))
692                        .unwrap_or(Nonce::zero()),
693                })
694        }
695        // don't have dst channel state, so use the last processed channel state
696        (None, last_outbox_nonce, last_inbox_message_response_nonce) => Some(BlockMessagesQuery {
697            chain_id: dst_chain_id,
698            channel_id,
699            outbox_from: last_outbox_nonce
700                .map(|nonce| nonce.saturating_add(One::one()))
701                .unwrap_or(Nonce::zero()),
702            inbox_responses_from: last_inbox_message_response_nonce
703                .map(|nonce| nonce.saturating_add(One::one()))
704                .unwrap_or(Nonce::zero()),
705        }),
706        (Some(dst_channel_state), last_outbox_nonce, last_inbox_message_response_nonce) => {
707            should_relay_messages_to_channel(dst_chain_id, &dst_channel_state, local_channel_state)
708                .then(|| {
709                    let next_outbox_nonce = max(
710                        dst_channel_state.next_inbox_nonce,
711                        last_outbox_nonce
712                            .map(|nonce| nonce.saturating_add(One::one()))
713                            .unwrap_or(Nonce::zero()),
714                    );
715
716                    let next_inbox_response_nonce = max(
717                        dst_channel_state
718                            .latest_response_received_message_nonce
719                            .map(|nonce| nonce.saturating_add(One::one()))
720                            .unwrap_or(Nonce::zero()),
721                        last_inbox_message_response_nonce
722                            .map(|nonce| nonce.saturating_add(One::one()))
723                            .unwrap_or(Nonce::zero()),
724                    );
725
726                    BlockMessagesQuery {
727                        chain_id: dst_chain_id,
728                        channel_id,
729                        outbox_from: next_outbox_nonce,
730                        inbox_responses_from: next_inbox_response_nonce,
731                    }
732                })
733        }
734    };
735
736    tracing::debug!(
737        "From Chain[{:?}] to Chain[{:?}] and Channel[{:?}] Query: {:?}",
738        self_chain_id,
739        dst_chain_id,
740        channel_id,
741        query
742    );
743
744    Ok(query)
745}
746
747fn should_relay_messages_to_channel(
748    dst_chain_id: ChainId,
749    dst_channel_state: &ChannelDetail,
750    local_channel_state: ChannelStateWithNonce,
751) -> bool {
752    let should_process = if dst_channel_state.state == ChannelState::Closed
753        && let ChannelStateWithNonce::Closed {
754            next_outbox_nonce,
755            next_inbox_nonce,
756        } = local_channel_state
757    {
758        // if the next outbox nonce of local channel is same as
759        // next inbox nonce of dst_channel, then there are no further
760        // outbox messages to be sent from the local channel
761        let no_outbox_messages = next_outbox_nonce == dst_channel_state.next_inbox_nonce;
762
763        // if next inbox nonce of local channel is +1 of
764        // last received response nonce on dst_chain, then there are
765        // no further messages responses to be sent from the local channel
766        let no_inbox_responses_messages = dst_channel_state
767            .latest_response_received_message_nonce
768            .map(|nonce| nonce == next_inbox_nonce.saturating_sub(Nonce::one()))
769            .unwrap_or(false);
770
771        !(no_outbox_messages && no_inbox_responses_messages)
772    } else {
773        true
774    };
775
776    if !should_process {
777        tracing::debug!(
778            "Chain[{:?}] for Channel[{:?}] is closed and no messages to process",
779            dst_chain_id,
780            dst_channel_state.channel_id
781        );
782    }
783
784    should_process
785}
786
787fn is_relayer_api_version_available<Client, Block, CBlock>(
788    client: &Arc<Client>,
789    version: u32,
790    block_hash: Block::Hash,
791) -> bool
792where
793    Block: BlockT,
794    CBlock: BlockT,
795    Client: ProvideRuntimeApi<Block>,
796    Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
797{
798    let relayer_api_version = client
799        .runtime_api()
800        .api_version::<dyn RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>>(
801            block_hash,
802        )
803        .ok()
804        .flatten()
805        // It is safe to return a default version of 1, since there will always be version 1.
806        .unwrap_or(1);
807
808    relayer_api_version >= version
809}