1use crate::aux_schema::{
2 cleanup_chain_channel_storages, get_channel_state, get_xdm_processed_block_number,
3 set_channel_state, set_xdm_message_processed_at, BlockId,
4};
5use crate::gossip_worker::{ChannelUpdate, MessageData};
6use crate::{ChainMsg, ChannelDetail};
7use domain_block_preprocessor::stateless_runtime::StatelessRuntime;
8use fp_account::AccountId20;
9use futures::{Stream, StreamExt};
10use sc_client_api::AuxStore;
11use sc_executor::RuntimeVersionOf;
12use sc_network::NetworkPeers;
13use sc_transaction_pool_api::error::{Error as PoolError, IntoPoolError};
14use sc_transaction_pool_api::{TransactionPool, TransactionSource};
15use sp_api::{ApiError, ApiExt, ProvideRuntimeApi, StorageProof};
16use sp_blockchain::HeaderBackend;
17use sp_consensus::SyncOracle;
18use sp_core::crypto::AccountId32;
19use sp_core::storage::StorageKey;
20use sp_core::traits::CodeExecutor;
21use sp_core::{Hasher, H256};
22use sp_domains::proof_provider_and_verifier::{StorageProofVerifier, VerificationError};
23use sp_domains::{DomainId, DomainsApi, RuntimeType};
24use sp_messenger::messages::{ChainId, Channel, ChannelId};
25use sp_messenger::{ChannelNonce, MessengerApi, RelayerApi, XdmId};
26use sp_runtime::codec::Decode;
27use sp_runtime::traits::{BlakeTwo256, Block as BlockT, HashingFor, Header, NumberFor};
28use sp_runtime::{SaturatedConversion, Saturating};
29use std::collections::BTreeMap;
30use std::sync::Arc;
31use subspace_runtime_primitives::{Balance, BlockNumber};
32use thiserror::Error;
33
/// Log target attached to the `tracing` events emitted by this listener.
pub(crate) const LOG_TARGET: &str = "domain_message_listener";
/// Prefix handed to the aux-schema helpers when this listener looks up, records, or cleans up
/// XDMs it submitted to the transaction pool.
const TX_POOL_PREFIX: &[u8] = b"xdm_tx_pool_listener";
/// Prefix for relayer entries; it is not referenced in the visible part of this file.
// NOTE(review): presumably the relayer-side counterpart of `TX_POOL_PREFIX` — confirm at call sites.
pub const RELAYER_PREFIX: &[u8] = b"xdm_relayer";

/// Block window used by `can_allow_xdm_submission`: an XDM recorded at a still-canonical block is
/// only allowed again once that block number is below `current block number - this limit`.
const XDM_ACCEPT_BLOCK_LIMIT: u32 = 15;
40
41type BlockOf<T> = <T as TransactionPool>::Block;
42type HeaderOf<T> = <<T as TransactionPool>::Block as BlockT>::Header;
43type ExtrinsicOf<T> = <<T as TransactionPool>::Block as BlockT>::Extrinsic;
44
45#[derive(Debug, Error)]
46pub enum Error {
47 #[error("Blockchain error: {0}")]
49 Blockchain(Box<sp_blockchain::Error>),
50 #[error("Api error: {0}")]
52 Api(sp_api::ApiError),
53 #[error("Missing block hash")]
55 MissingBlockHash,
56 #[error("Missing block header")]
58 MissingBlockHeader,
59 #[error("Missing domain runtime code")]
61 MissingDomainRuntimeCode,
62 #[error("Missing domain receipt hash")]
64 MissingDomainReceiptHash,
65 #[error("Bad domain receipt hash")]
67 BadDomainReceiptHash,
68 #[error("Missing domain receipt")]
70 MissingDomainReceipt,
71 #[error("Proof error: {0}")]
73 Proof(VerificationError),
74}
75
76impl From<sp_api::ApiError> for Error {
77 fn from(value: ApiError) -> Self {
78 Error::Api(value)
79 }
80}
81
82impl From<sp_blockchain::Error> for Error {
83 fn from(value: sp_blockchain::Error) -> Self {
84 Error::Blockchain(Box::new(value))
85 }
86}
87
88impl From<VerificationError> for Error {
89 fn from(value: VerificationError) -> Self {
90 Error::Proof(value)
91 }
92}
93
94#[allow(clippy::too_many_arguments)]
95pub async fn start_cross_chain_message_listener<
96 Client,
97 TxPool,
98 TxnListener,
99 CClient,
100 CBlock,
101 Executor,
102 SO,
103>(
104 chain_id: ChainId,
105 consensus_client: Arc<CClient>,
106 client: Arc<Client>,
107 tx_pool: Arc<TxPool>,
108 network: Arc<dyn NetworkPeers + Send + Sync>,
109 mut listener: TxnListener,
110 domain_executor: Arc<Executor>,
111 sync_oracle: SO,
112) where
113 TxPool: TransactionPool + 'static,
114 Client: ProvideRuntimeApi<BlockOf<TxPool>> + HeaderBackend<BlockOf<TxPool>> + AuxStore,
115 CBlock: BlockT,
116 Client::Api: MessengerApi<BlockOf<TxPool>, NumberFor<CBlock>, CBlock::Hash>,
117 CClient: ProvideRuntimeApi<CBlock> + HeaderBackend<CBlock> + AuxStore,
118 CClient::Api: DomainsApi<CBlock, HeaderOf<TxPool>>
119 + RelayerApi<CBlock, NumberFor<CBlock>, NumberFor<CBlock>, CBlock::Hash>,
120 TxnListener: Stream<Item = ChainMsg> + Unpin,
121 Executor: CodeExecutor + RuntimeVersionOf,
122 SO: SyncOracle + Send,
123{
124 tracing::info!(
125 target: LOG_TARGET,
126 "Starting transaction listener for Chain: {:?}",
127 chain_id
128 );
129
130 let mut domain_storage_key_cache = BTreeMap::<(H256, ChainId, ChannelId), StorageKey>::new();
131
132 while let Some(msg) = listener.next().await {
133 if sync_oracle.is_major_syncing() {
135 continue;
136 }
137
138 tracing::debug!(
139 target: LOG_TARGET,
140 "Message received for Chain: {:?}",
141 chain_id,
142 );
143
144 match msg.data {
145 MessageData::Xdm(encoded_data) => {
146 let ext = match ExtrinsicOf::<TxPool>::decode(&mut encoded_data.as_ref()) {
147 Ok(ext) => ext,
148 Err(err) => {
149 tracing::error!(
150 target: LOG_TARGET,
151 "Failed to decode message: {:?} with error: {:?}",
152 encoded_data,
153 err
154 );
155 if let Some(peer_id) = msg.maybe_peer {
156 network.report_peer(
157 peer_id,
158 crate::gossip_worker::rep::GOSSIP_NOT_DECODABLE,
159 );
160 }
161 continue;
162 }
163 };
164
165 if let Ok(valid) =
166 handle_xdm_message::<_, _, CBlock>(&client, &tx_pool, chain_id, ext).await
167 && !valid
168 {
169 if let Some(peer_id) = msg.maybe_peer {
170 network.report_peer(peer_id, crate::gossip_worker::rep::NOT_XDM);
171 }
172 continue;
173 }
174 }
175 MessageData::ChannelUpdate(channel_update) => {
176 handle_channel_update::<_, _, _, BlockOf<TxPool>>(
177 chain_id,
178 channel_update,
179 &consensus_client,
180 domain_executor.clone(),
181 &mut domain_storage_key_cache,
182 )
183 }
184 }
185 }
186}
187
188fn handle_channel_update<CClient, CBlock, Executor, Block>(
189 chain_id: ChainId,
190 channel_update: ChannelUpdate,
191 consensus_client: &Arc<CClient>,
192 executor: Arc<Executor>,
193 domain_storage_key_cache: &mut BTreeMap<(H256, ChainId, ChannelId), StorageKey>,
194) where
195 CBlock: BlockT,
196 Block: BlockT,
197 CClient: ProvideRuntimeApi<CBlock> + HeaderBackend<CBlock> + AuxStore,
198 CClient::Api: DomainsApi<CBlock, Block::Header>
199 + RelayerApi<CBlock, NumberFor<CBlock>, NumberFor<CBlock>, CBlock::Hash>,
200 Executor: CodeExecutor + RuntimeVersionOf,
201{
202 let ChannelUpdate {
203 src_chain_id,
204 channel_id,
205 block_number,
206 storage_proof,
207 } = channel_update;
208
209 match src_chain_id {
210 ChainId::Consensus => {
211 if let Err(err) = handle_consensus_channel_update(
212 chain_id,
213 channel_id,
214 consensus_client,
215 block_number,
216 storage_proof,
217 ) {
218 tracing::debug!(
219 target: LOG_TARGET,
220 "Failed to update channel update from {:?} to {:?}: {:?}",
221 ChainId::Consensus,
222 chain_id,
223 err
224 );
225 } else {
226 tracing::debug!(
227 target: LOG_TARGET,
228 "Updated channel state from {:?} to {:?}: {:?}",
229 ChainId::Consensus,
230 chain_id,
231 channel_id
232 );
233 }
234 }
235 ChainId::Domain(domain_id) => {
236 if let Err(err) = handle_domain_channel_update::<_, _, _, Block>(
237 domain_id,
238 chain_id,
239 channel_id,
240 consensus_client,
241 block_number,
242 storage_proof,
243 executor,
244 domain_storage_key_cache,
245 ) {
246 tracing::debug!(
247 target: LOG_TARGET,
248 "Failed to update channel update from {:?} to {:?}: {:?}",
249 ChainId::Domain(domain_id),
250 chain_id,
251 err
252 );
253 } else {
254 tracing::debug!(
255 target: LOG_TARGET,
256 "Updated channel state from {:?} to {:?}: {:?}",
257 ChainId::Domain(domain_id),
258 chain_id,
259 channel_id
260 );
261 }
262 }
263 };
264}
265
266fn handle_consensus_channel_update<CClient, CBlock>(
267 self_chain_id: ChainId,
268 channel_id: ChannelId,
269 consensus_client: &Arc<CClient>,
270 consensus_block_number: BlockNumber,
271 proof: StorageProof,
272) -> Result<(), Error>
273where
274 CBlock: BlockT,
275 CClient: ProvideRuntimeApi<CBlock> + HeaderBackend<CBlock> + AuxStore,
276 CClient::Api: RelayerApi<CBlock, NumberFor<CBlock>, NumberFor<CBlock>, CBlock::Hash>,
277{
278 let consensus_block_hash = consensus_client
280 .hash(consensus_block_number.into())?
281 .ok_or(Error::MissingBlockHash)?;
282
283 let maybe_existing_channel_detail = get_channel_state(
284 &**consensus_client,
285 ChainId::Consensus,
286 self_chain_id,
287 channel_id,
288 )?;
289
290 let api = consensus_client.runtime_api();
291 let best_hash = consensus_client.info().best_hash;
292 let header = consensus_client.expect_header(consensus_block_hash)?;
293
294 if let Some(existing_channel_update) = maybe_existing_channel_detail {
298 let maybe_block_hash =
299 consensus_client.hash(existing_channel_update.block_number.into())?;
300 if let Some(block_hash) = maybe_block_hash {
301 if block_hash.as_ref() == existing_channel_update.block_hash.as_ref()
302 && header.state_root().as_ref() == existing_channel_update.state_root.as_ref()
303 && existing_channel_update.block_number >= consensus_block_number
304 {
305 return Ok(());
306 }
307 }
308 }
309
310 let storage_key = StorageKey(api.channel_storage_key(best_hash, self_chain_id, channel_id)?);
311 let channel = StorageProofVerifier::<HashingFor<CBlock>>::get_decoded_value::<
312 Channel<Balance, AccountId32>,
313 >(header.state_root(), proof, storage_key)?;
314
315 let channel_detail = ChannelDetail {
316 block_number: consensus_block_number,
317 block_hash: H256::from_slice(consensus_block_hash.as_ref()),
318 state_root: H256::from_slice(header.state_root().as_ref()),
319 channel_id,
320 state: channel.state,
321 next_inbox_nonce: channel.next_inbox_nonce,
322 next_outbox_nonce: channel.next_outbox_nonce,
323 latest_response_received_message_nonce: channel.latest_response_received_message_nonce,
324 };
325
326 set_channel_state(
327 &**consensus_client,
328 ChainId::Consensus,
329 self_chain_id,
330 channel_detail,
331 )?;
332 Ok(())
333}
334
335#[allow(clippy::too_many_arguments)]
336fn handle_domain_channel_update<CClient, CBlock, Executor, Block>(
337 src_domain_id: DomainId,
338 self_chain_id: ChainId,
339 channel_id: ChannelId,
340 consensus_client: &Arc<CClient>,
341 domain_block_number: BlockNumber,
342 proof: StorageProof,
343 executor: Arc<Executor>,
344 storage_key_cache: &mut BTreeMap<(H256, ChainId, ChannelId), StorageKey>,
345) -> Result<(), Error>
346where
347 CBlock: BlockT,
348 Block: BlockT,
349 CClient: ProvideRuntimeApi<CBlock> + HeaderBackend<CBlock> + AuxStore,
350 CClient::Api: DomainsApi<CBlock, Block::Header>,
351 Executor: CodeExecutor + RuntimeVersionOf,
352{
353 let runtime_api = consensus_client.runtime_api();
354 let consensus_best_hash = consensus_client.info().best_hash;
355 let consensus_block_header = consensus_client
356 .header(consensus_best_hash)?
357 .ok_or(Error::MissingBlockHeader)?;
358
359 let domain_runtime_type = runtime_api
360 .domain_instance_data(*consensus_block_header.parent_hash(), src_domain_id)?
361 .ok_or(Error::MissingDomainRuntimeCode)?
362 .0
363 .runtime_type;
364
365 let is_valid_domain_block_number =
366 |block_number: BlockNumber| -> Result<(Block::Hash, Block::Hash), Error> {
367 let runtime_api = consensus_client.runtime_api();
368 let receipt_hash = runtime_api
369 .receipt_hash(consensus_best_hash, src_domain_id, block_number.into())?
370 .ok_or(Error::MissingDomainReceiptHash)?;
371
372 if runtime_api.is_bad_er_pending_to_prune(
374 consensus_best_hash,
375 src_domain_id,
376 receipt_hash,
377 )? {
378 return Err(Error::BadDomainReceiptHash);
379 }
380
381 let receipt = runtime_api
382 .execution_receipt(consensus_best_hash, receipt_hash)?
383 .ok_or(Error::MissingDomainReceipt)?;
384
385 Ok((receipt.domain_block_hash, receipt.final_state_root))
386 };
387
388 let (domain_block_hash, domain_state_root) = is_valid_domain_block_number(domain_block_number)?;
390
391 let maybe_existing_channel_detail = get_channel_state(
395 &**consensus_client,
396 ChainId::Domain(src_domain_id),
397 self_chain_id,
398 channel_id,
399 )?;
400
401 if let Some(existing_channel_update) = maybe_existing_channel_detail {
402 if let Ok((existing_block_hash, _)) =
406 is_valid_domain_block_number(existing_channel_update.block_number)
407 {
408 if existing_block_hash.as_ref() == existing_channel_update.block_hash.as_ref()
409 && domain_state_root.as_ref() == existing_channel_update.state_root.as_ref()
410 && existing_channel_update.block_number >= domain_block_number
411 {
412 return Ok(());
413 }
414 }
415 }
416
417 let domain_runtime = runtime_api
418 .domain_runtime_code(*consensus_block_header.parent_hash(), src_domain_id)?
419 .ok_or(Error::MissingDomainRuntimeCode)?;
420
421 let runtime_hash = BlakeTwo256::hash(&domain_runtime);
422 let storage_key = match storage_key_cache.get(&(runtime_hash, self_chain_id, channel_id)) {
423 None => {
424 let domain_stateless_runtime =
425 StatelessRuntime::<CBlock, Block, _>::new(executor.clone(), domain_runtime.into());
426 let storage_key = StorageKey(
427 domain_stateless_runtime.channel_storage_key(self_chain_id, channel_id)?,
428 );
429 storage_key_cache.insert(
430 (runtime_hash, self_chain_id, channel_id),
431 storage_key.clone(),
432 );
433 storage_key
434 }
435 Some(key) => key.clone(),
436 };
437
438 let channel_detail = match domain_runtime_type {
439 RuntimeType::Evm => {
440 let channel = StorageProofVerifier::<HashingFor<Block>>::get_decoded_value::<
441 Channel<Balance, AccountId20>,
442 >(&domain_state_root, proof, storage_key)?;
443 ChannelDetail {
444 block_number: domain_block_number,
445 block_hash: H256::from_slice(domain_block_hash.as_ref()),
446 state_root: H256::from_slice(domain_state_root.as_ref()),
447 channel_id,
448 state: channel.state,
449 next_inbox_nonce: channel.next_inbox_nonce,
450 next_outbox_nonce: channel.next_outbox_nonce,
451 latest_response_received_message_nonce: channel
452 .latest_response_received_message_nonce,
453 }
454 }
455 RuntimeType::AutoId => {
456 let channel = StorageProofVerifier::<HashingFor<Block>>::get_decoded_value::<
457 Channel<Balance, AccountId32>,
458 >(&domain_state_root, proof, storage_key)?;
459 ChannelDetail {
460 block_number: domain_block_number,
461 block_hash: H256::from_slice(domain_block_hash.as_ref()),
462 state_root: H256::from_slice(domain_state_root.as_ref()),
463 channel_id,
464 state: channel.state,
465 next_inbox_nonce: channel.next_inbox_nonce,
466 next_outbox_nonce: channel.next_outbox_nonce,
467 latest_response_received_message_nonce: channel
468 .latest_response_received_message_nonce,
469 }
470 }
471 };
472
473 set_channel_state(
474 &**consensus_client,
475 ChainId::Domain(src_domain_id),
476 self_chain_id,
477 channel_detail,
478 )?;
479 Ok(())
480}
481
482pub fn can_allow_xdm_submission<Client, Block>(
483 client: &Arc<Client>,
484 xdm_id: XdmId,
485 maybe_submitted_block_id: Option<BlockId<Block>>,
486 current_block_id: BlockId<Block>,
487 maybe_channel_nonce: Option<ChannelNonce>,
488) -> bool
489where
490 Client: HeaderBackend<Block>,
491 Block: BlockT,
492{
493 if let Some(channel_nonce) = maybe_channel_nonce {
494 let maybe_nonces = match (
495 xdm_id,
496 channel_nonce.relay_msg_nonce,
497 channel_nonce.relay_response_msg_nonce,
498 ) {
499 (XdmId::RelayMessage((_, _, nonce)), Some(channel_nonce), _) => {
500 Some((nonce, channel_nonce))
501 }
502 (XdmId::RelayResponseMessage((_, _, nonce)), _, Some(channel_nonce)) => {
503 Some((nonce, channel_nonce))
504 }
505 _ => None,
506 };
507
508 if let Some((xdm_nonce, channel_nonce)) = maybe_nonces
509 && (xdm_nonce <= channel_nonce)
510 {
511 tracing::debug!(
512 target: LOG_TARGET,
513 "Stale XDM submitted: XDM Nonce: {:?}, Channel Nonce: {:?}",
514 xdm_nonce,
515 channel_nonce
516 );
517 return false;
518 }
519 }
520
521 match maybe_submitted_block_id {
522 None => true,
523 Some(submitted_block_id) => {
524 match client.hash(submitted_block_id.number).ok().flatten() {
525 None => return true,
527 Some(hash) => {
528 if hash != submitted_block_id.hash {
529 return true;
531 }
532 }
533 }
534
535 let latest_block_number = current_block_id.number;
536 let block_limit: NumberFor<Block> = XDM_ACCEPT_BLOCK_LIMIT.saturated_into();
537 submitted_block_id.number < latest_block_number.saturating_sub(block_limit)
538 }
539 }
540}
541
542async fn handle_xdm_message<TxPool, Client, CBlock>(
543 client: &Arc<Client>,
544 tx_pool: &Arc<TxPool>,
545 chain_id: ChainId,
546 ext: ExtrinsicOf<TxPool>,
547) -> Result<bool, Error>
548where
549 TxPool: TransactionPool + 'static,
550 CBlock: BlockT,
551 Client: ProvideRuntimeApi<BlockOf<TxPool>> + HeaderBackend<BlockOf<TxPool>> + AuxStore,
552 Client::Api: MessengerApi<BlockOf<TxPool>, NumberFor<CBlock>, CBlock::Hash>,
553{
554 let block_id: BlockId<BlockOf<TxPool>> = client.info().into();
555 let runtime_api = client.runtime_api();
556 let api_version = runtime_api
557 .api_version::<dyn MessengerApi<BlockOf<TxPool>, NumberFor<CBlock>, CBlock::Hash>>(
558 block_id.hash,
559 )?
560 .unwrap_or(1);
561
562 let api_available = api_version >= 2;
563 if api_available {
564 let xdm_id = match runtime_api.xdm_id(block_id.hash, &ext)? {
565 None => return Ok(false),
567 Some(xdm_id) => xdm_id,
568 };
569
570 let (src_chain_id, channel_id) = xdm_id.get_chain_id_and_channel_id();
571 let maybe_channel_nonce =
572 runtime_api.channel_nonce(block_id.hash, src_chain_id, channel_id)?;
573
574 let maybe_submitted_xdm_block = get_xdm_processed_block_number::<_, BlockOf<TxPool>>(
575 &**client,
576 TX_POOL_PREFIX,
577 xdm_id,
578 )?;
579 if !can_allow_xdm_submission(
580 client,
581 xdm_id,
582 maybe_submitted_xdm_block,
583 block_id.clone(),
584 maybe_channel_nonce,
585 ) {
586 tracing::debug!(
587 target: LOG_TARGET,
588 "Skipping XDM[{:?}] submission. At: {:?}",
589 xdm_id,
590 block_id
591 );
592 return Ok(true);
593 }
594
595 tracing::debug!(
596 target: LOG_TARGET,
597 "Submitting XDM[{:?}] to tx pool for chain {:?} at block: {:?}",
598 xdm_id,
599 chain_id,
600 block_id
601 );
602
603 let tx_pool_res = tx_pool
604 .submit_one(block_id.hash, TransactionSource::External, ext)
605 .await;
606
607 let block_id: BlockId<BlockOf<TxPool>> = client.info().into();
608 if let Err(err) = tx_pool_res {
609 match err.into_pool_error() {
610 Ok(err) => match err {
611 PoolError::TooLowPriority { .. }
612 | PoolError::AlreadyImported(..)
613 | PoolError::TemporarilyBanned => {
614 tracing::debug!(
615 target: LOG_TARGET,
616 "XDM[{:?}] to tx pool for Chain {:?} at block: {:?}: Already included",
617 xdm_id,
618 chain_id,
619 block_id
620 );
621 set_xdm_message_processed_at(&**client, TX_POOL_PREFIX, xdm_id, block_id)?;
622 }
623 _ => {
624 tracing::error!(
625 target: LOG_TARGET,
626 "Failed to submit XDM[{:?}] to tx pool for Chain {:?} with error: {:?} at block: {:?}",
627 xdm_id,
628 chain_id,
629 err,
630 block_id
631 );
632 }
633 },
634 Err(err) => {
635 tracing::error!(
636 target: LOG_TARGET,
637 "Failed to submit XDM[{:?}] to tx pool for Chain {:?} with error: {:?} at block: {:?}",
638 xdm_id,
639 chain_id,
640 err,
641 block_id
642 );
643 }
644 }
645 } else {
646 tracing::debug!(
647 target: LOG_TARGET,
648 "Submitted XDM[{:?}] to tx pool for chain {:?} at {:?}",
649 xdm_id,
650 chain_id,
651 block_id
652 );
653
654 set_xdm_message_processed_at(&**client, TX_POOL_PREFIX, xdm_id, block_id)?;
655 }
656
657 if let Some(channel_nonce) = maybe_channel_nonce {
658 cleanup_chain_channel_storages(
659 &**client,
660 TX_POOL_PREFIX,
661 src_chain_id,
662 channel_id,
663 channel_nonce,
664 )?;
665 }
666
667 Ok(true)
668 } else {
669 let tx_pool_res = tx_pool
670 .submit_one(block_id.hash, TransactionSource::External, ext)
671 .await;
672
673 let block_id: BlockId<BlockOf<TxPool>> = client.info().into();
674 if let Err(err) = tx_pool_res {
675 tracing::error!(
676 target: LOG_TARGET,
677 "Failed to submit XDM to tx pool for Chain {:?} with error: {:?} at block: {:?}",
678 chain_id,
679 err,
680 block_id
681 );
682 } else {
683 tracing::debug!(
684 target: LOG_TARGET,
685 "Submitted XDM to tx pool for chain {:?} at {:?}",
686 chain_id,
687 block_id
688 );
689 }
690
691 Ok(true)
692 }
693}