domain_client_message_relayer/
worker.rs1use crate::{BlockT, Error, GossipMessageSink, HeaderBackend, HeaderT, Relayer};
2use cross_domain_message_gossip::{ChannelUpdate, Message as GossipMessage, MessageData};
3use futures::StreamExt;
4use sc_client_api::{AuxStore, BlockchainEvents, ProofProvider};
5use sp_api::ProvideRuntimeApi;
6use sp_consensus::SyncOracle;
7use sp_domains::{DomainId, DomainsApi};
8use sp_messenger::messages::ChainId;
9use sp_messenger::{MessengerApi, RelayerApi};
10use sp_mmr_primitives::MmrApi;
11use sp_runtime::SaturatedConversion;
12use sp_runtime::traits::{CheckedSub, NumberFor, One, Zero};
13use std::sync::Arc;
14
15pub async fn gossip_channel_updates<Client, Block, CBlock, SO>(
16 chain_id: ChainId,
17 client: Arc<Client>,
18 sync_oracle: SO,
19 gossip_message_sink: GossipMessageSink,
20) where
21 Block: BlockT,
22 CBlock: BlockT,
23 Client: BlockchainEvents<Block>
24 + HeaderBackend<Block>
25 + AuxStore
26 + ProofProvider<Block>
27 + ProvideRuntimeApi<Block>,
28 Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
29 SO: SyncOracle,
30{
31 tracing::info!("Starting Channel updates for chain: {:?}", chain_id,);
32 let mut chain_block_imported = client.every_import_notification_stream();
33 while let Some(imported_block) = chain_block_imported.next().await {
34 if sync_oracle.is_major_syncing() {
36 tracing::debug!("Client is in major sync. Skipping...");
37 continue;
38 }
39
40 if !imported_block.is_new_best {
41 tracing::debug!("Imported non-best block. Skipping...");
42 continue;
43 }
44
45 let (block_hash, block_number) = match chain_id {
46 ChainId::Consensus => (
47 imported_block.header.hash(),
48 *imported_block.header.number(),
49 ),
50 ChainId::Domain(_) => {
51 let number = match imported_block.header.number().checked_sub(&One::one()) {
55 None => continue,
56 Some(number) => number,
57 };
58
59 let hash = match client.hash(number).ok().flatten() {
60 Some(hash) => hash,
61 None => {
62 tracing::debug!("Missing block hash for number: {:?}", number);
63 continue;
64 }
65 };
66
67 (hash, number)
68 }
69 };
70
71 if let Err(err) = do_gossip_channel_updates::<_, _, CBlock>(
72 chain_id,
73 &client,
74 &gossip_message_sink,
75 block_number,
76 block_hash,
77 ) {
78 tracing::error!(?err, "failed to gossip channel update");
79 }
80 }
81}
82
83fn do_gossip_channel_updates<Client, Block, CBlock>(
84 src_chain_id: ChainId,
85 client: &Arc<Client>,
86 gossip_message_sink: &GossipMessageSink,
87 block_number: NumberFor<Block>,
88 block_hash: Block::Hash,
89) -> Result<(), Error>
90where
91 Block: BlockT,
92 CBlock: BlockT,
93 Client: BlockchainEvents<Block>
94 + HeaderBackend<Block>
95 + AuxStore
96 + ProofProvider<Block>
97 + ProvideRuntimeApi<Block>,
98 Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
99{
100 let api = client.runtime_api();
101
102 let channels_status_to_broadcast = {
103 let updated_channels = api.updated_channels(block_hash)?;
104
105 if updated_channels.is_empty() && block_number % 300u32.into() == Zero::zero() {
107 api.open_channels(block_hash)?
108 } else {
109 updated_channels
110 }
111 };
112
113 for (dst_chain_id, channel_id) in channels_status_to_broadcast {
114 let storage_key = api.channel_storage_key(block_hash, dst_chain_id, channel_id)?;
115 let proof = client
116 .read_proof(block_hash, &mut [storage_key.as_ref()].into_iter())
117 .map_err(|_| Error::ConstructStorageProof)?;
118
119 let gossip_message = GossipMessage {
120 chain_id: dst_chain_id,
121 data: MessageData::ChannelUpdate(ChannelUpdate {
122 src_chain_id,
123 channel_id,
124 block_number: block_number.saturated_into(),
125 storage_proof: proof,
126 }),
127 };
128
129 gossip_message_sink
130 .unbounded_send(gossip_message)
131 .map_err(Error::UnableToSubmitCrossDomainMessage)?;
132 }
133
134 Ok(())
135}
136
137pub async fn start_relaying_messages<CClient, Client, CBlock, Block, SO>(
138 domain_id: DomainId,
139 consensus_client: Arc<CClient>,
140 domain_client: Arc<Client>,
141 confirmation_depth_k: NumberFor<CBlock>,
142 sync_oracle: SO,
143 gossip_message_sink: GossipMessageSink,
144) where
145 Block: BlockT,
146 CBlock: BlockT,
147 Client: HeaderBackend<Block> + AuxStore + ProofProvider<Block> + ProvideRuntimeApi<Block>,
148 Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
149 CClient: BlockchainEvents<CBlock>
150 + HeaderBackend<CBlock>
151 + ProvideRuntimeApi<CBlock>
152 + ProofProvider<CBlock>
153 + AuxStore,
154 CClient::Api: DomainsApi<CBlock, Block::Header>
155 + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>
156 + MmrApi<CBlock, sp_core::H256, NumberFor<CBlock>>
157 + RelayerApi<CBlock, NumberFor<CBlock>, NumberFor<CBlock>, CBlock::Hash>,
158 SO: SyncOracle + Send,
159{
160 tracing::info!("Starting relayer for domain: {domain_id:?} and the consensus chain",);
161 let mut chain_block_imported = consensus_client.every_import_notification_stream();
162
163 while let Some(imported_block) = chain_block_imported.next().await {
169 if sync_oracle.is_major_syncing() {
171 tracing::debug!("Client is in major sync. Skipping...");
172 continue;
173 }
174
175 if !imported_block.is_new_best {
176 tracing::debug!("Imported non-best block. Skipping...");
177 continue;
178 }
179
180 let Some(confirmed_block_number) = imported_block
181 .header
182 .number()
183 .checked_sub(&confirmation_depth_k)
184 else {
185 tracing::debug!("Not enough confirmed blocks. Skipping...");
186 continue;
187 };
188
189 for chain_id in [ChainId::Consensus, ChainId::Domain(domain_id)] {
190 let res = Relayer::construct_and_submit_xdm(
191 chain_id,
192 &domain_client,
193 &consensus_client,
194 confirmed_block_number,
195 &gossip_message_sink,
196 );
197
198 if let Err(err) = res {
199 tracing::error!(
200 ?err,
201 "Failed to submit messages from the chain {chain_id:?} at the block ({confirmed_block_number:?}"
202 );
203 continue;
204 }
205 }
206 }
207}