domain_client_message_relayer/
worker.rs

1use crate::{BlockT, Error, GossipMessageSink, HeaderBackend, HeaderT, Relayer};
2use cross_domain_message_gossip::{ChannelUpdate, Message as GossipMessage, MessageData};
3use futures::StreamExt;
4use sc_client_api::{AuxStore, BlockchainEvents, ProofProvider};
5use sp_api::ProvideRuntimeApi;
6use sp_consensus::SyncOracle;
7use sp_domains::{DomainId, DomainsApi};
8use sp_messenger::messages::ChainId;
9use sp_messenger::{MessengerApi, RelayerApi};
10use sp_mmr_primitives::MmrApi;
11use sp_runtime::SaturatedConversion;
12use sp_runtime::traits::{CheckedSub, NumberFor, One, Zero};
13use std::sync::Arc;
14
15pub async fn gossip_channel_updates<Client, Block, CBlock, SO>(
16    chain_id: ChainId,
17    client: Arc<Client>,
18    sync_oracle: SO,
19    gossip_message_sink: GossipMessageSink,
20) where
21    Block: BlockT,
22    CBlock: BlockT,
23    Client: BlockchainEvents<Block>
24        + HeaderBackend<Block>
25        + AuxStore
26        + ProofProvider<Block>
27        + ProvideRuntimeApi<Block>,
28    Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
29    SO: SyncOracle,
30{
31    tracing::info!("Starting Channel updates for chain: {:?}", chain_id,);
32    let mut chain_block_imported = client.every_import_notification_stream();
33    while let Some(imported_block) = chain_block_imported.next().await {
34        // if the client is in major sync, wait until sync is complete
35        if sync_oracle.is_major_syncing() {
36            tracing::debug!("Client is in major sync. Skipping...");
37            continue;
38        }
39
40        if !imported_block.is_new_best {
41            tracing::debug!("Imported non-best block. Skipping...");
42            continue;
43        }
44
45        let (block_hash, block_number) = match chain_id {
46            ChainId::Consensus => (
47                imported_block.header.hash(),
48                *imported_block.header.number(),
49            ),
50            ChainId::Domain(_) => {
51                // for domains, we gossip channel updates of imported block - 1
52                // since the execution receipt of the imported block is not registered on consensus
53                // without the execution receipt, we would not be able to verify the storage proof
54                let number = match imported_block.header.number().checked_sub(&One::one()) {
55                    None => continue,
56                    Some(number) => number,
57                };
58
59                let hash = match client.hash(number).ok().flatten() {
60                    Some(hash) => hash,
61                    None => {
62                        tracing::debug!("Missing block hash for number: {:?}", number);
63                        continue;
64                    }
65                };
66
67                (hash, number)
68            }
69        };
70
71        if let Err(err) = do_gossip_channel_updates::<_, _, CBlock>(
72            chain_id,
73            &client,
74            &gossip_message_sink,
75            block_number,
76            block_hash,
77        ) {
78            tracing::error!(?err, "failed to gossip channel update");
79        }
80    }
81}
82
83fn do_gossip_channel_updates<Client, Block, CBlock>(
84    src_chain_id: ChainId,
85    client: &Arc<Client>,
86    gossip_message_sink: &GossipMessageSink,
87    block_number: NumberFor<Block>,
88    block_hash: Block::Hash,
89) -> Result<(), Error>
90where
91    Block: BlockT,
92    CBlock: BlockT,
93    Client: BlockchainEvents<Block>
94        + HeaderBackend<Block>
95        + AuxStore
96        + ProofProvider<Block>
97        + ProvideRuntimeApi<Block>,
98    Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
99{
100    let api = client.runtime_api();
101
102    let channels_status_to_broadcast = {
103        let updated_channels = api.updated_channels(block_hash)?;
104
105        // if there are no channel updates, broadcast channel's status for every 300 blocks
106        if updated_channels.is_empty() && block_number % 300u32.into() == Zero::zero() {
107            api.open_channels(block_hash)?
108        } else {
109            updated_channels
110        }
111    };
112
113    for (dst_chain_id, channel_id) in channels_status_to_broadcast {
114        let storage_key = api.channel_storage_key(block_hash, dst_chain_id, channel_id)?;
115        let proof = client
116            .read_proof(block_hash, &mut [storage_key.as_ref()].into_iter())
117            .map_err(|_| Error::ConstructStorageProof)?;
118
119        let gossip_message = GossipMessage {
120            chain_id: dst_chain_id,
121            data: MessageData::ChannelUpdate(ChannelUpdate {
122                src_chain_id,
123                channel_id,
124                block_number: block_number.saturated_into(),
125                storage_proof: proof,
126            }),
127        };
128
129        gossip_message_sink
130            .unbounded_send(gossip_message)
131            .map_err(Error::UnableToSubmitCrossDomainMessage)?;
132    }
133
134    Ok(())
135}
136
137pub async fn start_relaying_messages<CClient, Client, CBlock, Block, SO>(
138    domain_id: DomainId,
139    consensus_client: Arc<CClient>,
140    domain_client: Arc<Client>,
141    confirmation_depth_k: NumberFor<CBlock>,
142    sync_oracle: SO,
143    gossip_message_sink: GossipMessageSink,
144) where
145    Block: BlockT,
146    CBlock: BlockT,
147    Client: HeaderBackend<Block> + AuxStore + ProofProvider<Block> + ProvideRuntimeApi<Block>,
148    Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
149    CClient: BlockchainEvents<CBlock>
150        + HeaderBackend<CBlock>
151        + ProvideRuntimeApi<CBlock>
152        + ProofProvider<CBlock>
153        + AuxStore,
154    CClient::Api: DomainsApi<CBlock, Block::Header>
155        + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>
156        + MmrApi<CBlock, sp_core::H256, NumberFor<CBlock>>
157        + RelayerApi<CBlock, NumberFor<CBlock>, NumberFor<CBlock>, CBlock::Hash>,
158    SO: SyncOracle + Send,
159{
160    tracing::info!("Starting relayer for domain: {domain_id:?} and the consensus chain",);
161    let mut chain_block_imported = consensus_client.every_import_notification_stream();
162
163    // from the start block, start processing all the messages assigned
164    // wait for new block finalization of the chain,
165    // then fetch new messages in the block
166    // construct proof of each message to be relayed
167    // submit XDM as unsigned extrinsic.
168    while let Some(imported_block) = chain_block_imported.next().await {
169        // if the client is in major sync, wait until sync is complete
170        if sync_oracle.is_major_syncing() {
171            tracing::debug!("Client is in major sync. Skipping...");
172            continue;
173        }
174
175        if !imported_block.is_new_best {
176            tracing::debug!("Imported non-best block. Skipping...");
177            continue;
178        }
179
180        let Some(confirmed_block_number) = imported_block
181            .header
182            .number()
183            .checked_sub(&confirmation_depth_k)
184        else {
185            tracing::debug!("Not enough confirmed blocks. Skipping...");
186            continue;
187        };
188
189        for chain_id in [ChainId::Consensus, ChainId::Domain(domain_id)] {
190            let res = Relayer::construct_and_submit_xdm(
191                chain_id,
192                &domain_client,
193                &consensus_client,
194                confirmed_block_number,
195                &gossip_message_sink,
196            );
197
198            if let Err(err) = res {
199                tracing::error!(
200                    ?err,
201                    "Failed to submit messages from the chain {chain_id:?} at the block ({confirmed_block_number:?}"
202                );
203                continue;
204            }
205        }
206    }
207}