1use crate::aux_schema::{
2 BlockId, cleanup_chain_channel_storages, get_channel_state, get_xdm_processed_block_number,
3 set_channel_state, set_xdm_message_processed_at,
4};
5use crate::gossip_worker::{ChannelUpdate, MessageData};
6use crate::{ChainMsg, ChannelDetail};
7use domain_block_preprocessor::stateless_runtime::StatelessRuntime;
8use fp_account::AccountId20;
9use futures::{Stream, StreamExt};
10use sc_client_api::AuxStore;
11use sc_executor::RuntimeVersionOf;
12use sc_network::NetworkPeers;
13use sc_transaction_pool_api::error::{Error as PoolError, IntoPoolError};
14use sc_transaction_pool_api::{TransactionPool, TransactionSource};
15use sp_api::{ApiError, ApiExt, ProvideRuntimeApi, StorageProof};
16use sp_blockchain::HeaderBackend;
17use sp_consensus::SyncOracle;
18use sp_core::crypto::AccountId32;
19use sp_core::storage::StorageKey;
20use sp_core::traits::CodeExecutor;
21use sp_core::{H256, Hasher};
22use sp_domains::proof_provider_and_verifier::{StorageProofVerifier, VerificationError};
23use sp_domains::{DomainId, DomainsApi, RuntimeType};
24use sp_messenger::messages::{ChainId, Channel, ChannelId};
25use sp_messenger::{ChannelNonce, MessengerApi, RelayerApi, XdmId};
26use sp_runtime::codec::Decode;
27use sp_runtime::traits::{BlakeTwo256, Block as BlockT, HashingFor, Header, NumberFor};
28use sp_runtime::{SaturatedConversion, Saturating};
29use std::collections::BTreeMap;
30use std::sync::Arc;
31use subspace_runtime_primitives::{Balance, BlockNumber};
32use thiserror::Error;
33
34const TX_POOL_PREFIX: &[u8] = b"xdm_tx_pool_listener";
35
36const XDM_ACCEPT_BLOCK_LIMIT: u32 = 15;
38
39type BlockOf<T> = <T as TransactionPool>::Block;
40type HeaderOf<T> = <<T as TransactionPool>::Block as BlockT>::Header;
41type ExtrinsicOf<T> = <<T as TransactionPool>::Block as BlockT>::Extrinsic;
42
43#[derive(Debug, Error)]
44pub enum Error {
45 #[error("Blockchain error: {0}")]
47 Blockchain(Box<sp_blockchain::Error>),
48 #[error("Api error: {0}")]
50 Api(sp_api::ApiError),
51 #[error("Missing block hash")]
53 MissingBlockHash,
54 #[error("Missing block header")]
56 MissingBlockHeader,
57 #[error("Missing domain runtime code")]
59 MissingDomainRuntimeCode,
60 #[error("Missing domain receipt hash")]
62 MissingDomainReceiptHash,
63 #[error("Bad domain receipt hash")]
65 BadDomainReceiptHash,
66 #[error("Missing domain receipt")]
68 MissingDomainReceipt,
69 #[error("Proof error: {0}")]
71 Proof(VerificationError),
72}
73
74impl From<sp_api::ApiError> for Error {
75 fn from(value: ApiError) -> Self {
76 Error::Api(value)
77 }
78}
79
80impl From<sp_blockchain::Error> for Error {
81 fn from(value: sp_blockchain::Error) -> Self {
82 Error::Blockchain(Box::new(value))
83 }
84}
85
86impl From<VerificationError> for Error {
87 fn from(value: VerificationError) -> Self {
88 Error::Proof(value)
89 }
90}
91
92#[allow(clippy::too_many_arguments)]
93pub async fn start_cross_chain_message_listener<
94 Client,
95 TxPool,
96 TxnListener,
97 CClient,
98 CBlock,
99 Executor,
100 SO,
101>(
102 chain_id: ChainId,
103 consensus_client: Arc<CClient>,
104 client: Arc<Client>,
105 tx_pool: Arc<TxPool>,
106 network: Arc<dyn NetworkPeers + Send + Sync>,
107 mut listener: TxnListener,
108 domain_executor: Arc<Executor>,
109 sync_oracle: SO,
110) where
111 TxPool: TransactionPool + 'static,
112 Client: ProvideRuntimeApi<BlockOf<TxPool>> + HeaderBackend<BlockOf<TxPool>> + AuxStore,
113 CBlock: BlockT,
114 Client::Api: MessengerApi<BlockOf<TxPool>, NumberFor<CBlock>, CBlock::Hash>,
115 CClient: ProvideRuntimeApi<CBlock> + HeaderBackend<CBlock> + AuxStore,
116 CClient::Api: DomainsApi<CBlock, HeaderOf<TxPool>>
117 + RelayerApi<CBlock, NumberFor<CBlock>, NumberFor<CBlock>, CBlock::Hash>,
118 TxnListener: Stream<Item = ChainMsg> + Unpin,
119 Executor: CodeExecutor + RuntimeVersionOf,
120 SO: SyncOracle + Send,
121{
122 tracing::info!("Starting transaction listener for Chain: {:?}", chain_id);
123
124 let mut domain_storage_key_cache = BTreeMap::<(H256, ChainId, ChannelId), StorageKey>::new();
125
126 while let Some(msg) = listener.next().await {
127 if sync_oracle.is_major_syncing() {
129 continue;
130 }
131
132 tracing::debug!("Message received for Chain: {:?}", chain_id,);
133
134 match msg.data {
135 MessageData::Xdm(encoded_data) => {
136 let ext = match ExtrinsicOf::<TxPool>::decode(&mut encoded_data.as_ref()) {
137 Ok(ext) => ext,
138 Err(err) => {
139 tracing::error!(
140 "Failed to decode message: {:?} with error: {:?}",
141 encoded_data,
142 err
143 );
144 if let Some(peer_id) = msg.maybe_peer {
145 network.report_peer(
146 peer_id,
147 crate::gossip_worker::rep::GOSSIP_NOT_DECODABLE,
148 );
149 }
150 continue;
151 }
152 };
153
154 if let Ok(valid) =
155 handle_xdm_message::<_, _, CBlock>(&client, &tx_pool, chain_id, ext).await
156 && !valid
157 {
158 if let Some(peer_id) = msg.maybe_peer {
159 network.report_peer(peer_id, crate::gossip_worker::rep::NOT_XDM);
160 }
161 continue;
162 }
163 }
164 MessageData::ChannelUpdate(channel_update) => {
165 handle_channel_update::<_, _, _, BlockOf<TxPool>>(
166 chain_id,
167 channel_update,
168 &consensus_client,
169 domain_executor.clone(),
170 &mut domain_storage_key_cache,
171 )
172 }
173 }
174 }
175}
176
177fn handle_channel_update<CClient, CBlock, Executor, Block>(
178 chain_id: ChainId,
179 channel_update: ChannelUpdate,
180 consensus_client: &Arc<CClient>,
181 executor: Arc<Executor>,
182 domain_storage_key_cache: &mut BTreeMap<(H256, ChainId, ChannelId), StorageKey>,
183) where
184 CBlock: BlockT,
185 Block: BlockT,
186 CClient: ProvideRuntimeApi<CBlock> + HeaderBackend<CBlock> + AuxStore,
187 CClient::Api: DomainsApi<CBlock, Block::Header>
188 + RelayerApi<CBlock, NumberFor<CBlock>, NumberFor<CBlock>, CBlock::Hash>,
189 Executor: CodeExecutor + RuntimeVersionOf,
190{
191 let ChannelUpdate {
192 src_chain_id,
193 channel_id,
194 block_number,
195 storage_proof,
196 } = channel_update;
197
198 match src_chain_id {
199 ChainId::Consensus => {
200 if let Err(err) = handle_consensus_channel_update(
201 chain_id,
202 channel_id,
203 consensus_client,
204 block_number,
205 storage_proof,
206 ) {
207 tracing::debug!(
208 "Failed to update channel update from {:?} to {:?}: {:?}",
209 ChainId::Consensus,
210 chain_id,
211 err
212 );
213 } else {
214 tracing::debug!(
215 "Updated channel state from {:?} to {:?}: {:?}",
216 ChainId::Consensus,
217 chain_id,
218 channel_id
219 );
220 }
221 }
222 ChainId::Domain(domain_id) => {
223 if let Err(err) = handle_domain_channel_update::<_, _, _, Block>(
224 domain_id,
225 chain_id,
226 channel_id,
227 consensus_client,
228 block_number,
229 storage_proof,
230 executor,
231 domain_storage_key_cache,
232 ) {
233 tracing::debug!(
234 "Failed to update channel update from {:?} to {:?}: {:?}",
235 ChainId::Domain(domain_id),
236 chain_id,
237 err
238 );
239 } else {
240 tracing::debug!(
241 "Updated channel state from {:?} to {:?}: {:?}",
242 ChainId::Domain(domain_id),
243 chain_id,
244 channel_id
245 );
246 }
247 }
248 };
249}
250
251fn handle_consensus_channel_update<CClient, CBlock>(
252 self_chain_id: ChainId,
253 channel_id: ChannelId,
254 consensus_client: &Arc<CClient>,
255 consensus_block_number: BlockNumber,
256 proof: StorageProof,
257) -> Result<(), Error>
258where
259 CBlock: BlockT,
260 CClient: ProvideRuntimeApi<CBlock> + HeaderBackend<CBlock> + AuxStore,
261 CClient::Api: RelayerApi<CBlock, NumberFor<CBlock>, NumberFor<CBlock>, CBlock::Hash>,
262{
263 let consensus_block_hash = consensus_client
265 .hash(consensus_block_number.into())?
266 .ok_or(Error::MissingBlockHash)?;
267
268 let maybe_existing_channel_detail = get_channel_state(
269 &**consensus_client,
270 ChainId::Consensus,
271 self_chain_id,
272 channel_id,
273 )?;
274
275 let api = consensus_client.runtime_api();
276 let best_hash = consensus_client.info().best_hash;
277 let header = consensus_client.expect_header(consensus_block_hash)?;
278
279 if let Some(existing_channel_update) = maybe_existing_channel_detail {
283 let maybe_block_hash =
284 consensus_client.hash(existing_channel_update.block_number.into())?;
285 if let Some(block_hash) = maybe_block_hash
286 && block_hash.as_ref() == existing_channel_update.block_hash.as_ref()
287 && header.state_root().as_ref() == existing_channel_update.state_root.as_ref()
288 && existing_channel_update.block_number >= consensus_block_number
289 {
290 return Ok(());
291 }
292 }
293
294 let storage_key = StorageKey(api.channel_storage_key(best_hash, self_chain_id, channel_id)?);
295 let channel = StorageProofVerifier::<HashingFor<CBlock>>::get_decoded_value::<
296 Channel<Balance, AccountId32>,
297 >(header.state_root(), proof, storage_key)?;
298
299 let channel_detail = ChannelDetail {
300 block_number: consensus_block_number,
301 block_hash: H256::from_slice(consensus_block_hash.as_ref()),
302 state_root: H256::from_slice(header.state_root().as_ref()),
303 channel_id,
304 state: channel.state,
305 next_inbox_nonce: channel.next_inbox_nonce,
306 next_outbox_nonce: channel.next_outbox_nonce,
307 latest_response_received_message_nonce: channel.latest_response_received_message_nonce,
308 };
309
310 set_channel_state(
311 &**consensus_client,
312 ChainId::Consensus,
313 self_chain_id,
314 channel_detail,
315 )?;
316 Ok(())
317}
318
319#[allow(clippy::too_many_arguments)]
320fn handle_domain_channel_update<CClient, CBlock, Executor, Block>(
321 src_domain_id: DomainId,
322 self_chain_id: ChainId,
323 channel_id: ChannelId,
324 consensus_client: &Arc<CClient>,
325 domain_block_number: BlockNumber,
326 proof: StorageProof,
327 executor: Arc<Executor>,
328 storage_key_cache: &mut BTreeMap<(H256, ChainId, ChannelId), StorageKey>,
329) -> Result<(), Error>
330where
331 CBlock: BlockT,
332 Block: BlockT,
333 CClient: ProvideRuntimeApi<CBlock> + HeaderBackend<CBlock> + AuxStore,
334 CClient::Api: DomainsApi<CBlock, Block::Header>,
335 Executor: CodeExecutor + RuntimeVersionOf,
336{
337 let runtime_api = consensus_client.runtime_api();
338 let consensus_best_hash = consensus_client.info().best_hash;
339 let consensus_block_header = consensus_client
340 .header(consensus_best_hash)?
341 .ok_or(Error::MissingBlockHeader)?;
342
343 let domain_runtime_type = runtime_api
344 .domain_instance_data(*consensus_block_header.parent_hash(), src_domain_id)?
345 .ok_or(Error::MissingDomainRuntimeCode)?
346 .0
347 .runtime_type;
348
349 let is_valid_domain_block_number =
350 |block_number: BlockNumber| -> Result<(Block::Hash, Block::Hash), Error> {
351 let runtime_api = consensus_client.runtime_api();
352 let receipt_hash = runtime_api
353 .receipt_hash(consensus_best_hash, src_domain_id, block_number.into())?
354 .ok_or(Error::MissingDomainReceiptHash)?;
355
356 if runtime_api.is_bad_er_pending_to_prune(
358 consensus_best_hash,
359 src_domain_id,
360 receipt_hash,
361 )? {
362 return Err(Error::BadDomainReceiptHash);
363 }
364
365 let receipt = runtime_api
366 .execution_receipt(consensus_best_hash, receipt_hash)?
367 .ok_or(Error::MissingDomainReceipt)?;
368
369 Ok((receipt.domain_block_hash, receipt.final_state_root))
370 };
371
372 let (domain_block_hash, domain_state_root) = is_valid_domain_block_number(domain_block_number)?;
374
375 let maybe_existing_channel_detail = get_channel_state(
379 &**consensus_client,
380 ChainId::Domain(src_domain_id),
381 self_chain_id,
382 channel_id,
383 )?;
384
385 if let Some(existing_channel_update) = maybe_existing_channel_detail {
386 if let Ok((existing_block_hash, _)) =
390 is_valid_domain_block_number(existing_channel_update.block_number)
391 && existing_block_hash.as_ref() == existing_channel_update.block_hash.as_ref()
392 && domain_state_root.as_ref() == existing_channel_update.state_root.as_ref()
393 && existing_channel_update.block_number >= domain_block_number
394 {
395 return Ok(());
396 }
397 }
398
399 let domain_runtime = runtime_api
400 .domain_runtime_code(*consensus_block_header.parent_hash(), src_domain_id)?
401 .ok_or(Error::MissingDomainRuntimeCode)?;
402
403 let runtime_hash = BlakeTwo256::hash(&domain_runtime);
404 let storage_key = match storage_key_cache.get(&(runtime_hash, self_chain_id, channel_id)) {
405 None => {
406 let domain_stateless_runtime =
407 StatelessRuntime::<CBlock, Block, _>::new(executor.clone(), domain_runtime.into());
408 let storage_key = StorageKey(
409 domain_stateless_runtime.channel_storage_key(self_chain_id, channel_id)?,
410 );
411 storage_key_cache.insert(
412 (runtime_hash, self_chain_id, channel_id),
413 storage_key.clone(),
414 );
415 storage_key
416 }
417 Some(key) => key.clone(),
418 };
419
420 let channel_detail = match domain_runtime_type {
421 RuntimeType::Evm => {
422 let channel = StorageProofVerifier::<HashingFor<Block>>::get_decoded_value::<
423 Channel<Balance, AccountId20>,
424 >(&domain_state_root, proof, storage_key)?;
425 ChannelDetail {
426 block_number: domain_block_number,
427 block_hash: H256::from_slice(domain_block_hash.as_ref()),
428 state_root: H256::from_slice(domain_state_root.as_ref()),
429 channel_id,
430 state: channel.state,
431 next_inbox_nonce: channel.next_inbox_nonce,
432 next_outbox_nonce: channel.next_outbox_nonce,
433 latest_response_received_message_nonce: channel
434 .latest_response_received_message_nonce,
435 }
436 }
437 RuntimeType::AutoId => {
438 let channel = StorageProofVerifier::<HashingFor<Block>>::get_decoded_value::<
439 Channel<Balance, AccountId32>,
440 >(&domain_state_root, proof, storage_key)?;
441 ChannelDetail {
442 block_number: domain_block_number,
443 block_hash: H256::from_slice(domain_block_hash.as_ref()),
444 state_root: H256::from_slice(domain_state_root.as_ref()),
445 channel_id,
446 state: channel.state,
447 next_inbox_nonce: channel.next_inbox_nonce,
448 next_outbox_nonce: channel.next_outbox_nonce,
449 latest_response_received_message_nonce: channel
450 .latest_response_received_message_nonce,
451 }
452 }
453 };
454
455 set_channel_state(
456 &**consensus_client,
457 ChainId::Domain(src_domain_id),
458 self_chain_id,
459 channel_detail,
460 )?;
461 Ok(())
462}
463
464pub fn can_allow_xdm_submission<Client, Block>(
465 client: &Arc<Client>,
466 xdm_id: XdmId,
467 maybe_submitted_block_id: Option<BlockId<Block>>,
468 current_block_id: BlockId<Block>,
469 maybe_channel_nonce: Option<ChannelNonce>,
470) -> bool
471where
472 Client: HeaderBackend<Block>,
473 Block: BlockT,
474{
475 if let Some(channel_nonce) = maybe_channel_nonce {
476 let maybe_nonces = match (
477 xdm_id,
478 channel_nonce.relay_msg_nonce,
479 channel_nonce.relay_response_msg_nonce,
480 ) {
481 (XdmId::RelayMessage((_, _, nonce)), Some(channel_nonce), _) => {
482 Some((nonce, channel_nonce))
483 }
484 (XdmId::RelayResponseMessage((_, _, nonce)), _, Some(channel_nonce)) => {
485 Some((nonce, channel_nonce))
486 }
487 _ => None,
488 };
489
490 if let Some((xdm_nonce, channel_nonce)) = maybe_nonces
491 && (xdm_nonce <= channel_nonce)
492 {
493 tracing::debug!(
494 "Stale XDM submitted: XDM Nonce: {:?}, Channel Nonce: {:?}",
495 xdm_nonce,
496 channel_nonce
497 );
498 return false;
499 }
500 }
501
502 match maybe_submitted_block_id {
503 None => true,
504 Some(submitted_block_id) => {
505 match client.hash(submitted_block_id.number).ok().flatten() {
506 None => return true,
508 Some(hash) => {
509 if hash != submitted_block_id.hash {
510 return true;
512 }
513 }
514 }
515
516 let latest_block_number = current_block_id.number;
517 let block_limit: NumberFor<Block> = XDM_ACCEPT_BLOCK_LIMIT.saturated_into();
518 submitted_block_id.number < latest_block_number.saturating_sub(block_limit)
519 }
520 }
521}
522
523async fn handle_xdm_message<TxPool, Client, CBlock>(
524 client: &Arc<Client>,
525 tx_pool: &Arc<TxPool>,
526 chain_id: ChainId,
527 ext: ExtrinsicOf<TxPool>,
528) -> Result<bool, Error>
529where
530 TxPool: TransactionPool + 'static,
531 CBlock: BlockT,
532 Client: ProvideRuntimeApi<BlockOf<TxPool>> + HeaderBackend<BlockOf<TxPool>> + AuxStore,
533 Client::Api: MessengerApi<BlockOf<TxPool>, NumberFor<CBlock>, CBlock::Hash>,
534{
535 let block_id: BlockId<BlockOf<TxPool>> = client.info().into();
536 let runtime_api = client.runtime_api();
537 let api_version = runtime_api
538 .api_version::<dyn MessengerApi<BlockOf<TxPool>, NumberFor<CBlock>, CBlock::Hash>>(
539 block_id.hash,
540 )?
541 .unwrap_or(1);
542
543 let api_available = api_version >= 2;
544 if api_available {
545 let xdm_id = match runtime_api.xdm_id(block_id.hash, &ext)? {
546 None => return Ok(false),
548 Some(xdm_id) => xdm_id,
549 };
550
551 let (src_chain_id, channel_id) = xdm_id.get_chain_id_and_channel_id();
552 let maybe_channel_nonce =
553 runtime_api.channel_nonce(block_id.hash, src_chain_id, channel_id)?;
554
555 let maybe_submitted_xdm_block = get_xdm_processed_block_number::<_, BlockOf<TxPool>>(
556 &**client,
557 TX_POOL_PREFIX,
558 xdm_id,
559 )?;
560 if !can_allow_xdm_submission(
561 client,
562 xdm_id,
563 maybe_submitted_xdm_block,
564 block_id.clone(),
565 maybe_channel_nonce,
566 ) {
567 tracing::debug!("Skipping XDM[{:?}] submission. At: {:?}", xdm_id, block_id);
568 return Ok(true);
569 }
570
571 tracing::debug!(
572 "Submitting XDM[{:?}] to tx pool for chain {:?} at block: {:?}",
573 xdm_id,
574 chain_id,
575 block_id
576 );
577
578 let tx_pool_res = tx_pool
579 .submit_one(block_id.hash, TransactionSource::External, ext)
580 .await;
581
582 let block_id: BlockId<BlockOf<TxPool>> = client.info().into();
583 if let Err(err) = tx_pool_res {
584 match err.into_pool_error() {
585 Ok(err) => match err {
586 PoolError::TooLowPriority { .. }
587 | PoolError::AlreadyImported(..)
588 | PoolError::TemporarilyBanned => {
589 tracing::debug!(
590 "XDM[{:?}] to tx pool for Chain {:?} at block: {:?}: Already included",
591 xdm_id,
592 chain_id,
593 block_id
594 );
595 set_xdm_message_processed_at(&**client, TX_POOL_PREFIX, xdm_id, block_id)?;
596 }
597 _ => {
598 tracing::error!(
599 "Failed to submit XDM[{:?}] to tx pool for Chain {:?} with error: {:?} at block: {:?}",
600 xdm_id,
601 chain_id,
602 err,
603 block_id
604 );
605 }
606 },
607 Err(err) => {
608 tracing::error!(
609 "Failed to submit XDM[{:?}] to tx pool for Chain {:?} with error: {:?} at block: {:?}",
610 xdm_id,
611 chain_id,
612 err,
613 block_id
614 );
615 }
616 }
617 } else {
618 tracing::debug!(
619 "Submitted XDM[{:?}] to tx pool for chain {:?} at {:?}",
620 xdm_id,
621 chain_id,
622 block_id
623 );
624
625 set_xdm_message_processed_at(&**client, TX_POOL_PREFIX, xdm_id, block_id)?;
626 }
627
628 if let Some(channel_nonce) = maybe_channel_nonce {
629 cleanup_chain_channel_storages(
630 &**client,
631 TX_POOL_PREFIX,
632 src_chain_id,
633 channel_id,
634 channel_nonce,
635 )?;
636 }
637
638 Ok(true)
639 } else {
640 let tx_pool_res = tx_pool
641 .submit_one(block_id.hash, TransactionSource::External, ext)
642 .await;
643
644 let block_id: BlockId<BlockOf<TxPool>> = client.info().into();
645 if let Err(err) = tx_pool_res {
646 tracing::error!(
647 "Failed to submit XDM to tx pool for Chain {:?} with error: {:?} at block: {:?}",
648 chain_id,
649 err,
650 block_id
651 );
652 } else {
653 tracing::debug!(
654 "Submitted XDM to tx pool for chain {:?} at {:?}",
655 chain_id,
656 block_id
657 );
658 }
659
660 Ok(true)
661 }
662}