cross_domain_message_gossip/
message_listener.rs

1use crate::aux_schema::{
2    BlockId, cleanup_chain_channel_storages, get_channel_state, get_xdm_processed_block_number,
3    set_channel_state, set_xdm_message_processed_at,
4};
5use crate::gossip_worker::{ChannelUpdate, MessageData};
6use crate::{ChainMsg, ChannelDetail};
7use domain_block_preprocessor::stateless_runtime::StatelessRuntime;
8use fp_account::AccountId20;
9use futures::{Stream, StreamExt};
10use sc_client_api::AuxStore;
11use sc_executor::RuntimeVersionOf;
12use sc_network::NetworkPeers;
13use sc_transaction_pool_api::error::{Error as PoolError, IntoPoolError};
14use sc_transaction_pool_api::{TransactionPool, TransactionSource};
15use sp_api::{ApiError, ApiExt, ProvideRuntimeApi, StorageProof};
16use sp_blockchain::HeaderBackend;
17use sp_consensus::SyncOracle;
18use sp_core::crypto::AccountId32;
19use sp_core::storage::StorageKey;
20use sp_core::traits::CodeExecutor;
21use sp_core::{H256, Hasher};
22use sp_domains::proof_provider_and_verifier::{StorageProofVerifier, VerificationError};
23use sp_domains::{DomainId, DomainsApi, RuntimeType};
24use sp_messenger::messages::{ChainId, Channel, ChannelId};
25use sp_messenger::{ChannelNonce, MessengerApi, RelayerApi, XdmId};
26use sp_runtime::codec::Decode;
27use sp_runtime::traits::{BlakeTwo256, Block as BlockT, HashingFor, Header, NumberFor};
28use sp_runtime::{SaturatedConversion, Saturating};
29use std::collections::BTreeMap;
30use std::sync::Arc;
31use subspace_runtime_primitives::{Balance, BlockNumber};
32use thiserror::Error;
33
34const TX_POOL_PREFIX: &[u8] = b"xdm_tx_pool_listener";
35
36/// Number of blocks an already submitted XDM is not accepted since last submission.
37const XDM_ACCEPT_BLOCK_LIMIT: u32 = 15;
38
39type BlockOf<T> = <T as TransactionPool>::Block;
40type HeaderOf<T> = <<T as TransactionPool>::Block as BlockT>::Header;
41type ExtrinsicOf<T> = <<T as TransactionPool>::Block as BlockT>::Extrinsic;
42
43#[derive(Debug, Error)]
44pub enum Error {
45    /// Blockchain related error.
46    #[error("Blockchain error: {0}")]
47    Blockchain(Box<sp_blockchain::Error>),
48    /// Api related error.
49    #[error("Api error: {0}")]
50    Api(sp_api::ApiError),
51    /// Missing block hash
52    #[error("Missing block hash")]
53    MissingBlockHash,
54    /// Missing block header
55    #[error("Missing block header")]
56    MissingBlockHeader,
57    /// Missing domain runtime code
58    #[error("Missing domain runtime code")]
59    MissingDomainRuntimeCode,
60    /// Missing domain receipt hash
61    #[error("Missing domain receipt hash")]
62    MissingDomainReceiptHash,
63    /// Bad domain receipt hash
64    #[error("Bad domain receipt hash")]
65    BadDomainReceiptHash,
66    /// Missing domain receipt
67    #[error("Missing domain receipt")]
68    MissingDomainReceipt,
69    /// Proof verification error
70    #[error("Proof error: {0}")]
71    Proof(VerificationError),
72}
73
74impl From<sp_api::ApiError> for Error {
75    fn from(value: ApiError) -> Self {
76        Error::Api(value)
77    }
78}
79
80impl From<sp_blockchain::Error> for Error {
81    fn from(value: sp_blockchain::Error) -> Self {
82        Error::Blockchain(Box::new(value))
83    }
84}
85
86impl From<VerificationError> for Error {
87    fn from(value: VerificationError) -> Self {
88        Error::Proof(value)
89    }
90}
91
92#[allow(clippy::too_many_arguments)]
93pub async fn start_cross_chain_message_listener<
94    Client,
95    TxPool,
96    TxnListener,
97    CClient,
98    CBlock,
99    Executor,
100    SO,
101>(
102    chain_id: ChainId,
103    consensus_client: Arc<CClient>,
104    client: Arc<Client>,
105    tx_pool: Arc<TxPool>,
106    network: Arc<dyn NetworkPeers + Send + Sync>,
107    mut listener: TxnListener,
108    domain_executor: Arc<Executor>,
109    sync_oracle: SO,
110) where
111    TxPool: TransactionPool + 'static,
112    Client: ProvideRuntimeApi<BlockOf<TxPool>> + HeaderBackend<BlockOf<TxPool>> + AuxStore,
113    CBlock: BlockT,
114    Client::Api: MessengerApi<BlockOf<TxPool>, NumberFor<CBlock>, CBlock::Hash>,
115    CClient: ProvideRuntimeApi<CBlock> + HeaderBackend<CBlock> + AuxStore,
116    CClient::Api: DomainsApi<CBlock, HeaderOf<TxPool>>
117        + RelayerApi<CBlock, NumberFor<CBlock>, NumberFor<CBlock>, CBlock::Hash>,
118    TxnListener: Stream<Item = ChainMsg> + Unpin,
119    Executor: CodeExecutor + RuntimeVersionOf,
120    SO: SyncOracle + Send,
121{
122    tracing::info!("Starting transaction listener for Chain: {:?}", chain_id);
123
124    let mut domain_storage_key_cache = BTreeMap::<(H256, ChainId, ChannelId), StorageKey>::new();
125
126    while let Some(msg) = listener.next().await {
127        // If the client is in major sync, wait until sync is complete
128        if sync_oracle.is_major_syncing() {
129            continue;
130        }
131
132        tracing::debug!("Message received for Chain: {:?}", chain_id,);
133
134        match msg.data {
135            MessageData::Xdm(encoded_data) => {
136                let ext = match ExtrinsicOf::<TxPool>::decode(&mut encoded_data.as_ref()) {
137                    Ok(ext) => ext,
138                    Err(err) => {
139                        tracing::error!(
140                            "Failed to decode message: {:?} with error: {:?}",
141                            encoded_data,
142                            err
143                        );
144                        if let Some(peer_id) = msg.maybe_peer {
145                            network.report_peer(
146                                peer_id,
147                                crate::gossip_worker::rep::GOSSIP_NOT_DECODABLE,
148                            );
149                        }
150                        continue;
151                    }
152                };
153
154                if let Ok(valid) =
155                    handle_xdm_message::<_, _, CBlock>(&client, &tx_pool, chain_id, ext).await
156                    && !valid
157                {
158                    if let Some(peer_id) = msg.maybe_peer {
159                        network.report_peer(peer_id, crate::gossip_worker::rep::NOT_XDM);
160                    }
161                    continue;
162                }
163            }
164            MessageData::ChannelUpdate(channel_update) => {
165                handle_channel_update::<_, _, _, BlockOf<TxPool>>(
166                    chain_id,
167                    channel_update,
168                    &consensus_client,
169                    domain_executor.clone(),
170                    &mut domain_storage_key_cache,
171                )
172            }
173        }
174    }
175}
176
177fn handle_channel_update<CClient, CBlock, Executor, Block>(
178    chain_id: ChainId,
179    channel_update: ChannelUpdate,
180    consensus_client: &Arc<CClient>,
181    executor: Arc<Executor>,
182    domain_storage_key_cache: &mut BTreeMap<(H256, ChainId, ChannelId), StorageKey>,
183) where
184    CBlock: BlockT,
185    Block: BlockT,
186    CClient: ProvideRuntimeApi<CBlock> + HeaderBackend<CBlock> + AuxStore,
187    CClient::Api: DomainsApi<CBlock, Block::Header>
188        + RelayerApi<CBlock, NumberFor<CBlock>, NumberFor<CBlock>, CBlock::Hash>,
189    Executor: CodeExecutor + RuntimeVersionOf,
190{
191    let ChannelUpdate {
192        src_chain_id,
193        channel_id,
194        block_number,
195        storage_proof,
196    } = channel_update;
197
198    match src_chain_id {
199        ChainId::Consensus => {
200            if let Err(err) = handle_consensus_channel_update(
201                chain_id,
202                channel_id,
203                consensus_client,
204                block_number,
205                storage_proof,
206            ) {
207                tracing::debug!(
208                    "Failed to update channel update from {:?} to {:?}: {:?}",
209                    ChainId::Consensus,
210                    chain_id,
211                    err
212                );
213            } else {
214                tracing::debug!(
215                    "Updated channel state from {:?} to {:?}: {:?}",
216                    ChainId::Consensus,
217                    chain_id,
218                    channel_id
219                );
220            }
221        }
222        ChainId::Domain(domain_id) => {
223            if let Err(err) = handle_domain_channel_update::<_, _, _, Block>(
224                domain_id,
225                chain_id,
226                channel_id,
227                consensus_client,
228                block_number,
229                storage_proof,
230                executor,
231                domain_storage_key_cache,
232            ) {
233                tracing::debug!(
234                    "Failed to update channel update from {:?} to {:?}: {:?}",
235                    ChainId::Domain(domain_id),
236                    chain_id,
237                    err
238                );
239            } else {
240                tracing::debug!(
241                    "Updated channel state from {:?} to {:?}: {:?}",
242                    ChainId::Domain(domain_id),
243                    chain_id,
244                    channel_id
245                );
246            }
247        }
248    };
249}
250
251fn handle_consensus_channel_update<CClient, CBlock>(
252    self_chain_id: ChainId,
253    channel_id: ChannelId,
254    consensus_client: &Arc<CClient>,
255    consensus_block_number: BlockNumber,
256    proof: StorageProof,
257) -> Result<(), Error>
258where
259    CBlock: BlockT,
260    CClient: ProvideRuntimeApi<CBlock> + HeaderBackend<CBlock> + AuxStore,
261    CClient::Api: RelayerApi<CBlock, NumberFor<CBlock>, NumberFor<CBlock>, CBlock::Hash>,
262{
263    // check if the consensus block number is canonical
264    let consensus_block_hash = consensus_client
265        .hash(consensus_block_number.into())?
266        .ok_or(Error::MissingBlockHash)?;
267
268    let maybe_existing_channel_detail = get_channel_state(
269        &**consensus_client,
270        ChainId::Consensus,
271        self_chain_id,
272        channel_id,
273    )?;
274
275    let api = consensus_client.runtime_api();
276    let best_hash = consensus_client.info().best_hash;
277    let header = consensus_client.expect_header(consensus_block_hash)?;
278
279    // if there is an existing channel detail,
280    // return if the channel update is from canonical chain and block number is latest
281    // else store the update.
282    if let Some(existing_channel_update) = maybe_existing_channel_detail {
283        let maybe_block_hash =
284            consensus_client.hash(existing_channel_update.block_number.into())?;
285        if let Some(block_hash) = maybe_block_hash
286            && block_hash.as_ref() == existing_channel_update.block_hash.as_ref()
287            && header.state_root().as_ref() == existing_channel_update.state_root.as_ref()
288            && existing_channel_update.block_number >= consensus_block_number
289        {
290            return Ok(());
291        }
292    }
293
294    let storage_key = StorageKey(api.channel_storage_key(best_hash, self_chain_id, channel_id)?);
295    let channel = StorageProofVerifier::<HashingFor<CBlock>>::get_decoded_value::<
296        Channel<Balance, AccountId32>,
297    >(header.state_root(), proof, storage_key)?;
298
299    let channel_detail = ChannelDetail {
300        block_number: consensus_block_number,
301        block_hash: H256::from_slice(consensus_block_hash.as_ref()),
302        state_root: H256::from_slice(header.state_root().as_ref()),
303        channel_id,
304        state: channel.state,
305        next_inbox_nonce: channel.next_inbox_nonce,
306        next_outbox_nonce: channel.next_outbox_nonce,
307        latest_response_received_message_nonce: channel.latest_response_received_message_nonce,
308    };
309
310    set_channel_state(
311        &**consensus_client,
312        ChainId::Consensus,
313        self_chain_id,
314        channel_detail,
315    )?;
316    Ok(())
317}
318
319#[allow(clippy::too_many_arguments)]
320fn handle_domain_channel_update<CClient, CBlock, Executor, Block>(
321    src_domain_id: DomainId,
322    self_chain_id: ChainId,
323    channel_id: ChannelId,
324    consensus_client: &Arc<CClient>,
325    domain_block_number: BlockNumber,
326    proof: StorageProof,
327    executor: Arc<Executor>,
328    storage_key_cache: &mut BTreeMap<(H256, ChainId, ChannelId), StorageKey>,
329) -> Result<(), Error>
330where
331    CBlock: BlockT,
332    Block: BlockT,
333    CClient: ProvideRuntimeApi<CBlock> + HeaderBackend<CBlock> + AuxStore,
334    CClient::Api: DomainsApi<CBlock, Block::Header>,
335    Executor: CodeExecutor + RuntimeVersionOf,
336{
337    let runtime_api = consensus_client.runtime_api();
338    let consensus_best_hash = consensus_client.info().best_hash;
339    let consensus_block_header = consensus_client
340        .header(consensus_best_hash)?
341        .ok_or(Error::MissingBlockHeader)?;
342
343    let domain_runtime_type = runtime_api
344        .domain_instance_data(*consensus_block_header.parent_hash(), src_domain_id)?
345        .ok_or(Error::MissingDomainRuntimeCode)?
346        .0
347        .runtime_type;
348
349    let is_valid_domain_block_number =
350        |block_number: BlockNumber| -> Result<(Block::Hash, Block::Hash), Error> {
351            let runtime_api = consensus_client.runtime_api();
352            let receipt_hash = runtime_api
353                .receipt_hash(consensus_best_hash, src_domain_id, block_number.into())?
354                .ok_or(Error::MissingDomainReceiptHash)?;
355
356            // check if the receipt is challenged by fraud proof
357            if runtime_api.is_bad_er_pending_to_prune(
358                consensus_best_hash,
359                src_domain_id,
360                receipt_hash,
361            )? {
362                return Err(Error::BadDomainReceiptHash);
363            }
364
365            let receipt = runtime_api
366                .execution_receipt(consensus_best_hash, receipt_hash)?
367                .ok_or(Error::MissingDomainReceipt)?;
368
369            Ok((receipt.domain_block_hash, receipt.final_state_root))
370        };
371
372    // check if the domain block number is valid
373    let (domain_block_hash, domain_state_root) = is_valid_domain_block_number(domain_block_number)?;
374
375    // if there is an existing channel detail,
376    // return if the channel update is from canonical domain and block number is latest
377    // else store the update.
378    let maybe_existing_channel_detail = get_channel_state(
379        &**consensus_client,
380        ChainId::Domain(src_domain_id),
381        self_chain_id,
382        channel_id,
383    )?;
384
385    if let Some(existing_channel_update) = maybe_existing_channel_detail {
386        // if the existing update domain block number
387        // is valid, and
388        // more the new block number, then don't update
389        if let Ok((existing_block_hash, _)) =
390            is_valid_domain_block_number(existing_channel_update.block_number)
391            && existing_block_hash.as_ref() == existing_channel_update.block_hash.as_ref()
392            && domain_state_root.as_ref() == existing_channel_update.state_root.as_ref()
393            && existing_channel_update.block_number >= domain_block_number
394        {
395            return Ok(());
396        }
397    }
398
399    let domain_runtime = runtime_api
400        .domain_runtime_code(*consensus_block_header.parent_hash(), src_domain_id)?
401        .ok_or(Error::MissingDomainRuntimeCode)?;
402
403    let runtime_hash = BlakeTwo256::hash(&domain_runtime);
404    let storage_key = match storage_key_cache.get(&(runtime_hash, self_chain_id, channel_id)) {
405        None => {
406            let domain_stateless_runtime =
407                StatelessRuntime::<CBlock, Block, _>::new(executor.clone(), domain_runtime.into());
408            let storage_key = StorageKey(
409                domain_stateless_runtime.channel_storage_key(self_chain_id, channel_id)?,
410            );
411            storage_key_cache.insert(
412                (runtime_hash, self_chain_id, channel_id),
413                storage_key.clone(),
414            );
415            storage_key
416        }
417        Some(key) => key.clone(),
418    };
419
420    let channel_detail = match domain_runtime_type {
421        RuntimeType::Evm => {
422            let channel = StorageProofVerifier::<HashingFor<Block>>::get_decoded_value::<
423                Channel<Balance, AccountId20>,
424            >(&domain_state_root, proof, storage_key)?;
425            ChannelDetail {
426                block_number: domain_block_number,
427                block_hash: H256::from_slice(domain_block_hash.as_ref()),
428                state_root: H256::from_slice(domain_state_root.as_ref()),
429                channel_id,
430                state: channel.state,
431                next_inbox_nonce: channel.next_inbox_nonce,
432                next_outbox_nonce: channel.next_outbox_nonce,
433                latest_response_received_message_nonce: channel
434                    .latest_response_received_message_nonce,
435            }
436        }
437        RuntimeType::AutoId => {
438            let channel = StorageProofVerifier::<HashingFor<Block>>::get_decoded_value::<
439                Channel<Balance, AccountId32>,
440            >(&domain_state_root, proof, storage_key)?;
441            ChannelDetail {
442                block_number: domain_block_number,
443                block_hash: H256::from_slice(domain_block_hash.as_ref()),
444                state_root: H256::from_slice(domain_state_root.as_ref()),
445                channel_id,
446                state: channel.state,
447                next_inbox_nonce: channel.next_inbox_nonce,
448                next_outbox_nonce: channel.next_outbox_nonce,
449                latest_response_received_message_nonce: channel
450                    .latest_response_received_message_nonce,
451            }
452        }
453    };
454
455    set_channel_state(
456        &**consensus_client,
457        ChainId::Domain(src_domain_id),
458        self_chain_id,
459        channel_detail,
460    )?;
461    Ok(())
462}
463
464pub fn can_allow_xdm_submission<Client, Block>(
465    client: &Arc<Client>,
466    xdm_id: XdmId,
467    maybe_submitted_block_id: Option<BlockId<Block>>,
468    current_block_id: BlockId<Block>,
469    maybe_channel_nonce: Option<ChannelNonce>,
470) -> bool
471where
472    Client: HeaderBackend<Block>,
473    Block: BlockT,
474{
475    if let Some(channel_nonce) = maybe_channel_nonce {
476        let maybe_nonces = match (
477            xdm_id,
478            channel_nonce.relay_msg_nonce,
479            channel_nonce.relay_response_msg_nonce,
480        ) {
481            (XdmId::RelayMessage((_, _, nonce)), Some(channel_nonce), _) => {
482                Some((nonce, channel_nonce))
483            }
484            (XdmId::RelayResponseMessage((_, _, nonce)), _, Some(channel_nonce)) => {
485                Some((nonce, channel_nonce))
486            }
487            _ => None,
488        };
489
490        if let Some((xdm_nonce, channel_nonce)) = maybe_nonces
491            && (xdm_nonce <= channel_nonce)
492        {
493            tracing::debug!(
494                "Stale XDM submitted: XDM Nonce: {:?}, Channel Nonce: {:?}",
495                xdm_nonce,
496                channel_nonce
497            );
498            return false;
499        }
500    }
501
502    match maybe_submitted_block_id {
503        None => true,
504        Some(submitted_block_id) => {
505            match client.hash(submitted_block_id.number).ok().flatten() {
506                // there is no block at this number, allow xdm submission
507                None => return true,
508                Some(hash) => {
509                    if hash != submitted_block_id.hash {
510                        // client re-org'ed, allow xdm submission
511                        return true;
512                    }
513                }
514            }
515
516            let latest_block_number = current_block_id.number;
517            let block_limit: NumberFor<Block> = XDM_ACCEPT_BLOCK_LIMIT.saturated_into();
518            submitted_block_id.number < latest_block_number.saturating_sub(block_limit)
519        }
520    }
521}
522
523async fn handle_xdm_message<TxPool, Client, CBlock>(
524    client: &Arc<Client>,
525    tx_pool: &Arc<TxPool>,
526    chain_id: ChainId,
527    ext: ExtrinsicOf<TxPool>,
528) -> Result<bool, Error>
529where
530    TxPool: TransactionPool + 'static,
531    CBlock: BlockT,
532    Client: ProvideRuntimeApi<BlockOf<TxPool>> + HeaderBackend<BlockOf<TxPool>> + AuxStore,
533    Client::Api: MessengerApi<BlockOf<TxPool>, NumberFor<CBlock>, CBlock::Hash>,
534{
535    let block_id: BlockId<BlockOf<TxPool>> = client.info().into();
536    let runtime_api = client.runtime_api();
537    let api_version = runtime_api
538        .api_version::<dyn MessengerApi<BlockOf<TxPool>, NumberFor<CBlock>, CBlock::Hash>>(
539            block_id.hash,
540        )?
541        .unwrap_or(1);
542
543    let api_available = api_version >= 2;
544    if api_available {
545        let xdm_id = match runtime_api.xdm_id(block_id.hash, &ext)? {
546            // not a valid xdm, so return as invalid
547            None => return Ok(false),
548            Some(xdm_id) => xdm_id,
549        };
550
551        let (src_chain_id, channel_id) = xdm_id.get_chain_id_and_channel_id();
552        let maybe_channel_nonce =
553            runtime_api.channel_nonce(block_id.hash, src_chain_id, channel_id)?;
554
555        let maybe_submitted_xdm_block = get_xdm_processed_block_number::<_, BlockOf<TxPool>>(
556            &**client,
557            TX_POOL_PREFIX,
558            xdm_id,
559        )?;
560        if !can_allow_xdm_submission(
561            client,
562            xdm_id,
563            maybe_submitted_xdm_block,
564            block_id.clone(),
565            maybe_channel_nonce,
566        ) {
567            tracing::debug!("Skipping XDM[{:?}] submission. At: {:?}", xdm_id, block_id);
568            return Ok(true);
569        }
570
571        tracing::debug!(
572            "Submitting XDM[{:?}] to tx pool for chain {:?} at block: {:?}",
573            xdm_id,
574            chain_id,
575            block_id
576        );
577
578        let tx_pool_res = tx_pool
579            .submit_one(block_id.hash, TransactionSource::External, ext)
580            .await;
581
582        let block_id: BlockId<BlockOf<TxPool>> = client.info().into();
583        if let Err(err) = tx_pool_res {
584            match err.into_pool_error() {
585                Ok(err) => match err {
586                    PoolError::TooLowPriority { .. }
587                    | PoolError::AlreadyImported(..)
588                    | PoolError::TemporarilyBanned => {
589                        tracing::debug!(
590                            "XDM[{:?}] to tx pool for Chain {:?} at block: {:?}: Already included",
591                            xdm_id,
592                            chain_id,
593                            block_id
594                        );
595                        set_xdm_message_processed_at(&**client, TX_POOL_PREFIX, xdm_id, block_id)?;
596                    }
597                    _ => {
598                        tracing::error!(
599                            "Failed to submit XDM[{:?}] to tx pool for Chain {:?} with error: {:?} at block: {:?}",
600                            xdm_id,
601                            chain_id,
602                            err,
603                            block_id
604                        );
605                    }
606                },
607                Err(err) => {
608                    tracing::error!(
609                        "Failed to submit XDM[{:?}] to tx pool for Chain {:?} with error: {:?} at block: {:?}",
610                        xdm_id,
611                        chain_id,
612                        err,
613                        block_id
614                    );
615                }
616            }
617        } else {
618            tracing::debug!(
619                "Submitted XDM[{:?}] to tx pool for chain {:?} at {:?}",
620                xdm_id,
621                chain_id,
622                block_id
623            );
624
625            set_xdm_message_processed_at(&**client, TX_POOL_PREFIX, xdm_id, block_id)?;
626        }
627
628        if let Some(channel_nonce) = maybe_channel_nonce {
629            cleanup_chain_channel_storages(
630                &**client,
631                TX_POOL_PREFIX,
632                src_chain_id,
633                channel_id,
634                channel_nonce,
635            )?;
636        }
637
638        Ok(true)
639    } else {
640        let tx_pool_res = tx_pool
641            .submit_one(block_id.hash, TransactionSource::External, ext)
642            .await;
643
644        let block_id: BlockId<BlockOf<TxPool>> = client.info().into();
645        if let Err(err) = tx_pool_res {
646            tracing::error!(
647                "Failed to submit XDM to tx pool for Chain {:?} with error: {:?} at block: {:?}",
648                chain_id,
649                err,
650                block_id
651            );
652        } else {
653            tracing::debug!(
654                "Submitted XDM to tx pool for chain {:?} at {:?}",
655                chain_id,
656                block_id
657            );
658        }
659
660        Ok(true)
661    }
662}