domain_client_message_relayer/
worker.rs

1use crate::{BlockT, Error, GossipMessageSink, HeaderBackend, HeaderT, Relayer, LOG_TARGET};
2use cross_domain_message_gossip::{ChannelUpdate, Message as GossipMessage, MessageData};
3use futures::StreamExt;
4use sc_client_api::{AuxStore, BlockchainEvents, ProofProvider};
5use sp_api::{ApiExt, ProvideRuntimeApi};
6use sp_consensus::SyncOracle;
7use sp_domains::{DomainId, DomainsApi};
8use sp_messenger::messages::ChainId;
9use sp_messenger::{MessengerApi, RelayerApi};
10use sp_mmr_primitives::MmrApi;
11use sp_runtime::traits::{CheckedSub, NumberFor, One, Zero};
12use sp_runtime::SaturatedConversion;
13use std::sync::Arc;
14
15pub async fn gossip_channel_updates<Client, Block, CBlock, SO>(
16    chain_id: ChainId,
17    client: Arc<Client>,
18    sync_oracle: SO,
19    gossip_message_sink: GossipMessageSink,
20) where
21    Block: BlockT,
22    CBlock: BlockT,
23    Client: BlockchainEvents<Block>
24        + HeaderBackend<Block>
25        + AuxStore
26        + ProofProvider<Block>
27        + ProvideRuntimeApi<Block>,
28    Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
29    SO: SyncOracle,
30{
31    tracing::info!(
32        target: LOG_TARGET,
33        "Starting Channel updates for chain: {:?}",
34        chain_id,
35    );
36    let mut chain_block_imported = client.every_import_notification_stream();
37    while let Some(imported_block) = chain_block_imported.next().await {
38        // if the client is in major sync, wait until sync is complete
39        if sync_oracle.is_major_syncing() {
40            tracing::debug!(target: LOG_TARGET, "Client is in major sync. Skipping...");
41            continue;
42        }
43
44        if !imported_block.is_new_best {
45            tracing::debug!(target: LOG_TARGET, "Imported non-best block. Skipping...");
46            continue;
47        }
48
49        let (block_hash, block_number) = match chain_id {
50            ChainId::Consensus => (
51                imported_block.header.hash(),
52                *imported_block.header.number(),
53            ),
54            ChainId::Domain(_) => {
55                // for domains, we gossip channel updates of imported block - 1
56                // since the execution receipt of the imported block is not registered on consensus
57                // without the execution receipt, we would not be able to verify the storage proof
58                let number = match imported_block.header.number().checked_sub(&One::one()) {
59                    None => continue,
60                    Some(number) => number,
61                };
62
63                let hash = match client.hash(number).ok().flatten() {
64                    Some(hash) => hash,
65                    None => {
66                        tracing::debug!(target: LOG_TARGET, "Missing block hash for number: {:?}", number);
67                        continue;
68                    }
69                };
70
71                (hash, number)
72            }
73        };
74
75        if let Err(err) = do_gossip_channel_updates::<_, _, CBlock>(
76            chain_id,
77            &client,
78            &gossip_message_sink,
79            block_number,
80            block_hash,
81        ) {
82            tracing::error!(target: LOG_TARGET, ?err, "failed to gossip channel update");
83        }
84    }
85}
86
87fn do_gossip_channel_updates<Client, Block, CBlock>(
88    src_chain_id: ChainId,
89    client: &Arc<Client>,
90    gossip_message_sink: &GossipMessageSink,
91    block_number: NumberFor<Block>,
92    block_hash: Block::Hash,
93) -> Result<(), Error>
94where
95    Block: BlockT,
96    CBlock: BlockT,
97    Client: BlockchainEvents<Block>
98        + HeaderBackend<Block>
99        + AuxStore
100        + ProofProvider<Block>
101        + ProvideRuntimeApi<Block>,
102    Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
103{
104    let api = client.runtime_api();
105
106    let channels_status_to_broadcast = {
107        let updated_channels = api.updated_channels(block_hash)?;
108
109        // TODO: remove version check before next network
110        let relayer_api_version = api
111            .api_version::<dyn RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>>(block_hash)?
112            // It is safe to return a default version of 1, since there will always be version 1.
113            .unwrap_or(1);
114
115        // if there are no channel updates, broadcast channel's status for every 300 blocks
116        if updated_channels.is_empty()
117            && relayer_api_version >= 2
118            && block_number % 300u32.into() == Zero::zero()
119        {
120            api.open_channels(block_hash)?
121        } else {
122            updated_channels
123        }
124    };
125
126    for (dst_chain_id, channel_id) in channels_status_to_broadcast {
127        let storage_key = api.channel_storage_key(block_hash, dst_chain_id, channel_id)?;
128        let proof = client
129            .read_proof(block_hash, &mut [storage_key.as_ref()].into_iter())
130            .map_err(|_| Error::ConstructStorageProof)?;
131
132        let gossip_message = GossipMessage {
133            chain_id: dst_chain_id,
134            data: MessageData::ChannelUpdate(ChannelUpdate {
135                src_chain_id,
136                channel_id,
137                block_number: block_number.saturated_into(),
138                storage_proof: proof,
139            }),
140        };
141
142        gossip_message_sink
143            .unbounded_send(gossip_message)
144            .map_err(Error::UnableToSubmitCrossDomainMessage)?;
145    }
146
147    Ok(())
148}
149
150pub async fn start_relaying_messages<CClient, Client, CBlock, Block, SO>(
151    domain_id: DomainId,
152    consensus_client: Arc<CClient>,
153    domain_client: Arc<Client>,
154    confirmation_depth_k: NumberFor<CBlock>,
155    sync_oracle: SO,
156    gossip_message_sink: GossipMessageSink,
157) where
158    Block: BlockT,
159    CBlock: BlockT,
160    Client: HeaderBackend<Block> + AuxStore + ProofProvider<Block> + ProvideRuntimeApi<Block>,
161    Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
162    CClient: BlockchainEvents<CBlock>
163        + HeaderBackend<CBlock>
164        + ProvideRuntimeApi<CBlock>
165        + ProofProvider<CBlock>
166        + AuxStore,
167    CClient::Api: DomainsApi<CBlock, Block::Header>
168        + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>
169        + MmrApi<CBlock, sp_core::H256, NumberFor<CBlock>>
170        + RelayerApi<CBlock, NumberFor<CBlock>, NumberFor<CBlock>, CBlock::Hash>,
171    SO: SyncOracle + Send,
172{
173    tracing::info!(
174        target: LOG_TARGET,
175        "Starting relayer for domain: {domain_id:?} and the consensus chain",
176    );
177    let mut chain_block_imported = consensus_client.every_import_notification_stream();
178
179    // from the start block, start processing all the messages assigned
180    // wait for new block finalization of the chain,
181    // then fetch new messages in the block
182    // construct proof of each message to be relayed
183    // submit XDM as unsigned extrinsic.
184    while let Some(imported_block) = chain_block_imported.next().await {
185        // if the client is in major sync, wait until sync is complete
186        if sync_oracle.is_major_syncing() {
187            tracing::debug!(target: LOG_TARGET, "Client is in major sync. Skipping...");
188            continue;
189        }
190
191        if !imported_block.is_new_best {
192            tracing::debug!(target: LOG_TARGET, "Imported non-best block. Skipping...");
193            continue;
194        }
195
196        let Some(confirmed_block_number) = imported_block
197            .header
198            .number()
199            .checked_sub(&confirmation_depth_k)
200        else {
201            tracing::debug!(target: LOG_TARGET, "Not enough confirmed blocks. Skipping...");
202            continue;
203        };
204
205        for chain_id in [ChainId::Consensus, ChainId::Domain(domain_id)] {
206            let res = Relayer::construct_and_submit_xdm(
207                chain_id,
208                &domain_client,
209                &consensus_client,
210                confirmed_block_number,
211                &gossip_message_sink,
212            );
213
214            if let Err(err) = res {
215                tracing::error!(
216                    target: LOG_TARGET,
217                    ?err,
218                    "Failed to submit messages from the chain {chain_id:?} at the block ({confirmed_block_number:?}"
219                );
220                continue;
221            }
222        }
223    }
224}