1#![warn(missing_docs, unused_crate_dependencies)]
20
21use cross_domain_message_gossip::{GossipWorkerBuilder, xdm_gossip_peers_set_config};
22use domain_runtime_primitives::opaque::{Block as DomainBlock, Header as DomainHeader};
23use frame_system::pallet_prelude::BlockNumberFor;
24use futures::channel::mpsc;
25use futures::{Future, StreamExt};
26use jsonrpsee::RpcModule;
27use pallet_domains::staking::StakingSummary;
28use parity_scale_codec::{Decode, Encode};
29use parking_lot::Mutex;
30use sc_block_builder::BlockBuilderBuilder;
31use sc_client_api::execution_extensions::ExtensionsFactory;
32use sc_client_api::{Backend as BackendT, BlockBackend, ExecutorProvider, Finalizer};
33use sc_consensus::block_import::{
34 BlockCheckParams, BlockImportParams, ForkChoiceStrategy, ImportResult,
35};
36use sc_consensus::{BasicQueue, BlockImport, StateAction, Verifier as VerifierT};
37use sc_domains::ExtensionsFactory as DomainsExtensionFactory;
38use sc_network::config::{NetworkConfiguration, TransportConfig};
39use sc_network::service::traits::NetworkService;
40use sc_network::{
41 NetworkWorker, NotificationMetrics, NotificationService, ReputationChange, multiaddr,
42};
43use sc_service::config::{
44 DatabaseSource, ExecutorConfiguration, KeystoreConfig, MultiaddrWithPeerId,
45 OffchainWorkerConfig, RpcBatchRequestConfig, RpcConfiguration, RpcEndpoint,
46 WasmExecutionMethod, WasmtimeInstantiationStrategy,
47};
48use sc_service::{BasePath, BlocksPruning, Configuration, Role, SpawnTasksParams, TaskManager};
49use sc_transaction_pool::{BasicPool, FullChainApi, Options};
50use sc_transaction_pool_api::error::{Error as TxPoolError, IntoPoolError};
51use sc_transaction_pool_api::{InPoolTransaction, TransactionPool, TransactionSource};
52use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender, tracing_unbounded};
53use sp_api::{ApiExt, ProvideRuntimeApi};
54use sp_blockchain::{HashAndNumber, HeaderBackend};
55use sp_consensus::{BlockOrigin, Error as ConsensusError};
56use sp_consensus_slots::Slot;
57use sp_consensus_subspace::digests::{
58 CompatibleDigestItem, PreDigest, PreDigestPotInfo, extract_pre_digest,
59};
60use sp_consensus_subspace::{PotExtension, SubspaceApi};
61use sp_core::H256;
62use sp_core::offchain::OffchainDbExt;
63use sp_core::offchain::storage::OffchainDb;
64use sp_core::traits::{CodeExecutor, SpawnEssentialNamed};
65use sp_domains::bundle::OpaqueBundle;
66use sp_domains::{BundleProducerElectionApi, ChainId, DomainId, DomainsApi, OperatorId};
67use sp_domains_fraud_proof::fraud_proof::FraudProof;
68use sp_domains_fraud_proof::{FraudProofExtension, FraudProofHostFunctionsImpl};
69use sp_externalities::Extensions;
70use sp_inherents::{InherentData, InherentDataProvider};
71use sp_keyring::Sr25519Keyring;
72use sp_messenger::MessengerApi;
73use sp_messenger_host_functions::{MessengerExtension, MessengerHostFunctionsImpl};
74use sp_mmr_primitives::MmrApi;
75use sp_runtime::generic::{Digest, SignedPayload};
76use sp_runtime::traits::{
77 BlakeTwo256, Block as BlockT, Hash as HashT, Header as HeaderT, NumberFor,
78};
79use sp_runtime::{DigestItem, MultiAddress, OpaqueExtrinsic, SaturatedConversion, generic};
80use sp_subspace_mmr::host_functions::{SubspaceMmrExtension, SubspaceMmrHostFunctionsImpl};
81use sp_timestamp::Timestamp;
82use std::collections::HashMap;
83use std::error::Error;
84use std::marker::PhantomData;
85use std::net::SocketAddr;
86use std::pin::Pin;
87use std::sync::Arc;
88use std::time;
89use std::time::Duration;
90use subspace_core_primitives::pot::PotOutput;
91use subspace_core_primitives::solutions::Solution;
92use subspace_core_primitives::{BlockNumber, PublicKey};
93use subspace_runtime_primitives::extension::BalanceTransferCheckExtension;
94use subspace_runtime_primitives::opaque::Block;
95use subspace_runtime_primitives::{
96 AccountId, Balance, BlockHashFor, ExtrinsicFor, Hash, HeaderFor, Signature,
97};
98use subspace_service::{FullSelectChain, RuntimeExecutor};
99use subspace_test_client::{Backend, Client, chain_spec};
100use subspace_test_primitives::OnchainStateApi;
101use subspace_test_runtime::{
102 Runtime, RuntimeApi, RuntimeCall, SLOT_DURATION, SignedExtra, UncheckedExtrinsic,
103};
104use substrate_frame_rpc_system::AccountNonceApi;
105use substrate_test_client::{RpcHandlersExt, RpcTransactionError, RpcTransactionOutput};
106use tokio::time::sleep;
107
108pub type FraudProofFor<Block, DomainBlock> =
110 FraudProof<NumberFor<Block>, BlockHashFor<Block>, HeaderFor<DomainBlock>, H256>;
111
112const MAX_PRODUCE_BUNDLE_TRY: usize = 10;
113
114#[expect(clippy::too_many_arguments)]
119pub fn node_config(
120 tokio_handle: tokio::runtime::Handle,
121 key: Sr25519Keyring,
122 boot_nodes: Vec<MultiaddrWithPeerId>,
123 run_farmer: bool,
124 force_authoring: bool,
125 force_synced: bool,
126 private_evm: bool,
127 evm_owner_account: Option<AccountId>,
128 base_path: BasePath,
129 rpc_addr: Option<SocketAddr>,
130 rpc_port: Option<u16>,
131) -> Configuration {
132 let root = base_path.path();
133 let role = if run_farmer {
134 Role::Authority
135 } else {
136 Role::Full
137 };
138 let key_seed = key.to_seed();
139 let spec = chain_spec::subspace_local_testnet_config(private_evm, evm_owner_account).unwrap();
140
141 let mut network_config = NetworkConfiguration::new(
142 key_seed.to_string(),
143 "network/test/0.1",
144 Default::default(),
145 None,
146 );
147
148 network_config.boot_nodes = boot_nodes;
149
150 network_config.allow_non_globals_in_dht = true;
151
152 let addr: multiaddr::Multiaddr = multiaddr::Protocol::Memory(rand::random()).into();
153 network_config.listen_addresses.push(addr.clone());
154
155 network_config.public_addresses.push(addr);
156
157 network_config.transport = TransportConfig::MemoryOnly;
158
159 network_config.force_synced = force_synced;
160
161 let rpc_configuration = match rpc_addr {
162 Some(listen_addr) => {
163 let port = rpc_port.unwrap_or(9944);
164 RpcConfiguration {
165 addr: Some(vec![RpcEndpoint {
166 batch_config: RpcBatchRequestConfig::Disabled,
167 max_connections: 100,
168 listen_addr,
169 rpc_methods: Default::default(),
170 rate_limit: None,
171 rate_limit_trust_proxy_headers: false,
172 rate_limit_whitelisted_ips: vec![],
173 max_payload_in_mb: 15,
174 max_payload_out_mb: 15,
175 max_subscriptions_per_connection: 100,
176 max_buffer_capacity_per_connection: 100,
177 cors: None,
178 retry_random_port: true,
179 is_optional: false,
180 }]),
181 max_request_size: 15,
182 max_response_size: 15,
183 id_provider: None,
184 max_subs_per_conn: 1024,
185 port,
186 message_buffer_capacity: 1024,
187 batch_config: RpcBatchRequestConfig::Disabled,
188 max_connections: 1000,
189 cors: None,
190 methods: Default::default(),
191 rate_limit: None,
192 rate_limit_whitelisted_ips: vec![],
193 rate_limit_trust_proxy_headers: false,
194 }
195 }
196 None => RpcConfiguration {
197 addr: None,
198 max_request_size: 0,
199 max_response_size: 0,
200 id_provider: None,
201 max_subs_per_conn: 0,
202 port: 0,
203 message_buffer_capacity: 0,
204 batch_config: RpcBatchRequestConfig::Disabled,
205 max_connections: 0,
206 cors: None,
207 methods: Default::default(),
208 rate_limit: None,
209 rate_limit_whitelisted_ips: vec![],
210 rate_limit_trust_proxy_headers: false,
211 },
212 };
213
214 Configuration {
215 impl_name: "subspace-test-node".to_string(),
216 impl_version: "0.1".to_string(),
217 role,
218 tokio_handle,
219 transaction_pool: Default::default(),
220 network: network_config,
221 keystore: KeystoreConfig::InMemory,
222 database: DatabaseSource::ParityDb {
223 path: root.join("paritydb"),
224 },
225 trie_cache_maximum_size: Some(64 * 1024 * 1024),
226 state_pruning: Default::default(),
227 blocks_pruning: BlocksPruning::KeepAll,
228 chain_spec: Box::new(spec),
229 executor: ExecutorConfiguration {
230 wasm_method: WasmExecutionMethod::Compiled {
231 instantiation_strategy: WasmtimeInstantiationStrategy::PoolingCopyOnWrite,
232 },
233 max_runtime_instances: 8,
234 default_heap_pages: None,
235 runtime_cache_size: 2,
236 },
237 wasm_runtime_overrides: Default::default(),
238 rpc: rpc_configuration,
239 prometheus_config: None,
240 telemetry_endpoints: None,
241 offchain_worker: OffchainWorkerConfig {
242 enabled: false,
243 indexing_enabled: true,
244 },
245 force_authoring,
246 disable_grandpa: false,
247 dev_key_seed: Some(key_seed),
248 tracing_targets: None,
249 tracing_receiver: Default::default(),
250 announce_block: true,
251 data_path: base_path.path().into(),
252 base_path,
253 }
254}
255
256type StorageChanges = sp_api::StorageChanges<Block>;
257
258struct MockExtensionsFactory<Client, DomainBlock, Executor, CBackend> {
259 consensus_client: Arc<Client>,
260 consensus_backend: Arc<CBackend>,
261 executor: Arc<Executor>,
262 mock_pot_verifier: Arc<MockPotVerfier>,
263 confirmation_depth_k: BlockNumber,
264 _phantom: PhantomData<DomainBlock>,
265}
266
267impl<Client, DomainBlock, Executor, CBackend>
268 MockExtensionsFactory<Client, DomainBlock, Executor, CBackend>
269{
270 fn new(
271 consensus_client: Arc<Client>,
272 executor: Arc<Executor>,
273 mock_pot_verifier: Arc<MockPotVerfier>,
274 consensus_backend: Arc<CBackend>,
275 confirmation_depth_k: BlockNumber,
276 ) -> Self {
277 Self {
278 consensus_client,
279 consensus_backend,
280 executor,
281 mock_pot_verifier,
282 confirmation_depth_k,
283 _phantom: Default::default(),
284 }
285 }
286}
287
288#[derive(Default)]
289struct MockPotVerfier(Mutex<HashMap<u64, PotOutput>>);
290
291impl MockPotVerfier {
292 fn is_valid(&self, slot: u64, pot: PotOutput) -> bool {
293 self.0.lock().get(&slot).map(|p| *p == pot).unwrap_or(false)
294 }
295
296 fn inject_pot(&self, slot: u64, pot: PotOutput) {
297 self.0.lock().insert(slot, pot);
298 }
299}
300
301impl<Block, Client, DomainBlock, Executor, CBackend> ExtensionsFactory<Block>
302 for MockExtensionsFactory<Client, DomainBlock, Executor, CBackend>
303where
304 Block: BlockT,
305 Block::Hash: From<H256> + Into<H256>,
306 DomainBlock: BlockT,
307 DomainBlock::Hash: Into<H256> + From<H256>,
308 Client: BlockBackend<Block> + HeaderBackend<Block> + ProvideRuntimeApi<Block> + 'static,
309 Client::Api: DomainsApi<Block, DomainBlock::Header>
310 + BundleProducerElectionApi<Block, Balance>
311 + MessengerApi<Block, NumberFor<Block>, Block::Hash>
312 + MmrApi<Block, H256, NumberFor<Block>>,
313 Executor: CodeExecutor + sc_executor::RuntimeVersionOf,
314 CBackend: BackendT<Block> + 'static,
315{
316 fn extensions_for(
317 &self,
318 _block_hash: Block::Hash,
319 _block_number: NumberFor<Block>,
320 ) -> Extensions {
321 let confirmation_depth_k = self.confirmation_depth_k;
322 let mut exts = Extensions::new();
323 exts.register(FraudProofExtension::new(Arc::new(
324 FraudProofHostFunctionsImpl::<_, _, DomainBlock, Executor, _>::new(
325 self.consensus_client.clone(),
326 self.executor.clone(),
327 move |client, executor| {
328 let extension_factory =
329 DomainsExtensionFactory::<_, Block, DomainBlock, _>::new(
330 client,
331 executor,
332 confirmation_depth_k,
333 );
334 Box::new(extension_factory) as Box<dyn ExtensionsFactory<DomainBlock>>
335 },
336 ),
337 )));
338 exts.register(SubspaceMmrExtension::new(Arc::new(
339 SubspaceMmrHostFunctionsImpl::<Block, _>::new(
340 self.consensus_client.clone(),
341 confirmation_depth_k,
342 ),
343 )));
344 exts.register(MessengerExtension::new(Arc::new(
345 MessengerHostFunctionsImpl::<Block, _, DomainBlock, _>::new(
346 self.consensus_client.clone(),
347 self.executor.clone(),
348 ),
349 )));
350
351 if let Some(offchain_storage) = self.consensus_backend.offchain_storage() {
352 let offchain_db = OffchainDb::new(offchain_storage);
353 exts.register(OffchainDbExt::new(offchain_db));
354 }
355 exts.register(PotExtension::new({
356 let client = Arc::clone(&self.consensus_client);
357 let mock_pot_verifier = Arc::clone(&self.mock_pot_verifier);
358 Box::new(
359 move |parent_hash, slot, proof_of_time, _quick_verification| {
360 let parent_hash = {
361 let mut converted_parent_hash = Block::Hash::default();
362 converted_parent_hash.as_mut().copy_from_slice(&parent_hash);
363 converted_parent_hash
364 };
365
366 let parent_header = match client.header(parent_hash) {
367 Ok(Some(parent_header)) => parent_header,
368 _ => return false,
369 };
370 let parent_pre_digest = match extract_pre_digest(&parent_header) {
371 Ok(parent_pre_digest) => parent_pre_digest,
372 _ => return false,
373 };
374
375 let parent_slot = parent_pre_digest.slot();
376 if slot <= *parent_slot {
377 return false;
378 }
379
380 mock_pot_verifier.is_valid(slot, proof_of_time)
381 },
382 )
383 }));
384 exts
385 }
386}
387
388type NewSlot = (Slot, PotOutput);
389
390pub struct MockConsensusNode {
392 pub task_manager: TaskManager,
394 pub client: Arc<Client>,
396 pub backend: Arc<Backend>,
398 pub executor: RuntimeExecutor,
400 pub transaction_pool: Arc<BasicPool<FullChainApi<Client, Block>, Block>>,
402 pub select_chain: FullSelectChain,
404 pub network_service: Arc<dyn NetworkService + Send + Sync>,
406 pub xdm_gossip_notification_service: Option<Box<dyn NotificationService>>,
408 pub sync_service: Arc<sc_network_sync::SyncingService<Block>>,
410 pub rpc_handlers: sc_service::RpcHandlers,
412 next_slot: u64,
414 mock_pot_verifier: Arc<MockPotVerfier>,
416 new_slot_notification_subscribers: Vec<mpsc::UnboundedSender<(Slot, PotOutput)>>,
418 acknowledgement_sender_subscribers: Vec<TracingUnboundedSender<mpsc::Sender<()>>>,
420 block_import: MockBlockImport<Client, Block>,
422 xdm_gossip_worker_builder: Option<GossipWorkerBuilder>,
423 mock_solution: Solution<AccountId>,
425 log_prefix: &'static str,
426 pub key: Sr25519Keyring,
428 finalize_block_depth: Option<NumberFor<Block>>,
429 base_path: BasePath,
431}
432
433pub struct MockConsensusNodeRpcConfig {
435 pub base_path: BasePath,
437 pub finalize_block_depth: Option<NumberFor<Block>>,
439 pub private_evm: bool,
441 pub evm_owner: Option<Sr25519Keyring>,
443 pub rpc_addr: Option<SocketAddr>,
445 pub rpc_port: Option<u16>,
447}
448
449impl MockConsensusNode {
450 fn run_with_configuration(
451 mut config: Configuration,
452 key: Sr25519Keyring,
453 base_path: BasePath,
454 finalize_block_depth: Option<NumberFor<Block>>,
455 rpc_builder: Box<
456 dyn Fn() -> Result<RpcModule<()>, sc_service::Error> + Send + Sync + 'static,
457 >,
458 ) -> MockConsensusNode {
459 let log_prefix = key.into();
460
461 let tx_pool_options = Options {
462 ban_time: Duration::from_millis(0),
463 ..Default::default()
464 };
465
466 config.network.node_name = format!("{} (Consensus)", config.network.node_name);
467 let span = sc_tracing::tracing::info_span!(
468 sc_tracing::logging::PREFIX_LOG_SPAN,
469 name = config.network.node_name.as_str()
470 );
471 let _enter = span.enter();
472
473 let executor = sc_service::new_wasm_executor(&config.executor);
474
475 let (client, backend, keystore_container, mut task_manager) =
476 sc_service::new_full_parts::<Block, RuntimeApi, _>(&config, None, executor.clone())
477 .expect("Fail to new full parts");
478
479 let domain_executor = Arc::new(sc_service::new_wasm_executor(&config.executor));
480 let client = Arc::new(client);
481 let mock_pot_verifier = Arc::new(MockPotVerfier::default());
482 let chain_constants = client
483 .runtime_api()
484 .chain_constants(client.info().best_hash)
485 .expect("Fail to get chain constants");
486 client
487 .execution_extensions()
488 .set_extensions_factory(MockExtensionsFactory::<
489 _,
490 DomainBlock,
491 sc_domains::RuntimeExecutor,
492 _,
493 >::new(
494 client.clone(),
495 domain_executor.clone(),
496 Arc::clone(&mock_pot_verifier),
497 backend.clone(),
498 chain_constants.confirmation_depth_k(),
499 ));
500
501 let select_chain = sc_consensus::LongestChain::new(backend.clone());
502 let transaction_pool = Arc::from(BasicPool::new_full(
503 tx_pool_options,
504 config.role.is_authority().into(),
505 config.prometheus_registry(),
506 task_manager.spawn_essential_handle(),
507 client.clone(),
508 ));
509
510 let block_import = MockBlockImport::<_, _>::new(client.clone());
511
512 let mut net_config = sc_network::config::FullNetworkConfiguration::<
513 _,
514 _,
515 NetworkWorker<_, _>,
516 >::new(&config.network, None);
517 let (xdm_gossip_notification_config, xdm_gossip_notification_service) =
518 xdm_gossip_peers_set_config();
519 net_config.add_notification_protocol(xdm_gossip_notification_config);
520
521 let (network_service, system_rpc_tx, tx_handler_controller, sync_service) =
522 sc_service::build_network(sc_service::BuildNetworkParams {
523 config: &config,
524 net_config,
525 client: client.clone(),
526 transaction_pool: transaction_pool.clone(),
527 spawn_handle: task_manager.spawn_handle(),
528 import_queue: mock_import_queue(
529 block_import.clone(),
530 &task_manager.spawn_essential_handle(),
531 ),
532 block_announce_validator_builder: None,
533 warp_sync_config: None,
534 block_relay: None,
535 metrics: NotificationMetrics::new(None),
536 })
537 .expect("Should be able to build network");
538
539 let rpc_handlers = sc_service::spawn_tasks(SpawnTasksParams {
540 network: network_service.clone(),
541 client: client.clone(),
542 keystore: keystore_container.keystore(),
543 task_manager: &mut task_manager,
544 transaction_pool: transaction_pool.clone(),
545 rpc_builder: Box::new(move |_| rpc_builder()),
546 backend: backend.clone(),
547 system_rpc_tx,
548 config,
549 telemetry: None,
550 tx_handler_controller,
551 sync_service: sync_service.clone(),
552 })
553 .expect("Should be able to spawn tasks");
554
555 let mock_solution =
556 Solution::genesis_solution(PublicKey::from(key.public().0), key.to_account_id());
557
558 let mut gossip_builder = GossipWorkerBuilder::new();
559
560 task_manager
561 .spawn_essential_handle()
562 .spawn_essential_blocking(
563 "consensus-chain-channel-update-worker",
564 None,
565 Box::pin(
566 domain_client_message_relayer::worker::gossip_channel_updates::<_, _, Block, _>(
567 ChainId::Consensus,
568 client.clone(),
569 sync_service.clone(),
570 gossip_builder.gossip_msg_sink(),
571 ),
572 ),
573 );
574
575 let (consensus_msg_sink, consensus_msg_receiver) =
576 tracing_unbounded("consensus_message_channel", 100);
577
578 let consensus_listener =
579 cross_domain_message_gossip::start_cross_chain_message_listener::<_, _, _, _, _, _, _>(
580 ChainId::Consensus,
581 client.clone(),
582 client.clone(),
583 transaction_pool.clone(),
584 network_service.clone(),
585 consensus_msg_receiver,
586 domain_executor,
587 sync_service.clone(),
588 );
589
590 task_manager
591 .spawn_essential_handle()
592 .spawn_essential_blocking(
593 "consensus-message-listener",
594 None,
595 Box::pin(consensus_listener),
596 );
597
598 gossip_builder.push_chain_sink(ChainId::Consensus, consensus_msg_sink);
599
600 task_manager.spawn_essential_handle().spawn_blocking(
601 "mmr-gadget",
602 None,
603 mmr_gadget::MmrGadget::start(
604 client.clone(),
605 backend.clone(),
606 sp_mmr_primitives::INDEXING_PREFIX.to_vec(),
607 ),
608 );
609
610 MockConsensusNode {
611 task_manager,
612 client,
613 backend,
614 executor,
615 transaction_pool,
616 select_chain,
617 network_service,
618 xdm_gossip_notification_service: Some(xdm_gossip_notification_service),
619 sync_service,
620 rpc_handlers,
621 next_slot: 1,
622 mock_pot_verifier,
623 new_slot_notification_subscribers: Vec::new(),
624 acknowledgement_sender_subscribers: Vec::new(),
625 block_import,
626 xdm_gossip_worker_builder: Some(gossip_builder),
627 mock_solution,
628 log_prefix,
629 key,
630 finalize_block_depth,
631 base_path,
632 }
633 }
634
635 pub fn run(
637 tokio_handle: tokio::runtime::Handle,
638 key: Sr25519Keyring,
639 base_path: BasePath,
640 ) -> MockConsensusNode {
641 Self::run_with_finalization_depth(tokio_handle, key, base_path, None, false, None)
642 }
643
644 pub fn run_with_private_evm(
646 tokio_handle: tokio::runtime::Handle,
647 key: Sr25519Keyring,
648 evm_owner: Option<Sr25519Keyring>,
649 base_path: BasePath,
650 ) -> MockConsensusNode {
651 Self::run_with_finalization_depth(tokio_handle, key, base_path, None, true, evm_owner)
652 }
653
654 pub fn run_with_finalization_depth(
656 tokio_handle: tokio::runtime::Handle,
657 key: Sr25519Keyring,
658 base_path: BasePath,
659 finalize_block_depth: Option<NumberFor<Block>>,
660 private_evm: bool,
661 evm_owner: Option<Sr25519Keyring>,
662 ) -> MockConsensusNode {
663 let rpc_config = MockConsensusNodeRpcConfig {
664 base_path,
665 finalize_block_depth,
666 private_evm,
667 evm_owner,
668 rpc_addr: None,
669 rpc_port: None,
670 };
671
672 Self::run_with_rpc_builder(
673 tokio_handle,
674 key,
675 rpc_config,
676 Box::new(|| Ok(RpcModule::new(()))),
677 )
678 }
679
680 pub fn run_with_rpc_builder(
682 tokio_handle: tokio::runtime::Handle,
683 key: Sr25519Keyring,
684 rpc_config: MockConsensusNodeRpcConfig,
685 rpc_builder: Box<
686 dyn Fn() -> Result<RpcModule<()>, sc_service::Error> + Send + Sync + 'static,
687 >,
688 ) -> MockConsensusNode {
689 let MockConsensusNodeRpcConfig {
690 base_path,
691 finalize_block_depth,
692 private_evm,
693 evm_owner,
694 rpc_addr,
695 rpc_port,
696 } = rpc_config;
697
698 let config = node_config(
699 tokio_handle,
700 key,
701 vec![],
702 false,
703 false,
704 false,
705 private_evm,
706 evm_owner.map(|key| key.to_account_id()),
707 base_path.clone(),
708 rpc_addr,
709 rpc_port,
710 );
711
712 Self::run_with_configuration(config, key, base_path, finalize_block_depth, rpc_builder)
713 }
714
715 pub fn run_with_rpc_options(
717 tokio_handle: tokio::runtime::Handle,
718 key: Sr25519Keyring,
719 rpc_config: MockConsensusNodeRpcConfig,
720 ) -> MockConsensusNode {
721 Self::run_with_rpc_builder(
722 tokio_handle,
723 key,
724 rpc_config,
725 Box::new(|| Ok(RpcModule::new(()))),
726 )
727 }
728
729 pub fn xdm_gossip_worker_builder(&mut self) -> &mut GossipWorkerBuilder {
731 self.xdm_gossip_worker_builder
732 .as_mut()
733 .expect("gossip message worker have not started yet")
734 }
735
736 pub fn start_cross_domain_gossip_message_worker(&mut self) {
738 let xdm_gossip_worker_builder = self
739 .xdm_gossip_worker_builder
740 .take()
741 .expect("gossip message worker have not started yet");
742 let cross_domain_message_gossip_worker = xdm_gossip_worker_builder.build::<Block, _, _>(
743 self.network_service.clone(),
744 self.xdm_gossip_notification_service
745 .take()
746 .expect("XDM gossip notification service must be used only once"),
747 self.sync_service.clone(),
748 );
749 self.task_manager
750 .spawn_essential_handle()
751 .spawn_essential_blocking(
752 "cross-domain-gossip-message-worker",
753 None,
754 Box::pin(cross_domain_message_gossip_worker.run()),
755 );
756 }
757
758 pub fn next_slot(&self) -> u64 {
760 self.next_slot
761 }
762
763 pub fn set_next_slot(&mut self, next_slot: u64) {
765 self.next_slot = next_slot;
766 }
767
768 pub fn produce_slot(&mut self) -> NewSlot {
770 let slot = Slot::from(self.next_slot);
771 let proof_of_time = PotOutput::from(
772 <&[u8] as TryInto<[u8; 16]>>::try_into(&Hash::random().to_fixed_bytes()[..16])
773 .expect("slice with length of 16 must able convert into [u8; 16]; qed"),
774 );
775 self.mock_pot_verifier.inject_pot(*slot, proof_of_time);
776 self.next_slot += 1;
777
778 (slot, proof_of_time)
779 }
780
781 pub async fn notify_new_slot_and_wait_for_bundle(
783 &mut self,
784 new_slot: NewSlot,
785 ) -> Option<OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>> {
786 self.new_slot_notification_subscribers
787 .retain(|subscriber| subscriber.unbounded_send(new_slot).is_ok());
788
789 self.confirm_acknowledgement().await;
790 self.get_bundle_from_tx_pool(new_slot)
791 }
792
793 pub async fn produce_slot_and_wait_for_bundle_submission(
795 &mut self,
796 ) -> (
797 NewSlot,
798 OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>,
799 ) {
800 let slot = self.produce_slot();
801 for _ in 0..MAX_PRODUCE_BUNDLE_TRY {
802 if let Some(bundle) = self.notify_new_slot_and_wait_for_bundle(slot).await {
803 return (slot, bundle);
804 }
805 }
806 panic!(
807 "Failed to produce bundle after {MAX_PRODUCE_BUNDLE_TRY:?} tries, something must be wrong"
808 );
809 }
810
811 pub async fn produce_slot_and_wait_for_bundle_submission_from_operator(
813 &mut self,
814 operator_id: OperatorId,
815 ) -> (
816 NewSlot,
817 OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>,
818 ) {
819 loop {
820 let slot = self.produce_slot();
821 if let Some(bundle) = self.notify_new_slot_and_wait_for_bundle(slot).await
822 && bundle.sealed_header().proof_of_election().operator_id == operator_id
823 {
824 return (slot, bundle);
825 }
826 }
827 }
828
829 pub fn new_slot_notification_stream(&mut self) -> mpsc::UnboundedReceiver<(Slot, PotOutput)> {
831 let (tx, rx) = mpsc::unbounded();
832 self.new_slot_notification_subscribers.push(tx);
833 rx
834 }
835
836 pub fn new_acknowledgement_sender_stream(
838 &mut self,
839 ) -> TracingUnboundedReceiver<mpsc::Sender<()>> {
840 let (tx, rx) = tracing_unbounded("subspace_acknowledgement_sender_stream", 100);
841 self.acknowledgement_sender_subscribers.push(tx);
842 rx
843 }
844
845 pub async fn confirm_acknowledgement(&mut self) {
850 let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
851
852 {
855 self.acknowledgement_sender_subscribers
856 .retain(|subscriber| {
857 subscriber
858 .unbounded_send(acknowledgement_sender.clone())
859 .is_ok()
860 });
861 drop(acknowledgement_sender);
862 }
863
864 while acknowledgement_receiver.next().await.is_some() {
866 }
868 }
869
870 pub async fn confirm_block_import_processed(&mut self) {
872 let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
875 {
876 let value = (NumberFor::<Block>::default(), acknowledgement_sender);
880 self.block_import
881 .block_importing_notification_subscribers
882 .lock()
883 .retain(|subscriber| subscriber.unbounded_send(value.clone()).is_ok());
884 }
885 while acknowledgement_receiver.next().await.is_some() {
886 }
888
889 self.confirm_acknowledgement().await;
891 }
892
893 pub fn block_importing_notification_stream(
895 &self,
896 ) -> TracingUnboundedReceiver<(NumberFor<Block>, mpsc::Sender<()>)> {
897 self.block_import.block_importing_notification_stream()
898 }
899
900 pub fn get_bundle_from_tx_pool(
902 &self,
903 new_slot: NewSlot,
904 ) -> Option<OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>> {
905 for ready_tx in self.transaction_pool.ready() {
906 let ext = UncheckedExtrinsic::decode(&mut ready_tx.data.encode().as_slice())
907 .expect("should be able to decode");
908 if let RuntimeCall::Domains(pallet_domains::Call::submit_bundle { opaque_bundle }) =
909 ext.function
910 && opaque_bundle.sealed_header().slot_number() == *new_slot.0
911 {
912 return Some(opaque_bundle);
913 }
914 }
915 None
916 }
917
918 pub async fn submit_transaction(&self, tx: OpaqueExtrinsic) -> Result<H256, TxPoolError> {
920 self.transaction_pool
921 .submit_one(
922 self.client.info().best_hash,
923 TransactionSource::External,
924 tx,
925 )
926 .await
927 .map_err(|err| {
928 err.into_pool_error()
929 .expect("should always be a pool error")
930 })
931 }
932
933 pub async fn clear_tx_pool(&self) -> Result<(), Box<dyn Error>> {
935 let txs: Vec<_> = self
936 .transaction_pool
937 .ready()
938 .map(|t| self.transaction_pool.hash_of(&t.data))
939 .collect();
940 self.prune_txs_from_pool(txs.as_slice()).await
941 }
942
943 pub async fn prune_tx_from_pool(&self, tx: &OpaqueExtrinsic) -> Result<(), Box<dyn Error>> {
945 self.prune_txs_from_pool(&[self.transaction_pool.hash_of(tx)])
946 .await
947 }
948
949 async fn prune_txs_from_pool(
950 &self,
951 tx_hashes: &[BlockHashFor<Block>],
952 ) -> Result<(), Box<dyn Error>> {
953 let hash_and_number = HashAndNumber {
954 number: self.client.info().best_number,
955 hash: self.client.info().best_hash,
956 };
957 self.transaction_pool
958 .pool()
959 .prune_known(&hash_and_number, tx_hashes);
960 tokio::time::sleep(time::Duration::from_millis(1)).await;
963 self.transaction_pool
964 .pool()
965 .validated_pool()
966 .clear_stale(&hash_and_number);
967 Ok(())
968 }
969
970 pub fn does_receipt_exist(
972 &self,
973 er_hash: BlockHashFor<DomainBlock>,
974 ) -> Result<bool, Box<dyn Error>> {
975 Ok(self
976 .client
977 .runtime_api()
978 .execution_receipt(self.client.info().best_hash, er_hash)?
979 .is_some())
980 }
981
982 pub fn get_domain_staking_summary(
984 &self,
985 domain_id: DomainId,
986 ) -> Result<Option<StakingSummary<OperatorId, Balance>>, Box<dyn Error>> {
987 Ok(self
988 .client
989 .runtime_api()
990 .domain_stake_summary(self.client.info().best_hash, domain_id)?)
991 }
992
993 pub fn get_domain_block_pruning_depth(&self) -> Result<BlockNumber, Box<dyn Error>> {
995 Ok(self
996 .client
997 .runtime_api()
998 .block_pruning_depth(self.client.info().best_hash)?)
999 }
1000
1001 pub fn wait_for_fraud_proof<FP>(
1004 &self,
1005 fraud_proof_predicate: FP,
1006 ) -> Pin<Box<dyn Future<Output = FraudProofFor<Block, DomainBlock>> + Send>>
1007 where
1008 FP: Fn(&FraudProofFor<Block, DomainBlock>) -> bool + Send + 'static,
1009 {
1010 let tx_pool = self.transaction_pool.clone();
1011 let mut import_tx_stream = self.transaction_pool.import_notification_stream();
1012 Box::pin(async move {
1013 while let Some(ready_tx_hash) = import_tx_stream.next().await {
1014 let ready_tx = tx_pool
1015 .ready_transaction(&ready_tx_hash)
1016 .expect("Just get the ready tx hash from import stream; qed");
1017 let ext = subspace_test_runtime::UncheckedExtrinsic::decode(
1018 &mut ready_tx.data.encode().as_slice(),
1019 )
1020 .expect("Decode tx must success");
1021 if let subspace_test_runtime::RuntimeCall::Domains(
1022 pallet_domains::Call::submit_fraud_proof { fraud_proof },
1023 ) = ext.function
1024 && fraud_proof_predicate(&fraud_proof)
1025 {
1026 return *fraud_proof;
1027 }
1028 }
1029 unreachable!()
1030 })
1031 }
1032
1033 pub fn free_balance(&self, account_id: AccountId) -> subspace_runtime_primitives::Balance {
1035 self.client
1036 .runtime_api()
1037 .free_balance(self.client.info().best_hash, account_id)
1038 .expect("Fail to get account free balance")
1039 }
1040
1041 pub fn ban_peer(&self, addr: MultiaddrWithPeerId) {
1044 self.network_service.report_peer(
1047 addr.peer_id,
1048 ReputationChange::new_fatal("Peer banned by test (1)"),
1049 );
1050 self.network_service.report_peer(
1051 addr.peer_id,
1052 ReputationChange::new_fatal("Peer banned by test (2)"),
1053 );
1054 }
1055
1056 pub fn unban_peer(&self, addr: MultiaddrWithPeerId) {
1058 self.network_service.report_peer(
1061 addr.peer_id,
1062 ReputationChange::new(i32::MAX, "Peer unbanned by test (1)"),
1063 );
1064 self.network_service.report_peer(
1065 addr.peer_id,
1066 ReputationChange::new(i32::MAX, "Peer unbanned by test (2)"),
1067 );
1068 }
1069
1070 pub async fn stop(self) -> Result<(), std::io::Error> {
1076 let lock_file_path = self.base_path.path().join("paritydb").join("lock");
1077 std::mem::drop(self);
1079
1080 sleep(Duration::from_secs(2)).await;
1083
1084 if let Err(err) = std::fs::remove_file(lock_file_path) {
1086 tracing::error!("deleting paritydb lock file failed: {err:?}");
1087 }
1088 Ok(())
1089 }
1090}
1091
1092impl MockConsensusNode {
1093 async fn collect_txn_from_pool(&self, parent_hash: Hash) -> Vec<ExtrinsicFor<Block>> {
1094 self.transaction_pool
1095 .ready_at(parent_hash)
1096 .await
1097 .map(|pending_tx| pending_tx.data().as_ref().clone())
1098 .collect()
1099 }
1100
1101 async fn mock_inherent_data(slot: Slot) -> Result<InherentData, Box<dyn Error>> {
1102 let timestamp = sp_timestamp::InherentDataProvider::new(Timestamp::new(
1103 <Slot as Into<u64>>::into(slot) * SLOT_DURATION,
1104 ));
1105 let subspace_inherents =
1106 sp_consensus_subspace::inherents::InherentDataProvider::new(vec![]);
1107
1108 let inherent_data = (subspace_inherents, timestamp)
1109 .create_inherent_data()
1110 .await?;
1111
1112 Ok(inherent_data)
1113 }
1114
1115 fn mock_subspace_digest(&self, slot: Slot) -> Digest {
1116 let pre_digest: PreDigest<AccountId> = PreDigest::V0 {
1117 slot,
1118 solution: self.mock_solution.clone(),
1119 pot_info: PreDigestPotInfo::V0 {
1120 proof_of_time: Default::default(),
1121 future_proof_of_time: Default::default(),
1122 },
1123 };
1124 let mut digest = Digest::default();
1125 digest.push(DigestItem::subspace_pre_digest(&pre_digest));
1126 digest
1127 }
1128
1129 async fn build_block(
1131 &self,
1132 slot: Slot,
1133 parent_hash: BlockHashFor<Block>,
1134 extrinsics: Vec<ExtrinsicFor<Block>>,
1135 ) -> Result<(Block, StorageChanges), Box<dyn Error>> {
1136 let inherent_digest = self.mock_subspace_digest(slot);
1137
1138 let inherent_data = Self::mock_inherent_data(slot).await?;
1139
1140 let mut block_builder = BlockBuilderBuilder::new(self.client.as_ref())
1141 .on_parent_block(parent_hash)
1142 .fetch_parent_block_number(self.client.as_ref())?
1143 .with_inherent_digests(inherent_digest)
1144 .build()
1145 .expect("Creates new block builder");
1146
1147 let inherent_txns = block_builder.create_inherents(inherent_data)?;
1148
1149 for tx in inherent_txns.into_iter().chain(extrinsics) {
1150 if let Err(err) = sc_block_builder::BlockBuilder::push(&mut block_builder, tx) {
1151 tracing::error!("Invalid transaction while building block: {}", err);
1152 }
1153 }
1154
1155 let (block, storage_changes, _) = block_builder.build()?.into_inner();
1156 Ok((block, storage_changes))
1157 }
1158
1159 async fn import_block(
1161 &self,
1162 block: Block,
1163 storage_changes: Option<StorageChanges>,
1164 ) -> Result<BlockHashFor<Block>, Box<dyn Error>> {
1165 let (header, body) = block.deconstruct();
1166
1167 let header_hash = header.hash();
1168 let header_number = header.number;
1169
1170 let block_import_params = {
1171 let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
1172 import_block.body = Some(body);
1173 import_block.state_action = match storage_changes {
1174 Some(changes) => {
1175 StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(changes))
1176 }
1177 None => StateAction::Execute,
1178 };
1179 import_block
1180 };
1181
1182 let import_result = self.block_import.import_block(block_import_params).await?;
1183
1184 if let Some(finalized_block_hash) = self
1185 .finalize_block_depth
1186 .and_then(|depth| header_number.checked_sub(depth))
1187 .and_then(|block_to_finalize| {
1188 self.client
1189 .hash(block_to_finalize)
1190 .expect("Block hash not found for number: {block_to_finalize:?}")
1191 })
1192 {
1193 self.client
1194 .finalize_block(finalized_block_hash, None, true)
1195 .unwrap();
1196 }
1197
1198 match import_result {
1199 ImportResult::Imported(_) | ImportResult::AlreadyInChain => Ok(header_hash),
1200 bad_res => Err(format!("Fail to import block due to {bad_res:?}").into()),
1201 }
1202 }
1203
1204 #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1207 pub async fn produce_block_with_slot_at(
1208 &mut self,
1209 new_slot: NewSlot,
1210 parent_hash: BlockHashFor<Block>,
1211 maybe_extrinsics: Option<Vec<ExtrinsicFor<Block>>>,
1212 ) -> Result<BlockHashFor<Block>, Box<dyn Error>> {
1213 let block_timer = time::Instant::now();
1214
1215 let extrinsics = match maybe_extrinsics {
1216 Some(extrinsics) => extrinsics,
1217 None => self.collect_txn_from_pool(parent_hash).await,
1218 };
1219 let tx_hashes: Vec<_> = extrinsics
1220 .iter()
1221 .map(|t| self.transaction_pool.hash_of(t))
1222 .collect();
1223
1224 let (block, storage_changes) = self
1225 .build_block(new_slot.0, parent_hash, extrinsics)
1226 .await?;
1227
1228 log_new_block(&block, block_timer.elapsed().as_millis());
1229
1230 let res = match self.import_block(block, Some(storage_changes)).await {
1231 Ok(hash) => {
1232 self.prune_txs_from_pool(tx_hashes.as_slice()).await?;
1235 Ok(hash)
1236 }
1237 err => err,
1238 };
1239 self.confirm_block_import_processed().await;
1240 res
1241 }
1242
1243 #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1246 pub async fn produce_block_with_slot(&mut self, slot: NewSlot) -> Result<(), Box<dyn Error>> {
1247 self.produce_block_with_slot_at(slot, self.client.info().best_hash, None)
1248 .await?;
1249 Ok(())
1250 }
1251
1252 #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1254 pub async fn produce_block_with_extrinsics(
1255 &mut self,
1256 extrinsics: Vec<ExtrinsicFor<Block>>,
1257 ) -> Result<(), Box<dyn Error>> {
1258 let slot = self.produce_slot();
1259 self.produce_block_with_slot_at(slot, self.client.info().best_hash, Some(extrinsics))
1260 .await?;
1261 Ok(())
1262 }
1263
1264 #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1266 pub async fn produce_blocks(&mut self, n: u64) -> Result<(), Box<dyn Error>> {
1267 for _ in 0..n {
1268 let slot = self.produce_slot();
1269 self.produce_block_with_slot(slot).await?;
1270 }
1271 Ok(())
1272 }
1273
1274 #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1276 pub async fn produce_blocks_with_bundles(&mut self, n: u64) -> Result<(), Box<dyn Error>> {
1277 for _ in 0..n {
1278 let (slot, _) = self.produce_slot_and_wait_for_bundle_submission().await;
1279 self.produce_block_with_slot(slot).await?;
1280 }
1281 Ok(())
1282 }
1283
1284 pub fn account_nonce(&self) -> u32 {
1286 self.client
1287 .runtime_api()
1288 .account_nonce(self.client.info().best_hash, self.key.to_account_id())
1289 .expect("Fail to get account nonce")
1290 }
1291
1292 pub fn account_nonce_of(&self, account_id: AccountId) -> u32 {
1294 self.client
1295 .runtime_api()
1296 .account_nonce(self.client.info().best_hash, account_id)
1297 .expect("Fail to get account nonce")
1298 }
1299
1300 pub fn construct_extrinsic(
1302 &self,
1303 nonce: u32,
1304 function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1305 ) -> UncheckedExtrinsic {
1306 construct_extrinsic_generic(&self.client, function, self.key, false, nonce, 0)
1307 }
1308
1309 pub fn construct_unsigned_extrinsic(
1311 &self,
1312 function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1313 ) -> UncheckedExtrinsic {
1314 construct_unsigned_extrinsic(&self.client, function)
1315 }
1316
1317 pub async fn construct_and_send_extrinsic_with(
1319 &self,
1320 function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1321 ) -> Result<RpcTransactionOutput, RpcTransactionError> {
1322 let nonce = self.account_nonce();
1323 let extrinsic = self.construct_extrinsic(nonce, function);
1324 self.rpc_handlers.send_transaction(extrinsic.into()).await
1325 }
1326
1327 pub async fn send_extrinsic(
1329 &self,
1330 extrinsic: impl Into<OpaqueExtrinsic>,
1331 ) -> Result<RpcTransactionOutput, RpcTransactionError> {
1332 self.rpc_handlers.send_transaction(extrinsic.into()).await
1333 }
1334}
1335
1336fn log_new_block(block: &Block, used_time_ms: u128) {
1337 tracing::info!(
1338 "🎁 Prepared block for proposing at {} ({} ms) [hash: {:?}; parent_hash: {}; extrinsics ({}): [{}]]",
1339 block.header().number(),
1340 used_time_ms,
1341 block.header().hash(),
1342 block.header().parent_hash(),
1343 block.extrinsics().len(),
1344 block
1345 .extrinsics()
1346 .iter()
1347 .map(|xt| BlakeTwo256::hash_of(xt).to_string())
1348 .collect::<Vec<_>>()
1349 .join(", ")
1350 );
1351}
1352
1353fn mock_import_queue<Block: BlockT, I>(
1354 block_import: I,
1355 spawner: &impl SpawnEssentialNamed,
1356) -> BasicQueue<Block>
1357where
1358 I: BlockImport<Block, Error = ConsensusError> + Send + Sync + 'static,
1359{
1360 BasicQueue::new(
1361 MockVerifier::default(),
1362 Box::new(block_import),
1363 None,
1364 spawner,
1365 None,
1366 )
1367}
1368
1369struct MockVerifier<Block> {
1370 _marker: PhantomData<Block>,
1371}
1372
1373impl<Block> Default for MockVerifier<Block> {
1374 fn default() -> Self {
1375 Self {
1376 _marker: PhantomData,
1377 }
1378 }
1379}
1380
1381#[async_trait::async_trait]
1382impl<Block> VerifierT<Block> for MockVerifier<Block>
1383where
1384 Block: BlockT,
1385{
1386 async fn verify(
1387 &self,
1388 block_params: BlockImportParams<Block>,
1389 ) -> Result<BlockImportParams<Block>, String> {
1390 Ok(block_params)
1391 }
1392}
1393
1394#[allow(clippy::type_complexity)]
1397struct MockBlockImport<Client, Block: BlockT> {
1398 inner: Arc<Client>,
1399 block_importing_notification_subscribers:
1400 Arc<Mutex<Vec<TracingUnboundedSender<(NumberFor<Block>, mpsc::Sender<()>)>>>>,
1401}
1402
1403impl<Client, Block: BlockT> MockBlockImport<Client, Block> {
1404 fn new(inner: Arc<Client>) -> Self {
1405 MockBlockImport {
1406 inner,
1407 block_importing_notification_subscribers: Arc::new(Mutex::new(Vec::new())),
1408 }
1409 }
1410
1411 fn block_importing_notification_stream(
1413 &self,
1414 ) -> TracingUnboundedReceiver<(NumberFor<Block>, mpsc::Sender<()>)> {
1415 let (tx, rx) = tracing_unbounded("subspace_new_slot_notification_stream", 100);
1416 self.block_importing_notification_subscribers
1417 .lock()
1418 .push(tx);
1419 rx
1420 }
1421}
1422
1423impl<Client, Block: BlockT> MockBlockImport<Client, Block> {
1424 fn clone(&self) -> Self {
1425 MockBlockImport {
1426 inner: self.inner.clone(),
1427 block_importing_notification_subscribers: self
1428 .block_importing_notification_subscribers
1429 .clone(),
1430 }
1431 }
1432}
1433
1434#[async_trait::async_trait]
1435impl<Client, Block> BlockImport<Block> for MockBlockImport<Client, Block>
1436where
1437 Block: BlockT,
1438 for<'r> &'r Client: BlockImport<Block, Error = ConsensusError> + Send + Sync,
1439 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
1440 Client::Api: ApiExt<Block>,
1441{
1442 type Error = ConsensusError;
1443
1444 async fn import_block(
1445 &self,
1446 mut block: BlockImportParams<Block>,
1447 ) -> Result<ImportResult, Self::Error> {
1448 let block_number = *block.header.number();
1449 block.fork_choice = Some(ForkChoiceStrategy::LongestChain);
1450
1451 let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
1452
1453 {
1456 let value = (block_number, acknowledgement_sender);
1457 self.block_importing_notification_subscribers
1458 .lock()
1459 .retain(|subscriber| subscriber.unbounded_send(value.clone()).is_ok());
1460 }
1461
1462 while acknowledgement_receiver.next().await.is_some() {
1463 }
1465
1466 self.inner.import_block(block).await
1467 }
1468
1469 async fn check_block(
1470 &self,
1471 block: BlockCheckParams<Block>,
1472 ) -> Result<ImportResult, Self::Error> {
1473 self.inner.check_block(block).await
1474 }
1475}
1476
1477#[macro_export]
1479macro_rules! produce_blocks {
1480 ($primary_node:ident, $operator_node:ident, $count: literal $(, $domain_node:ident)*) => {
1481 {
1482 async {
1483 let domain_fut = {
1484 let mut futs: Vec<std::pin::Pin<Box<dyn futures::Future<Output = ()>>>> = Vec::new();
1485 futs.push(Box::pin($operator_node.wait_for_blocks($count)));
1486 $( futs.push( Box::pin( $domain_node.wait_for_blocks($count) ) ); )*
1487 futures::future::join_all(futs)
1488 };
1489 $primary_node.produce_blocks_with_bundles($count).await?;
1490 domain_fut.await;
1491 Ok::<(), Box<dyn std::error::Error>>(())
1492 }
1493 }
1494 };
1495}
1496
1497#[macro_export]
1500macro_rules! produce_block_with {
1501 ($primary_node_produce_block:expr, $operator_node:ident $(, $domain_node:ident)*) => {
1502 {
1503 async {
1504 let domain_fut = {
1505 let mut futs: Vec<std::pin::Pin<Box<dyn futures::Future<Output = ()>>>> = Vec::new();
1506 futs.push(Box::pin($operator_node.wait_for_blocks(1)));
1507 $( futs.push( Box::pin( $domain_node.wait_for_blocks(1) ) ); )*
1508 futures::future::join_all(futs)
1509 };
1510 $primary_node_produce_block.await?;
1511 domain_fut.await;
1512 Ok::<(), Box<dyn std::error::Error>>(())
1513 }
1514 }
1515 };
1516}
1517
1518#[macro_export]
1520macro_rules! produce_blocks_until {
1521 ($primary_node:ident, $operator_node:ident, $condition: block $(, $domain_node:ident)*) => {
1522 async {
1523 while !$condition {
1524 produce_blocks!($primary_node, $operator_node, 1 $(, $domain_node),*).await?;
1525 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1526 }
1527 Ok::<(), Box<dyn std::error::Error>>(())
1528 }
1529 };
1530}
1531
1532type BalanceOf<T> = <<T as pallet_transaction_payment::Config>::OnChargeTransaction as pallet_transaction_payment::OnChargeTransaction<T>>::Balance;
1533
1534fn get_signed_extra(
1535 current_block: u64,
1536 immortal: bool,
1537 nonce: u32,
1538 tip: BalanceOf<Runtime>,
1539) -> SignedExtra {
1540 let period = u64::from(<<Runtime as frame_system::Config>::BlockHashCount>::get())
1541 .checked_next_power_of_two()
1542 .map(|c| c / 2)
1543 .unwrap_or(2);
1544 (
1545 frame_system::CheckNonZeroSender::<Runtime>::new(),
1546 frame_system::CheckSpecVersion::<Runtime>::new(),
1547 frame_system::CheckTxVersion::<Runtime>::new(),
1548 frame_system::CheckGenesis::<Runtime>::new(),
1549 frame_system::CheckMortality::<Runtime>::from(if immortal {
1550 generic::Era::Immortal
1551 } else {
1552 generic::Era::mortal(period, current_block)
1553 }),
1554 frame_system::CheckNonce::<Runtime>::from(nonce.into()),
1555 frame_system::CheckWeight::<Runtime>::new(),
1556 pallet_transaction_payment::ChargeTransactionPayment::<Runtime>::from(tip),
1557 BalanceTransferCheckExtension::<Runtime>::default(),
1558 pallet_subspace::extensions::SubspaceExtension::<Runtime>::new(),
1559 pallet_domains::extensions::DomainsExtension::<Runtime>::new(),
1560 pallet_messenger::extensions::MessengerExtension::<Runtime>::new(),
1561 )
1562}
1563
1564fn construct_extrinsic_raw_payload<Client>(
1565 client: impl AsRef<Client>,
1566 function: <Runtime as frame_system::Config>::RuntimeCall,
1567 immortal: bool,
1568 nonce: u32,
1569 tip: BalanceOf<Runtime>,
1570) -> (
1571 SignedPayload<<Runtime as frame_system::Config>::RuntimeCall, SignedExtra>,
1572 SignedExtra,
1573)
1574where
1575 BalanceOf<Runtime>: Send + Sync + From<u64> + sp_runtime::FixedPointOperand,
1576 u64: From<BlockNumberFor<Runtime>>,
1577 Client: HeaderBackend<subspace_runtime_primitives::opaque::Block>,
1578{
1579 let current_block_hash = client.as_ref().info().best_hash;
1580 let current_block = client.as_ref().info().best_number.saturated_into();
1581 let genesis_block = client.as_ref().hash(0).unwrap().unwrap();
1582 let extra = get_signed_extra(current_block, immortal, nonce, tip);
1583 (
1584 generic::SignedPayload::<
1585 <Runtime as frame_system::Config>::RuntimeCall,
1586 SignedExtra,
1587 >::from_raw(
1588 function,
1589 extra.clone(),
1590 ((), 100, 1, genesis_block, current_block_hash, (), (), (), (), (), (),()),
1591 ),
1592 extra,
1593 )
1594}
1595
1596pub fn construct_extrinsic_generic<Client>(
1598 client: impl AsRef<Client>,
1599 function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1600 caller: Sr25519Keyring,
1601 immortal: bool,
1602 nonce: u32,
1603 tip: BalanceOf<Runtime>,
1604) -> UncheckedExtrinsic
1605where
1606 BalanceOf<Runtime>: Send + Sync + From<u64> + sp_runtime::FixedPointOperand,
1607 u64: From<BlockNumberFor<Runtime>>,
1608 Client: HeaderBackend<subspace_runtime_primitives::opaque::Block>,
1609{
1610 let function = function.into();
1611 let (raw_payload, extra) =
1612 construct_extrinsic_raw_payload(client, function.clone(), immortal, nonce, tip);
1613 let signature = raw_payload.using_encoded(|e| caller.sign(e));
1614 UncheckedExtrinsic::new_signed(
1615 function,
1616 MultiAddress::Id(caller.to_account_id()),
1617 Signature::Sr25519(signature),
1618 extra,
1619 )
1620}
1621
1622fn construct_unsigned_extrinsic<Client>(
1624 client: impl AsRef<Client>,
1625 function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1626) -> UncheckedExtrinsic
1627where
1628 BalanceOf<Runtime>: Send + Sync + From<u64> + sp_runtime::FixedPointOperand,
1629 u64: From<BlockNumberFor<Runtime>>,
1630 Client: HeaderBackend<subspace_runtime_primitives::opaque::Block>,
1631{
1632 let function = function.into();
1633 let current_block = client.as_ref().info().best_number.saturated_into();
1634 let extra = get_signed_extra(current_block, true, 0, 0);
1635 UncheckedExtrinsic::new_transaction(function, extra)
1636}