domain_client_message_relayer/
worker.rs1use crate::{BlockT, Error, GossipMessageSink, HeaderBackend, HeaderT, Relayer, LOG_TARGET};
2use cross_domain_message_gossip::{ChannelUpdate, Message as GossipMessage, MessageData};
3use futures::StreamExt;
4use sc_client_api::{AuxStore, BlockchainEvents, ProofProvider};
5use sp_api::{ApiExt, ProvideRuntimeApi};
6use sp_consensus::SyncOracle;
7use sp_domains::{DomainId, DomainsApi};
8use sp_messenger::messages::ChainId;
9use sp_messenger::{MessengerApi, RelayerApi};
10use sp_mmr_primitives::MmrApi;
11use sp_runtime::traits::{CheckedSub, NumberFor, One, Zero};
12use sp_runtime::SaturatedConversion;
13use std::sync::Arc;
14
15pub async fn gossip_channel_updates<Client, Block, CBlock, SO>(
16 chain_id: ChainId,
17 client: Arc<Client>,
18 sync_oracle: SO,
19 gossip_message_sink: GossipMessageSink,
20) where
21 Block: BlockT,
22 CBlock: BlockT,
23 Client: BlockchainEvents<Block>
24 + HeaderBackend<Block>
25 + AuxStore
26 + ProofProvider<Block>
27 + ProvideRuntimeApi<Block>,
28 Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
29 SO: SyncOracle,
30{
31 tracing::info!(
32 target: LOG_TARGET,
33 "Starting Channel updates for chain: {:?}",
34 chain_id,
35 );
36 let mut chain_block_imported = client.every_import_notification_stream();
37 while let Some(imported_block) = chain_block_imported.next().await {
38 if sync_oracle.is_major_syncing() {
40 tracing::debug!(target: LOG_TARGET, "Client is in major sync. Skipping...");
41 continue;
42 }
43
44 if !imported_block.is_new_best {
45 tracing::debug!(target: LOG_TARGET, "Imported non-best block. Skipping...");
46 continue;
47 }
48
49 let (block_hash, block_number) = match chain_id {
50 ChainId::Consensus => (
51 imported_block.header.hash(),
52 *imported_block.header.number(),
53 ),
54 ChainId::Domain(_) => {
55 let number = match imported_block.header.number().checked_sub(&One::one()) {
59 None => continue,
60 Some(number) => number,
61 };
62
63 let hash = match client.hash(number).ok().flatten() {
64 Some(hash) => hash,
65 None => {
66 tracing::debug!(target: LOG_TARGET, "Missing block hash for number: {:?}", number);
67 continue;
68 }
69 };
70
71 (hash, number)
72 }
73 };
74
75 if let Err(err) = do_gossip_channel_updates::<_, _, CBlock>(
76 chain_id,
77 &client,
78 &gossip_message_sink,
79 block_number,
80 block_hash,
81 ) {
82 tracing::error!(target: LOG_TARGET, ?err, "failed to gossip channel update");
83 }
84 }
85}
86
87fn do_gossip_channel_updates<Client, Block, CBlock>(
88 src_chain_id: ChainId,
89 client: &Arc<Client>,
90 gossip_message_sink: &GossipMessageSink,
91 block_number: NumberFor<Block>,
92 block_hash: Block::Hash,
93) -> Result<(), Error>
94where
95 Block: BlockT,
96 CBlock: BlockT,
97 Client: BlockchainEvents<Block>
98 + HeaderBackend<Block>
99 + AuxStore
100 + ProofProvider<Block>
101 + ProvideRuntimeApi<Block>,
102 Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
103{
104 let api = client.runtime_api();
105
106 let channels_status_to_broadcast = {
107 let updated_channels = api.updated_channels(block_hash)?;
108
109 let relayer_api_version = api
111 .api_version::<dyn RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>>(block_hash)?
112 .unwrap_or(1);
114
115 if updated_channels.is_empty()
117 && relayer_api_version >= 2
118 && block_number % 300u32.into() == Zero::zero()
119 {
120 api.open_channels(block_hash)?
121 } else {
122 updated_channels
123 }
124 };
125
126 for (dst_chain_id, channel_id) in channels_status_to_broadcast {
127 let storage_key = api.channel_storage_key(block_hash, dst_chain_id, channel_id)?;
128 let proof = client
129 .read_proof(block_hash, &mut [storage_key.as_ref()].into_iter())
130 .map_err(|_| Error::ConstructStorageProof)?;
131
132 let gossip_message = GossipMessage {
133 chain_id: dst_chain_id,
134 data: MessageData::ChannelUpdate(ChannelUpdate {
135 src_chain_id,
136 channel_id,
137 block_number: block_number.saturated_into(),
138 storage_proof: proof,
139 }),
140 };
141
142 gossip_message_sink
143 .unbounded_send(gossip_message)
144 .map_err(Error::UnableToSubmitCrossDomainMessage)?;
145 }
146
147 Ok(())
148}
149
150pub async fn start_relaying_messages<CClient, Client, CBlock, Block, SO>(
151 domain_id: DomainId,
152 consensus_client: Arc<CClient>,
153 domain_client: Arc<Client>,
154 confirmation_depth_k: NumberFor<CBlock>,
155 sync_oracle: SO,
156 gossip_message_sink: GossipMessageSink,
157) where
158 Block: BlockT,
159 CBlock: BlockT,
160 Client: HeaderBackend<Block> + AuxStore + ProofProvider<Block> + ProvideRuntimeApi<Block>,
161 Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
162 CClient: BlockchainEvents<CBlock>
163 + HeaderBackend<CBlock>
164 + ProvideRuntimeApi<CBlock>
165 + ProofProvider<CBlock>
166 + AuxStore,
167 CClient::Api: DomainsApi<CBlock, Block::Header>
168 + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>
169 + MmrApi<CBlock, sp_core::H256, NumberFor<CBlock>>
170 + RelayerApi<CBlock, NumberFor<CBlock>, NumberFor<CBlock>, CBlock::Hash>,
171 SO: SyncOracle + Send,
172{
173 tracing::info!(
174 target: LOG_TARGET,
175 "Starting relayer for domain: {domain_id:?} and the consensus chain",
176 );
177 let mut chain_block_imported = consensus_client.every_import_notification_stream();
178
179 while let Some(imported_block) = chain_block_imported.next().await {
185 if sync_oracle.is_major_syncing() {
187 tracing::debug!(target: LOG_TARGET, "Client is in major sync. Skipping...");
188 continue;
189 }
190
191 if !imported_block.is_new_best {
192 tracing::debug!(target: LOG_TARGET, "Imported non-best block. Skipping...");
193 continue;
194 }
195
196 let Some(confirmed_block_number) = imported_block
197 .header
198 .number()
199 .checked_sub(&confirmation_depth_k)
200 else {
201 tracing::debug!(target: LOG_TARGET, "Not enough confirmed blocks. Skipping...");
202 continue;
203 };
204
205 for chain_id in [ChainId::Consensus, ChainId::Domain(domain_id)] {
206 let res = Relayer::construct_and_submit_xdm(
207 chain_id,
208 &domain_client,
209 &consensus_client,
210 confirmed_block_number,
211 &gossip_message_sink,
212 );
213
214 if let Err(err) = res {
215 tracing::error!(
216 target: LOG_TARGET,
217 ?err,
218 "Failed to submit messages from the chain {chain_id:?} at the block ({confirmed_block_number:?}"
219 );
220 continue;
221 }
222 }
223 }
224}