cross_domain_message_gossip/
message_listener.rs

1use crate::aux_schema::{
2    cleanup_chain_channel_storages, get_channel_state, get_xdm_processed_block_number,
3    set_channel_state, set_xdm_message_processed_at, BlockId,
4};
5use crate::gossip_worker::{ChannelUpdate, MessageData};
6use crate::{ChainMsg, ChannelDetail};
7use domain_block_preprocessor::stateless_runtime::StatelessRuntime;
8use fp_account::AccountId20;
9use futures::{Stream, StreamExt};
10use sc_client_api::AuxStore;
11use sc_executor::RuntimeVersionOf;
12use sc_network::NetworkPeers;
13use sc_transaction_pool_api::error::{Error as PoolError, IntoPoolError};
14use sc_transaction_pool_api::{TransactionPool, TransactionSource};
15use sp_api::{ApiError, ApiExt, ProvideRuntimeApi, StorageProof};
16use sp_blockchain::HeaderBackend;
17use sp_consensus::SyncOracle;
18use sp_core::crypto::AccountId32;
19use sp_core::storage::StorageKey;
20use sp_core::traits::CodeExecutor;
21use sp_core::{Hasher, H256};
22use sp_domains::proof_provider_and_verifier::{StorageProofVerifier, VerificationError};
23use sp_domains::{DomainId, DomainsApi, RuntimeType};
24use sp_messenger::messages::{ChainId, Channel, ChannelId};
25use sp_messenger::{ChannelNonce, MessengerApi, RelayerApi, XdmId};
26use sp_runtime::codec::Decode;
27use sp_runtime::traits::{BlakeTwo256, Block as BlockT, HashingFor, Header, NumberFor};
28use sp_runtime::{SaturatedConversion, Saturating};
29use std::collections::BTreeMap;
30use std::sync::Arc;
31use subspace_runtime_primitives::{Balance, BlockNumber};
32use thiserror::Error;
33
34pub(crate) const LOG_TARGET: &str = "domain_message_listener";
35const TX_POOL_PREFIX: &[u8] = b"xdm_tx_pool_listener";
36pub const RELAYER_PREFIX: &[u8] = b"xdm_relayer";
37
38/// Number of blocks an already submitted XDM is not accepted since last submission.
39const XDM_ACCEPT_BLOCK_LIMIT: u32 = 15;
40
41type BlockOf<T> = <T as TransactionPool>::Block;
42type HeaderOf<T> = <<T as TransactionPool>::Block as BlockT>::Header;
43type ExtrinsicOf<T> = <<T as TransactionPool>::Block as BlockT>::Extrinsic;
44
45#[derive(Debug, Error)]
46pub enum Error {
47    /// Blockchain related error.
48    #[error("Blockchain error: {0}")]
49    Blockchain(Box<sp_blockchain::Error>),
50    /// Api related error.
51    #[error("Api error: {0}")]
52    Api(sp_api::ApiError),
53    /// Missing block hash
54    #[error("Missing block hash")]
55    MissingBlockHash,
56    /// Missing block header
57    #[error("Missing block header")]
58    MissingBlockHeader,
59    /// Missing domain runtime code
60    #[error("Missing domain runtime code")]
61    MissingDomainRuntimeCode,
62    /// Missing domain receipt hash
63    #[error("Missing domain receipt hash")]
64    MissingDomainReceiptHash,
65    /// Bad domain receipt hash
66    #[error("Bad domain receipt hash")]
67    BadDomainReceiptHash,
68    /// Missing domain receipt
69    #[error("Missing domain receipt")]
70    MissingDomainReceipt,
71    /// Proof verification error
72    #[error("Proof error: {0}")]
73    Proof(VerificationError),
74}
75
76impl From<sp_api::ApiError> for Error {
77    fn from(value: ApiError) -> Self {
78        Error::Api(value)
79    }
80}
81
82impl From<sp_blockchain::Error> for Error {
83    fn from(value: sp_blockchain::Error) -> Self {
84        Error::Blockchain(Box::new(value))
85    }
86}
87
88impl From<VerificationError> for Error {
89    fn from(value: VerificationError) -> Self {
90        Error::Proof(value)
91    }
92}
93
94#[allow(clippy::too_many_arguments)]
95pub async fn start_cross_chain_message_listener<
96    Client,
97    TxPool,
98    TxnListener,
99    CClient,
100    CBlock,
101    Executor,
102    SO,
103>(
104    chain_id: ChainId,
105    consensus_client: Arc<CClient>,
106    client: Arc<Client>,
107    tx_pool: Arc<TxPool>,
108    network: Arc<dyn NetworkPeers + Send + Sync>,
109    mut listener: TxnListener,
110    domain_executor: Arc<Executor>,
111    sync_oracle: SO,
112) where
113    TxPool: TransactionPool + 'static,
114    Client: ProvideRuntimeApi<BlockOf<TxPool>> + HeaderBackend<BlockOf<TxPool>> + AuxStore,
115    CBlock: BlockT,
116    Client::Api: MessengerApi<BlockOf<TxPool>, NumberFor<CBlock>, CBlock::Hash>,
117    CClient: ProvideRuntimeApi<CBlock> + HeaderBackend<CBlock> + AuxStore,
118    CClient::Api: DomainsApi<CBlock, HeaderOf<TxPool>>
119        + RelayerApi<CBlock, NumberFor<CBlock>, NumberFor<CBlock>, CBlock::Hash>,
120    TxnListener: Stream<Item = ChainMsg> + Unpin,
121    Executor: CodeExecutor + RuntimeVersionOf,
122    SO: SyncOracle + Send,
123{
124    tracing::info!(
125        target: LOG_TARGET,
126        "Starting transaction listener for Chain: {:?}",
127        chain_id
128    );
129
130    let mut domain_storage_key_cache = BTreeMap::<(H256, ChainId, ChannelId), StorageKey>::new();
131
132    while let Some(msg) = listener.next().await {
133        // If the client is in major sync, wait until sync is complete
134        if sync_oracle.is_major_syncing() {
135            continue;
136        }
137
138        tracing::debug!(
139            target: LOG_TARGET,
140            "Message received for Chain: {:?}",
141            chain_id,
142        );
143
144        match msg.data {
145            MessageData::Xdm(encoded_data) => {
146                let ext = match ExtrinsicOf::<TxPool>::decode(&mut encoded_data.as_ref()) {
147                    Ok(ext) => ext,
148                    Err(err) => {
149                        tracing::error!(
150                            target: LOG_TARGET,
151                            "Failed to decode message: {:?} with error: {:?}",
152                            encoded_data,
153                            err
154                        );
155                        if let Some(peer_id) = msg.maybe_peer {
156                            network.report_peer(
157                                peer_id,
158                                crate::gossip_worker::rep::GOSSIP_NOT_DECODABLE,
159                            );
160                        }
161                        continue;
162                    }
163                };
164
165                if let Ok(valid) =
166                    handle_xdm_message::<_, _, CBlock>(&client, &tx_pool, chain_id, ext).await
167                    && !valid
168                {
169                    if let Some(peer_id) = msg.maybe_peer {
170                        network.report_peer(peer_id, crate::gossip_worker::rep::NOT_XDM);
171                    }
172                    continue;
173                }
174            }
175            MessageData::ChannelUpdate(channel_update) => {
176                handle_channel_update::<_, _, _, BlockOf<TxPool>>(
177                    chain_id,
178                    channel_update,
179                    &consensus_client,
180                    domain_executor.clone(),
181                    &mut domain_storage_key_cache,
182                )
183            }
184        }
185    }
186}
187
188fn handle_channel_update<CClient, CBlock, Executor, Block>(
189    chain_id: ChainId,
190    channel_update: ChannelUpdate,
191    consensus_client: &Arc<CClient>,
192    executor: Arc<Executor>,
193    domain_storage_key_cache: &mut BTreeMap<(H256, ChainId, ChannelId), StorageKey>,
194) where
195    CBlock: BlockT,
196    Block: BlockT,
197    CClient: ProvideRuntimeApi<CBlock> + HeaderBackend<CBlock> + AuxStore,
198    CClient::Api: DomainsApi<CBlock, Block::Header>
199        + RelayerApi<CBlock, NumberFor<CBlock>, NumberFor<CBlock>, CBlock::Hash>,
200    Executor: CodeExecutor + RuntimeVersionOf,
201{
202    let ChannelUpdate {
203        src_chain_id,
204        channel_id,
205        block_number,
206        storage_proof,
207    } = channel_update;
208
209    match src_chain_id {
210        ChainId::Consensus => {
211            if let Err(err) = handle_consensus_channel_update(
212                chain_id,
213                channel_id,
214                consensus_client,
215                block_number,
216                storage_proof,
217            ) {
218                tracing::debug!(
219                    target: LOG_TARGET,
220                    "Failed to update channel update from {:?} to {:?}: {:?}",
221                    ChainId::Consensus,
222                    chain_id,
223                    err
224                );
225            } else {
226                tracing::debug!(
227                    target: LOG_TARGET,
228                    "Updated channel state from {:?} to {:?}: {:?}",
229                    ChainId::Consensus,
230                    chain_id,
231                    channel_id
232                );
233            }
234        }
235        ChainId::Domain(domain_id) => {
236            if let Err(err) = handle_domain_channel_update::<_, _, _, Block>(
237                domain_id,
238                chain_id,
239                channel_id,
240                consensus_client,
241                block_number,
242                storage_proof,
243                executor,
244                domain_storage_key_cache,
245            ) {
246                tracing::debug!(
247                    target: LOG_TARGET,
248                    "Failed to update channel update from {:?} to {:?}: {:?}",
249                    ChainId::Domain(domain_id),
250                    chain_id,
251                    err
252                );
253            } else {
254                tracing::debug!(
255                    target: LOG_TARGET,
256                    "Updated channel state from {:?} to {:?}: {:?}",
257                    ChainId::Domain(domain_id),
258                    chain_id,
259                    channel_id
260                );
261            }
262        }
263    };
264}
265
266fn handle_consensus_channel_update<CClient, CBlock>(
267    self_chain_id: ChainId,
268    channel_id: ChannelId,
269    consensus_client: &Arc<CClient>,
270    consensus_block_number: BlockNumber,
271    proof: StorageProof,
272) -> Result<(), Error>
273where
274    CBlock: BlockT,
275    CClient: ProvideRuntimeApi<CBlock> + HeaderBackend<CBlock> + AuxStore,
276    CClient::Api: RelayerApi<CBlock, NumberFor<CBlock>, NumberFor<CBlock>, CBlock::Hash>,
277{
278    // check if the consensus block number is canonical
279    let consensus_block_hash = consensus_client
280        .hash(consensus_block_number.into())?
281        .ok_or(Error::MissingBlockHash)?;
282
283    let maybe_existing_channel_detail = get_channel_state(
284        &**consensus_client,
285        ChainId::Consensus,
286        self_chain_id,
287        channel_id,
288    )?;
289
290    let api = consensus_client.runtime_api();
291    let best_hash = consensus_client.info().best_hash;
292    let header = consensus_client.expect_header(consensus_block_hash)?;
293
294    // if there is an existing channel detail,
295    // return if the channel update is from canonical chain and block number is latest
296    // else store the update.
297    if let Some(existing_channel_update) = maybe_existing_channel_detail {
298        let maybe_block_hash =
299            consensus_client.hash(existing_channel_update.block_number.into())?;
300        if let Some(block_hash) = maybe_block_hash {
301            if block_hash.as_ref() == existing_channel_update.block_hash.as_ref()
302                && header.state_root().as_ref() == existing_channel_update.state_root.as_ref()
303                && existing_channel_update.block_number >= consensus_block_number
304            {
305                return Ok(());
306            }
307        }
308    }
309
310    let storage_key = StorageKey(api.channel_storage_key(best_hash, self_chain_id, channel_id)?);
311    let channel = StorageProofVerifier::<HashingFor<CBlock>>::get_decoded_value::<
312        Channel<Balance, AccountId32>,
313    >(header.state_root(), proof, storage_key)?;
314
315    let channel_detail = ChannelDetail {
316        block_number: consensus_block_number,
317        block_hash: H256::from_slice(consensus_block_hash.as_ref()),
318        state_root: H256::from_slice(header.state_root().as_ref()),
319        channel_id,
320        state: channel.state,
321        next_inbox_nonce: channel.next_inbox_nonce,
322        next_outbox_nonce: channel.next_outbox_nonce,
323        latest_response_received_message_nonce: channel.latest_response_received_message_nonce,
324    };
325
326    set_channel_state(
327        &**consensus_client,
328        ChainId::Consensus,
329        self_chain_id,
330        channel_detail,
331    )?;
332    Ok(())
333}
334
335#[allow(clippy::too_many_arguments)]
336fn handle_domain_channel_update<CClient, CBlock, Executor, Block>(
337    src_domain_id: DomainId,
338    self_chain_id: ChainId,
339    channel_id: ChannelId,
340    consensus_client: &Arc<CClient>,
341    domain_block_number: BlockNumber,
342    proof: StorageProof,
343    executor: Arc<Executor>,
344    storage_key_cache: &mut BTreeMap<(H256, ChainId, ChannelId), StorageKey>,
345) -> Result<(), Error>
346where
347    CBlock: BlockT,
348    Block: BlockT,
349    CClient: ProvideRuntimeApi<CBlock> + HeaderBackend<CBlock> + AuxStore,
350    CClient::Api: DomainsApi<CBlock, Block::Header>,
351    Executor: CodeExecutor + RuntimeVersionOf,
352{
353    let runtime_api = consensus_client.runtime_api();
354    let consensus_best_hash = consensus_client.info().best_hash;
355    let consensus_block_header = consensus_client
356        .header(consensus_best_hash)?
357        .ok_or(Error::MissingBlockHeader)?;
358
359    let domain_runtime_type = runtime_api
360        .domain_instance_data(*consensus_block_header.parent_hash(), src_domain_id)?
361        .ok_or(Error::MissingDomainRuntimeCode)?
362        .0
363        .runtime_type;
364
365    let is_valid_domain_block_number =
366        |block_number: BlockNumber| -> Result<(Block::Hash, Block::Hash), Error> {
367            let runtime_api = consensus_client.runtime_api();
368            let receipt_hash = runtime_api
369                .receipt_hash(consensus_best_hash, src_domain_id, block_number.into())?
370                .ok_or(Error::MissingDomainReceiptHash)?;
371
372            // check if the receipt is challenged by fraud proof
373            if runtime_api.is_bad_er_pending_to_prune(
374                consensus_best_hash,
375                src_domain_id,
376                receipt_hash,
377            )? {
378                return Err(Error::BadDomainReceiptHash);
379            }
380
381            let receipt = runtime_api
382                .execution_receipt(consensus_best_hash, receipt_hash)?
383                .ok_or(Error::MissingDomainReceipt)?;
384
385            Ok((receipt.domain_block_hash, receipt.final_state_root))
386        };
387
388    // check if the domain block number is valid
389    let (domain_block_hash, domain_state_root) = is_valid_domain_block_number(domain_block_number)?;
390
391    // if there is an existing channel detail,
392    // return if the channel update is from canonical domain and block number is latest
393    // else store the update.
394    let maybe_existing_channel_detail = get_channel_state(
395        &**consensus_client,
396        ChainId::Domain(src_domain_id),
397        self_chain_id,
398        channel_id,
399    )?;
400
401    if let Some(existing_channel_update) = maybe_existing_channel_detail {
402        // if the existing update domain block number
403        // is valid, and
404        // more the new block number, then don't update
405        if let Ok((existing_block_hash, _)) =
406            is_valid_domain_block_number(existing_channel_update.block_number)
407        {
408            if existing_block_hash.as_ref() == existing_channel_update.block_hash.as_ref()
409                && domain_state_root.as_ref() == existing_channel_update.state_root.as_ref()
410                && existing_channel_update.block_number >= domain_block_number
411            {
412                return Ok(());
413            }
414        }
415    }
416
417    let domain_runtime = runtime_api
418        .domain_runtime_code(*consensus_block_header.parent_hash(), src_domain_id)?
419        .ok_or(Error::MissingDomainRuntimeCode)?;
420
421    let runtime_hash = BlakeTwo256::hash(&domain_runtime);
422    let storage_key = match storage_key_cache.get(&(runtime_hash, self_chain_id, channel_id)) {
423        None => {
424            let domain_stateless_runtime =
425                StatelessRuntime::<CBlock, Block, _>::new(executor.clone(), domain_runtime.into());
426            let storage_key = StorageKey(
427                domain_stateless_runtime.channel_storage_key(self_chain_id, channel_id)?,
428            );
429            storage_key_cache.insert(
430                (runtime_hash, self_chain_id, channel_id),
431                storage_key.clone(),
432            );
433            storage_key
434        }
435        Some(key) => key.clone(),
436    };
437
438    let channel_detail = match domain_runtime_type {
439        RuntimeType::Evm => {
440            let channel = StorageProofVerifier::<HashingFor<Block>>::get_decoded_value::<
441                Channel<Balance, AccountId20>,
442            >(&domain_state_root, proof, storage_key)?;
443            ChannelDetail {
444                block_number: domain_block_number,
445                block_hash: H256::from_slice(domain_block_hash.as_ref()),
446                state_root: H256::from_slice(domain_state_root.as_ref()),
447                channel_id,
448                state: channel.state,
449                next_inbox_nonce: channel.next_inbox_nonce,
450                next_outbox_nonce: channel.next_outbox_nonce,
451                latest_response_received_message_nonce: channel
452                    .latest_response_received_message_nonce,
453            }
454        }
455        RuntimeType::AutoId => {
456            let channel = StorageProofVerifier::<HashingFor<Block>>::get_decoded_value::<
457                Channel<Balance, AccountId32>,
458            >(&domain_state_root, proof, storage_key)?;
459            ChannelDetail {
460                block_number: domain_block_number,
461                block_hash: H256::from_slice(domain_block_hash.as_ref()),
462                state_root: H256::from_slice(domain_state_root.as_ref()),
463                channel_id,
464                state: channel.state,
465                next_inbox_nonce: channel.next_inbox_nonce,
466                next_outbox_nonce: channel.next_outbox_nonce,
467                latest_response_received_message_nonce: channel
468                    .latest_response_received_message_nonce,
469            }
470        }
471    };
472
473    set_channel_state(
474        &**consensus_client,
475        ChainId::Domain(src_domain_id),
476        self_chain_id,
477        channel_detail,
478    )?;
479    Ok(())
480}
481
482pub fn can_allow_xdm_submission<Client, Block>(
483    client: &Arc<Client>,
484    xdm_id: XdmId,
485    maybe_submitted_block_id: Option<BlockId<Block>>,
486    current_block_id: BlockId<Block>,
487    maybe_channel_nonce: Option<ChannelNonce>,
488) -> bool
489where
490    Client: HeaderBackend<Block>,
491    Block: BlockT,
492{
493    if let Some(channel_nonce) = maybe_channel_nonce {
494        let maybe_nonces = match (
495            xdm_id,
496            channel_nonce.relay_msg_nonce,
497            channel_nonce.relay_response_msg_nonce,
498        ) {
499            (XdmId::RelayMessage((_, _, nonce)), Some(channel_nonce), _) => {
500                Some((nonce, channel_nonce))
501            }
502            (XdmId::RelayResponseMessage((_, _, nonce)), _, Some(channel_nonce)) => {
503                Some((nonce, channel_nonce))
504            }
505            _ => None,
506        };
507
508        if let Some((xdm_nonce, channel_nonce)) = maybe_nonces
509            && (xdm_nonce <= channel_nonce)
510        {
511            tracing::debug!(
512                target: LOG_TARGET,
513                "Stale XDM submitted: XDM Nonce: {:?}, Channel Nonce: {:?}",
514                xdm_nonce,
515                channel_nonce
516            );
517            return false;
518        }
519    }
520
521    match maybe_submitted_block_id {
522        None => true,
523        Some(submitted_block_id) => {
524            match client.hash(submitted_block_id.number).ok().flatten() {
525                // there is no block at this number, allow xdm submission
526                None => return true,
527                Some(hash) => {
528                    if hash != submitted_block_id.hash {
529                        // client re-org'ed, allow xdm submission
530                        return true;
531                    }
532                }
533            }
534
535            let latest_block_number = current_block_id.number;
536            let block_limit: NumberFor<Block> = XDM_ACCEPT_BLOCK_LIMIT.saturated_into();
537            submitted_block_id.number < latest_block_number.saturating_sub(block_limit)
538        }
539    }
540}
541
542async fn handle_xdm_message<TxPool, Client, CBlock>(
543    client: &Arc<Client>,
544    tx_pool: &Arc<TxPool>,
545    chain_id: ChainId,
546    ext: ExtrinsicOf<TxPool>,
547) -> Result<bool, Error>
548where
549    TxPool: TransactionPool + 'static,
550    CBlock: BlockT,
551    Client: ProvideRuntimeApi<BlockOf<TxPool>> + HeaderBackend<BlockOf<TxPool>> + AuxStore,
552    Client::Api: MessengerApi<BlockOf<TxPool>, NumberFor<CBlock>, CBlock::Hash>,
553{
554    let block_id: BlockId<BlockOf<TxPool>> = client.info().into();
555    let runtime_api = client.runtime_api();
556    let api_version = runtime_api
557        .api_version::<dyn MessengerApi<BlockOf<TxPool>, NumberFor<CBlock>, CBlock::Hash>>(
558            block_id.hash,
559        )?
560        .unwrap_or(1);
561
562    let api_available = api_version >= 2;
563    if api_available {
564        let xdm_id = match runtime_api.xdm_id(block_id.hash, &ext)? {
565            // not a valid xdm, so return as invalid
566            None => return Ok(false),
567            Some(xdm_id) => xdm_id,
568        };
569
570        let (src_chain_id, channel_id) = xdm_id.get_chain_id_and_channel_id();
571        let maybe_channel_nonce =
572            runtime_api.channel_nonce(block_id.hash, src_chain_id, channel_id)?;
573
574        let maybe_submitted_xdm_block = get_xdm_processed_block_number::<_, BlockOf<TxPool>>(
575            &**client,
576            TX_POOL_PREFIX,
577            xdm_id,
578        )?;
579        if !can_allow_xdm_submission(
580            client,
581            xdm_id,
582            maybe_submitted_xdm_block,
583            block_id.clone(),
584            maybe_channel_nonce,
585        ) {
586            tracing::debug!(
587                target: LOG_TARGET,
588                "Skipping XDM[{:?}] submission. At: {:?}",
589                xdm_id,
590                block_id
591            );
592            return Ok(true);
593        }
594
595        tracing::debug!(
596            target: LOG_TARGET,
597            "Submitting XDM[{:?}] to tx pool for chain {:?} at block: {:?}",
598            xdm_id,
599            chain_id,
600            block_id
601        );
602
603        let tx_pool_res = tx_pool
604            .submit_one(block_id.hash, TransactionSource::External, ext)
605            .await;
606
607        let block_id: BlockId<BlockOf<TxPool>> = client.info().into();
608        if let Err(err) = tx_pool_res {
609            match err.into_pool_error() {
610                Ok(err) => match err {
611                    PoolError::TooLowPriority { .. }
612                    | PoolError::AlreadyImported(..)
613                    | PoolError::TemporarilyBanned => {
614                        tracing::debug!(
615                            target: LOG_TARGET,
616                            "XDM[{:?}] to tx pool for Chain {:?} at block: {:?}: Already included",
617                            xdm_id,
618                            chain_id,
619                            block_id
620                        );
621                        set_xdm_message_processed_at(&**client, TX_POOL_PREFIX, xdm_id, block_id)?;
622                    }
623                    _ => {
624                        tracing::error!(
625                            target: LOG_TARGET,
626                            "Failed to submit XDM[{:?}] to tx pool for Chain {:?} with error: {:?} at block: {:?}",
627                            xdm_id,
628                            chain_id,
629                            err,
630                            block_id
631                        );
632                    }
633                },
634                Err(err) => {
635                    tracing::error!(
636                        target: LOG_TARGET,
637                        "Failed to submit XDM[{:?}] to tx pool for Chain {:?} with error: {:?} at block: {:?}",
638                        xdm_id,
639                        chain_id,
640                        err,
641                        block_id
642                    );
643                }
644            }
645        } else {
646            tracing::debug!(
647                target: LOG_TARGET,
648                "Submitted XDM[{:?}] to tx pool for chain {:?} at {:?}",
649                xdm_id,
650                chain_id,
651                block_id
652            );
653
654            set_xdm_message_processed_at(&**client, TX_POOL_PREFIX, xdm_id, block_id)?;
655        }
656
657        if let Some(channel_nonce) = maybe_channel_nonce {
658            cleanup_chain_channel_storages(
659                &**client,
660                TX_POOL_PREFIX,
661                src_chain_id,
662                channel_id,
663                channel_nonce,
664            )?;
665        }
666
667        Ok(true)
668    } else {
669        let tx_pool_res = tx_pool
670            .submit_one(block_id.hash, TransactionSource::External, ext)
671            .await;
672
673        let block_id: BlockId<BlockOf<TxPool>> = client.info().into();
674        if let Err(err) = tx_pool_res {
675            tracing::error!(
676                target: LOG_TARGET,
677                "Failed to submit XDM to tx pool for Chain {:?} with error: {:?} at block: {:?}",
678                chain_id,
679                err,
680                block_id
681            );
682        } else {
683            tracing::debug!(
684                target: LOG_TARGET,
685                "Submitted XDM to tx pool for chain {:?} at {:?}",
686                chain_id,
687                block_id
688            );
689        }
690
691        Ok(true)
692    }
693}