domain_client_message_relayer/
worker.rs1use crate::{BlockT, Error, GossipMessageSink, HeaderBackend, HeaderT, Relayer};
2use cross_domain_message_gossip::{ChannelUpdate, Message as GossipMessage, MessageData};
3use futures::StreamExt;
4use sc_client_api::{AuxStore, BlockchainEvents, ProofProvider};
5use sp_api::{ApiExt, ProvideRuntimeApi};
6use sp_consensus::SyncOracle;
7use sp_domains::{DomainId, DomainsApi};
8use sp_messenger::messages::ChainId;
9use sp_messenger::{MessengerApi, RelayerApi};
10use sp_mmr_primitives::MmrApi;
11use sp_runtime::SaturatedConversion;
12use sp_runtime::traits::{CheckedSub, NumberFor, One, Zero};
13use std::sync::Arc;
14
15pub async fn gossip_channel_updates<Client, Block, CBlock, SO>(
16 chain_id: ChainId,
17 client: Arc<Client>,
18 sync_oracle: SO,
19 gossip_message_sink: GossipMessageSink,
20) where
21 Block: BlockT,
22 CBlock: BlockT,
23 Client: BlockchainEvents<Block>
24 + HeaderBackend<Block>
25 + AuxStore
26 + ProofProvider<Block>
27 + ProvideRuntimeApi<Block>,
28 Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
29 SO: SyncOracle,
30{
31 tracing::info!("Starting Channel updates for chain: {:?}", chain_id,);
32 let mut chain_block_imported = client.every_import_notification_stream();
33 while let Some(imported_block) = chain_block_imported.next().await {
34 if sync_oracle.is_major_syncing() {
36 tracing::debug!("Client is in major sync. Skipping...");
37 continue;
38 }
39
40 if !imported_block.is_new_best {
41 tracing::debug!("Imported non-best block. Skipping...");
42 continue;
43 }
44
45 let (block_hash, block_number) = match chain_id {
46 ChainId::Consensus => (
47 imported_block.header.hash(),
48 *imported_block.header.number(),
49 ),
50 ChainId::Domain(_) => {
51 let number = match imported_block.header.number().checked_sub(&One::one()) {
55 None => continue,
56 Some(number) => number,
57 };
58
59 let hash = match client.hash(number).ok().flatten() {
60 Some(hash) => hash,
61 None => {
62 tracing::debug!("Missing block hash for number: {:?}", number);
63 continue;
64 }
65 };
66
67 (hash, number)
68 }
69 };
70
71 if let Err(err) = do_gossip_channel_updates::<_, _, CBlock>(
72 chain_id,
73 &client,
74 &gossip_message_sink,
75 block_number,
76 block_hash,
77 ) {
78 tracing::error!(?err, "failed to gossip channel update");
79 }
80 }
81}
82
83fn do_gossip_channel_updates<Client, Block, CBlock>(
84 src_chain_id: ChainId,
85 client: &Arc<Client>,
86 gossip_message_sink: &GossipMessageSink,
87 block_number: NumberFor<Block>,
88 block_hash: Block::Hash,
89) -> Result<(), Error>
90where
91 Block: BlockT,
92 CBlock: BlockT,
93 Client: BlockchainEvents<Block>
94 + HeaderBackend<Block>
95 + AuxStore
96 + ProofProvider<Block>
97 + ProvideRuntimeApi<Block>,
98 Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
99{
100 let api = client.runtime_api();
101
102 let channels_status_to_broadcast = {
103 let updated_channels = api.updated_channels(block_hash)?;
104
105 let relayer_api_version = api
107 .api_version::<dyn RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>>(block_hash)?
108 .unwrap_or(1);
110
111 if updated_channels.is_empty()
113 && relayer_api_version >= 2
114 && block_number % 300u32.into() == Zero::zero()
115 {
116 api.open_channels(block_hash)?
117 } else {
118 updated_channels
119 }
120 };
121
122 for (dst_chain_id, channel_id) in channels_status_to_broadcast {
123 let storage_key = api.channel_storage_key(block_hash, dst_chain_id, channel_id)?;
124 let proof = client
125 .read_proof(block_hash, &mut [storage_key.as_ref()].into_iter())
126 .map_err(|_| Error::ConstructStorageProof)?;
127
128 let gossip_message = GossipMessage {
129 chain_id: dst_chain_id,
130 data: MessageData::ChannelUpdate(ChannelUpdate {
131 src_chain_id,
132 channel_id,
133 block_number: block_number.saturated_into(),
134 storage_proof: proof,
135 }),
136 };
137
138 gossip_message_sink
139 .unbounded_send(gossip_message)
140 .map_err(Error::UnableToSubmitCrossDomainMessage)?;
141 }
142
143 Ok(())
144}
145
146pub async fn start_relaying_messages<CClient, Client, CBlock, Block, SO>(
147 domain_id: DomainId,
148 consensus_client: Arc<CClient>,
149 domain_client: Arc<Client>,
150 confirmation_depth_k: NumberFor<CBlock>,
151 sync_oracle: SO,
152 gossip_message_sink: GossipMessageSink,
153) where
154 Block: BlockT,
155 CBlock: BlockT,
156 Client: HeaderBackend<Block> + AuxStore + ProofProvider<Block> + ProvideRuntimeApi<Block>,
157 Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
158 CClient: BlockchainEvents<CBlock>
159 + HeaderBackend<CBlock>
160 + ProvideRuntimeApi<CBlock>
161 + ProofProvider<CBlock>
162 + AuxStore,
163 CClient::Api: DomainsApi<CBlock, Block::Header>
164 + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>
165 + MmrApi<CBlock, sp_core::H256, NumberFor<CBlock>>
166 + RelayerApi<CBlock, NumberFor<CBlock>, NumberFor<CBlock>, CBlock::Hash>,
167 SO: SyncOracle + Send,
168{
169 tracing::info!("Starting relayer for domain: {domain_id:?} and the consensus chain",);
170 let mut chain_block_imported = consensus_client.every_import_notification_stream();
171
172 while let Some(imported_block) = chain_block_imported.next().await {
178 if sync_oracle.is_major_syncing() {
180 tracing::debug!("Client is in major sync. Skipping...");
181 continue;
182 }
183
184 if !imported_block.is_new_best {
185 tracing::debug!("Imported non-best block. Skipping...");
186 continue;
187 }
188
189 let Some(confirmed_block_number) = imported_block
190 .header
191 .number()
192 .checked_sub(&confirmation_depth_k)
193 else {
194 tracing::debug!("Not enough confirmed blocks. Skipping...");
195 continue;
196 };
197
198 for chain_id in [ChainId::Consensus, ChainId::Domain(domain_id)] {
199 let res = Relayer::construct_and_submit_xdm(
200 chain_id,
201 &domain_client,
202 &consensus_client,
203 confirmed_block_number,
204 &gossip_message_sink,
205 );
206
207 if let Err(err) = res {
208 tracing::error!(
209 ?err,
210 "Failed to submit messages from the chain {chain_id:?} at the block ({confirmed_block_number:?}"
211 );
212 continue;
213 }
214 }
215 }
216}