domain_client_message_relayer/
worker.rs

1use crate::{BlockT, Error, GossipMessageSink, HeaderBackend, HeaderT, Relayer};
2use cross_domain_message_gossip::{ChannelUpdate, Message as GossipMessage, MessageData};
3use futures::StreamExt;
4use sc_client_api::{AuxStore, BlockchainEvents, ProofProvider};
5use sp_api::{ApiExt, ProvideRuntimeApi};
6use sp_consensus::SyncOracle;
7use sp_domains::{DomainId, DomainsApi};
8use sp_messenger::messages::ChainId;
9use sp_messenger::{MessengerApi, RelayerApi};
10use sp_mmr_primitives::MmrApi;
11use sp_runtime::SaturatedConversion;
12use sp_runtime::traits::{CheckedSub, NumberFor, One, Zero};
13use std::sync::Arc;
14
15pub async fn gossip_channel_updates<Client, Block, CBlock, SO>(
16    chain_id: ChainId,
17    client: Arc<Client>,
18    sync_oracle: SO,
19    gossip_message_sink: GossipMessageSink,
20) where
21    Block: BlockT,
22    CBlock: BlockT,
23    Client: BlockchainEvents<Block>
24        + HeaderBackend<Block>
25        + AuxStore
26        + ProofProvider<Block>
27        + ProvideRuntimeApi<Block>,
28    Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
29    SO: SyncOracle,
30{
31    tracing::info!("Starting Channel updates for chain: {:?}", chain_id,);
32    let mut chain_block_imported = client.every_import_notification_stream();
33    while let Some(imported_block) = chain_block_imported.next().await {
34        // if the client is in major sync, wait until sync is complete
35        if sync_oracle.is_major_syncing() {
36            tracing::debug!("Client is in major sync. Skipping...");
37            continue;
38        }
39
40        if !imported_block.is_new_best {
41            tracing::debug!("Imported non-best block. Skipping...");
42            continue;
43        }
44
45        let (block_hash, block_number) = match chain_id {
46            ChainId::Consensus => (
47                imported_block.header.hash(),
48                *imported_block.header.number(),
49            ),
50            ChainId::Domain(_) => {
51                // for domains, we gossip channel updates of imported block - 1
52                // since the execution receipt of the imported block is not registered on consensus
53                // without the execution receipt, we would not be able to verify the storage proof
54                let number = match imported_block.header.number().checked_sub(&One::one()) {
55                    None => continue,
56                    Some(number) => number,
57                };
58
59                let hash = match client.hash(number).ok().flatten() {
60                    Some(hash) => hash,
61                    None => {
62                        tracing::debug!("Missing block hash for number: {:?}", number);
63                        continue;
64                    }
65                };
66
67                (hash, number)
68            }
69        };
70
71        if let Err(err) = do_gossip_channel_updates::<_, _, CBlock>(
72            chain_id,
73            &client,
74            &gossip_message_sink,
75            block_number,
76            block_hash,
77        ) {
78            tracing::error!(?err, "failed to gossip channel update");
79        }
80    }
81}
82
83fn do_gossip_channel_updates<Client, Block, CBlock>(
84    src_chain_id: ChainId,
85    client: &Arc<Client>,
86    gossip_message_sink: &GossipMessageSink,
87    block_number: NumberFor<Block>,
88    block_hash: Block::Hash,
89) -> Result<(), Error>
90where
91    Block: BlockT,
92    CBlock: BlockT,
93    Client: BlockchainEvents<Block>
94        + HeaderBackend<Block>
95        + AuxStore
96        + ProofProvider<Block>
97        + ProvideRuntimeApi<Block>,
98    Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
99{
100    let api = client.runtime_api();
101
102    let channels_status_to_broadcast = {
103        let updated_channels = api.updated_channels(block_hash)?;
104
105        // TODO: remove version check before next network
106        let relayer_api_version = api
107            .api_version::<dyn RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>>(block_hash)?
108            // It is safe to return a default version of 1, since there will always be version 1.
109            .unwrap_or(1);
110
111        // if there are no channel updates, broadcast channel's status for every 300 blocks
112        if updated_channels.is_empty()
113            && relayer_api_version >= 2
114            && block_number % 300u32.into() == Zero::zero()
115        {
116            api.open_channels(block_hash)?
117        } else {
118            updated_channels
119        }
120    };
121
122    for (dst_chain_id, channel_id) in channels_status_to_broadcast {
123        let storage_key = api.channel_storage_key(block_hash, dst_chain_id, channel_id)?;
124        let proof = client
125            .read_proof(block_hash, &mut [storage_key.as_ref()].into_iter())
126            .map_err(|_| Error::ConstructStorageProof)?;
127
128        let gossip_message = GossipMessage {
129            chain_id: dst_chain_id,
130            data: MessageData::ChannelUpdate(ChannelUpdate {
131                src_chain_id,
132                channel_id,
133                block_number: block_number.saturated_into(),
134                storage_proof: proof,
135            }),
136        };
137
138        gossip_message_sink
139            .unbounded_send(gossip_message)
140            .map_err(Error::UnableToSubmitCrossDomainMessage)?;
141    }
142
143    Ok(())
144}
145
146pub async fn start_relaying_messages<CClient, Client, CBlock, Block, SO>(
147    domain_id: DomainId,
148    consensus_client: Arc<CClient>,
149    domain_client: Arc<Client>,
150    confirmation_depth_k: NumberFor<CBlock>,
151    sync_oracle: SO,
152    gossip_message_sink: GossipMessageSink,
153) where
154    Block: BlockT,
155    CBlock: BlockT,
156    Client: HeaderBackend<Block> + AuxStore + ProofProvider<Block> + ProvideRuntimeApi<Block>,
157    Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
158    CClient: BlockchainEvents<CBlock>
159        + HeaderBackend<CBlock>
160        + ProvideRuntimeApi<CBlock>
161        + ProofProvider<CBlock>
162        + AuxStore,
163    CClient::Api: DomainsApi<CBlock, Block::Header>
164        + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>
165        + MmrApi<CBlock, sp_core::H256, NumberFor<CBlock>>
166        + RelayerApi<CBlock, NumberFor<CBlock>, NumberFor<CBlock>, CBlock::Hash>,
167    SO: SyncOracle + Send,
168{
169    tracing::info!("Starting relayer for domain: {domain_id:?} and the consensus chain",);
170    let mut chain_block_imported = consensus_client.every_import_notification_stream();
171
172    // from the start block, start processing all the messages assigned
173    // wait for new block finalization of the chain,
174    // then fetch new messages in the block
175    // construct proof of each message to be relayed
176    // submit XDM as unsigned extrinsic.
177    while let Some(imported_block) = chain_block_imported.next().await {
178        // if the client is in major sync, wait until sync is complete
179        if sync_oracle.is_major_syncing() {
180            tracing::debug!("Client is in major sync. Skipping...");
181            continue;
182        }
183
184        if !imported_block.is_new_best {
185            tracing::debug!("Imported non-best block. Skipping...");
186            continue;
187        }
188
189        let Some(confirmed_block_number) = imported_block
190            .header
191            .number()
192            .checked_sub(&confirmation_depth_k)
193        else {
194            tracing::debug!("Not enough confirmed blocks. Skipping...");
195            continue;
196        };
197
198        for chain_id in [ChainId::Consensus, ChainId::Domain(domain_id)] {
199            let res = Relayer::construct_and_submit_xdm(
200                chain_id,
201                &domain_client,
202                &consensus_client,
203                confirmed_block_number,
204                &gossip_message_sink,
205            );
206
207            if let Err(err) = res {
208                tracing::error!(
209                    ?err,
210                    "Failed to submit messages from the chain {chain_id:?} at the block ({confirmed_block_number:?}"
211                );
212                continue;
213            }
214        }
215    }
216}