1#![warn(missing_docs, unused_crate_dependencies)]
20
21use cross_domain_message_gossip::{GossipWorkerBuilder, xdm_gossip_peers_set_config};
22use domain_runtime_primitives::opaque::{Block as DomainBlock, Header as DomainHeader};
23use frame_system::pallet_prelude::BlockNumberFor;
24use futures::channel::mpsc;
25use futures::{Future, StreamExt};
26use jsonrpsee::RpcModule;
27use pallet_domains::staking::StakingSummary;
28use parity_scale_codec::{Decode, Encode};
29use parking_lot::Mutex;
30use sc_block_builder::BlockBuilderBuilder;
31use sc_client_api::execution_extensions::ExtensionsFactory;
32use sc_client_api::{Backend as BackendT, BlockBackend, ExecutorProvider, Finalizer};
33use sc_consensus::block_import::{
34 BlockCheckParams, BlockImportParams, ForkChoiceStrategy, ImportResult,
35};
36use sc_consensus::{BasicQueue, BlockImport, StateAction, Verifier as VerifierT};
37use sc_domains::ExtensionsFactory as DomainsExtensionFactory;
38use sc_network::config::{NetworkConfiguration, TransportConfig};
39use sc_network::service::traits::NetworkService;
40use sc_network::{
41 NetworkWorker, NotificationMetrics, NotificationService, ReputationChange, multiaddr,
42};
43use sc_service::config::{
44 DatabaseSource, ExecutorConfiguration, KeystoreConfig, MultiaddrWithPeerId,
45 OffchainWorkerConfig, RpcBatchRequestConfig, RpcConfiguration, RpcEndpoint,
46 WasmExecutionMethod, WasmtimeInstantiationStrategy,
47};
48use sc_service::{BasePath, BlocksPruning, Configuration, Role, SpawnTasksParams, TaskManager};
49use sc_transaction_pool::{BasicPool, FullChainApi, Options};
50use sc_transaction_pool_api::error::{Error as TxPoolError, IntoPoolError};
51use sc_transaction_pool_api::{InPoolTransaction, TransactionPool, TransactionSource};
52use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender, tracing_unbounded};
53use sp_api::{ApiExt, ProvideRuntimeApi};
54use sp_blockchain::{HashAndNumber, HeaderBackend};
55use sp_consensus::{BlockOrigin, Error as ConsensusError};
56use sp_consensus_slots::Slot;
57use sp_consensus_subspace::digests::{
58 CompatibleDigestItem, PreDigest, PreDigestPotInfo, extract_pre_digest,
59};
60use sp_consensus_subspace::{PotExtension, SubspaceApi};
61use sp_core::H256;
62use sp_core::offchain::OffchainDbExt;
63use sp_core::offchain::storage::OffchainDb;
64use sp_core::traits::{CodeExecutor, SpawnEssentialNamed};
65use sp_domains::bundle::OpaqueBundle;
66use sp_domains::{BundleProducerElectionApi, ChainId, DomainId, DomainsApi, OperatorId};
67use sp_domains_fraud_proof::fraud_proof::FraudProof;
68use sp_domains_fraud_proof::{FraudProofExtension, FraudProofHostFunctionsImpl};
69use sp_externalities::Extensions;
70use sp_inherents::{InherentData, InherentDataProvider};
71use sp_keyring::Sr25519Keyring;
72use sp_messenger::MessengerApi;
73use sp_messenger_host_functions::{MessengerExtension, MessengerHostFunctionsImpl};
74use sp_mmr_primitives::MmrApi;
75use sp_runtime::generic::{Digest, SignedPayload};
76use sp_runtime::traits::{
77 BlakeTwo256, Block as BlockT, Hash as HashT, Header as HeaderT, NumberFor,
78};
79use sp_runtime::{DigestItem, MultiAddress, OpaqueExtrinsic, SaturatedConversion, generic};
80use sp_subspace_mmr::host_functions::{SubspaceMmrExtension, SubspaceMmrHostFunctionsImpl};
81use sp_timestamp::Timestamp;
82use std::collections::HashMap;
83use std::error::Error;
84use std::marker::PhantomData;
85use std::net::SocketAddr;
86use std::pin::Pin;
87use std::sync::Arc;
88use std::time;
89use std::time::Duration;
90use subspace_core_primitives::pot::PotOutput;
91use subspace_core_primitives::solutions::Solution;
92use subspace_core_primitives::{BlockNumber, PublicKey};
93use subspace_runtime_primitives::extension::BalanceTransferCheckExtension;
94use subspace_runtime_primitives::opaque::Block;
95use subspace_runtime_primitives::{
96 AccountId, Balance, BlockHashFor, ExtrinsicFor, Hash, HeaderFor, Signature,
97};
98use subspace_service::{FullSelectChain, RuntimeExecutor};
99use subspace_test_client::{Backend, Client, chain_spec};
100use subspace_test_primitives::OnchainStateApi;
101use subspace_test_runtime::{
102 Runtime, RuntimeApi, RuntimeCall, SLOT_DURATION, SignedExtra, UncheckedExtrinsic,
103};
104use substrate_frame_rpc_system::AccountNonceApi;
105use substrate_test_client::{RpcHandlersExt, RpcTransactionError, RpcTransactionOutput};
106use tokio::time::sleep;
107
/// Fraud proof type derived from a consensus `Block` and a `DomainBlock`.
///
/// Expands to [`FraudProof`] over the consensus block number and hash, the domain block
/// header, and [`H256`] as the last type parameter.
pub type FraudProofFor<Block, DomainBlock> =
    FraudProof<NumberFor<Block>, BlockHashFor<Block>, HeaderFor<DomainBlock>, H256>;
111
/// Maximum number of attempts `MockConsensusNode::produce_slot_and_wait_for_bundle_submission`
/// makes to obtain a bundle for a slot before it panics.
const MAX_PRODUCE_BUNDLE_TRY: usize = 10;
113
114#[expect(clippy::too_many_arguments)]
119pub fn node_config(
120 tokio_handle: tokio::runtime::Handle,
121 key: Sr25519Keyring,
122 boot_nodes: Vec<MultiaddrWithPeerId>,
123 run_farmer: bool,
124 force_authoring: bool,
125 force_synced: bool,
126 private_evm: bool,
127 evm_owner_account: Option<AccountId>,
128 base_path: BasePath,
129 rpc_addr: Option<SocketAddr>,
130 rpc_port: Option<u16>,
131) -> Configuration {
132 let root = base_path.path();
133 let role = if run_farmer {
134 Role::Authority
135 } else {
136 Role::Full
137 };
138 let key_seed = key.to_seed();
139 let spec = chain_spec::subspace_local_testnet_config(private_evm, evm_owner_account).unwrap();
140
141 let mut network_config = NetworkConfiguration::new(
142 key_seed.to_string(),
143 "network/test/0.1",
144 Default::default(),
145 None,
146 );
147
148 network_config.boot_nodes = boot_nodes;
149
150 network_config.allow_non_globals_in_dht = true;
151
152 let addr: multiaddr::Multiaddr = multiaddr::Protocol::Memory(rand::random()).into();
153 network_config.listen_addresses.push(addr.clone());
154
155 network_config.public_addresses.push(addr);
156
157 network_config.transport = TransportConfig::MemoryOnly;
158
159 network_config.force_synced = force_synced;
160
161 let rpc_configuration = match rpc_addr {
162 Some(listen_addr) => {
163 let port = rpc_port.unwrap_or(9944);
164 RpcConfiguration {
165 addr: Some(vec![RpcEndpoint {
166 batch_config: RpcBatchRequestConfig::Disabled,
167 max_connections: 100,
168 listen_addr,
169 rpc_methods: Default::default(),
170 rate_limit: None,
171 rate_limit_trust_proxy_headers: false,
172 rate_limit_whitelisted_ips: vec![],
173 max_payload_in_mb: 15,
174 max_payload_out_mb: 15,
175 max_subscriptions_per_connection: 100,
176 max_buffer_capacity_per_connection: 100,
177 cors: None,
178 retry_random_port: true,
179 is_optional: false,
180 }]),
181 max_request_size: 15,
182 max_response_size: 15,
183 id_provider: None,
184 max_subs_per_conn: 1024,
185 port,
186 message_buffer_capacity: 1024,
187 batch_config: RpcBatchRequestConfig::Disabled,
188 max_connections: 1000,
189 cors: None,
190 methods: Default::default(),
191 rate_limit: None,
192 rate_limit_whitelisted_ips: vec![],
193 rate_limit_trust_proxy_headers: false,
194 request_logger_limit: 1024,
195 }
196 }
197 None => RpcConfiguration {
198 addr: None,
199 max_request_size: 0,
200 max_response_size: 0,
201 id_provider: None,
202 max_subs_per_conn: 0,
203 port: 0,
204 message_buffer_capacity: 0,
205 batch_config: RpcBatchRequestConfig::Disabled,
206 max_connections: 0,
207 cors: None,
208 methods: Default::default(),
209 rate_limit: None,
210 rate_limit_whitelisted_ips: vec![],
211 rate_limit_trust_proxy_headers: false,
212 request_logger_limit: 1024,
213 },
214 };
215
216 Configuration {
217 impl_name: "subspace-test-node".to_string(),
218 impl_version: "0.1".to_string(),
219 role,
220 tokio_handle,
221 transaction_pool: Default::default(),
222 network: network_config,
223 keystore: KeystoreConfig::InMemory,
224 database: DatabaseSource::ParityDb {
225 path: root.join("paritydb"),
226 },
227 trie_cache_maximum_size: Some(64 * 1024 * 1024),
228 state_pruning: Default::default(),
229 blocks_pruning: BlocksPruning::KeepAll,
230 chain_spec: Box::new(spec),
231 executor: ExecutorConfiguration {
232 wasm_method: WasmExecutionMethod::Compiled {
233 instantiation_strategy: WasmtimeInstantiationStrategy::PoolingCopyOnWrite,
234 },
235 max_runtime_instances: 8,
236 default_heap_pages: None,
237 runtime_cache_size: 2,
238 },
239 wasm_runtime_overrides: Default::default(),
240 rpc: rpc_configuration,
241 prometheus_config: None,
242 telemetry_endpoints: None,
243 offchain_worker: OffchainWorkerConfig {
244 enabled: false,
245 indexing_enabled: true,
246 },
247 force_authoring,
248 disable_grandpa: false,
249 dev_key_seed: Some(key_seed),
250 tracing_targets: None,
251 tracing_receiver: Default::default(),
252 announce_block: true,
253 data_path: base_path.path().into(),
254 base_path,
255 warm_up_trie_cache: None,
256 }
257}
258
/// Storage changes produced for a consensus [`Block`].
type StorageChanges = sp_api::StorageChanges<Block>;
260
/// Extensions factory for the mock consensus node.
///
/// Its `ExtensionsFactory` implementation registers the fraud proof, MMR, messenger,
/// offchain DB (when available) and proof-of-time extensions.
struct MockExtensionsFactory<Client, DomainBlock, Executor, CBackend> {
    /// Consensus chain client handed to the host function implementations.
    consensus_client: Arc<Client>,
    /// Consensus chain backend; queried for offchain storage.
    consensus_backend: Arc<CBackend>,
    /// Executor handed to the fraud proof and messenger host functions.
    executor: Arc<Executor>,
    /// Mock verifier consulted by the proof-of-time extension.
    mock_pot_verifier: Arc<MockPotVerfier>,
    /// Confirmation depth passed to the domain extension factory and the MMR host functions.
    confirmation_depth_k: BlockNumber,
    /// Ties the otherwise unused `DomainBlock` type parameter to the struct.
    _phantom: PhantomData<DomainBlock>,
}
269
270impl<Client, DomainBlock, Executor, CBackend>
271 MockExtensionsFactory<Client, DomainBlock, Executor, CBackend>
272{
273 fn new(
274 consensus_client: Arc<Client>,
275 executor: Arc<Executor>,
276 mock_pot_verifier: Arc<MockPotVerfier>,
277 consensus_backend: Arc<CBackend>,
278 confirmation_depth_k: BlockNumber,
279 ) -> Self {
280 Self {
281 consensus_client,
282 consensus_backend,
283 executor,
284 mock_pot_verifier,
285 confirmation_depth_k,
286 _phantom: Default::default(),
287 }
288 }
289}
290
291#[derive(Default)]
292struct MockPotVerfier(Mutex<HashMap<u64, PotOutput>>);
293
294impl MockPotVerfier {
295 fn is_valid(&self, slot: u64, pot: PotOutput) -> bool {
296 self.0.lock().get(&slot).map(|p| *p == pot).unwrap_or(false)
297 }
298
299 fn inject_pot(&self, slot: u64, pot: PotOutput) {
300 self.0.lock().insert(slot, pot);
301 }
302}
303
304impl<Block, Client, DomainBlock, Executor, CBackend> ExtensionsFactory<Block>
305 for MockExtensionsFactory<Client, DomainBlock, Executor, CBackend>
306where
307 Block: BlockT,
308 Block::Hash: From<H256> + Into<H256>,
309 DomainBlock: BlockT,
310 DomainBlock::Hash: Into<H256> + From<H256>,
311 Client: BlockBackend<Block> + HeaderBackend<Block> + ProvideRuntimeApi<Block> + 'static,
312 Client::Api: DomainsApi<Block, DomainBlock::Header>
313 + BundleProducerElectionApi<Block, Balance>
314 + MessengerApi<Block, NumberFor<Block>, Block::Hash>
315 + MmrApi<Block, H256, NumberFor<Block>>,
316 Executor: CodeExecutor + sc_executor::RuntimeVersionOf,
317 CBackend: BackendT<Block> + 'static,
318{
319 fn extensions_for(
320 &self,
321 _block_hash: Block::Hash,
322 _block_number: NumberFor<Block>,
323 ) -> Extensions {
324 let confirmation_depth_k = self.confirmation_depth_k;
325 let mut exts = Extensions::new();
326 exts.register(FraudProofExtension::new(Arc::new(
327 FraudProofHostFunctionsImpl::<_, _, DomainBlock, Executor, _>::new(
328 self.consensus_client.clone(),
329 self.executor.clone(),
330 move |client, executor| {
331 let extension_factory =
332 DomainsExtensionFactory::<_, Block, DomainBlock, _>::new(
333 client,
334 executor,
335 confirmation_depth_k,
336 );
337 Box::new(extension_factory) as Box<dyn ExtensionsFactory<DomainBlock>>
338 },
339 ),
340 )));
341 exts.register(SubspaceMmrExtension::new(Arc::new(
342 SubspaceMmrHostFunctionsImpl::<Block, _>::new(
343 self.consensus_client.clone(),
344 confirmation_depth_k,
345 ),
346 )));
347 exts.register(MessengerExtension::new(Arc::new(
348 MessengerHostFunctionsImpl::<Block, _, DomainBlock, _>::new(
349 self.consensus_client.clone(),
350 self.executor.clone(),
351 ),
352 )));
353
354 if let Some(offchain_storage) = self.consensus_backend.offchain_storage() {
355 let offchain_db = OffchainDb::new(offchain_storage);
356 exts.register(OffchainDbExt::new(offchain_db));
357 }
358 exts.register(PotExtension::new({
359 let client = Arc::clone(&self.consensus_client);
360 let mock_pot_verifier = Arc::clone(&self.mock_pot_verifier);
361 Box::new(
362 move |parent_hash, slot, proof_of_time, _quick_verification| {
363 let parent_hash = {
364 let mut converted_parent_hash = Block::Hash::default();
365 converted_parent_hash.as_mut().copy_from_slice(&parent_hash);
366 converted_parent_hash
367 };
368
369 let parent_header = match client.header(parent_hash) {
370 Ok(Some(parent_header)) => parent_header,
371 _ => return false,
372 };
373 let parent_pre_digest = match extract_pre_digest(&parent_header) {
374 Ok(parent_pre_digest) => parent_pre_digest,
375 _ => return false,
376 };
377
378 let parent_slot = parent_pre_digest.slot();
379 if slot <= *parent_slot {
380 return false;
381 }
382
383 mock_pot_verifier.is_valid(slot, proof_of_time)
384 },
385 )
386 }));
387 exts
388 }
389}
390
/// A slot paired with the proof-of-time output produced for it.
type NewSlot = (Slot, PotOutput);
392
/// A mock Subspace consensus node used in tests.
///
/// Slots are produced on demand through the node's methods.
pub struct MockConsensusNode {
    /// Task manager owning the node's background tasks.
    pub task_manager: TaskManager,
    /// Consensus chain client.
    pub client: Arc<Client>,
    /// Consensus chain backend.
    pub backend: Arc<Backend>,
    /// Runtime code executor.
    pub executor: RuntimeExecutor,
    /// Transaction pool.
    pub transaction_pool: Arc<BasicPool<FullChainApi<Client, Block>, Block>>,
    /// Chain selection strategy.
    pub select_chain: FullSelectChain,
    /// Network service.
    pub network_service: Arc<dyn NetworkService + Send + Sync>,
    /// XDM gossip notification service; taken (set to `None`) when the cross-domain gossip
    /// message worker is started.
    pub xdm_gossip_notification_service: Option<Box<dyn NotificationService>>,
    /// Sync service.
    pub sync_service: Arc<sc_network_sync::SyncingService<Block>>,
    /// RPC handlers.
    pub rpc_handlers: sc_service::RpcHandlers,
    /// Number of the next slot to be produced.
    next_slot: u64,
    /// Mock proof-of-time verifier that produced slots are injected into.
    mock_pot_verifier: Arc<MockPotVerfier>,
    /// Senders notified about every new slot.
    new_slot_notification_subscribers: Vec<mpsc::UnboundedSender<(Slot, PotOutput)>>,
    /// Subscribers that receive an acknowledgement sender on every acknowledgement round.
    acknowledgement_sender_subscribers: Vec<TracingUnboundedSender<mpsc::Sender<()>>>,
    /// Mock block import handle.
    block_import: MockBlockImport<Client, Block>,
    /// Gossip worker builder; taken (set to `None`) when the cross-domain gossip message
    /// worker is started.
    xdm_gossip_worker_builder: Option<GossipWorkerBuilder>,
    /// Solution placed into the mocked Subspace pre-digest.
    mock_solution: Solution<AccountId>,
    /// Log prefix derived from the node's key.
    log_prefix: &'static str,
    /// Keyring account the node was started with.
    pub key: Sr25519Keyring,
    /// Optional block finalization depth supplied at construction.
    finalize_block_depth: Option<NumberFor<Block>>,
    /// Base path of the node; used to locate the ParityDb lock file on stop.
    base_path: BasePath,
}
435
/// Options for starting a [`MockConsensusNode`] with a configurable RPC endpoint.
pub struct MockConsensusNodeRpcConfig {
    /// Base path of the node.
    pub base_path: BasePath,
    /// Optional block finalization depth stored on the node.
    pub finalize_block_depth: Option<NumberFor<Block>>,
    /// Forwarded to the chain spec as the private EVM flag.
    pub private_evm: bool,
    /// Optional keyring account whose account id is forwarded to the chain spec as the EVM
    /// owner.
    pub evm_owner: Option<Sr25519Keyring>,
    /// RPC listen address; no RPC endpoint is configured when `None`.
    pub rpc_addr: Option<SocketAddr>,
    /// RPC port; `node_config` falls back to `9944` when unset and an address is provided.
    pub rpc_port: Option<u16>,
}
451
452impl MockConsensusNode {
    /// Starts a mock consensus node from a fully prepared `Configuration`.
    ///
    /// Creates the client, backend, transaction pool, mock block import and network, spawns
    /// the node tasks (including the channel update worker, the consensus message listener
    /// and the MMR gadget), and returns the assembled node with `next_slot` set to `1`.
    ///
    /// `rpc_builder` is invoked by the RPC layer to build the RPC module.
    ///
    /// # Panics
    ///
    /// Panics if the client parts, chain constants, network or node tasks cannot be created.
    #[expect(clippy::result_large_err, reason = "Comes from Substrate")]
    fn run_with_configuration(
        mut config: Configuration,
        key: Sr25519Keyring,
        base_path: BasePath,
        finalize_block_depth: Option<NumberFor<Block>>,
        rpc_builder: Box<
            dyn Fn() -> Result<RpcModule<()>, sc_service::Error> + Send + Sync + 'static,
        >,
    ) -> MockConsensusNode {
        let log_prefix = key.into();

        // Transactions are never kept banned: the ban time is zero.
        let tx_pool_options = Options {
            ban_time: Duration::from_millis(0),
            ..Default::default()
        };

        // Tag the node name and enter a logging span carrying it for the rest of the setup.
        config.network.node_name = format!("{} (Consensus)", config.network.node_name);
        let span = sc_tracing::tracing::info_span!(
            sc_tracing::logging::PREFIX_LOG_SPAN,
            name = config.network.node_name.as_str()
        );
        let _enter = span.enter();

        let executor = sc_service::new_wasm_executor(&config.executor);

        let (client, backend, keystore_container, mut task_manager) =
            sc_service::new_full_parts::<Block, RuntimeApi, _>(&config, None, executor.clone())
                .expect("Fail to new full parts");

        // A separate executor instance is used for domain runtime execution.
        let domain_executor = Arc::new(sc_service::new_wasm_executor(&config.executor));
        let client = Arc::new(client);
        let mock_pot_verifier = Arc::new(MockPotVerfier::default());
        let chain_constants = client
            .runtime_api()
            .chain_constants(client.info().best_hash)
            .expect("Fail to get chain constants");
        // Install the mock extensions factory on the client.
        client
            .execution_extensions()
            .set_extensions_factory(MockExtensionsFactory::<
                _,
                DomainBlock,
                sc_domains::RuntimeExecutor,
                _,
            >::new(
                client.clone(),
                domain_executor.clone(),
                Arc::clone(&mock_pot_verifier),
                backend.clone(),
                chain_constants.confirmation_depth_k(),
            ));

        let select_chain = sc_consensus::LongestChain::new(backend.clone());
        let transaction_pool = Arc::from(BasicPool::new_full(
            tx_pool_options,
            config.role.is_authority().into(),
            config.prometheus_registry(),
            task_manager.spawn_essential_handle(),
            client.clone(),
        ));

        let block_import = MockBlockImport::<_, _>::new(client.clone());

        // Network setup, with the XDM gossip notification protocol registered.
        let mut net_config = sc_network::config::FullNetworkConfiguration::<
            _,
            _,
            NetworkWorker<_, _>,
        >::new(&config.network, None);
        let (xdm_gossip_notification_config, xdm_gossip_notification_service) =
            xdm_gossip_peers_set_config();
        net_config.add_notification_protocol(xdm_gossip_notification_config);

        let (network_service, system_rpc_tx, tx_handler_controller, sync_service) =
            sc_service::build_network(sc_service::BuildNetworkParams {
                config: &config,
                net_config,
                client: client.clone(),
                transaction_pool: transaction_pool.clone(),
                spawn_handle: task_manager.spawn_handle(),
                import_queue: mock_import_queue(
                    block_import.clone(),
                    &task_manager.spawn_essential_handle(),
                ),
                block_announce_validator_builder: None,
                warp_sync_config: None,
                block_relay: None,
                metrics: NotificationMetrics::new(None),
            })
            .expect("Should be able to build network");

        // `config` is consumed here; nothing below may use it.
        let rpc_handlers = sc_service::spawn_tasks(SpawnTasksParams {
            network: network_service.clone(),
            client: client.clone(),
            keystore: keystore_container.keystore(),
            task_manager: &mut task_manager,
            transaction_pool: transaction_pool.clone(),
            rpc_builder: Box::new(move |_| rpc_builder()),
            backend: backend.clone(),
            system_rpc_tx,
            config,
            telemetry: None,
            tx_handler_controller,
            sync_service: sync_service.clone(),
            tracing_execute_block: None,
        })
        .expect("Should be able to spawn tasks");

        // The mocked solution is the genesis solution for the node's key.
        let mock_solution =
            Solution::genesis_solution(PublicKey::from(key.public().0), key.to_account_id());

        let mut gossip_builder = GossipWorkerBuilder::new();

        task_manager
            .spawn_essential_handle()
            .spawn_essential_blocking(
                "consensus-chain-channel-update-worker",
                None,
                Box::pin(
                    domain_client_message_relayer::worker::gossip_channel_updates::<_, _, Block, _>(
                        ChainId::Consensus,
                        client.clone(),
                        sync_service.clone(),
                        gossip_builder.gossip_msg_sink(),
                    ),
                ),
            );

        let (consensus_msg_sink, consensus_msg_receiver) =
            tracing_unbounded("consensus_message_channel", 100);

        let consensus_listener =
            cross_domain_message_gossip::start_cross_chain_message_listener::<_, _, _, _, _, _, _>(
                ChainId::Consensus,
                client.clone(),
                client.clone(),
                transaction_pool.clone(),
                network_service.clone(),
                consensus_msg_receiver,
                domain_executor,
                sync_service.clone(),
            );

        task_manager
            .spawn_essential_handle()
            .spawn_essential_blocking(
                "consensus-message-listener",
                None,
                Box::pin(consensus_listener),
            );

        // Register the consensus chain sink with the gossip builder.
        gossip_builder.push_chain_sink(ChainId::Consensus, consensus_msg_sink);

        task_manager.spawn_essential_handle().spawn_blocking(
            "mmr-gadget",
            None,
            mmr_gadget::MmrGadget::start(
                client.clone(),
                backend.clone(),
                sp_mmr_primitives::INDEXING_PREFIX.to_vec(),
            ),
        );

        MockConsensusNode {
            task_manager,
            client,
            backend,
            executor,
            transaction_pool,
            select_chain,
            network_service,
            xdm_gossip_notification_service: Some(xdm_gossip_notification_service),
            sync_service,
            rpc_handlers,
            next_slot: 1,
            mock_pot_verifier,
            new_slot_notification_subscribers: Vec::new(),
            acknowledgement_sender_subscribers: Vec::new(),
            block_import,
            xdm_gossip_worker_builder: Some(gossip_builder),
            mock_solution,
            log_prefix,
            key,
            finalize_block_depth,
            base_path,
        }
    }
639
640 pub fn run(
642 tokio_handle: tokio::runtime::Handle,
643 key: Sr25519Keyring,
644 base_path: BasePath,
645 ) -> MockConsensusNode {
646 Self::run_with_finalization_depth(tokio_handle, key, base_path, None, false, None)
647 }
648
649 pub fn run_with_private_evm(
651 tokio_handle: tokio::runtime::Handle,
652 key: Sr25519Keyring,
653 evm_owner: Option<Sr25519Keyring>,
654 base_path: BasePath,
655 ) -> MockConsensusNode {
656 Self::run_with_finalization_depth(tokio_handle, key, base_path, None, true, evm_owner)
657 }
658
659 #[expect(clippy::result_large_err, reason = "Comes from Substrate")]
661 pub fn run_with_finalization_depth(
662 tokio_handle: tokio::runtime::Handle,
663 key: Sr25519Keyring,
664 base_path: BasePath,
665 finalize_block_depth: Option<NumberFor<Block>>,
666 private_evm: bool,
667 evm_owner: Option<Sr25519Keyring>,
668 ) -> MockConsensusNode {
669 let rpc_config = MockConsensusNodeRpcConfig {
670 base_path,
671 finalize_block_depth,
672 private_evm,
673 evm_owner,
674 rpc_addr: None,
675 rpc_port: None,
676 };
677
678 Self::run_with_rpc_builder(
679 tokio_handle,
680 key,
681 rpc_config,
682 Box::new(|| Ok(RpcModule::new(()))),
683 )
684 }
685
686 pub fn run_with_rpc_builder(
688 tokio_handle: tokio::runtime::Handle,
689 key: Sr25519Keyring,
690 rpc_config: MockConsensusNodeRpcConfig,
691 rpc_builder: Box<
692 dyn Fn() -> Result<RpcModule<()>, sc_service::Error> + Send + Sync + 'static,
693 >,
694 ) -> MockConsensusNode {
695 let MockConsensusNodeRpcConfig {
696 base_path,
697 finalize_block_depth,
698 private_evm,
699 evm_owner,
700 rpc_addr,
701 rpc_port,
702 } = rpc_config;
703
704 let config = node_config(
705 tokio_handle,
706 key,
707 vec![],
708 false,
709 false,
710 false,
711 private_evm,
712 evm_owner.map(|key| key.to_account_id()),
713 base_path.clone(),
714 rpc_addr,
715 rpc_port,
716 );
717
718 Self::run_with_configuration(config, key, base_path, finalize_block_depth, rpc_builder)
719 }
720
721 #[expect(clippy::result_large_err, reason = "Comes from Substrate")]
723 pub fn run_with_rpc_options(
724 tokio_handle: tokio::runtime::Handle,
725 key: Sr25519Keyring,
726 rpc_config: MockConsensusNodeRpcConfig,
727 ) -> MockConsensusNode {
728 Self::run_with_rpc_builder(
729 tokio_handle,
730 key,
731 rpc_config,
732 Box::new(|| Ok(RpcModule::new(()))),
733 )
734 }
735
736 pub fn xdm_gossip_worker_builder(&mut self) -> &mut GossipWorkerBuilder {
738 self.xdm_gossip_worker_builder
739 .as_mut()
740 .expect("gossip message worker have not started yet")
741 }
742
743 pub fn start_cross_domain_gossip_message_worker(&mut self) {
745 let xdm_gossip_worker_builder = self
746 .xdm_gossip_worker_builder
747 .take()
748 .expect("gossip message worker have not started yet");
749 let cross_domain_message_gossip_worker = xdm_gossip_worker_builder.build::<Block, _, _>(
750 self.network_service.clone(),
751 self.xdm_gossip_notification_service
752 .take()
753 .expect("XDM gossip notification service must be used only once"),
754 self.sync_service.clone(),
755 );
756 self.task_manager
757 .spawn_essential_handle()
758 .spawn_essential_blocking(
759 "cross-domain-gossip-message-worker",
760 None,
761 Box::pin(cross_domain_message_gossip_worker.run()),
762 );
763 }
764
    /// Returns the number of the next slot that will be produced.
    pub fn next_slot(&self) -> u64 {
        self.next_slot
    }
769
    /// Sets the number of the next slot that will be produced.
    pub fn set_next_slot(&mut self, next_slot: u64) {
        self.next_slot = next_slot;
    }
774
775 pub fn produce_slot(&mut self) -> NewSlot {
777 let slot = Slot::from(self.next_slot);
778 let proof_of_time = PotOutput::from(
779 <&[u8] as TryInto<[u8; 16]>>::try_into(&Hash::random().to_fixed_bytes()[..16])
780 .expect("slice with length of 16 must able convert into [u8; 16]; qed"),
781 );
782 self.mock_pot_verifier.inject_pot(*slot, proof_of_time);
783 self.next_slot += 1;
784
785 (slot, proof_of_time)
786 }
787
788 pub async fn notify_new_slot_and_wait_for_bundle(
790 &mut self,
791 new_slot: NewSlot,
792 ) -> Option<OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>> {
793 self.new_slot_notification_subscribers
794 .retain(|subscriber| subscriber.unbounded_send(new_slot).is_ok());
795
796 self.confirm_acknowledgement().await;
797 self.get_bundle_from_tx_pool(new_slot)
798 }
799
800 pub async fn produce_slot_and_wait_for_bundle_submission(
802 &mut self,
803 ) -> (
804 NewSlot,
805 OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>,
806 ) {
807 let slot = self.produce_slot();
808 for _ in 0..MAX_PRODUCE_BUNDLE_TRY {
809 if let Some(bundle) = self.notify_new_slot_and_wait_for_bundle(slot).await {
810 return (slot, bundle);
811 }
812 }
813 panic!(
814 "Failed to produce bundle after {MAX_PRODUCE_BUNDLE_TRY:?} tries, something must be wrong"
815 );
816 }
817
818 pub async fn produce_slot_and_wait_for_bundle_submission_from_operator(
820 &mut self,
821 operator_id: OperatorId,
822 ) -> (
823 NewSlot,
824 OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>,
825 ) {
826 loop {
827 let slot = self.produce_slot();
828 if let Some(bundle) = self.notify_new_slot_and_wait_for_bundle(slot).await
829 && bundle.sealed_header().proof_of_election().operator_id == operator_id
830 {
831 return (slot, bundle);
832 }
833 }
834 }
835
836 pub fn new_slot_notification_stream(&mut self) -> mpsc::UnboundedReceiver<(Slot, PotOutput)> {
838 let (tx, rx) = mpsc::unbounded();
839 self.new_slot_notification_subscribers.push(tx);
840 rx
841 }
842
843 pub fn new_acknowledgement_sender_stream(
845 &mut self,
846 ) -> TracingUnboundedReceiver<mpsc::Sender<()>> {
847 let (tx, rx) = tracing_unbounded("subspace_acknowledgement_sender_stream", 100);
848 self.acknowledgement_sender_subscribers.push(tx);
849 rx
850 }
851
852 pub async fn confirm_acknowledgement(&mut self) {
857 let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
858
859 {
862 self.acknowledgement_sender_subscribers
863 .retain(|subscriber| {
864 subscriber
865 .unbounded_send(acknowledgement_sender.clone())
866 .is_ok()
867 });
868 drop(acknowledgement_sender);
869 }
870
871 while acknowledgement_receiver.next().await.is_some() {
873 }
875 }
876
877 pub async fn confirm_block_import_processed(&mut self) {
879 let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
882 {
883 let value = (NumberFor::<Block>::default(), acknowledgement_sender);
887 self.block_import
888 .block_importing_notification_subscribers
889 .lock()
890 .retain(|subscriber| subscriber.unbounded_send(value.clone()).is_ok());
891 }
892 while acknowledgement_receiver.next().await.is_some() {
893 }
895
896 self.confirm_acknowledgement().await;
898 }
899
    /// Returns a block importing notification stream obtained from the mock block import.
    ///
    /// Each item carries a block number and an acknowledgement sender.
    pub fn block_importing_notification_stream(
        &self,
    ) -> TracingUnboundedReceiver<(NumberFor<Block>, mpsc::Sender<()>)> {
        self.block_import.block_importing_notification_stream()
    }
906
907 pub fn get_bundle_from_tx_pool(
909 &self,
910 new_slot: NewSlot,
911 ) -> Option<OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>> {
912 for ready_tx in self.transaction_pool.ready() {
913 let ext = UncheckedExtrinsic::decode(&mut ready_tx.data.encode().as_slice())
914 .expect("should be able to decode");
915 if let RuntimeCall::Domains(pallet_domains::Call::submit_bundle { opaque_bundle }) =
916 ext.function
917 && opaque_bundle.sealed_header().slot_number() == *new_slot.0
918 {
919 return Some(opaque_bundle);
920 }
921 }
922 None
923 }
924
925 pub async fn submit_transaction(&self, tx: OpaqueExtrinsic) -> Result<H256, TxPoolError> {
927 self.transaction_pool
928 .submit_one(
929 self.client.info().best_hash,
930 TransactionSource::External,
931 tx,
932 )
933 .await
934 .map_err(|err| {
935 err.into_pool_error()
936 .expect("should always be a pool error")
937 })
938 }
939
940 pub async fn clear_tx_pool(&self) -> Result<(), Box<dyn Error>> {
942 let txs: Vec<_> = self
943 .transaction_pool
944 .ready()
945 .map(|t| self.transaction_pool.hash_of(&t.data))
946 .collect();
947 self.prune_txs_from_pool(txs.as_slice()).await
948 }
949
950 pub async fn prune_tx_from_pool(&self, tx: &OpaqueExtrinsic) -> Result<(), Box<dyn Error>> {
952 self.prune_txs_from_pool(&[self.transaction_pool.hash_of(tx)])
953 .await
954 }
955
956 async fn prune_txs_from_pool(
957 &self,
958 tx_hashes: &[BlockHashFor<Block>],
959 ) -> Result<(), Box<dyn Error>> {
960 let hash_and_number = HashAndNumber {
961 number: self.client.info().best_number,
962 hash: self.client.info().best_hash,
963 };
964 self.transaction_pool
965 .pool()
966 .prune_known(&hash_and_number, tx_hashes);
967 tokio::time::sleep(time::Duration::from_millis(1)).await;
970 self.transaction_pool
971 .pool()
972 .validated_pool()
973 .clear_stale(&hash_and_number);
974 Ok(())
975 }
976
977 pub fn does_receipt_exist(
979 &self,
980 er_hash: BlockHashFor<DomainBlock>,
981 ) -> Result<bool, Box<dyn Error>> {
982 Ok(self
983 .client
984 .runtime_api()
985 .execution_receipt(self.client.info().best_hash, er_hash)?
986 .is_some())
987 }
988
989 pub fn get_domain_staking_summary(
991 &self,
992 domain_id: DomainId,
993 ) -> Result<Option<StakingSummary<OperatorId, Balance>>, Box<dyn Error>> {
994 Ok(self
995 .client
996 .runtime_api()
997 .domain_stake_summary(self.client.info().best_hash, domain_id)?)
998 }
999
1000 pub fn get_domain_block_pruning_depth(&self) -> Result<BlockNumber, Box<dyn Error>> {
1002 Ok(self
1003 .client
1004 .runtime_api()
1005 .block_pruning_depth(self.client.info().best_hash)?)
1006 }
1007
1008 pub fn wait_for_fraud_proof<FP>(
1011 &self,
1012 fraud_proof_predicate: FP,
1013 ) -> Pin<Box<dyn Future<Output = FraudProofFor<Block, DomainBlock>> + Send>>
1014 where
1015 FP: Fn(&FraudProofFor<Block, DomainBlock>) -> bool + Send + 'static,
1016 {
1017 let tx_pool = self.transaction_pool.clone();
1018 let mut import_tx_stream = self.transaction_pool.import_notification_stream();
1019 Box::pin(async move {
1020 while let Some(ready_tx_hash) = import_tx_stream.next().await {
1021 let ready_tx = tx_pool
1022 .ready_transaction(&ready_tx_hash)
1023 .expect("Just get the ready tx hash from import stream; qed");
1024 let ext = subspace_test_runtime::UncheckedExtrinsic::decode(
1025 &mut ready_tx.data.encode().as_slice(),
1026 )
1027 .expect("Decode tx must success");
1028 if let subspace_test_runtime::RuntimeCall::Domains(
1029 pallet_domains::Call::submit_fraud_proof { fraud_proof },
1030 ) = ext.function
1031 && fraud_proof_predicate(&fraud_proof)
1032 {
1033 return *fraud_proof;
1034 }
1035 }
1036 unreachable!()
1037 })
1038 }
1039
1040 pub fn free_balance(&self, account_id: AccountId) -> subspace_runtime_primitives::Balance {
1042 self.client
1043 .runtime_api()
1044 .free_balance(self.client.info().best_hash, account_id)
1045 .expect("Fail to get account free balance")
1046 }
1047
1048 pub fn ban_peer(&self, addr: MultiaddrWithPeerId) {
1051 self.network_service.report_peer(
1054 addr.peer_id,
1055 ReputationChange::new_fatal("Peer banned by test (1)"),
1056 );
1057 self.network_service.report_peer(
1058 addr.peer_id,
1059 ReputationChange::new_fatal("Peer banned by test (2)"),
1060 );
1061 }
1062
1063 pub fn unban_peer(&self, addr: MultiaddrWithPeerId) {
1065 self.network_service.report_peer(
1068 addr.peer_id,
1069 ReputationChange::new(i32::MAX, "Peer unbanned by test (1)"),
1070 );
1071 self.network_service.report_peer(
1072 addr.peer_id,
1073 ReputationChange::new(i32::MAX, "Peer unbanned by test (2)"),
1074 );
1075 }
1076
1077 pub async fn stop(self) -> Result<(), std::io::Error> {
1083 let lock_file_path = self.base_path.path().join("paritydb").join("lock");
1084 std::mem::drop(self);
1086
1087 sleep(Duration::from_secs(2)).await;
1090
1091 if let Err(err) = std::fs::remove_file(lock_file_path) {
1093 tracing::error!("deleting paritydb lock file failed: {err:?}");
1094 }
1095 Ok(())
1096 }
1097}
1098
1099impl MockConsensusNode {
1100 async fn collect_txn_from_pool(&self, parent_hash: Hash) -> Vec<ExtrinsicFor<Block>> {
1101 self.transaction_pool
1102 .ready_at(parent_hash)
1103 .await
1104 .map(|pending_tx| pending_tx.data().as_ref().clone())
1105 .collect()
1106 }
1107
1108 async fn mock_inherent_data(slot: Slot) -> Result<InherentData, Box<dyn Error>> {
1109 let timestamp = sp_timestamp::InherentDataProvider::new(Timestamp::new(
1110 <Slot as Into<u64>>::into(slot) * SLOT_DURATION,
1111 ));
1112 let subspace_inherents =
1113 sp_consensus_subspace::inherents::InherentDataProvider::new(vec![]);
1114
1115 let inherent_data = (subspace_inherents, timestamp)
1116 .create_inherent_data()
1117 .await?;
1118
1119 Ok(inherent_data)
1120 }
1121
1122 fn mock_subspace_digest(&self, slot: Slot) -> Digest {
1123 let pre_digest: PreDigest<AccountId> = PreDigest::V0 {
1124 slot,
1125 solution: self.mock_solution.clone(),
1126 pot_info: PreDigestPotInfo::V0 {
1127 proof_of_time: Default::default(),
1128 future_proof_of_time: Default::default(),
1129 },
1130 };
1131 let mut digest = Digest::default();
1132 digest.push(DigestItem::subspace_pre_digest(&pre_digest));
1133 digest
1134 }
1135
1136 async fn build_block(
1138 &self,
1139 slot: Slot,
1140 parent_hash: BlockHashFor<Block>,
1141 extrinsics: Vec<ExtrinsicFor<Block>>,
1142 ) -> Result<(Block, StorageChanges), Box<dyn Error>> {
1143 let inherent_digest = self.mock_subspace_digest(slot);
1144
1145 let inherent_data = Self::mock_inherent_data(slot).await?;
1146
1147 let mut block_builder = BlockBuilderBuilder::new(self.client.as_ref())
1148 .on_parent_block(parent_hash)
1149 .fetch_parent_block_number(self.client.as_ref())?
1150 .with_inherent_digests(inherent_digest)
1151 .build()
1152 .expect("Creates new block builder");
1153
1154 let inherent_txns = block_builder.create_inherents(inherent_data)?;
1155
1156 for tx in inherent_txns.into_iter().chain(extrinsics) {
1157 if let Err(err) = sc_block_builder::BlockBuilder::push(&mut block_builder, tx) {
1158 tracing::error!("Invalid transaction while building block: {}", err);
1159 }
1160 }
1161
1162 let (block, storage_changes, _) = block_builder.build()?.into_inner();
1163 Ok((block, storage_changes))
1164 }
1165
1166 async fn import_block(
1168 &self,
1169 block: Block,
1170 storage_changes: Option<StorageChanges>,
1171 ) -> Result<BlockHashFor<Block>, Box<dyn Error>> {
1172 let (header, body) = block.deconstruct();
1173
1174 let header_hash = header.hash();
1175 let header_number = header.number;
1176
1177 let block_import_params = {
1178 let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
1179 import_block.body = Some(body);
1180 import_block.state_action = match storage_changes {
1181 Some(changes) => {
1182 StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(changes))
1183 }
1184 None => StateAction::Execute,
1185 };
1186 import_block
1187 };
1188
1189 let import_result = self.block_import.import_block(block_import_params).await?;
1190
1191 if let Some(finalized_block_hash) = self
1192 .finalize_block_depth
1193 .and_then(|depth| header_number.checked_sub(depth))
1194 .and_then(|block_to_finalize| {
1195 self.client
1196 .hash(block_to_finalize)
1197 .expect("Block hash not found for number: {block_to_finalize:?}")
1198 })
1199 {
1200 self.client
1201 .finalize_block(finalized_block_hash, None, true)
1202 .unwrap();
1203 }
1204
1205 match import_result {
1206 ImportResult::Imported(_) | ImportResult::AlreadyInChain => Ok(header_hash),
1207 bad_res => Err(format!("Fail to import block due to {bad_res:?}").into()),
1208 }
1209 }
1210
1211 #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1214 pub async fn produce_block_with_slot_at(
1215 &mut self,
1216 new_slot: NewSlot,
1217 parent_hash: BlockHashFor<Block>,
1218 maybe_extrinsics: Option<Vec<ExtrinsicFor<Block>>>,
1219 ) -> Result<BlockHashFor<Block>, Box<dyn Error>> {
1220 let block_timer = time::Instant::now();
1221
1222 let extrinsics = match maybe_extrinsics {
1223 Some(extrinsics) => extrinsics,
1224 None => self.collect_txn_from_pool(parent_hash).await,
1225 };
1226 let tx_hashes: Vec<_> = extrinsics
1227 .iter()
1228 .map(|t| self.transaction_pool.hash_of(t))
1229 .collect();
1230
1231 let (block, storage_changes) = self
1232 .build_block(new_slot.0, parent_hash, extrinsics)
1233 .await?;
1234
1235 log_new_block(&block, block_timer.elapsed().as_millis());
1236
1237 let res = match self.import_block(block, Some(storage_changes)).await {
1238 Ok(hash) => {
1239 self.prune_txs_from_pool(tx_hashes.as_slice()).await?;
1242 Ok(hash)
1243 }
1244 err => err,
1245 };
1246 self.confirm_block_import_processed().await;
1247 res
1248 }
1249
1250 #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1253 pub async fn produce_block_with_slot(&mut self, slot: NewSlot) -> Result<(), Box<dyn Error>> {
1254 self.produce_block_with_slot_at(slot, self.client.info().best_hash, None)
1255 .await?;
1256 Ok(())
1257 }
1258
1259 #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1261 pub async fn produce_block_with_extrinsics(
1262 &mut self,
1263 extrinsics: Vec<ExtrinsicFor<Block>>,
1264 ) -> Result<(), Box<dyn Error>> {
1265 let slot = self.produce_slot();
1266 self.produce_block_with_slot_at(slot, self.client.info().best_hash, Some(extrinsics))
1267 .await?;
1268 Ok(())
1269 }
1270
1271 #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1273 pub async fn produce_blocks(&mut self, n: u64) -> Result<(), Box<dyn Error>> {
1274 for _ in 0..n {
1275 let slot = self.produce_slot();
1276 self.produce_block_with_slot(slot).await?;
1277 }
1278 Ok(())
1279 }
1280
1281 #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1283 pub async fn produce_blocks_with_bundles(&mut self, n: u64) -> Result<(), Box<dyn Error>> {
1284 for _ in 0..n {
1285 let (slot, _) = self.produce_slot_and_wait_for_bundle_submission().await;
1286 self.produce_block_with_slot(slot).await?;
1287 }
1288 Ok(())
1289 }
1290
1291 pub fn account_nonce(&self) -> u32 {
1293 self.client
1294 .runtime_api()
1295 .account_nonce(self.client.info().best_hash, self.key.to_account_id())
1296 .expect("Fail to get account nonce")
1297 }
1298
1299 pub fn account_nonce_of(&self, account_id: AccountId) -> u32 {
1301 self.client
1302 .runtime_api()
1303 .account_nonce(self.client.info().best_hash, account_id)
1304 .expect("Fail to get account nonce")
1305 }
1306
1307 pub fn construct_extrinsic(
1309 &self,
1310 nonce: u32,
1311 function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1312 ) -> UncheckedExtrinsic {
1313 construct_extrinsic_generic(&self.client, function, self.key, false, nonce, 0)
1314 }
1315
1316 pub fn construct_unsigned_extrinsic(
1318 &self,
1319 function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1320 ) -> UncheckedExtrinsic {
1321 construct_unsigned_extrinsic(&self.client, function)
1322 }
1323
1324 pub async fn construct_and_send_extrinsic_with(
1326 &self,
1327 function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1328 ) -> Result<RpcTransactionOutput, RpcTransactionError> {
1329 let nonce = self.account_nonce();
1330 let extrinsic = self.construct_extrinsic(nonce, function);
1331 self.rpc_handlers.send_transaction(extrinsic.into()).await
1332 }
1333
1334 pub async fn send_extrinsic(
1336 &self,
1337 extrinsic: impl Into<OpaqueExtrinsic>,
1338 ) -> Result<RpcTransactionOutput, RpcTransactionError> {
1339 self.rpc_handlers.send_transaction(extrinsic.into()).await
1340 }
1341}
1342
1343fn log_new_block(block: &Block, used_time_ms: u128) {
1344 tracing::info!(
1345 "🎁 Prepared block for proposing at {} ({} ms) [hash: {:?}; parent_hash: {}; extrinsics ({}): [{}]]",
1346 block.header().number(),
1347 used_time_ms,
1348 block.header().hash(),
1349 block.header().parent_hash(),
1350 block.extrinsics().len(),
1351 block
1352 .extrinsics()
1353 .iter()
1354 .map(|xt| BlakeTwo256::hash_of(xt).to_string())
1355 .collect::<Vec<_>>()
1356 .join(", ")
1357 );
1358}
1359
1360fn mock_import_queue<Block: BlockT, I>(
1361 block_import: I,
1362 spawner: &impl SpawnEssentialNamed,
1363) -> BasicQueue<Block>
1364where
1365 I: BlockImport<Block, Error = ConsensusError> + Send + Sync + 'static,
1366{
1367 BasicQueue::new(
1368 MockVerifier::default(),
1369 Box::new(block_import),
1370 None,
1371 spawner,
1372 None,
1373 )
1374}
1375
1376struct MockVerifier<Block> {
1377 _marker: PhantomData<Block>,
1378}
1379
1380impl<Block> Default for MockVerifier<Block> {
1381 fn default() -> Self {
1382 Self {
1383 _marker: PhantomData,
1384 }
1385 }
1386}
1387
1388#[async_trait::async_trait]
1389impl<Block> VerifierT<Block> for MockVerifier<Block>
1390where
1391 Block: BlockT,
1392{
1393 async fn verify(
1394 &self,
1395 block_params: BlockImportParams<Block>,
1396 ) -> Result<BlockImportParams<Block>, String> {
1397 Ok(block_params)
1398 }
1399}
1400
1401#[allow(clippy::type_complexity)]
1404struct MockBlockImport<Client, Block: BlockT> {
1405 inner: Arc<Client>,
1406 block_importing_notification_subscribers:
1407 Arc<Mutex<Vec<TracingUnboundedSender<(NumberFor<Block>, mpsc::Sender<()>)>>>>,
1408}
1409
1410impl<Client, Block: BlockT> MockBlockImport<Client, Block> {
1411 fn new(inner: Arc<Client>) -> Self {
1412 MockBlockImport {
1413 inner,
1414 block_importing_notification_subscribers: Arc::new(Mutex::new(Vec::new())),
1415 }
1416 }
1417
1418 fn block_importing_notification_stream(
1420 &self,
1421 ) -> TracingUnboundedReceiver<(NumberFor<Block>, mpsc::Sender<()>)> {
1422 let (tx, rx) = tracing_unbounded("subspace_new_slot_notification_stream", 100);
1423 self.block_importing_notification_subscribers
1424 .lock()
1425 .push(tx);
1426 rx
1427 }
1428}
1429
1430impl<Client, Block: BlockT> MockBlockImport<Client, Block> {
1431 fn clone(&self) -> Self {
1432 MockBlockImport {
1433 inner: self.inner.clone(),
1434 block_importing_notification_subscribers: self
1435 .block_importing_notification_subscribers
1436 .clone(),
1437 }
1438 }
1439}
1440
1441#[async_trait::async_trait]
1442impl<Client, Block> BlockImport<Block> for MockBlockImport<Client, Block>
1443where
1444 Block: BlockT,
1445 for<'r> &'r Client: BlockImport<Block, Error = ConsensusError> + Send + Sync,
1446 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
1447 Client::Api: ApiExt<Block>,
1448{
1449 type Error = ConsensusError;
1450
1451 async fn import_block(
1452 &self,
1453 mut block: BlockImportParams<Block>,
1454 ) -> Result<ImportResult, Self::Error> {
1455 let block_number = *block.header.number();
1456 block.fork_choice = Some(ForkChoiceStrategy::LongestChain);
1457
1458 let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
1459
1460 {
1463 let value = (block_number, acknowledgement_sender);
1464 self.block_importing_notification_subscribers
1465 .lock()
1466 .retain(|subscriber| subscriber.unbounded_send(value.clone()).is_ok());
1467 }
1468
1469 while acknowledgement_receiver.next().await.is_some() {
1470 }
1472
1473 self.inner.import_block(block).await
1474 }
1475
1476 async fn check_block(
1477 &self,
1478 block: BlockCheckParams<Block>,
1479 ) -> Result<ImportResult, Self::Error> {
1480 self.inner.check_block(block).await
1481 }
1482}
1483
/// Produces `$count` consensus blocks (with bundles) on `$primary_node` and waits until
/// `$operator_node` and every listed `$domain_node` have seen `$count` blocks.
///
/// Expands to an `async` block that evaluates to `Result<(), Box<dyn std::error::Error>>`.
#[macro_export]
macro_rules! produce_blocks {
    ($primary_node:ident, $operator_node:ident, $count: literal $(, $domain_node:ident)*) => {
        {
            async {
                // The waiting futures are created before any block is produced.
                let mut block_waiters: Vec<std::pin::Pin<Box<dyn futures::Future<Output = ()>>>> = Vec::new();
                block_waiters.push(Box::pin($operator_node.wait_for_blocks($count)));
                $( block_waiters.push(Box::pin($domain_node.wait_for_blocks($count))); )*
                let all_nodes_caught_up = futures::future::join_all(block_waiters);

                $primary_node.produce_blocks_with_bundles($count).await?;
                all_nodes_caught_up.await;
                Ok::<(), Box<dyn std::error::Error>>(())
            }
        }
    };
}
1503
/// Awaits the block-producing future `$primary_node_produce_block` and then waits until
/// `$operator_node` and every listed `$domain_node` have seen one block.
///
/// Expands to an `async` block that evaluates to `Result<(), Box<dyn std::error::Error>>`.
#[macro_export]
macro_rules! produce_block_with {
    ($primary_node_produce_block:expr, $operator_node:ident $(, $domain_node:ident)*) => {
        {
            async {
                // The waiting futures are created before the block is produced.
                let mut block_waiters: Vec<std::pin::Pin<Box<dyn futures::Future<Output = ()>>>> = Vec::new();
                block_waiters.push(Box::pin($operator_node.wait_for_blocks(1)));
                $( block_waiters.push(Box::pin($domain_node.wait_for_blocks(1))); )*
                let all_nodes_caught_up = futures::future::join_all(block_waiters);

                $primary_node_produce_block.await?;
                all_nodes_caught_up.await;
                Ok::<(), Box<dyn std::error::Error>>(())
            }
        }
    };
}
1524
/// Keeps producing one block at a time (via `produce_blocks!`) until `$condition`
/// evaluates to `true`, sleeping 10 ms after each block.
///
/// Expands to an `async` block that evaluates to `Result<(), Box<dyn std::error::Error>>`.
#[macro_export]
macro_rules! produce_blocks_until {
    ($primary_node:ident, $operator_node:ident, $condition: block $(, $domain_node:ident)*) => {
        async {
            while !$condition {
                // `$crate::` makes the nested macro resolve even when the caller has only
                // imported `produce_blocks_until!`. The repetition carries its own leading
                // comma, so it must not also use `,` as a separator: that would expand to
                // `, a, , b` for two or more domain nodes.
                $crate::produce_blocks!($primary_node, $operator_node, 1 $(, $domain_node)*).await?;
                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            }
            Ok::<(), Box<dyn std::error::Error>>(())
        }
    };
}
1538
1539type BalanceOf<T> = <<T as pallet_transaction_payment::Config>::OnChargeTransaction as pallet_transaction_payment::OnChargeTransaction<T>>::Balance;
1540
1541fn get_signed_extra(
1542 current_block: u64,
1543 immortal: bool,
1544 nonce: u32,
1545 tip: BalanceOf<Runtime>,
1546) -> SignedExtra {
1547 let period = u64::from(<<Runtime as frame_system::Config>::BlockHashCount>::get())
1548 .checked_next_power_of_two()
1549 .map(|c| c / 2)
1550 .unwrap_or(2);
1551 (
1552 frame_system::CheckNonZeroSender::<Runtime>::new(),
1553 frame_system::CheckSpecVersion::<Runtime>::new(),
1554 frame_system::CheckTxVersion::<Runtime>::new(),
1555 frame_system::CheckGenesis::<Runtime>::new(),
1556 frame_system::CheckMortality::<Runtime>::from(if immortal {
1557 generic::Era::Immortal
1558 } else {
1559 generic::Era::mortal(period, current_block)
1560 }),
1561 frame_system::CheckNonce::<Runtime>::from(nonce.into()),
1562 frame_system::CheckWeight::<Runtime>::new(),
1563 pallet_transaction_payment::ChargeTransactionPayment::<Runtime>::from(tip),
1564 BalanceTransferCheckExtension::<Runtime>::default(),
1565 pallet_subspace::extensions::SubspaceExtension::<Runtime>::new(),
1566 pallet_domains::extensions::DomainsExtension::<Runtime>::new(),
1567 pallet_messenger::extensions::MessengerExtension::<Runtime>::new(),
1568 )
1569}
1570
1571fn construct_extrinsic_raw_payload<Client>(
1572 client: impl AsRef<Client>,
1573 function: <Runtime as frame_system::Config>::RuntimeCall,
1574 immortal: bool,
1575 nonce: u32,
1576 tip: BalanceOf<Runtime>,
1577) -> (
1578 SignedPayload<<Runtime as frame_system::Config>::RuntimeCall, SignedExtra>,
1579 SignedExtra,
1580)
1581where
1582 BalanceOf<Runtime>: Send + Sync + From<u64> + sp_runtime::FixedPointOperand,
1583 u64: From<BlockNumberFor<Runtime>>,
1584 Client: HeaderBackend<subspace_runtime_primitives::opaque::Block>,
1585{
1586 let current_block_hash = client.as_ref().info().best_hash;
1587 let current_block = client.as_ref().info().best_number.saturated_into();
1588 let genesis_block = client.as_ref().hash(0).unwrap().unwrap();
1589 let extra = get_signed_extra(current_block, immortal, nonce, tip);
1590 (
1591 generic::SignedPayload::<
1592 <Runtime as frame_system::Config>::RuntimeCall,
1593 SignedExtra,
1594 >::from_raw(
1595 function,
1596 extra.clone(),
1597 ((), 100, 1, genesis_block, current_block_hash, (), (), (), (), (), (),()),
1598 ),
1599 extra,
1600 )
1601}
1602
1603pub fn construct_extrinsic_generic<Client>(
1605 client: impl AsRef<Client>,
1606 function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1607 caller: Sr25519Keyring,
1608 immortal: bool,
1609 nonce: u32,
1610 tip: BalanceOf<Runtime>,
1611) -> UncheckedExtrinsic
1612where
1613 BalanceOf<Runtime>: Send + Sync + From<u64> + sp_runtime::FixedPointOperand,
1614 u64: From<BlockNumberFor<Runtime>>,
1615 Client: HeaderBackend<subspace_runtime_primitives::opaque::Block>,
1616{
1617 let function = function.into();
1618 let (raw_payload, extra) =
1619 construct_extrinsic_raw_payload(client, function.clone(), immortal, nonce, tip);
1620 let signature = raw_payload.using_encoded(|e| caller.sign(e));
1621 UncheckedExtrinsic::new_signed(
1622 function,
1623 MultiAddress::Id(caller.to_account_id()),
1624 Signature::Sr25519(signature),
1625 extra,
1626 )
1627}
1628
1629fn construct_unsigned_extrinsic<Client>(
1631 client: impl AsRef<Client>,
1632 function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1633) -> UncheckedExtrinsic
1634where
1635 BalanceOf<Runtime>: Send + Sync + From<u64> + sp_runtime::FixedPointOperand,
1636 u64: From<BlockNumberFor<Runtime>>,
1637 Client: HeaderBackend<subspace_runtime_primitives::opaque::Block>,
1638{
1639 let function = function.into();
1640 let current_block = client.as_ref().info().best_number.saturated_into();
1641 let extra = get_signed_extra(current_block, true, 0, 0);
1642 UncheckedExtrinsic::new_transaction(function, extra)
1643}