1#![warn(missing_docs, unused_crate_dependencies)]
20
21use cross_domain_message_gossip::{GossipWorkerBuilder, xdm_gossip_peers_set_config};
22use domain_runtime_primitives::opaque::{Block as DomainBlock, Header as DomainHeader};
23use frame_system::pallet_prelude::BlockNumberFor;
24use futures::channel::mpsc;
25use futures::{Future, StreamExt};
26use jsonrpsee::RpcModule;
27use pallet_domains::staking::StakingSummary;
28use parity_scale_codec::{Decode, Encode};
29use parking_lot::Mutex;
30use sc_block_builder::BlockBuilderBuilder;
31use sc_client_api::execution_extensions::ExtensionsFactory;
32use sc_client_api::{Backend as BackendT, BlockBackend, ExecutorProvider, Finalizer};
33use sc_consensus::block_import::{
34 BlockCheckParams, BlockImportParams, ForkChoiceStrategy, ImportResult,
35};
36use sc_consensus::{BasicQueue, BlockImport, StateAction, Verifier as VerifierT};
37use sc_domains::ExtensionsFactory as DomainsExtensionFactory;
38use sc_network::config::{NetworkConfiguration, TransportConfig};
39use sc_network::service::traits::NetworkService;
40use sc_network::{
41 NetworkWorker, NotificationMetrics, NotificationService, ReputationChange, multiaddr,
42};
43use sc_service::config::{
44 DatabaseSource, ExecutorConfiguration, KeystoreConfig, MultiaddrWithPeerId,
45 OffchainWorkerConfig, RpcBatchRequestConfig, RpcConfiguration, RpcEndpoint,
46 WasmExecutionMethod, WasmtimeInstantiationStrategy,
47};
48use sc_service::{BasePath, BlocksPruning, Configuration, Role, SpawnTasksParams, TaskManager};
49use sc_transaction_pool::{BasicPool, FullChainApi, Options};
50use sc_transaction_pool_api::error::{Error as TxPoolError, IntoPoolError};
51use sc_transaction_pool_api::{InPoolTransaction, TransactionPool, TransactionSource};
52use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender, tracing_unbounded};
53use sp_api::{ApiExt, ProvideRuntimeApi};
54use sp_blockchain::{HashAndNumber, HeaderBackend};
55use sp_consensus::{BlockOrigin, Error as ConsensusError};
56use sp_consensus_slots::Slot;
57use sp_consensus_subspace::digests::{
58 CompatibleDigestItem, PreDigest, PreDigestPotInfo, extract_pre_digest,
59};
60use sp_consensus_subspace::{PotExtension, SubspaceApi};
61use sp_core::H256;
62use sp_core::offchain::OffchainDbExt;
63use sp_core::offchain::storage::OffchainDb;
64use sp_core::traits::{CodeExecutor, SpawnEssentialNamed};
65use sp_domains::bundle::OpaqueBundle;
66use sp_domains::{BundleProducerElectionApi, ChainId, DomainId, DomainsApi, OperatorId};
67use sp_domains_fraud_proof::fraud_proof::FraudProof;
68use sp_domains_fraud_proof::{FraudProofExtension, FraudProofHostFunctionsImpl};
69use sp_externalities::Extensions;
70use sp_inherents::{InherentData, InherentDataProvider};
71use sp_keyring::Sr25519Keyring;
72use sp_messenger::MessengerApi;
73use sp_messenger_host_functions::{MessengerExtension, MessengerHostFunctionsImpl};
74use sp_mmr_primitives::MmrApi;
75use sp_runtime::generic::{Digest, SignedPayload};
76use sp_runtime::traits::{
77 BlakeTwo256, Block as BlockT, Hash as HashT, Header as HeaderT, NumberFor,
78};
79use sp_runtime::{DigestItem, MultiAddress, OpaqueExtrinsic, SaturatedConversion, generic};
80use sp_subspace_mmr::host_functions::{SubspaceMmrExtension, SubspaceMmrHostFunctionsImpl};
81use sp_timestamp::Timestamp;
82use std::collections::HashMap;
83use std::error::Error;
84use std::marker::PhantomData;
85use std::net::SocketAddr;
86use std::pin::Pin;
87use std::sync::Arc;
88use std::time;
89use std::time::Duration;
90use subspace_core_primitives::pot::PotOutput;
91use subspace_core_primitives::solutions::Solution;
92use subspace_core_primitives::{BlockNumber, PublicKey};
93use subspace_runtime_primitives::extension::BalanceTransferCheckExtension;
94use subspace_runtime_primitives::opaque::Block;
95use subspace_runtime_primitives::{
96 AccountId, Balance, BlockHashFor, ExtrinsicFor, Hash, HeaderFor, Signature,
97};
98use subspace_service::{FullSelectChain, RuntimeExecutor};
99use subspace_test_client::{Backend, Client, chain_spec};
100use subspace_test_primitives::OnchainStateApi;
101use subspace_test_runtime::{
102 Runtime, RuntimeApi, RuntimeCall, SLOT_DURATION, SignedExtra, UncheckedExtrinsic,
103};
104use substrate_frame_rpc_system::AccountNonceApi;
105use substrate_test_client::{RpcHandlersExt, RpcTransactionError, RpcTransactionOutput};
106use tokio::time::sleep;
107
108pub type FraudProofFor<Block, DomainBlock> =
110 FraudProof<NumberFor<Block>, BlockHashFor<Block>, HeaderFor<DomainBlock>, H256>;
111
112const MAX_PRODUCE_BUNDLE_TRY: usize = 10;
113
114#[expect(clippy::too_many_arguments)]
119pub fn node_config(
120 tokio_handle: tokio::runtime::Handle,
121 key: Sr25519Keyring,
122 boot_nodes: Vec<MultiaddrWithPeerId>,
123 run_farmer: bool,
124 force_authoring: bool,
125 force_synced: bool,
126 private_evm: bool,
127 evm_owner_account: Option<AccountId>,
128 base_path: BasePath,
129 rpc_addr: Option<SocketAddr>,
130 rpc_port: Option<u16>,
131) -> Configuration {
132 let root = base_path.path();
133 let role = if run_farmer {
134 Role::Authority
135 } else {
136 Role::Full
137 };
138 let key_seed = key.to_seed();
139 let spec = chain_spec::subspace_local_testnet_config(private_evm, evm_owner_account).unwrap();
140
141 let mut network_config = NetworkConfiguration::new(
142 key_seed.to_string(),
143 "network/test/0.1",
144 Default::default(),
145 None,
146 );
147
148 network_config.boot_nodes = boot_nodes;
149
150 network_config.allow_non_globals_in_dht = true;
151
152 let addr: multiaddr::Multiaddr = multiaddr::Protocol::Memory(rand::random()).into();
153 network_config.listen_addresses.push(addr.clone());
154
155 network_config.public_addresses.push(addr);
156
157 network_config.transport = TransportConfig::MemoryOnly;
158
159 network_config.force_synced = force_synced;
160
161 let rpc_configuration = match rpc_addr {
162 Some(listen_addr) => {
163 let port = rpc_port.unwrap_or(9944);
164 RpcConfiguration {
165 addr: Some(vec![RpcEndpoint {
166 batch_config: RpcBatchRequestConfig::Disabled,
167 max_connections: 100,
168 listen_addr,
169 rpc_methods: Default::default(),
170 rate_limit: None,
171 rate_limit_trust_proxy_headers: false,
172 rate_limit_whitelisted_ips: vec![],
173 max_payload_in_mb: 15,
174 max_payload_out_mb: 15,
175 max_subscriptions_per_connection: 100,
176 max_buffer_capacity_per_connection: 100,
177 cors: None,
178 retry_random_port: true,
179 is_optional: false,
180 }]),
181 max_request_size: 15,
182 max_response_size: 15,
183 id_provider: None,
184 max_subs_per_conn: 1024,
185 port,
186 message_buffer_capacity: 1024,
187 batch_config: RpcBatchRequestConfig::Disabled,
188 max_connections: 1000,
189 cors: None,
190 methods: Default::default(),
191 rate_limit: None,
192 rate_limit_whitelisted_ips: vec![],
193 rate_limit_trust_proxy_headers: false,
194 request_logger_limit: 1024,
195 }
196 }
197 None => RpcConfiguration {
198 addr: None,
199 max_request_size: 0,
200 max_response_size: 0,
201 id_provider: None,
202 max_subs_per_conn: 0,
203 port: 0,
204 message_buffer_capacity: 0,
205 batch_config: RpcBatchRequestConfig::Disabled,
206 max_connections: 0,
207 cors: None,
208 methods: Default::default(),
209 rate_limit: None,
210 rate_limit_whitelisted_ips: vec![],
211 rate_limit_trust_proxy_headers: false,
212 request_logger_limit: 1024,
213 },
214 };
215
216 Configuration {
217 impl_name: "subspace-test-node".to_string(),
218 impl_version: "0.1".to_string(),
219 role,
220 tokio_handle,
221 transaction_pool: Default::default(),
222 network: network_config,
223 keystore: KeystoreConfig::InMemory,
224 database: DatabaseSource::ParityDb {
225 path: root.join("paritydb"),
226 },
227 trie_cache_maximum_size: Some(64 * 1024 * 1024),
228 state_pruning: Default::default(),
229 blocks_pruning: BlocksPruning::KeepAll,
230 chain_spec: Box::new(spec),
231 executor: ExecutorConfiguration {
232 wasm_method: WasmExecutionMethod::Compiled {
233 instantiation_strategy: WasmtimeInstantiationStrategy::PoolingCopyOnWrite,
234 },
235 max_runtime_instances: 8,
236 default_heap_pages: None,
237 runtime_cache_size: 2,
238 },
239 wasm_runtime_overrides: Default::default(),
240 rpc: rpc_configuration,
241 prometheus_config: None,
242 telemetry_endpoints: None,
243 offchain_worker: OffchainWorkerConfig {
244 enabled: false,
245 indexing_enabled: true,
246 },
247 force_authoring,
248 disable_grandpa: false,
249 dev_key_seed: Some(key_seed),
250 tracing_targets: None,
251 tracing_receiver: Default::default(),
252 announce_block: true,
253 data_path: base_path.path().into(),
254 base_path,
255 warm_up_trie_cache: None,
256 }
257}
258
259type StorageChanges = sp_api::StorageChanges<Block>;
260
261struct MockExtensionsFactory<Client, DomainBlock, Executor, CBackend> {
262 consensus_client: Arc<Client>,
263 consensus_backend: Arc<CBackend>,
264 executor: Arc<Executor>,
265 mock_pot_verifier: Arc<MockPotVerfier>,
266 confirmation_depth_k: BlockNumber,
267 _phantom: PhantomData<DomainBlock>,
268}
269
270impl<Client, DomainBlock, Executor, CBackend>
271 MockExtensionsFactory<Client, DomainBlock, Executor, CBackend>
272{
273 fn new(
274 consensus_client: Arc<Client>,
275 executor: Arc<Executor>,
276 mock_pot_verifier: Arc<MockPotVerfier>,
277 consensus_backend: Arc<CBackend>,
278 confirmation_depth_k: BlockNumber,
279 ) -> Self {
280 Self {
281 consensus_client,
282 consensus_backend,
283 executor,
284 mock_pot_verifier,
285 confirmation_depth_k,
286 _phantom: Default::default(),
287 }
288 }
289}
290
291#[derive(Default)]
292struct MockPotVerfier(Mutex<HashMap<u64, PotOutput>>);
293
294impl MockPotVerfier {
295 fn is_valid(&self, slot: u64, pot: PotOutput) -> bool {
296 self.0.lock().get(&slot).map(|p| *p == pot).unwrap_or(false)
297 }
298
299 fn inject_pot(&self, slot: u64, pot: PotOutput) {
300 self.0.lock().insert(slot, pot);
301 }
302}
303
304impl<Block, Client, DomainBlock, Executor, CBackend> ExtensionsFactory<Block>
305 for MockExtensionsFactory<Client, DomainBlock, Executor, CBackend>
306where
307 Block: BlockT,
308 Block::Hash: From<H256> + Into<H256>,
309 DomainBlock: BlockT,
310 DomainBlock::Hash: Into<H256> + From<H256>,
311 Client: BlockBackend<Block> + HeaderBackend<Block> + ProvideRuntimeApi<Block> + 'static,
312 Client::Api: DomainsApi<Block, DomainBlock::Header>
313 + BundleProducerElectionApi<Block, Balance>
314 + MessengerApi<Block, NumberFor<Block>, Block::Hash>
315 + MmrApi<Block, H256, NumberFor<Block>>,
316 Executor: CodeExecutor + sc_executor::RuntimeVersionOf,
317 CBackend: BackendT<Block> + 'static,
318{
319 fn extensions_for(
320 &self,
321 _block_hash: Block::Hash,
322 _block_number: NumberFor<Block>,
323 ) -> Extensions {
324 let confirmation_depth_k = self.confirmation_depth_k;
325 let mut exts = Extensions::new();
326 exts.register(FraudProofExtension::new(Arc::new(
327 FraudProofHostFunctionsImpl::<_, _, DomainBlock, Executor, _>::new(
328 self.consensus_client.clone(),
329 self.executor.clone(),
330 move |client, executor| {
331 let extension_factory =
332 DomainsExtensionFactory::<_, Block, DomainBlock, _>::new(
333 client,
334 executor,
335 confirmation_depth_k,
336 );
337 Box::new(extension_factory) as Box<dyn ExtensionsFactory<DomainBlock>>
338 },
339 ),
340 )));
341 exts.register(SubspaceMmrExtension::new(Arc::new(
342 SubspaceMmrHostFunctionsImpl::<Block, _>::new(
343 self.consensus_client.clone(),
344 confirmation_depth_k,
345 ),
346 )));
347 exts.register(MessengerExtension::new(Arc::new(
348 MessengerHostFunctionsImpl::<Block, _, DomainBlock, _>::new(
349 self.consensus_client.clone(),
350 self.executor.clone(),
351 ),
352 )));
353
354 if let Some(offchain_storage) = self.consensus_backend.offchain_storage() {
355 let offchain_db = OffchainDb::new(offchain_storage);
356 exts.register(OffchainDbExt::new(offchain_db));
357 }
358 exts.register(PotExtension::new({
359 let client = Arc::clone(&self.consensus_client);
360 let mock_pot_verifier = Arc::clone(&self.mock_pot_verifier);
361 Box::new(
362 move |parent_hash, slot, proof_of_time, _quick_verification| {
363 let parent_hash = {
364 let mut converted_parent_hash = Block::Hash::default();
365 converted_parent_hash.as_mut().copy_from_slice(&parent_hash);
366 converted_parent_hash
367 };
368
369 let parent_header = match client.header(parent_hash) {
370 Ok(Some(parent_header)) => parent_header,
371 _ => return false,
372 };
373 let parent_pre_digest = match extract_pre_digest(&parent_header) {
374 Ok(parent_pre_digest) => parent_pre_digest,
375 _ => return false,
376 };
377
378 let parent_slot = parent_pre_digest.slot();
379 if slot <= *parent_slot {
380 return false;
381 }
382
383 mock_pot_verifier.is_valid(slot, proof_of_time)
384 },
385 )
386 }));
387 exts
388 }
389}
390
391type NewSlot = (Slot, PotOutput);
392
393pub struct MockConsensusNode {
395 pub task_manager: TaskManager,
397 pub client: Arc<Client>,
399 pub backend: Arc<Backend>,
401 pub executor: RuntimeExecutor,
403 pub transaction_pool: Arc<BasicPool<FullChainApi<Client, Block>, Block>>,
405 pub select_chain: FullSelectChain,
407 pub network_service: Arc<dyn NetworkService + Send + Sync>,
409 pub xdm_gossip_notification_service: Option<Box<dyn NotificationService>>,
411 pub sync_service: Arc<sc_network_sync::SyncingService<Block>>,
413 pub rpc_handlers: sc_service::RpcHandlers,
415 next_slot: u64,
417 mock_pot_verifier: Arc<MockPotVerfier>,
419 new_slot_notification_subscribers: Vec<mpsc::UnboundedSender<(Slot, PotOutput)>>,
421 acknowledgement_sender_subscribers: Vec<TracingUnboundedSender<mpsc::Sender<()>>>,
423 block_import: MockBlockImport<Client, Block>,
425 xdm_gossip_worker_builder: Option<GossipWorkerBuilder>,
426 mock_solution: Solution<AccountId>,
428 log_prefix: &'static str,
429 pub key: Sr25519Keyring,
431 finalize_block_depth: Option<NumberFor<Block>>,
432 base_path: BasePath,
434}
435
436pub struct MockConsensusNodeRpcConfig {
438 pub base_path: BasePath,
440 pub finalize_block_depth: Option<NumberFor<Block>>,
442 pub private_evm: bool,
444 pub evm_owner: Option<Sr25519Keyring>,
446 pub rpc_addr: Option<SocketAddr>,
448 pub rpc_port: Option<u16>,
450}
451
452impl MockConsensusNode {
453 fn run_with_configuration(
454 mut config: Configuration,
455 key: Sr25519Keyring,
456 base_path: BasePath,
457 finalize_block_depth: Option<NumberFor<Block>>,
458 rpc_builder: Box<
459 dyn Fn() -> Result<RpcModule<()>, sc_service::Error> + Send + Sync + 'static,
460 >,
461 ) -> MockConsensusNode {
462 let log_prefix = key.into();
463
464 let tx_pool_options = Options {
465 ban_time: Duration::from_millis(0),
466 ..Default::default()
467 };
468
469 config.network.node_name = format!("{} (Consensus)", config.network.node_name);
470 let span = sc_tracing::tracing::info_span!(
471 sc_tracing::logging::PREFIX_LOG_SPAN,
472 name = config.network.node_name.as_str()
473 );
474 let _enter = span.enter();
475
476 let executor = sc_service::new_wasm_executor(&config.executor);
477
478 let (client, backend, keystore_container, mut task_manager) =
479 sc_service::new_full_parts::<Block, RuntimeApi, _>(&config, None, executor.clone())
480 .expect("Fail to new full parts");
481
482 let domain_executor = Arc::new(sc_service::new_wasm_executor(&config.executor));
483 let client = Arc::new(client);
484 let mock_pot_verifier = Arc::new(MockPotVerfier::default());
485 let chain_constants = client
486 .runtime_api()
487 .chain_constants(client.info().best_hash)
488 .expect("Fail to get chain constants");
489 client
490 .execution_extensions()
491 .set_extensions_factory(MockExtensionsFactory::<
492 _,
493 DomainBlock,
494 sc_domains::RuntimeExecutor,
495 _,
496 >::new(
497 client.clone(),
498 domain_executor.clone(),
499 Arc::clone(&mock_pot_verifier),
500 backend.clone(),
501 chain_constants.confirmation_depth_k(),
502 ));
503
504 let select_chain = sc_consensus::LongestChain::new(backend.clone());
505 let transaction_pool = Arc::from(BasicPool::new_full(
506 tx_pool_options,
507 config.role.is_authority().into(),
508 config.prometheus_registry(),
509 task_manager.spawn_essential_handle(),
510 client.clone(),
511 ));
512
513 let block_import = MockBlockImport::<_, _>::new(client.clone());
514
515 let mut net_config = sc_network::config::FullNetworkConfiguration::<
516 _,
517 _,
518 NetworkWorker<_, _>,
519 >::new(&config.network, None);
520 let (xdm_gossip_notification_config, xdm_gossip_notification_service) =
521 xdm_gossip_peers_set_config();
522 net_config.add_notification_protocol(xdm_gossip_notification_config);
523
524 let (network_service, system_rpc_tx, tx_handler_controller, sync_service) =
525 sc_service::build_network(sc_service::BuildNetworkParams {
526 config: &config,
527 net_config,
528 client: client.clone(),
529 transaction_pool: transaction_pool.clone(),
530 spawn_handle: task_manager.spawn_handle(),
531 import_queue: mock_import_queue(
532 block_import.clone(),
533 &task_manager.spawn_essential_handle(),
534 ),
535 block_announce_validator_builder: None,
536 warp_sync_config: None,
537 block_relay: None,
538 metrics: NotificationMetrics::new(None),
539 })
540 .expect("Should be able to build network");
541
542 let rpc_handlers = sc_service::spawn_tasks(SpawnTasksParams {
543 network: network_service.clone(),
544 client: client.clone(),
545 keystore: keystore_container.keystore(),
546 task_manager: &mut task_manager,
547 transaction_pool: transaction_pool.clone(),
548 rpc_builder: Box::new(move |_| rpc_builder()),
549 backend: backend.clone(),
550 system_rpc_tx,
551 config,
552 telemetry: None,
553 tx_handler_controller,
554 sync_service: sync_service.clone(),
555 tracing_execute_block: None,
556 })
557 .expect("Should be able to spawn tasks");
558
559 let mock_solution =
560 Solution::genesis_solution(PublicKey::from(key.public().0), key.to_account_id());
561
562 let mut gossip_builder = GossipWorkerBuilder::new();
563
564 task_manager
565 .spawn_essential_handle()
566 .spawn_essential_blocking(
567 "consensus-chain-channel-update-worker",
568 None,
569 Box::pin(
570 domain_client_message_relayer::worker::gossip_channel_updates::<_, _, Block, _>(
571 ChainId::Consensus,
572 client.clone(),
573 sync_service.clone(),
574 gossip_builder.gossip_msg_sink(),
575 ),
576 ),
577 );
578
579 let (consensus_msg_sink, consensus_msg_receiver) =
580 tracing_unbounded("consensus_message_channel", 100);
581
582 let consensus_listener =
583 cross_domain_message_gossip::start_cross_chain_message_listener::<_, _, _, _, _, _, _>(
584 ChainId::Consensus,
585 client.clone(),
586 client.clone(),
587 transaction_pool.clone(),
588 network_service.clone(),
589 consensus_msg_receiver,
590 domain_executor,
591 sync_service.clone(),
592 );
593
594 task_manager
595 .spawn_essential_handle()
596 .spawn_essential_blocking(
597 "consensus-message-listener",
598 None,
599 Box::pin(consensus_listener),
600 );
601
602 gossip_builder.push_chain_sink(ChainId::Consensus, consensus_msg_sink);
603
604 task_manager.spawn_essential_handle().spawn_blocking(
605 "mmr-gadget",
606 None,
607 mmr_gadget::MmrGadget::start(
608 client.clone(),
609 backend.clone(),
610 sp_mmr_primitives::INDEXING_PREFIX.to_vec(),
611 ),
612 );
613
614 MockConsensusNode {
615 task_manager,
616 client,
617 backend,
618 executor,
619 transaction_pool,
620 select_chain,
621 network_service,
622 xdm_gossip_notification_service: Some(xdm_gossip_notification_service),
623 sync_service,
624 rpc_handlers,
625 next_slot: 1,
626 mock_pot_verifier,
627 new_slot_notification_subscribers: Vec::new(),
628 acknowledgement_sender_subscribers: Vec::new(),
629 block_import,
630 xdm_gossip_worker_builder: Some(gossip_builder),
631 mock_solution,
632 log_prefix,
633 key,
634 finalize_block_depth,
635 base_path,
636 }
637 }
638
639 pub fn run(
641 tokio_handle: tokio::runtime::Handle,
642 key: Sr25519Keyring,
643 base_path: BasePath,
644 ) -> MockConsensusNode {
645 Self::run_with_finalization_depth(tokio_handle, key, base_path, None, false, None)
646 }
647
648 pub fn run_with_private_evm(
650 tokio_handle: tokio::runtime::Handle,
651 key: Sr25519Keyring,
652 evm_owner: Option<Sr25519Keyring>,
653 base_path: BasePath,
654 ) -> MockConsensusNode {
655 Self::run_with_finalization_depth(tokio_handle, key, base_path, None, true, evm_owner)
656 }
657
658 pub fn run_with_finalization_depth(
660 tokio_handle: tokio::runtime::Handle,
661 key: Sr25519Keyring,
662 base_path: BasePath,
663 finalize_block_depth: Option<NumberFor<Block>>,
664 private_evm: bool,
665 evm_owner: Option<Sr25519Keyring>,
666 ) -> MockConsensusNode {
667 let rpc_config = MockConsensusNodeRpcConfig {
668 base_path,
669 finalize_block_depth,
670 private_evm,
671 evm_owner,
672 rpc_addr: None,
673 rpc_port: None,
674 };
675
676 Self::run_with_rpc_builder(
677 tokio_handle,
678 key,
679 rpc_config,
680 Box::new(|| Ok(RpcModule::new(()))),
681 )
682 }
683
684 pub fn run_with_rpc_builder(
686 tokio_handle: tokio::runtime::Handle,
687 key: Sr25519Keyring,
688 rpc_config: MockConsensusNodeRpcConfig,
689 rpc_builder: Box<
690 dyn Fn() -> Result<RpcModule<()>, sc_service::Error> + Send + Sync + 'static,
691 >,
692 ) -> MockConsensusNode {
693 let MockConsensusNodeRpcConfig {
694 base_path,
695 finalize_block_depth,
696 private_evm,
697 evm_owner,
698 rpc_addr,
699 rpc_port,
700 } = rpc_config;
701
702 let config = node_config(
703 tokio_handle,
704 key,
705 vec![],
706 false,
707 false,
708 false,
709 private_evm,
710 evm_owner.map(|key| key.to_account_id()),
711 base_path.clone(),
712 rpc_addr,
713 rpc_port,
714 );
715
716 Self::run_with_configuration(config, key, base_path, finalize_block_depth, rpc_builder)
717 }
718
719 pub fn run_with_rpc_options(
721 tokio_handle: tokio::runtime::Handle,
722 key: Sr25519Keyring,
723 rpc_config: MockConsensusNodeRpcConfig,
724 ) -> MockConsensusNode {
725 Self::run_with_rpc_builder(
726 tokio_handle,
727 key,
728 rpc_config,
729 Box::new(|| Ok(RpcModule::new(()))),
730 )
731 }
732
733 pub fn xdm_gossip_worker_builder(&mut self) -> &mut GossipWorkerBuilder {
735 self.xdm_gossip_worker_builder
736 .as_mut()
737 .expect("gossip message worker have not started yet")
738 }
739
740 pub fn start_cross_domain_gossip_message_worker(&mut self) {
742 let xdm_gossip_worker_builder = self
743 .xdm_gossip_worker_builder
744 .take()
745 .expect("gossip message worker have not started yet");
746 let cross_domain_message_gossip_worker = xdm_gossip_worker_builder.build::<Block, _, _>(
747 self.network_service.clone(),
748 self.xdm_gossip_notification_service
749 .take()
750 .expect("XDM gossip notification service must be used only once"),
751 self.sync_service.clone(),
752 );
753 self.task_manager
754 .spawn_essential_handle()
755 .spawn_essential_blocking(
756 "cross-domain-gossip-message-worker",
757 None,
758 Box::pin(cross_domain_message_gossip_worker.run()),
759 );
760 }
761
762 pub fn next_slot(&self) -> u64 {
764 self.next_slot
765 }
766
767 pub fn set_next_slot(&mut self, next_slot: u64) {
769 self.next_slot = next_slot;
770 }
771
772 pub fn produce_slot(&mut self) -> NewSlot {
774 let slot = Slot::from(self.next_slot);
775 let proof_of_time = PotOutput::from(
776 <&[u8] as TryInto<[u8; 16]>>::try_into(&Hash::random().to_fixed_bytes()[..16])
777 .expect("slice with length of 16 must able convert into [u8; 16]; qed"),
778 );
779 self.mock_pot_verifier.inject_pot(*slot, proof_of_time);
780 self.next_slot += 1;
781
782 (slot, proof_of_time)
783 }
784
785 pub async fn notify_new_slot_and_wait_for_bundle(
787 &mut self,
788 new_slot: NewSlot,
789 ) -> Option<OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>> {
790 self.new_slot_notification_subscribers
791 .retain(|subscriber| subscriber.unbounded_send(new_slot).is_ok());
792
793 self.confirm_acknowledgement().await;
794 self.get_bundle_from_tx_pool(new_slot)
795 }
796
797 pub async fn produce_slot_and_wait_for_bundle_submission(
799 &mut self,
800 ) -> (
801 NewSlot,
802 OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>,
803 ) {
804 let slot = self.produce_slot();
805 for _ in 0..MAX_PRODUCE_BUNDLE_TRY {
806 if let Some(bundle) = self.notify_new_slot_and_wait_for_bundle(slot).await {
807 return (slot, bundle);
808 }
809 }
810 panic!(
811 "Failed to produce bundle after {MAX_PRODUCE_BUNDLE_TRY:?} tries, something must be wrong"
812 );
813 }
814
815 pub async fn produce_slot_and_wait_for_bundle_submission_from_operator(
817 &mut self,
818 operator_id: OperatorId,
819 ) -> (
820 NewSlot,
821 OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>,
822 ) {
823 loop {
824 let slot = self.produce_slot();
825 if let Some(bundle) = self.notify_new_slot_and_wait_for_bundle(slot).await
826 && bundle.sealed_header().proof_of_election().operator_id == operator_id
827 {
828 return (slot, bundle);
829 }
830 }
831 }
832
833 pub fn new_slot_notification_stream(&mut self) -> mpsc::UnboundedReceiver<(Slot, PotOutput)> {
835 let (tx, rx) = mpsc::unbounded();
836 self.new_slot_notification_subscribers.push(tx);
837 rx
838 }
839
840 pub fn new_acknowledgement_sender_stream(
842 &mut self,
843 ) -> TracingUnboundedReceiver<mpsc::Sender<()>> {
844 let (tx, rx) = tracing_unbounded("subspace_acknowledgement_sender_stream", 100);
845 self.acknowledgement_sender_subscribers.push(tx);
846 rx
847 }
848
849 pub async fn confirm_acknowledgement(&mut self) {
854 let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
855
856 {
859 self.acknowledgement_sender_subscribers
860 .retain(|subscriber| {
861 subscriber
862 .unbounded_send(acknowledgement_sender.clone())
863 .is_ok()
864 });
865 drop(acknowledgement_sender);
866 }
867
868 while acknowledgement_receiver.next().await.is_some() {
870 }
872 }
873
874 pub async fn confirm_block_import_processed(&mut self) {
876 let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
879 {
880 let value = (NumberFor::<Block>::default(), acknowledgement_sender);
884 self.block_import
885 .block_importing_notification_subscribers
886 .lock()
887 .retain(|subscriber| subscriber.unbounded_send(value.clone()).is_ok());
888 }
889 while acknowledgement_receiver.next().await.is_some() {
890 }
892
893 self.confirm_acknowledgement().await;
895 }
896
897 pub fn block_importing_notification_stream(
899 &self,
900 ) -> TracingUnboundedReceiver<(NumberFor<Block>, mpsc::Sender<()>)> {
901 self.block_import.block_importing_notification_stream()
902 }
903
904 pub fn get_bundle_from_tx_pool(
906 &self,
907 new_slot: NewSlot,
908 ) -> Option<OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>> {
909 for ready_tx in self.transaction_pool.ready() {
910 let ext = UncheckedExtrinsic::decode(&mut ready_tx.data.encode().as_slice())
911 .expect("should be able to decode");
912 if let RuntimeCall::Domains(pallet_domains::Call::submit_bundle { opaque_bundle }) =
913 ext.function
914 && opaque_bundle.sealed_header().slot_number() == *new_slot.0
915 {
916 return Some(opaque_bundle);
917 }
918 }
919 None
920 }
921
922 pub async fn submit_transaction(&self, tx: OpaqueExtrinsic) -> Result<H256, TxPoolError> {
924 self.transaction_pool
925 .submit_one(
926 self.client.info().best_hash,
927 TransactionSource::External,
928 tx,
929 )
930 .await
931 .map_err(|err| {
932 err.into_pool_error()
933 .expect("should always be a pool error")
934 })
935 }
936
937 pub async fn clear_tx_pool(&self) -> Result<(), Box<dyn Error>> {
939 let txs: Vec<_> = self
940 .transaction_pool
941 .ready()
942 .map(|t| self.transaction_pool.hash_of(&t.data))
943 .collect();
944 self.prune_txs_from_pool(txs.as_slice()).await
945 }
946
947 pub async fn prune_tx_from_pool(&self, tx: &OpaqueExtrinsic) -> Result<(), Box<dyn Error>> {
949 self.prune_txs_from_pool(&[self.transaction_pool.hash_of(tx)])
950 .await
951 }
952
953 async fn prune_txs_from_pool(
954 &self,
955 tx_hashes: &[BlockHashFor<Block>],
956 ) -> Result<(), Box<dyn Error>> {
957 let hash_and_number = HashAndNumber {
958 number: self.client.info().best_number,
959 hash: self.client.info().best_hash,
960 };
961 self.transaction_pool
962 .pool()
963 .prune_known(&hash_and_number, tx_hashes);
964 tokio::time::sleep(time::Duration::from_millis(1)).await;
967 self.transaction_pool
968 .pool()
969 .validated_pool()
970 .clear_stale(&hash_and_number);
971 Ok(())
972 }
973
974 pub fn does_receipt_exist(
976 &self,
977 er_hash: BlockHashFor<DomainBlock>,
978 ) -> Result<bool, Box<dyn Error>> {
979 Ok(self
980 .client
981 .runtime_api()
982 .execution_receipt(self.client.info().best_hash, er_hash)?
983 .is_some())
984 }
985
986 pub fn get_domain_staking_summary(
988 &self,
989 domain_id: DomainId,
990 ) -> Result<Option<StakingSummary<OperatorId, Balance>>, Box<dyn Error>> {
991 Ok(self
992 .client
993 .runtime_api()
994 .domain_stake_summary(self.client.info().best_hash, domain_id)?)
995 }
996
997 pub fn get_domain_block_pruning_depth(&self) -> Result<BlockNumber, Box<dyn Error>> {
999 Ok(self
1000 .client
1001 .runtime_api()
1002 .block_pruning_depth(self.client.info().best_hash)?)
1003 }
1004
1005 pub fn wait_for_fraud_proof<FP>(
1008 &self,
1009 fraud_proof_predicate: FP,
1010 ) -> Pin<Box<dyn Future<Output = FraudProofFor<Block, DomainBlock>> + Send>>
1011 where
1012 FP: Fn(&FraudProofFor<Block, DomainBlock>) -> bool + Send + 'static,
1013 {
1014 let tx_pool = self.transaction_pool.clone();
1015 let mut import_tx_stream = self.transaction_pool.import_notification_stream();
1016 Box::pin(async move {
1017 while let Some(ready_tx_hash) = import_tx_stream.next().await {
1018 let ready_tx = tx_pool
1019 .ready_transaction(&ready_tx_hash)
1020 .expect("Just get the ready tx hash from import stream; qed");
1021 let ext = subspace_test_runtime::UncheckedExtrinsic::decode(
1022 &mut ready_tx.data.encode().as_slice(),
1023 )
1024 .expect("Decode tx must success");
1025 if let subspace_test_runtime::RuntimeCall::Domains(
1026 pallet_domains::Call::submit_fraud_proof { fraud_proof },
1027 ) = ext.function
1028 && fraud_proof_predicate(&fraud_proof)
1029 {
1030 return *fraud_proof;
1031 }
1032 }
1033 unreachable!()
1034 })
1035 }
1036
1037 pub fn free_balance(&self, account_id: AccountId) -> subspace_runtime_primitives::Balance {
1039 self.client
1040 .runtime_api()
1041 .free_balance(self.client.info().best_hash, account_id)
1042 .expect("Fail to get account free balance")
1043 }
1044
1045 pub fn ban_peer(&self, addr: MultiaddrWithPeerId) {
1048 self.network_service.report_peer(
1051 addr.peer_id,
1052 ReputationChange::new_fatal("Peer banned by test (1)"),
1053 );
1054 self.network_service.report_peer(
1055 addr.peer_id,
1056 ReputationChange::new_fatal("Peer banned by test (2)"),
1057 );
1058 }
1059
1060 pub fn unban_peer(&self, addr: MultiaddrWithPeerId) {
1062 self.network_service.report_peer(
1065 addr.peer_id,
1066 ReputationChange::new(i32::MAX, "Peer unbanned by test (1)"),
1067 );
1068 self.network_service.report_peer(
1069 addr.peer_id,
1070 ReputationChange::new(i32::MAX, "Peer unbanned by test (2)"),
1071 );
1072 }
1073
1074 pub async fn stop(self) -> Result<(), std::io::Error> {
1080 let lock_file_path = self.base_path.path().join("paritydb").join("lock");
1081 std::mem::drop(self);
1083
1084 sleep(Duration::from_secs(2)).await;
1087
1088 if let Err(err) = std::fs::remove_file(lock_file_path) {
1090 tracing::error!("deleting paritydb lock file failed: {err:?}");
1091 }
1092 Ok(())
1093 }
1094}
1095
1096impl MockConsensusNode {
1097 async fn collect_txn_from_pool(&self, parent_hash: Hash) -> Vec<ExtrinsicFor<Block>> {
1098 self.transaction_pool
1099 .ready_at(parent_hash)
1100 .await
1101 .map(|pending_tx| pending_tx.data().as_ref().clone())
1102 .collect()
1103 }
1104
1105 async fn mock_inherent_data(slot: Slot) -> Result<InherentData, Box<dyn Error>> {
1106 let timestamp = sp_timestamp::InherentDataProvider::new(Timestamp::new(
1107 <Slot as Into<u64>>::into(slot) * SLOT_DURATION,
1108 ));
1109 let subspace_inherents =
1110 sp_consensus_subspace::inherents::InherentDataProvider::new(vec![]);
1111
1112 let inherent_data = (subspace_inherents, timestamp)
1113 .create_inherent_data()
1114 .await?;
1115
1116 Ok(inherent_data)
1117 }
1118
1119 fn mock_subspace_digest(&self, slot: Slot) -> Digest {
1120 let pre_digest: PreDigest<AccountId> = PreDigest::V0 {
1121 slot,
1122 solution: self.mock_solution.clone(),
1123 pot_info: PreDigestPotInfo::V0 {
1124 proof_of_time: Default::default(),
1125 future_proof_of_time: Default::default(),
1126 },
1127 };
1128 let mut digest = Digest::default();
1129 digest.push(DigestItem::subspace_pre_digest(&pre_digest));
1130 digest
1131 }
1132
1133 async fn build_block(
1135 &self,
1136 slot: Slot,
1137 parent_hash: BlockHashFor<Block>,
1138 extrinsics: Vec<ExtrinsicFor<Block>>,
1139 ) -> Result<(Block, StorageChanges), Box<dyn Error>> {
1140 let inherent_digest = self.mock_subspace_digest(slot);
1141
1142 let inherent_data = Self::mock_inherent_data(slot).await?;
1143
1144 let mut block_builder = BlockBuilderBuilder::new(self.client.as_ref())
1145 .on_parent_block(parent_hash)
1146 .fetch_parent_block_number(self.client.as_ref())?
1147 .with_inherent_digests(inherent_digest)
1148 .build()
1149 .expect("Creates new block builder");
1150
1151 let inherent_txns = block_builder.create_inherents(inherent_data)?;
1152
1153 for tx in inherent_txns.into_iter().chain(extrinsics) {
1154 if let Err(err) = sc_block_builder::BlockBuilder::push(&mut block_builder, tx) {
1155 tracing::error!("Invalid transaction while building block: {}", err);
1156 }
1157 }
1158
1159 let (block, storage_changes, _) = block_builder.build()?.into_inner();
1160 Ok((block, storage_changes))
1161 }
1162
1163 async fn import_block(
1165 &self,
1166 block: Block,
1167 storage_changes: Option<StorageChanges>,
1168 ) -> Result<BlockHashFor<Block>, Box<dyn Error>> {
1169 let (header, body) = block.deconstruct();
1170
1171 let header_hash = header.hash();
1172 let header_number = header.number;
1173
1174 let block_import_params = {
1175 let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
1176 import_block.body = Some(body);
1177 import_block.state_action = match storage_changes {
1178 Some(changes) => {
1179 StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(changes))
1180 }
1181 None => StateAction::Execute,
1182 };
1183 import_block
1184 };
1185
1186 let import_result = self.block_import.import_block(block_import_params).await?;
1187
1188 if let Some(finalized_block_hash) = self
1189 .finalize_block_depth
1190 .and_then(|depth| header_number.checked_sub(depth))
1191 .and_then(|block_to_finalize| {
1192 self.client
1193 .hash(block_to_finalize)
1194 .expect("Block hash not found for number: {block_to_finalize:?}")
1195 })
1196 {
1197 self.client
1198 .finalize_block(finalized_block_hash, None, true)
1199 .unwrap();
1200 }
1201
1202 match import_result {
1203 ImportResult::Imported(_) | ImportResult::AlreadyInChain => Ok(header_hash),
1204 bad_res => Err(format!("Fail to import block due to {bad_res:?}").into()),
1205 }
1206 }
1207
1208 #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1211 pub async fn produce_block_with_slot_at(
1212 &mut self,
1213 new_slot: NewSlot,
1214 parent_hash: BlockHashFor<Block>,
1215 maybe_extrinsics: Option<Vec<ExtrinsicFor<Block>>>,
1216 ) -> Result<BlockHashFor<Block>, Box<dyn Error>> {
1217 let block_timer = time::Instant::now();
1218
1219 let extrinsics = match maybe_extrinsics {
1220 Some(extrinsics) => extrinsics,
1221 None => self.collect_txn_from_pool(parent_hash).await,
1222 };
1223 let tx_hashes: Vec<_> = extrinsics
1224 .iter()
1225 .map(|t| self.transaction_pool.hash_of(t))
1226 .collect();
1227
1228 let (block, storage_changes) = self
1229 .build_block(new_slot.0, parent_hash, extrinsics)
1230 .await?;
1231
1232 log_new_block(&block, block_timer.elapsed().as_millis());
1233
1234 let res = match self.import_block(block, Some(storage_changes)).await {
1235 Ok(hash) => {
1236 self.prune_txs_from_pool(tx_hashes.as_slice()).await?;
1239 Ok(hash)
1240 }
1241 err => err,
1242 };
1243 self.confirm_block_import_processed().await;
1244 res
1245 }
1246
1247 #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1250 pub async fn produce_block_with_slot(&mut self, slot: NewSlot) -> Result<(), Box<dyn Error>> {
1251 self.produce_block_with_slot_at(slot, self.client.info().best_hash, None)
1252 .await?;
1253 Ok(())
1254 }
1255
1256 #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1258 pub async fn produce_block_with_extrinsics(
1259 &mut self,
1260 extrinsics: Vec<ExtrinsicFor<Block>>,
1261 ) -> Result<(), Box<dyn Error>> {
1262 let slot = self.produce_slot();
1263 self.produce_block_with_slot_at(slot, self.client.info().best_hash, Some(extrinsics))
1264 .await?;
1265 Ok(())
1266 }
1267
1268 #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1270 pub async fn produce_blocks(&mut self, n: u64) -> Result<(), Box<dyn Error>> {
1271 for _ in 0..n {
1272 let slot = self.produce_slot();
1273 self.produce_block_with_slot(slot).await?;
1274 }
1275 Ok(())
1276 }
1277
1278 #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1280 pub async fn produce_blocks_with_bundles(&mut self, n: u64) -> Result<(), Box<dyn Error>> {
1281 for _ in 0..n {
1282 let (slot, _) = self.produce_slot_and_wait_for_bundle_submission().await;
1283 self.produce_block_with_slot(slot).await?;
1284 }
1285 Ok(())
1286 }
1287
1288 pub fn account_nonce(&self) -> u32 {
1290 self.client
1291 .runtime_api()
1292 .account_nonce(self.client.info().best_hash, self.key.to_account_id())
1293 .expect("Fail to get account nonce")
1294 }
1295
1296 pub fn account_nonce_of(&self, account_id: AccountId) -> u32 {
1298 self.client
1299 .runtime_api()
1300 .account_nonce(self.client.info().best_hash, account_id)
1301 .expect("Fail to get account nonce")
1302 }
1303
1304 pub fn construct_extrinsic(
1306 &self,
1307 nonce: u32,
1308 function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1309 ) -> UncheckedExtrinsic {
1310 construct_extrinsic_generic(&self.client, function, self.key, false, nonce, 0)
1311 }
1312
1313 pub fn construct_unsigned_extrinsic(
1315 &self,
1316 function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1317 ) -> UncheckedExtrinsic {
1318 construct_unsigned_extrinsic(&self.client, function)
1319 }
1320
1321 pub async fn construct_and_send_extrinsic_with(
1323 &self,
1324 function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1325 ) -> Result<RpcTransactionOutput, RpcTransactionError> {
1326 let nonce = self.account_nonce();
1327 let extrinsic = self.construct_extrinsic(nonce, function);
1328 self.rpc_handlers.send_transaction(extrinsic.into()).await
1329 }
1330
1331 pub async fn send_extrinsic(
1333 &self,
1334 extrinsic: impl Into<OpaqueExtrinsic>,
1335 ) -> Result<RpcTransactionOutput, RpcTransactionError> {
1336 self.rpc_handlers.send_transaction(extrinsic.into()).await
1337 }
1338}
1339
1340fn log_new_block(block: &Block, used_time_ms: u128) {
1341 tracing::info!(
1342 "🎁 Prepared block for proposing at {} ({} ms) [hash: {:?}; parent_hash: {}; extrinsics ({}): [{}]]",
1343 block.header().number(),
1344 used_time_ms,
1345 block.header().hash(),
1346 block.header().parent_hash(),
1347 block.extrinsics().len(),
1348 block
1349 .extrinsics()
1350 .iter()
1351 .map(|xt| BlakeTwo256::hash_of(xt).to_string())
1352 .collect::<Vec<_>>()
1353 .join(", ")
1354 );
1355}
1356
1357fn mock_import_queue<Block: BlockT, I>(
1358 block_import: I,
1359 spawner: &impl SpawnEssentialNamed,
1360) -> BasicQueue<Block>
1361where
1362 I: BlockImport<Block, Error = ConsensusError> + Send + Sync + 'static,
1363{
1364 BasicQueue::new(
1365 MockVerifier::default(),
1366 Box::new(block_import),
1367 None,
1368 spawner,
1369 None,
1370 )
1371}
1372
1373struct MockVerifier<Block> {
1374 _marker: PhantomData<Block>,
1375}
1376
1377impl<Block> Default for MockVerifier<Block> {
1378 fn default() -> Self {
1379 Self {
1380 _marker: PhantomData,
1381 }
1382 }
1383}
1384
1385#[async_trait::async_trait]
1386impl<Block> VerifierT<Block> for MockVerifier<Block>
1387where
1388 Block: BlockT,
1389{
1390 async fn verify(
1391 &self,
1392 block_params: BlockImportParams<Block>,
1393 ) -> Result<BlockImportParams<Block>, String> {
1394 Ok(block_params)
1395 }
1396}
1397
1398#[allow(clippy::type_complexity)]
1401struct MockBlockImport<Client, Block: BlockT> {
1402 inner: Arc<Client>,
1403 block_importing_notification_subscribers:
1404 Arc<Mutex<Vec<TracingUnboundedSender<(NumberFor<Block>, mpsc::Sender<()>)>>>>,
1405}
1406
1407impl<Client, Block: BlockT> MockBlockImport<Client, Block> {
1408 fn new(inner: Arc<Client>) -> Self {
1409 MockBlockImport {
1410 inner,
1411 block_importing_notification_subscribers: Arc::new(Mutex::new(Vec::new())),
1412 }
1413 }
1414
1415 fn block_importing_notification_stream(
1417 &self,
1418 ) -> TracingUnboundedReceiver<(NumberFor<Block>, mpsc::Sender<()>)> {
1419 let (tx, rx) = tracing_unbounded("subspace_new_slot_notification_stream", 100);
1420 self.block_importing_notification_subscribers
1421 .lock()
1422 .push(tx);
1423 rx
1424 }
1425}
1426
1427impl<Client, Block: BlockT> MockBlockImport<Client, Block> {
1428 fn clone(&self) -> Self {
1429 MockBlockImport {
1430 inner: self.inner.clone(),
1431 block_importing_notification_subscribers: self
1432 .block_importing_notification_subscribers
1433 .clone(),
1434 }
1435 }
1436}
1437
1438#[async_trait::async_trait]
1439impl<Client, Block> BlockImport<Block> for MockBlockImport<Client, Block>
1440where
1441 Block: BlockT,
1442 for<'r> &'r Client: BlockImport<Block, Error = ConsensusError> + Send + Sync,
1443 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
1444 Client::Api: ApiExt<Block>,
1445{
1446 type Error = ConsensusError;
1447
1448 async fn import_block(
1449 &self,
1450 mut block: BlockImportParams<Block>,
1451 ) -> Result<ImportResult, Self::Error> {
1452 let block_number = *block.header.number();
1453 block.fork_choice = Some(ForkChoiceStrategy::LongestChain);
1454
1455 let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
1456
1457 {
1460 let value = (block_number, acknowledgement_sender);
1461 self.block_importing_notification_subscribers
1462 .lock()
1463 .retain(|subscriber| subscriber.unbounded_send(value.clone()).is_ok());
1464 }
1465
1466 while acknowledgement_receiver.next().await.is_some() {
1467 }
1469
1470 self.inner.import_block(block).await
1471 }
1472
1473 async fn check_block(
1474 &self,
1475 block: BlockCheckParams<Block>,
1476 ) -> Result<ImportResult, Self::Error> {
1477 self.inner.check_block(block).await
1478 }
1479}
1480
1481#[macro_export]
1483macro_rules! produce_blocks {
1484 ($primary_node:ident, $operator_node:ident, $count: literal $(, $domain_node:ident)*) => {
1485 {
1486 async {
1487 let domain_fut = {
1488 let mut futs: Vec<std::pin::Pin<Box<dyn futures::Future<Output = ()>>>> = Vec::new();
1489 futs.push(Box::pin($operator_node.wait_for_blocks($count)));
1490 $( futs.push( Box::pin( $domain_node.wait_for_blocks($count) ) ); )*
1491 futures::future::join_all(futs)
1492 };
1493 $primary_node.produce_blocks_with_bundles($count).await?;
1494 domain_fut.await;
1495 Ok::<(), Box<dyn std::error::Error>>(())
1496 }
1497 }
1498 };
1499}
1500
1501#[macro_export]
1504macro_rules! produce_block_with {
1505 ($primary_node_produce_block:expr, $operator_node:ident $(, $domain_node:ident)*) => {
1506 {
1507 async {
1508 let domain_fut = {
1509 let mut futs: Vec<std::pin::Pin<Box<dyn futures::Future<Output = ()>>>> = Vec::new();
1510 futs.push(Box::pin($operator_node.wait_for_blocks(1)));
1511 $( futs.push( Box::pin( $domain_node.wait_for_blocks(1) ) ); )*
1512 futures::future::join_all(futs)
1513 };
1514 $primary_node_produce_block.await?;
1515 domain_fut.await;
1516 Ok::<(), Box<dyn std::error::Error>>(())
1517 }
1518 }
1519 };
1520}
1521
1522#[macro_export]
1524macro_rules! produce_blocks_until {
1525 ($primary_node:ident, $operator_node:ident, $condition: block $(, $domain_node:ident)*) => {
1526 async {
1527 while !$condition {
1528 produce_blocks!($primary_node, $operator_node, 1 $(, $domain_node),*).await?;
1529 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1530 }
1531 Ok::<(), Box<dyn std::error::Error>>(())
1532 }
1533 };
1534}
1535
1536type BalanceOf<T> = <<T as pallet_transaction_payment::Config>::OnChargeTransaction as pallet_transaction_payment::OnChargeTransaction<T>>::Balance;
1537
1538fn get_signed_extra(
1539 current_block: u64,
1540 immortal: bool,
1541 nonce: u32,
1542 tip: BalanceOf<Runtime>,
1543) -> SignedExtra {
1544 let period = u64::from(<<Runtime as frame_system::Config>::BlockHashCount>::get())
1545 .checked_next_power_of_two()
1546 .map(|c| c / 2)
1547 .unwrap_or(2);
1548 (
1549 frame_system::CheckNonZeroSender::<Runtime>::new(),
1550 frame_system::CheckSpecVersion::<Runtime>::new(),
1551 frame_system::CheckTxVersion::<Runtime>::new(),
1552 frame_system::CheckGenesis::<Runtime>::new(),
1553 frame_system::CheckMortality::<Runtime>::from(if immortal {
1554 generic::Era::Immortal
1555 } else {
1556 generic::Era::mortal(period, current_block)
1557 }),
1558 frame_system::CheckNonce::<Runtime>::from(nonce.into()),
1559 frame_system::CheckWeight::<Runtime>::new(),
1560 pallet_transaction_payment::ChargeTransactionPayment::<Runtime>::from(tip),
1561 BalanceTransferCheckExtension::<Runtime>::default(),
1562 pallet_subspace::extensions::SubspaceExtension::<Runtime>::new(),
1563 pallet_domains::extensions::DomainsExtension::<Runtime>::new(),
1564 pallet_messenger::extensions::MessengerExtension::<Runtime>::new(),
1565 )
1566}
1567
1568fn construct_extrinsic_raw_payload<Client>(
1569 client: impl AsRef<Client>,
1570 function: <Runtime as frame_system::Config>::RuntimeCall,
1571 immortal: bool,
1572 nonce: u32,
1573 tip: BalanceOf<Runtime>,
1574) -> (
1575 SignedPayload<<Runtime as frame_system::Config>::RuntimeCall, SignedExtra>,
1576 SignedExtra,
1577)
1578where
1579 BalanceOf<Runtime>: Send + Sync + From<u64> + sp_runtime::FixedPointOperand,
1580 u64: From<BlockNumberFor<Runtime>>,
1581 Client: HeaderBackend<subspace_runtime_primitives::opaque::Block>,
1582{
1583 let current_block_hash = client.as_ref().info().best_hash;
1584 let current_block = client.as_ref().info().best_number.saturated_into();
1585 let genesis_block = client.as_ref().hash(0).unwrap().unwrap();
1586 let extra = get_signed_extra(current_block, immortal, nonce, tip);
1587 (
1588 generic::SignedPayload::<
1589 <Runtime as frame_system::Config>::RuntimeCall,
1590 SignedExtra,
1591 >::from_raw(
1592 function,
1593 extra.clone(),
1594 ((), 100, 1, genesis_block, current_block_hash, (), (), (), (), (), (),()),
1595 ),
1596 extra,
1597 )
1598}
1599
1600pub fn construct_extrinsic_generic<Client>(
1602 client: impl AsRef<Client>,
1603 function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1604 caller: Sr25519Keyring,
1605 immortal: bool,
1606 nonce: u32,
1607 tip: BalanceOf<Runtime>,
1608) -> UncheckedExtrinsic
1609where
1610 BalanceOf<Runtime>: Send + Sync + From<u64> + sp_runtime::FixedPointOperand,
1611 u64: From<BlockNumberFor<Runtime>>,
1612 Client: HeaderBackend<subspace_runtime_primitives::opaque::Block>,
1613{
1614 let function = function.into();
1615 let (raw_payload, extra) =
1616 construct_extrinsic_raw_payload(client, function.clone(), immortal, nonce, tip);
1617 let signature = raw_payload.using_encoded(|e| caller.sign(e));
1618 UncheckedExtrinsic::new_signed(
1619 function,
1620 MultiAddress::Id(caller.to_account_id()),
1621 Signature::Sr25519(signature),
1622 extra,
1623 )
1624}
1625
1626fn construct_unsigned_extrinsic<Client>(
1628 client: impl AsRef<Client>,
1629 function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1630) -> UncheckedExtrinsic
1631where
1632 BalanceOf<Runtime>: Send + Sync + From<u64> + sp_runtime::FixedPointOperand,
1633 u64: From<BlockNumberFor<Runtime>>,
1634 Client: HeaderBackend<subspace_runtime_primitives::opaque::Block>,
1635{
1636 let function = function.into();
1637 let current_block = client.as_ref().info().best_number.saturated_into();
1638 let extra = get_signed_extra(current_block, true, 0, 0);
1639 UncheckedExtrinsic::new_transaction(function, extra)
1640}