subspace_test_service/
lib.rs

1// Copyright (C) 2021 Subspace Labs, Inc.
2// SPDX-License-Identifier: GPL-3.0-or-later
3
4// This program is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// This program is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with this program. If not, see <https://www.gnu.org/licenses/>.
16
17//! Subspace test service only.
18
19#![warn(missing_docs, unused_crate_dependencies)]
20
21use cross_domain_message_gossip::{GossipWorkerBuilder, xdm_gossip_peers_set_config};
22use domain_runtime_primitives::opaque::{Block as DomainBlock, Header as DomainHeader};
23use frame_system::pallet_prelude::BlockNumberFor;
24use futures::channel::mpsc;
25use futures::{Future, StreamExt};
26use jsonrpsee::RpcModule;
27use pallet_domains::staking::StakingSummary;
28use parity_scale_codec::{Decode, Encode};
29use parking_lot::Mutex;
30use sc_block_builder::BlockBuilderBuilder;
31use sc_client_api::execution_extensions::ExtensionsFactory;
32use sc_client_api::{Backend as BackendT, BlockBackend, ExecutorProvider, Finalizer};
33use sc_consensus::block_import::{
34    BlockCheckParams, BlockImportParams, ForkChoiceStrategy, ImportResult,
35};
36use sc_consensus::{BasicQueue, BlockImport, StateAction, Verifier as VerifierT};
37use sc_domains::ExtensionsFactory as DomainsExtensionFactory;
38use sc_network::config::{NetworkConfiguration, TransportConfig};
39use sc_network::service::traits::NetworkService;
40use sc_network::{
41    NetworkWorker, NotificationMetrics, NotificationService, ReputationChange, multiaddr,
42};
43use sc_service::config::{
44    DatabaseSource, ExecutorConfiguration, KeystoreConfig, MultiaddrWithPeerId,
45    OffchainWorkerConfig, RpcBatchRequestConfig, RpcConfiguration, RpcEndpoint,
46    WasmExecutionMethod, WasmtimeInstantiationStrategy,
47};
48use sc_service::{BasePath, BlocksPruning, Configuration, Role, SpawnTasksParams, TaskManager};
49use sc_transaction_pool::{BasicPool, FullChainApi, Options};
50use sc_transaction_pool_api::error::{Error as TxPoolError, IntoPoolError};
51use sc_transaction_pool_api::{InPoolTransaction, TransactionPool, TransactionSource};
52use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender, tracing_unbounded};
53use sp_api::{ApiExt, ProvideRuntimeApi};
54use sp_blockchain::{HashAndNumber, HeaderBackend};
55use sp_consensus::{BlockOrigin, Error as ConsensusError};
56use sp_consensus_slots::Slot;
57use sp_consensus_subspace::digests::{
58    CompatibleDigestItem, PreDigest, PreDigestPotInfo, extract_pre_digest,
59};
60use sp_consensus_subspace::{PotExtension, SubspaceApi};
61use sp_core::H256;
62use sp_core::offchain::OffchainDbExt;
63use sp_core::offchain::storage::OffchainDb;
64use sp_core::traits::{CodeExecutor, SpawnEssentialNamed};
65use sp_domains::bundle::OpaqueBundle;
66use sp_domains::{BundleProducerElectionApi, ChainId, DomainId, DomainsApi, OperatorId};
67use sp_domains_fraud_proof::fraud_proof::FraudProof;
68use sp_domains_fraud_proof::{FraudProofExtension, FraudProofHostFunctionsImpl};
69use sp_externalities::Extensions;
70use sp_inherents::{InherentData, InherentDataProvider};
71use sp_keyring::Sr25519Keyring;
72use sp_messenger::MessengerApi;
73use sp_messenger_host_functions::{MessengerExtension, MessengerHostFunctionsImpl};
74use sp_mmr_primitives::MmrApi;
75use sp_runtime::generic::{Digest, SignedPayload};
76use sp_runtime::traits::{
77    BlakeTwo256, Block as BlockT, Hash as HashT, Header as HeaderT, NumberFor,
78};
79use sp_runtime::{DigestItem, MultiAddress, OpaqueExtrinsic, SaturatedConversion, generic};
80use sp_subspace_mmr::host_functions::{SubspaceMmrExtension, SubspaceMmrHostFunctionsImpl};
81use sp_timestamp::Timestamp;
82use std::collections::HashMap;
83use std::error::Error;
84use std::marker::PhantomData;
85use std::net::SocketAddr;
86use std::pin::Pin;
87use std::sync::Arc;
88use std::time;
89use std::time::Duration;
90use subspace_core_primitives::pot::PotOutput;
91use subspace_core_primitives::solutions::Solution;
92use subspace_core_primitives::{BlockNumber, PublicKey};
93use subspace_runtime_primitives::extension::BalanceTransferCheckExtension;
94use subspace_runtime_primitives::opaque::Block;
95use subspace_runtime_primitives::{
96    AccountId, Balance, BlockHashFor, ExtrinsicFor, Hash, HeaderFor, Signature,
97};
98use subspace_service::{FullSelectChain, RuntimeExecutor};
99use subspace_test_client::{Backend, Client, chain_spec};
100use subspace_test_primitives::OnchainStateApi;
101use subspace_test_runtime::{
102    Runtime, RuntimeApi, RuntimeCall, SLOT_DURATION, SignedExtra, UncheckedExtrinsic,
103};
104use substrate_frame_rpc_system::AccountNonceApi;
105use substrate_test_client::{RpcHandlersExt, RpcTransactionError, RpcTransactionOutput};
106use tokio::time::sleep;
107
108/// Helper type alias
109pub type FraudProofFor<Block, DomainBlock> =
110    FraudProof<NumberFor<Block>, BlockHashFor<Block>, HeaderFor<DomainBlock>, H256>;
111
112const MAX_PRODUCE_BUNDLE_TRY: usize = 10;
113
114/// Create a Subspace `Configuration`.
115///
116/// By default an in-memory socket will be used, therefore you need to provide boot
117/// nodes if you want the future node to be connected to other nodes.
118#[expect(clippy::too_many_arguments)]
119pub fn node_config(
120    tokio_handle: tokio::runtime::Handle,
121    key: Sr25519Keyring,
122    boot_nodes: Vec<MultiaddrWithPeerId>,
123    run_farmer: bool,
124    force_authoring: bool,
125    force_synced: bool,
126    private_evm: bool,
127    evm_owner_account: Option<AccountId>,
128    base_path: BasePath,
129    rpc_addr: Option<SocketAddr>,
130    rpc_port: Option<u16>,
131) -> Configuration {
132    let root = base_path.path();
133    let role = if run_farmer {
134        Role::Authority
135    } else {
136        Role::Full
137    };
138    let key_seed = key.to_seed();
139    let spec = chain_spec::subspace_local_testnet_config(private_evm, evm_owner_account).unwrap();
140
141    let mut network_config = NetworkConfiguration::new(
142        key_seed.to_string(),
143        "network/test/0.1",
144        Default::default(),
145        None,
146    );
147
148    network_config.boot_nodes = boot_nodes;
149
150    network_config.allow_non_globals_in_dht = true;
151
152    let addr: multiaddr::Multiaddr = multiaddr::Protocol::Memory(rand::random()).into();
153    network_config.listen_addresses.push(addr.clone());
154
155    network_config.public_addresses.push(addr);
156
157    network_config.transport = TransportConfig::MemoryOnly;
158
159    network_config.force_synced = force_synced;
160
161    let rpc_configuration = match rpc_addr {
162        Some(listen_addr) => {
163            let port = rpc_port.unwrap_or(9944);
164            RpcConfiguration {
165                addr: Some(vec![RpcEndpoint {
166                    batch_config: RpcBatchRequestConfig::Disabled,
167                    max_connections: 100,
168                    listen_addr,
169                    rpc_methods: Default::default(),
170                    rate_limit: None,
171                    rate_limit_trust_proxy_headers: false,
172                    rate_limit_whitelisted_ips: vec![],
173                    max_payload_in_mb: 15,
174                    max_payload_out_mb: 15,
175                    max_subscriptions_per_connection: 100,
176                    max_buffer_capacity_per_connection: 100,
177                    cors: None,
178                    retry_random_port: true,
179                    is_optional: false,
180                }]),
181                max_request_size: 15,
182                max_response_size: 15,
183                id_provider: None,
184                max_subs_per_conn: 1024,
185                port,
186                message_buffer_capacity: 1024,
187                batch_config: RpcBatchRequestConfig::Disabled,
188                max_connections: 1000,
189                cors: None,
190                methods: Default::default(),
191                rate_limit: None,
192                rate_limit_whitelisted_ips: vec![],
193                rate_limit_trust_proxy_headers: false,
194            }
195        }
196        None => RpcConfiguration {
197            addr: None,
198            max_request_size: 0,
199            max_response_size: 0,
200            id_provider: None,
201            max_subs_per_conn: 0,
202            port: 0,
203            message_buffer_capacity: 0,
204            batch_config: RpcBatchRequestConfig::Disabled,
205            max_connections: 0,
206            cors: None,
207            methods: Default::default(),
208            rate_limit: None,
209            rate_limit_whitelisted_ips: vec![],
210            rate_limit_trust_proxy_headers: false,
211        },
212    };
213
214    Configuration {
215        impl_name: "subspace-test-node".to_string(),
216        impl_version: "0.1".to_string(),
217        role,
218        tokio_handle,
219        transaction_pool: Default::default(),
220        network: network_config,
221        keystore: KeystoreConfig::InMemory,
222        database: DatabaseSource::ParityDb {
223            path: root.join("paritydb"),
224        },
225        trie_cache_maximum_size: Some(64 * 1024 * 1024),
226        state_pruning: Default::default(),
227        blocks_pruning: BlocksPruning::KeepAll,
228        chain_spec: Box::new(spec),
229        executor: ExecutorConfiguration {
230            wasm_method: WasmExecutionMethod::Compiled {
231                instantiation_strategy: WasmtimeInstantiationStrategy::PoolingCopyOnWrite,
232            },
233            max_runtime_instances: 8,
234            default_heap_pages: None,
235            runtime_cache_size: 2,
236        },
237        wasm_runtime_overrides: Default::default(),
238        rpc: rpc_configuration,
239        prometheus_config: None,
240        telemetry_endpoints: None,
241        offchain_worker: OffchainWorkerConfig {
242            enabled: false,
243            indexing_enabled: true,
244        },
245        force_authoring,
246        disable_grandpa: false,
247        dev_key_seed: Some(key_seed),
248        tracing_targets: None,
249        tracing_receiver: Default::default(),
250        announce_block: true,
251        data_path: base_path.path().into(),
252        base_path,
253    }
254}
255
256type StorageChanges = sp_api::StorageChanges<Block>;
257
258struct MockExtensionsFactory<Client, DomainBlock, Executor, CBackend> {
259    consensus_client: Arc<Client>,
260    consensus_backend: Arc<CBackend>,
261    executor: Arc<Executor>,
262    mock_pot_verifier: Arc<MockPotVerfier>,
263    confirmation_depth_k: BlockNumber,
264    _phantom: PhantomData<DomainBlock>,
265}
266
267impl<Client, DomainBlock, Executor, CBackend>
268    MockExtensionsFactory<Client, DomainBlock, Executor, CBackend>
269{
270    fn new(
271        consensus_client: Arc<Client>,
272        executor: Arc<Executor>,
273        mock_pot_verifier: Arc<MockPotVerfier>,
274        consensus_backend: Arc<CBackend>,
275        confirmation_depth_k: BlockNumber,
276    ) -> Self {
277        Self {
278            consensus_client,
279            consensus_backend,
280            executor,
281            mock_pot_verifier,
282            confirmation_depth_k,
283            _phantom: Default::default(),
284        }
285    }
286}
287
288#[derive(Default)]
289struct MockPotVerfier(Mutex<HashMap<u64, PotOutput>>);
290
291impl MockPotVerfier {
292    fn is_valid(&self, slot: u64, pot: PotOutput) -> bool {
293        self.0.lock().get(&slot).map(|p| *p == pot).unwrap_or(false)
294    }
295
296    fn inject_pot(&self, slot: u64, pot: PotOutput) {
297        self.0.lock().insert(slot, pot);
298    }
299}
300
301impl<Block, Client, DomainBlock, Executor, CBackend> ExtensionsFactory<Block>
302    for MockExtensionsFactory<Client, DomainBlock, Executor, CBackend>
303where
304    Block: BlockT,
305    Block::Hash: From<H256> + Into<H256>,
306    DomainBlock: BlockT,
307    DomainBlock::Hash: Into<H256> + From<H256>,
308    Client: BlockBackend<Block> + HeaderBackend<Block> + ProvideRuntimeApi<Block> + 'static,
309    Client::Api: DomainsApi<Block, DomainBlock::Header>
310        + BundleProducerElectionApi<Block, Balance>
311        + MessengerApi<Block, NumberFor<Block>, Block::Hash>
312        + MmrApi<Block, H256, NumberFor<Block>>,
313    Executor: CodeExecutor + sc_executor::RuntimeVersionOf,
314    CBackend: BackendT<Block> + 'static,
315{
316    fn extensions_for(
317        &self,
318        _block_hash: Block::Hash,
319        _block_number: NumberFor<Block>,
320    ) -> Extensions {
321        let confirmation_depth_k = self.confirmation_depth_k;
322        let mut exts = Extensions::new();
323        exts.register(FraudProofExtension::new(Arc::new(
324            FraudProofHostFunctionsImpl::<_, _, DomainBlock, Executor, _>::new(
325                self.consensus_client.clone(),
326                self.executor.clone(),
327                move |client, executor| {
328                    let extension_factory =
329                        DomainsExtensionFactory::<_, Block, DomainBlock, _>::new(
330                            client,
331                            executor,
332                            confirmation_depth_k,
333                        );
334                    Box::new(extension_factory) as Box<dyn ExtensionsFactory<DomainBlock>>
335                },
336            ),
337        )));
338        exts.register(SubspaceMmrExtension::new(Arc::new(
339            SubspaceMmrHostFunctionsImpl::<Block, _>::new(
340                self.consensus_client.clone(),
341                confirmation_depth_k,
342            ),
343        )));
344        exts.register(MessengerExtension::new(Arc::new(
345            MessengerHostFunctionsImpl::<Block, _, DomainBlock, _>::new(
346                self.consensus_client.clone(),
347                self.executor.clone(),
348            ),
349        )));
350
351        if let Some(offchain_storage) = self.consensus_backend.offchain_storage() {
352            let offchain_db = OffchainDb::new(offchain_storage);
353            exts.register(OffchainDbExt::new(offchain_db));
354        }
355        exts.register(PotExtension::new({
356            let client = Arc::clone(&self.consensus_client);
357            let mock_pot_verifier = Arc::clone(&self.mock_pot_verifier);
358            Box::new(
359                move |parent_hash, slot, proof_of_time, _quick_verification| {
360                    let parent_hash = {
361                        let mut converted_parent_hash = Block::Hash::default();
362                        converted_parent_hash.as_mut().copy_from_slice(&parent_hash);
363                        converted_parent_hash
364                    };
365
366                    let parent_header = match client.header(parent_hash) {
367                        Ok(Some(parent_header)) => parent_header,
368                        _ => return false,
369                    };
370                    let parent_pre_digest = match extract_pre_digest(&parent_header) {
371                        Ok(parent_pre_digest) => parent_pre_digest,
372                        _ => return false,
373                    };
374
375                    let parent_slot = parent_pre_digest.slot();
376                    if slot <= *parent_slot {
377                        return false;
378                    }
379
380                    mock_pot_verifier.is_valid(slot, proof_of_time)
381                },
382            )
383        }));
384        exts
385    }
386}
387
388type NewSlot = (Slot, PotOutput);
389
390/// A mock Subspace consensus node instance used for testing.
391pub struct MockConsensusNode {
392    /// `TaskManager`'s instance.
393    pub task_manager: TaskManager,
394    /// Client's instance.
395    pub client: Arc<Client>,
396    /// Backend.
397    pub backend: Arc<Backend>,
398    /// Code executor.
399    pub executor: RuntimeExecutor,
400    /// Transaction pool.
401    pub transaction_pool: Arc<BasicPool<FullChainApi<Client, Block>, Block>>,
402    /// The SelectChain Strategy
403    pub select_chain: FullSelectChain,
404    /// Network service.
405    pub network_service: Arc<dyn NetworkService + Send + Sync>,
406    /// Cross-domain gossip notification service.
407    pub xdm_gossip_notification_service: Option<Box<dyn NotificationService>>,
408    /// Sync service.
409    pub sync_service: Arc<sc_network_sync::SyncingService<Block>>,
410    /// RPC handlers.
411    pub rpc_handlers: sc_service::RpcHandlers,
412    /// The next slot number
413    next_slot: u64,
414    /// The mock pot verifier
415    mock_pot_verifier: Arc<MockPotVerfier>,
416    /// The slot notification subscribers
417    new_slot_notification_subscribers: Vec<mpsc::UnboundedSender<(Slot, PotOutput)>>,
418    /// The acknowledgement sender subscribers
419    acknowledgement_sender_subscribers: Vec<TracingUnboundedSender<mpsc::Sender<()>>>,
420    /// Block import pipeline
421    block_import: MockBlockImport<Client, Block>,
422    xdm_gossip_worker_builder: Option<GossipWorkerBuilder>,
423    /// Mock subspace solution used to mock the subspace `PreDigest`
424    mock_solution: Solution<AccountId>,
425    log_prefix: &'static str,
426    /// Ferdie key
427    pub key: Sr25519Keyring,
428    finalize_block_depth: Option<NumberFor<Block>>,
429    /// The node's base path
430    base_path: BasePath,
431}
432
433/// Configuration values required to run a mock consensus node with custom RPC options.
434pub struct MockConsensusNodeRpcConfig {
435    /// The node's base path.
436    pub base_path: BasePath,
437    /// Optional block finalization depth override.
438    pub finalize_block_depth: Option<NumberFor<Block>>,
439    /// Whether to enable the private EVM domain.
440    pub private_evm: bool,
441    /// Optional EVM owner key.
442    pub evm_owner: Option<Sr25519Keyring>,
443    /// Optional RPC listen address override.
444    pub rpc_addr: Option<SocketAddr>,
445    /// Optional RPC listen port override.
446    pub rpc_port: Option<u16>,
447}
448
449impl MockConsensusNode {
450    fn run_with_configuration(
451        mut config: Configuration,
452        key: Sr25519Keyring,
453        base_path: BasePath,
454        finalize_block_depth: Option<NumberFor<Block>>,
455        rpc_builder: Box<
456            dyn Fn() -> Result<RpcModule<()>, sc_service::Error> + Send + Sync + 'static,
457        >,
458    ) -> MockConsensusNode {
459        let log_prefix = key.into();
460
461        let tx_pool_options = Options {
462            ban_time: Duration::from_millis(0),
463            ..Default::default()
464        };
465
466        config.network.node_name = format!("{} (Consensus)", config.network.node_name);
467        let span = sc_tracing::tracing::info_span!(
468            sc_tracing::logging::PREFIX_LOG_SPAN,
469            name = config.network.node_name.as_str()
470        );
471        let _enter = span.enter();
472
473        let executor = sc_service::new_wasm_executor(&config.executor);
474
475        let (client, backend, keystore_container, mut task_manager) =
476            sc_service::new_full_parts::<Block, RuntimeApi, _>(&config, None, executor.clone())
477                .expect("Fail to new full parts");
478
479        let domain_executor = Arc::new(sc_service::new_wasm_executor(&config.executor));
480        let client = Arc::new(client);
481        let mock_pot_verifier = Arc::new(MockPotVerfier::default());
482        let chain_constants = client
483            .runtime_api()
484            .chain_constants(client.info().best_hash)
485            .expect("Fail to get chain constants");
486        client
487            .execution_extensions()
488            .set_extensions_factory(MockExtensionsFactory::<
489                _,
490                DomainBlock,
491                sc_domains::RuntimeExecutor,
492                _,
493            >::new(
494                client.clone(),
495                domain_executor.clone(),
496                Arc::clone(&mock_pot_verifier),
497                backend.clone(),
498                chain_constants.confirmation_depth_k(),
499            ));
500
501        let select_chain = sc_consensus::LongestChain::new(backend.clone());
502        let transaction_pool = Arc::from(BasicPool::new_full(
503            tx_pool_options,
504            config.role.is_authority().into(),
505            config.prometheus_registry(),
506            task_manager.spawn_essential_handle(),
507            client.clone(),
508        ));
509
510        let block_import = MockBlockImport::<_, _>::new(client.clone());
511
512        let mut net_config = sc_network::config::FullNetworkConfiguration::<
513            _,
514            _,
515            NetworkWorker<_, _>,
516        >::new(&config.network, None);
517        let (xdm_gossip_notification_config, xdm_gossip_notification_service) =
518            xdm_gossip_peers_set_config();
519        net_config.add_notification_protocol(xdm_gossip_notification_config);
520
521        let (network_service, system_rpc_tx, tx_handler_controller, sync_service) =
522            sc_service::build_network(sc_service::BuildNetworkParams {
523                config: &config,
524                net_config,
525                client: client.clone(),
526                transaction_pool: transaction_pool.clone(),
527                spawn_handle: task_manager.spawn_handle(),
528                import_queue: mock_import_queue(
529                    block_import.clone(),
530                    &task_manager.spawn_essential_handle(),
531                ),
532                block_announce_validator_builder: None,
533                warp_sync_config: None,
534                block_relay: None,
535                metrics: NotificationMetrics::new(None),
536            })
537            .expect("Should be able to build network");
538
539        let rpc_handlers = sc_service::spawn_tasks(SpawnTasksParams {
540            network: network_service.clone(),
541            client: client.clone(),
542            keystore: keystore_container.keystore(),
543            task_manager: &mut task_manager,
544            transaction_pool: transaction_pool.clone(),
545            rpc_builder: Box::new(move |_| rpc_builder()),
546            backend: backend.clone(),
547            system_rpc_tx,
548            config,
549            telemetry: None,
550            tx_handler_controller,
551            sync_service: sync_service.clone(),
552        })
553        .expect("Should be able to spawn tasks");
554
555        let mock_solution =
556            Solution::genesis_solution(PublicKey::from(key.public().0), key.to_account_id());
557
558        let mut gossip_builder = GossipWorkerBuilder::new();
559
560        task_manager
561            .spawn_essential_handle()
562            .spawn_essential_blocking(
563                "consensus-chain-channel-update-worker",
564                None,
565                Box::pin(
566                    domain_client_message_relayer::worker::gossip_channel_updates::<_, _, Block, _>(
567                        ChainId::Consensus,
568                        client.clone(),
569                        sync_service.clone(),
570                        gossip_builder.gossip_msg_sink(),
571                    ),
572                ),
573            );
574
575        let (consensus_msg_sink, consensus_msg_receiver) =
576            tracing_unbounded("consensus_message_channel", 100);
577
578        let consensus_listener =
579            cross_domain_message_gossip::start_cross_chain_message_listener::<_, _, _, _, _, _, _>(
580                ChainId::Consensus,
581                client.clone(),
582                client.clone(),
583                transaction_pool.clone(),
584                network_service.clone(),
585                consensus_msg_receiver,
586                domain_executor,
587                sync_service.clone(),
588            );
589
590        task_manager
591            .spawn_essential_handle()
592            .spawn_essential_blocking(
593                "consensus-message-listener",
594                None,
595                Box::pin(consensus_listener),
596            );
597
598        gossip_builder.push_chain_sink(ChainId::Consensus, consensus_msg_sink);
599
600        task_manager.spawn_essential_handle().spawn_blocking(
601            "mmr-gadget",
602            None,
603            mmr_gadget::MmrGadget::start(
604                client.clone(),
605                backend.clone(),
606                sp_mmr_primitives::INDEXING_PREFIX.to_vec(),
607            ),
608        );
609
610        MockConsensusNode {
611            task_manager,
612            client,
613            backend,
614            executor,
615            transaction_pool,
616            select_chain,
617            network_service,
618            xdm_gossip_notification_service: Some(xdm_gossip_notification_service),
619            sync_service,
620            rpc_handlers,
621            next_slot: 1,
622            mock_pot_verifier,
623            new_slot_notification_subscribers: Vec::new(),
624            acknowledgement_sender_subscribers: Vec::new(),
625            block_import,
626            xdm_gossip_worker_builder: Some(gossip_builder),
627            mock_solution,
628            log_prefix,
629            key,
630            finalize_block_depth,
631            base_path,
632        }
633    }
634
635    /// Run a mock consensus node
636    pub fn run(
637        tokio_handle: tokio::runtime::Handle,
638        key: Sr25519Keyring,
639        base_path: BasePath,
640    ) -> MockConsensusNode {
641        Self::run_with_finalization_depth(tokio_handle, key, base_path, None, false, None)
642    }
643
644    /// Run a mock consensus node with a private EVM domain
645    pub fn run_with_private_evm(
646        tokio_handle: tokio::runtime::Handle,
647        key: Sr25519Keyring,
648        evm_owner: Option<Sr25519Keyring>,
649        base_path: BasePath,
650    ) -> MockConsensusNode {
651        Self::run_with_finalization_depth(tokio_handle, key, base_path, None, true, evm_owner)
652    }
653
654    /// Run a mock consensus node with finalization depth
655    pub fn run_with_finalization_depth(
656        tokio_handle: tokio::runtime::Handle,
657        key: Sr25519Keyring,
658        base_path: BasePath,
659        finalize_block_depth: Option<NumberFor<Block>>,
660        private_evm: bool,
661        evm_owner: Option<Sr25519Keyring>,
662    ) -> MockConsensusNode {
663        let rpc_config = MockConsensusNodeRpcConfig {
664            base_path,
665            finalize_block_depth,
666            private_evm,
667            evm_owner,
668            rpc_addr: None,
669            rpc_port: None,
670        };
671
672        Self::run_with_rpc_builder(
673            tokio_handle,
674            key,
675            rpc_config,
676            Box::new(|| Ok(RpcModule::new(()))),
677        )
678    }
679
680    /// Run a mock consensus node with a custom RPC builder and RPC address/port options.
681    pub fn run_with_rpc_builder(
682        tokio_handle: tokio::runtime::Handle,
683        key: Sr25519Keyring,
684        rpc_config: MockConsensusNodeRpcConfig,
685        rpc_builder: Box<
686            dyn Fn() -> Result<RpcModule<()>, sc_service::Error> + Send + Sync + 'static,
687        >,
688    ) -> MockConsensusNode {
689        let MockConsensusNodeRpcConfig {
690            base_path,
691            finalize_block_depth,
692            private_evm,
693            evm_owner,
694            rpc_addr,
695            rpc_port,
696        } = rpc_config;
697
698        let config = node_config(
699            tokio_handle,
700            key,
701            vec![],
702            false,
703            false,
704            false,
705            private_evm,
706            evm_owner.map(|key| key.to_account_id()),
707            base_path.clone(),
708            rpc_addr,
709            rpc_port,
710        );
711
712        Self::run_with_configuration(config, key, base_path, finalize_block_depth, rpc_builder)
713    }
714
715    /// Run a mock consensus node with RPC options using the default empty RPC module.
716    pub fn run_with_rpc_options(
717        tokio_handle: tokio::runtime::Handle,
718        key: Sr25519Keyring,
719        rpc_config: MockConsensusNodeRpcConfig,
720    ) -> MockConsensusNode {
721        Self::run_with_rpc_builder(
722            tokio_handle,
723            key,
724            rpc_config,
725            Box::new(|| Ok(RpcModule::new(()))),
726        )
727    }
728
729    /// Get the cross domain gossip message worker builder
730    pub fn xdm_gossip_worker_builder(&mut self) -> &mut GossipWorkerBuilder {
731        self.xdm_gossip_worker_builder
732            .as_mut()
733            .expect("gossip message worker have not started yet")
734    }
735
736    /// Start the cross domain gossip message worker.
737    pub fn start_cross_domain_gossip_message_worker(&mut self) {
738        let xdm_gossip_worker_builder = self
739            .xdm_gossip_worker_builder
740            .take()
741            .expect("gossip message worker have not started yet");
742        let cross_domain_message_gossip_worker = xdm_gossip_worker_builder.build::<Block, _, _>(
743            self.network_service.clone(),
744            self.xdm_gossip_notification_service
745                .take()
746                .expect("XDM gossip notification service must be used only once"),
747            self.sync_service.clone(),
748        );
749        self.task_manager
750            .spawn_essential_handle()
751            .spawn_essential_blocking(
752                "cross-domain-gossip-message-worker",
753                None,
754                Box::pin(cross_domain_message_gossip_worker.run()),
755            );
756    }
757
758    /// Return the next slot number
759    pub fn next_slot(&self) -> u64 {
760        self.next_slot
761    }
762
763    /// Set the next slot number
764    pub fn set_next_slot(&mut self, next_slot: u64) {
765        self.next_slot = next_slot;
766    }
767
768    /// Produce a slot only, without waiting for the potential slot handlers.
769    pub fn produce_slot(&mut self) -> NewSlot {
770        let slot = Slot::from(self.next_slot);
771        let proof_of_time = PotOutput::from(
772            <&[u8] as TryInto<[u8; 16]>>::try_into(&Hash::random().to_fixed_bytes()[..16])
773                .expect("slice with length of 16 must able convert into [u8; 16]; qed"),
774        );
775        self.mock_pot_verifier.inject_pot(*slot, proof_of_time);
776        self.next_slot += 1;
777
778        (slot, proof_of_time)
779    }
780
781    /// Notify the executor about the new slot and wait for the bundle produced at this slot.
782    pub async fn notify_new_slot_and_wait_for_bundle(
783        &mut self,
784        new_slot: NewSlot,
785    ) -> Option<OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>> {
786        self.new_slot_notification_subscribers
787            .retain(|subscriber| subscriber.unbounded_send(new_slot).is_ok());
788
789        self.confirm_acknowledgement().await;
790        self.get_bundle_from_tx_pool(new_slot)
791    }
792
793    /// Produce a new slot and wait for a bundle produced at this slot.
794    pub async fn produce_slot_and_wait_for_bundle_submission(
795        &mut self,
796    ) -> (
797        NewSlot,
798        OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>,
799    ) {
800        let slot = self.produce_slot();
801        for _ in 0..MAX_PRODUCE_BUNDLE_TRY {
802            if let Some(bundle) = self.notify_new_slot_and_wait_for_bundle(slot).await {
803                return (slot, bundle);
804            }
805        }
806        panic!(
807            "Failed to produce bundle after {MAX_PRODUCE_BUNDLE_TRY:?} tries, something must be wrong"
808        );
809    }
810
811    /// Produce a slot and wait for bundle submission from specific operator.
812    pub async fn produce_slot_and_wait_for_bundle_submission_from_operator(
813        &mut self,
814        operator_id: OperatorId,
815    ) -> (
816        NewSlot,
817        OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>,
818    ) {
819        loop {
820            let slot = self.produce_slot();
821            if let Some(bundle) = self.notify_new_slot_and_wait_for_bundle(slot).await
822                && bundle.sealed_header().proof_of_election().operator_id == operator_id
823            {
824                return (slot, bundle);
825            }
826        }
827    }
828
829    /// Subscribe the new slot notification
830    pub fn new_slot_notification_stream(&mut self) -> mpsc::UnboundedReceiver<(Slot, PotOutput)> {
831        let (tx, rx) = mpsc::unbounded();
832        self.new_slot_notification_subscribers.push(tx);
833        rx
834    }
835
836    /// Subscribe the acknowledgement sender stream
837    pub fn new_acknowledgement_sender_stream(
838        &mut self,
839    ) -> TracingUnboundedReceiver<mpsc::Sender<()>> {
840        let (tx, rx) = tracing_unbounded("subspace_acknowledgement_sender_stream", 100);
841        self.acknowledgement_sender_subscribers.push(tx);
842        rx
843    }
844
845    /// Wait for all the acknowledgements before return
846    ///
847    /// It is used to wait for the acknowledgement of the domain worker to ensure it have
848    /// finish all the previous tasks before return
849    pub async fn confirm_acknowledgement(&mut self) {
850        let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
851
852        // Must drop `acknowledgement_sender` after the notification otherwise the receiver
853        // will block forever as there is still a sender not closed.
854        {
855            self.acknowledgement_sender_subscribers
856                .retain(|subscriber| {
857                    subscriber
858                        .unbounded_send(acknowledgement_sender.clone())
859                        .is_ok()
860                });
861            drop(acknowledgement_sender);
862        }
863
864        // Wait for all the acknowledgements to progress and proactively drop closed subscribers.
865        while acknowledgement_receiver.next().await.is_some() {
866            // Wait for all the acknowledgements to finish.
867        }
868    }
869
870    /// Wait for the operator finish processing the consensus block before return
871    pub async fn confirm_block_import_processed(&mut self) {
872        // Send one more notification to ensure the previous consensus block import notification
873        // have been received by the operator
874        let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
875        {
876            // Must drop `block_import_acknowledgement_sender` after the notification otherwise
877            // the receiver will block forever as there is still a sender not closed.
878            // NOTE: it is okay to use the default block number since it is ignored in the consumer side.
879            let value = (NumberFor::<Block>::default(), acknowledgement_sender);
880            self.block_import
881                .block_importing_notification_subscribers
882                .lock()
883                .retain(|subscriber| subscriber.unbounded_send(value.clone()).is_ok());
884        }
885        while acknowledgement_receiver.next().await.is_some() {
886            // Wait for all the acknowledgements to finish.
887        }
888
889        // Ensure the operator finish processing the consensus block
890        self.confirm_acknowledgement().await;
891    }
892
893    /// Subscribe the block importing notification
894    pub fn block_importing_notification_stream(
895        &self,
896    ) -> TracingUnboundedReceiver<(NumberFor<Block>, mpsc::Sender<()>)> {
897        self.block_import.block_importing_notification_stream()
898    }
899
900    /// Get the bundle that created at `slot` from the transaction pool
901    pub fn get_bundle_from_tx_pool(
902        &self,
903        new_slot: NewSlot,
904    ) -> Option<OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>> {
905        for ready_tx in self.transaction_pool.ready() {
906            let ext = UncheckedExtrinsic::decode(&mut ready_tx.data.encode().as_slice())
907                .expect("should be able to decode");
908            if let RuntimeCall::Domains(pallet_domains::Call::submit_bundle { opaque_bundle }) =
909                ext.function
910                && opaque_bundle.sealed_header().slot_number() == *new_slot.0
911            {
912                return Some(opaque_bundle);
913            }
914        }
915        None
916    }
917
918    /// Submit a tx to the tx pool
919    pub async fn submit_transaction(&self, tx: OpaqueExtrinsic) -> Result<H256, TxPoolError> {
920        self.transaction_pool
921            .submit_one(
922                self.client.info().best_hash,
923                TransactionSource::External,
924                tx,
925            )
926            .await
927            .map_err(|err| {
928                err.into_pool_error()
929                    .expect("should always be a pool error")
930            })
931    }
932
933    /// Remove all tx from the tx pool
934    pub async fn clear_tx_pool(&self) -> Result<(), Box<dyn Error>> {
935        let txs: Vec<_> = self
936            .transaction_pool
937            .ready()
938            .map(|t| self.transaction_pool.hash_of(&t.data))
939            .collect();
940        self.prune_txs_from_pool(txs.as_slice()).await
941    }
942
943    /// Remove a ready transaction from transaction pool.
944    pub async fn prune_tx_from_pool(&self, tx: &OpaqueExtrinsic) -> Result<(), Box<dyn Error>> {
945        self.prune_txs_from_pool(&[self.transaction_pool.hash_of(tx)])
946            .await
947    }
948
949    async fn prune_txs_from_pool(
950        &self,
951        tx_hashes: &[BlockHashFor<Block>],
952    ) -> Result<(), Box<dyn Error>> {
953        let hash_and_number = HashAndNumber {
954            number: self.client.info().best_number,
955            hash: self.client.info().best_hash,
956        };
957        self.transaction_pool
958            .pool()
959            .prune_known(&hash_and_number, tx_hashes);
960        // `ban_time` have set to 0, explicitly wait 1ms here to ensure `clear_stale` will remove
961        // all the bans as the ban time must be passed.
962        tokio::time::sleep(time::Duration::from_millis(1)).await;
963        self.transaction_pool
964            .pool()
965            .validated_pool()
966            .clear_stale(&hash_and_number);
967        Ok(())
968    }
969
970    /// Return if the given ER exist in the consensus state
971    pub fn does_receipt_exist(
972        &self,
973        er_hash: BlockHashFor<DomainBlock>,
974    ) -> Result<bool, Box<dyn Error>> {
975        Ok(self
976            .client
977            .runtime_api()
978            .execution_receipt(self.client.info().best_hash, er_hash)?
979            .is_some())
980    }
981
982    /// Returns the stake summary of the Domain.
983    pub fn get_domain_staking_summary(
984        &self,
985        domain_id: DomainId,
986    ) -> Result<Option<StakingSummary<OperatorId, Balance>>, Box<dyn Error>> {
987        Ok(self
988            .client
989            .runtime_api()
990            .domain_stake_summary(self.client.info().best_hash, domain_id)?)
991    }
992
993    /// Returns the domain block pruning depth.
994    pub fn get_domain_block_pruning_depth(&self) -> Result<BlockNumber, Box<dyn Error>> {
995        Ok(self
996            .client
997            .runtime_api()
998            .block_pruning_depth(self.client.info().best_hash)?)
999    }
1000
1001    /// Return a future that only resolve if a fraud proof that the given `fraud_proof_predicate`
1002    /// return true is submitted to the consensus tx pool
1003    pub fn wait_for_fraud_proof<FP>(
1004        &self,
1005        fraud_proof_predicate: FP,
1006    ) -> Pin<Box<dyn Future<Output = FraudProofFor<Block, DomainBlock>> + Send>>
1007    where
1008        FP: Fn(&FraudProofFor<Block, DomainBlock>) -> bool + Send + 'static,
1009    {
1010        let tx_pool = self.transaction_pool.clone();
1011        let mut import_tx_stream = self.transaction_pool.import_notification_stream();
1012        Box::pin(async move {
1013            while let Some(ready_tx_hash) = import_tx_stream.next().await {
1014                let ready_tx = tx_pool
1015                    .ready_transaction(&ready_tx_hash)
1016                    .expect("Just get the ready tx hash from import stream; qed");
1017                let ext = subspace_test_runtime::UncheckedExtrinsic::decode(
1018                    &mut ready_tx.data.encode().as_slice(),
1019                )
1020                .expect("Decode tx must success");
1021                if let subspace_test_runtime::RuntimeCall::Domains(
1022                    pallet_domains::Call::submit_fraud_proof { fraud_proof },
1023                ) = ext.function
1024                    && fraud_proof_predicate(&fraud_proof)
1025                {
1026                    return *fraud_proof;
1027                }
1028            }
1029            unreachable!()
1030        })
1031    }
1032
1033    /// Get the free balance of the given account
1034    pub fn free_balance(&self, account_id: AccountId) -> subspace_runtime_primitives::Balance {
1035        self.client
1036            .runtime_api()
1037            .free_balance(self.client.info().best_hash, account_id)
1038            .expect("Fail to get account free balance")
1039    }
1040
1041    /// Give the peer at `addr` the minimum reputation, which will ban it.
1042    // TODO: also ban/unban in the DSN
1043    pub fn ban_peer(&self, addr: MultiaddrWithPeerId) {
1044        // If unban_peer() has been called on the peer, we need to bump it twice
1045        // to give it the minimal reputation.
1046        self.network_service.report_peer(
1047            addr.peer_id,
1048            ReputationChange::new_fatal("Peer banned by test (1)"),
1049        );
1050        self.network_service.report_peer(
1051            addr.peer_id,
1052            ReputationChange::new_fatal("Peer banned by test (2)"),
1053        );
1054    }
1055
1056    /// Give the peer at `addr` a high reputation, which guarantees it is un-banned it.
1057    pub fn unban_peer(&self, addr: MultiaddrWithPeerId) {
1058        // If ReputationChange::new_fatal() has been called on the peer, we need to bump it twice
1059        // to give it a positive reputation.
1060        self.network_service.report_peer(
1061            addr.peer_id,
1062            ReputationChange::new(i32::MAX, "Peer unbanned by test (1)"),
1063        );
1064        self.network_service.report_peer(
1065            addr.peer_id,
1066            ReputationChange::new(i32::MAX, "Peer unbanned by test (2)"),
1067        );
1068    }
1069
1070    /// Take and stop the `MockConsensusNode` and delete its database lock file.
1071    ///
1072    /// Stopping and restarting a node can cause weird race conditions, with errors like:
1073    /// "The system cannot find the path specified".
1074    /// If this happens, try increasing the wait time in this method.
1075    pub async fn stop(self) -> Result<(), std::io::Error> {
1076        let lock_file_path = self.base_path.path().join("paritydb").join("lock");
1077        // On Windows, sometimes open files can’t be deleted so `drop` first then delete
1078        std::mem::drop(self);
1079
1080        // Give the node time to cleanup, exit, and release the lock file.
1081        // TODO: fix the underlying issue or wait for the actual shutdown instead
1082        sleep(Duration::from_secs(2)).await;
1083
1084        // The lock file already being deleted is not a fatal test error, so just log it
1085        if let Err(err) = std::fs::remove_file(lock_file_path) {
1086            tracing::error!("deleting paritydb lock file failed: {err:?}");
1087        }
1088        Ok(())
1089    }
1090}
1091
1092impl MockConsensusNode {
1093    async fn collect_txn_from_pool(&self, parent_hash: Hash) -> Vec<ExtrinsicFor<Block>> {
1094        self.transaction_pool
1095            .ready_at(parent_hash)
1096            .await
1097            .map(|pending_tx| pending_tx.data().as_ref().clone())
1098            .collect()
1099    }
1100
1101    async fn mock_inherent_data(slot: Slot) -> Result<InherentData, Box<dyn Error>> {
1102        let timestamp = sp_timestamp::InherentDataProvider::new(Timestamp::new(
1103            <Slot as Into<u64>>::into(slot) * SLOT_DURATION,
1104        ));
1105        let subspace_inherents =
1106            sp_consensus_subspace::inherents::InherentDataProvider::new(vec![]);
1107
1108        let inherent_data = (subspace_inherents, timestamp)
1109            .create_inherent_data()
1110            .await?;
1111
1112        Ok(inherent_data)
1113    }
1114
1115    fn mock_subspace_digest(&self, slot: Slot) -> Digest {
1116        let pre_digest: PreDigest<AccountId> = PreDigest::V0 {
1117            slot,
1118            solution: self.mock_solution.clone(),
1119            pot_info: PreDigestPotInfo::V0 {
1120                proof_of_time: Default::default(),
1121                future_proof_of_time: Default::default(),
1122            },
1123        };
1124        let mut digest = Digest::default();
1125        digest.push(DigestItem::subspace_pre_digest(&pre_digest));
1126        digest
1127    }
1128
1129    /// Build block
1130    async fn build_block(
1131        &self,
1132        slot: Slot,
1133        parent_hash: BlockHashFor<Block>,
1134        extrinsics: Vec<ExtrinsicFor<Block>>,
1135    ) -> Result<(Block, StorageChanges), Box<dyn Error>> {
1136        let inherent_digest = self.mock_subspace_digest(slot);
1137
1138        let inherent_data = Self::mock_inherent_data(slot).await?;
1139
1140        let mut block_builder = BlockBuilderBuilder::new(self.client.as_ref())
1141            .on_parent_block(parent_hash)
1142            .fetch_parent_block_number(self.client.as_ref())?
1143            .with_inherent_digests(inherent_digest)
1144            .build()
1145            .expect("Creates new block builder");
1146
1147        let inherent_txns = block_builder.create_inherents(inherent_data)?;
1148
1149        for tx in inherent_txns.into_iter().chain(extrinsics) {
1150            if let Err(err) = sc_block_builder::BlockBuilder::push(&mut block_builder, tx) {
1151                tracing::error!("Invalid transaction while building block: {}", err);
1152            }
1153        }
1154
1155        let (block, storage_changes, _) = block_builder.build()?.into_inner();
1156        Ok((block, storage_changes))
1157    }
1158
1159    /// Import block
1160    async fn import_block(
1161        &self,
1162        block: Block,
1163        storage_changes: Option<StorageChanges>,
1164    ) -> Result<BlockHashFor<Block>, Box<dyn Error>> {
1165        let (header, body) = block.deconstruct();
1166
1167        let header_hash = header.hash();
1168        let header_number = header.number;
1169
1170        let block_import_params = {
1171            let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
1172            import_block.body = Some(body);
1173            import_block.state_action = match storage_changes {
1174                Some(changes) => {
1175                    StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(changes))
1176                }
1177                None => StateAction::Execute,
1178            };
1179            import_block
1180        };
1181
1182        let import_result = self.block_import.import_block(block_import_params).await?;
1183
1184        if let Some(finalized_block_hash) = self
1185            .finalize_block_depth
1186            .and_then(|depth| header_number.checked_sub(depth))
1187            .and_then(|block_to_finalize| {
1188                self.client
1189                    .hash(block_to_finalize)
1190                    .expect("Block hash not found for number: {block_to_finalize:?}")
1191            })
1192        {
1193            self.client
1194                .finalize_block(finalized_block_hash, None, true)
1195                .unwrap();
1196        }
1197
1198        match import_result {
1199            ImportResult::Imported(_) | ImportResult::AlreadyInChain => Ok(header_hash),
1200            bad_res => Err(format!("Fail to import block due to {bad_res:?}").into()),
1201        }
1202    }
1203
1204    /// Produce a new block with the slot on top of `parent_hash`, with optional
1205    /// specified extrinsic list.
1206    #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1207    pub async fn produce_block_with_slot_at(
1208        &mut self,
1209        new_slot: NewSlot,
1210        parent_hash: BlockHashFor<Block>,
1211        maybe_extrinsics: Option<Vec<ExtrinsicFor<Block>>>,
1212    ) -> Result<BlockHashFor<Block>, Box<dyn Error>> {
1213        let block_timer = time::Instant::now();
1214
1215        let extrinsics = match maybe_extrinsics {
1216            Some(extrinsics) => extrinsics,
1217            None => self.collect_txn_from_pool(parent_hash).await,
1218        };
1219        let tx_hashes: Vec<_> = extrinsics
1220            .iter()
1221            .map(|t| self.transaction_pool.hash_of(t))
1222            .collect();
1223
1224        let (block, storage_changes) = self
1225            .build_block(new_slot.0, parent_hash, extrinsics)
1226            .await?;
1227
1228        log_new_block(&block, block_timer.elapsed().as_millis());
1229
1230        let res = match self.import_block(block, Some(storage_changes)).await {
1231            Ok(hash) => {
1232                // Remove the tx of the imported block from the tx pool, so we don't re-include
1233                // them in future blocks by accident.
1234                self.prune_txs_from_pool(tx_hashes.as_slice()).await?;
1235                Ok(hash)
1236            }
1237            err => err,
1238        };
1239        self.confirm_block_import_processed().await;
1240        res
1241    }
1242
1243    /// Produce a new block on top of the current best block, with the extrinsics collected from
1244    /// the transaction pool.
1245    #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1246    pub async fn produce_block_with_slot(&mut self, slot: NewSlot) -> Result<(), Box<dyn Error>> {
1247        self.produce_block_with_slot_at(slot, self.client.info().best_hash, None)
1248            .await?;
1249        Ok(())
1250    }
1251
1252    /// Produce a new block on top of the current best block, with the specified extrinsics.
1253    #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1254    pub async fn produce_block_with_extrinsics(
1255        &mut self,
1256        extrinsics: Vec<ExtrinsicFor<Block>>,
1257    ) -> Result<(), Box<dyn Error>> {
1258        let slot = self.produce_slot();
1259        self.produce_block_with_slot_at(slot, self.client.info().best_hash, Some(extrinsics))
1260            .await?;
1261        Ok(())
1262    }
1263
1264    /// Produce `n` number of blocks.
1265    #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1266    pub async fn produce_blocks(&mut self, n: u64) -> Result<(), Box<dyn Error>> {
1267        for _ in 0..n {
1268            let slot = self.produce_slot();
1269            self.produce_block_with_slot(slot).await?;
1270        }
1271        Ok(())
1272    }
1273
1274    /// Produce `n` number of blocks and wait for bundle submitted to the block.
1275    #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1276    pub async fn produce_blocks_with_bundles(&mut self, n: u64) -> Result<(), Box<dyn Error>> {
1277        for _ in 0..n {
1278            let (slot, _) = self.produce_slot_and_wait_for_bundle_submission().await;
1279            self.produce_block_with_slot(slot).await?;
1280        }
1281        Ok(())
1282    }
1283
1284    /// Get the nonce of the node account
1285    pub fn account_nonce(&self) -> u32 {
1286        self.client
1287            .runtime_api()
1288            .account_nonce(self.client.info().best_hash, self.key.to_account_id())
1289            .expect("Fail to get account nonce")
1290    }
1291
1292    /// Get the nonce of the given account
1293    pub fn account_nonce_of(&self, account_id: AccountId) -> u32 {
1294        self.client
1295            .runtime_api()
1296            .account_nonce(self.client.info().best_hash, account_id)
1297            .expect("Fail to get account nonce")
1298    }
1299
1300    /// Construct an extrinsic.
1301    pub fn construct_extrinsic(
1302        &self,
1303        nonce: u32,
1304        function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1305    ) -> UncheckedExtrinsic {
1306        construct_extrinsic_generic(&self.client, function, self.key, false, nonce, 0)
1307    }
1308
1309    /// Construct an unsigned general extrinsic.
1310    pub fn construct_unsigned_extrinsic(
1311        &self,
1312        function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1313    ) -> UncheckedExtrinsic {
1314        construct_unsigned_extrinsic(&self.client, function)
1315    }
1316
1317    /// Construct and send extrinsic through rpc
1318    pub async fn construct_and_send_extrinsic_with(
1319        &self,
1320        function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1321    ) -> Result<RpcTransactionOutput, RpcTransactionError> {
1322        let nonce = self.account_nonce();
1323        let extrinsic = self.construct_extrinsic(nonce, function);
1324        self.rpc_handlers.send_transaction(extrinsic.into()).await
1325    }
1326
1327    /// Get the nonce of the given account
1328    pub async fn send_extrinsic(
1329        &self,
1330        extrinsic: impl Into<OpaqueExtrinsic>,
1331    ) -> Result<RpcTransactionOutput, RpcTransactionError> {
1332        self.rpc_handlers.send_transaction(extrinsic.into()).await
1333    }
1334}
1335
1336fn log_new_block(block: &Block, used_time_ms: u128) {
1337    tracing::info!(
1338        "🎁 Prepared block for proposing at {} ({} ms) [hash: {:?}; parent_hash: {}; extrinsics ({}): [{}]]",
1339        block.header().number(),
1340        used_time_ms,
1341        block.header().hash(),
1342        block.header().parent_hash(),
1343        block.extrinsics().len(),
1344        block
1345            .extrinsics()
1346            .iter()
1347            .map(|xt| BlakeTwo256::hash_of(xt).to_string())
1348            .collect::<Vec<_>>()
1349            .join(", ")
1350    );
1351}
1352
1353fn mock_import_queue<Block: BlockT, I>(
1354    block_import: I,
1355    spawner: &impl SpawnEssentialNamed,
1356) -> BasicQueue<Block>
1357where
1358    I: BlockImport<Block, Error = ConsensusError> + Send + Sync + 'static,
1359{
1360    BasicQueue::new(
1361        MockVerifier::default(),
1362        Box::new(block_import),
1363        None,
1364        spawner,
1365        None,
1366    )
1367}
1368
1369struct MockVerifier<Block> {
1370    _marker: PhantomData<Block>,
1371}
1372
1373impl<Block> Default for MockVerifier<Block> {
1374    fn default() -> Self {
1375        Self {
1376            _marker: PhantomData,
1377        }
1378    }
1379}
1380
1381#[async_trait::async_trait]
1382impl<Block> VerifierT<Block> for MockVerifier<Block>
1383where
1384    Block: BlockT,
1385{
1386    async fn verify(
1387        &self,
1388        block_params: BlockImportParams<Block>,
1389    ) -> Result<BlockImportParams<Block>, String> {
1390        Ok(block_params)
1391    }
1392}
1393
1394// `MockBlockImport` is mostly port from `sc-consensus-subspace::SubspaceBlockImport` with all
1395// the consensus related logic removed.
1396#[allow(clippy::type_complexity)]
1397struct MockBlockImport<Client, Block: BlockT> {
1398    inner: Arc<Client>,
1399    block_importing_notification_subscribers:
1400        Arc<Mutex<Vec<TracingUnboundedSender<(NumberFor<Block>, mpsc::Sender<()>)>>>>,
1401}
1402
1403impl<Client, Block: BlockT> MockBlockImport<Client, Block> {
1404    fn new(inner: Arc<Client>) -> Self {
1405        MockBlockImport {
1406            inner,
1407            block_importing_notification_subscribers: Arc::new(Mutex::new(Vec::new())),
1408        }
1409    }
1410
1411    // Subscribe the block importing notification
1412    fn block_importing_notification_stream(
1413        &self,
1414    ) -> TracingUnboundedReceiver<(NumberFor<Block>, mpsc::Sender<()>)> {
1415        let (tx, rx) = tracing_unbounded("subspace_new_slot_notification_stream", 100);
1416        self.block_importing_notification_subscribers
1417            .lock()
1418            .push(tx);
1419        rx
1420    }
1421}
1422
1423impl<Client, Block: BlockT> MockBlockImport<Client, Block> {
1424    fn clone(&self) -> Self {
1425        MockBlockImport {
1426            inner: self.inner.clone(),
1427            block_importing_notification_subscribers: self
1428                .block_importing_notification_subscribers
1429                .clone(),
1430        }
1431    }
1432}
1433
1434#[async_trait::async_trait]
1435impl<Client, Block> BlockImport<Block> for MockBlockImport<Client, Block>
1436where
1437    Block: BlockT,
1438    for<'r> &'r Client: BlockImport<Block, Error = ConsensusError> + Send + Sync,
1439    Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
1440    Client::Api: ApiExt<Block>,
1441{
1442    type Error = ConsensusError;
1443
1444    async fn import_block(
1445        &self,
1446        mut block: BlockImportParams<Block>,
1447    ) -> Result<ImportResult, Self::Error> {
1448        let block_number = *block.header.number();
1449        block.fork_choice = Some(ForkChoiceStrategy::LongestChain);
1450
1451        let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
1452
1453        // Must drop `block_import_acknowledgement_sender` after the notification otherwise the receiver
1454        // will block forever as there is still a sender not closed.
1455        {
1456            let value = (block_number, acknowledgement_sender);
1457            self.block_importing_notification_subscribers
1458                .lock()
1459                .retain(|subscriber| subscriber.unbounded_send(value.clone()).is_ok());
1460        }
1461
1462        while acknowledgement_receiver.next().await.is_some() {
1463            // Wait for all the acknowledgements to finish.
1464        }
1465
1466        self.inner.import_block(block).await
1467    }
1468
1469    async fn check_block(
1470        &self,
1471        block: BlockCheckParams<Block>,
1472    ) -> Result<ImportResult, Self::Error> {
1473        self.inner.check_block(block).await
1474    }
1475}
1476
1477/// Produce the given number of blocks for both the primary node and the domain nodes
1478#[macro_export]
1479macro_rules! produce_blocks {
1480    ($primary_node:ident, $operator_node:ident, $count: literal $(, $domain_node:ident)*) => {
1481        {
1482            async {
1483                let domain_fut = {
1484                    let mut futs: Vec<std::pin::Pin<Box<dyn futures::Future<Output = ()>>>> = Vec::new();
1485                    futs.push(Box::pin($operator_node.wait_for_blocks($count)));
1486                    $( futs.push( Box::pin( $domain_node.wait_for_blocks($count) ) ); )*
1487                    futures::future::join_all(futs)
1488                };
1489                $primary_node.produce_blocks_with_bundles($count).await?;
1490                domain_fut.await;
1491                Ok::<(), Box<dyn std::error::Error>>(())
1492            }
1493        }
1494    };
1495}
1496
1497/// Producing one block for both the primary node and the domain nodes, where the primary node can
1498/// use the `produce_block_with_xxx` function (i.e. `produce_block_with_slot`) to produce block
1499#[macro_export]
1500macro_rules! produce_block_with {
1501    ($primary_node_produce_block:expr, $operator_node:ident $(, $domain_node:ident)*) => {
1502        {
1503            async {
1504                let domain_fut = {
1505                    let mut futs: Vec<std::pin::Pin<Box<dyn futures::Future<Output = ()>>>> = Vec::new();
1506                    futs.push(Box::pin($operator_node.wait_for_blocks(1)));
1507                    $( futs.push( Box::pin( $domain_node.wait_for_blocks(1) ) ); )*
1508                    futures::future::join_all(futs)
1509                };
1510                $primary_node_produce_block.await?;
1511                domain_fut.await;
1512                Ok::<(), Box<dyn std::error::Error>>(())
1513            }
1514        }
1515    };
1516}
1517
1518/// Keep producing block with a fixed interval until the given condition become `true`
1519#[macro_export]
1520macro_rules! produce_blocks_until {
1521    ($primary_node:ident, $operator_node:ident, $condition: block $(, $domain_node:ident)*) => {
1522        async {
1523            while !$condition {
1524                produce_blocks!($primary_node, $operator_node, 1 $(, $domain_node),*).await?;
1525                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1526            }
1527            Ok::<(), Box<dyn std::error::Error>>(())
1528        }
1529    };
1530}
1531
1532type BalanceOf<T> = <<T as pallet_transaction_payment::Config>::OnChargeTransaction as pallet_transaction_payment::OnChargeTransaction<T>>::Balance;
1533
1534fn get_signed_extra(
1535    current_block: u64,
1536    immortal: bool,
1537    nonce: u32,
1538    tip: BalanceOf<Runtime>,
1539) -> SignedExtra {
1540    let period = u64::from(<<Runtime as frame_system::Config>::BlockHashCount>::get())
1541        .checked_next_power_of_two()
1542        .map(|c| c / 2)
1543        .unwrap_or(2);
1544    (
1545        frame_system::CheckNonZeroSender::<Runtime>::new(),
1546        frame_system::CheckSpecVersion::<Runtime>::new(),
1547        frame_system::CheckTxVersion::<Runtime>::new(),
1548        frame_system::CheckGenesis::<Runtime>::new(),
1549        frame_system::CheckMortality::<Runtime>::from(if immortal {
1550            generic::Era::Immortal
1551        } else {
1552            generic::Era::mortal(period, current_block)
1553        }),
1554        frame_system::CheckNonce::<Runtime>::from(nonce.into()),
1555        frame_system::CheckWeight::<Runtime>::new(),
1556        pallet_transaction_payment::ChargeTransactionPayment::<Runtime>::from(tip),
1557        BalanceTransferCheckExtension::<Runtime>::default(),
1558        pallet_subspace::extensions::SubspaceExtension::<Runtime>::new(),
1559        pallet_domains::extensions::DomainsExtension::<Runtime>::new(),
1560        pallet_messenger::extensions::MessengerExtension::<Runtime>::new(),
1561    )
1562}
1563
1564fn construct_extrinsic_raw_payload<Client>(
1565    client: impl AsRef<Client>,
1566    function: <Runtime as frame_system::Config>::RuntimeCall,
1567    immortal: bool,
1568    nonce: u32,
1569    tip: BalanceOf<Runtime>,
1570) -> (
1571    SignedPayload<<Runtime as frame_system::Config>::RuntimeCall, SignedExtra>,
1572    SignedExtra,
1573)
1574where
1575    BalanceOf<Runtime>: Send + Sync + From<u64> + sp_runtime::FixedPointOperand,
1576    u64: From<BlockNumberFor<Runtime>>,
1577    Client: HeaderBackend<subspace_runtime_primitives::opaque::Block>,
1578{
1579    let current_block_hash = client.as_ref().info().best_hash;
1580    let current_block = client.as_ref().info().best_number.saturated_into();
1581    let genesis_block = client.as_ref().hash(0).unwrap().unwrap();
1582    let extra = get_signed_extra(current_block, immortal, nonce, tip);
1583    (
1584        generic::SignedPayload::<
1585            <Runtime as frame_system::Config>::RuntimeCall,
1586            SignedExtra,
1587        >::from_raw(
1588            function,
1589            extra.clone(),
1590            ((), 100, 1, genesis_block, current_block_hash, (), (), (), (), (), (),()),
1591        ),
1592        extra,
1593    )
1594}
1595
1596/// Construct an extrinsic that can be applied to the test runtime.
1597pub fn construct_extrinsic_generic<Client>(
1598    client: impl AsRef<Client>,
1599    function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1600    caller: Sr25519Keyring,
1601    immortal: bool,
1602    nonce: u32,
1603    tip: BalanceOf<Runtime>,
1604) -> UncheckedExtrinsic
1605where
1606    BalanceOf<Runtime>: Send + Sync + From<u64> + sp_runtime::FixedPointOperand,
1607    u64: From<BlockNumberFor<Runtime>>,
1608    Client: HeaderBackend<subspace_runtime_primitives::opaque::Block>,
1609{
1610    let function = function.into();
1611    let (raw_payload, extra) =
1612        construct_extrinsic_raw_payload(client, function.clone(), immortal, nonce, tip);
1613    let signature = raw_payload.using_encoded(|e| caller.sign(e));
1614    UncheckedExtrinsic::new_signed(
1615        function,
1616        MultiAddress::Id(caller.to_account_id()),
1617        Signature::Sr25519(signature),
1618        extra,
1619    )
1620}
1621
1622/// Construct a general unsigned extrinsic that can be applied to the test runtime.
1623fn construct_unsigned_extrinsic<Client>(
1624    client: impl AsRef<Client>,
1625    function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1626) -> UncheckedExtrinsic
1627where
1628    BalanceOf<Runtime>: Send + Sync + From<u64> + sp_runtime::FixedPointOperand,
1629    u64: From<BlockNumberFor<Runtime>>,
1630    Client: HeaderBackend<subspace_runtime_primitives::opaque::Block>,
1631{
1632    let function = function.into();
1633    let current_block = client.as_ref().info().best_number.saturated_into();
1634    let extra = get_signed_extra(current_block, true, 0, 0);
1635    UncheckedExtrinsic::new_transaction(function, extra)
1636}