1#![warn(missing_docs, unused_crate_dependencies)]
20
21use cross_domain_message_gossip::{GossipWorkerBuilder, xdm_gossip_peers_set_config};
22use domain_runtime_primitives::opaque::{Block as DomainBlock, Header as DomainHeader};
23use frame_system::pallet_prelude::BlockNumberFor;
24use futures::channel::mpsc;
25use futures::{Future, StreamExt};
26use jsonrpsee::RpcModule;
27use pallet_domains::staking::StakingSummary;
28use parity_scale_codec::{Decode, Encode};
29use parking_lot::Mutex;
30use sc_block_builder::BlockBuilderBuilder;
31use sc_client_api::execution_extensions::ExtensionsFactory;
32use sc_client_api::{Backend as BackendT, BlockBackend, ExecutorProvider, Finalizer};
33use sc_consensus::block_import::{
34 BlockCheckParams, BlockImportParams, ForkChoiceStrategy, ImportResult,
35};
36use sc_consensus::{BasicQueue, BlockImport, StateAction, Verifier as VerifierT};
37use sc_domains::ExtensionsFactory as DomainsExtensionFactory;
38use sc_network::config::{NetworkConfiguration, TransportConfig};
39use sc_network::service::traits::NetworkService;
40use sc_network::{
41 NetworkWorker, NotificationMetrics, NotificationService, ReputationChange, multiaddr,
42};
43use sc_service::config::{
44 DatabaseSource, ExecutorConfiguration, KeystoreConfig, MultiaddrWithPeerId,
45 OffchainWorkerConfig, RpcBatchRequestConfig, RpcConfiguration, RpcEndpoint,
46 WasmExecutionMethod, WasmtimeInstantiationStrategy,
47};
48use sc_service::{
49 BasePath, BlocksPruning, Configuration, NetworkStarter, Role, SpawnTasksParams, TaskManager,
50};
51use sc_transaction_pool::{BasicPool, FullChainApi, Options};
52use sc_transaction_pool_api::error::{Error as TxPoolError, IntoPoolError};
53use sc_transaction_pool_api::{InPoolTransaction, TransactionPool, TransactionSource};
54use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender, tracing_unbounded};
55use sp_api::{ApiExt, ProvideRuntimeApi};
56use sp_blockchain::{HashAndNumber, HeaderBackend};
57use sp_consensus::{BlockOrigin, Error as ConsensusError};
58use sp_consensus_slots::Slot;
59use sp_consensus_subspace::digests::{
60 CompatibleDigestItem, PreDigest, PreDigestPotInfo, extract_pre_digest,
61};
62use sp_consensus_subspace::{PotExtension, SubspaceApi};
63use sp_core::H256;
64use sp_core::offchain::OffchainDbExt;
65use sp_core::offchain::storage::OffchainDb;
66use sp_core::traits::{CodeExecutor, SpawnEssentialNamed};
67use sp_domains::bundle::OpaqueBundle;
68use sp_domains::{BundleProducerElectionApi, ChainId, DomainId, DomainsApi, OperatorId};
69use sp_domains_fraud_proof::fraud_proof::FraudProof;
70use sp_domains_fraud_proof::{FraudProofExtension, FraudProofHostFunctionsImpl};
71use sp_externalities::Extensions;
72use sp_inherents::{InherentData, InherentDataProvider};
73use sp_keyring::Sr25519Keyring;
74use sp_messenger::MessengerApi;
75use sp_messenger_host_functions::{MessengerExtension, MessengerHostFunctionsImpl};
76use sp_mmr_primitives::MmrApi;
77use sp_runtime::generic::{Digest, SignedPayload};
78use sp_runtime::traits::{
79 BlakeTwo256, Block as BlockT, Hash as HashT, Header as HeaderT, NumberFor,
80};
81use sp_runtime::{DigestItem, MultiAddress, OpaqueExtrinsic, SaturatedConversion, generic};
82use sp_subspace_mmr::host_functions::{SubspaceMmrExtension, SubspaceMmrHostFunctionsImpl};
83use sp_timestamp::Timestamp;
84use std::collections::HashMap;
85use std::error::Error;
86use std::marker::PhantomData;
87use std::net::SocketAddr;
88use std::pin::Pin;
89use std::sync::Arc;
90use std::time;
91use std::time::Duration;
92use subspace_core_primitives::pot::PotOutput;
93use subspace_core_primitives::solutions::Solution;
94use subspace_core_primitives::{BlockNumber, PublicKey};
95use subspace_runtime_primitives::extension::BalanceTransferCheckExtension;
96use subspace_runtime_primitives::opaque::Block;
97use subspace_runtime_primitives::{
98 AccountId, Balance, BlockHashFor, ExtrinsicFor, Hash, HeaderFor, Signature,
99};
100use subspace_service::{FullSelectChain, RuntimeExecutor};
101use subspace_test_client::{Backend, Client, chain_spec};
102use subspace_test_primitives::OnchainStateApi;
103use subspace_test_runtime::{
104 Runtime, RuntimeApi, RuntimeCall, SLOT_DURATION, SignedExtra, UncheckedExtrinsic,
105};
106use substrate_frame_rpc_system::AccountNonceApi;
107use substrate_test_client::{RpcHandlersExt, RpcTransactionError, RpcTransactionOutput};
108use tokio::time::sleep;
109
/// The [`FraudProof`] type for a given consensus `Block` and `DomainBlock`.
///
/// It is instantiated with the consensus block number and hash, the domain block header and
/// [`H256`] as the remaining hash parameter.
pub type FraudProofFor<Block, DomainBlock> =
    FraudProof<NumberFor<Block>, BlockHashFor<Block>, HeaderFor<DomainBlock>, H256>;
113
// Upper bound on how many times `produce_slot_and_wait_for_bundle_submission` re-notifies the
// same slot before it gives up and panics.
const MAX_PRODUCE_BUNDLE_TRY: usize = 10;
115
116#[expect(clippy::too_many_arguments)]
121pub fn node_config(
122 tokio_handle: tokio::runtime::Handle,
123 key: Sr25519Keyring,
124 boot_nodes: Vec<MultiaddrWithPeerId>,
125 run_farmer: bool,
126 force_authoring: bool,
127 force_synced: bool,
128 private_evm: bool,
129 evm_owner_account: Option<AccountId>,
130 base_path: BasePath,
131 rpc_addr: Option<SocketAddr>,
132 rpc_port: Option<u16>,
133) -> Configuration {
134 let root = base_path.path();
135 let role = if run_farmer {
136 Role::Authority
137 } else {
138 Role::Full
139 };
140 let key_seed = key.to_seed();
141 let spec = chain_spec::subspace_local_testnet_config(private_evm, evm_owner_account).unwrap();
142
143 let mut network_config = NetworkConfiguration::new(
144 key_seed.to_string(),
145 "network/test/0.1",
146 Default::default(),
147 None,
148 );
149
150 network_config.boot_nodes = boot_nodes;
151
152 network_config.allow_non_globals_in_dht = true;
153
154 let addr: multiaddr::Multiaddr = multiaddr::Protocol::Memory(rand::random()).into();
155 network_config.listen_addresses.push(addr.clone());
156
157 network_config.public_addresses.push(addr);
158
159 network_config.transport = TransportConfig::MemoryOnly;
160
161 network_config.force_synced = force_synced;
162
163 let rpc_configuration = match rpc_addr {
164 Some(listen_addr) => {
165 let port = rpc_port.unwrap_or(9944);
166 RpcConfiguration {
167 addr: Some(vec![RpcEndpoint {
168 batch_config: RpcBatchRequestConfig::Disabled,
169 max_connections: 100,
170 listen_addr,
171 rpc_methods: Default::default(),
172 rate_limit: None,
173 rate_limit_trust_proxy_headers: false,
174 rate_limit_whitelisted_ips: vec![],
175 max_payload_in_mb: 15,
176 max_payload_out_mb: 15,
177 max_subscriptions_per_connection: 100,
178 max_buffer_capacity_per_connection: 100,
179 cors: None,
180 retry_random_port: true,
181 is_optional: false,
182 }]),
183 max_request_size: 15,
184 max_response_size: 15,
185 id_provider: None,
186 max_subs_per_conn: 1024,
187 port,
188 message_buffer_capacity: 1024,
189 batch_config: RpcBatchRequestConfig::Disabled,
190 max_connections: 1000,
191 cors: None,
192 methods: Default::default(),
193 rate_limit: None,
194 rate_limit_whitelisted_ips: vec![],
195 rate_limit_trust_proxy_headers: false,
196 }
197 }
198 None => RpcConfiguration {
199 addr: None,
200 max_request_size: 0,
201 max_response_size: 0,
202 id_provider: None,
203 max_subs_per_conn: 0,
204 port: 0,
205 message_buffer_capacity: 0,
206 batch_config: RpcBatchRequestConfig::Disabled,
207 max_connections: 0,
208 cors: None,
209 methods: Default::default(),
210 rate_limit: None,
211 rate_limit_whitelisted_ips: vec![],
212 rate_limit_trust_proxy_headers: false,
213 },
214 };
215
216 Configuration {
217 impl_name: "subspace-test-node".to_string(),
218 impl_version: "0.1".to_string(),
219 role,
220 tokio_handle,
221 transaction_pool: Default::default(),
222 network: network_config,
223 keystore: KeystoreConfig::InMemory,
224 database: DatabaseSource::ParityDb {
225 path: root.join("paritydb"),
226 },
227 trie_cache_maximum_size: Some(64 * 1024 * 1024),
228 state_pruning: Default::default(),
229 blocks_pruning: BlocksPruning::KeepAll,
230 chain_spec: Box::new(spec),
231 executor: ExecutorConfiguration {
232 wasm_method: WasmExecutionMethod::Compiled {
233 instantiation_strategy: WasmtimeInstantiationStrategy::PoolingCopyOnWrite,
234 },
235 max_runtime_instances: 8,
236 default_heap_pages: None,
237 runtime_cache_size: 2,
238 },
239 wasm_runtime_overrides: Default::default(),
240 rpc: rpc_configuration,
241 prometheus_config: None,
242 telemetry_endpoints: None,
243 offchain_worker: OffchainWorkerConfig {
244 enabled: false,
245 indexing_enabled: true,
246 },
247 force_authoring,
248 disable_grandpa: false,
249 dev_key_seed: Some(key_seed),
250 tracing_targets: None,
251 tracing_receiver: Default::default(),
252 announce_block: true,
253 data_path: base_path.path().into(),
254 base_path,
255 }
256}
257
// Storage changes produced when building a consensus `Block`.
type StorageChanges = sp_api::StorageChanges<Block>;
259
/// Extensions factory installed on the mock consensus client.
///
/// It registers the fraud proof, MMR, messenger, offchain DB and proof-of-time extensions,
/// the latter backed by a [`MockPotVerfier`] instead of a real verifier.
struct MockExtensionsFactory<Client, DomainBlock, Executor, CBackend> {
    /// Consensus chain client handed to the host function implementations.
    consensus_client: Arc<Client>,
    /// Consensus chain backend; used to obtain the offchain storage.
    consensus_backend: Arc<CBackend>,
    /// Executor handed to the fraud proof and messenger host functions.
    executor: Arc<Executor>,
    /// Table of proof-of-time outputs accepted as valid by the PoT extension.
    mock_pot_verifier: Arc<MockPotVerfier>,
    /// Confirmation depth passed to the domain extensions factory and the MMR host functions.
    confirmation_depth_k: BlockNumber,
    /// Ties the `DomainBlock` type parameter to the struct without storing a value.
    _phantom: PhantomData<DomainBlock>,
}
268
269impl<Client, DomainBlock, Executor, CBackend>
270 MockExtensionsFactory<Client, DomainBlock, Executor, CBackend>
271{
272 fn new(
273 consensus_client: Arc<Client>,
274 executor: Arc<Executor>,
275 mock_pot_verifier: Arc<MockPotVerfier>,
276 consensus_backend: Arc<CBackend>,
277 confirmation_depth_k: BlockNumber,
278 ) -> Self {
279 Self {
280 consensus_client,
281 consensus_backend,
282 executor,
283 mock_pot_verifier,
284 confirmation_depth_k,
285 _phantom: Default::default(),
286 }
287 }
288}
289
/// Mock proof-of-time verifier: a mutex-guarded map from slot number to the only
/// [`PotOutput`] considered valid for that slot.
#[derive(Default)]
struct MockPotVerfier(Mutex<HashMap<u64, PotOutput>>);
292
293impl MockPotVerfier {
294 fn is_valid(&self, slot: u64, pot: PotOutput) -> bool {
295 self.0.lock().get(&slot).map(|p| *p == pot).unwrap_or(false)
296 }
297
298 fn inject_pot(&self, slot: u64, pot: PotOutput) {
299 self.0.lock().insert(slot, pot);
300 }
301}
302
303impl<Block, Client, DomainBlock, Executor, CBackend> ExtensionsFactory<Block>
304 for MockExtensionsFactory<Client, DomainBlock, Executor, CBackend>
305where
306 Block: BlockT,
307 Block::Hash: From<H256> + Into<H256>,
308 DomainBlock: BlockT,
309 DomainBlock::Hash: Into<H256> + From<H256>,
310 Client: BlockBackend<Block> + HeaderBackend<Block> + ProvideRuntimeApi<Block> + 'static,
311 Client::Api: DomainsApi<Block, DomainBlock::Header>
312 + BundleProducerElectionApi<Block, Balance>
313 + MessengerApi<Block, NumberFor<Block>, Block::Hash>
314 + MmrApi<Block, H256, NumberFor<Block>>,
315 Executor: CodeExecutor + sc_executor::RuntimeVersionOf,
316 CBackend: BackendT<Block> + 'static,
317{
318 fn extensions_for(
319 &self,
320 _block_hash: Block::Hash,
321 _block_number: NumberFor<Block>,
322 ) -> Extensions {
323 let confirmation_depth_k = self.confirmation_depth_k;
324 let mut exts = Extensions::new();
325 exts.register(FraudProofExtension::new(Arc::new(
326 FraudProofHostFunctionsImpl::<_, _, DomainBlock, Executor, _>::new(
327 self.consensus_client.clone(),
328 self.executor.clone(),
329 move |client, executor| {
330 let extension_factory =
331 DomainsExtensionFactory::<_, Block, DomainBlock, _>::new(
332 client,
333 executor,
334 confirmation_depth_k,
335 );
336 Box::new(extension_factory) as Box<dyn ExtensionsFactory<DomainBlock>>
337 },
338 ),
339 )));
340 exts.register(SubspaceMmrExtension::new(Arc::new(
341 SubspaceMmrHostFunctionsImpl::<Block, _>::new(
342 self.consensus_client.clone(),
343 confirmation_depth_k,
344 ),
345 )));
346 exts.register(MessengerExtension::new(Arc::new(
347 MessengerHostFunctionsImpl::<Block, _, DomainBlock, _>::new(
348 self.consensus_client.clone(),
349 self.executor.clone(),
350 ),
351 )));
352
353 if let Some(offchain_storage) = self.consensus_backend.offchain_storage() {
354 let offchain_db = OffchainDb::new(offchain_storage);
355 exts.register(OffchainDbExt::new(offchain_db));
356 }
357 exts.register(PotExtension::new({
358 let client = Arc::clone(&self.consensus_client);
359 let mock_pot_verifier = Arc::clone(&self.mock_pot_verifier);
360 Box::new(
361 move |parent_hash, slot, proof_of_time, _quick_verification| {
362 let parent_hash = {
363 let mut converted_parent_hash = Block::Hash::default();
364 converted_parent_hash.as_mut().copy_from_slice(&parent_hash);
365 converted_parent_hash
366 };
367
368 let parent_header = match client.header(parent_hash) {
369 Ok(Some(parent_header)) => parent_header,
370 _ => return false,
371 };
372 let parent_pre_digest = match extract_pre_digest(&parent_header) {
373 Ok(parent_pre_digest) => parent_pre_digest,
374 _ => return false,
375 };
376
377 let parent_slot = parent_pre_digest.slot();
378 if slot <= *parent_slot {
379 return false;
380 }
381
382 mock_pot_verifier.is_valid(slot, proof_of_time)
383 },
384 )
385 }));
386 exts
387 }
388}
389
// A produced slot together with the proof-of-time output generated for it.
type NewSlot = (Slot, PotOutput);
391
/// A mock Subspace consensus node instance used for testing.
///
/// Slots and proof-of-time outputs are produced on demand by the test through
/// [`MockConsensusNode::produce_slot`] rather than by a timekeeper.
pub struct MockConsensusNode {
    /// Task manager owning the node's background tasks.
    pub task_manager: TaskManager,
    /// Consensus chain client.
    pub client: Arc<Client>,
    /// Consensus chain backend.
    pub backend: Arc<Backend>,
    /// Runtime code executor.
    pub executor: RuntimeExecutor,
    /// Transaction pool.
    pub transaction_pool: Arc<BasicPool<FullChainApi<Client, Block>, Block>>,
    /// The chain selection strategy.
    pub select_chain: FullSelectChain,
    /// Network service.
    pub network_service: Arc<dyn NetworkService + Send + Sync>,
    /// Cross-domain message gossip notification service; taken (left as `None`) when the
    /// gossip worker is started.
    pub xdm_gossip_notification_service: Option<Box<dyn NotificationService>>,
    /// Sync service.
    pub sync_service: Arc<sc_network_sync::SyncingService<Block>>,
    /// RPC handlers.
    pub rpc_handlers: sc_service::RpcHandlers,
    /// Network starter; taken (left as `None`) by `start_network`.
    pub network_starter: Option<NetworkStarter>,
    /// Number of the slot that the next `produce_slot` call will produce.
    next_slot: u64,
    /// Mock proof-of-time verifier shared with the runtime extensions.
    mock_pot_verifier: Arc<MockPotVerfier>,
    /// Senders notified about every new slot.
    new_slot_notification_subscribers: Vec<mpsc::UnboundedSender<(Slot, PotOutput)>>,
    /// Senders that receive an acknowledgement channel on `confirm_acknowledgement`.
    acknowledgement_sender_subscribers: Vec<TracingUnboundedSender<mpsc::Sender<()>>>,
    /// Block import pipeline.
    block_import: MockBlockImport<Client, Block>,
    /// Cross-domain gossip worker builder; taken (left as `None`) when the worker is started.
    xdm_gossip_worker_builder: Option<GossipWorkerBuilder>,
    /// Mock solution embedded in the pre-digest of produced blocks.
    mock_solution: Solution<AccountId>,
    /// Log prefix derived from the node key.
    log_prefix: &'static str,
    /// Keyring account of this node.
    pub key: Sr25519Keyring,
    /// Optional finalization depth supplied at construction.
    finalize_block_depth: Option<NumberFor<Block>>,
    /// The node's base path.
    base_path: BasePath,
}
436
/// Options for running a [`MockConsensusNode`], including its RPC settings.
pub struct MockConsensusNodeRpcConfig {
    /// The node's base path.
    pub base_path: BasePath,
    /// Optional finalization depth stored on the node.
    pub finalize_block_depth: Option<NumberFor<Block>>,
    /// Forwarded to the chain spec as its `private_evm` flag.
    pub private_evm: bool,
    /// Optional keyring account whose account id is forwarded to the chain spec as EVM owner.
    pub evm_owner: Option<Sr25519Keyring>,
    /// RPC listen address; when `None` no RPC endpoint is configured.
    pub rpc_addr: Option<SocketAddr>,
    /// RPC port; `node_config` falls back to `9944` when an address is set but the port is not.
    pub rpc_port: Option<u16>,
}
452
453impl MockConsensusNode {
    /// Builds and wires up a mock consensus node from a ready-made `config`.
    ///
    /// Creates the client, transaction pool, mock block import, network, RPC handlers and
    /// the cross-domain messaging tasks. The network and the cross-domain gossip worker are
    /// only prepared here; they are started later via `start_network` and
    /// `start_cross_domain_gossip_message_worker`.
    ///
    /// # Panics
    ///
    /// Panics if the client parts, chain constants, network or service tasks cannot be built.
    fn run_with_configuration(
        mut config: Configuration,
        key: Sr25519Keyring,
        base_path: BasePath,
        finalize_block_depth: Option<NumberFor<Block>>,
        rpc_builder: Box<
            dyn Fn() -> Result<RpcModule<()>, sc_service::Error> + Send + Sync + 'static,
        >,
    ) -> MockConsensusNode {
        let log_prefix = key.into();

        // Zero ban time for the transaction pool.
        // NOTE(review): presumably so a previously seen/pruned transaction can be resubmitted
        // right away in tests — confirm.
        let tx_pool_options = Options {
            ban_time: Duration::from_millis(0),
            ..Default::default()
        };

        // Tag the node name and enter a logging span carrying it for the rest of the setup.
        config.network.node_name = format!("{} (Consensus)", config.network.node_name);
        let span = sc_tracing::tracing::info_span!(
            sc_tracing::logging::PREFIX_LOG_SPAN,
            name = config.network.node_name.as_str()
        );
        let _enter = span.enter();

        let executor = sc_service::new_wasm_executor(&config.executor);

        let (client, backend, keystore_container, mut task_manager) =
            sc_service::new_full_parts::<Block, RuntimeApi, _>(&config, None, executor.clone())
                .expect("Fail to new full parts");

        // A second executor instance, used for the domain side of the extensions and by the
        // cross-chain message listener below.
        let domain_executor = Arc::new(sc_service::new_wasm_executor(&config.executor));
        let client = Arc::new(client);
        let mock_pot_verifier = Arc::new(MockPotVerfier::default());
        let chain_constants = client
            .runtime_api()
            .chain_constants(client.info().best_hash)
            .expect("Fail to get chain constants");
        // Install the mock extensions factory so runtime calls see the mocked PoT verifier.
        client
            .execution_extensions()
            .set_extensions_factory(MockExtensionsFactory::<
                _,
                DomainBlock,
                sc_domains::RuntimeExecutor,
                _,
            >::new(
                client.clone(),
                domain_executor.clone(),
                Arc::clone(&mock_pot_verifier),
                backend.clone(),
                chain_constants.confirmation_depth_k(),
            ));

        let select_chain = sc_consensus::LongestChain::new(backend.clone());
        let transaction_pool = Arc::from(BasicPool::new_full(
            tx_pool_options,
            config.role.is_authority().into(),
            config.prometheus_registry(),
            task_manager.spawn_essential_handle(),
            client.clone(),
        ));

        let block_import = MockBlockImport::<_, _>::new(client.clone());

        // Network configuration with the cross-domain message gossip protocol registered.
        let mut net_config = sc_network::config::FullNetworkConfiguration::<
            _,
            _,
            NetworkWorker<_, _>,
        >::new(&config.network, None);
        let (xdm_gossip_notification_config, xdm_gossip_notification_service) =
            xdm_gossip_peers_set_config();
        net_config.add_notification_protocol(xdm_gossip_notification_config);

        let (network_service, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
            sc_service::build_network(sc_service::BuildNetworkParams {
                config: &config,
                net_config,
                client: client.clone(),
                transaction_pool: transaction_pool.clone(),
                spawn_handle: task_manager.spawn_handle(),
                import_queue: mock_import_queue(
                    block_import.clone(),
                    &task_manager.spawn_essential_handle(),
                ),
                block_announce_validator_builder: None,
                warp_sync_config: None,
                block_relay: None,
                metrics: NotificationMetrics::new(None),
            })
            .expect("Should be able to build network");

        // `config` is consumed here; nothing below may use it.
        let rpc_handlers = sc_service::spawn_tasks(SpawnTasksParams {
            network: network_service.clone(),
            client: client.clone(),
            keystore: keystore_container.keystore(),
            task_manager: &mut task_manager,
            transaction_pool: transaction_pool.clone(),
            rpc_builder: Box::new(move |_| rpc_builder()),
            backend: backend.clone(),
            system_rpc_tx,
            config,
            telemetry: None,
            tx_handler_controller,
            sync_service: sync_service.clone(),
        })
        .expect("Should be able to spawn tasks");

        // Genesis solution attributed to the node key; embedded into produced pre-digests.
        let mock_solution =
            Solution::genesis_solution(PublicKey::from(key.public().0), key.to_account_id());

        let mut gossip_builder = GossipWorkerBuilder::new();

        // Worker that gossips consensus chain channel updates through the gossip sink.
        task_manager
            .spawn_essential_handle()
            .spawn_essential_blocking(
                "consensus-chain-channel-update-worker",
                None,
                Box::pin(
                    domain_client_message_relayer::worker::gossip_channel_updates::<_, _, Block, _>(
                        ChainId::Consensus,
                        client.clone(),
                        sync_service.clone(),
                        gossip_builder.gossip_msg_sink(),
                    ),
                ),
            );

        let (consensus_msg_sink, consensus_msg_receiver) =
            tracing_unbounded("consensus_message_channel", 100);

        // Listener handling cross-chain messages addressed to the consensus chain.
        let consensus_listener =
            cross_domain_message_gossip::start_cross_chain_message_listener::<_, _, _, _, _, _, _>(
                ChainId::Consensus,
                client.clone(),
                client.clone(),
                transaction_pool.clone(),
                network_service.clone(),
                consensus_msg_receiver,
                domain_executor,
                sync_service.clone(),
            );

        task_manager
            .spawn_essential_handle()
            .spawn_essential_blocking(
                "consensus-message-listener",
                None,
                Box::pin(consensus_listener),
            );

        // Route gossiped messages for the consensus chain into the listener above.
        gossip_builder.push_chain_sink(ChainId::Consensus, consensus_msg_sink);

        task_manager.spawn_essential_handle().spawn_blocking(
            "mmr-gadget",
            None,
            mmr_gadget::MmrGadget::start(
                client.clone(),
                backend.clone(),
                sp_mmr_primitives::INDEXING_PREFIX.to_vec(),
            ),
        );

        MockConsensusNode {
            task_manager,
            client,
            backend,
            executor,
            transaction_pool,
            select_chain,
            network_service,
            xdm_gossip_notification_service: Some(xdm_gossip_notification_service),
            sync_service,
            rpc_handlers,
            network_starter: Some(network_starter),
            // The first slot produced by the node is slot 1.
            next_slot: 1,
            mock_pot_verifier,
            new_slot_notification_subscribers: Vec::new(),
            acknowledgement_sender_subscribers: Vec::new(),
            block_import,
            xdm_gossip_worker_builder: Some(gossip_builder),
            mock_solution,
            log_prefix,
            key,
            finalize_block_depth,
            base_path,
        }
    }
639
640 pub fn run(
642 tokio_handle: tokio::runtime::Handle,
643 key: Sr25519Keyring,
644 base_path: BasePath,
645 ) -> MockConsensusNode {
646 Self::run_with_finalization_depth(tokio_handle, key, base_path, None, false, None)
647 }
648
649 pub fn run_with_private_evm(
651 tokio_handle: tokio::runtime::Handle,
652 key: Sr25519Keyring,
653 evm_owner: Option<Sr25519Keyring>,
654 base_path: BasePath,
655 ) -> MockConsensusNode {
656 Self::run_with_finalization_depth(tokio_handle, key, base_path, None, true, evm_owner)
657 }
658
659 pub fn run_with_finalization_depth(
661 tokio_handle: tokio::runtime::Handle,
662 key: Sr25519Keyring,
663 base_path: BasePath,
664 finalize_block_depth: Option<NumberFor<Block>>,
665 private_evm: bool,
666 evm_owner: Option<Sr25519Keyring>,
667 ) -> MockConsensusNode {
668 let rpc_config = MockConsensusNodeRpcConfig {
669 base_path,
670 finalize_block_depth,
671 private_evm,
672 evm_owner,
673 rpc_addr: None,
674 rpc_port: None,
675 };
676
677 Self::run_with_rpc_builder(
678 tokio_handle,
679 key,
680 rpc_config,
681 Box::new(|| Ok(RpcModule::new(()))),
682 )
683 }
684
685 pub fn run_with_rpc_builder(
687 tokio_handle: tokio::runtime::Handle,
688 key: Sr25519Keyring,
689 rpc_config: MockConsensusNodeRpcConfig,
690 rpc_builder: Box<
691 dyn Fn() -> Result<RpcModule<()>, sc_service::Error> + Send + Sync + 'static,
692 >,
693 ) -> MockConsensusNode {
694 let MockConsensusNodeRpcConfig {
695 base_path,
696 finalize_block_depth,
697 private_evm,
698 evm_owner,
699 rpc_addr,
700 rpc_port,
701 } = rpc_config;
702
703 let config = node_config(
704 tokio_handle,
705 key,
706 vec![],
707 false,
708 false,
709 false,
710 private_evm,
711 evm_owner.map(|key| key.to_account_id()),
712 base_path.clone(),
713 rpc_addr,
714 rpc_port,
715 );
716
717 Self::run_with_configuration(config, key, base_path, finalize_block_depth, rpc_builder)
718 }
719
720 pub fn run_with_rpc_options(
722 tokio_handle: tokio::runtime::Handle,
723 key: Sr25519Keyring,
724 rpc_config: MockConsensusNodeRpcConfig,
725 ) -> MockConsensusNode {
726 Self::run_with_rpc_builder(
727 tokio_handle,
728 key,
729 rpc_config,
730 Box::new(|| Ok(RpcModule::new(()))),
731 )
732 }
733
734 pub fn start_network(&mut self) {
736 self.network_starter
737 .take()
738 .expect("mock consensus node network have not started yet")
739 .start_network();
740 }
741
742 pub fn xdm_gossip_worker_builder(&mut self) -> &mut GossipWorkerBuilder {
744 self.xdm_gossip_worker_builder
745 .as_mut()
746 .expect("gossip message worker have not started yet")
747 }
748
749 pub fn start_cross_domain_gossip_message_worker(&mut self) {
751 let xdm_gossip_worker_builder = self
752 .xdm_gossip_worker_builder
753 .take()
754 .expect("gossip message worker have not started yet");
755 let cross_domain_message_gossip_worker = xdm_gossip_worker_builder.build::<Block, _, _>(
756 self.network_service.clone(),
757 self.xdm_gossip_notification_service
758 .take()
759 .expect("XDM gossip notification service must be used only once"),
760 self.sync_service.clone(),
761 );
762 self.task_manager
763 .spawn_essential_handle()
764 .spawn_essential_blocking(
765 "cross-domain-gossip-message-worker",
766 None,
767 Box::pin(cross_domain_message_gossip_worker.run()),
768 );
769 }
770
    /// Returns the number of the slot that the next `produce_slot` call will produce.
    pub fn next_slot(&self) -> u64 {
        self.next_slot
    }
775
    /// Overrides the number of the slot that the next `produce_slot` call will produce.
    pub fn set_next_slot(&mut self, next_slot: u64) {
        self.next_slot = next_slot;
    }
780
781 pub fn produce_slot(&mut self) -> NewSlot {
783 let slot = Slot::from(self.next_slot);
784 let proof_of_time = PotOutput::from(
785 <&[u8] as TryInto<[u8; 16]>>::try_into(&Hash::random().to_fixed_bytes()[..16])
786 .expect("slice with length of 16 must able convert into [u8; 16]; qed"),
787 );
788 self.mock_pot_verifier.inject_pot(*slot, proof_of_time);
789 self.next_slot += 1;
790
791 (slot, proof_of_time)
792 }
793
794 pub async fn notify_new_slot_and_wait_for_bundle(
796 &mut self,
797 new_slot: NewSlot,
798 ) -> Option<OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>> {
799 self.new_slot_notification_subscribers
800 .retain(|subscriber| subscriber.unbounded_send(new_slot).is_ok());
801
802 self.confirm_acknowledgement().await;
803 self.get_bundle_from_tx_pool(new_slot)
804 }
805
806 pub async fn produce_slot_and_wait_for_bundle_submission(
808 &mut self,
809 ) -> (
810 NewSlot,
811 OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>,
812 ) {
813 let slot = self.produce_slot();
814 for _ in 0..MAX_PRODUCE_BUNDLE_TRY {
815 if let Some(bundle) = self.notify_new_slot_and_wait_for_bundle(slot).await {
816 return (slot, bundle);
817 }
818 }
819 panic!(
820 "Failed to produce bundle after {MAX_PRODUCE_BUNDLE_TRY:?} tries, something must be wrong"
821 );
822 }
823
824 pub async fn produce_slot_and_wait_for_bundle_submission_from_operator(
826 &mut self,
827 operator_id: OperatorId,
828 ) -> (
829 NewSlot,
830 OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>,
831 ) {
832 loop {
833 let slot = self.produce_slot();
834 if let Some(bundle) = self.notify_new_slot_and_wait_for_bundle(slot).await
835 && bundle.sealed_header().proof_of_election().operator_id == operator_id
836 {
837 return (slot, bundle);
838 }
839 }
840 }
841
842 pub fn new_slot_notification_stream(&mut self) -> mpsc::UnboundedReceiver<(Slot, PotOutput)> {
844 let (tx, rx) = mpsc::unbounded();
845 self.new_slot_notification_subscribers.push(tx);
846 rx
847 }
848
849 pub fn new_acknowledgement_sender_stream(
851 &mut self,
852 ) -> TracingUnboundedReceiver<mpsc::Sender<()>> {
853 let (tx, rx) = tracing_unbounded("subspace_acknowledgement_sender_stream", 100);
854 self.acknowledgement_sender_subscribers.push(tx);
855 rx
856 }
857
858 pub async fn confirm_acknowledgement(&mut self) {
863 let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
864
865 {
868 self.acknowledgement_sender_subscribers
869 .retain(|subscriber| {
870 subscriber
871 .unbounded_send(acknowledgement_sender.clone())
872 .is_ok()
873 });
874 drop(acknowledgement_sender);
875 }
876
877 while acknowledgement_receiver.next().await.is_some() {
879 }
881 }
882
883 pub async fn confirm_block_import_processed(&mut self) {
885 let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
888 {
889 let value = (NumberFor::<Block>::default(), acknowledgement_sender);
893 self.block_import
894 .block_importing_notification_subscribers
895 .lock()
896 .retain(|subscriber| subscriber.unbounded_send(value.clone()).is_ok());
897 }
898 while acknowledgement_receiver.next().await.is_some() {
899 }
901
902 self.confirm_acknowledgement().await;
904 }
905
    /// Subscribes to block-importing notifications of the mock block import.
    ///
    /// Each item pairs a block number with an acknowledgement sender.
    pub fn block_importing_notification_stream(
        &self,
    ) -> TracingUnboundedReceiver<(NumberFor<Block>, mpsc::Sender<()>)> {
        self.block_import.block_importing_notification_stream()
    }
912
913 pub fn get_bundle_from_tx_pool(
915 &self,
916 new_slot: NewSlot,
917 ) -> Option<OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>> {
918 for ready_tx in self.transaction_pool.ready() {
919 let ext = UncheckedExtrinsic::decode(&mut ready_tx.data.encode().as_slice())
920 .expect("should be able to decode");
921 if let RuntimeCall::Domains(pallet_domains::Call::submit_bundle { opaque_bundle }) =
922 ext.function
923 && opaque_bundle.sealed_header().slot_number() == *new_slot.0
924 {
925 return Some(opaque_bundle);
926 }
927 }
928 None
929 }
930
931 pub async fn submit_transaction(&self, tx: OpaqueExtrinsic) -> Result<H256, TxPoolError> {
933 self.transaction_pool
934 .submit_one(
935 self.client.info().best_hash,
936 TransactionSource::External,
937 tx,
938 )
939 .await
940 .map_err(|err| {
941 err.into_pool_error()
942 .expect("should always be a pool error")
943 })
944 }
945
946 pub async fn clear_tx_pool(&self) -> Result<(), Box<dyn Error>> {
948 let txs: Vec<_> = self
949 .transaction_pool
950 .ready()
951 .map(|t| self.transaction_pool.hash_of(&t.data))
952 .collect();
953 self.prune_txs_from_pool(txs.as_slice()).await
954 }
955
956 pub async fn prune_tx_from_pool(&self, tx: &OpaqueExtrinsic) -> Result<(), Box<dyn Error>> {
958 self.prune_txs_from_pool(&[self.transaction_pool.hash_of(tx)])
959 .await
960 }
961
962 async fn prune_txs_from_pool(
963 &self,
964 tx_hashes: &[BlockHashFor<Block>],
965 ) -> Result<(), Box<dyn Error>> {
966 let hash_and_number = HashAndNumber {
967 number: self.client.info().best_number,
968 hash: self.client.info().best_hash,
969 };
970 self.transaction_pool
971 .pool()
972 .prune_known(&hash_and_number, tx_hashes);
973 tokio::time::sleep(time::Duration::from_millis(1)).await;
976 self.transaction_pool
977 .pool()
978 .validated_pool()
979 .clear_stale(&hash_and_number);
980 Ok(())
981 }
982
983 pub fn does_receipt_exist(
985 &self,
986 er_hash: BlockHashFor<DomainBlock>,
987 ) -> Result<bool, Box<dyn Error>> {
988 Ok(self
989 .client
990 .runtime_api()
991 .execution_receipt(self.client.info().best_hash, er_hash)?
992 .is_some())
993 }
994
995 pub fn get_domain_staking_summary(
997 &self,
998 domain_id: DomainId,
999 ) -> Result<Option<StakingSummary<OperatorId, Balance>>, Box<dyn Error>> {
1000 Ok(self
1001 .client
1002 .runtime_api()
1003 .domain_stake_summary(self.client.info().best_hash, domain_id)?)
1004 }
1005
1006 pub fn get_domain_block_pruning_depth(&self) -> Result<BlockNumber, Box<dyn Error>> {
1008 Ok(self
1009 .client
1010 .runtime_api()
1011 .block_pruning_depth(self.client.info().best_hash)?)
1012 }
1013
1014 pub fn wait_for_fraud_proof<FP>(
1017 &self,
1018 fraud_proof_predicate: FP,
1019 ) -> Pin<Box<dyn Future<Output = FraudProofFor<Block, DomainBlock>> + Send>>
1020 where
1021 FP: Fn(&FraudProofFor<Block, DomainBlock>) -> bool + Send + 'static,
1022 {
1023 let tx_pool = self.transaction_pool.clone();
1024 let mut import_tx_stream = self.transaction_pool.import_notification_stream();
1025 Box::pin(async move {
1026 while let Some(ready_tx_hash) = import_tx_stream.next().await {
1027 let ready_tx = tx_pool
1028 .ready_transaction(&ready_tx_hash)
1029 .expect("Just get the ready tx hash from import stream; qed");
1030 let ext = subspace_test_runtime::UncheckedExtrinsic::decode(
1031 &mut ready_tx.data.encode().as_slice(),
1032 )
1033 .expect("Decode tx must success");
1034 if let subspace_test_runtime::RuntimeCall::Domains(
1035 pallet_domains::Call::submit_fraud_proof { fraud_proof },
1036 ) = ext.function
1037 && fraud_proof_predicate(&fraud_proof)
1038 {
1039 return *fraud_proof;
1040 }
1041 }
1042 unreachable!()
1043 })
1044 }
1045
1046 pub fn free_balance(&self, account_id: AccountId) -> subspace_runtime_primitives::Balance {
1048 self.client
1049 .runtime_api()
1050 .free_balance(self.client.info().best_hash, account_id)
1051 .expect("Fail to get account free balance")
1052 }
1053
1054 pub fn ban_peer(&self, addr: MultiaddrWithPeerId) {
1057 self.network_service.report_peer(
1060 addr.peer_id,
1061 ReputationChange::new_fatal("Peer banned by test (1)"),
1062 );
1063 self.network_service.report_peer(
1064 addr.peer_id,
1065 ReputationChange::new_fatal("Peer banned by test (2)"),
1066 );
1067 }
1068
1069 pub fn unban_peer(&self, addr: MultiaddrWithPeerId) {
1071 self.network_service.report_peer(
1074 addr.peer_id,
1075 ReputationChange::new(i32::MAX, "Peer unbanned by test (1)"),
1076 );
1077 self.network_service.report_peer(
1078 addr.peer_id,
1079 ReputationChange::new(i32::MAX, "Peer unbanned by test (2)"),
1080 );
1081 }
1082
1083 pub async fn stop(self) -> Result<(), std::io::Error> {
1089 let lock_file_path = self.base_path.path().join("paritydb").join("lock");
1090 std::mem::drop(self);
1092
1093 sleep(Duration::from_secs(2)).await;
1096
1097 if let Err(err) = std::fs::remove_file(lock_file_path) {
1099 tracing::error!("deleting paritydb lock file failed: {err:?}");
1100 }
1101 Ok(())
1102 }
1103}
1104
1105impl MockConsensusNode {
1106 async fn collect_txn_from_pool(&self, parent_hash: Hash) -> Vec<ExtrinsicFor<Block>> {
1107 self.transaction_pool
1108 .ready_at(parent_hash)
1109 .await
1110 .map(|pending_tx| pending_tx.data().as_ref().clone())
1111 .collect()
1112 }
1113
1114 async fn mock_inherent_data(slot: Slot) -> Result<InherentData, Box<dyn Error>> {
1115 let timestamp = sp_timestamp::InherentDataProvider::new(Timestamp::new(
1116 <Slot as Into<u64>>::into(slot) * SLOT_DURATION,
1117 ));
1118 let subspace_inherents =
1119 sp_consensus_subspace::inherents::InherentDataProvider::new(vec![]);
1120
1121 let inherent_data = (subspace_inherents, timestamp)
1122 .create_inherent_data()
1123 .await?;
1124
1125 Ok(inherent_data)
1126 }
1127
1128 fn mock_subspace_digest(&self, slot: Slot) -> Digest {
1129 let pre_digest: PreDigest<AccountId> = PreDigest::V0 {
1130 slot,
1131 solution: self.mock_solution.clone(),
1132 pot_info: PreDigestPotInfo::V0 {
1133 proof_of_time: Default::default(),
1134 future_proof_of_time: Default::default(),
1135 },
1136 };
1137 let mut digest = Digest::default();
1138 digest.push(DigestItem::subspace_pre_digest(&pre_digest));
1139 digest
1140 }
1141
1142 async fn build_block(
1144 &self,
1145 slot: Slot,
1146 parent_hash: BlockHashFor<Block>,
1147 extrinsics: Vec<ExtrinsicFor<Block>>,
1148 ) -> Result<(Block, StorageChanges), Box<dyn Error>> {
1149 let inherent_digest = self.mock_subspace_digest(slot);
1150
1151 let inherent_data = Self::mock_inherent_data(slot).await?;
1152
1153 let mut block_builder = BlockBuilderBuilder::new(self.client.as_ref())
1154 .on_parent_block(parent_hash)
1155 .fetch_parent_block_number(self.client.as_ref())?
1156 .with_inherent_digests(inherent_digest)
1157 .build()
1158 .expect("Creates new block builder");
1159
1160 let inherent_txns = block_builder.create_inherents(inherent_data)?;
1161
1162 for tx in inherent_txns.into_iter().chain(extrinsics) {
1163 if let Err(err) = sc_block_builder::BlockBuilder::push(&mut block_builder, tx) {
1164 tracing::error!("Invalid transaction while building block: {}", err);
1165 }
1166 }
1167
1168 let (block, storage_changes, _) = block_builder.build()?.into_inner();
1169 Ok((block, storage_changes))
1170 }
1171
1172 async fn import_block(
1174 &self,
1175 block: Block,
1176 storage_changes: Option<StorageChanges>,
1177 ) -> Result<BlockHashFor<Block>, Box<dyn Error>> {
1178 let (header, body) = block.deconstruct();
1179
1180 let header_hash = header.hash();
1181 let header_number = header.number;
1182
1183 let block_import_params = {
1184 let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
1185 import_block.body = Some(body);
1186 import_block.state_action = match storage_changes {
1187 Some(changes) => {
1188 StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(changes))
1189 }
1190 None => StateAction::Execute,
1191 };
1192 import_block
1193 };
1194
1195 let import_result = self.block_import.import_block(block_import_params).await?;
1196
1197 if let Some(finalized_block_hash) = self
1198 .finalize_block_depth
1199 .and_then(|depth| header_number.checked_sub(depth))
1200 .and_then(|block_to_finalize| {
1201 self.client
1202 .hash(block_to_finalize)
1203 .expect("Block hash not found for number: {block_to_finalize:?}")
1204 })
1205 {
1206 self.client
1207 .finalize_block(finalized_block_hash, None, true)
1208 .unwrap();
1209 }
1210
1211 match import_result {
1212 ImportResult::Imported(_) | ImportResult::AlreadyInChain => Ok(header_hash),
1213 bad_res => Err(format!("Fail to import block due to {bad_res:?}").into()),
1214 }
1215 }
1216
1217 #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1220 pub async fn produce_block_with_slot_at(
1221 &mut self,
1222 new_slot: NewSlot,
1223 parent_hash: BlockHashFor<Block>,
1224 maybe_extrinsics: Option<Vec<ExtrinsicFor<Block>>>,
1225 ) -> Result<BlockHashFor<Block>, Box<dyn Error>> {
1226 let block_timer = time::Instant::now();
1227
1228 let extrinsics = match maybe_extrinsics {
1229 Some(extrinsics) => extrinsics,
1230 None => self.collect_txn_from_pool(parent_hash).await,
1231 };
1232 let tx_hashes: Vec<_> = extrinsics
1233 .iter()
1234 .map(|t| self.transaction_pool.hash_of(t))
1235 .collect();
1236
1237 let (block, storage_changes) = self
1238 .build_block(new_slot.0, parent_hash, extrinsics)
1239 .await?;
1240
1241 log_new_block(&block, block_timer.elapsed().as_millis());
1242
1243 let res = match self.import_block(block, Some(storage_changes)).await {
1244 Ok(hash) => {
1245 self.prune_txs_from_pool(tx_hashes.as_slice()).await?;
1248 Ok(hash)
1249 }
1250 err => err,
1251 };
1252 self.confirm_block_import_processed().await;
1253 res
1254 }
1255
1256 #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1259 pub async fn produce_block_with_slot(&mut self, slot: NewSlot) -> Result<(), Box<dyn Error>> {
1260 self.produce_block_with_slot_at(slot, self.client.info().best_hash, None)
1261 .await?;
1262 Ok(())
1263 }
1264
1265 #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1267 pub async fn produce_block_with_extrinsics(
1268 &mut self,
1269 extrinsics: Vec<ExtrinsicFor<Block>>,
1270 ) -> Result<(), Box<dyn Error>> {
1271 let slot = self.produce_slot();
1272 self.produce_block_with_slot_at(slot, self.client.info().best_hash, Some(extrinsics))
1273 .await?;
1274 Ok(())
1275 }
1276
1277 #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1279 pub async fn produce_blocks(&mut self, n: u64) -> Result<(), Box<dyn Error>> {
1280 for _ in 0..n {
1281 let slot = self.produce_slot();
1282 self.produce_block_with_slot(slot).await?;
1283 }
1284 Ok(())
1285 }
1286
1287 #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1289 pub async fn produce_blocks_with_bundles(&mut self, n: u64) -> Result<(), Box<dyn Error>> {
1290 for _ in 0..n {
1291 let (slot, _) = self.produce_slot_and_wait_for_bundle_submission().await;
1292 self.produce_block_with_slot(slot).await?;
1293 }
1294 Ok(())
1295 }
1296
1297 pub fn account_nonce(&self) -> u32 {
1299 self.client
1300 .runtime_api()
1301 .account_nonce(self.client.info().best_hash, self.key.to_account_id())
1302 .expect("Fail to get account nonce")
1303 }
1304
1305 pub fn account_nonce_of(&self, account_id: AccountId) -> u32 {
1307 self.client
1308 .runtime_api()
1309 .account_nonce(self.client.info().best_hash, account_id)
1310 .expect("Fail to get account nonce")
1311 }
1312
1313 pub fn construct_extrinsic(
1315 &self,
1316 nonce: u32,
1317 function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1318 ) -> UncheckedExtrinsic {
1319 construct_extrinsic_generic(&self.client, function, self.key, false, nonce, 0)
1320 }
1321
1322 pub fn construct_unsigned_extrinsic(
1324 &self,
1325 function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1326 ) -> UncheckedExtrinsic {
1327 construct_unsigned_extrinsic(&self.client, function)
1328 }
1329
1330 pub async fn construct_and_send_extrinsic_with(
1332 &self,
1333 function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1334 ) -> Result<RpcTransactionOutput, RpcTransactionError> {
1335 let nonce = self.account_nonce();
1336 let extrinsic = self.construct_extrinsic(nonce, function);
1337 self.rpc_handlers.send_transaction(extrinsic.into()).await
1338 }
1339
1340 pub async fn send_extrinsic(
1342 &self,
1343 extrinsic: impl Into<OpaqueExtrinsic>,
1344 ) -> Result<RpcTransactionOutput, RpcTransactionError> {
1345 self.rpc_handlers.send_transaction(extrinsic.into()).await
1346 }
1347}
1348
1349fn log_new_block(block: &Block, used_time_ms: u128) {
1350 tracing::info!(
1351 "🎁 Prepared block for proposing at {} ({} ms) [hash: {:?}; parent_hash: {}; extrinsics ({}): [{}]]",
1352 block.header().number(),
1353 used_time_ms,
1354 block.header().hash(),
1355 block.header().parent_hash(),
1356 block.extrinsics().len(),
1357 block
1358 .extrinsics()
1359 .iter()
1360 .map(|xt| BlakeTwo256::hash_of(xt).to_string())
1361 .collect::<Vec<_>>()
1362 .join(", ")
1363 );
1364}
1365
1366fn mock_import_queue<Block: BlockT, I>(
1367 block_import: I,
1368 spawner: &impl SpawnEssentialNamed,
1369) -> BasicQueue<Block>
1370where
1371 I: BlockImport<Block, Error = ConsensusError> + Send + Sync + 'static,
1372{
1373 BasicQueue::new(
1374 MockVerifier::default(),
1375 Box::new(block_import),
1376 None,
1377 spawner,
1378 None,
1379 )
1380}
1381
/// Stateless verifier used by the mock import queue.
///
/// It stores no data; `PhantomData` only ties the type to a concrete `Block`.
struct MockVerifier<Block> {
    // Zero-sized marker so the otherwise unused `Block` type parameter is allowed.
    _marker: PhantomData<Block>,
}
1385
1386impl<Block> Default for MockVerifier<Block> {
1387 fn default() -> Self {
1388 Self {
1389 _marker: PhantomData,
1390 }
1391 }
1392}
1393
1394#[async_trait::async_trait]
1395impl<Block> VerifierT<Block> for MockVerifier<Block>
1396where
1397 Block: BlockT,
1398{
1399 async fn verify(
1400 &self,
1401 block_params: BlockImportParams<Block>,
1402 ) -> Result<BlockImportParams<Block>, String> {
1403 Ok(block_params)
1404 }
1405}
1406
/// Block import wrapper that notifies subscribers about a block that is about to be imported
/// and then forwards the import to the wrapped client.
// The nested subscriber type below is what triggers `clippy::type_complexity`.
#[allow(clippy::type_complexity)]
struct MockBlockImport<Client, Block: BlockT> {
    // The wrapped client that performs the actual import.
    inner: Arc<Client>,
    // Senders of all registered notification streams. Each notification carries the number of
    // the block being imported and an acknowledgement sender.
    block_importing_notification_subscribers:
        Arc<Mutex<Vec<TracingUnboundedSender<(NumberFor<Block>, mpsc::Sender<()>)>>>>,
}
1415
1416impl<Client, Block: BlockT> MockBlockImport<Client, Block> {
1417 fn new(inner: Arc<Client>) -> Self {
1418 MockBlockImport {
1419 inner,
1420 block_importing_notification_subscribers: Arc::new(Mutex::new(Vec::new())),
1421 }
1422 }
1423
1424 fn block_importing_notification_stream(
1426 &self,
1427 ) -> TracingUnboundedReceiver<(NumberFor<Block>, mpsc::Sender<()>)> {
1428 let (tx, rx) = tracing_unbounded("subspace_new_slot_notification_stream", 100);
1429 self.block_importing_notification_subscribers
1430 .lock()
1431 .push(tx);
1432 rx
1433 }
1434}
1435
1436impl<Client, Block: BlockT> MockBlockImport<Client, Block> {
1437 fn clone(&self) -> Self {
1438 MockBlockImport {
1439 inner: self.inner.clone(),
1440 block_importing_notification_subscribers: self
1441 .block_importing_notification_subscribers
1442 .clone(),
1443 }
1444 }
1445}
1446
#[async_trait::async_trait]
impl<Client, Block> BlockImport<Block> for MockBlockImport<Client, Block>
where
    Block: BlockT,
    for<'r> &'r Client: BlockImport<Block, Error = ConsensusError> + Send + Sync,
    Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
    Client::Api: ApiExt<Block>,
{
    type Error = ConsensusError;

    /// Notifies all subscribers that `block` is about to be imported, waits until the
    /// acknowledgement channel is closed, and then forwards the import to the wrapped client.
    ///
    /// The block's fork choice is always overwritten with `ForkChoiceStrategy::LongestChain`.
    async fn import_block(
        &self,
        mut block: BlockImportParams<Block>,
    ) -> Result<ImportResult, Self::Error> {
        let block_number = *block.header.number();
        // Whatever fork choice the caller set is replaced here.
        block.fork_choice = Some(ForkChoiceStrategy::LongestChain);

        let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);

        {
            // `value` owns the original sender and is dropped at the end of this scope, so
            // afterwards only the clones handed to subscribers keep the channel open.
            // Subscribers for which the send fails are removed from the list.
            let value = (block_number, acknowledgement_sender);
            self.block_importing_notification_subscribers
                .lock()
                .retain(|subscriber| subscriber.unbounded_send(value.clone()).is_ok());
        }

        while acknowledgement_receiver.next().await.is_some() {
            // Drain acknowledgements until the stream ends; with `futures` mpsc channels that
            // is expected to happen once every sender clone has been dropped.
        }

        self.inner.import_block(block).await
    }

    /// Forwards the check to the wrapped client unchanged.
    async fn check_block(
        &self,
        block: BlockCheckParams<Block>,
    ) -> Result<ImportResult, Self::Error> {
        self.inner.check_block(block).await
    }
}
1489
#[macro_export]
/// Produces `$count` blocks (with bundles) on `$primary_node` and waits for `$operator_node`
/// and every listed `$domain_node` to see `$count` blocks.
///
/// Expands to an `async` block that evaluates to `Result<(), Box<dyn std::error::Error>>`; the
/// caller must `.await` it. The expansion refers to the `futures` crate by name.
macro_rules! produce_blocks {
    ($primary_node:ident, $operator_node:ident, $count: literal $(, $domain_node:ident)*) => {
        {
            async {
                // The `wait_for_blocks` futures are created before any block is produced and
                // are only awaited afterwards.
                let domain_fut = {
                    let mut futs: Vec<std::pin::Pin<Box<dyn futures::Future<Output = ()>>>> = Vec::new();
                    futs.push(Box::pin($operator_node.wait_for_blocks($count)));
                    $( futs.push( Box::pin( $domain_node.wait_for_blocks($count) ) ); )*
                    futures::future::join_all(futs)
                };
                $primary_node.produce_blocks_with_bundles($count).await?;
                domain_fut.await;
                Ok::<(), Box<dyn std::error::Error>>(())
            }
        }
    };
}
1509
#[macro_export]
/// Awaits the block-producing future `$primary_node_produce_block` and then waits for
/// `$operator_node` and every listed `$domain_node` to see one block.
///
/// Expands to an `async` block that evaluates to `Result<(), Box<dyn std::error::Error>>`; the
/// caller must `.await` it. The expansion refers to the `futures` crate by name.
macro_rules! produce_block_with {
    ($primary_node_produce_block:expr, $operator_node:ident $(, $domain_node:ident)*) => {
        {
            async {
                // The `wait_for_blocks` futures are created before the block is produced and
                // are only awaited afterwards.
                let domain_fut = {
                    let mut futs: Vec<std::pin::Pin<Box<dyn futures::Future<Output = ()>>>> = Vec::new();
                    futs.push(Box::pin($operator_node.wait_for_blocks(1)));
                    $( futs.push( Box::pin( $domain_node.wait_for_blocks(1) ) ); )*
                    futures::future::join_all(futs)
                };
                $primary_node_produce_block.await?;
                domain_fut.await;
                Ok::<(), Box<dyn std::error::Error>>(())
            }
        }
    };
}
1530
#[macro_export]
/// Keeps producing one block at a time (via `produce_blocks!`) until `$condition` evaluates
/// to `true`, sleeping 10 ms after each produced block.
///
/// `$condition` is re-evaluated before every iteration. Expands to an `async` block that
/// evaluates to `Result<(), Box<dyn std::error::Error>>`; the caller must `.await` it. The
/// expansion refers to the `tokio` crate by name.
macro_rules! produce_blocks_until {
    ($primary_node:ident, $operator_node:ident, $condition: block $(, $domain_node:ident)*) => {
        async {
            while !$condition {
                // `$crate::` lets callers use this macro without importing `produce_blocks!`.
                // Each repetition already carries its leading comma, so no extra separator is
                // given; a separator would emit `, a, , b` for two or more domain nodes.
                $crate::produce_blocks!($primary_node, $operator_node, 1 $(, $domain_node)*).await?;
                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            }
            Ok::<(), Box<dyn std::error::Error>>(())
        }
    };
}
1544
/// The balance type that runtime `T` uses for charging transaction payments, i.e. the
/// `Balance` of its `pallet_transaction_payment` `OnChargeTransaction` implementation.
type BalanceOf<T> = <<T as pallet_transaction_payment::Config>::OnChargeTransaction as pallet_transaction_payment::OnChargeTransaction<T>>::Balance;
1546
1547fn get_signed_extra(
1548 current_block: u64,
1549 immortal: bool,
1550 nonce: u32,
1551 tip: BalanceOf<Runtime>,
1552) -> SignedExtra {
1553 let period = u64::from(<<Runtime as frame_system::Config>::BlockHashCount>::get())
1554 .checked_next_power_of_two()
1555 .map(|c| c / 2)
1556 .unwrap_or(2);
1557 (
1558 frame_system::CheckNonZeroSender::<Runtime>::new(),
1559 frame_system::CheckSpecVersion::<Runtime>::new(),
1560 frame_system::CheckTxVersion::<Runtime>::new(),
1561 frame_system::CheckGenesis::<Runtime>::new(),
1562 frame_system::CheckMortality::<Runtime>::from(if immortal {
1563 generic::Era::Immortal
1564 } else {
1565 generic::Era::mortal(period, current_block)
1566 }),
1567 frame_system::CheckNonce::<Runtime>::from(nonce.into()),
1568 frame_system::CheckWeight::<Runtime>::new(),
1569 pallet_transaction_payment::ChargeTransactionPayment::<Runtime>::from(tip),
1570 BalanceTransferCheckExtension::<Runtime>::default(),
1571 pallet_subspace::extensions::SubspaceExtension::<Runtime>::new(),
1572 pallet_domains::extensions::DomainsExtension::<Runtime>::new(),
1573 pallet_messenger::extensions::MessengerExtension::<Runtime>::new(),
1574 )
1575}
1576
1577fn construct_extrinsic_raw_payload<Client>(
1578 client: impl AsRef<Client>,
1579 function: <Runtime as frame_system::Config>::RuntimeCall,
1580 immortal: bool,
1581 nonce: u32,
1582 tip: BalanceOf<Runtime>,
1583) -> (
1584 SignedPayload<<Runtime as frame_system::Config>::RuntimeCall, SignedExtra>,
1585 SignedExtra,
1586)
1587where
1588 BalanceOf<Runtime>: Send + Sync + From<u64> + sp_runtime::FixedPointOperand,
1589 u64: From<BlockNumberFor<Runtime>>,
1590 Client: HeaderBackend<subspace_runtime_primitives::opaque::Block>,
1591{
1592 let current_block_hash = client.as_ref().info().best_hash;
1593 let current_block = client.as_ref().info().best_number.saturated_into();
1594 let genesis_block = client.as_ref().hash(0).unwrap().unwrap();
1595 let extra = get_signed_extra(current_block, immortal, nonce, tip);
1596 (
1597 generic::SignedPayload::<
1598 <Runtime as frame_system::Config>::RuntimeCall,
1599 SignedExtra,
1600 >::from_raw(
1601 function,
1602 extra.clone(),
1603 ((), 100, 1, genesis_block, current_block_hash, (), (), (), (), (), (),()),
1604 ),
1605 extra,
1606 )
1607}
1608
1609pub fn construct_extrinsic_generic<Client>(
1611 client: impl AsRef<Client>,
1612 function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1613 caller: Sr25519Keyring,
1614 immortal: bool,
1615 nonce: u32,
1616 tip: BalanceOf<Runtime>,
1617) -> UncheckedExtrinsic
1618where
1619 BalanceOf<Runtime>: Send + Sync + From<u64> + sp_runtime::FixedPointOperand,
1620 u64: From<BlockNumberFor<Runtime>>,
1621 Client: HeaderBackend<subspace_runtime_primitives::opaque::Block>,
1622{
1623 let function = function.into();
1624 let (raw_payload, extra) =
1625 construct_extrinsic_raw_payload(client, function.clone(), immortal, nonce, tip);
1626 let signature = raw_payload.using_encoded(|e| caller.sign(e));
1627 UncheckedExtrinsic::new_signed(
1628 function,
1629 MultiAddress::Id(caller.to_account_id()),
1630 Signature::Sr25519(signature),
1631 extra,
1632 )
1633}
1634
1635fn construct_unsigned_extrinsic<Client>(
1637 client: impl AsRef<Client>,
1638 function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1639) -> UncheckedExtrinsic
1640where
1641 BalanceOf<Runtime>: Send + Sync + From<u64> + sp_runtime::FixedPointOperand,
1642 u64: From<BlockNumberFor<Runtime>>,
1643 Client: HeaderBackend<subspace_runtime_primitives::opaque::Block>,
1644{
1645 let function = function.into();
1646 let current_block = client.as_ref().info().best_number.saturated_into();
1647 let extra = get_signed_extra(current_block, true, 0, 0);
1648 UncheckedExtrinsic::new_transaction(function, extra)
1649}