subspace_test_service/
lib.rs

1// Copyright (C) 2021 Subspace Labs, Inc.
2// SPDX-License-Identifier: GPL-3.0-or-later
3
4// This program is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// This program is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with this program. If not, see <https://www.gnu.org/licenses/>.
16
17//! Subspace test service only.
18
19#![warn(missing_docs, unused_crate_dependencies)]
20
21use cross_domain_message_gossip::{GossipWorkerBuilder, xdm_gossip_peers_set_config};
22use domain_runtime_primitives::opaque::{Block as DomainBlock, Header as DomainHeader};
23use frame_system::pallet_prelude::BlockNumberFor;
24use futures::channel::mpsc;
25use futures::{Future, StreamExt};
26use jsonrpsee::RpcModule;
27use pallet_domains::staking::StakingSummary;
28use parity_scale_codec::{Decode, Encode};
29use parking_lot::Mutex;
30use sc_block_builder::BlockBuilderBuilder;
31use sc_client_api::execution_extensions::ExtensionsFactory;
32use sc_client_api::{Backend as BackendT, BlockBackend, ExecutorProvider, Finalizer};
33use sc_consensus::block_import::{
34    BlockCheckParams, BlockImportParams, ForkChoiceStrategy, ImportResult,
35};
36use sc_consensus::{BasicQueue, BlockImport, StateAction, Verifier as VerifierT};
37use sc_domains::ExtensionsFactory as DomainsExtensionFactory;
38use sc_network::config::{NetworkConfiguration, TransportConfig};
39use sc_network::service::traits::NetworkService;
40use sc_network::{
41    NetworkWorker, NotificationMetrics, NotificationService, ReputationChange, multiaddr,
42};
43use sc_service::config::{
44    DatabaseSource, ExecutorConfiguration, KeystoreConfig, MultiaddrWithPeerId,
45    OffchainWorkerConfig, RpcBatchRequestConfig, RpcConfiguration, RpcEndpoint,
46    WasmExecutionMethod, WasmtimeInstantiationStrategy,
47};
48use sc_service::{
49    BasePath, BlocksPruning, Configuration, NetworkStarter, Role, SpawnTasksParams, TaskManager,
50};
51use sc_transaction_pool::{BasicPool, FullChainApi, Options};
52use sc_transaction_pool_api::error::{Error as TxPoolError, IntoPoolError};
53use sc_transaction_pool_api::{InPoolTransaction, TransactionPool, TransactionSource};
54use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender, tracing_unbounded};
55use sp_api::{ApiExt, ProvideRuntimeApi};
56use sp_blockchain::{HashAndNumber, HeaderBackend};
57use sp_consensus::{BlockOrigin, Error as ConsensusError};
58use sp_consensus_slots::Slot;
59use sp_consensus_subspace::digests::{
60    CompatibleDigestItem, PreDigest, PreDigestPotInfo, extract_pre_digest,
61};
62use sp_consensus_subspace::{PotExtension, SubspaceApi};
63use sp_core::H256;
64use sp_core::offchain::OffchainDbExt;
65use sp_core::offchain::storage::OffchainDb;
66use sp_core::traits::{CodeExecutor, SpawnEssentialNamed};
67use sp_domains::bundle::OpaqueBundle;
68use sp_domains::{BundleProducerElectionApi, ChainId, DomainId, DomainsApi, OperatorId};
69use sp_domains_fraud_proof::fraud_proof::FraudProof;
70use sp_domains_fraud_proof::{FraudProofExtension, FraudProofHostFunctionsImpl};
71use sp_externalities::Extensions;
72use sp_inherents::{InherentData, InherentDataProvider};
73use sp_keyring::Sr25519Keyring;
74use sp_messenger::MessengerApi;
75use sp_messenger_host_functions::{MessengerExtension, MessengerHostFunctionsImpl};
76use sp_mmr_primitives::MmrApi;
77use sp_runtime::generic::{Digest, SignedPayload};
78use sp_runtime::traits::{
79    BlakeTwo256, Block as BlockT, Hash as HashT, Header as HeaderT, NumberFor,
80};
81use sp_runtime::{DigestItem, MultiAddress, OpaqueExtrinsic, SaturatedConversion, generic};
82use sp_subspace_mmr::host_functions::{SubspaceMmrExtension, SubspaceMmrHostFunctionsImpl};
83use sp_timestamp::Timestamp;
84use std::collections::HashMap;
85use std::error::Error;
86use std::marker::PhantomData;
87use std::net::SocketAddr;
88use std::pin::Pin;
89use std::sync::Arc;
90use std::time;
91use std::time::Duration;
92use subspace_core_primitives::pot::PotOutput;
93use subspace_core_primitives::solutions::Solution;
94use subspace_core_primitives::{BlockNumber, PublicKey};
95use subspace_runtime_primitives::extension::BalanceTransferCheckExtension;
96use subspace_runtime_primitives::opaque::Block;
97use subspace_runtime_primitives::{
98    AccountId, Balance, BlockHashFor, ExtrinsicFor, Hash, HeaderFor, Signature,
99};
100use subspace_service::{FullSelectChain, RuntimeExecutor};
101use subspace_test_client::{Backend, Client, chain_spec};
102use subspace_test_primitives::OnchainStateApi;
103use subspace_test_runtime::{
104    Runtime, RuntimeApi, RuntimeCall, SLOT_DURATION, SignedExtra, UncheckedExtrinsic,
105};
106use substrate_frame_rpc_system::AccountNonceApi;
107use substrate_test_client::{RpcHandlersExt, RpcTransactionError, RpcTransactionOutput};
108use tokio::time::sleep;
109
110/// Helper type alias
111pub type FraudProofFor<Block, DomainBlock> =
112    FraudProof<NumberFor<Block>, BlockHashFor<Block>, HeaderFor<DomainBlock>, H256>;
113
114const MAX_PRODUCE_BUNDLE_TRY: usize = 10;
115
116/// Create a Subspace `Configuration`.
117///
118/// By default an in-memory socket will be used, therefore you need to provide boot
119/// nodes if you want the future node to be connected to other nodes.
120#[expect(clippy::too_many_arguments)]
121pub fn node_config(
122    tokio_handle: tokio::runtime::Handle,
123    key: Sr25519Keyring,
124    boot_nodes: Vec<MultiaddrWithPeerId>,
125    run_farmer: bool,
126    force_authoring: bool,
127    force_synced: bool,
128    private_evm: bool,
129    evm_owner_account: Option<AccountId>,
130    base_path: BasePath,
131    rpc_addr: Option<SocketAddr>,
132    rpc_port: Option<u16>,
133) -> Configuration {
134    let root = base_path.path();
135    let role = if run_farmer {
136        Role::Authority
137    } else {
138        Role::Full
139    };
140    let key_seed = key.to_seed();
141    let spec = chain_spec::subspace_local_testnet_config(private_evm, evm_owner_account).unwrap();
142
143    let mut network_config = NetworkConfiguration::new(
144        key_seed.to_string(),
145        "network/test/0.1",
146        Default::default(),
147        None,
148    );
149
150    network_config.boot_nodes = boot_nodes;
151
152    network_config.allow_non_globals_in_dht = true;
153
154    let addr: multiaddr::Multiaddr = multiaddr::Protocol::Memory(rand::random()).into();
155    network_config.listen_addresses.push(addr.clone());
156
157    network_config.public_addresses.push(addr);
158
159    network_config.transport = TransportConfig::MemoryOnly;
160
161    network_config.force_synced = force_synced;
162
163    let rpc_configuration = match rpc_addr {
164        Some(listen_addr) => {
165            let port = rpc_port.unwrap_or(9944);
166            RpcConfiguration {
167                addr: Some(vec![RpcEndpoint {
168                    batch_config: RpcBatchRequestConfig::Disabled,
169                    max_connections: 100,
170                    listen_addr,
171                    rpc_methods: Default::default(),
172                    rate_limit: None,
173                    rate_limit_trust_proxy_headers: false,
174                    rate_limit_whitelisted_ips: vec![],
175                    max_payload_in_mb: 15,
176                    max_payload_out_mb: 15,
177                    max_subscriptions_per_connection: 100,
178                    max_buffer_capacity_per_connection: 100,
179                    cors: None,
180                    retry_random_port: true,
181                    is_optional: false,
182                }]),
183                max_request_size: 15,
184                max_response_size: 15,
185                id_provider: None,
186                max_subs_per_conn: 1024,
187                port,
188                message_buffer_capacity: 1024,
189                batch_config: RpcBatchRequestConfig::Disabled,
190                max_connections: 1000,
191                cors: None,
192                methods: Default::default(),
193                rate_limit: None,
194                rate_limit_whitelisted_ips: vec![],
195                rate_limit_trust_proxy_headers: false,
196            }
197        }
198        None => RpcConfiguration {
199            addr: None,
200            max_request_size: 0,
201            max_response_size: 0,
202            id_provider: None,
203            max_subs_per_conn: 0,
204            port: 0,
205            message_buffer_capacity: 0,
206            batch_config: RpcBatchRequestConfig::Disabled,
207            max_connections: 0,
208            cors: None,
209            methods: Default::default(),
210            rate_limit: None,
211            rate_limit_whitelisted_ips: vec![],
212            rate_limit_trust_proxy_headers: false,
213        },
214    };
215
216    Configuration {
217        impl_name: "subspace-test-node".to_string(),
218        impl_version: "0.1".to_string(),
219        role,
220        tokio_handle,
221        transaction_pool: Default::default(),
222        network: network_config,
223        keystore: KeystoreConfig::InMemory,
224        database: DatabaseSource::ParityDb {
225            path: root.join("paritydb"),
226        },
227        trie_cache_maximum_size: Some(64 * 1024 * 1024),
228        state_pruning: Default::default(),
229        blocks_pruning: BlocksPruning::KeepAll,
230        chain_spec: Box::new(spec),
231        executor: ExecutorConfiguration {
232            wasm_method: WasmExecutionMethod::Compiled {
233                instantiation_strategy: WasmtimeInstantiationStrategy::PoolingCopyOnWrite,
234            },
235            max_runtime_instances: 8,
236            default_heap_pages: None,
237            runtime_cache_size: 2,
238        },
239        wasm_runtime_overrides: Default::default(),
240        rpc: rpc_configuration,
241        prometheus_config: None,
242        telemetry_endpoints: None,
243        offchain_worker: OffchainWorkerConfig {
244            enabled: false,
245            indexing_enabled: true,
246        },
247        force_authoring,
248        disable_grandpa: false,
249        dev_key_seed: Some(key_seed),
250        tracing_targets: None,
251        tracing_receiver: Default::default(),
252        announce_block: true,
253        data_path: base_path.path().into(),
254        base_path,
255    }
256}
257
258type StorageChanges = sp_api::StorageChanges<Block>;
259
260struct MockExtensionsFactory<Client, DomainBlock, Executor, CBackend> {
261    consensus_client: Arc<Client>,
262    consensus_backend: Arc<CBackend>,
263    executor: Arc<Executor>,
264    mock_pot_verifier: Arc<MockPotVerfier>,
265    confirmation_depth_k: BlockNumber,
266    _phantom: PhantomData<DomainBlock>,
267}
268
269impl<Client, DomainBlock, Executor, CBackend>
270    MockExtensionsFactory<Client, DomainBlock, Executor, CBackend>
271{
272    fn new(
273        consensus_client: Arc<Client>,
274        executor: Arc<Executor>,
275        mock_pot_verifier: Arc<MockPotVerfier>,
276        consensus_backend: Arc<CBackend>,
277        confirmation_depth_k: BlockNumber,
278    ) -> Self {
279        Self {
280            consensus_client,
281            consensus_backend,
282            executor,
283            mock_pot_verifier,
284            confirmation_depth_k,
285            _phantom: Default::default(),
286        }
287    }
288}
289
290#[derive(Default)]
291struct MockPotVerfier(Mutex<HashMap<u64, PotOutput>>);
292
293impl MockPotVerfier {
294    fn is_valid(&self, slot: u64, pot: PotOutput) -> bool {
295        self.0.lock().get(&slot).map(|p| *p == pot).unwrap_or(false)
296    }
297
298    fn inject_pot(&self, slot: u64, pot: PotOutput) {
299        self.0.lock().insert(slot, pot);
300    }
301}
302
303impl<Block, Client, DomainBlock, Executor, CBackend> ExtensionsFactory<Block>
304    for MockExtensionsFactory<Client, DomainBlock, Executor, CBackend>
305where
306    Block: BlockT,
307    Block::Hash: From<H256> + Into<H256>,
308    DomainBlock: BlockT,
309    DomainBlock::Hash: Into<H256> + From<H256>,
310    Client: BlockBackend<Block> + HeaderBackend<Block> + ProvideRuntimeApi<Block> + 'static,
311    Client::Api: DomainsApi<Block, DomainBlock::Header>
312        + BundleProducerElectionApi<Block, Balance>
313        + MessengerApi<Block, NumberFor<Block>, Block::Hash>
314        + MmrApi<Block, H256, NumberFor<Block>>,
315    Executor: CodeExecutor + sc_executor::RuntimeVersionOf,
316    CBackend: BackendT<Block> + 'static,
317{
318    fn extensions_for(
319        &self,
320        _block_hash: Block::Hash,
321        _block_number: NumberFor<Block>,
322    ) -> Extensions {
323        let confirmation_depth_k = self.confirmation_depth_k;
324        let mut exts = Extensions::new();
325        exts.register(FraudProofExtension::new(Arc::new(
326            FraudProofHostFunctionsImpl::<_, _, DomainBlock, Executor, _>::new(
327                self.consensus_client.clone(),
328                self.executor.clone(),
329                move |client, executor| {
330                    let extension_factory =
331                        DomainsExtensionFactory::<_, Block, DomainBlock, _>::new(
332                            client,
333                            executor,
334                            confirmation_depth_k,
335                        );
336                    Box::new(extension_factory) as Box<dyn ExtensionsFactory<DomainBlock>>
337                },
338            ),
339        )));
340        exts.register(SubspaceMmrExtension::new(Arc::new(
341            SubspaceMmrHostFunctionsImpl::<Block, _>::new(
342                self.consensus_client.clone(),
343                confirmation_depth_k,
344            ),
345        )));
346        exts.register(MessengerExtension::new(Arc::new(
347            MessengerHostFunctionsImpl::<Block, _, DomainBlock, _>::new(
348                self.consensus_client.clone(),
349                self.executor.clone(),
350            ),
351        )));
352
353        if let Some(offchain_storage) = self.consensus_backend.offchain_storage() {
354            let offchain_db = OffchainDb::new(offchain_storage);
355            exts.register(OffchainDbExt::new(offchain_db));
356        }
357        exts.register(PotExtension::new({
358            let client = Arc::clone(&self.consensus_client);
359            let mock_pot_verifier = Arc::clone(&self.mock_pot_verifier);
360            Box::new(
361                move |parent_hash, slot, proof_of_time, _quick_verification| {
362                    let parent_hash = {
363                        let mut converted_parent_hash = Block::Hash::default();
364                        converted_parent_hash.as_mut().copy_from_slice(&parent_hash);
365                        converted_parent_hash
366                    };
367
368                    let parent_header = match client.header(parent_hash) {
369                        Ok(Some(parent_header)) => parent_header,
370                        _ => return false,
371                    };
372                    let parent_pre_digest = match extract_pre_digest(&parent_header) {
373                        Ok(parent_pre_digest) => parent_pre_digest,
374                        _ => return false,
375                    };
376
377                    let parent_slot = parent_pre_digest.slot();
378                    if slot <= *parent_slot {
379                        return false;
380                    }
381
382                    mock_pot_verifier.is_valid(slot, proof_of_time)
383                },
384            )
385        }));
386        exts
387    }
388}
389
390type NewSlot = (Slot, PotOutput);
391
392/// A mock Subspace consensus node instance used for testing.
393pub struct MockConsensusNode {
394    /// `TaskManager`'s instance.
395    pub task_manager: TaskManager,
396    /// Client's instance.
397    pub client: Arc<Client>,
398    /// Backend.
399    pub backend: Arc<Backend>,
400    /// Code executor.
401    pub executor: RuntimeExecutor,
402    /// Transaction pool.
403    pub transaction_pool: Arc<BasicPool<FullChainApi<Client, Block>, Block>>,
404    /// The SelectChain Strategy
405    pub select_chain: FullSelectChain,
406    /// Network service.
407    pub network_service: Arc<dyn NetworkService + Send + Sync>,
408    /// Cross-domain gossip notification service.
409    pub xdm_gossip_notification_service: Option<Box<dyn NotificationService>>,
410    /// Sync service.
411    pub sync_service: Arc<sc_network_sync::SyncingService<Block>>,
412    /// RPC handlers.
413    pub rpc_handlers: sc_service::RpcHandlers,
414    /// Network starter
415    pub network_starter: Option<NetworkStarter>,
416    /// The next slot number
417    next_slot: u64,
418    /// The mock pot verifier
419    mock_pot_verifier: Arc<MockPotVerfier>,
420    /// The slot notification subscribers
421    new_slot_notification_subscribers: Vec<mpsc::UnboundedSender<(Slot, PotOutput)>>,
422    /// The acknowledgement sender subscribers
423    acknowledgement_sender_subscribers: Vec<TracingUnboundedSender<mpsc::Sender<()>>>,
424    /// Block import pipeline
425    block_import: MockBlockImport<Client, Block>,
426    xdm_gossip_worker_builder: Option<GossipWorkerBuilder>,
427    /// Mock subspace solution used to mock the subspace `PreDigest`
428    mock_solution: Solution<AccountId>,
429    log_prefix: &'static str,
430    /// Ferdie key
431    pub key: Sr25519Keyring,
432    finalize_block_depth: Option<NumberFor<Block>>,
433    /// The node's base path
434    base_path: BasePath,
435}
436
437/// Configuration values required to run a mock consensus node with custom RPC options.
438pub struct MockConsensusNodeRpcConfig {
439    /// The node's base path.
440    pub base_path: BasePath,
441    /// Optional block finalization depth override.
442    pub finalize_block_depth: Option<NumberFor<Block>>,
443    /// Whether to enable the private EVM domain.
444    pub private_evm: bool,
445    /// Optional EVM owner key.
446    pub evm_owner: Option<Sr25519Keyring>,
447    /// Optional RPC listen address override.
448    pub rpc_addr: Option<SocketAddr>,
449    /// Optional RPC listen port override.
450    pub rpc_port: Option<u16>,
451}
452
453impl MockConsensusNode {
454    fn run_with_configuration(
455        mut config: Configuration,
456        key: Sr25519Keyring,
457        base_path: BasePath,
458        finalize_block_depth: Option<NumberFor<Block>>,
459        rpc_builder: Box<
460            dyn Fn() -> Result<RpcModule<()>, sc_service::Error> + Send + Sync + 'static,
461        >,
462    ) -> MockConsensusNode {
463        let log_prefix = key.into();
464
465        let tx_pool_options = Options {
466            ban_time: Duration::from_millis(0),
467            ..Default::default()
468        };
469
470        config.network.node_name = format!("{} (Consensus)", config.network.node_name);
471        let span = sc_tracing::tracing::info_span!(
472            sc_tracing::logging::PREFIX_LOG_SPAN,
473            name = config.network.node_name.as_str()
474        );
475        let _enter = span.enter();
476
477        let executor = sc_service::new_wasm_executor(&config.executor);
478
479        let (client, backend, keystore_container, mut task_manager) =
480            sc_service::new_full_parts::<Block, RuntimeApi, _>(&config, None, executor.clone())
481                .expect("Fail to new full parts");
482
483        let domain_executor = Arc::new(sc_service::new_wasm_executor(&config.executor));
484        let client = Arc::new(client);
485        let mock_pot_verifier = Arc::new(MockPotVerfier::default());
486        let chain_constants = client
487            .runtime_api()
488            .chain_constants(client.info().best_hash)
489            .expect("Fail to get chain constants");
490        client
491            .execution_extensions()
492            .set_extensions_factory(MockExtensionsFactory::<
493                _,
494                DomainBlock,
495                sc_domains::RuntimeExecutor,
496                _,
497            >::new(
498                client.clone(),
499                domain_executor.clone(),
500                Arc::clone(&mock_pot_verifier),
501                backend.clone(),
502                chain_constants.confirmation_depth_k(),
503            ));
504
505        let select_chain = sc_consensus::LongestChain::new(backend.clone());
506        let transaction_pool = Arc::from(BasicPool::new_full(
507            tx_pool_options,
508            config.role.is_authority().into(),
509            config.prometheus_registry(),
510            task_manager.spawn_essential_handle(),
511            client.clone(),
512        ));
513
514        let block_import = MockBlockImport::<_, _>::new(client.clone());
515
516        let mut net_config = sc_network::config::FullNetworkConfiguration::<
517            _,
518            _,
519            NetworkWorker<_, _>,
520        >::new(&config.network, None);
521        let (xdm_gossip_notification_config, xdm_gossip_notification_service) =
522            xdm_gossip_peers_set_config();
523        net_config.add_notification_protocol(xdm_gossip_notification_config);
524
525        let (network_service, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
526            sc_service::build_network(sc_service::BuildNetworkParams {
527                config: &config,
528                net_config,
529                client: client.clone(),
530                transaction_pool: transaction_pool.clone(),
531                spawn_handle: task_manager.spawn_handle(),
532                import_queue: mock_import_queue(
533                    block_import.clone(),
534                    &task_manager.spawn_essential_handle(),
535                ),
536                block_announce_validator_builder: None,
537                warp_sync_config: None,
538                block_relay: None,
539                metrics: NotificationMetrics::new(None),
540            })
541            .expect("Should be able to build network");
542
543        let rpc_handlers = sc_service::spawn_tasks(SpawnTasksParams {
544            network: network_service.clone(),
545            client: client.clone(),
546            keystore: keystore_container.keystore(),
547            task_manager: &mut task_manager,
548            transaction_pool: transaction_pool.clone(),
549            rpc_builder: Box::new(move |_| rpc_builder()),
550            backend: backend.clone(),
551            system_rpc_tx,
552            config,
553            telemetry: None,
554            tx_handler_controller,
555            sync_service: sync_service.clone(),
556        })
557        .expect("Should be able to spawn tasks");
558
559        let mock_solution =
560            Solution::genesis_solution(PublicKey::from(key.public().0), key.to_account_id());
561
562        let mut gossip_builder = GossipWorkerBuilder::new();
563
564        task_manager
565            .spawn_essential_handle()
566            .spawn_essential_blocking(
567                "consensus-chain-channel-update-worker",
568                None,
569                Box::pin(
570                    domain_client_message_relayer::worker::gossip_channel_updates::<_, _, Block, _>(
571                        ChainId::Consensus,
572                        client.clone(),
573                        sync_service.clone(),
574                        gossip_builder.gossip_msg_sink(),
575                    ),
576                ),
577            );
578
579        let (consensus_msg_sink, consensus_msg_receiver) =
580            tracing_unbounded("consensus_message_channel", 100);
581
582        let consensus_listener =
583            cross_domain_message_gossip::start_cross_chain_message_listener::<_, _, _, _, _, _, _>(
584                ChainId::Consensus,
585                client.clone(),
586                client.clone(),
587                transaction_pool.clone(),
588                network_service.clone(),
589                consensus_msg_receiver,
590                domain_executor,
591                sync_service.clone(),
592            );
593
594        task_manager
595            .spawn_essential_handle()
596            .spawn_essential_blocking(
597                "consensus-message-listener",
598                None,
599                Box::pin(consensus_listener),
600            );
601
602        gossip_builder.push_chain_sink(ChainId::Consensus, consensus_msg_sink);
603
604        task_manager.spawn_essential_handle().spawn_blocking(
605            "mmr-gadget",
606            None,
607            mmr_gadget::MmrGadget::start(
608                client.clone(),
609                backend.clone(),
610                sp_mmr_primitives::INDEXING_PREFIX.to_vec(),
611            ),
612        );
613
614        MockConsensusNode {
615            task_manager,
616            client,
617            backend,
618            executor,
619            transaction_pool,
620            select_chain,
621            network_service,
622            xdm_gossip_notification_service: Some(xdm_gossip_notification_service),
623            sync_service,
624            rpc_handlers,
625            network_starter: Some(network_starter),
626            next_slot: 1,
627            mock_pot_verifier,
628            new_slot_notification_subscribers: Vec::new(),
629            acknowledgement_sender_subscribers: Vec::new(),
630            block_import,
631            xdm_gossip_worker_builder: Some(gossip_builder),
632            mock_solution,
633            log_prefix,
634            key,
635            finalize_block_depth,
636            base_path,
637        }
638    }
639
640    /// Run a mock consensus node
641    pub fn run(
642        tokio_handle: tokio::runtime::Handle,
643        key: Sr25519Keyring,
644        base_path: BasePath,
645    ) -> MockConsensusNode {
646        Self::run_with_finalization_depth(tokio_handle, key, base_path, None, false, None)
647    }
648
649    /// Run a mock consensus node with a private EVM domain
650    pub fn run_with_private_evm(
651        tokio_handle: tokio::runtime::Handle,
652        key: Sr25519Keyring,
653        evm_owner: Option<Sr25519Keyring>,
654        base_path: BasePath,
655    ) -> MockConsensusNode {
656        Self::run_with_finalization_depth(tokio_handle, key, base_path, None, true, evm_owner)
657    }
658
659    /// Run a mock consensus node with finalization depth
660    pub fn run_with_finalization_depth(
661        tokio_handle: tokio::runtime::Handle,
662        key: Sr25519Keyring,
663        base_path: BasePath,
664        finalize_block_depth: Option<NumberFor<Block>>,
665        private_evm: bool,
666        evm_owner: Option<Sr25519Keyring>,
667    ) -> MockConsensusNode {
668        let rpc_config = MockConsensusNodeRpcConfig {
669            base_path,
670            finalize_block_depth,
671            private_evm,
672            evm_owner,
673            rpc_addr: None,
674            rpc_port: None,
675        };
676
677        Self::run_with_rpc_builder(
678            tokio_handle,
679            key,
680            rpc_config,
681            Box::new(|| Ok(RpcModule::new(()))),
682        )
683    }
684
685    /// Run a mock consensus node with a custom RPC builder and RPC address/port options.
686    pub fn run_with_rpc_builder(
687        tokio_handle: tokio::runtime::Handle,
688        key: Sr25519Keyring,
689        rpc_config: MockConsensusNodeRpcConfig,
690        rpc_builder: Box<
691            dyn Fn() -> Result<RpcModule<()>, sc_service::Error> + Send + Sync + 'static,
692        >,
693    ) -> MockConsensusNode {
694        let MockConsensusNodeRpcConfig {
695            base_path,
696            finalize_block_depth,
697            private_evm,
698            evm_owner,
699            rpc_addr,
700            rpc_port,
701        } = rpc_config;
702
703        let config = node_config(
704            tokio_handle,
705            key,
706            vec![],
707            false,
708            false,
709            false,
710            private_evm,
711            evm_owner.map(|key| key.to_account_id()),
712            base_path.clone(),
713            rpc_addr,
714            rpc_port,
715        );
716
717        Self::run_with_configuration(config, key, base_path, finalize_block_depth, rpc_builder)
718    }
719
720    /// Run a mock consensus node with RPC options using the default empty RPC module.
721    pub fn run_with_rpc_options(
722        tokio_handle: tokio::runtime::Handle,
723        key: Sr25519Keyring,
724        rpc_config: MockConsensusNodeRpcConfig,
725    ) -> MockConsensusNode {
726        Self::run_with_rpc_builder(
727            tokio_handle,
728            key,
729            rpc_config,
730            Box::new(|| Ok(RpcModule::new(()))),
731        )
732    }
733
734    /// Start the mock consensus node network
735    pub fn start_network(&mut self) {
736        self.network_starter
737            .take()
738            .expect("mock consensus node network have not started yet")
739            .start_network();
740    }
741
742    /// Get the cross domain gossip message worker builder
743    pub fn xdm_gossip_worker_builder(&mut self) -> &mut GossipWorkerBuilder {
744        self.xdm_gossip_worker_builder
745            .as_mut()
746            .expect("gossip message worker have not started yet")
747    }
748
749    /// Start the cross domain gossip message worker.
750    pub fn start_cross_domain_gossip_message_worker(&mut self) {
751        let xdm_gossip_worker_builder = self
752            .xdm_gossip_worker_builder
753            .take()
754            .expect("gossip message worker have not started yet");
755        let cross_domain_message_gossip_worker = xdm_gossip_worker_builder.build::<Block, _, _>(
756            self.network_service.clone(),
757            self.xdm_gossip_notification_service
758                .take()
759                .expect("XDM gossip notification service must be used only once"),
760            self.sync_service.clone(),
761        );
762        self.task_manager
763            .spawn_essential_handle()
764            .spawn_essential_blocking(
765                "cross-domain-gossip-message-worker",
766                None,
767                Box::pin(cross_domain_message_gossip_worker.run()),
768            );
769    }
770
771    /// Return the next slot number
772    pub fn next_slot(&self) -> u64 {
773        self.next_slot
774    }
775
776    /// Set the next slot number
777    pub fn set_next_slot(&mut self, next_slot: u64) {
778        self.next_slot = next_slot;
779    }
780
781    /// Produce a slot only, without waiting for the potential slot handlers.
782    pub fn produce_slot(&mut self) -> NewSlot {
783        let slot = Slot::from(self.next_slot);
784        let proof_of_time = PotOutput::from(
785            <&[u8] as TryInto<[u8; 16]>>::try_into(&Hash::random().to_fixed_bytes()[..16])
786                .expect("slice with length of 16 must able convert into [u8; 16]; qed"),
787        );
788        self.mock_pot_verifier.inject_pot(*slot, proof_of_time);
789        self.next_slot += 1;
790
791        (slot, proof_of_time)
792    }
793
794    /// Notify the executor about the new slot and wait for the bundle produced at this slot.
795    pub async fn notify_new_slot_and_wait_for_bundle(
796        &mut self,
797        new_slot: NewSlot,
798    ) -> Option<OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>> {
799        self.new_slot_notification_subscribers
800            .retain(|subscriber| subscriber.unbounded_send(new_slot).is_ok());
801
802        self.confirm_acknowledgement().await;
803        self.get_bundle_from_tx_pool(new_slot)
804    }
805
806    /// Produce a new slot and wait for a bundle produced at this slot.
807    pub async fn produce_slot_and_wait_for_bundle_submission(
808        &mut self,
809    ) -> (
810        NewSlot,
811        OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>,
812    ) {
813        let slot = self.produce_slot();
814        for _ in 0..MAX_PRODUCE_BUNDLE_TRY {
815            if let Some(bundle) = self.notify_new_slot_and_wait_for_bundle(slot).await {
816                return (slot, bundle);
817            }
818        }
819        panic!(
820            "Failed to produce bundle after {MAX_PRODUCE_BUNDLE_TRY:?} tries, something must be wrong"
821        );
822    }
823
824    /// Produce a slot and wait for bundle submission from specific operator.
825    pub async fn produce_slot_and_wait_for_bundle_submission_from_operator(
826        &mut self,
827        operator_id: OperatorId,
828    ) -> (
829        NewSlot,
830        OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>,
831    ) {
832        loop {
833            let slot = self.produce_slot();
834            if let Some(bundle) = self.notify_new_slot_and_wait_for_bundle(slot).await
835                && bundle.sealed_header().proof_of_election().operator_id == operator_id
836            {
837                return (slot, bundle);
838            }
839        }
840    }
841
842    /// Subscribe the new slot notification
843    pub fn new_slot_notification_stream(&mut self) -> mpsc::UnboundedReceiver<(Slot, PotOutput)> {
844        let (tx, rx) = mpsc::unbounded();
845        self.new_slot_notification_subscribers.push(tx);
846        rx
847    }
848
849    /// Subscribe the acknowledgement sender stream
850    pub fn new_acknowledgement_sender_stream(
851        &mut self,
852    ) -> TracingUnboundedReceiver<mpsc::Sender<()>> {
853        let (tx, rx) = tracing_unbounded("subspace_acknowledgement_sender_stream", 100);
854        self.acknowledgement_sender_subscribers.push(tx);
855        rx
856    }
857
858    /// Wait for all the acknowledgements before return
859    ///
860    /// It is used to wait for the acknowledgement of the domain worker to ensure it have
861    /// finish all the previous tasks before return
862    pub async fn confirm_acknowledgement(&mut self) {
863        let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
864
865        // Must drop `acknowledgement_sender` after the notification otherwise the receiver
866        // will block forever as there is still a sender not closed.
867        {
868            self.acknowledgement_sender_subscribers
869                .retain(|subscriber| {
870                    subscriber
871                        .unbounded_send(acknowledgement_sender.clone())
872                        .is_ok()
873                });
874            drop(acknowledgement_sender);
875        }
876
877        // Wait for all the acknowledgements to progress and proactively drop closed subscribers.
878        while acknowledgement_receiver.next().await.is_some() {
879            // Wait for all the acknowledgements to finish.
880        }
881    }
882
883    /// Wait for the operator finish processing the consensus block before return
884    pub async fn confirm_block_import_processed(&mut self) {
885        // Send one more notification to ensure the previous consensus block import notification
886        // have been received by the operator
887        let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
888        {
889            // Must drop `block_import_acknowledgement_sender` after the notification otherwise
890            // the receiver will block forever as there is still a sender not closed.
891            // NOTE: it is okay to use the default block number since it is ignored in the consumer side.
892            let value = (NumberFor::<Block>::default(), acknowledgement_sender);
893            self.block_import
894                .block_importing_notification_subscribers
895                .lock()
896                .retain(|subscriber| subscriber.unbounded_send(value.clone()).is_ok());
897        }
898        while acknowledgement_receiver.next().await.is_some() {
899            // Wait for all the acknowledgements to finish.
900        }
901
902        // Ensure the operator finish processing the consensus block
903        self.confirm_acknowledgement().await;
904    }
905
906    /// Subscribe the block importing notification
907    pub fn block_importing_notification_stream(
908        &self,
909    ) -> TracingUnboundedReceiver<(NumberFor<Block>, mpsc::Sender<()>)> {
910        self.block_import.block_importing_notification_stream()
911    }
912
913    /// Get the bundle that created at `slot` from the transaction pool
914    pub fn get_bundle_from_tx_pool(
915        &self,
916        new_slot: NewSlot,
917    ) -> Option<OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>> {
918        for ready_tx in self.transaction_pool.ready() {
919            let ext = UncheckedExtrinsic::decode(&mut ready_tx.data.encode().as_slice())
920                .expect("should be able to decode");
921            if let RuntimeCall::Domains(pallet_domains::Call::submit_bundle { opaque_bundle }) =
922                ext.function
923                && opaque_bundle.sealed_header().slot_number() == *new_slot.0
924            {
925                return Some(opaque_bundle);
926            }
927        }
928        None
929    }
930
931    /// Submit a tx to the tx pool
932    pub async fn submit_transaction(&self, tx: OpaqueExtrinsic) -> Result<H256, TxPoolError> {
933        self.transaction_pool
934            .submit_one(
935                self.client.info().best_hash,
936                TransactionSource::External,
937                tx,
938            )
939            .await
940            .map_err(|err| {
941                err.into_pool_error()
942                    .expect("should always be a pool error")
943            })
944    }
945
946    /// Remove all tx from the tx pool
947    pub async fn clear_tx_pool(&self) -> Result<(), Box<dyn Error>> {
948        let txs: Vec<_> = self
949            .transaction_pool
950            .ready()
951            .map(|t| self.transaction_pool.hash_of(&t.data))
952            .collect();
953        self.prune_txs_from_pool(txs.as_slice()).await
954    }
955
956    /// Remove a ready transaction from transaction pool.
957    pub async fn prune_tx_from_pool(&self, tx: &OpaqueExtrinsic) -> Result<(), Box<dyn Error>> {
958        self.prune_txs_from_pool(&[self.transaction_pool.hash_of(tx)])
959            .await
960    }
961
962    async fn prune_txs_from_pool(
963        &self,
964        tx_hashes: &[BlockHashFor<Block>],
965    ) -> Result<(), Box<dyn Error>> {
966        let hash_and_number = HashAndNumber {
967            number: self.client.info().best_number,
968            hash: self.client.info().best_hash,
969        };
970        self.transaction_pool
971            .pool()
972            .prune_known(&hash_and_number, tx_hashes);
973        // `ban_time` have set to 0, explicitly wait 1ms here to ensure `clear_stale` will remove
974        // all the bans as the ban time must be passed.
975        tokio::time::sleep(time::Duration::from_millis(1)).await;
976        self.transaction_pool
977            .pool()
978            .validated_pool()
979            .clear_stale(&hash_and_number);
980        Ok(())
981    }
982
983    /// Return if the given ER exist in the consensus state
984    pub fn does_receipt_exist(
985        &self,
986        er_hash: BlockHashFor<DomainBlock>,
987    ) -> Result<bool, Box<dyn Error>> {
988        Ok(self
989            .client
990            .runtime_api()
991            .execution_receipt(self.client.info().best_hash, er_hash)?
992            .is_some())
993    }
994
995    /// Returns the stake summary of the Domain.
996    pub fn get_domain_staking_summary(
997        &self,
998        domain_id: DomainId,
999    ) -> Result<Option<StakingSummary<OperatorId, Balance>>, Box<dyn Error>> {
1000        Ok(self
1001            .client
1002            .runtime_api()
1003            .domain_stake_summary(self.client.info().best_hash, domain_id)?)
1004    }
1005
1006    /// Returns the domain block pruning depth.
1007    pub fn get_domain_block_pruning_depth(&self) -> Result<BlockNumber, Box<dyn Error>> {
1008        Ok(self
1009            .client
1010            .runtime_api()
1011            .block_pruning_depth(self.client.info().best_hash)?)
1012    }
1013
1014    /// Return a future that only resolve if a fraud proof that the given `fraud_proof_predicate`
1015    /// return true is submitted to the consensus tx pool
1016    pub fn wait_for_fraud_proof<FP>(
1017        &self,
1018        fraud_proof_predicate: FP,
1019    ) -> Pin<Box<dyn Future<Output = FraudProofFor<Block, DomainBlock>> + Send>>
1020    where
1021        FP: Fn(&FraudProofFor<Block, DomainBlock>) -> bool + Send + 'static,
1022    {
1023        let tx_pool = self.transaction_pool.clone();
1024        let mut import_tx_stream = self.transaction_pool.import_notification_stream();
1025        Box::pin(async move {
1026            while let Some(ready_tx_hash) = import_tx_stream.next().await {
1027                let ready_tx = tx_pool
1028                    .ready_transaction(&ready_tx_hash)
1029                    .expect("Just get the ready tx hash from import stream; qed");
1030                let ext = subspace_test_runtime::UncheckedExtrinsic::decode(
1031                    &mut ready_tx.data.encode().as_slice(),
1032                )
1033                .expect("Decode tx must success");
1034                if let subspace_test_runtime::RuntimeCall::Domains(
1035                    pallet_domains::Call::submit_fraud_proof { fraud_proof },
1036                ) = ext.function
1037                    && fraud_proof_predicate(&fraud_proof)
1038                {
1039                    return *fraud_proof;
1040                }
1041            }
1042            unreachable!()
1043        })
1044    }
1045
1046    /// Get the free balance of the given account
1047    pub fn free_balance(&self, account_id: AccountId) -> subspace_runtime_primitives::Balance {
1048        self.client
1049            .runtime_api()
1050            .free_balance(self.client.info().best_hash, account_id)
1051            .expect("Fail to get account free balance")
1052    }
1053
1054    /// Give the peer at `addr` the minimum reputation, which will ban it.
1055    // TODO: also ban/unban in the DSN
1056    pub fn ban_peer(&self, addr: MultiaddrWithPeerId) {
1057        // If unban_peer() has been called on the peer, we need to bump it twice
1058        // to give it the minimal reputation.
1059        self.network_service.report_peer(
1060            addr.peer_id,
1061            ReputationChange::new_fatal("Peer banned by test (1)"),
1062        );
1063        self.network_service.report_peer(
1064            addr.peer_id,
1065            ReputationChange::new_fatal("Peer banned by test (2)"),
1066        );
1067    }
1068
1069    /// Give the peer at `addr` a high reputation, which guarantees it is un-banned it.
1070    pub fn unban_peer(&self, addr: MultiaddrWithPeerId) {
1071        // If ReputationChange::new_fatal() has been called on the peer, we need to bump it twice
1072        // to give it a positive reputation.
1073        self.network_service.report_peer(
1074            addr.peer_id,
1075            ReputationChange::new(i32::MAX, "Peer unbanned by test (1)"),
1076        );
1077        self.network_service.report_peer(
1078            addr.peer_id,
1079            ReputationChange::new(i32::MAX, "Peer unbanned by test (2)"),
1080        );
1081    }
1082
1083    /// Take and stop the `MockConsensusNode` and delete its database lock file.
1084    ///
1085    /// Stopping and restarting a node can cause weird race conditions, with errors like:
1086    /// "The system cannot find the path specified".
1087    /// If this happens, try increasing the wait time in this method.
1088    pub async fn stop(self) -> Result<(), std::io::Error> {
1089        let lock_file_path = self.base_path.path().join("paritydb").join("lock");
1090        // On Windows, sometimes open files can’t be deleted so `drop` first then delete
1091        std::mem::drop(self);
1092
1093        // Give the node time to cleanup, exit, and release the lock file.
1094        // TODO: fix the underlying issue or wait for the actual shutdown instead
1095        sleep(Duration::from_secs(2)).await;
1096
1097        // The lock file already being deleted is not a fatal test error, so just log it
1098        if let Err(err) = std::fs::remove_file(lock_file_path) {
1099            tracing::error!("deleting paritydb lock file failed: {err:?}");
1100        }
1101        Ok(())
1102    }
1103}
1104
1105impl MockConsensusNode {
1106    async fn collect_txn_from_pool(&self, parent_hash: Hash) -> Vec<ExtrinsicFor<Block>> {
1107        self.transaction_pool
1108            .ready_at(parent_hash)
1109            .await
1110            .map(|pending_tx| pending_tx.data().as_ref().clone())
1111            .collect()
1112    }
1113
1114    async fn mock_inherent_data(slot: Slot) -> Result<InherentData, Box<dyn Error>> {
1115        let timestamp = sp_timestamp::InherentDataProvider::new(Timestamp::new(
1116            <Slot as Into<u64>>::into(slot) * SLOT_DURATION,
1117        ));
1118        let subspace_inherents =
1119            sp_consensus_subspace::inherents::InherentDataProvider::new(vec![]);
1120
1121        let inherent_data = (subspace_inherents, timestamp)
1122            .create_inherent_data()
1123            .await?;
1124
1125        Ok(inherent_data)
1126    }
1127
1128    fn mock_subspace_digest(&self, slot: Slot) -> Digest {
1129        let pre_digest: PreDigest<AccountId> = PreDigest::V0 {
1130            slot,
1131            solution: self.mock_solution.clone(),
1132            pot_info: PreDigestPotInfo::V0 {
1133                proof_of_time: Default::default(),
1134                future_proof_of_time: Default::default(),
1135            },
1136        };
1137        let mut digest = Digest::default();
1138        digest.push(DigestItem::subspace_pre_digest(&pre_digest));
1139        digest
1140    }
1141
1142    /// Build block
1143    async fn build_block(
1144        &self,
1145        slot: Slot,
1146        parent_hash: BlockHashFor<Block>,
1147        extrinsics: Vec<ExtrinsicFor<Block>>,
1148    ) -> Result<(Block, StorageChanges), Box<dyn Error>> {
1149        let inherent_digest = self.mock_subspace_digest(slot);
1150
1151        let inherent_data = Self::mock_inherent_data(slot).await?;
1152
1153        let mut block_builder = BlockBuilderBuilder::new(self.client.as_ref())
1154            .on_parent_block(parent_hash)
1155            .fetch_parent_block_number(self.client.as_ref())?
1156            .with_inherent_digests(inherent_digest)
1157            .build()
1158            .expect("Creates new block builder");
1159
1160        let inherent_txns = block_builder.create_inherents(inherent_data)?;
1161
1162        for tx in inherent_txns.into_iter().chain(extrinsics) {
1163            if let Err(err) = sc_block_builder::BlockBuilder::push(&mut block_builder, tx) {
1164                tracing::error!("Invalid transaction while building block: {}", err);
1165            }
1166        }
1167
1168        let (block, storage_changes, _) = block_builder.build()?.into_inner();
1169        Ok((block, storage_changes))
1170    }
1171
1172    /// Import block
1173    async fn import_block(
1174        &self,
1175        block: Block,
1176        storage_changes: Option<StorageChanges>,
1177    ) -> Result<BlockHashFor<Block>, Box<dyn Error>> {
1178        let (header, body) = block.deconstruct();
1179
1180        let header_hash = header.hash();
1181        let header_number = header.number;
1182
1183        let block_import_params = {
1184            let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
1185            import_block.body = Some(body);
1186            import_block.state_action = match storage_changes {
1187                Some(changes) => {
1188                    StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(changes))
1189                }
1190                None => StateAction::Execute,
1191            };
1192            import_block
1193        };
1194
1195        let import_result = self.block_import.import_block(block_import_params).await?;
1196
1197        if let Some(finalized_block_hash) = self
1198            .finalize_block_depth
1199            .and_then(|depth| header_number.checked_sub(depth))
1200            .and_then(|block_to_finalize| {
1201                self.client
1202                    .hash(block_to_finalize)
1203                    .expect("Block hash not found for number: {block_to_finalize:?}")
1204            })
1205        {
1206            self.client
1207                .finalize_block(finalized_block_hash, None, true)
1208                .unwrap();
1209        }
1210
1211        match import_result {
1212            ImportResult::Imported(_) | ImportResult::AlreadyInChain => Ok(header_hash),
1213            bad_res => Err(format!("Fail to import block due to {bad_res:?}").into()),
1214        }
1215    }
1216
1217    /// Produce a new block with the slot on top of `parent_hash`, with optional
1218    /// specified extrinsic list.
1219    #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1220    pub async fn produce_block_with_slot_at(
1221        &mut self,
1222        new_slot: NewSlot,
1223        parent_hash: BlockHashFor<Block>,
1224        maybe_extrinsics: Option<Vec<ExtrinsicFor<Block>>>,
1225    ) -> Result<BlockHashFor<Block>, Box<dyn Error>> {
1226        let block_timer = time::Instant::now();
1227
1228        let extrinsics = match maybe_extrinsics {
1229            Some(extrinsics) => extrinsics,
1230            None => self.collect_txn_from_pool(parent_hash).await,
1231        };
1232        let tx_hashes: Vec<_> = extrinsics
1233            .iter()
1234            .map(|t| self.transaction_pool.hash_of(t))
1235            .collect();
1236
1237        let (block, storage_changes) = self
1238            .build_block(new_slot.0, parent_hash, extrinsics)
1239            .await?;
1240
1241        log_new_block(&block, block_timer.elapsed().as_millis());
1242
1243        let res = match self.import_block(block, Some(storage_changes)).await {
1244            Ok(hash) => {
1245                // Remove the tx of the imported block from the tx pool, so we don't re-include
1246                // them in future blocks by accident.
1247                self.prune_txs_from_pool(tx_hashes.as_slice()).await?;
1248                Ok(hash)
1249            }
1250            err => err,
1251        };
1252        self.confirm_block_import_processed().await;
1253        res
1254    }
1255
1256    /// Produce a new block on top of the current best block, with the extrinsics collected from
1257    /// the transaction pool.
1258    #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1259    pub async fn produce_block_with_slot(&mut self, slot: NewSlot) -> Result<(), Box<dyn Error>> {
1260        self.produce_block_with_slot_at(slot, self.client.info().best_hash, None)
1261            .await?;
1262        Ok(())
1263    }
1264
1265    /// Produce a new block on top of the current best block, with the specified extrinsics.
1266    #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1267    pub async fn produce_block_with_extrinsics(
1268        &mut self,
1269        extrinsics: Vec<ExtrinsicFor<Block>>,
1270    ) -> Result<(), Box<dyn Error>> {
1271        let slot = self.produce_slot();
1272        self.produce_block_with_slot_at(slot, self.client.info().best_hash, Some(extrinsics))
1273            .await?;
1274        Ok(())
1275    }
1276
1277    /// Produce `n` number of blocks.
1278    #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1279    pub async fn produce_blocks(&mut self, n: u64) -> Result<(), Box<dyn Error>> {
1280        for _ in 0..n {
1281            let slot = self.produce_slot();
1282            self.produce_block_with_slot(slot).await?;
1283        }
1284        Ok(())
1285    }
1286
1287    /// Produce `n` number of blocks and wait for bundle submitted to the block.
1288    #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1289    pub async fn produce_blocks_with_bundles(&mut self, n: u64) -> Result<(), Box<dyn Error>> {
1290        for _ in 0..n {
1291            let (slot, _) = self.produce_slot_and_wait_for_bundle_submission().await;
1292            self.produce_block_with_slot(slot).await?;
1293        }
1294        Ok(())
1295    }
1296
1297    /// Get the nonce of the node account
1298    pub fn account_nonce(&self) -> u32 {
1299        self.client
1300            .runtime_api()
1301            .account_nonce(self.client.info().best_hash, self.key.to_account_id())
1302            .expect("Fail to get account nonce")
1303    }
1304
1305    /// Get the nonce of the given account
1306    pub fn account_nonce_of(&self, account_id: AccountId) -> u32 {
1307        self.client
1308            .runtime_api()
1309            .account_nonce(self.client.info().best_hash, account_id)
1310            .expect("Fail to get account nonce")
1311    }
1312
1313    /// Construct an extrinsic.
1314    pub fn construct_extrinsic(
1315        &self,
1316        nonce: u32,
1317        function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1318    ) -> UncheckedExtrinsic {
1319        construct_extrinsic_generic(&self.client, function, self.key, false, nonce, 0)
1320    }
1321
1322    /// Construct an unsigned general extrinsic.
1323    pub fn construct_unsigned_extrinsic(
1324        &self,
1325        function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1326    ) -> UncheckedExtrinsic {
1327        construct_unsigned_extrinsic(&self.client, function)
1328    }
1329
1330    /// Construct and send extrinsic through rpc
1331    pub async fn construct_and_send_extrinsic_with(
1332        &self,
1333        function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1334    ) -> Result<RpcTransactionOutput, RpcTransactionError> {
1335        let nonce = self.account_nonce();
1336        let extrinsic = self.construct_extrinsic(nonce, function);
1337        self.rpc_handlers.send_transaction(extrinsic.into()).await
1338    }
1339
1340    /// Get the nonce of the given account
1341    pub async fn send_extrinsic(
1342        &self,
1343        extrinsic: impl Into<OpaqueExtrinsic>,
1344    ) -> Result<RpcTransactionOutput, RpcTransactionError> {
1345        self.rpc_handlers.send_transaction(extrinsic.into()).await
1346    }
1347}
1348
1349fn log_new_block(block: &Block, used_time_ms: u128) {
1350    tracing::info!(
1351        "🎁 Prepared block for proposing at {} ({} ms) [hash: {:?}; parent_hash: {}; extrinsics ({}): [{}]]",
1352        block.header().number(),
1353        used_time_ms,
1354        block.header().hash(),
1355        block.header().parent_hash(),
1356        block.extrinsics().len(),
1357        block
1358            .extrinsics()
1359            .iter()
1360            .map(|xt| BlakeTwo256::hash_of(xt).to_string())
1361            .collect::<Vec<_>>()
1362            .join(", ")
1363    );
1364}
1365
1366fn mock_import_queue<Block: BlockT, I>(
1367    block_import: I,
1368    spawner: &impl SpawnEssentialNamed,
1369) -> BasicQueue<Block>
1370where
1371    I: BlockImport<Block, Error = ConsensusError> + Send + Sync + 'static,
1372{
1373    BasicQueue::new(
1374        MockVerifier::default(),
1375        Box::new(block_import),
1376        None,
1377        spawner,
1378        None,
1379    )
1380}
1381
1382struct MockVerifier<Block> {
1383    _marker: PhantomData<Block>,
1384}
1385
1386impl<Block> Default for MockVerifier<Block> {
1387    fn default() -> Self {
1388        Self {
1389            _marker: PhantomData,
1390        }
1391    }
1392}
1393
1394#[async_trait::async_trait]
1395impl<Block> VerifierT<Block> for MockVerifier<Block>
1396where
1397    Block: BlockT,
1398{
1399    async fn verify(
1400        &self,
1401        block_params: BlockImportParams<Block>,
1402    ) -> Result<BlockImportParams<Block>, String> {
1403        Ok(block_params)
1404    }
1405}
1406
1407// `MockBlockImport` is mostly port from `sc-consensus-subspace::SubspaceBlockImport` with all
1408// the consensus related logic removed.
1409#[allow(clippy::type_complexity)]
1410struct MockBlockImport<Client, Block: BlockT> {
1411    inner: Arc<Client>,
1412    block_importing_notification_subscribers:
1413        Arc<Mutex<Vec<TracingUnboundedSender<(NumberFor<Block>, mpsc::Sender<()>)>>>>,
1414}
1415
1416impl<Client, Block: BlockT> MockBlockImport<Client, Block> {
1417    fn new(inner: Arc<Client>) -> Self {
1418        MockBlockImport {
1419            inner,
1420            block_importing_notification_subscribers: Arc::new(Mutex::new(Vec::new())),
1421        }
1422    }
1423
1424    // Subscribe the block importing notification
1425    fn block_importing_notification_stream(
1426        &self,
1427    ) -> TracingUnboundedReceiver<(NumberFor<Block>, mpsc::Sender<()>)> {
1428        let (tx, rx) = tracing_unbounded("subspace_new_slot_notification_stream", 100);
1429        self.block_importing_notification_subscribers
1430            .lock()
1431            .push(tx);
1432        rx
1433    }
1434}
1435
1436impl<Client, Block: BlockT> MockBlockImport<Client, Block> {
1437    fn clone(&self) -> Self {
1438        MockBlockImport {
1439            inner: self.inner.clone(),
1440            block_importing_notification_subscribers: self
1441                .block_importing_notification_subscribers
1442                .clone(),
1443        }
1444    }
1445}
1446
1447#[async_trait::async_trait]
1448impl<Client, Block> BlockImport<Block> for MockBlockImport<Client, Block>
1449where
1450    Block: BlockT,
1451    for<'r> &'r Client: BlockImport<Block, Error = ConsensusError> + Send + Sync,
1452    Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
1453    Client::Api: ApiExt<Block>,
1454{
1455    type Error = ConsensusError;
1456
1457    async fn import_block(
1458        &self,
1459        mut block: BlockImportParams<Block>,
1460    ) -> Result<ImportResult, Self::Error> {
1461        let block_number = *block.header.number();
1462        block.fork_choice = Some(ForkChoiceStrategy::LongestChain);
1463
1464        let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
1465
1466        // Must drop `block_import_acknowledgement_sender` after the notification otherwise the receiver
1467        // will block forever as there is still a sender not closed.
1468        {
1469            let value = (block_number, acknowledgement_sender);
1470            self.block_importing_notification_subscribers
1471                .lock()
1472                .retain(|subscriber| subscriber.unbounded_send(value.clone()).is_ok());
1473        }
1474
1475        while acknowledgement_receiver.next().await.is_some() {
1476            // Wait for all the acknowledgements to finish.
1477        }
1478
1479        self.inner.import_block(block).await
1480    }
1481
1482    async fn check_block(
1483        &self,
1484        block: BlockCheckParams<Block>,
1485    ) -> Result<ImportResult, Self::Error> {
1486        self.inner.check_block(block).await
1487    }
1488}
1489
1490/// Produce the given number of blocks for both the primary node and the domain nodes
1491#[macro_export]
1492macro_rules! produce_blocks {
1493    ($primary_node:ident, $operator_node:ident, $count: literal $(, $domain_node:ident)*) => {
1494        {
1495            async {
1496                let domain_fut = {
1497                    let mut futs: Vec<std::pin::Pin<Box<dyn futures::Future<Output = ()>>>> = Vec::new();
1498                    futs.push(Box::pin($operator_node.wait_for_blocks($count)));
1499                    $( futs.push( Box::pin( $domain_node.wait_for_blocks($count) ) ); )*
1500                    futures::future::join_all(futs)
1501                };
1502                $primary_node.produce_blocks_with_bundles($count).await?;
1503                domain_fut.await;
1504                Ok::<(), Box<dyn std::error::Error>>(())
1505            }
1506        }
1507    };
1508}
1509
1510/// Producing one block for both the primary node and the domain nodes, where the primary node can
1511/// use the `produce_block_with_xxx` function (i.e. `produce_block_with_slot`) to produce block
1512#[macro_export]
1513macro_rules! produce_block_with {
1514    ($primary_node_produce_block:expr, $operator_node:ident $(, $domain_node:ident)*) => {
1515        {
1516            async {
1517                let domain_fut = {
1518                    let mut futs: Vec<std::pin::Pin<Box<dyn futures::Future<Output = ()>>>> = Vec::new();
1519                    futs.push(Box::pin($operator_node.wait_for_blocks(1)));
1520                    $( futs.push( Box::pin( $domain_node.wait_for_blocks(1) ) ); )*
1521                    futures::future::join_all(futs)
1522                };
1523                $primary_node_produce_block.await?;
1524                domain_fut.await;
1525                Ok::<(), Box<dyn std::error::Error>>(())
1526            }
1527        }
1528    };
1529}
1530
1531/// Keep producing block with a fixed interval until the given condition become `true`
1532#[macro_export]
1533macro_rules! produce_blocks_until {
1534    ($primary_node:ident, $operator_node:ident, $condition: block $(, $domain_node:ident)*) => {
1535        async {
1536            while !$condition {
1537                produce_blocks!($primary_node, $operator_node, 1 $(, $domain_node),*).await?;
1538                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1539            }
1540            Ok::<(), Box<dyn std::error::Error>>(())
1541        }
1542    };
1543}
1544
1545type BalanceOf<T> = <<T as pallet_transaction_payment::Config>::OnChargeTransaction as pallet_transaction_payment::OnChargeTransaction<T>>::Balance;
1546
1547fn get_signed_extra(
1548    current_block: u64,
1549    immortal: bool,
1550    nonce: u32,
1551    tip: BalanceOf<Runtime>,
1552) -> SignedExtra {
1553    let period = u64::from(<<Runtime as frame_system::Config>::BlockHashCount>::get())
1554        .checked_next_power_of_two()
1555        .map(|c| c / 2)
1556        .unwrap_or(2);
1557    (
1558        frame_system::CheckNonZeroSender::<Runtime>::new(),
1559        frame_system::CheckSpecVersion::<Runtime>::new(),
1560        frame_system::CheckTxVersion::<Runtime>::new(),
1561        frame_system::CheckGenesis::<Runtime>::new(),
1562        frame_system::CheckMortality::<Runtime>::from(if immortal {
1563            generic::Era::Immortal
1564        } else {
1565            generic::Era::mortal(period, current_block)
1566        }),
1567        frame_system::CheckNonce::<Runtime>::from(nonce.into()),
1568        frame_system::CheckWeight::<Runtime>::new(),
1569        pallet_transaction_payment::ChargeTransactionPayment::<Runtime>::from(tip),
1570        BalanceTransferCheckExtension::<Runtime>::default(),
1571        pallet_subspace::extensions::SubspaceExtension::<Runtime>::new(),
1572        pallet_domains::extensions::DomainsExtension::<Runtime>::new(),
1573        pallet_messenger::extensions::MessengerExtension::<Runtime>::new(),
1574    )
1575}
1576
1577fn construct_extrinsic_raw_payload<Client>(
1578    client: impl AsRef<Client>,
1579    function: <Runtime as frame_system::Config>::RuntimeCall,
1580    immortal: bool,
1581    nonce: u32,
1582    tip: BalanceOf<Runtime>,
1583) -> (
1584    SignedPayload<<Runtime as frame_system::Config>::RuntimeCall, SignedExtra>,
1585    SignedExtra,
1586)
1587where
1588    BalanceOf<Runtime>: Send + Sync + From<u64> + sp_runtime::FixedPointOperand,
1589    u64: From<BlockNumberFor<Runtime>>,
1590    Client: HeaderBackend<subspace_runtime_primitives::opaque::Block>,
1591{
1592    let current_block_hash = client.as_ref().info().best_hash;
1593    let current_block = client.as_ref().info().best_number.saturated_into();
1594    let genesis_block = client.as_ref().hash(0).unwrap().unwrap();
1595    let extra = get_signed_extra(current_block, immortal, nonce, tip);
1596    (
1597        generic::SignedPayload::<
1598            <Runtime as frame_system::Config>::RuntimeCall,
1599            SignedExtra,
1600        >::from_raw(
1601            function,
1602            extra.clone(),
1603            ((), 100, 1, genesis_block, current_block_hash, (), (), (), (), (), (),()),
1604        ),
1605        extra,
1606    )
1607}
1608
1609/// Construct an extrinsic that can be applied to the test runtime.
1610pub fn construct_extrinsic_generic<Client>(
1611    client: impl AsRef<Client>,
1612    function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1613    caller: Sr25519Keyring,
1614    immortal: bool,
1615    nonce: u32,
1616    tip: BalanceOf<Runtime>,
1617) -> UncheckedExtrinsic
1618where
1619    BalanceOf<Runtime>: Send + Sync + From<u64> + sp_runtime::FixedPointOperand,
1620    u64: From<BlockNumberFor<Runtime>>,
1621    Client: HeaderBackend<subspace_runtime_primitives::opaque::Block>,
1622{
1623    let function = function.into();
1624    let (raw_payload, extra) =
1625        construct_extrinsic_raw_payload(client, function.clone(), immortal, nonce, tip);
1626    let signature = raw_payload.using_encoded(|e| caller.sign(e));
1627    UncheckedExtrinsic::new_signed(
1628        function,
1629        MultiAddress::Id(caller.to_account_id()),
1630        Signature::Sr25519(signature),
1631        extra,
1632    )
1633}
1634
1635/// Construct a general unsigned extrinsic that can be applied to the test runtime.
1636fn construct_unsigned_extrinsic<Client>(
1637    client: impl AsRef<Client>,
1638    function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1639) -> UncheckedExtrinsic
1640where
1641    BalanceOf<Runtime>: Send + Sync + From<u64> + sp_runtime::FixedPointOperand,
1642    u64: From<BlockNumberFor<Runtime>>,
1643    Client: HeaderBackend<subspace_runtime_primitives::opaque::Block>,
1644{
1645    let function = function.into();
1646    let current_block = client.as_ref().info().best_number.saturated_into();
1647    let extra = get_signed_extra(current_block, true, 0, 0);
1648    UncheckedExtrinsic::new_transaction(function, extra)
1649}