Skip to main content

subspace_test_service/
lib.rs

1// Copyright (C) 2021 Subspace Labs, Inc.
2// SPDX-License-Identifier: GPL-3.0-or-later
3
4// This program is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// This program is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with this program. If not, see <https://www.gnu.org/licenses/>.
16
17//! Subspace test service only.
18
19#![warn(missing_docs, unused_crate_dependencies)]
20
21use cross_domain_message_gossip::{GossipWorkerBuilder, xdm_gossip_peers_set_config};
22use domain_runtime_primitives::opaque::{Block as DomainBlock, Header as DomainHeader};
23use frame_system::pallet_prelude::BlockNumberFor;
24use futures::channel::mpsc;
25use futures::{Future, StreamExt};
26use jsonrpsee::RpcModule;
27use pallet_domains::staking::StakingSummary;
28use parity_scale_codec::{Decode, Encode};
29use parking_lot::Mutex;
30use sc_block_builder::BlockBuilderBuilder;
31use sc_client_api::execution_extensions::ExtensionsFactory;
32use sc_client_api::{Backend as BackendT, BlockBackend, ExecutorProvider, Finalizer};
33use sc_consensus::block_import::{
34    BlockCheckParams, BlockImportParams, ForkChoiceStrategy, ImportResult,
35};
36use sc_consensus::{BasicQueue, BlockImport, StateAction, Verifier as VerifierT};
37use sc_domains::ExtensionsFactory as DomainsExtensionFactory;
38use sc_network::config::{NetworkConfiguration, TransportConfig};
39use sc_network::service::traits::NetworkService;
40use sc_network::{
41    NetworkWorker, NotificationMetrics, NotificationService, ReputationChange, multiaddr,
42};
43use sc_service::config::{
44    DatabaseSource, ExecutorConfiguration, KeystoreConfig, MultiaddrWithPeerId,
45    OffchainWorkerConfig, RpcBatchRequestConfig, RpcConfiguration, RpcEndpoint,
46    WasmExecutionMethod, WasmtimeInstantiationStrategy,
47};
48use sc_service::{BasePath, BlocksPruning, Configuration, Role, SpawnTasksParams, TaskManager};
49use sc_transaction_pool::{BasicPool, FullChainApi, Options};
50use sc_transaction_pool_api::error::{Error as TxPoolError, IntoPoolError};
51use sc_transaction_pool_api::{InPoolTransaction, TransactionPool, TransactionSource};
52use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender, tracing_unbounded};
53use sp_api::{ApiExt, ProvideRuntimeApi};
54use sp_blockchain::{HashAndNumber, HeaderBackend};
55use sp_consensus::{BlockOrigin, Error as ConsensusError};
56use sp_consensus_slots::Slot;
57use sp_consensus_subspace::digests::{
58    CompatibleDigestItem, PreDigest, PreDigestPotInfo, extract_pre_digest,
59};
60use sp_consensus_subspace::{PotExtension, SubspaceApi};
61use sp_core::H256;
62use sp_core::offchain::OffchainDbExt;
63use sp_core::offchain::storage::OffchainDb;
64use sp_core::traits::{CodeExecutor, SpawnEssentialNamed};
65use sp_domains::bundle::OpaqueBundle;
66use sp_domains::{BundleProducerElectionApi, ChainId, DomainId, DomainsApi, OperatorId};
67use sp_domains_fraud_proof::fraud_proof::FraudProof;
68use sp_domains_fraud_proof::{FraudProofExtension, FraudProofHostFunctionsImpl};
69use sp_externalities::Extensions;
70use sp_inherents::{InherentData, InherentDataProvider};
71use sp_keyring::Sr25519Keyring;
72use sp_messenger::MessengerApi;
73use sp_messenger_host_functions::{MessengerExtension, MessengerHostFunctionsImpl};
74use sp_mmr_primitives::MmrApi;
75use sp_runtime::generic::{Digest, SignedPayload};
76use sp_runtime::traits::{
77    BlakeTwo256, Block as BlockT, Hash as HashT, Header as HeaderT, NumberFor,
78};
79use sp_runtime::{DigestItem, MultiAddress, OpaqueExtrinsic, SaturatedConversion, generic};
80use sp_subspace_mmr::host_functions::{SubspaceMmrExtension, SubspaceMmrHostFunctionsImpl};
81use sp_timestamp::Timestamp;
82use std::collections::HashMap;
83use std::error::Error;
84use std::marker::PhantomData;
85use std::net::SocketAddr;
86use std::pin::Pin;
87use std::sync::Arc;
88use std::time;
89use std::time::Duration;
90use subspace_core_primitives::pot::PotOutput;
91use subspace_core_primitives::solutions::Solution;
92use subspace_core_primitives::{BlockNumber, PublicKey};
93use subspace_runtime_primitives::extension::BalanceTransferCheckExtension;
94use subspace_runtime_primitives::opaque::Block;
95use subspace_runtime_primitives::{
96    AccountId, Balance, BlockHashFor, ExtrinsicFor, Hash, HeaderFor, Signature,
97};
98use subspace_service::{FullSelectChain, RuntimeExecutor};
99use subspace_test_client::{Backend, Client, chain_spec};
100use subspace_test_primitives::OnchainStateApi;
101use subspace_test_runtime::{
102    Runtime, RuntimeApi, RuntimeCall, SLOT_DURATION, SignedExtra, UncheckedExtrinsic,
103};
104use substrate_frame_rpc_system::AccountNonceApi;
105use substrate_test_client::{RpcHandlersExt, RpcTransactionError, RpcTransactionOutput};
106use tokio::time::sleep;
107
/// Fraud proof type for a given `Block` / `DomainBlock` pair.
///
/// Shorthand for [`FraudProof`] parameterised with the number and hash types of `Block`, the
/// header type of `DomainBlock`, and [`H256`] as the last type parameter.
pub type FraudProofFor<Block, DomainBlock> =
    FraudProof<NumberFor<Block>, BlockHashFor<Block>, HeaderFor<DomainBlock>, H256>;
111
/// Maximum number of times `produce_slot_and_wait_for_bundle_submission` notifies subscribers
/// about a produced slot while waiting for a bundle; it panics once this many attempts failed.
const MAX_PRODUCE_BUNDLE_TRY: usize = 10;
113
114/// Create a Subspace `Configuration`.
115///
116/// By default an in-memory socket will be used, therefore you need to provide boot
117/// nodes if you want the future node to be connected to other nodes.
118#[expect(clippy::too_many_arguments)]
119pub fn node_config(
120    tokio_handle: tokio::runtime::Handle,
121    key: Sr25519Keyring,
122    boot_nodes: Vec<MultiaddrWithPeerId>,
123    run_farmer: bool,
124    force_authoring: bool,
125    force_synced: bool,
126    private_evm: bool,
127    evm_owner_account: Option<AccountId>,
128    base_path: BasePath,
129    rpc_addr: Option<SocketAddr>,
130    rpc_port: Option<u16>,
131) -> Configuration {
132    let root = base_path.path();
133    let role = if run_farmer {
134        Role::Authority
135    } else {
136        Role::Full
137    };
138    let key_seed = key.to_seed();
139    let spec = chain_spec::subspace_local_testnet_config(private_evm, evm_owner_account).unwrap();
140
141    let mut network_config = NetworkConfiguration::new(
142        key_seed.to_string(),
143        "network/test/0.1",
144        Default::default(),
145        None,
146    );
147
148    network_config.boot_nodes = boot_nodes;
149
150    network_config.allow_non_globals_in_dht = true;
151
152    let addr: multiaddr::Multiaddr = multiaddr::Protocol::Memory(rand::random()).into();
153    network_config.listen_addresses.push(addr.clone());
154
155    network_config.public_addresses.push(addr);
156
157    network_config.transport = TransportConfig::MemoryOnly;
158
159    network_config.force_synced = force_synced;
160
161    let rpc_configuration = match rpc_addr {
162        Some(listen_addr) => {
163            let port = rpc_port.unwrap_or(9944);
164            RpcConfiguration {
165                addr: Some(vec![RpcEndpoint {
166                    batch_config: RpcBatchRequestConfig::Disabled,
167                    max_connections: 100,
168                    listen_addr,
169                    rpc_methods: Default::default(),
170                    rate_limit: None,
171                    rate_limit_trust_proxy_headers: false,
172                    rate_limit_whitelisted_ips: vec![],
173                    max_payload_in_mb: 15,
174                    max_payload_out_mb: 15,
175                    max_subscriptions_per_connection: 100,
176                    max_buffer_capacity_per_connection: 100,
177                    cors: None,
178                    retry_random_port: true,
179                    is_optional: false,
180                }]),
181                max_request_size: 15,
182                max_response_size: 15,
183                id_provider: None,
184                max_subs_per_conn: 1024,
185                port,
186                message_buffer_capacity: 1024,
187                batch_config: RpcBatchRequestConfig::Disabled,
188                max_connections: 1000,
189                cors: None,
190                methods: Default::default(),
191                rate_limit: None,
192                rate_limit_whitelisted_ips: vec![],
193                rate_limit_trust_proxy_headers: false,
194                request_logger_limit: 1024,
195            }
196        }
197        None => RpcConfiguration {
198            addr: None,
199            max_request_size: 0,
200            max_response_size: 0,
201            id_provider: None,
202            max_subs_per_conn: 0,
203            port: 0,
204            message_buffer_capacity: 0,
205            batch_config: RpcBatchRequestConfig::Disabled,
206            max_connections: 0,
207            cors: None,
208            methods: Default::default(),
209            rate_limit: None,
210            rate_limit_whitelisted_ips: vec![],
211            rate_limit_trust_proxy_headers: false,
212            request_logger_limit: 1024,
213        },
214    };
215
216    Configuration {
217        impl_name: "subspace-test-node".to_string(),
218        impl_version: "0.1".to_string(),
219        role,
220        tokio_handle,
221        transaction_pool: Default::default(),
222        network: network_config,
223        keystore: KeystoreConfig::InMemory,
224        database: DatabaseSource::ParityDb {
225            path: root.join("paritydb"),
226        },
227        trie_cache_maximum_size: Some(64 * 1024 * 1024),
228        state_pruning: Default::default(),
229        blocks_pruning: BlocksPruning::KeepAll,
230        chain_spec: Box::new(spec),
231        executor: ExecutorConfiguration {
232            wasm_method: WasmExecutionMethod::Compiled {
233                instantiation_strategy: WasmtimeInstantiationStrategy::PoolingCopyOnWrite,
234            },
235            max_runtime_instances: 8,
236            default_heap_pages: None,
237            runtime_cache_size: 2,
238        },
239        wasm_runtime_overrides: Default::default(),
240        rpc: rpc_configuration,
241        prometheus_config: None,
242        telemetry_endpoints: None,
243        offchain_worker: OffchainWorkerConfig {
244            enabled: false,
245            indexing_enabled: true,
246        },
247        force_authoring,
248        disable_grandpa: false,
249        dev_key_seed: Some(key_seed),
250        tracing_targets: None,
251        tracing_receiver: Default::default(),
252        announce_block: true,
253        data_path: base_path.path().into(),
254        base_path,
255        warm_up_trie_cache: None,
256    }
257}
258
/// `sp_api::StorageChanges` specialised to the opaque [`Block`] type imported from
/// `subspace_runtime_primitives`.
type StorageChanges = sp_api::StorageChanges<Block>;
260
261struct MockExtensionsFactory<Client, DomainBlock, Executor, CBackend> {
262    consensus_client: Arc<Client>,
263    consensus_backend: Arc<CBackend>,
264    executor: Arc<Executor>,
265    mock_pot_verifier: Arc<MockPotVerfier>,
266    confirmation_depth_k: BlockNumber,
267    _phantom: PhantomData<DomainBlock>,
268}
269
270impl<Client, DomainBlock, Executor, CBackend>
271    MockExtensionsFactory<Client, DomainBlock, Executor, CBackend>
272{
273    fn new(
274        consensus_client: Arc<Client>,
275        executor: Arc<Executor>,
276        mock_pot_verifier: Arc<MockPotVerfier>,
277        consensus_backend: Arc<CBackend>,
278        confirmation_depth_k: BlockNumber,
279    ) -> Self {
280        Self {
281            consensus_client,
282            consensus_backend,
283            executor,
284            mock_pot_verifier,
285            confirmation_depth_k,
286            _phantom: Default::default(),
287        }
288    }
289}
290
291#[derive(Default)]
292struct MockPotVerfier(Mutex<HashMap<u64, PotOutput>>);
293
294impl MockPotVerfier {
295    fn is_valid(&self, slot: u64, pot: PotOutput) -> bool {
296        self.0.lock().get(&slot).map(|p| *p == pot).unwrap_or(false)
297    }
298
299    fn inject_pot(&self, slot: u64, pot: PotOutput) {
300        self.0.lock().insert(slot, pot);
301    }
302}
303
304impl<Block, Client, DomainBlock, Executor, CBackend> ExtensionsFactory<Block>
305    for MockExtensionsFactory<Client, DomainBlock, Executor, CBackend>
306where
307    Block: BlockT,
308    Block::Hash: From<H256> + Into<H256>,
309    DomainBlock: BlockT,
310    DomainBlock::Hash: Into<H256> + From<H256>,
311    Client: BlockBackend<Block> + HeaderBackend<Block> + ProvideRuntimeApi<Block> + 'static,
312    Client::Api: DomainsApi<Block, DomainBlock::Header>
313        + BundleProducerElectionApi<Block, Balance>
314        + MessengerApi<Block, NumberFor<Block>, Block::Hash>
315        + MmrApi<Block, H256, NumberFor<Block>>,
316    Executor: CodeExecutor + sc_executor::RuntimeVersionOf,
317    CBackend: BackendT<Block> + 'static,
318{
319    fn extensions_for(
320        &self,
321        _block_hash: Block::Hash,
322        _block_number: NumberFor<Block>,
323    ) -> Extensions {
324        let confirmation_depth_k = self.confirmation_depth_k;
325        let mut exts = Extensions::new();
326        exts.register(FraudProofExtension::new(Arc::new(
327            FraudProofHostFunctionsImpl::<_, _, DomainBlock, Executor, _>::new(
328                self.consensus_client.clone(),
329                self.executor.clone(),
330                move |client, executor| {
331                    let extension_factory =
332                        DomainsExtensionFactory::<_, Block, DomainBlock, _>::new(
333                            client,
334                            executor,
335                            confirmation_depth_k,
336                        );
337                    Box::new(extension_factory) as Box<dyn ExtensionsFactory<DomainBlock>>
338                },
339            ),
340        )));
341        exts.register(SubspaceMmrExtension::new(Arc::new(
342            SubspaceMmrHostFunctionsImpl::<Block, _>::new(
343                self.consensus_client.clone(),
344                confirmation_depth_k,
345            ),
346        )));
347        exts.register(MessengerExtension::new(Arc::new(
348            MessengerHostFunctionsImpl::<Block, _, DomainBlock, _>::new(
349                self.consensus_client.clone(),
350                self.executor.clone(),
351            ),
352        )));
353
354        if let Some(offchain_storage) = self.consensus_backend.offchain_storage() {
355            let offchain_db = OffchainDb::new(offchain_storage);
356            exts.register(OffchainDbExt::new(offchain_db));
357        }
358        exts.register(PotExtension::new({
359            let client = Arc::clone(&self.consensus_client);
360            let mock_pot_verifier = Arc::clone(&self.mock_pot_verifier);
361            Box::new(
362                move |parent_hash, slot, proof_of_time, _quick_verification| {
363                    let parent_hash = {
364                        let mut converted_parent_hash = Block::Hash::default();
365                        converted_parent_hash.as_mut().copy_from_slice(&parent_hash);
366                        converted_parent_hash
367                    };
368
369                    let parent_header = match client.header(parent_hash) {
370                        Ok(Some(parent_header)) => parent_header,
371                        _ => return false,
372                    };
373                    let parent_pre_digest = match extract_pre_digest(&parent_header) {
374                        Ok(parent_pre_digest) => parent_pre_digest,
375                        _ => return false,
376                    };
377
378                    let parent_slot = parent_pre_digest.slot();
379                    if slot <= *parent_slot {
380                        return false;
381                    }
382
383                    mock_pot_verifier.is_valid(slot, proof_of_time)
384                },
385            )
386        }));
387        exts
388    }
389}
390
/// A produced slot paired with the mock proof-of-time output generated for it.
type NewSlot = (Slot, PotOutput);
392
/// A mock Subspace consensus node instance used for testing.
///
/// Slots are produced on demand through `produce_slot` and handed to the subscribers registered
/// via `new_slot_notification_stream`.
pub struct MockConsensusNode {
    /// `TaskManager`'s instance.
    pub task_manager: TaskManager,
    /// Client's instance.
    pub client: Arc<Client>,
    /// Backend.
    pub backend: Arc<Backend>,
    /// Code executor.
    pub executor: RuntimeExecutor,
    /// Transaction pool.
    pub transaction_pool: Arc<BasicPool<FullChainApi<Client, Block>, Block>>,
    /// The SelectChain Strategy
    pub select_chain: FullSelectChain,
    /// Network service.
    pub network_service: Arc<dyn NetworkService + Send + Sync>,
    /// Cross-domain gossip notification service.
    ///
    /// `Some` until `start_cross_domain_gossip_message_worker` takes it.
    pub xdm_gossip_notification_service: Option<Box<dyn NotificationService>>,
    /// Sync service.
    pub sync_service: Arc<sc_network_sync::SyncingService<Block>>,
    /// RPC handlers.
    pub rpc_handlers: sc_service::RpcHandlers,
    /// Number of the slot the next `produce_slot` call will produce (starts at `1`).
    next_slot: u64,
    /// The mock pot verifier, shared with the extensions factory installed on the client.
    mock_pot_verifier: Arc<MockPotVerfier>,
    /// The slot notification subscribers, registered via `new_slot_notification_stream`.
    new_slot_notification_subscribers: Vec<mpsc::UnboundedSender<(Slot, PotOutput)>>,
    /// The acknowledgement sender subscribers
    acknowledgement_sender_subscribers: Vec<TracingUnboundedSender<mpsc::Sender<()>>>,
    /// Block import pipeline
    block_import: MockBlockImport<Client, Block>,
    /// Builder of the cross-domain gossip worker.
    ///
    /// `Some` until `start_cross_domain_gossip_message_worker` takes it.
    xdm_gossip_worker_builder: Option<GossipWorkerBuilder>,
    /// Mock subspace solution used to mock the subspace `PreDigest`
    mock_solution: Solution<AccountId>,
    /// Log prefix obtained by converting `key` with `into()`.
    log_prefix: &'static str,
    /// The keyring key the node was started with (also used to derive `mock_solution`).
    pub key: Sr25519Keyring,
    /// Optional block finalization depth supplied when the node was started.
    finalize_block_depth: Option<NumberFor<Block>>,
    /// The node's base path
    base_path: BasePath,
}
435
/// Configuration values required to run a mock consensus node with custom RPC options.
///
/// Consumed by `MockConsensusNode::run_with_rpc_builder`, which forwards the values to
/// `node_config` and to the node itself.
pub struct MockConsensusNodeRpcConfig {
    /// The node's base path.
    pub base_path: BasePath,
    /// Optional block finalization depth override.
    pub finalize_block_depth: Option<NumberFor<Block>>,
    /// Whether to enable the private EVM domain (forwarded to the chain spec constructor).
    pub private_evm: bool,
    /// Optional EVM owner key; converted to an account id before it reaches the chain spec.
    pub evm_owner: Option<Sr25519Keyring>,
    /// Optional RPC listen address override; without it no RPC endpoint is configured.
    pub rpc_addr: Option<SocketAddr>,
    /// Optional RPC listen port override; only taken into account when `rpc_addr` is set.
    pub rpc_port: Option<u16>,
}
451
452impl MockConsensusNode {
    /// Build and start a mock consensus node from an already prepared `config`.
    ///
    /// Creates the client, backend, transaction pool and network, installs a
    /// `MockExtensionsFactory` on the client, spawns the Substrate service tasks together with
    /// the channel-update gossip worker, the cross-chain message listener and the MMR gadget, and
    /// returns the assembled [`MockConsensusNode`]. The cross-domain gossip worker itself is NOT
    /// started here; its builder is stored in the returned node.
    ///
    /// `rpc_builder` is invoked by the RPC layer to obtain the RPC module to expose.
    ///
    /// # Panics
    ///
    /// Panics if the service parts, the chain constants, the network or the service tasks cannot
    /// be created.
    #[expect(clippy::result_large_err, reason = "Comes from Substrate")]
    fn run_with_configuration(
        mut config: Configuration,
        key: Sr25519Keyring,
        base_path: BasePath,
        finalize_block_depth: Option<NumberFor<Block>>,
        rpc_builder: Box<
            dyn Fn() -> Result<RpcModule<()>, sc_service::Error> + Send + Sync + 'static,
        >,
    ) -> MockConsensusNode {
        let log_prefix = key.into();

        // Transaction pool with a zero ban time (presumably so tests can resubmit a rejected
        // transaction right away; confirm against the pool's semantics).
        let tx_pool_options = Options {
            ban_time: Duration::from_millis(0),
            ..Default::default()
        };

        // Tag the node name and enter a log span carrying it; the guard is held until this
        // function returns.
        config.network.node_name = format!("{} (Consensus)", config.network.node_name);
        let span = sc_tracing::tracing::info_span!(
            sc_tracing::logging::PREFIX_LOG_SPAN,
            name = config.network.node_name.as_str()
        );
        let _enter = span.enter();

        let executor = sc_service::new_wasm_executor(&config.executor);

        let (client, backend, keystore_container, mut task_manager) =
            sc_service::new_full_parts::<Block, RuntimeApi, _>(&config, None, executor.clone())
                .expect("Fail to new full parts");

        // A second executor instance, handed to the extensions factory and to the cross-chain
        // message listener below.
        let domain_executor = Arc::new(sc_service::new_wasm_executor(&config.executor));
        let client = Arc::new(client);
        let mock_pot_verifier = Arc::new(MockPotVerfier::default());
        let chain_constants = client
            .runtime_api()
            .chain_constants(client.info().best_hash)
            .expect("Fail to get chain constants");
        // Install the mock extensions factory; it shares `mock_pot_verifier` with the node.
        client
            .execution_extensions()
            .set_extensions_factory(MockExtensionsFactory::<
                _,
                DomainBlock,
                sc_domains::RuntimeExecutor,
                _,
            >::new(
                client.clone(),
                domain_executor.clone(),
                Arc::clone(&mock_pot_verifier),
                backend.clone(),
                chain_constants.confirmation_depth_k(),
            ));

        let select_chain = sc_consensus::LongestChain::new(backend.clone());
        let transaction_pool = Arc::from(BasicPool::new_full(
            tx_pool_options,
            config.role.is_authority().into(),
            config.prometheus_registry(),
            task_manager.spawn_essential_handle(),
            client.clone(),
        ));

        let block_import = MockBlockImport::<_, _>::new(client.clone());

        // Network configuration with the XDM gossip notification protocol registered.
        let mut net_config = sc_network::config::FullNetworkConfiguration::<
            _,
            _,
            NetworkWorker<_, _>,
        >::new(&config.network, None);
        let (xdm_gossip_notification_config, xdm_gossip_notification_service) =
            xdm_gossip_peers_set_config();
        net_config.add_notification_protocol(xdm_gossip_notification_config);

        let (network_service, system_rpc_tx, tx_handler_controller, sync_service) =
            sc_service::build_network(sc_service::BuildNetworkParams {
                config: &config,
                net_config,
                client: client.clone(),
                transaction_pool: transaction_pool.clone(),
                spawn_handle: task_manager.spawn_handle(),
                import_queue: mock_import_queue(
                    block_import.clone(),
                    &task_manager.spawn_essential_handle(),
                ),
                block_announce_validator_builder: None,
                warp_sync_config: None,
                block_relay: None,
                metrics: NotificationMetrics::new(None),
            })
            .expect("Should be able to build network");

        // `config` is moved into the service tasks here and is not available afterwards.
        let rpc_handlers = sc_service::spawn_tasks(SpawnTasksParams {
            network: network_service.clone(),
            client: client.clone(),
            keystore: keystore_container.keystore(),
            task_manager: &mut task_manager,
            transaction_pool: transaction_pool.clone(),
            rpc_builder: Box::new(move |_| rpc_builder()),
            backend: backend.clone(),
            system_rpc_tx,
            config,
            telemetry: None,
            tx_handler_controller,
            sync_service: sync_service.clone(),
            tracing_execute_block: None,
        })
        .expect("Should be able to spawn tasks");

        // Genesis solution built from the node's own key.
        let mock_solution =
            Solution::genesis_solution(PublicKey::from(key.public().0), key.to_account_id());

        let mut gossip_builder = GossipWorkerBuilder::new();

        // Worker that feeds consensus chain channel updates into the gossip message sink.
        task_manager
            .spawn_essential_handle()
            .spawn_essential_blocking(
                "consensus-chain-channel-update-worker",
                None,
                Box::pin(
                    domain_client_message_relayer::worker::gossip_channel_updates::<_, _, Block, _>(
                        ChainId::Consensus,
                        client.clone(),
                        sync_service.clone(),
                        gossip_builder.gossip_msg_sink(),
                    ),
                ),
            );

        let (consensus_msg_sink, consensus_msg_receiver) =
            tracing_unbounded("consensus_message_channel", 100);

        // Listener consuming the messages that the gossip worker routes to the consensus chain.
        let consensus_listener =
            cross_domain_message_gossip::start_cross_chain_message_listener::<_, _, _, _, _, _, _>(
                ChainId::Consensus,
                client.clone(),
                client.clone(),
                transaction_pool.clone(),
                network_service.clone(),
                consensus_msg_receiver,
                domain_executor,
                sync_service.clone(),
            );

        task_manager
            .spawn_essential_handle()
            .spawn_essential_blocking(
                "consensus-message-listener",
                None,
                Box::pin(consensus_listener),
            );

        gossip_builder.push_chain_sink(ChainId::Consensus, consensus_msg_sink);

        task_manager.spawn_essential_handle().spawn_blocking(
            "mmr-gadget",
            None,
            mmr_gadget::MmrGadget::start(
                client.clone(),
                backend.clone(),
                sp_mmr_primitives::INDEXING_PREFIX.to_vec(),
            ),
        );

        MockConsensusNode {
            task_manager,
            client,
            backend,
            executor,
            transaction_pool,
            select_chain,
            network_service,
            xdm_gossip_notification_service: Some(xdm_gossip_notification_service),
            sync_service,
            rpc_handlers,
            // Slot numbering starts at 1.
            next_slot: 1,
            mock_pot_verifier,
            new_slot_notification_subscribers: Vec::new(),
            acknowledgement_sender_subscribers: Vec::new(),
            block_import,
            xdm_gossip_worker_builder: Some(gossip_builder),
            mock_solution,
            log_prefix,
            key,
            finalize_block_depth,
            base_path,
        }
    }
639
640    /// Run a mock consensus node
641    pub fn run(
642        tokio_handle: tokio::runtime::Handle,
643        key: Sr25519Keyring,
644        base_path: BasePath,
645    ) -> MockConsensusNode {
646        Self::run_with_finalization_depth(tokio_handle, key, base_path, None, false, None)
647    }
648
649    /// Run a mock consensus node with a private EVM domain
650    pub fn run_with_private_evm(
651        tokio_handle: tokio::runtime::Handle,
652        key: Sr25519Keyring,
653        evm_owner: Option<Sr25519Keyring>,
654        base_path: BasePath,
655    ) -> MockConsensusNode {
656        Self::run_with_finalization_depth(tokio_handle, key, base_path, None, true, evm_owner)
657    }
658
659    /// Run a mock consensus node with finalization depth
660    #[expect(clippy::result_large_err, reason = "Comes from Substrate")]
661    pub fn run_with_finalization_depth(
662        tokio_handle: tokio::runtime::Handle,
663        key: Sr25519Keyring,
664        base_path: BasePath,
665        finalize_block_depth: Option<NumberFor<Block>>,
666        private_evm: bool,
667        evm_owner: Option<Sr25519Keyring>,
668    ) -> MockConsensusNode {
669        let rpc_config = MockConsensusNodeRpcConfig {
670            base_path,
671            finalize_block_depth,
672            private_evm,
673            evm_owner,
674            rpc_addr: None,
675            rpc_port: None,
676        };
677
678        Self::run_with_rpc_builder(
679            tokio_handle,
680            key,
681            rpc_config,
682            Box::new(|| Ok(RpcModule::new(()))),
683        )
684    }
685
686    /// Run a mock consensus node with a custom RPC builder and RPC address/port options.
687    pub fn run_with_rpc_builder(
688        tokio_handle: tokio::runtime::Handle,
689        key: Sr25519Keyring,
690        rpc_config: MockConsensusNodeRpcConfig,
691        rpc_builder: Box<
692            dyn Fn() -> Result<RpcModule<()>, sc_service::Error> + Send + Sync + 'static,
693        >,
694    ) -> MockConsensusNode {
695        let MockConsensusNodeRpcConfig {
696            base_path,
697            finalize_block_depth,
698            private_evm,
699            evm_owner,
700            rpc_addr,
701            rpc_port,
702        } = rpc_config;
703
704        let config = node_config(
705            tokio_handle,
706            key,
707            vec![],
708            false,
709            false,
710            false,
711            private_evm,
712            evm_owner.map(|key| key.to_account_id()),
713            base_path.clone(),
714            rpc_addr,
715            rpc_port,
716        );
717
718        Self::run_with_configuration(config, key, base_path, finalize_block_depth, rpc_builder)
719    }
720
721    /// Run a mock consensus node with RPC options using the default empty RPC module.
722    #[expect(clippy::result_large_err, reason = "Comes from Substrate")]
723    pub fn run_with_rpc_options(
724        tokio_handle: tokio::runtime::Handle,
725        key: Sr25519Keyring,
726        rpc_config: MockConsensusNodeRpcConfig,
727    ) -> MockConsensusNode {
728        Self::run_with_rpc_builder(
729            tokio_handle,
730            key,
731            rpc_config,
732            Box::new(|| Ok(RpcModule::new(()))),
733        )
734    }
735
736    /// Get the cross domain gossip message worker builder
737    pub fn xdm_gossip_worker_builder(&mut self) -> &mut GossipWorkerBuilder {
738        self.xdm_gossip_worker_builder
739            .as_mut()
740            .expect("gossip message worker have not started yet")
741    }
742
743    /// Start the cross domain gossip message worker.
744    pub fn start_cross_domain_gossip_message_worker(&mut self) {
745        let xdm_gossip_worker_builder = self
746            .xdm_gossip_worker_builder
747            .take()
748            .expect("gossip message worker have not started yet");
749        let cross_domain_message_gossip_worker = xdm_gossip_worker_builder.build::<Block, _, _>(
750            self.network_service.clone(),
751            self.xdm_gossip_notification_service
752                .take()
753                .expect("XDM gossip notification service must be used only once"),
754            self.sync_service.clone(),
755        );
756        self.task_manager
757            .spawn_essential_handle()
758            .spawn_essential_blocking(
759                "cross-domain-gossip-message-worker",
760                None,
761                Box::pin(cross_domain_message_gossip_worker.run()),
762            );
763    }
764
    /// Return the number of the slot that the next call to `produce_slot` will produce.
    pub fn next_slot(&self) -> u64 {
        self.next_slot
    }
769
    /// Override the number of the slot that the next call to `produce_slot` will produce.
    ///
    /// The value is stored as given; it is not checked against slots produced earlier.
    pub fn set_next_slot(&mut self, next_slot: u64) {
        self.next_slot = next_slot;
    }
774
775    /// Produce a slot only, without waiting for the potential slot handlers.
776    pub fn produce_slot(&mut self) -> NewSlot {
777        let slot = Slot::from(self.next_slot);
778        let proof_of_time = PotOutput::from(
779            <&[u8] as TryInto<[u8; 16]>>::try_into(&Hash::random().to_fixed_bytes()[..16])
780                .expect("slice with length of 16 must able convert into [u8; 16]; qed"),
781        );
782        self.mock_pot_verifier.inject_pot(*slot, proof_of_time);
783        self.next_slot += 1;
784
785        (slot, proof_of_time)
786    }
787
788    /// Notify the executor about the new slot and wait for the bundle produced at this slot.
789    pub async fn notify_new_slot_and_wait_for_bundle(
790        &mut self,
791        new_slot: NewSlot,
792    ) -> Option<OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>> {
793        self.new_slot_notification_subscribers
794            .retain(|subscriber| subscriber.unbounded_send(new_slot).is_ok());
795
796        self.confirm_acknowledgement().await;
797        self.get_bundle_from_tx_pool(new_slot)
798    }
799
800    /// Produce a new slot and wait for a bundle produced at this slot.
801    pub async fn produce_slot_and_wait_for_bundle_submission(
802        &mut self,
803    ) -> (
804        NewSlot,
805        OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>,
806    ) {
807        let slot = self.produce_slot();
808        for _ in 0..MAX_PRODUCE_BUNDLE_TRY {
809            if let Some(bundle) = self.notify_new_slot_and_wait_for_bundle(slot).await {
810                return (slot, bundle);
811            }
812        }
813        panic!(
814            "Failed to produce bundle after {MAX_PRODUCE_BUNDLE_TRY:?} tries, something must be wrong"
815        );
816    }
817
818    /// Produce a slot and wait for bundle submission from specific operator.
819    pub async fn produce_slot_and_wait_for_bundle_submission_from_operator(
820        &mut self,
821        operator_id: OperatorId,
822    ) -> (
823        NewSlot,
824        OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>,
825    ) {
826        loop {
827            let slot = self.produce_slot();
828            if let Some(bundle) = self.notify_new_slot_and_wait_for_bundle(slot).await
829                && bundle.sealed_header().proof_of_election().operator_id == operator_id
830            {
831                return (slot, bundle);
832            }
833        }
834    }
835
836    /// Subscribe the new slot notification
837    pub fn new_slot_notification_stream(&mut self) -> mpsc::UnboundedReceiver<(Slot, PotOutput)> {
838        let (tx, rx) = mpsc::unbounded();
839        self.new_slot_notification_subscribers.push(tx);
840        rx
841    }
842
843    /// Subscribe the acknowledgement sender stream
844    pub fn new_acknowledgement_sender_stream(
845        &mut self,
846    ) -> TracingUnboundedReceiver<mpsc::Sender<()>> {
847        let (tx, rx) = tracing_unbounded("subspace_acknowledgement_sender_stream", 100);
848        self.acknowledgement_sender_subscribers.push(tx);
849        rx
850    }
851
852    /// Wait for all the acknowledgements before return
853    ///
854    /// It is used to wait for the acknowledgement of the domain worker to ensure it have
855    /// finish all the previous tasks before return
856    pub async fn confirm_acknowledgement(&mut self) {
857        let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
858
859        // Must drop `acknowledgement_sender` after the notification otherwise the receiver
860        // will block forever as there is still a sender not closed.
861        {
862            self.acknowledgement_sender_subscribers
863                .retain(|subscriber| {
864                    subscriber
865                        .unbounded_send(acknowledgement_sender.clone())
866                        .is_ok()
867                });
868            drop(acknowledgement_sender);
869        }
870
871        // Wait for all the acknowledgements to progress and proactively drop closed subscribers.
872        while acknowledgement_receiver.next().await.is_some() {
873            // Wait for all the acknowledgements to finish.
874        }
875    }
876
877    /// Wait for the operator finish processing the consensus block before return
878    pub async fn confirm_block_import_processed(&mut self) {
879        // Send one more notification to ensure the previous consensus block import notification
880        // have been received by the operator
881        let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
882        {
883            // Must drop `block_import_acknowledgement_sender` after the notification otherwise
884            // the receiver will block forever as there is still a sender not closed.
885            // NOTE: it is okay to use the default block number since it is ignored in the consumer side.
886            let value = (NumberFor::<Block>::default(), acknowledgement_sender);
887            self.block_import
888                .block_importing_notification_subscribers
889                .lock()
890                .retain(|subscriber| subscriber.unbounded_send(value.clone()).is_ok());
891        }
892        while acknowledgement_receiver.next().await.is_some() {
893            // Wait for all the acknowledgements to finish.
894        }
895
896        // Ensure the operator finish processing the consensus block
897        self.confirm_acknowledgement().await;
898    }
899
    /// Subscribe to block importing notifications.
    ///
    /// Delegates to the underlying block import. Each item carries a block number and an
    /// acknowledgement sender.
    pub fn block_importing_notification_stream(
        &self,
    ) -> TracingUnboundedReceiver<(NumberFor<Block>, mpsc::Sender<()>)> {
        self.block_import.block_importing_notification_stream()
    }
906
907    /// Get the bundle that created at `slot` from the transaction pool
908    pub fn get_bundle_from_tx_pool(
909        &self,
910        new_slot: NewSlot,
911    ) -> Option<OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>> {
912        for ready_tx in self.transaction_pool.ready() {
913            let ext = UncheckedExtrinsic::decode(&mut ready_tx.data.encode().as_slice())
914                .expect("should be able to decode");
915            if let RuntimeCall::Domains(pallet_domains::Call::submit_bundle { opaque_bundle }) =
916                ext.function
917                && opaque_bundle.sealed_header().slot_number() == *new_slot.0
918            {
919                return Some(opaque_bundle);
920            }
921        }
922        None
923    }
924
925    /// Submit a tx to the tx pool
926    pub async fn submit_transaction(&self, tx: OpaqueExtrinsic) -> Result<H256, TxPoolError> {
927        self.transaction_pool
928            .submit_one(
929                self.client.info().best_hash,
930                TransactionSource::External,
931                tx,
932            )
933            .await
934            .map_err(|err| {
935                err.into_pool_error()
936                    .expect("should always be a pool error")
937            })
938    }
939
940    /// Remove all tx from the tx pool
941    pub async fn clear_tx_pool(&self) -> Result<(), Box<dyn Error>> {
942        let txs: Vec<_> = self
943            .transaction_pool
944            .ready()
945            .map(|t| self.transaction_pool.hash_of(&t.data))
946            .collect();
947        self.prune_txs_from_pool(txs.as_slice()).await
948    }
949
950    /// Remove a ready transaction from transaction pool.
951    pub async fn prune_tx_from_pool(&self, tx: &OpaqueExtrinsic) -> Result<(), Box<dyn Error>> {
952        self.prune_txs_from_pool(&[self.transaction_pool.hash_of(tx)])
953            .await
954    }
955
956    async fn prune_txs_from_pool(
957        &self,
958        tx_hashes: &[BlockHashFor<Block>],
959    ) -> Result<(), Box<dyn Error>> {
960        let hash_and_number = HashAndNumber {
961            number: self.client.info().best_number,
962            hash: self.client.info().best_hash,
963        };
964        self.transaction_pool
965            .pool()
966            .prune_known(&hash_and_number, tx_hashes);
967        // `ban_time` have set to 0, explicitly wait 1ms here to ensure `clear_stale` will remove
968        // all the bans as the ban time must be passed.
969        tokio::time::sleep(time::Duration::from_millis(1)).await;
970        self.transaction_pool
971            .pool()
972            .validated_pool()
973            .clear_stale(&hash_and_number);
974        Ok(())
975    }
976
977    /// Return if the given ER exist in the consensus state
978    pub fn does_receipt_exist(
979        &self,
980        er_hash: BlockHashFor<DomainBlock>,
981    ) -> Result<bool, Box<dyn Error>> {
982        Ok(self
983            .client
984            .runtime_api()
985            .execution_receipt(self.client.info().best_hash, er_hash)?
986            .is_some())
987    }
988
989    /// Wait for a specific execution receipt to appear in the consensus state.
990    ///
991    /// Polls by producing blocks with bundles (the same primitive that
992    /// `produce_blocks_until!` uses internally) until
993    /// `does_receipt_exist(receipt_hash)` returns `Ok(true)`, or the timeout
994    /// expires. On timeout, returns `Err`. Tests expecting deterministic
995    /// receipt arrival should use this in place of a bare
996    /// `assert!(ferdie.does_receipt_exist(...).unwrap())`, which races with
997    /// the underlying block-production pipeline.
998    pub async fn wait_for_receipt(
999        &mut self,
1000        receipt_hash: BlockHashFor<DomainBlock>,
1001        timeout: Duration,
1002    ) -> Result<(), Box<dyn Error>> {
1003        let deadline = std::time::Instant::now() + timeout;
1004        while std::time::Instant::now() < deadline {
1005            if self.does_receipt_exist(receipt_hash)? {
1006                return Ok(());
1007            }
1008            // Advance the consensus chain by producing one block with bundles,
1009            // then re-check. This mirrors what `produce_blocks_until!` does:
1010            // the receipt lands in consensus state via a bundle carried in a
1011            // consensus block.
1012            self.produce_blocks_with_bundles(1).await?;
1013        }
1014        Err(
1015            format!("wait_for_receipt timed out after {timeout:?} for receipt {receipt_hash:?}")
1016                .into(),
1017        )
1018    }
1019
1020    /// Returns the stake summary of the Domain.
1021    pub fn get_domain_staking_summary(
1022        &self,
1023        domain_id: DomainId,
1024    ) -> Result<Option<StakingSummary<OperatorId, Balance>>, Box<dyn Error>> {
1025        Ok(self
1026            .client
1027            .runtime_api()
1028            .domain_stake_summary(self.client.info().best_hash, domain_id)?)
1029    }
1030
1031    /// Returns the domain block pruning depth.
1032    pub fn get_domain_block_pruning_depth(&self) -> Result<BlockNumber, Box<dyn Error>> {
1033        Ok(self
1034            .client
1035            .runtime_api()
1036            .block_pruning_depth(self.client.info().best_hash)?)
1037    }
1038
1039    /// Return a future that only resolve if a fraud proof that the given `fraud_proof_predicate`
1040    /// return true is submitted to the consensus tx pool
1041    pub fn wait_for_fraud_proof<FP>(
1042        &self,
1043        fraud_proof_predicate: FP,
1044    ) -> Pin<Box<dyn Future<Output = FraudProofFor<Block, DomainBlock>> + Send>>
1045    where
1046        FP: Fn(&FraudProofFor<Block, DomainBlock>) -> bool + Send + 'static,
1047    {
1048        let tx_pool = self.transaction_pool.clone();
1049        let mut import_tx_stream = self.transaction_pool.import_notification_stream();
1050        Box::pin(async move {
1051            while let Some(ready_tx_hash) = import_tx_stream.next().await {
1052                let ready_tx = tx_pool
1053                    .ready_transaction(&ready_tx_hash)
1054                    .expect("Just get the ready tx hash from import stream; qed");
1055                let ext = subspace_test_runtime::UncheckedExtrinsic::decode(
1056                    &mut ready_tx.data.encode().as_slice(),
1057                )
1058                .expect("Decode tx must success");
1059                if let subspace_test_runtime::RuntimeCall::Domains(
1060                    pallet_domains::Call::submit_fraud_proof { fraud_proof },
1061                ) = ext.function
1062                    && fraud_proof_predicate(&fraud_proof)
1063                {
1064                    return *fraud_proof;
1065                }
1066            }
1067            unreachable!()
1068        })
1069    }
1070
1071    /// Get the free balance of the given account
1072    pub fn free_balance(&self, account_id: AccountId) -> subspace_runtime_primitives::Balance {
1073        self.client
1074            .runtime_api()
1075            .free_balance(self.client.info().best_hash, account_id)
1076            .expect("Fail to get account free balance")
1077    }
1078
1079    /// Give the peer at `addr` the minimum reputation, which will ban it.
1080    // TODO: also ban/unban in the DSN
1081    pub fn ban_peer(&self, addr: MultiaddrWithPeerId) {
1082        // If unban_peer() has been called on the peer, we need to bump it twice
1083        // to give it the minimal reputation.
1084        self.network_service.report_peer(
1085            addr.peer_id,
1086            ReputationChange::new_fatal("Peer banned by test (1)"),
1087        );
1088        self.network_service.report_peer(
1089            addr.peer_id,
1090            ReputationChange::new_fatal("Peer banned by test (2)"),
1091        );
1092    }
1093
1094    /// Give the peer at `addr` a high reputation, which guarantees it is un-banned it.
1095    pub fn unban_peer(&self, addr: MultiaddrWithPeerId) {
1096        // If ReputationChange::new_fatal() has been called on the peer, we need to bump it twice
1097        // to give it a positive reputation.
1098        self.network_service.report_peer(
1099            addr.peer_id,
1100            ReputationChange::new(i32::MAX, "Peer unbanned by test (1)"),
1101        );
1102        self.network_service.report_peer(
1103            addr.peer_id,
1104            ReputationChange::new(i32::MAX, "Peer unbanned by test (2)"),
1105        );
1106    }
1107
1108    /// Take and stop the `MockConsensusNode` and delete its database lock file.
1109    ///
1110    /// Stopping and restarting a node can cause weird race conditions, with errors like:
1111    /// "The system cannot find the path specified".
1112    /// If this happens, try increasing the wait time in this method.
1113    pub async fn stop(self) -> Result<(), std::io::Error> {
1114        let lock_file_path = self.base_path.path().join("paritydb").join("lock");
1115        // On Windows, sometimes open files can’t be deleted so `drop` first then delete
1116        std::mem::drop(self);
1117
1118        // Give the node time to cleanup, exit, and release the lock file.
1119        // TODO: fix the underlying issue or wait for the actual shutdown instead
1120        sleep(Duration::from_secs(2)).await;
1121
1122        // The lock file already being deleted is not a fatal test error, so just log it
1123        if let Err(err) = std::fs::remove_file(lock_file_path) {
1124            tracing::error!("deleting paritydb lock file failed: {err:?}");
1125        }
1126        Ok(())
1127    }
1128}
1129
1130impl MockConsensusNode {
1131    async fn collect_txn_from_pool(&self, parent_hash: Hash) -> Vec<ExtrinsicFor<Block>> {
1132        self.transaction_pool
1133            .ready_at(parent_hash)
1134            .await
1135            .map(|pending_tx| pending_tx.data().as_ref().clone())
1136            .collect()
1137    }
1138
1139    async fn mock_inherent_data(slot: Slot) -> Result<InherentData, Box<dyn Error>> {
1140        let timestamp = sp_timestamp::InherentDataProvider::new(Timestamp::new(
1141            <Slot as Into<u64>>::into(slot) * SLOT_DURATION,
1142        ));
1143        let subspace_inherents =
1144            sp_consensus_subspace::inherents::InherentDataProvider::new(vec![]);
1145
1146        let inherent_data = (subspace_inherents, timestamp)
1147            .create_inherent_data()
1148            .await?;
1149
1150        Ok(inherent_data)
1151    }
1152
1153    fn mock_subspace_digest(&self, slot: Slot) -> Digest {
1154        let pre_digest: PreDigest<AccountId> = PreDigest::V0 {
1155            slot,
1156            solution: self.mock_solution.clone(),
1157            pot_info: PreDigestPotInfo::V0 {
1158                proof_of_time: Default::default(),
1159                future_proof_of_time: Default::default(),
1160            },
1161        };
1162        let mut digest = Digest::default();
1163        digest.push(DigestItem::subspace_pre_digest(&pre_digest));
1164        digest
1165    }
1166
1167    /// Build block
1168    async fn build_block(
1169        &self,
1170        slot: Slot,
1171        parent_hash: BlockHashFor<Block>,
1172        extrinsics: Vec<ExtrinsicFor<Block>>,
1173    ) -> Result<(Block, StorageChanges), Box<dyn Error>> {
1174        let inherent_digest = self.mock_subspace_digest(slot);
1175
1176        let inherent_data = Self::mock_inherent_data(slot).await?;
1177
1178        let mut block_builder = BlockBuilderBuilder::new(self.client.as_ref())
1179            .on_parent_block(parent_hash)
1180            .fetch_parent_block_number(self.client.as_ref())?
1181            .with_inherent_digests(inherent_digest)
1182            .build()
1183            .expect("Creates new block builder");
1184
1185        let inherent_txns = block_builder.create_inherents(inherent_data)?;
1186
1187        for tx in inherent_txns.into_iter().chain(extrinsics) {
1188            if let Err(err) = sc_block_builder::BlockBuilder::push(&mut block_builder, tx) {
1189                tracing::error!("Invalid transaction while building block: {}", err);
1190            }
1191        }
1192
1193        let (block, storage_changes, _) = block_builder.build()?.into_inner();
1194        Ok((block, storage_changes))
1195    }
1196
1197    /// Import block
1198    async fn import_block(
1199        &self,
1200        block: Block,
1201        storage_changes: Option<StorageChanges>,
1202    ) -> Result<BlockHashFor<Block>, Box<dyn Error>> {
1203        let (header, body) = block.deconstruct();
1204
1205        let header_hash = header.hash();
1206        let header_number = header.number;
1207
1208        let block_import_params = {
1209            let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
1210            import_block.body = Some(body);
1211            import_block.state_action = match storage_changes {
1212                Some(changes) => {
1213                    StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(changes))
1214                }
1215                None => StateAction::Execute,
1216            };
1217            import_block
1218        };
1219
1220        let import_result = self.block_import.import_block(block_import_params).await?;
1221
1222        if let Some(finalized_block_hash) = self
1223            .finalize_block_depth
1224            .and_then(|depth| header_number.checked_sub(depth))
1225            .and_then(|block_to_finalize| {
1226                self.client
1227                    .hash(block_to_finalize)
1228                    .expect("Block hash not found for number: {block_to_finalize:?}")
1229            })
1230        {
1231            self.client
1232                .finalize_block(finalized_block_hash, None, true)
1233                .unwrap();
1234        }
1235
1236        match import_result {
1237            ImportResult::Imported(_) | ImportResult::AlreadyInChain => Ok(header_hash),
1238            bad_res => Err(format!("Fail to import block due to {bad_res:?}").into()),
1239        }
1240    }
1241
1242    /// Produce a new block with the slot on top of `parent_hash`, with optional
1243    /// specified extrinsic list.
1244    #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1245    pub async fn produce_block_with_slot_at(
1246        &mut self,
1247        new_slot: NewSlot,
1248        parent_hash: BlockHashFor<Block>,
1249        maybe_extrinsics: Option<Vec<ExtrinsicFor<Block>>>,
1250    ) -> Result<BlockHashFor<Block>, Box<dyn Error>> {
1251        let block_timer = time::Instant::now();
1252
1253        let extrinsics = match maybe_extrinsics {
1254            Some(extrinsics) => extrinsics,
1255            None => self.collect_txn_from_pool(parent_hash).await,
1256        };
1257        let tx_hashes: Vec<_> = extrinsics
1258            .iter()
1259            .map(|t| self.transaction_pool.hash_of(t))
1260            .collect();
1261
1262        let (block, storage_changes) = self
1263            .build_block(new_slot.0, parent_hash, extrinsics)
1264            .await?;
1265
1266        log_new_block(&block, block_timer.elapsed().as_millis());
1267
1268        let res = match self.import_block(block, Some(storage_changes)).await {
1269            Ok(hash) => {
1270                // Remove the tx of the imported block from the tx pool, so we don't re-include
1271                // them in future blocks by accident.
1272                self.prune_txs_from_pool(tx_hashes.as_slice()).await?;
1273                Ok(hash)
1274            }
1275            err => err,
1276        };
1277        self.confirm_block_import_processed().await;
1278        res
1279    }
1280
1281    /// Produce a new block on top of the current best block, with the extrinsics collected from
1282    /// the transaction pool.
1283    #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1284    pub async fn produce_block_with_slot(&mut self, slot: NewSlot) -> Result<(), Box<dyn Error>> {
1285        self.produce_block_with_slot_at(slot, self.client.info().best_hash, None)
1286            .await?;
1287        Ok(())
1288    }
1289
1290    /// Produce a new block on top of the current best block, with the specified extrinsics.
1291    #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1292    pub async fn produce_block_with_extrinsics(
1293        &mut self,
1294        extrinsics: Vec<ExtrinsicFor<Block>>,
1295    ) -> Result<(), Box<dyn Error>> {
1296        let slot = self.produce_slot();
1297        self.produce_block_with_slot_at(slot, self.client.info().best_hash, Some(extrinsics))
1298            .await?;
1299        Ok(())
1300    }
1301
1302    /// Produce `n` number of blocks.
1303    #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1304    pub async fn produce_blocks(&mut self, n: u64) -> Result<(), Box<dyn Error>> {
1305        for _ in 0..n {
1306            let slot = self.produce_slot();
1307            self.produce_block_with_slot(slot).await?;
1308        }
1309        Ok(())
1310    }
1311
1312    /// Produce `n` number of blocks and wait for bundle submitted to the block.
1313    #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1314    pub async fn produce_blocks_with_bundles(&mut self, n: u64) -> Result<(), Box<dyn Error>> {
1315        for _ in 0..n {
1316            let (slot, _) = self.produce_slot_and_wait_for_bundle_submission().await;
1317            self.produce_block_with_slot(slot).await?;
1318        }
1319        Ok(())
1320    }
1321
1322    /// Get the nonce of the node account
1323    pub fn account_nonce(&self) -> u32 {
1324        self.client
1325            .runtime_api()
1326            .account_nonce(self.client.info().best_hash, self.key.to_account_id())
1327            .expect("Fail to get account nonce")
1328    }
1329
1330    /// Get the nonce of the given account
1331    pub fn account_nonce_of(&self, account_id: AccountId) -> u32 {
1332        self.client
1333            .runtime_api()
1334            .account_nonce(self.client.info().best_hash, account_id)
1335            .expect("Fail to get account nonce")
1336    }
1337
    /// Construct an extrinsic for `function` using the node's own key and the given `nonce`.
    ///
    /// Delegates to `construct_extrinsic_generic` with the remaining arguments fixed to
    /// `false` and `0`.
    pub fn construct_extrinsic(
        &self,
        nonce: u32,
        function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
    ) -> UncheckedExtrinsic {
        construct_extrinsic_generic(&self.client, function, self.key, false, nonce, 0)
    }
1346
    /// Construct an unsigned general extrinsic for `function`.
    pub fn construct_unsigned_extrinsic(
        &self,
        function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
    ) -> UncheckedExtrinsic {
        construct_unsigned_extrinsic(&self.client, function)
    }
1354
1355    /// Construct and send extrinsic through rpc
1356    pub async fn construct_and_send_extrinsic_with(
1357        &self,
1358        function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1359    ) -> Result<RpcTransactionOutput, RpcTransactionError> {
1360        let nonce = self.account_nonce();
1361        let extrinsic = self.construct_extrinsic(nonce, function);
1362        self.rpc_handlers.send_transaction(extrinsic.into()).await
1363    }
1364
    /// Send an already constructed extrinsic through RPC.
    ///
    /// The extrinsic is converted into an `OpaqueExtrinsic` before being handed to the RPC
    /// handlers.
    pub async fn send_extrinsic(
        &self,
        extrinsic: impl Into<OpaqueExtrinsic>,
    ) -> Result<RpcTransactionOutput, RpcTransactionError> {
        self.rpc_handlers.send_transaction(extrinsic.into()).await
    }
1372}
1373
1374fn log_new_block(block: &Block, used_time_ms: u128) {
1375    tracing::info!(
1376        "🎁 Prepared block for proposing at {} ({} ms) [hash: {:?}; parent_hash: {}; extrinsics ({}): [{}]]",
1377        block.header().number(),
1378        used_time_ms,
1379        block.header().hash(),
1380        block.header().parent_hash(),
1381        block.extrinsics().len(),
1382        block
1383            .extrinsics()
1384            .iter()
1385            .map(|xt| BlakeTwo256::hash_of(xt).to_string())
1386            .collect::<Vec<_>>()
1387            .join(", ")
1388    );
1389}
1390
1391fn mock_import_queue<Block: BlockT, I>(
1392    block_import: I,
1393    spawner: &impl SpawnEssentialNamed,
1394) -> BasicQueue<Block>
1395where
1396    I: BlockImport<Block, Error = ConsensusError> + Send + Sync + 'static,
1397{
1398    BasicQueue::new(
1399        MockVerifier::default(),
1400        Box::new(block_import),
1401        None,
1402        spawner,
1403        None,
1404    )
1405}
1406
/// A block verifier for tests; see its `VerifierT` implementation for the behavior.
struct MockVerifier<Block> {
    // Ties the otherwise unused `Block` type parameter to the struct.
    _marker: PhantomData<Block>,
}
1410
1411impl<Block> Default for MockVerifier<Block> {
1412    fn default() -> Self {
1413        Self {
1414            _marker: PhantomData,
1415        }
1416    }
1417}
1418
#[async_trait::async_trait]
impl<Block> VerifierT<Block> for MockVerifier<Block>
where
    Block: BlockT,
{
    /// Accept every block: the import params are returned unchanged and no check is made.
    async fn verify(
        &self,
        block_params: BlockImportParams<Block>,
    ) -> Result<BlockImportParams<Block>, String> {
        Ok(block_params)
    }
}
1431
// `MockBlockImport` is mostly a port of `sc-consensus-subspace::SubspaceBlockImport` with all
// the consensus related logic removed.
#[allow(clippy::type_complexity)]
struct MockBlockImport<Client, Block: BlockT> {
    // The client that actually imports the blocks.
    inner: Arc<Client>,
    // Subscribers that are notified, and waited on for an acknowledgement, before each block
    // is imported. Shared between all clones of this block import.
    block_importing_notification_subscribers:
        Arc<Mutex<Vec<TracingUnboundedSender<(NumberFor<Block>, mpsc::Sender<()>)>>>>,
}
1440
1441impl<Client, Block: BlockT> MockBlockImport<Client, Block> {
1442    fn new(inner: Arc<Client>) -> Self {
1443        MockBlockImport {
1444            inner,
1445            block_importing_notification_subscribers: Arc::new(Mutex::new(Vec::new())),
1446        }
1447    }
1448
1449    // Subscribe the block importing notification
1450    fn block_importing_notification_stream(
1451        &self,
1452    ) -> TracingUnboundedReceiver<(NumberFor<Block>, mpsc::Sender<()>)> {
1453        let (tx, rx) = tracing_unbounded("subspace_new_slot_notification_stream", 100);
1454        self.block_importing_notification_subscribers
1455            .lock()
1456            .push(tx);
1457        rx
1458    }
1459}
1460
1461impl<Client, Block: BlockT> MockBlockImport<Client, Block> {
1462    fn clone(&self) -> Self {
1463        MockBlockImport {
1464            inner: self.inner.clone(),
1465            block_importing_notification_subscribers: self
1466                .block_importing_notification_subscribers
1467                .clone(),
1468        }
1469    }
1470}
1471
1472#[async_trait::async_trait]
1473impl<Client, Block> BlockImport<Block> for MockBlockImport<Client, Block>
1474where
1475    Block: BlockT,
1476    for<'r> &'r Client: BlockImport<Block, Error = ConsensusError> + Send + Sync,
1477    Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
1478    Client::Api: ApiExt<Block>,
1479{
1480    type Error = ConsensusError;
1481
1482    async fn import_block(
1483        &self,
1484        mut block: BlockImportParams<Block>,
1485    ) -> Result<ImportResult, Self::Error> {
1486        let block_number = *block.header.number();
1487        block.fork_choice = Some(ForkChoiceStrategy::LongestChain);
1488
1489        let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
1490
1491        // Must drop `block_import_acknowledgement_sender` after the notification otherwise the receiver
1492        // will block forever as there is still a sender not closed.
1493        {
1494            let value = (block_number, acknowledgement_sender);
1495            self.block_importing_notification_subscribers
1496                .lock()
1497                .retain(|subscriber| subscriber.unbounded_send(value.clone()).is_ok());
1498        }
1499
1500        while acknowledgement_receiver.next().await.is_some() {
1501            // Wait for all the acknowledgements to finish.
1502        }
1503
1504        self.inner.import_block(block).await
1505    }
1506
1507    async fn check_block(
1508        &self,
1509        block: BlockCheckParams<Block>,
1510    ) -> Result<ImportResult, Self::Error> {
1511        self.inner.check_block(block).await
1512    }
1513}
1514
/// Produce the given number of blocks for both the primary node and the domain nodes.
///
/// Expands to an `async` block yielding `Result<(), Box<dyn std::error::Error>>`; the caller
/// has to `.await` it. The primary node produces `$count` blocks with bundles while the
/// operator node (and any additional domain nodes) wait for `$count` blocks.
#[macro_export]
macro_rules! produce_blocks {
    ($primary_node:ident, $operator_node:ident, $count: literal $(, $domain_node:ident)*) => {
        {
            async {
                // Create the domain-side wait futures *before* producing the blocks; they are
                // only awaited once block production is done.
                let domain_fut = {
                    let mut futs: Vec<std::pin::Pin<Box<dyn futures::Future<Output = ()>>>> = Vec::new();
                    futs.push(Box::pin($operator_node.wait_for_blocks($count)));
                    $( futs.push( Box::pin( $domain_node.wait_for_blocks($count) ) ); )*
                    futures::future::join_all(futs)
                };
                $primary_node.produce_blocks_with_bundles($count).await?;
                domain_fut.await;
                Ok::<(), Box<dyn std::error::Error>>(())
            }
        }
    };
}
1534
/// Produce one block for both the primary node and the domain nodes, where the primary node
/// can use any `produce_block_with_xxx` function (e.g. `produce_block_with_slot`) to produce
/// the block.
///
/// `$primary_node_produce_block` is an expression evaluating to the future that produces the
/// primary block. Expands to an `async` block yielding
/// `Result<(), Box<dyn std::error::Error>>`; the caller has to `.await` it.
#[macro_export]
macro_rules! produce_block_with {
    ($primary_node_produce_block:expr, $operator_node:ident $(, $domain_node:ident)*) => {
        {
            async {
                // Create the domain-side wait futures *before* producing the block; they are
                // only awaited once block production is done.
                let domain_fut = {
                    let mut futs: Vec<std::pin::Pin<Box<dyn futures::Future<Output = ()>>>> = Vec::new();
                    futs.push(Box::pin($operator_node.wait_for_blocks(1)));
                    $( futs.push( Box::pin( $domain_node.wait_for_blocks(1) ) ); )*
                    futures::future::join_all(futs)
                };
                $primary_node_produce_block.await?;
                domain_fut.await;
                Ok::<(), Box<dyn std::error::Error>>(())
            }
        }
    };
}
1555
/// Keep producing blocks, with a short fixed pause in between, until the given condition
/// becomes `true`.
///
/// `$condition` is a block expression that is re-evaluated before every iteration. Expands
/// to an `async` block yielding `Result<(), Box<dyn std::error::Error>>`; the caller has to
/// `.await` it. There is no upper bound on the number of blocks produced.
#[macro_export]
macro_rules! produce_blocks_until {
    ($primary_node:ident, $operator_node:ident, $condition: block $(, $domain_node:ident)*) => {
        async {
            while !$condition {
                produce_blocks!($primary_node, $operator_node, 1 $(, $domain_node),*).await?;
                // Pause for 10ms before re-evaluating the condition.
                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            }
            Ok::<(), Box<dyn std::error::Error>>(())
        }
    };
}
1569
/// The balance type used by the transaction payment pallet of runtime `T`.
type BalanceOf<T> = <<T as pallet_transaction_payment::Config>::OnChargeTransaction as pallet_transaction_payment::OnChargeTransaction<T>>::Balance;
1571
1572fn get_signed_extra(
1573    current_block: u64,
1574    immortal: bool,
1575    nonce: u32,
1576    tip: BalanceOf<Runtime>,
1577) -> SignedExtra {
1578    let period = u64::from(<<Runtime as frame_system::Config>::BlockHashCount>::get())
1579        .checked_next_power_of_two()
1580        .map(|c| c / 2)
1581        .unwrap_or(2);
1582    (
1583        frame_system::CheckNonZeroSender::<Runtime>::new(),
1584        frame_system::CheckSpecVersion::<Runtime>::new(),
1585        frame_system::CheckTxVersion::<Runtime>::new(),
1586        frame_system::CheckGenesis::<Runtime>::new(),
1587        frame_system::CheckMortality::<Runtime>::from(if immortal {
1588            generic::Era::Immortal
1589        } else {
1590            generic::Era::mortal(period, current_block)
1591        }),
1592        frame_system::CheckNonce::<Runtime>::from(nonce.into()),
1593        frame_system::CheckWeight::<Runtime>::new(),
1594        pallet_transaction_payment::ChargeTransactionPayment::<Runtime>::from(tip),
1595        BalanceTransferCheckExtension::<Runtime>::default(),
1596        pallet_subspace::extensions::SubspaceExtension::<Runtime>::new(),
1597        pallet_domains::extensions::DomainsExtension::<Runtime>::new(),
1598        pallet_messenger::extensions::MessengerExtension::<Runtime>::new(),
1599    )
1600}
1601
1602fn construct_extrinsic_raw_payload<Client>(
1603    client: impl AsRef<Client>,
1604    function: <Runtime as frame_system::Config>::RuntimeCall,
1605    immortal: bool,
1606    nonce: u32,
1607    tip: BalanceOf<Runtime>,
1608) -> (
1609    SignedPayload<<Runtime as frame_system::Config>::RuntimeCall, SignedExtra>,
1610    SignedExtra,
1611)
1612where
1613    BalanceOf<Runtime>: Send + Sync + From<u64> + sp_runtime::FixedPointOperand,
1614    u64: From<BlockNumberFor<Runtime>>,
1615    Client: HeaderBackend<subspace_runtime_primitives::opaque::Block>,
1616{
1617    let current_block_hash = client.as_ref().info().best_hash;
1618    let current_block = client.as_ref().info().best_number.saturated_into();
1619    let genesis_block = client.as_ref().hash(0).unwrap().unwrap();
1620    let extra = get_signed_extra(current_block, immortal, nonce, tip);
1621    (
1622        generic::SignedPayload::<
1623            <Runtime as frame_system::Config>::RuntimeCall,
1624            SignedExtra,
1625        >::from_raw(
1626            function,
1627            extra.clone(),
1628            ((), 100, 1, genesis_block, current_block_hash, (), (), (), (), (), (),()),
1629        ),
1630        extra,
1631    )
1632}
1633
1634/// Construct an extrinsic that can be applied to the test runtime.
1635pub fn construct_extrinsic_generic<Client>(
1636    client: impl AsRef<Client>,
1637    function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1638    caller: Sr25519Keyring,
1639    immortal: bool,
1640    nonce: u32,
1641    tip: BalanceOf<Runtime>,
1642) -> UncheckedExtrinsic
1643where
1644    BalanceOf<Runtime>: Send + Sync + From<u64> + sp_runtime::FixedPointOperand,
1645    u64: From<BlockNumberFor<Runtime>>,
1646    Client: HeaderBackend<subspace_runtime_primitives::opaque::Block>,
1647{
1648    let function = function.into();
1649    let (raw_payload, extra) =
1650        construct_extrinsic_raw_payload(client, function.clone(), immortal, nonce, tip);
1651    let signature = raw_payload.using_encoded(|e| caller.sign(e));
1652    UncheckedExtrinsic::new_signed(
1653        function,
1654        MultiAddress::Id(caller.to_account_id()),
1655        Signature::Sr25519(signature),
1656        extra,
1657    )
1658}
1659
1660/// Construct a general unsigned extrinsic that can be applied to the test runtime.
1661fn construct_unsigned_extrinsic<Client>(
1662    client: impl AsRef<Client>,
1663    function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1664) -> UncheckedExtrinsic
1665where
1666    BalanceOf<Runtime>: Send + Sync + From<u64> + sp_runtime::FixedPointOperand,
1667    u64: From<BlockNumberFor<Runtime>>,
1668    Client: HeaderBackend<subspace_runtime_primitives::opaque::Block>,
1669{
1670    let function = function.into();
1671    let current_block = client.as_ref().info().best_number.saturated_into();
1672    let extra = get_signed_extra(current_block, true, 0, 0);
1673    UncheckedExtrinsic::new_transaction(function, extra)
1674}