1#![warn(missing_docs, unused_crate_dependencies)]
20
21use cross_domain_message_gossip::{GossipWorkerBuilder, xdm_gossip_peers_set_config};
22use domain_runtime_primitives::opaque::{Block as DomainBlock, Header as DomainHeader};
23use frame_system::pallet_prelude::BlockNumberFor;
24use futures::channel::mpsc;
25use futures::{Future, StreamExt};
26use jsonrpsee::RpcModule;
27use pallet_domains::staking::StakingSummary;
28use parity_scale_codec::{Decode, Encode};
29use parking_lot::Mutex;
30use sc_block_builder::BlockBuilderBuilder;
31use sc_client_api::execution_extensions::ExtensionsFactory;
32use sc_client_api::{Backend as BackendT, BlockBackend, ExecutorProvider, Finalizer};
33use sc_consensus::block_import::{
34 BlockCheckParams, BlockImportParams, ForkChoiceStrategy, ImportResult,
35};
36use sc_consensus::{BasicQueue, BlockImport, StateAction, Verifier as VerifierT};
37use sc_domains::ExtensionsFactory as DomainsExtensionFactory;
38use sc_network::config::{NetworkConfiguration, TransportConfig};
39use sc_network::service::traits::NetworkService;
40use sc_network::{
41 NetworkWorker, NotificationMetrics, NotificationService, ReputationChange, multiaddr,
42};
43use sc_service::config::{
44 DatabaseSource, ExecutorConfiguration, KeystoreConfig, MultiaddrWithPeerId,
45 OffchainWorkerConfig, RpcBatchRequestConfig, RpcConfiguration, RpcEndpoint,
46 WasmExecutionMethod, WasmtimeInstantiationStrategy,
47};
48use sc_service::{BasePath, BlocksPruning, Configuration, Role, SpawnTasksParams, TaskManager};
49use sc_transaction_pool::{BasicPool, FullChainApi, Options};
50use sc_transaction_pool_api::error::{Error as TxPoolError, IntoPoolError};
51use sc_transaction_pool_api::{InPoolTransaction, TransactionPool, TransactionSource};
52use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender, tracing_unbounded};
53use sp_api::{ApiExt, ProvideRuntimeApi};
54use sp_blockchain::{HashAndNumber, HeaderBackend};
55use sp_consensus::{BlockOrigin, Error as ConsensusError};
56use sp_consensus_slots::Slot;
57use sp_consensus_subspace::digests::{
58 CompatibleDigestItem, PreDigest, PreDigestPotInfo, extract_pre_digest,
59};
60use sp_consensus_subspace::{PotExtension, SubspaceApi};
61use sp_core::H256;
62use sp_core::offchain::OffchainDbExt;
63use sp_core::offchain::storage::OffchainDb;
64use sp_core::traits::{CodeExecutor, SpawnEssentialNamed};
65use sp_domains::bundle::OpaqueBundle;
66use sp_domains::{BundleProducerElectionApi, ChainId, DomainId, DomainsApi, OperatorId};
67use sp_domains_fraud_proof::fraud_proof::FraudProof;
68use sp_domains_fraud_proof::{FraudProofExtension, FraudProofHostFunctionsImpl};
69use sp_externalities::Extensions;
70use sp_inherents::{InherentData, InherentDataProvider};
71use sp_keyring::Sr25519Keyring;
72use sp_messenger::MessengerApi;
73use sp_messenger_host_functions::{MessengerExtension, MessengerHostFunctionsImpl};
74use sp_mmr_primitives::MmrApi;
75use sp_runtime::generic::{Digest, SignedPayload};
76use sp_runtime::traits::{
77 BlakeTwo256, Block as BlockT, Hash as HashT, Header as HeaderT, NumberFor,
78};
79use sp_runtime::{DigestItem, MultiAddress, OpaqueExtrinsic, SaturatedConversion, generic};
80use sp_subspace_mmr::host_functions::{SubspaceMmrExtension, SubspaceMmrHostFunctionsImpl};
81use sp_timestamp::Timestamp;
82use std::collections::HashMap;
83use std::error::Error;
84use std::marker::PhantomData;
85use std::net::SocketAddr;
86use std::pin::Pin;
87use std::sync::Arc;
88use std::time;
89use std::time::Duration;
90use subspace_core_primitives::pot::PotOutput;
91use subspace_core_primitives::solutions::Solution;
92use subspace_core_primitives::{BlockNumber, PublicKey};
93use subspace_runtime_primitives::extension::BalanceTransferCheckExtension;
94use subspace_runtime_primitives::opaque::Block;
95use subspace_runtime_primitives::{
96 AccountId, Balance, BlockHashFor, ExtrinsicFor, Hash, HeaderFor, Signature,
97};
98use subspace_service::{FullSelectChain, RuntimeExecutor};
99use subspace_test_client::{Backend, Client, chain_spec};
100use subspace_test_primitives::OnchainStateApi;
101use subspace_test_runtime::{
102 Runtime, RuntimeApi, RuntimeCall, SLOT_DURATION, SignedExtra, UncheckedExtrinsic,
103};
104use substrate_frame_rpc_system::AccountNonceApi;
105use substrate_test_client::{RpcHandlersExt, RpcTransactionError, RpcTransactionOutput};
106use tokio::time::sleep;
107
/// Convenience alias for a [`FraudProof`] whose number and hash types come from the consensus
/// `Block` and whose header type comes from the `DomainBlock`, with `H256` as the final
/// type parameter.
pub type FraudProofFor<Block, DomainBlock> =
    FraudProof<NumberFor<Block>, BlockHashFor<Block>, HeaderFor<DomainBlock>, H256>;
111
/// Maximum number of times a produced slot is re-announced while waiting for a bundle before
/// `produce_slot_and_wait_for_bundle_submission` gives up and panics.
const MAX_PRODUCE_BUNDLE_TRY: usize = 10;
113
114#[expect(clippy::too_many_arguments)]
119pub fn node_config(
120 tokio_handle: tokio::runtime::Handle,
121 key: Sr25519Keyring,
122 boot_nodes: Vec<MultiaddrWithPeerId>,
123 run_farmer: bool,
124 force_authoring: bool,
125 force_synced: bool,
126 private_evm: bool,
127 evm_owner_account: Option<AccountId>,
128 base_path: BasePath,
129 rpc_addr: Option<SocketAddr>,
130 rpc_port: Option<u16>,
131) -> Configuration {
132 let root = base_path.path();
133 let role = if run_farmer {
134 Role::Authority
135 } else {
136 Role::Full
137 };
138 let key_seed = key.to_seed();
139 let spec = chain_spec::subspace_local_testnet_config(private_evm, evm_owner_account).unwrap();
140
141 let mut network_config = NetworkConfiguration::new(
142 key_seed.to_string(),
143 "network/test/0.1",
144 Default::default(),
145 None,
146 );
147
148 network_config.boot_nodes = boot_nodes;
149
150 network_config.allow_non_globals_in_dht = true;
151
152 let addr: multiaddr::Multiaddr = multiaddr::Protocol::Memory(rand::random()).into();
153 network_config.listen_addresses.push(addr.clone());
154
155 network_config.public_addresses.push(addr);
156
157 network_config.transport = TransportConfig::MemoryOnly;
158
159 network_config.force_synced = force_synced;
160
161 let rpc_configuration = match rpc_addr {
162 Some(listen_addr) => {
163 let port = rpc_port.unwrap_or(9944);
164 RpcConfiguration {
165 addr: Some(vec![RpcEndpoint {
166 batch_config: RpcBatchRequestConfig::Disabled,
167 max_connections: 100,
168 listen_addr,
169 rpc_methods: Default::default(),
170 rate_limit: None,
171 rate_limit_trust_proxy_headers: false,
172 rate_limit_whitelisted_ips: vec![],
173 max_payload_in_mb: 15,
174 max_payload_out_mb: 15,
175 max_subscriptions_per_connection: 100,
176 max_buffer_capacity_per_connection: 100,
177 cors: None,
178 retry_random_port: true,
179 is_optional: false,
180 }]),
181 max_request_size: 15,
182 max_response_size: 15,
183 id_provider: None,
184 max_subs_per_conn: 1024,
185 port,
186 message_buffer_capacity: 1024,
187 batch_config: RpcBatchRequestConfig::Disabled,
188 max_connections: 1000,
189 cors: None,
190 methods: Default::default(),
191 rate_limit: None,
192 rate_limit_whitelisted_ips: vec![],
193 rate_limit_trust_proxy_headers: false,
194 request_logger_limit: 1024,
195 }
196 }
197 None => RpcConfiguration {
198 addr: None,
199 max_request_size: 0,
200 max_response_size: 0,
201 id_provider: None,
202 max_subs_per_conn: 0,
203 port: 0,
204 message_buffer_capacity: 0,
205 batch_config: RpcBatchRequestConfig::Disabled,
206 max_connections: 0,
207 cors: None,
208 methods: Default::default(),
209 rate_limit: None,
210 rate_limit_whitelisted_ips: vec![],
211 rate_limit_trust_proxy_headers: false,
212 request_logger_limit: 1024,
213 },
214 };
215
216 Configuration {
217 impl_name: "subspace-test-node".to_string(),
218 impl_version: "0.1".to_string(),
219 role,
220 tokio_handle,
221 transaction_pool: Default::default(),
222 network: network_config,
223 keystore: KeystoreConfig::InMemory,
224 database: DatabaseSource::ParityDb {
225 path: root.join("paritydb"),
226 },
227 trie_cache_maximum_size: Some(64 * 1024 * 1024),
228 state_pruning: Default::default(),
229 blocks_pruning: BlocksPruning::KeepAll,
230 chain_spec: Box::new(spec),
231 executor: ExecutorConfiguration {
232 wasm_method: WasmExecutionMethod::Compiled {
233 instantiation_strategy: WasmtimeInstantiationStrategy::PoolingCopyOnWrite,
234 },
235 max_runtime_instances: 8,
236 default_heap_pages: None,
237 runtime_cache_size: 2,
238 },
239 wasm_runtime_overrides: Default::default(),
240 rpc: rpc_configuration,
241 prometheus_config: None,
242 telemetry_endpoints: None,
243 offchain_worker: OffchainWorkerConfig {
244 enabled: false,
245 indexing_enabled: true,
246 },
247 force_authoring,
248 disable_grandpa: false,
249 dev_key_seed: Some(key_seed),
250 tracing_targets: None,
251 tracing_receiver: Default::default(),
252 announce_block: true,
253 data_path: base_path.path().into(),
254 base_path,
255 warm_up_trie_cache: None,
256 }
257}
258
/// `sp_api` storage changes specialized to the consensus [`Block`].
type StorageChanges = sp_api::StorageChanges<Block>;
260
/// Extensions factory for the mock consensus node.
///
/// Its `ExtensionsFactory` implementation registers the fraud proof, MMR, messenger,
/// offchain DB (when the backend provides offchain storage) and proof-of-time extensions.
struct MockExtensionsFactory<Client, DomainBlock, Executor, CBackend> {
    /// Consensus chain client handed to the host function implementations.
    consensus_client: Arc<Client>,
    /// Consensus chain backend, queried for its offchain storage.
    consensus_backend: Arc<CBackend>,
    /// Executor handed to the fraud proof and messenger host functions.
    executor: Arc<Executor>,
    /// Mock verifier consulted by the proof-of-time extension.
    mock_pot_verifier: Arc<MockPotVerfier>,
    /// Confirmation depth passed to the domain extension factory and the MMR host functions.
    confirmation_depth_k: BlockNumber,
    /// Ties the otherwise unused `DomainBlock` type parameter to the struct.
    _phantom: PhantomData<DomainBlock>,
}
269
270impl<Client, DomainBlock, Executor, CBackend>
271 MockExtensionsFactory<Client, DomainBlock, Executor, CBackend>
272{
273 fn new(
274 consensus_client: Arc<Client>,
275 executor: Arc<Executor>,
276 mock_pot_verifier: Arc<MockPotVerfier>,
277 consensus_backend: Arc<CBackend>,
278 confirmation_depth_k: BlockNumber,
279 ) -> Self {
280 Self {
281 consensus_client,
282 consensus_backend,
283 executor,
284 mock_pot_verifier,
285 confirmation_depth_k,
286 _phantom: Default::default(),
287 }
288 }
289}
290
/// Mock proof-of-time verifier: a mutex-protected map from slot number to the proof-of-time
/// output that is considered valid for that slot.
#[derive(Default)]
struct MockPotVerfier(Mutex<HashMap<u64, PotOutput>>);
293
294impl MockPotVerfier {
295 fn is_valid(&self, slot: u64, pot: PotOutput) -> bool {
296 self.0.lock().get(&slot).map(|p| *p == pot).unwrap_or(false)
297 }
298
299 fn inject_pot(&self, slot: u64, pot: PotOutput) {
300 self.0.lock().insert(slot, pot);
301 }
302}
303
304impl<Block, Client, DomainBlock, Executor, CBackend> ExtensionsFactory<Block>
305 for MockExtensionsFactory<Client, DomainBlock, Executor, CBackend>
306where
307 Block: BlockT,
308 Block::Hash: From<H256> + Into<H256>,
309 DomainBlock: BlockT,
310 DomainBlock::Hash: Into<H256> + From<H256>,
311 Client: BlockBackend<Block> + HeaderBackend<Block> + ProvideRuntimeApi<Block> + 'static,
312 Client::Api: DomainsApi<Block, DomainBlock::Header>
313 + BundleProducerElectionApi<Block, Balance>
314 + MessengerApi<Block, NumberFor<Block>, Block::Hash>
315 + MmrApi<Block, H256, NumberFor<Block>>,
316 Executor: CodeExecutor + sc_executor::RuntimeVersionOf,
317 CBackend: BackendT<Block> + 'static,
318{
319 fn extensions_for(
320 &self,
321 _block_hash: Block::Hash,
322 _block_number: NumberFor<Block>,
323 ) -> Extensions {
324 let confirmation_depth_k = self.confirmation_depth_k;
325 let mut exts = Extensions::new();
326 exts.register(FraudProofExtension::new(Arc::new(
327 FraudProofHostFunctionsImpl::<_, _, DomainBlock, Executor, _>::new(
328 self.consensus_client.clone(),
329 self.executor.clone(),
330 move |client, executor| {
331 let extension_factory =
332 DomainsExtensionFactory::<_, Block, DomainBlock, _>::new(
333 client,
334 executor,
335 confirmation_depth_k,
336 );
337 Box::new(extension_factory) as Box<dyn ExtensionsFactory<DomainBlock>>
338 },
339 ),
340 )));
341 exts.register(SubspaceMmrExtension::new(Arc::new(
342 SubspaceMmrHostFunctionsImpl::<Block, _>::new(
343 self.consensus_client.clone(),
344 confirmation_depth_k,
345 ),
346 )));
347 exts.register(MessengerExtension::new(Arc::new(
348 MessengerHostFunctionsImpl::<Block, _, DomainBlock, _>::new(
349 self.consensus_client.clone(),
350 self.executor.clone(),
351 ),
352 )));
353
354 if let Some(offchain_storage) = self.consensus_backend.offchain_storage() {
355 let offchain_db = OffchainDb::new(offchain_storage);
356 exts.register(OffchainDbExt::new(offchain_db));
357 }
358 exts.register(PotExtension::new({
359 let client = Arc::clone(&self.consensus_client);
360 let mock_pot_verifier = Arc::clone(&self.mock_pot_verifier);
361 Box::new(
362 move |parent_hash, slot, proof_of_time, _quick_verification| {
363 let parent_hash = {
364 let mut converted_parent_hash = Block::Hash::default();
365 converted_parent_hash.as_mut().copy_from_slice(&parent_hash);
366 converted_parent_hash
367 };
368
369 let parent_header = match client.header(parent_hash) {
370 Ok(Some(parent_header)) => parent_header,
371 _ => return false,
372 };
373 let parent_pre_digest = match extract_pre_digest(&parent_header) {
374 Ok(parent_pre_digest) => parent_pre_digest,
375 _ => return false,
376 };
377
378 let parent_slot = parent_pre_digest.slot();
379 if slot <= *parent_slot {
380 return false;
381 }
382
383 mock_pot_verifier.is_valid(slot, proof_of_time)
384 },
385 )
386 }));
387 exts
388 }
389}
390
/// A slot together with the proof-of-time output generated for it.
type NewSlot = (Slot, PotOutput);
392
/// A mock Subspace consensus node instance used for testing.
pub struct MockConsensusNode {
    /// Task manager owning the node's background tasks.
    pub task_manager: TaskManager,
    /// Consensus chain client.
    pub client: Arc<Client>,
    /// Consensus chain backend.
    pub backend: Arc<Backend>,
    /// Runtime code executor.
    pub executor: RuntimeExecutor,
    /// Transaction pool.
    pub transaction_pool: Arc<BasicPool<FullChainApi<Client, Block>, Block>>,
    /// Select-chain strategy.
    pub select_chain: FullSelectChain,
    /// Network service.
    pub network_service: Arc<dyn NetworkService + Send + Sync>,
    /// Cross-domain message gossip notification service; taken (left as `None`) when the
    /// gossip worker is started.
    pub xdm_gossip_notification_service: Option<Box<dyn NotificationService>>,
    /// Sync service.
    pub sync_service: Arc<sc_network_sync::SyncingService<Block>>,
    /// RPC handlers.
    pub rpc_handlers: sc_service::RpcHandlers,
    /// Slot number that the next call to `produce_slot` will use.
    next_slot: u64,
    /// Mock proof-of-time verifier shared with the extensions factory.
    mock_pot_verifier: Arc<MockPotVerfier>,
    /// Subscribers notified about every new slot.
    new_slot_notification_subscribers: Vec<mpsc::UnboundedSender<(Slot, PotOutput)>>,
    /// Subscribers that receive an acknowledgement sender on `confirm_acknowledgement`.
    acknowledgement_sender_subscribers: Vec<TracingUnboundedSender<mpsc::Sender<()>>>,
    /// Block import pipeline.
    block_import: MockBlockImport<Client, Block>,
    /// Builder of the cross-domain gossip worker; taken when the worker is started.
    xdm_gossip_worker_builder: Option<GossipWorkerBuilder>,
    /// Mock solution placed into the Subspace pre-digest of produced blocks.
    mock_solution: Solution<AccountId>,
    /// Log prefix, derived from `key`.
    log_prefix: &'static str,
    /// Keyring identity of this node.
    pub key: Sr25519Keyring,
    /// Optional finalization depth supplied at construction.
    // NOTE(review): how this depth is applied is not visible in this part of the file.
    finalize_block_depth: Option<NumberFor<Block>>,
    /// The node's base path.
    base_path: BasePath,
}
435
/// Options for running a [`MockConsensusNode`] with custom RPC settings.
pub struct MockConsensusNodeRpcConfig {
    /// Base path of the node.
    pub base_path: BasePath,
    /// Optional finalization depth stored on the node.
    pub finalize_block_depth: Option<NumberFor<Block>>,
    /// Forwarded to the chain spec builder as the private EVM flag.
    pub private_evm: bool,
    /// Optional keyring whose account id is forwarded to the chain spec as the EVM owner.
    pub evm_owner: Option<Sr25519Keyring>,
    /// RPC listen address; `None` leaves the node without an RPC endpoint.
    pub rpc_addr: Option<SocketAddr>,
    /// RPC port; only consulted when `rpc_addr` is set.
    pub rpc_port: Option<u16>,
}
451
452impl MockConsensusNode {
    /// Builds and starts a mock consensus node from a ready-made `config`.
    ///
    /// Wires up the executor, client, extensions factory, transaction pool, mock block import,
    /// networking (including the XDM gossip notification protocol), the standard service
    /// tasks, the channel update worker, the cross-chain message listener and the MMR gadget.
    /// The XDM gossip worker itself is only prepared here, not started.
    ///
    /// `rpc_builder` is invoked whenever the service needs an RPC module.
    ///
    /// # Panics
    ///
    /// Panics if any of the service components fails to initialize.
    #[expect(clippy::result_large_err, reason = "Comes from Substrate")]
    fn run_with_configuration(
        mut config: Configuration,
        key: Sr25519Keyring,
        base_path: BasePath,
        finalize_block_depth: Option<NumberFor<Block>>,
        rpc_builder: Box<
            dyn Fn() -> Result<RpcModule<()>, sc_service::Error> + Send + Sync + 'static,
        >,
    ) -> MockConsensusNode {
        let log_prefix = key.into();

        // Banned transactions are banned for zero time in the test pool.
        let tx_pool_options = Options {
            ban_time: Duration::from_millis(0),
            ..Default::default()
        };

        // Tag the node name and enter a logging span carrying it for the rest of the setup.
        config.network.node_name = format!("{} (Consensus)", config.network.node_name);
        let span = sc_tracing::tracing::info_span!(
            sc_tracing::logging::PREFIX_LOG_SPAN,
            name = config.network.node_name.as_str()
        );
        let _enter = span.enter();

        let executor = sc_service::new_wasm_executor(&config.executor);

        let (client, backend, keystore_container, mut task_manager) =
            sc_service::new_full_parts::<Block, RuntimeApi, _>(&config, None, executor.clone())
                .expect("Fail to new full parts");

        // A separate executor instance is handed to the domain-related host functions.
        let domain_executor = Arc::new(sc_service::new_wasm_executor(&config.executor));
        let client = Arc::new(client);
        let mock_pot_verifier = Arc::new(MockPotVerfier::default());
        let chain_constants = client
            .runtime_api()
            .chain_constants(client.info().best_hash)
            .expect("Fail to get chain constants");
        // Install the mock extensions factory on the client's execution extensions.
        client
            .execution_extensions()
            .set_extensions_factory(MockExtensionsFactory::<
                _,
                DomainBlock,
                sc_domains::RuntimeExecutor,
                _,
            >::new(
                client.clone(),
                domain_executor.clone(),
                Arc::clone(&mock_pot_verifier),
                backend.clone(),
                chain_constants.confirmation_depth_k(),
            ));

        let select_chain = sc_consensus::LongestChain::new(backend.clone());
        let transaction_pool = Arc::from(BasicPool::new_full(
            tx_pool_options,
            config.role.is_authority().into(),
            config.prometheus_registry(),
            task_manager.spawn_essential_handle(),
            client.clone(),
        ));

        let block_import = MockBlockImport::<_, _>::new(client.clone());

        // Networking, with the XDM gossip notification protocol registered up front.
        let mut net_config = sc_network::config::FullNetworkConfiguration::<
            _,
            _,
            NetworkWorker<_, _>,
        >::new(&config.network, None);
        let (xdm_gossip_notification_config, xdm_gossip_notification_service) =
            xdm_gossip_peers_set_config();
        net_config.add_notification_protocol(xdm_gossip_notification_config);

        let (network_service, system_rpc_tx, tx_handler_controller, sync_service) =
            sc_service::build_network(sc_service::BuildNetworkParams {
                config: &config,
                net_config,
                client: client.clone(),
                transaction_pool: transaction_pool.clone(),
                spawn_handle: task_manager.spawn_handle(),
                import_queue: mock_import_queue(
                    block_import.clone(),
                    &task_manager.spawn_essential_handle(),
                ),
                block_announce_validator_builder: None,
                warp_sync_config: None,
                block_relay: None,
                metrics: NotificationMetrics::new(None),
            })
            .expect("Should be able to build network");

        // Standard service tasks; `config` is consumed here.
        let rpc_handlers = sc_service::spawn_tasks(SpawnTasksParams {
            network: network_service.clone(),
            client: client.clone(),
            keystore: keystore_container.keystore(),
            task_manager: &mut task_manager,
            transaction_pool: transaction_pool.clone(),
            rpc_builder: Box::new(move |_| rpc_builder()),
            backend: backend.clone(),
            system_rpc_tx,
            config,
            telemetry: None,
            tx_handler_controller,
            sync_service: sync_service.clone(),
            tracing_execute_block: None,
        })
        .expect("Should be able to spawn tasks");

        // Genesis solution built from the node key's public key and account id.
        let mock_solution =
            Solution::genesis_solution(PublicKey::from(key.public().0), key.to_account_id());

        let mut gossip_builder = GossipWorkerBuilder::new();

        // Worker that feeds consensus chain channel updates into the gossip message sink.
        task_manager
            .spawn_essential_handle()
            .spawn_essential_blocking(
                "consensus-chain-channel-update-worker",
                None,
                Box::pin(
                    domain_client_message_relayer::worker::gossip_channel_updates::<_, _, Block, _>(
                        ChainId::Consensus,
                        client.clone(),
                        sync_service.clone(),
                        gossip_builder.gossip_msg_sink(),
                    ),
                ),
            );

        let (consensus_msg_sink, consensus_msg_receiver) =
            tracing_unbounded("consensus_message_channel", 100);

        // Listener that consumes cross-chain messages addressed to the consensus chain.
        let consensus_listener =
            cross_domain_message_gossip::start_cross_chain_message_listener::<_, _, _, _, _, _, _>(
                ChainId::Consensus,
                client.clone(),
                client.clone(),
                transaction_pool.clone(),
                network_service.clone(),
                consensus_msg_receiver,
                domain_executor,
                sync_service.clone(),
            );

        task_manager
            .spawn_essential_handle()
            .spawn_essential_blocking(
                "consensus-message-listener",
                None,
                Box::pin(consensus_listener),
            );

        // Route gossiped messages for the consensus chain to the listener above.
        gossip_builder.push_chain_sink(ChainId::Consensus, consensus_msg_sink);

        task_manager.spawn_essential_handle().spawn_blocking(
            "mmr-gadget",
            None,
            mmr_gadget::MmrGadget::start(
                client.clone(),
                backend.clone(),
                sp_mmr_primitives::INDEXING_PREFIX.to_vec(),
            ),
        );

        MockConsensusNode {
            task_manager,
            client,
            backend,
            executor,
            transaction_pool,
            select_chain,
            network_service,
            xdm_gossip_notification_service: Some(xdm_gossip_notification_service),
            sync_service,
            rpc_handlers,
            // Slot numbering starts at 1.
            next_slot: 1,
            mock_pot_verifier,
            new_slot_notification_subscribers: Vec::new(),
            acknowledgement_sender_subscribers: Vec::new(),
            block_import,
            // Kept so that tests can register more sinks before starting the gossip worker.
            xdm_gossip_worker_builder: Some(gossip_builder),
            mock_solution,
            log_prefix,
            key,
            finalize_block_depth,
            base_path,
        }
    }
639
640 pub fn run(
642 tokio_handle: tokio::runtime::Handle,
643 key: Sr25519Keyring,
644 base_path: BasePath,
645 ) -> MockConsensusNode {
646 Self::run_with_finalization_depth(tokio_handle, key, base_path, None, false, None)
647 }
648
649 pub fn run_with_private_evm(
651 tokio_handle: tokio::runtime::Handle,
652 key: Sr25519Keyring,
653 evm_owner: Option<Sr25519Keyring>,
654 base_path: BasePath,
655 ) -> MockConsensusNode {
656 Self::run_with_finalization_depth(tokio_handle, key, base_path, None, true, evm_owner)
657 }
658
659 #[expect(clippy::result_large_err, reason = "Comes from Substrate")]
661 pub fn run_with_finalization_depth(
662 tokio_handle: tokio::runtime::Handle,
663 key: Sr25519Keyring,
664 base_path: BasePath,
665 finalize_block_depth: Option<NumberFor<Block>>,
666 private_evm: bool,
667 evm_owner: Option<Sr25519Keyring>,
668 ) -> MockConsensusNode {
669 let rpc_config = MockConsensusNodeRpcConfig {
670 base_path,
671 finalize_block_depth,
672 private_evm,
673 evm_owner,
674 rpc_addr: None,
675 rpc_port: None,
676 };
677
678 Self::run_with_rpc_builder(
679 tokio_handle,
680 key,
681 rpc_config,
682 Box::new(|| Ok(RpcModule::new(()))),
683 )
684 }
685
686 pub fn run_with_rpc_builder(
688 tokio_handle: tokio::runtime::Handle,
689 key: Sr25519Keyring,
690 rpc_config: MockConsensusNodeRpcConfig,
691 rpc_builder: Box<
692 dyn Fn() -> Result<RpcModule<()>, sc_service::Error> + Send + Sync + 'static,
693 >,
694 ) -> MockConsensusNode {
695 let MockConsensusNodeRpcConfig {
696 base_path,
697 finalize_block_depth,
698 private_evm,
699 evm_owner,
700 rpc_addr,
701 rpc_port,
702 } = rpc_config;
703
704 let config = node_config(
705 tokio_handle,
706 key,
707 vec![],
708 false,
709 false,
710 false,
711 private_evm,
712 evm_owner.map(|key| key.to_account_id()),
713 base_path.clone(),
714 rpc_addr,
715 rpc_port,
716 );
717
718 Self::run_with_configuration(config, key, base_path, finalize_block_depth, rpc_builder)
719 }
720
721 #[expect(clippy::result_large_err, reason = "Comes from Substrate")]
723 pub fn run_with_rpc_options(
724 tokio_handle: tokio::runtime::Handle,
725 key: Sr25519Keyring,
726 rpc_config: MockConsensusNodeRpcConfig,
727 ) -> MockConsensusNode {
728 Self::run_with_rpc_builder(
729 tokio_handle,
730 key,
731 rpc_config,
732 Box::new(|| Ok(RpcModule::new(()))),
733 )
734 }
735
736 pub fn xdm_gossip_worker_builder(&mut self) -> &mut GossipWorkerBuilder {
738 self.xdm_gossip_worker_builder
739 .as_mut()
740 .expect("gossip message worker have not started yet")
741 }
742
743 pub fn start_cross_domain_gossip_message_worker(&mut self) {
745 let xdm_gossip_worker_builder = self
746 .xdm_gossip_worker_builder
747 .take()
748 .expect("gossip message worker have not started yet");
749 let cross_domain_message_gossip_worker = xdm_gossip_worker_builder.build::<Block, _, _>(
750 self.network_service.clone(),
751 self.xdm_gossip_notification_service
752 .take()
753 .expect("XDM gossip notification service must be used only once"),
754 self.sync_service.clone(),
755 );
756 self.task_manager
757 .spawn_essential_handle()
758 .spawn_essential_blocking(
759 "cross-domain-gossip-message-worker",
760 None,
761 Box::pin(cross_domain_message_gossip_worker.run()),
762 );
763 }
764
    /// Return the slot number that the next call to `produce_slot` will use.
    pub fn next_slot(&self) -> u64 {
        self.next_slot
    }
769
    /// Set the slot number that the next call to `produce_slot` will use.
    pub fn set_next_slot(&mut self, next_slot: u64) {
        self.next_slot = next_slot;
    }
774
775 pub fn produce_slot(&mut self) -> NewSlot {
777 let slot = Slot::from(self.next_slot);
778 let proof_of_time = PotOutput::from(
779 <&[u8] as TryInto<[u8; 16]>>::try_into(&Hash::random().to_fixed_bytes()[..16])
780 .expect("slice with length of 16 must able convert into [u8; 16]; qed"),
781 );
782 self.mock_pot_verifier.inject_pot(*slot, proof_of_time);
783 self.next_slot += 1;
784
785 (slot, proof_of_time)
786 }
787
788 pub async fn notify_new_slot_and_wait_for_bundle(
790 &mut self,
791 new_slot: NewSlot,
792 ) -> Option<OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>> {
793 self.new_slot_notification_subscribers
794 .retain(|subscriber| subscriber.unbounded_send(new_slot).is_ok());
795
796 self.confirm_acknowledgement().await;
797 self.get_bundle_from_tx_pool(new_slot)
798 }
799
800 pub async fn produce_slot_and_wait_for_bundle_submission(
802 &mut self,
803 ) -> (
804 NewSlot,
805 OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>,
806 ) {
807 let slot = self.produce_slot();
808 for _ in 0..MAX_PRODUCE_BUNDLE_TRY {
809 if let Some(bundle) = self.notify_new_slot_and_wait_for_bundle(slot).await {
810 return (slot, bundle);
811 }
812 }
813 panic!(
814 "Failed to produce bundle after {MAX_PRODUCE_BUNDLE_TRY:?} tries, something must be wrong"
815 );
816 }
817
818 pub async fn produce_slot_and_wait_for_bundle_submission_from_operator(
820 &mut self,
821 operator_id: OperatorId,
822 ) -> (
823 NewSlot,
824 OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>,
825 ) {
826 loop {
827 let slot = self.produce_slot();
828 if let Some(bundle) = self.notify_new_slot_and_wait_for_bundle(slot).await
829 && bundle.sealed_header().proof_of_election().operator_id == operator_id
830 {
831 return (slot, bundle);
832 }
833 }
834 }
835
836 pub fn new_slot_notification_stream(&mut self) -> mpsc::UnboundedReceiver<(Slot, PotOutput)> {
838 let (tx, rx) = mpsc::unbounded();
839 self.new_slot_notification_subscribers.push(tx);
840 rx
841 }
842
843 pub fn new_acknowledgement_sender_stream(
845 &mut self,
846 ) -> TracingUnboundedReceiver<mpsc::Sender<()>> {
847 let (tx, rx) = tracing_unbounded("subspace_acknowledgement_sender_stream", 100);
848 self.acknowledgement_sender_subscribers.push(tx);
849 rx
850 }
851
852 pub async fn confirm_acknowledgement(&mut self) {
857 let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
858
859 {
862 self.acknowledgement_sender_subscribers
863 .retain(|subscriber| {
864 subscriber
865 .unbounded_send(acknowledgement_sender.clone())
866 .is_ok()
867 });
868 drop(acknowledgement_sender);
869 }
870
871 while acknowledgement_receiver.next().await.is_some() {
873 }
875 }
876
877 pub async fn confirm_block_import_processed(&mut self) {
879 let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
882 {
883 let value = (NumberFor::<Block>::default(), acknowledgement_sender);
887 self.block_import
888 .block_importing_notification_subscribers
889 .lock()
890 .retain(|subscriber| subscriber.unbounded_send(value.clone()).is_ok());
891 }
892 while acknowledgement_receiver.next().await.is_some() {
893 }
895
896 self.confirm_acknowledgement().await;
898 }
899
    /// Subscribe to the block importing notification stream of the node's block import.
    ///
    /// Each item carries a block number and an acknowledgement sender.
    pub fn block_importing_notification_stream(
        &self,
    ) -> TracingUnboundedReceiver<(NumberFor<Block>, mpsc::Sender<()>)> {
        self.block_import.block_importing_notification_stream()
    }
906
907 pub fn get_bundle_from_tx_pool(
909 &self,
910 new_slot: NewSlot,
911 ) -> Option<OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>> {
912 for ready_tx in self.transaction_pool.ready() {
913 let ext = UncheckedExtrinsic::decode(&mut ready_tx.data.encode().as_slice())
914 .expect("should be able to decode");
915 if let RuntimeCall::Domains(pallet_domains::Call::submit_bundle { opaque_bundle }) =
916 ext.function
917 && opaque_bundle.sealed_header().slot_number() == *new_slot.0
918 {
919 return Some(opaque_bundle);
920 }
921 }
922 None
923 }
924
925 pub async fn submit_transaction(&self, tx: OpaqueExtrinsic) -> Result<H256, TxPoolError> {
927 self.transaction_pool
928 .submit_one(
929 self.client.info().best_hash,
930 TransactionSource::External,
931 tx,
932 )
933 .await
934 .map_err(|err| {
935 err.into_pool_error()
936 .expect("should always be a pool error")
937 })
938 }
939
940 pub async fn clear_tx_pool(&self) -> Result<(), Box<dyn Error>> {
942 let txs: Vec<_> = self
943 .transaction_pool
944 .ready()
945 .map(|t| self.transaction_pool.hash_of(&t.data))
946 .collect();
947 self.prune_txs_from_pool(txs.as_slice()).await
948 }
949
950 pub async fn prune_tx_from_pool(&self, tx: &OpaqueExtrinsic) -> Result<(), Box<dyn Error>> {
952 self.prune_txs_from_pool(&[self.transaction_pool.hash_of(tx)])
953 .await
954 }
955
956 async fn prune_txs_from_pool(
957 &self,
958 tx_hashes: &[BlockHashFor<Block>],
959 ) -> Result<(), Box<dyn Error>> {
960 let hash_and_number = HashAndNumber {
961 number: self.client.info().best_number,
962 hash: self.client.info().best_hash,
963 };
964 self.transaction_pool
965 .pool()
966 .prune_known(&hash_and_number, tx_hashes);
967 tokio::time::sleep(time::Duration::from_millis(1)).await;
970 self.transaction_pool
971 .pool()
972 .validated_pool()
973 .clear_stale(&hash_and_number);
974 Ok(())
975 }
976
977 pub fn does_receipt_exist(
979 &self,
980 er_hash: BlockHashFor<DomainBlock>,
981 ) -> Result<bool, Box<dyn Error>> {
982 Ok(self
983 .client
984 .runtime_api()
985 .execution_receipt(self.client.info().best_hash, er_hash)?
986 .is_some())
987 }
988
989 pub async fn wait_for_receipt(
999 &mut self,
1000 receipt_hash: BlockHashFor<DomainBlock>,
1001 timeout: Duration,
1002 ) -> Result<(), Box<dyn Error>> {
1003 let deadline = std::time::Instant::now() + timeout;
1004 while std::time::Instant::now() < deadline {
1005 if self.does_receipt_exist(receipt_hash)? {
1006 return Ok(());
1007 }
1008 self.produce_blocks_with_bundles(1).await?;
1013 }
1014 Err(
1015 format!("wait_for_receipt timed out after {timeout:?} for receipt {receipt_hash:?}")
1016 .into(),
1017 )
1018 }
1019
1020 pub fn get_domain_staking_summary(
1022 &self,
1023 domain_id: DomainId,
1024 ) -> Result<Option<StakingSummary<OperatorId, Balance>>, Box<dyn Error>> {
1025 Ok(self
1026 .client
1027 .runtime_api()
1028 .domain_stake_summary(self.client.info().best_hash, domain_id)?)
1029 }
1030
1031 pub fn get_domain_block_pruning_depth(&self) -> Result<BlockNumber, Box<dyn Error>> {
1033 Ok(self
1034 .client
1035 .runtime_api()
1036 .block_pruning_depth(self.client.info().best_hash)?)
1037 }
1038
1039 pub fn wait_for_fraud_proof<FP>(
1042 &self,
1043 fraud_proof_predicate: FP,
1044 ) -> Pin<Box<dyn Future<Output = FraudProofFor<Block, DomainBlock>> + Send>>
1045 where
1046 FP: Fn(&FraudProofFor<Block, DomainBlock>) -> bool + Send + 'static,
1047 {
1048 let tx_pool = self.transaction_pool.clone();
1049 let mut import_tx_stream = self.transaction_pool.import_notification_stream();
1050 Box::pin(async move {
1051 while let Some(ready_tx_hash) = import_tx_stream.next().await {
1052 let ready_tx = tx_pool
1053 .ready_transaction(&ready_tx_hash)
1054 .expect("Just get the ready tx hash from import stream; qed");
1055 let ext = subspace_test_runtime::UncheckedExtrinsic::decode(
1056 &mut ready_tx.data.encode().as_slice(),
1057 )
1058 .expect("Decode tx must success");
1059 if let subspace_test_runtime::RuntimeCall::Domains(
1060 pallet_domains::Call::submit_fraud_proof { fraud_proof },
1061 ) = ext.function
1062 && fraud_proof_predicate(&fraud_proof)
1063 {
1064 return *fraud_proof;
1065 }
1066 }
1067 unreachable!()
1068 })
1069 }
1070
1071 pub fn free_balance(&self, account_id: AccountId) -> subspace_runtime_primitives::Balance {
1073 self.client
1074 .runtime_api()
1075 .free_balance(self.client.info().best_hash, account_id)
1076 .expect("Fail to get account free balance")
1077 }
1078
1079 pub fn ban_peer(&self, addr: MultiaddrWithPeerId) {
1082 self.network_service.report_peer(
1085 addr.peer_id,
1086 ReputationChange::new_fatal("Peer banned by test (1)"),
1087 );
1088 self.network_service.report_peer(
1089 addr.peer_id,
1090 ReputationChange::new_fatal("Peer banned by test (2)"),
1091 );
1092 }
1093
1094 pub fn unban_peer(&self, addr: MultiaddrWithPeerId) {
1096 self.network_service.report_peer(
1099 addr.peer_id,
1100 ReputationChange::new(i32::MAX, "Peer unbanned by test (1)"),
1101 );
1102 self.network_service.report_peer(
1103 addr.peer_id,
1104 ReputationChange::new(i32::MAX, "Peer unbanned by test (2)"),
1105 );
1106 }
1107
1108 pub async fn stop(self) -> Result<(), std::io::Error> {
1114 let lock_file_path = self.base_path.path().join("paritydb").join("lock");
1115 std::mem::drop(self);
1117
1118 sleep(Duration::from_secs(2)).await;
1121
1122 if let Err(err) = std::fs::remove_file(lock_file_path) {
1124 tracing::error!("deleting paritydb lock file failed: {err:?}");
1125 }
1126 Ok(())
1127 }
1128}
1129
1130impl MockConsensusNode {
1131 async fn collect_txn_from_pool(&self, parent_hash: Hash) -> Vec<ExtrinsicFor<Block>> {
1132 self.transaction_pool
1133 .ready_at(parent_hash)
1134 .await
1135 .map(|pending_tx| pending_tx.data().as_ref().clone())
1136 .collect()
1137 }
1138
1139 async fn mock_inherent_data(slot: Slot) -> Result<InherentData, Box<dyn Error>> {
1140 let timestamp = sp_timestamp::InherentDataProvider::new(Timestamp::new(
1141 <Slot as Into<u64>>::into(slot) * SLOT_DURATION,
1142 ));
1143 let subspace_inherents =
1144 sp_consensus_subspace::inherents::InherentDataProvider::new(vec![]);
1145
1146 let inherent_data = (subspace_inherents, timestamp)
1147 .create_inherent_data()
1148 .await?;
1149
1150 Ok(inherent_data)
1151 }
1152
1153 fn mock_subspace_digest(&self, slot: Slot) -> Digest {
1154 let pre_digest: PreDigest<AccountId> = PreDigest::V0 {
1155 slot,
1156 solution: self.mock_solution.clone(),
1157 pot_info: PreDigestPotInfo::V0 {
1158 proof_of_time: Default::default(),
1159 future_proof_of_time: Default::default(),
1160 },
1161 };
1162 let mut digest = Digest::default();
1163 digest.push(DigestItem::subspace_pre_digest(&pre_digest));
1164 digest
1165 }
1166
1167 async fn build_block(
1169 &self,
1170 slot: Slot,
1171 parent_hash: BlockHashFor<Block>,
1172 extrinsics: Vec<ExtrinsicFor<Block>>,
1173 ) -> Result<(Block, StorageChanges), Box<dyn Error>> {
1174 let inherent_digest = self.mock_subspace_digest(slot);
1175
1176 let inherent_data = Self::mock_inherent_data(slot).await?;
1177
1178 let mut block_builder = BlockBuilderBuilder::new(self.client.as_ref())
1179 .on_parent_block(parent_hash)
1180 .fetch_parent_block_number(self.client.as_ref())?
1181 .with_inherent_digests(inherent_digest)
1182 .build()
1183 .expect("Creates new block builder");
1184
1185 let inherent_txns = block_builder.create_inherents(inherent_data)?;
1186
1187 for tx in inherent_txns.into_iter().chain(extrinsics) {
1188 if let Err(err) = sc_block_builder::BlockBuilder::push(&mut block_builder, tx) {
1189 tracing::error!("Invalid transaction while building block: {}", err);
1190 }
1191 }
1192
1193 let (block, storage_changes, _) = block_builder.build()?.into_inner();
1194 Ok((block, storage_changes))
1195 }
1196
1197 async fn import_block(
1199 &self,
1200 block: Block,
1201 storage_changes: Option<StorageChanges>,
1202 ) -> Result<BlockHashFor<Block>, Box<dyn Error>> {
1203 let (header, body) = block.deconstruct();
1204
1205 let header_hash = header.hash();
1206 let header_number = header.number;
1207
1208 let block_import_params = {
1209 let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
1210 import_block.body = Some(body);
1211 import_block.state_action = match storage_changes {
1212 Some(changes) => {
1213 StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(changes))
1214 }
1215 None => StateAction::Execute,
1216 };
1217 import_block
1218 };
1219
1220 let import_result = self.block_import.import_block(block_import_params).await?;
1221
1222 if let Some(finalized_block_hash) = self
1223 .finalize_block_depth
1224 .and_then(|depth| header_number.checked_sub(depth))
1225 .and_then(|block_to_finalize| {
1226 self.client
1227 .hash(block_to_finalize)
1228 .expect("Block hash not found for number: {block_to_finalize:?}")
1229 })
1230 {
1231 self.client
1232 .finalize_block(finalized_block_hash, None, true)
1233 .unwrap();
1234 }
1235
1236 match import_result {
1237 ImportResult::Imported(_) | ImportResult::AlreadyInChain => Ok(header_hash),
1238 bad_res => Err(format!("Fail to import block due to {bad_res:?}").into()),
1239 }
1240 }
1241
1242 #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1245 pub async fn produce_block_with_slot_at(
1246 &mut self,
1247 new_slot: NewSlot,
1248 parent_hash: BlockHashFor<Block>,
1249 maybe_extrinsics: Option<Vec<ExtrinsicFor<Block>>>,
1250 ) -> Result<BlockHashFor<Block>, Box<dyn Error>> {
1251 let block_timer = time::Instant::now();
1252
1253 let extrinsics = match maybe_extrinsics {
1254 Some(extrinsics) => extrinsics,
1255 None => self.collect_txn_from_pool(parent_hash).await,
1256 };
1257 let tx_hashes: Vec<_> = extrinsics
1258 .iter()
1259 .map(|t| self.transaction_pool.hash_of(t))
1260 .collect();
1261
1262 let (block, storage_changes) = self
1263 .build_block(new_slot.0, parent_hash, extrinsics)
1264 .await?;
1265
1266 log_new_block(&block, block_timer.elapsed().as_millis());
1267
1268 let res = match self.import_block(block, Some(storage_changes)).await {
1269 Ok(hash) => {
1270 self.prune_txs_from_pool(tx_hashes.as_slice()).await?;
1273 Ok(hash)
1274 }
1275 err => err,
1276 };
1277 self.confirm_block_import_processed().await;
1278 res
1279 }
1280
1281 #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1284 pub async fn produce_block_with_slot(&mut self, slot: NewSlot) -> Result<(), Box<dyn Error>> {
1285 self.produce_block_with_slot_at(slot, self.client.info().best_hash, None)
1286 .await?;
1287 Ok(())
1288 }
1289
1290 #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1292 pub async fn produce_block_with_extrinsics(
1293 &mut self,
1294 extrinsics: Vec<ExtrinsicFor<Block>>,
1295 ) -> Result<(), Box<dyn Error>> {
1296 let slot = self.produce_slot();
1297 self.produce_block_with_slot_at(slot, self.client.info().best_hash, Some(extrinsics))
1298 .await?;
1299 Ok(())
1300 }
1301
1302 #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1304 pub async fn produce_blocks(&mut self, n: u64) -> Result<(), Box<dyn Error>> {
1305 for _ in 0..n {
1306 let slot = self.produce_slot();
1307 self.produce_block_with_slot(slot).await?;
1308 }
1309 Ok(())
1310 }
1311
1312 #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1314 pub async fn produce_blocks_with_bundles(&mut self, n: u64) -> Result<(), Box<dyn Error>> {
1315 for _ in 0..n {
1316 let (slot, _) = self.produce_slot_and_wait_for_bundle_submission().await;
1317 self.produce_block_with_slot(slot).await?;
1318 }
1319 Ok(())
1320 }
1321
1322 pub fn account_nonce(&self) -> u32 {
1324 self.client
1325 .runtime_api()
1326 .account_nonce(self.client.info().best_hash, self.key.to_account_id())
1327 .expect("Fail to get account nonce")
1328 }
1329
1330 pub fn account_nonce_of(&self, account_id: AccountId) -> u32 {
1332 self.client
1333 .runtime_api()
1334 .account_nonce(self.client.info().best_hash, account_id)
1335 .expect("Fail to get account nonce")
1336 }
1337
1338 pub fn construct_extrinsic(
1340 &self,
1341 nonce: u32,
1342 function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1343 ) -> UncheckedExtrinsic {
1344 construct_extrinsic_generic(&self.client, function, self.key, false, nonce, 0)
1345 }
1346
1347 pub fn construct_unsigned_extrinsic(
1349 &self,
1350 function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1351 ) -> UncheckedExtrinsic {
1352 construct_unsigned_extrinsic(&self.client, function)
1353 }
1354
1355 pub async fn construct_and_send_extrinsic_with(
1357 &self,
1358 function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1359 ) -> Result<RpcTransactionOutput, RpcTransactionError> {
1360 let nonce = self.account_nonce();
1361 let extrinsic = self.construct_extrinsic(nonce, function);
1362 self.rpc_handlers.send_transaction(extrinsic.into()).await
1363 }
1364
1365 pub async fn send_extrinsic(
1367 &self,
1368 extrinsic: impl Into<OpaqueExtrinsic>,
1369 ) -> Result<RpcTransactionOutput, RpcTransactionError> {
1370 self.rpc_handlers.send_transaction(extrinsic.into()).await
1371 }
1372}
1373
1374fn log_new_block(block: &Block, used_time_ms: u128) {
1375 tracing::info!(
1376 "🎁 Prepared block for proposing at {} ({} ms) [hash: {:?}; parent_hash: {}; extrinsics ({}): [{}]]",
1377 block.header().number(),
1378 used_time_ms,
1379 block.header().hash(),
1380 block.header().parent_hash(),
1381 block.extrinsics().len(),
1382 block
1383 .extrinsics()
1384 .iter()
1385 .map(|xt| BlakeTwo256::hash_of(xt).to_string())
1386 .collect::<Vec<_>>()
1387 .join(", ")
1388 );
1389}
1390
1391fn mock_import_queue<Block: BlockT, I>(
1392 block_import: I,
1393 spawner: &impl SpawnEssentialNamed,
1394) -> BasicQueue<Block>
1395where
1396 I: BlockImport<Block, Error = ConsensusError> + Send + Sync + 'static,
1397{
1398 BasicQueue::new(
1399 MockVerifier::default(),
1400 Box::new(block_import),
1401 None,
1402 spawner,
1403 None,
1404 )
1405}
1406
/// Block verifier for the mock import queue; it carries no state besides the block type.
struct MockVerifier<Block> {
    /// Ties the verifier to a block type without storing a block.
    _marker: PhantomData<Block>,
}

// Implemented by hand so that no `Block: Default` bound is required.
impl<Block> Default for MockVerifier<Block> {
    fn default() -> Self {
        MockVerifier {
            _marker: PhantomData,
        }
    }
}
1418
1419#[async_trait::async_trait]
1420impl<Block> VerifierT<Block> for MockVerifier<Block>
1421where
1422 Block: BlockT,
1423{
1424 async fn verify(
1425 &self,
1426 block_params: BlockImportParams<Block>,
1427 ) -> Result<BlockImportParams<Block>, String> {
1428 Ok(block_params)
1429 }
1430}
1431
1432#[allow(clippy::type_complexity)]
1435struct MockBlockImport<Client, Block: BlockT> {
1436 inner: Arc<Client>,
1437 block_importing_notification_subscribers:
1438 Arc<Mutex<Vec<TracingUnboundedSender<(NumberFor<Block>, mpsc::Sender<()>)>>>>,
1439}
1440
1441impl<Client, Block: BlockT> MockBlockImport<Client, Block> {
1442 fn new(inner: Arc<Client>) -> Self {
1443 MockBlockImport {
1444 inner,
1445 block_importing_notification_subscribers: Arc::new(Mutex::new(Vec::new())),
1446 }
1447 }
1448
1449 fn block_importing_notification_stream(
1451 &self,
1452 ) -> TracingUnboundedReceiver<(NumberFor<Block>, mpsc::Sender<()>)> {
1453 let (tx, rx) = tracing_unbounded("subspace_new_slot_notification_stream", 100);
1454 self.block_importing_notification_subscribers
1455 .lock()
1456 .push(tx);
1457 rx
1458 }
1459}
1460
1461impl<Client, Block: BlockT> MockBlockImport<Client, Block> {
1462 fn clone(&self) -> Self {
1463 MockBlockImport {
1464 inner: self.inner.clone(),
1465 block_importing_notification_subscribers: self
1466 .block_importing_notification_subscribers
1467 .clone(),
1468 }
1469 }
1470}
1471
1472#[async_trait::async_trait]
1473impl<Client, Block> BlockImport<Block> for MockBlockImport<Client, Block>
1474where
1475 Block: BlockT,
1476 for<'r> &'r Client: BlockImport<Block, Error = ConsensusError> + Send + Sync,
1477 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
1478 Client::Api: ApiExt<Block>,
1479{
1480 type Error = ConsensusError;
1481
1482 async fn import_block(
1483 &self,
1484 mut block: BlockImportParams<Block>,
1485 ) -> Result<ImportResult, Self::Error> {
1486 let block_number = *block.header.number();
1487 block.fork_choice = Some(ForkChoiceStrategy::LongestChain);
1488
1489 let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
1490
1491 {
1494 let value = (block_number, acknowledgement_sender);
1495 self.block_importing_notification_subscribers
1496 .lock()
1497 .retain(|subscriber| subscriber.unbounded_send(value.clone()).is_ok());
1498 }
1499
1500 while acknowledgement_receiver.next().await.is_some() {
1501 }
1503
1504 self.inner.import_block(block).await
1505 }
1506
1507 async fn check_block(
1508 &self,
1509 block: BlockCheckParams<Block>,
1510 ) -> Result<ImportResult, Self::Error> {
1511 self.inner.check_block(block).await
1512 }
1513}
1514
/// Produces `$count` consensus blocks with bundles on `$primary_node` and waits until
/// `$operator_node` and every listed `$domain_node` have seen `$count` blocks.
///
/// Expands to an `async` block that resolves to `Result<(), Box<dyn std::error::Error>>`.
/// The `wait_for_blocks` futures are created before block production starts.
#[macro_export]
macro_rules! produce_blocks {
    ($primary_node:ident, $operator_node:ident, $count: literal $(, $domain_node:ident)*) => {
        async {
            let mut waiters: Vec<std::pin::Pin<Box<dyn std::future::Future<Output = ()>>>> =
                Vec::new();
            waiters.push(Box::pin($operator_node.wait_for_blocks($count)));
            $( waiters.push(Box::pin($domain_node.wait_for_blocks($count))); )*
            let domain_fut = futures::future::join_all(waiters);

            $primary_node.produce_blocks_with_bundles($count).await?;
            domain_fut.await;
            Ok::<(), Box<dyn std::error::Error>>(())
        }
    };
}
1534
/// Awaits the block-producing future `$primary_node_produce_block` and then waits until
/// `$operator_node` and every listed `$domain_node` have seen one block.
///
/// Expands to an `async` block that resolves to `Result<(), Box<dyn std::error::Error>>`.
/// The `wait_for_blocks` futures are created before the producing future is awaited.
#[macro_export]
macro_rules! produce_block_with {
    ($primary_node_produce_block:expr, $operator_node:ident $(, $domain_node:ident)*) => {
        async {
            let mut waiters: Vec<std::pin::Pin<Box<dyn std::future::Future<Output = ()>>>> =
                Vec::new();
            waiters.push(Box::pin($operator_node.wait_for_blocks(1)));
            $( waiters.push(Box::pin($domain_node.wait_for_blocks(1))); )*
            let domain_fut = futures::future::join_all(waiters);

            $primary_node_produce_block.await?;
            domain_fut.await;
            Ok::<(), Box<dyn std::error::Error>>(())
        }
    };
}
1555
/// Keeps producing blocks, one at a time, until `$condition` evaluates to `true`.
///
/// Each round produces one block through [`produce_blocks!`] and then sleeps for 10 ms
/// before `$condition` is evaluated again. Expands to an `async` block that resolves to
/// `Result<(), Box<dyn std::error::Error>>`.
#[macro_export]
macro_rules! produce_blocks_until {
    ($primary_node:ident, $operator_node:ident, $condition: block $(, $domain_node:ident)*) => {
        async {
            while !$condition {
                // `$crate::` makes the nested macro resolvable even when the caller has
                // imported only `produce_blocks_until`.
                $crate::produce_blocks!($primary_node, $operator_node, 1 $(, $domain_node),*).await?;
                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            }
            Ok::<(), Box<dyn std::error::Error>>(())
        }
    };
}
1569
1570type BalanceOf<T> = <<T as pallet_transaction_payment::Config>::OnChargeTransaction as pallet_transaction_payment::OnChargeTransaction<T>>::Balance;
1571
1572fn get_signed_extra(
1573 current_block: u64,
1574 immortal: bool,
1575 nonce: u32,
1576 tip: BalanceOf<Runtime>,
1577) -> SignedExtra {
1578 let period = u64::from(<<Runtime as frame_system::Config>::BlockHashCount>::get())
1579 .checked_next_power_of_two()
1580 .map(|c| c / 2)
1581 .unwrap_or(2);
1582 (
1583 frame_system::CheckNonZeroSender::<Runtime>::new(),
1584 frame_system::CheckSpecVersion::<Runtime>::new(),
1585 frame_system::CheckTxVersion::<Runtime>::new(),
1586 frame_system::CheckGenesis::<Runtime>::new(),
1587 frame_system::CheckMortality::<Runtime>::from(if immortal {
1588 generic::Era::Immortal
1589 } else {
1590 generic::Era::mortal(period, current_block)
1591 }),
1592 frame_system::CheckNonce::<Runtime>::from(nonce.into()),
1593 frame_system::CheckWeight::<Runtime>::new(),
1594 pallet_transaction_payment::ChargeTransactionPayment::<Runtime>::from(tip),
1595 BalanceTransferCheckExtension::<Runtime>::default(),
1596 pallet_subspace::extensions::SubspaceExtension::<Runtime>::new(),
1597 pallet_domains::extensions::DomainsExtension::<Runtime>::new(),
1598 pallet_messenger::extensions::MessengerExtension::<Runtime>::new(),
1599 )
1600}
1601
1602fn construct_extrinsic_raw_payload<Client>(
1603 client: impl AsRef<Client>,
1604 function: <Runtime as frame_system::Config>::RuntimeCall,
1605 immortal: bool,
1606 nonce: u32,
1607 tip: BalanceOf<Runtime>,
1608) -> (
1609 SignedPayload<<Runtime as frame_system::Config>::RuntimeCall, SignedExtra>,
1610 SignedExtra,
1611)
1612where
1613 BalanceOf<Runtime>: Send + Sync + From<u64> + sp_runtime::FixedPointOperand,
1614 u64: From<BlockNumberFor<Runtime>>,
1615 Client: HeaderBackend<subspace_runtime_primitives::opaque::Block>,
1616{
1617 let current_block_hash = client.as_ref().info().best_hash;
1618 let current_block = client.as_ref().info().best_number.saturated_into();
1619 let genesis_block = client.as_ref().hash(0).unwrap().unwrap();
1620 let extra = get_signed_extra(current_block, immortal, nonce, tip);
1621 (
1622 generic::SignedPayload::<
1623 <Runtime as frame_system::Config>::RuntimeCall,
1624 SignedExtra,
1625 >::from_raw(
1626 function,
1627 extra.clone(),
1628 ((), 100, 1, genesis_block, current_block_hash, (), (), (), (), (), (),()),
1629 ),
1630 extra,
1631 )
1632}
1633
1634pub fn construct_extrinsic_generic<Client>(
1636 client: impl AsRef<Client>,
1637 function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1638 caller: Sr25519Keyring,
1639 immortal: bool,
1640 nonce: u32,
1641 tip: BalanceOf<Runtime>,
1642) -> UncheckedExtrinsic
1643where
1644 BalanceOf<Runtime>: Send + Sync + From<u64> + sp_runtime::FixedPointOperand,
1645 u64: From<BlockNumberFor<Runtime>>,
1646 Client: HeaderBackend<subspace_runtime_primitives::opaque::Block>,
1647{
1648 let function = function.into();
1649 let (raw_payload, extra) =
1650 construct_extrinsic_raw_payload(client, function.clone(), immortal, nonce, tip);
1651 let signature = raw_payload.using_encoded(|e| caller.sign(e));
1652 UncheckedExtrinsic::new_signed(
1653 function,
1654 MultiAddress::Id(caller.to_account_id()),
1655 Signature::Sr25519(signature),
1656 extra,
1657 )
1658}
1659
1660fn construct_unsigned_extrinsic<Client>(
1662 client: impl AsRef<Client>,
1663 function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1664) -> UncheckedExtrinsic
1665where
1666 BalanceOf<Runtime>: Send + Sync + From<u64> + sp_runtime::FixedPointOperand,
1667 u64: From<BlockNumberFor<Runtime>>,
1668 Client: HeaderBackend<subspace_runtime_primitives::opaque::Block>,
1669{
1670 let function = function.into();
1671 let current_block = client.as_ref().info().best_number.saturated_into();
1672 let extra = get_signed_extra(current_block, true, 0, 0);
1673 UncheckedExtrinsic::new_transaction(function, extra)
1674}