subspace_test_service/
lib.rs

1// Copyright (C) 2021 Subspace Labs, Inc.
2// SPDX-License-Identifier: GPL-3.0-or-later
3
4// This program is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// This program is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with this program. If not, see <https://www.gnu.org/licenses/>.
16
17//! Subspace test service only.
18
19#![warn(missing_docs, unused_crate_dependencies)]
20
21use cross_domain_message_gossip::{GossipWorkerBuilder, xdm_gossip_peers_set_config};
22use domain_runtime_primitives::opaque::{Block as DomainBlock, Header as DomainHeader};
23use frame_system::pallet_prelude::BlockNumberFor;
24use futures::channel::mpsc;
25use futures::{Future, StreamExt};
26use jsonrpsee::RpcModule;
27use pallet_domains::staking::StakingSummary;
28use parity_scale_codec::{Decode, Encode};
29use parking_lot::Mutex;
30use sc_block_builder::BlockBuilderBuilder;
31use sc_client_api::execution_extensions::ExtensionsFactory;
32use sc_client_api::{Backend as BackendT, BlockBackend, ExecutorProvider, Finalizer};
33use sc_consensus::block_import::{
34    BlockCheckParams, BlockImportParams, ForkChoiceStrategy, ImportResult,
35};
36use sc_consensus::{BasicQueue, BlockImport, StateAction, Verifier as VerifierT};
37use sc_domains::ExtensionsFactory as DomainsExtensionFactory;
38use sc_network::config::{NetworkConfiguration, TransportConfig};
39use sc_network::service::traits::NetworkService;
40use sc_network::{
41    NetworkWorker, NotificationMetrics, NotificationService, ReputationChange, multiaddr,
42};
43use sc_service::config::{
44    DatabaseSource, ExecutorConfiguration, KeystoreConfig, MultiaddrWithPeerId,
45    OffchainWorkerConfig, RpcBatchRequestConfig, RpcConfiguration, WasmExecutionMethod,
46    WasmtimeInstantiationStrategy,
47};
48use sc_service::{
49    BasePath, BlocksPruning, Configuration, NetworkStarter, Role, SpawnTasksParams, TaskManager,
50};
51use sc_transaction_pool::{BasicPool, FullChainApi, Options};
52use sc_transaction_pool_api::error::{Error as TxPoolError, IntoPoolError};
53use sc_transaction_pool_api::{InPoolTransaction, TransactionPool, TransactionSource};
54use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender, tracing_unbounded};
55use sp_api::{ApiExt, ProvideRuntimeApi};
56use sp_blockchain::{HashAndNumber, HeaderBackend};
57use sp_consensus::{BlockOrigin, Error as ConsensusError};
58use sp_consensus_slots::Slot;
59use sp_consensus_subspace::digests::{
60    CompatibleDigestItem, PreDigest, PreDigestPotInfo, extract_pre_digest,
61};
62use sp_consensus_subspace::{PotExtension, SubspaceApi};
63use sp_core::H256;
64use sp_core::offchain::OffchainDbExt;
65use sp_core::offchain::storage::OffchainDb;
66use sp_core::traits::{CodeExecutor, SpawnEssentialNamed};
67use sp_domains::{
68    BundleProducerElectionApi, ChainId, DomainId, DomainsApi, OpaqueBundle, OperatorId,
69};
70use sp_domains_fraud_proof::fraud_proof::FraudProof;
71use sp_domains_fraud_proof::{FraudProofExtension, FraudProofHostFunctionsImpl};
72use sp_externalities::Extensions;
73use sp_inherents::{InherentData, InherentDataProvider};
74use sp_keyring::Sr25519Keyring;
75use sp_messenger::MessengerApi;
76use sp_messenger_host_functions::{MessengerExtension, MessengerHostFunctionsImpl};
77use sp_mmr_primitives::MmrApi;
78use sp_runtime::generic::{Digest, SignedPayload};
79use sp_runtime::traits::{
80    BlakeTwo256, Block as BlockT, Hash as HashT, Header as HeaderT, NumberFor,
81};
82use sp_runtime::{DigestItem, MultiAddress, OpaqueExtrinsic, SaturatedConversion, generic};
83use sp_subspace_mmr::host_functions::{SubspaceMmrExtension, SubspaceMmrHostFunctionsImpl};
84use sp_timestamp::Timestamp;
85use std::collections::HashMap;
86use std::error::Error;
87use std::marker::PhantomData;
88use std::pin::Pin;
89use std::sync::Arc;
90use std::time;
91use std::time::Duration;
92use subspace_core_primitives::pot::PotOutput;
93use subspace_core_primitives::solutions::Solution;
94use subspace_core_primitives::{BlockNumber, PublicKey};
95use subspace_runtime_primitives::extension::BalanceTransferCheckExtension;
96use subspace_runtime_primitives::opaque::Block;
97use subspace_runtime_primitives::{
98    AccountId, Balance, BlockHashFor, ExtrinsicFor, Hash, HeaderFor, Signature,
99};
100use subspace_service::{FullSelectChain, RuntimeExecutor};
101use subspace_test_client::{Backend, Client, chain_spec};
102use subspace_test_primitives::OnchainStateApi;
103use subspace_test_runtime::{
104    Runtime, RuntimeApi, RuntimeCall, SLOT_DURATION, SignedExtra, UncheckedExtrinsic,
105};
106use substrate_frame_rpc_system::AccountNonceApi;
107use substrate_test_client::{RpcHandlersExt, RpcTransactionError, RpcTransactionOutput};
108use tokio::time::sleep;
109
/// Shorthand for the [`FraudProof`] type built from a `Block` / `DomainBlock` pair.
///
/// The alias fills in the block number and block hash types of `Block`, the header type of
/// `DomainBlock`, and `H256` as the last type parameter.
pub type FraudProofFor<Block, DomainBlock> =
    FraudProof<NumberFor<Block>, BlockHashFor<Block>, HeaderFor<DomainBlock>, H256>;
113
/// Maximum number of times `produce_slot_and_wait_for_bundle_submission` re-notifies the same
/// slot while waiting for a bundle before it panics.
const MAX_PRODUCE_BUNDLE_TRY: usize = 10;
115
116/// Create a Subspace `Configuration`.
117///
118/// By default an in-memory socket will be used, therefore you need to provide boot
119/// nodes if you want the future node to be connected to other nodes.
120#[expect(clippy::too_many_arguments)]
121pub fn node_config(
122    tokio_handle: tokio::runtime::Handle,
123    key: Sr25519Keyring,
124    boot_nodes: Vec<MultiaddrWithPeerId>,
125    run_farmer: bool,
126    force_authoring: bool,
127    force_synced: bool,
128    private_evm: bool,
129    evm_owner_account: Option<AccountId>,
130    base_path: BasePath,
131) -> Configuration {
132    let root = base_path.path();
133    let role = if run_farmer {
134        Role::Authority
135    } else {
136        Role::Full
137    };
138    let key_seed = key.to_seed();
139    let spec = chain_spec::subspace_local_testnet_config(private_evm, evm_owner_account).unwrap();
140
141    let mut network_config = NetworkConfiguration::new(
142        key_seed.to_string(),
143        "network/test/0.1",
144        Default::default(),
145        None,
146    );
147
148    network_config.boot_nodes = boot_nodes;
149
150    network_config.allow_non_globals_in_dht = true;
151
152    let addr: multiaddr::Multiaddr = multiaddr::Protocol::Memory(rand::random()).into();
153    network_config.listen_addresses.push(addr.clone());
154
155    network_config.public_addresses.push(addr);
156
157    network_config.transport = TransportConfig::MemoryOnly;
158
159    network_config.force_synced = force_synced;
160
161    Configuration {
162        impl_name: "subspace-test-node".to_string(),
163        impl_version: "0.1".to_string(),
164        role,
165        tokio_handle,
166        transaction_pool: Default::default(),
167        network: network_config,
168        keystore: KeystoreConfig::InMemory,
169        database: DatabaseSource::ParityDb {
170            path: root.join("paritydb"),
171        },
172        trie_cache_maximum_size: Some(64 * 1024 * 1024),
173        state_pruning: Default::default(),
174        blocks_pruning: BlocksPruning::KeepAll,
175        chain_spec: Box::new(spec),
176        executor: ExecutorConfiguration {
177            wasm_method: WasmExecutionMethod::Compiled {
178                instantiation_strategy: WasmtimeInstantiationStrategy::PoolingCopyOnWrite,
179            },
180            max_runtime_instances: 8,
181            default_heap_pages: None,
182            runtime_cache_size: 2,
183        },
184        wasm_runtime_overrides: Default::default(),
185        rpc: RpcConfiguration {
186            addr: None,
187            max_request_size: 0,
188            max_response_size: 0,
189            id_provider: None,
190            max_subs_per_conn: 0,
191            port: 0,
192            message_buffer_capacity: 0,
193            batch_config: RpcBatchRequestConfig::Disabled,
194            max_connections: 0,
195            cors: None,
196            methods: Default::default(),
197            rate_limit: None,
198            rate_limit_whitelisted_ips: vec![],
199            rate_limit_trust_proxy_headers: false,
200        },
201        prometheus_config: None,
202        telemetry_endpoints: None,
203        offchain_worker: OffchainWorkerConfig {
204            enabled: false,
205            indexing_enabled: true,
206        },
207        force_authoring,
208        disable_grandpa: false,
209        dev_key_seed: Some(key_seed),
210        tracing_targets: None,
211        tracing_receiver: Default::default(),
212        announce_block: true,
213        data_path: base_path.path().into(),
214        base_path,
215    }
216}
217
/// Shorthand for [`sp_api::StorageChanges`] specialized to this crate's [`Block`] type.
type StorageChanges = sp_api::StorageChanges<Block>;
219
/// Extensions factory installed on the mock consensus client.
///
/// Its `ExtensionsFactory` implementation registers the fraud proof, MMR, messenger, offchain
/// DB and proof-of-time extensions, with proof-of-time checks answered by a [`MockPotVerfier`].
struct MockExtensionsFactory<Client, DomainBlock, Executor, CBackend> {
    /// Client handed to the fraud proof, MMR and messenger host functions; also used to look
    /// up parent headers during proof-of-time checks.
    consensus_client: Arc<Client>,
    /// Backend queried for offchain storage when registering the offchain DB extension.
    consensus_backend: Arc<CBackend>,
    /// Executor handed to the fraud proof and messenger host functions.
    executor: Arc<Executor>,
    /// Mock verifier consulted by the registered proof-of-time extension.
    mock_pot_verifier: Arc<MockPotVerfier>,
    /// Confirmation depth forwarded to the domain extensions factory and MMR host functions.
    confirmation_depth_k: BlockNumber,
    /// Marker for the otherwise unused `DomainBlock` type parameter.
    _phantom: PhantomData<DomainBlock>,
}
228
229impl<Client, DomainBlock, Executor, CBackend>
230    MockExtensionsFactory<Client, DomainBlock, Executor, CBackend>
231{
232    fn new(
233        consensus_client: Arc<Client>,
234        executor: Arc<Executor>,
235        mock_pot_verifier: Arc<MockPotVerfier>,
236        consensus_backend: Arc<CBackend>,
237        confirmation_depth_k: BlockNumber,
238    ) -> Self {
239        Self {
240            consensus_client,
241            consensus_backend,
242            executor,
243            mock_pot_verifier,
244            confirmation_depth_k,
245            _phantom: Default::default(),
246        }
247    }
248}
249
/// Mock proof-of-time verifier: a mutex-protected map from slot number to the [`PotOutput`]
/// that was registered for that slot.
#[derive(Default)]
struct MockPotVerfier(Mutex<HashMap<u64, PotOutput>>);
252
253impl MockPotVerfier {
254    fn is_valid(&self, slot: u64, pot: PotOutput) -> bool {
255        self.0.lock().get(&slot).map(|p| *p == pot).unwrap_or(false)
256    }
257
258    fn inject_pot(&self, slot: u64, pot: PotOutput) {
259        self.0.lock().insert(slot, pot);
260    }
261}
262
263impl<Block, Client, DomainBlock, Executor, CBackend> ExtensionsFactory<Block>
264    for MockExtensionsFactory<Client, DomainBlock, Executor, CBackend>
265where
266    Block: BlockT,
267    Block::Hash: From<H256> + Into<H256>,
268    DomainBlock: BlockT,
269    DomainBlock::Hash: Into<H256> + From<H256>,
270    Client: BlockBackend<Block> + HeaderBackend<Block> + ProvideRuntimeApi<Block> + 'static,
271    Client::Api: DomainsApi<Block, DomainBlock::Header>
272        + BundleProducerElectionApi<Block, Balance>
273        + MessengerApi<Block, NumberFor<Block>, Block::Hash>
274        + MmrApi<Block, H256, NumberFor<Block>>,
275    Executor: CodeExecutor + sc_executor::RuntimeVersionOf,
276    CBackend: BackendT<Block> + 'static,
277{
278    fn extensions_for(
279        &self,
280        _block_hash: Block::Hash,
281        _block_number: NumberFor<Block>,
282    ) -> Extensions {
283        let confirmation_depth_k = self.confirmation_depth_k;
284        let mut exts = Extensions::new();
285        exts.register(FraudProofExtension::new(Arc::new(
286            FraudProofHostFunctionsImpl::<_, _, DomainBlock, Executor, _>::new(
287                self.consensus_client.clone(),
288                self.executor.clone(),
289                move |client, executor| {
290                    let extension_factory =
291                        DomainsExtensionFactory::<_, Block, DomainBlock, _>::new(
292                            client,
293                            executor,
294                            confirmation_depth_k,
295                        );
296                    Box::new(extension_factory) as Box<dyn ExtensionsFactory<DomainBlock>>
297                },
298            ),
299        )));
300        exts.register(SubspaceMmrExtension::new(Arc::new(
301            SubspaceMmrHostFunctionsImpl::<Block, _>::new(
302                self.consensus_client.clone(),
303                confirmation_depth_k,
304            ),
305        )));
306        exts.register(MessengerExtension::new(Arc::new(
307            MessengerHostFunctionsImpl::<Block, _, DomainBlock, _>::new(
308                self.consensus_client.clone(),
309                self.executor.clone(),
310            ),
311        )));
312
313        if let Some(offchain_storage) = self.consensus_backend.offchain_storage() {
314            let offchain_db = OffchainDb::new(offchain_storage);
315            exts.register(OffchainDbExt::new(offchain_db));
316        }
317        exts.register(PotExtension::new({
318            let client = Arc::clone(&self.consensus_client);
319            let mock_pot_verifier = Arc::clone(&self.mock_pot_verifier);
320            Box::new(
321                move |parent_hash, slot, proof_of_time, _quick_verification| {
322                    let parent_hash = {
323                        let mut converted_parent_hash = Block::Hash::default();
324                        converted_parent_hash.as_mut().copy_from_slice(&parent_hash);
325                        converted_parent_hash
326                    };
327
328                    let parent_header = match client.header(parent_hash) {
329                        Ok(Some(parent_header)) => parent_header,
330                        _ => return false,
331                    };
332                    let parent_pre_digest = match extract_pre_digest(&parent_header) {
333                        Ok(parent_pre_digest) => parent_pre_digest,
334                        _ => return false,
335                    };
336
337                    let parent_slot = parent_pre_digest.slot();
338                    if slot <= *parent_slot {
339                        return false;
340                    }
341
342                    mock_pot_verifier.is_valid(slot, proof_of_time)
343                },
344            )
345        }));
346        exts
347    }
348}
349
/// A newly produced slot paired with the proof-of-time output generated for it.
type NewSlot = (Slot, PotOutput);
351
/// A mock Subspace consensus node instance used for testing.
pub struct MockConsensusNode {
    /// `TaskManager`'s instance.
    pub task_manager: TaskManager,
    /// Client's instance.
    pub client: Arc<Client>,
    /// Backend.
    pub backend: Arc<Backend>,
    /// Code executor.
    pub executor: RuntimeExecutor,
    /// Transaction pool.
    pub transaction_pool: Arc<BasicPool<FullChainApi<Client, Block>, Block>>,
    /// The SelectChain Strategy
    pub select_chain: FullSelectChain,
    /// Network service.
    pub network_service: Arc<dyn NetworkService + Send + Sync>,
    /// Cross-domain gossip notification service.
    ///
    /// Taken (left as `None`) once the cross-domain gossip message worker is started.
    pub xdm_gossip_notification_service: Option<Box<dyn NotificationService>>,
    /// Sync service.
    pub sync_service: Arc<sc_network_sync::SyncingService<Block>>,
    /// RPC handlers.
    pub rpc_handlers: sc_service::RpcHandlers,
    /// Network starter
    ///
    /// Taken (left as `None`) once the network is started.
    pub network_starter: Option<NetworkStarter>,
    /// The next slot number
    next_slot: u64,
    /// The mock pot verifier
    mock_pot_verifier: Arc<MockPotVerfier>,
    /// The slot notification subscribers
    new_slot_notification_subscribers: Vec<mpsc::UnboundedSender<(Slot, PotOutput)>>,
    /// The acknowledgement sender subscribers
    acknowledgement_sender_subscribers: Vec<TracingUnboundedSender<mpsc::Sender<()>>>,
    /// Block import pipeline
    block_import: MockBlockImport<Client, Block>,
    /// Builder of the cross-domain gossip worker.
    ///
    /// Taken (left as `None`) once the cross-domain gossip message worker is started.
    xdm_gossip_worker_builder: Option<GossipWorkerBuilder>,
    /// Mock subspace solution used to mock the subspace `PreDigest`
    mock_solution: Solution<AccountId>,
    /// Log prefix derived from the node's keyring key.
    log_prefix: &'static str,
    /// Keyring key the node was started with.
    pub key: Sr25519Keyring,
    /// Optional finalization depth the node was started with.
    finalize_block_depth: Option<NumberFor<Block>>,
    /// The node's base path
    base_path: BasePath,
}
396
397impl MockConsensusNode {
398    /// Run a mock consensus node
399    pub fn run(
400        tokio_handle: tokio::runtime::Handle,
401        key: Sr25519Keyring,
402        base_path: BasePath,
403    ) -> MockConsensusNode {
404        Self::run_with_finalization_depth(tokio_handle, key, base_path, None, false, None)
405    }
406
407    /// Run a mock consensus node with a private EVM domain
408    pub fn run_with_private_evm(
409        tokio_handle: tokio::runtime::Handle,
410        key: Sr25519Keyring,
411        evm_owner: Option<Sr25519Keyring>,
412        base_path: BasePath,
413    ) -> MockConsensusNode {
414        Self::run_with_finalization_depth(tokio_handle, key, base_path, None, true, evm_owner)
415    }
416
    /// Run a mock consensus node with finalization depth
    ///
    /// Builds the node configuration via `node_config` (no boot nodes, not a farmer, no forced
    /// authoring or sync), creates the client with the mock extensions factory, transaction
    /// pool, network, RPC handlers and the cross-domain messaging workers, and returns the
    /// assembled node. The network itself is not started here; the network starter is stored
    /// on the returned node.
    ///
    /// * `finalize_block_depth` is stored on the returned node as-is.
    /// * `private_evm` and `evm_owner` (converted to an account id) are forwarded to
    ///   `node_config`.
    ///
    /// # Panics
    ///
    /// Panics if the client parts, chain constants, network or service tasks cannot be
    /// created.
    pub fn run_with_finalization_depth(
        tokio_handle: tokio::runtime::Handle,
        key: Sr25519Keyring,
        base_path: BasePath,
        finalize_block_depth: Option<NumberFor<Block>>,
        private_evm: bool,
        evm_owner: Option<Sr25519Keyring>,
    ) -> MockConsensusNode {
        let log_prefix = key.into();

        let mut config = node_config(
            tokio_handle,
            key,
            // boot_nodes
            vec![],
            // run_farmer
            false,
            // force_authoring
            false,
            // force_synced
            false,
            private_evm,
            evm_owner.map(|key| key.to_account_id()),
            base_path.clone(),
        );

        // Set `transaction_pool.ban_time` to 0 so that a duplicated tx is not immediately
        // rejected as `TemporarilyBanned`.
        // The rest of the options are taken from the defaults.
        let tx_pool_options = Options {
            ban_time: Duration::from_millis(0),
            ..Default::default()
        };

        // Tag the node name and enter a log span carrying it for the rest of the setup.
        config.network.node_name = format!("{} (Consensus)", config.network.node_name);
        let span = sc_tracing::tracing::info_span!(
            sc_tracing::logging::PREFIX_LOG_SPAN,
            name = config.network.node_name.as_str()
        );
        let _enter = span.enter();

        let executor = sc_service::new_wasm_executor(&config.executor);

        let (client, backend, keystore_container, mut task_manager) =
            sc_service::new_full_parts::<Block, RuntimeApi, _>(&config, None, executor.clone())
                .expect("Fail to new full parts");

        // A second executor instance, used by the domain-related host functions and the
        // cross-chain message listener.
        let domain_executor = Arc::new(sc_service::new_wasm_executor(&config.executor));
        let client = Arc::new(client);
        let mock_pot_verifier = Arc::new(MockPotVerfier::default());
        let chain_constants = client
            .runtime_api()
            .chain_constants(client.info().best_hash)
            .expect("Fail to get chain constants");
        // Install the mock extensions factory; it shares `mock_pot_verifier` with the node.
        client
            .execution_extensions()
            .set_extensions_factory(MockExtensionsFactory::<
                _,
                DomainBlock,
                sc_domains::RuntimeExecutor,
                _,
            >::new(
                client.clone(),
                domain_executor.clone(),
                Arc::clone(&mock_pot_verifier),
                backend.clone(),
                chain_constants.confirmation_depth_k(),
            ));

        let select_chain = sc_consensus::LongestChain::new(backend.clone());
        let transaction_pool = Arc::from(BasicPool::new_full(
            tx_pool_options,
            config.role.is_authority().into(),
            config.prometheus_registry(),
            task_manager.spawn_essential_handle(),
            client.clone(),
        ));

        let block_import = MockBlockImport::<_, _>::new(client.clone());

        // Network configuration, extended with the cross-domain gossip notification protocol.
        let mut net_config = sc_network::config::FullNetworkConfiguration::<
            _,
            _,
            NetworkWorker<_, _>,
        >::new(&config.network, None);
        let (xdm_gossip_notification_config, xdm_gossip_notification_service) =
            xdm_gossip_peers_set_config();
        net_config.add_notification_protocol(xdm_gossip_notification_config);

        let (network_service, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
            sc_service::build_network(sc_service::BuildNetworkParams {
                config: &config,
                net_config,
                client: client.clone(),
                transaction_pool: transaction_pool.clone(),
                spawn_handle: task_manager.spawn_handle(),
                import_queue: mock_import_queue(
                    block_import.clone(),
                    &task_manager.spawn_essential_handle(),
                ),
                block_announce_validator_builder: None,
                warp_sync_config: None,
                block_relay: None,
                metrics: NotificationMetrics::new(None),
            })
            .expect("Should be able to build network");

        // The RPC builder registers no methods; `config` is consumed here.
        let rpc_handlers = sc_service::spawn_tasks(SpawnTasksParams {
            network: network_service.clone(),
            client: client.clone(),
            keystore: keystore_container.keystore(),
            task_manager: &mut task_manager,
            transaction_pool: transaction_pool.clone(),
            rpc_builder: Box::new(|_| Ok(RpcModule::new(()))),
            backend: backend.clone(),
            system_rpc_tx,
            config,
            telemetry: None,
            tx_handler_controller,
            sync_service: sync_service.clone(),
        })
        .expect("Should be able to spawn tasks");

        let mock_solution =
            Solution::genesis_solution(PublicKey::from(key.public().0), key.to_account_id());

        let mut gossip_builder = GossipWorkerBuilder::new();

        // Worker gossiping consensus chain channel updates through the gossip message sink.
        task_manager
            .spawn_essential_handle()
            .spawn_essential_blocking(
                "consensus-chain-channel-update-worker",
                None,
                Box::pin(
                    domain_client_message_relayer::worker::gossip_channel_updates::<_, _, Block, _>(
                        ChainId::Consensus,
                        client.clone(),
                        sync_service.clone(),
                        gossip_builder.gossip_msg_sink(),
                    ),
                ),
            );

        let (consensus_msg_sink, consensus_msg_receiver) =
            tracing_unbounded("consensus_message_channel", 100);

        // Start the cross-domain message listener for the consensus chain so that it receives
        // messages from domains in the network.
        let consensus_listener =
            cross_domain_message_gossip::start_cross_chain_message_listener::<_, _, _, _, _, _, _>(
                ChainId::Consensus,
                client.clone(),
                client.clone(),
                transaction_pool.clone(),
                network_service.clone(),
                consensus_msg_receiver,
                domain_executor,
                sync_service.clone(),
            );

        task_manager
            .spawn_essential_handle()
            .spawn_essential_blocking(
                "consensus-message-listener",
                None,
                Box::pin(consensus_listener),
            );

        // Register the sending half of the channel consumed by the listener spawned above.
        gossip_builder.push_chain_sink(ChainId::Consensus, consensus_msg_sink);

        task_manager.spawn_essential_handle().spawn_blocking(
            "mmr-gadget",
            None,
            mmr_gadget::MmrGadget::start(
                client.clone(),
                backend.clone(),
                sp_mmr_primitives::INDEXING_PREFIX.to_vec(),
            ),
        );

        MockConsensusNode {
            task_manager,
            client,
            backend,
            executor,
            transaction_pool,
            select_chain,
            network_service,
            xdm_gossip_notification_service: Some(xdm_gossip_notification_service),
            sync_service,
            rpc_handlers,
            network_starter: Some(network_starter),
            // Slot numbering starts at 1.
            next_slot: 1,
            mock_pot_verifier,
            new_slot_notification_subscribers: Vec::new(),
            acknowledgement_sender_subscribers: Vec::new(),
            block_import,
            xdm_gossip_worker_builder: Some(gossip_builder),
            mock_solution,
            log_prefix,
            key,
            finalize_block_depth,
            base_path,
        }
    }
618
619    /// Start the mock consensus node network
620    pub fn start_network(&mut self) {
621        self.network_starter
622            .take()
623            .expect("mock consensus node network have not started yet")
624            .start_network();
625    }
626
627    /// Get the cross domain gossip message worker builder
628    pub fn xdm_gossip_worker_builder(&mut self) -> &mut GossipWorkerBuilder {
629        self.xdm_gossip_worker_builder
630            .as_mut()
631            .expect("gossip message worker have not started yet")
632    }
633
634    /// Start the cross domain gossip message worker.
635    pub fn start_cross_domain_gossip_message_worker(&mut self) {
636        let xdm_gossip_worker_builder = self
637            .xdm_gossip_worker_builder
638            .take()
639            .expect("gossip message worker have not started yet");
640        let cross_domain_message_gossip_worker = xdm_gossip_worker_builder.build::<Block, _, _>(
641            self.network_service.clone(),
642            self.xdm_gossip_notification_service
643                .take()
644                .expect("XDM gossip notification service must be used only once"),
645            self.sync_service.clone(),
646        );
647        self.task_manager
648            .spawn_essential_handle()
649            .spawn_essential_blocking(
650                "cross-domain-gossip-message-worker",
651                None,
652                Box::pin(cross_domain_message_gossip_worker.run()),
653            );
654    }
655
    /// Return the number of the slot that the next call to `produce_slot` will produce.
    pub fn next_slot(&self) -> u64 {
        self.next_slot
    }
660
    /// Override the number of the slot that the next call to `produce_slot` will produce.
    pub fn set_next_slot(&mut self, next_slot: u64) {
        self.next_slot = next_slot;
    }
665
666    /// Produce a slot only, without waiting for the potential slot handlers.
667    pub fn produce_slot(&mut self) -> NewSlot {
668        let slot = Slot::from(self.next_slot);
669        let proof_of_time = PotOutput::from(
670            <&[u8] as TryInto<[u8; 16]>>::try_into(&Hash::random().to_fixed_bytes()[..16])
671                .expect("slice with length of 16 must able convert into [u8; 16]; qed"),
672        );
673        self.mock_pot_verifier.inject_pot(*slot, proof_of_time);
674        self.next_slot += 1;
675
676        (slot, proof_of_time)
677    }
678
679    /// Notify the executor about the new slot and wait for the bundle produced at this slot.
680    pub async fn notify_new_slot_and_wait_for_bundle(
681        &mut self,
682        new_slot: NewSlot,
683    ) -> Option<OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>> {
684        self.new_slot_notification_subscribers
685            .retain(|subscriber| subscriber.unbounded_send(new_slot).is_ok());
686
687        self.confirm_acknowledgement().await;
688        self.get_bundle_from_tx_pool(new_slot)
689    }
690
691    /// Produce a new slot and wait for a bundle produced at this slot.
692    pub async fn produce_slot_and_wait_for_bundle_submission(
693        &mut self,
694    ) -> (
695        NewSlot,
696        OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>,
697    ) {
698        let slot = self.produce_slot();
699        for _ in 0..MAX_PRODUCE_BUNDLE_TRY {
700            if let Some(bundle) = self.notify_new_slot_and_wait_for_bundle(slot).await {
701                return (slot, bundle);
702            }
703        }
704        panic!(
705            "Failed to produce bundle after {MAX_PRODUCE_BUNDLE_TRY:?} tries, something must be wrong"
706        );
707    }
708
709    /// Produce a slot and wait for bundle submission from specific operator.
710    pub async fn produce_slot_and_wait_for_bundle_submission_from_operator(
711        &mut self,
712        operator_id: OperatorId,
713    ) -> (
714        NewSlot,
715        OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>,
716    ) {
717        loop {
718            let slot = self.produce_slot();
719            if let Some(bundle) = self.notify_new_slot_and_wait_for_bundle(slot).await
720                && bundle.sealed_header.header.proof_of_election.operator_id == operator_id
721            {
722                return (slot, bundle);
723            }
724        }
725    }
726
727    /// Subscribe the new slot notification
728    pub fn new_slot_notification_stream(&mut self) -> mpsc::UnboundedReceiver<(Slot, PotOutput)> {
729        let (tx, rx) = mpsc::unbounded();
730        self.new_slot_notification_subscribers.push(tx);
731        rx
732    }
733
734    /// Subscribe the acknowledgement sender stream
735    pub fn new_acknowledgement_sender_stream(
736        &mut self,
737    ) -> TracingUnboundedReceiver<mpsc::Sender<()>> {
738        let (tx, rx) = tracing_unbounded("subspace_acknowledgement_sender_stream", 100);
739        self.acknowledgement_sender_subscribers.push(tx);
740        rx
741    }
742
743    /// Wait for all the acknowledgements before return
744    ///
745    /// It is used to wait for the acknowledgement of the domain worker to ensure it have
746    /// finish all the previous tasks before return
747    pub async fn confirm_acknowledgement(&mut self) {
748        let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
749
750        // Must drop `acknowledgement_sender` after the notification otherwise the receiver
751        // will block forever as there is still a sender not closed.
752        {
753            self.acknowledgement_sender_subscribers
754                .retain(|subscriber| {
755                    subscriber
756                        .unbounded_send(acknowledgement_sender.clone())
757                        .is_ok()
758                });
759            drop(acknowledgement_sender);
760        }
761
762        // Wait for all the acknowledgements to progress and proactively drop closed subscribers.
763        while acknowledgement_receiver.next().await.is_some() {
764            // Wait for all the acknowledgements to finish.
765        }
766    }
767
768    /// Wait for the operator finish processing the consensus block before return
769    pub async fn confirm_block_import_processed(&mut self) {
770        // Send one more notification to ensure the previous consensus block import notificaion
771        // have received by the operator
772        let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
773        {
774            // Must drop `block_import_acknowledgement_sender` after the notification otherwise
775            // the receiver will block forever as there is still a sender not closed.
776            // NOTE: it is okay to use the default block number since it is ignored in the consumer side.
777            let value = (NumberFor::<Block>::default(), acknowledgement_sender);
778            self.block_import
779                .block_importing_notification_subscribers
780                .lock()
781                .retain(|subscriber| subscriber.unbounded_send(value.clone()).is_ok());
782        }
783        while acknowledgement_receiver.next().await.is_some() {
784            // Wait for all the acknowledgements to finish.
785        }
786
787        // Ensure the operator finish processing the consensus block
788        self.confirm_acknowledgement().await;
789    }
790
    /// Subscribe to the block importing notifications.
    ///
    /// Delegates to the underlying `block_import`. Every item yielded by the returned receiver
    /// is a block number paired with an `mpsc::Sender<()>`.
    pub fn block_importing_notification_stream(
        &self,
    ) -> TracingUnboundedReceiver<(NumberFor<Block>, mpsc::Sender<()>)> {
        self.block_import.block_importing_notification_stream()
    }
797
798    /// Get the bundle that created at `slot` from the transaction pool
799    pub fn get_bundle_from_tx_pool(
800        &self,
801        new_slot: NewSlot,
802    ) -> Option<OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>> {
803        for ready_tx in self.transaction_pool.ready() {
804            let ext = UncheckedExtrinsic::decode(&mut ready_tx.data.encode().as_slice())
805                .expect("should be able to decode");
806            if let RuntimeCall::Domains(pallet_domains::Call::submit_bundle { opaque_bundle }) =
807                ext.function
808                && opaque_bundle.sealed_header.slot_number() == *new_slot.0
809            {
810                return Some(opaque_bundle);
811            }
812        }
813        None
814    }
815
816    /// Submit a tx to the tx pool
817    pub async fn submit_transaction(&self, tx: OpaqueExtrinsic) -> Result<H256, TxPoolError> {
818        self.transaction_pool
819            .submit_one(
820                self.client.info().best_hash,
821                TransactionSource::External,
822                tx,
823            )
824            .await
825            .map_err(|err| {
826                err.into_pool_error()
827                    .expect("should always be a pool error")
828            })
829    }
830
831    /// Remove all tx from the tx pool
832    pub async fn clear_tx_pool(&self) -> Result<(), Box<dyn Error>> {
833        let txs: Vec<_> = self
834            .transaction_pool
835            .ready()
836            .map(|t| self.transaction_pool.hash_of(&t.data))
837            .collect();
838        self.prune_txs_from_pool(txs.as_slice()).await
839    }
840
841    /// Remove a ready transaction from transaction pool.
842    pub async fn prune_tx_from_pool(&self, tx: &OpaqueExtrinsic) -> Result<(), Box<dyn Error>> {
843        self.prune_txs_from_pool(&[self.transaction_pool.hash_of(tx)])
844            .await
845    }
846
847    async fn prune_txs_from_pool(
848        &self,
849        tx_hashes: &[BlockHashFor<Block>],
850    ) -> Result<(), Box<dyn Error>> {
851        let hash_and_number = HashAndNumber {
852            number: self.client.info().best_number,
853            hash: self.client.info().best_hash,
854        };
855        self.transaction_pool
856            .pool()
857            .prune_known(&hash_and_number, tx_hashes);
858        // `ban_time` have set to 0, explicitly wait 1ms here to ensure `clear_stale` will remove
859        // all the bans as the ban time must be passed.
860        tokio::time::sleep(time::Duration::from_millis(1)).await;
861        self.transaction_pool
862            .pool()
863            .validated_pool()
864            .clear_stale(&hash_and_number);
865        Ok(())
866    }
867
868    /// Return if the given ER exist in the consensus state
869    pub fn does_receipt_exist(
870        &self,
871        er_hash: BlockHashFor<DomainBlock>,
872    ) -> Result<bool, Box<dyn Error>> {
873        Ok(self
874            .client
875            .runtime_api()
876            .execution_receipt(self.client.info().best_hash, er_hash)?
877            .is_some())
878    }
879
880    /// Returns the stake summary of the Domain.
881    pub fn get_domain_staking_summary(
882        &self,
883        domain_id: DomainId,
884    ) -> Result<Option<StakingSummary<OperatorId, Balance>>, Box<dyn Error>> {
885        Ok(self
886            .client
887            .runtime_api()
888            .domain_stake_summary(self.client.info().best_hash, domain_id)?)
889    }
890
891    /// Return a future that only resolve if a fraud proof that the given `fraud_proof_predicate`
892    /// return true is submitted to the consensus tx pool
893    pub fn wait_for_fraud_proof<FP>(
894        &self,
895        fraud_proof_predicate: FP,
896    ) -> Pin<Box<dyn Future<Output = FraudProofFor<Block, DomainBlock>> + Send>>
897    where
898        FP: Fn(&FraudProofFor<Block, DomainBlock>) -> bool + Send + 'static,
899    {
900        let tx_pool = self.transaction_pool.clone();
901        let mut import_tx_stream = self.transaction_pool.import_notification_stream();
902        Box::pin(async move {
903            while let Some(ready_tx_hash) = import_tx_stream.next().await {
904                let ready_tx = tx_pool
905                    .ready_transaction(&ready_tx_hash)
906                    .expect("Just get the ready tx hash from import stream; qed");
907                let ext = subspace_test_runtime::UncheckedExtrinsic::decode(
908                    &mut ready_tx.data.encode().as_slice(),
909                )
910                .expect("Decode tx must success");
911                if let subspace_test_runtime::RuntimeCall::Domains(
912                    pallet_domains::Call::submit_fraud_proof { fraud_proof },
913                ) = ext.function
914                    && fraud_proof_predicate(&fraud_proof)
915                {
916                    return *fraud_proof;
917                }
918            }
919            unreachable!()
920        })
921    }
922
923    /// Get the free balance of the given account
924    pub fn free_balance(&self, account_id: AccountId) -> subspace_runtime_primitives::Balance {
925        self.client
926            .runtime_api()
927            .free_balance(self.client.info().best_hash, account_id)
928            .expect("Fail to get account free balance")
929    }
930
931    /// Give the peer at `addr` the minimum reputation, which will ban it.
932    // TODO: also ban/unban in the DSN
933    pub fn ban_peer(&self, addr: MultiaddrWithPeerId) {
934        // If unban_peer() has been called on the peer, we need to bump it twice
935        // to give it the minimal reputation.
936        self.network_service.report_peer(
937            addr.peer_id,
938            ReputationChange::new_fatal("Peer banned by test (1)"),
939        );
940        self.network_service.report_peer(
941            addr.peer_id,
942            ReputationChange::new_fatal("Peer banned by test (2)"),
943        );
944    }
945
946    /// Give the peer at `addr` a high reputation, which guarantees it is un-banned it.
947    pub fn unban_peer(&self, addr: MultiaddrWithPeerId) {
948        // If ReputationChange::new_fatal() has been called on the peer, we need to bump it twice
949        // to give it a positive reputation.
950        self.network_service.report_peer(
951            addr.peer_id,
952            ReputationChange::new(i32::MAX, "Peer unbanned by test (1)"),
953        );
954        self.network_service.report_peer(
955            addr.peer_id,
956            ReputationChange::new(i32::MAX, "Peer unbanned by test (2)"),
957        );
958    }
959
960    /// Take and stop the `MockConsensusNode` and delete its database lock file.
961    ///
962    /// Stopping and restarting a node can cause weird race conditions, with errors like:
963    /// "The system cannot find the path specified".
964    /// If this happens, try increasing the wait time in this method.
965    pub async fn stop(self) -> Result<(), std::io::Error> {
966        let lock_file_path = self.base_path.path().join("paritydb").join("lock");
967        // On Windows, sometimes open files can’t be deleted so `drop` first then delete
968        std::mem::drop(self);
969
970        // Give the node time to cleanup, exit, and release the lock file.
971        // TODO: fix the underlying issue or wait for the actual shutdown instead
972        sleep(Duration::from_secs(2)).await;
973
974        // The lock file already being deleted is not a fatal test error, so just log it
975        if let Err(err) = std::fs::remove_file(lock_file_path) {
976            tracing::error!("deleting paritydb lock file failed: {err:?}");
977        }
978        Ok(())
979    }
980}
981
982impl MockConsensusNode {
983    async fn collect_txn_from_pool(&self, parent_hash: Hash) -> Vec<ExtrinsicFor<Block>> {
984        self.transaction_pool
985            .ready_at(parent_hash)
986            .await
987            .map(|pending_tx| pending_tx.data().as_ref().clone())
988            .collect()
989    }
990
991    async fn mock_inherent_data(slot: Slot) -> Result<InherentData, Box<dyn Error>> {
992        let timestamp = sp_timestamp::InherentDataProvider::new(Timestamp::new(
993            <Slot as Into<u64>>::into(slot) * SLOT_DURATION,
994        ));
995        let subspace_inherents =
996            sp_consensus_subspace::inherents::InherentDataProvider::new(vec![]);
997
998        let inherent_data = (subspace_inherents, timestamp)
999            .create_inherent_data()
1000            .await?;
1001
1002        Ok(inherent_data)
1003    }
1004
1005    fn mock_subspace_digest(&self, slot: Slot) -> Digest {
1006        let pre_digest: PreDigest<AccountId> = PreDigest::V0 {
1007            slot,
1008            solution: self.mock_solution.clone(),
1009            pot_info: PreDigestPotInfo::V0 {
1010                proof_of_time: Default::default(),
1011                future_proof_of_time: Default::default(),
1012            },
1013        };
1014        let mut digest = Digest::default();
1015        digest.push(DigestItem::subspace_pre_digest(&pre_digest));
1016        digest
1017    }
1018
1019    /// Build block
1020    async fn build_block(
1021        &self,
1022        slot: Slot,
1023        parent_hash: BlockHashFor<Block>,
1024        extrinsics: Vec<ExtrinsicFor<Block>>,
1025    ) -> Result<(Block, StorageChanges), Box<dyn Error>> {
1026        let inherent_digest = self.mock_subspace_digest(slot);
1027
1028        let inherent_data = Self::mock_inherent_data(slot).await?;
1029
1030        let mut block_builder = BlockBuilderBuilder::new(self.client.as_ref())
1031            .on_parent_block(parent_hash)
1032            .fetch_parent_block_number(self.client.as_ref())?
1033            .with_inherent_digests(inherent_digest)
1034            .build()
1035            .expect("Creates new block builder");
1036
1037        let inherent_txns = block_builder.create_inherents(inherent_data)?;
1038
1039        for tx in inherent_txns.into_iter().chain(extrinsics) {
1040            if let Err(err) = sc_block_builder::BlockBuilder::push(&mut block_builder, tx) {
1041                tracing::error!("Invalid transaction while building block: {}", err);
1042            }
1043        }
1044
1045        let (block, storage_changes, _) = block_builder.build()?.into_inner();
1046        Ok((block, storage_changes))
1047    }
1048
1049    /// Import block
1050    async fn import_block(
1051        &self,
1052        block: Block,
1053        storage_changes: Option<StorageChanges>,
1054    ) -> Result<BlockHashFor<Block>, Box<dyn Error>> {
1055        let (header, body) = block.deconstruct();
1056
1057        let header_hash = header.hash();
1058        let header_number = header.number;
1059
1060        let block_import_params = {
1061            let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
1062            import_block.body = Some(body);
1063            import_block.state_action = match storage_changes {
1064                Some(changes) => {
1065                    StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(changes))
1066                }
1067                None => StateAction::Execute,
1068            };
1069            import_block
1070        };
1071
1072        let import_result = self.block_import.import_block(block_import_params).await?;
1073
1074        if let Some(finalized_block_hash) = self
1075            .finalize_block_depth
1076            .and_then(|depth| header_number.checked_sub(depth))
1077            .and_then(|block_to_finalize| {
1078                self.client
1079                    .hash(block_to_finalize)
1080                    .expect("Block hash not found for number: {block_to_finalize:?}")
1081            })
1082        {
1083            self.client
1084                .finalize_block(finalized_block_hash, None, true)
1085                .unwrap();
1086        }
1087
1088        match import_result {
1089            ImportResult::Imported(_) | ImportResult::AlreadyInChain => Ok(header_hash),
1090            bad_res => Err(format!("Fail to import block due to {bad_res:?}").into()),
1091        }
1092    }
1093
1094    /// Produce a new block with the slot on top of `parent_hash`, with optional
1095    /// specified extrinsic list.
1096    #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1097    pub async fn produce_block_with_slot_at(
1098        &mut self,
1099        new_slot: NewSlot,
1100        parent_hash: BlockHashFor<Block>,
1101        maybe_extrinsics: Option<Vec<ExtrinsicFor<Block>>>,
1102    ) -> Result<BlockHashFor<Block>, Box<dyn Error>> {
1103        let block_timer = time::Instant::now();
1104
1105        let extrinsics = match maybe_extrinsics {
1106            Some(extrinsics) => extrinsics,
1107            None => self.collect_txn_from_pool(parent_hash).await,
1108        };
1109        let tx_hashes: Vec<_> = extrinsics
1110            .iter()
1111            .map(|t| self.transaction_pool.hash_of(t))
1112            .collect();
1113
1114        let (block, storage_changes) = self
1115            .build_block(new_slot.0, parent_hash, extrinsics)
1116            .await?;
1117
1118        log_new_block(&block, block_timer.elapsed().as_millis());
1119
1120        let res = match self.import_block(block, Some(storage_changes)).await {
1121            Ok(hash) => {
1122                // Remove the tx of the imported block from the tx pool incase re-include them
1123                // in the future block by accident.
1124                self.prune_txs_from_pool(tx_hashes.as_slice()).await?;
1125                Ok(hash)
1126            }
1127            err => err,
1128        };
1129        self.confirm_block_import_processed().await;
1130        res
1131    }
1132
1133    /// Produce a new block on top of the current best block, with the extrinsics collected from
1134    /// the transaction pool.
1135    #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1136    pub async fn produce_block_with_slot(&mut self, slot: NewSlot) -> Result<(), Box<dyn Error>> {
1137        self.produce_block_with_slot_at(slot, self.client.info().best_hash, None)
1138            .await?;
1139        Ok(())
1140    }
1141
1142    /// Produce a new block on top of the current best block, with the specificed extrinsics.
1143    #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1144    pub async fn produce_block_with_extrinsics(
1145        &mut self,
1146        extrinsics: Vec<ExtrinsicFor<Block>>,
1147    ) -> Result<(), Box<dyn Error>> {
1148        let slot = self.produce_slot();
1149        self.produce_block_with_slot_at(slot, self.client.info().best_hash, Some(extrinsics))
1150            .await?;
1151        Ok(())
1152    }
1153
1154    /// Produce `n` number of blocks.
1155    #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1156    pub async fn produce_blocks(&mut self, n: u64) -> Result<(), Box<dyn Error>> {
1157        for _ in 0..n {
1158            let slot = self.produce_slot();
1159            self.produce_block_with_slot(slot).await?;
1160        }
1161        Ok(())
1162    }
1163
1164    /// Produce `n` number of blocks and wait for bundle submitted to the block.
1165    #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1166    pub async fn produce_blocks_with_bundles(&mut self, n: u64) -> Result<(), Box<dyn Error>> {
1167        for _ in 0..n {
1168            let (slot, _) = self.produce_slot_and_wait_for_bundle_submission().await;
1169            self.produce_block_with_slot(slot).await?;
1170        }
1171        Ok(())
1172    }
1173
1174    /// Get the nonce of the node account
1175    pub fn account_nonce(&self) -> u32 {
1176        self.client
1177            .runtime_api()
1178            .account_nonce(self.client.info().best_hash, self.key.to_account_id())
1179            .expect("Fail to get account nonce")
1180    }
1181
1182    /// Get the nonce of the given account
1183    pub fn account_nonce_of(&self, account_id: AccountId) -> u32 {
1184        self.client
1185            .runtime_api()
1186            .account_nonce(self.client.info().best_hash, account_id)
1187            .expect("Fail to get account nonce")
1188    }
1189
1190    /// Construct an extrinsic.
1191    pub fn construct_extrinsic(
1192        &self,
1193        nonce: u32,
1194        function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1195    ) -> UncheckedExtrinsic {
1196        construct_extrinsic_generic(&self.client, function, self.key, false, nonce, 0)
1197    }
1198
    /// Construct an unsigned general extrinsic for `function`.
    ///
    /// Delegates to the free function `construct_unsigned_extrinsic`, passing the node's client.
    pub fn construct_unsigned_extrinsic(
        &self,
        function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
    ) -> UncheckedExtrinsic {
        construct_unsigned_extrinsic(&self.client, function)
    }
1206
1207    /// Construct and send extrinsic through rpc
1208    pub async fn construct_and_send_extrinsic_with(
1209        &self,
1210        function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1211    ) -> Result<RpcTransactionOutput, RpcTransactionError> {
1212        let nonce = self.account_nonce();
1213        let extrinsic = self.construct_extrinsic(nonce, function);
1214        self.rpc_handlers.send_transaction(extrinsic.into()).await
1215    }
1216
1217    /// Get the nonce of the given account
1218    pub async fn send_extrinsic(
1219        &self,
1220        extrinsic: impl Into<OpaqueExtrinsic>,
1221    ) -> Result<RpcTransactionOutput, RpcTransactionError> {
1222        self.rpc_handlers.send_transaction(extrinsic.into()).await
1223    }
1224}
1225
1226fn log_new_block(block: &Block, used_time_ms: u128) {
1227    tracing::info!(
1228        "🎁 Prepared block for proposing at {} ({} ms) [hash: {:?}; parent_hash: {}; extrinsics ({}): [{}]]",
1229        block.header().number(),
1230        used_time_ms,
1231        block.header().hash(),
1232        block.header().parent_hash(),
1233        block.extrinsics().len(),
1234        block
1235            .extrinsics()
1236            .iter()
1237            .map(|xt| BlakeTwo256::hash_of(xt).to_string())
1238            .collect::<Vec<_>>()
1239            .join(", ")
1240    );
1241}
1242
1243fn mock_import_queue<Block: BlockT, I>(
1244    block_import: I,
1245    spawner: &impl SpawnEssentialNamed,
1246) -> BasicQueue<Block>
1247where
1248    I: BlockImport<Block, Error = ConsensusError> + Send + Sync + 'static,
1249{
1250    BasicQueue::new(
1251        MockVerifier::default(),
1252        Box::new(block_import),
1253        None,
1254        spawner,
1255        None,
1256    )
1257}
1258
/// Block verifier used by the mock import queue.
///
/// It holds no data; the `Block` type parameter is only recorded through `PhantomData`.
struct MockVerifier<Block> {
    // Zero-sized marker tying the verifier to the `Block` type parameter.
    _marker: PhantomData<Block>,
}
1262
1263impl<Block> Default for MockVerifier<Block> {
1264    fn default() -> Self {
1265        Self {
1266            _marker: PhantomData,
1267        }
1268    }
1269}
1270
1271#[async_trait::async_trait]
1272impl<Block> VerifierT<Block> for MockVerifier<Block>
1273where
1274    Block: BlockT,
1275{
1276    async fn verify(
1277        &self,
1278        block_params: BlockImportParams<Block>,
1279    ) -> Result<BlockImportParams<Block>, String> {
1280        Ok(block_params)
1281    }
1282}
1283
// `MockBlockImport` is mostly ported from `sc-consensus-subspace::SubspaceBlockImport` with all
// the consensus related logic removed.
//
// It notifies its subscribers about each block that is about to be imported and then hands the
// block over to the wrapped client.
#[allow(clippy::type_complexity)]
struct MockBlockImport<Client, Block: BlockT> {
    // The wrapped client the block import is delegated to.
    inner: Arc<Client>,
    // Senders of the block importing notification subscribers; each notification is a block
    // number paired with a sender used for acknowledging it. Shared between clones.
    block_importing_notification_subscribers:
        Arc<Mutex<Vec<TracingUnboundedSender<(NumberFor<Block>, mpsc::Sender<()>)>>>>,
}
1292
1293impl<Client, Block: BlockT> MockBlockImport<Client, Block> {
1294    fn new(inner: Arc<Client>) -> Self {
1295        MockBlockImport {
1296            inner,
1297            block_importing_notification_subscribers: Arc::new(Mutex::new(Vec::new())),
1298        }
1299    }
1300
1301    // Subscribe the block importing notification
1302    fn block_importing_notification_stream(
1303        &self,
1304    ) -> TracingUnboundedReceiver<(NumberFor<Block>, mpsc::Sender<()>)> {
1305        let (tx, rx) = tracing_unbounded("subspace_new_slot_notification_stream", 100);
1306        self.block_importing_notification_subscribers
1307            .lock()
1308            .push(tx);
1309        rx
1310    }
1311}
1312
1313impl<Client, Block: BlockT> MockBlockImport<Client, Block> {
1314    fn clone(&self) -> Self {
1315        MockBlockImport {
1316            inner: self.inner.clone(),
1317            block_importing_notification_subscribers: self
1318                .block_importing_notification_subscribers
1319                .clone(),
1320        }
1321    }
1322}
1323
1324#[async_trait::async_trait]
1325impl<Client, Block> BlockImport<Block> for MockBlockImport<Client, Block>
1326where
1327    Block: BlockT,
1328    for<'r> &'r Client: BlockImport<Block, Error = ConsensusError> + Send + Sync,
1329    Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
1330    Client::Api: ApiExt<Block>,
1331{
1332    type Error = ConsensusError;
1333
1334    async fn import_block(
1335        &self,
1336        mut block: BlockImportParams<Block>,
1337    ) -> Result<ImportResult, Self::Error> {
1338        let block_number = *block.header.number();
1339        block.fork_choice = Some(ForkChoiceStrategy::LongestChain);
1340
1341        let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
1342
1343        // Must drop `block_import_acknowledgement_sender` after the notification otherwise the receiver
1344        // will block forever as there is still a sender not closed.
1345        {
1346            let value = (block_number, acknowledgement_sender);
1347            self.block_importing_notification_subscribers
1348                .lock()
1349                .retain(|subscriber| subscriber.unbounded_send(value.clone()).is_ok());
1350        }
1351
1352        while acknowledgement_receiver.next().await.is_some() {
1353            // Wait for all the acknowledgements to finish.
1354        }
1355
1356        self.inner.import_block(block).await
1357    }
1358
1359    async fn check_block(
1360        &self,
1361        block: BlockCheckParams<Block>,
1362    ) -> Result<ImportResult, Self::Error> {
1363        self.inner.check_block(block).await
1364    }
1365}
1366
/// Produce the given number of blocks for both the primary node and the domain nodes.
///
/// Expands to an `async` block, which the caller has to `.await`, resolving to
/// `Result<(), Box<dyn std::error::Error>>`. The `wait_for_blocks` futures of the operator node
/// and of every additional domain node are created first; then the primary node produces
/// `$count` blocks with bundles, and finally all the waiting futures are awaited.
#[macro_export]
macro_rules! produce_blocks {
    ($primary_node:ident, $operator_node:ident, $count: literal $(, $domain_node:ident)*) => {
        {
            async {
                // Created before any block is produced, awaited only afterwards.
                let domain_fut = {
                    let mut futs: Vec<std::pin::Pin<Box<dyn futures::Future<Output = ()>>>> = Vec::new();
                    futs.push(Box::pin($operator_node.wait_for_blocks($count)));
                    $( futs.push( Box::pin( $domain_node.wait_for_blocks($count) ) ); )*
                    futures::future::join_all(futs)
                };
                // An error here is propagated out of the `async` block.
                $primary_node.produce_blocks_with_bundles($count).await?;
                domain_fut.await;
                Ok::<(), Box<dyn std::error::Error>>(())
            }
        }
    };
}
1386
/// Produce one block for both the primary node and the domain nodes, where the primary node can
/// use any `produce_block_with_xxx` function (i.e. `produce_block_with_slot`) to produce the
/// block.
///
/// Expands to an `async` block, which the caller has to `.await`, resolving to
/// `Result<(), Box<dyn std::error::Error>>`. `$primary_node_produce_block` is an expression that
/// is awaited to produce the block; the operator node and every additional domain node are
/// each waited on for one block.
#[macro_export]
macro_rules! produce_block_with {
    ($primary_node_produce_block:expr, $operator_node:ident $(, $domain_node:ident)*) => {
        {
            async {
                // Created before the block is produced, awaited only afterwards.
                let domain_fut = {
                    let mut futs: Vec<std::pin::Pin<Box<dyn futures::Future<Output = ()>>>> = Vec::new();
                    futs.push(Box::pin($operator_node.wait_for_blocks(1)));
                    $( futs.push( Box::pin( $domain_node.wait_for_blocks(1) ) ); )*
                    futures::future::join_all(futs)
                };
                // An error here is propagated out of the `async` block.
                $primary_node_produce_block.await?;
                domain_fut.await;
                Ok::<(), Box<dyn std::error::Error>>(())
            }
        }
    };
}
1407
/// Keep producing blocks, pausing 10ms after each one, until the given condition becomes `true`.
///
/// Expands to an `async` block, which the caller has to `.await`, resolving to
/// `Result<(), Box<dyn std::error::Error>>`. `$condition` is a block expression that is
/// evaluated before each block is produced; any number of additional domain nodes may follow it.
#[macro_export]
macro_rules! produce_blocks_until {
    ($primary_node:ident, $operator_node:ident, $condition: block $(, $domain_node:ident)*) => {
        async {
            while !$condition {
                // `$crate::` makes the nested macro resolve even when the caller has imported
                // only `produce_blocks_until`. The domain nodes are forwarded without an extra
                // separator, since each repetition already carries its own leading comma.
                $crate::produce_blocks!($primary_node, $operator_node, 1 $(, $domain_node)*).await?;
                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            }
            Ok::<(), Box<dyn std::error::Error>>(())
        }
    };
}
1421
/// The balance type of the `OnChargeTransaction` implementation configured in
/// `pallet_transaction_payment` for the runtime `T`.
type BalanceOf<T> = <<T as pallet_transaction_payment::Config>::OnChargeTransaction as pallet_transaction_payment::OnChargeTransaction<T>>::Balance;
1423
1424fn get_signed_extra(
1425    current_block: u64,
1426    immortal: bool,
1427    nonce: u32,
1428    tip: BalanceOf<Runtime>,
1429) -> SignedExtra {
1430    let period = u64::from(<<Runtime as frame_system::Config>::BlockHashCount>::get())
1431        .checked_next_power_of_two()
1432        .map(|c| c / 2)
1433        .unwrap_or(2);
1434    (
1435        frame_system::CheckNonZeroSender::<Runtime>::new(),
1436        frame_system::CheckSpecVersion::<Runtime>::new(),
1437        frame_system::CheckTxVersion::<Runtime>::new(),
1438        frame_system::CheckGenesis::<Runtime>::new(),
1439        frame_system::CheckMortality::<Runtime>::from(if immortal {
1440            generic::Era::Immortal
1441        } else {
1442            generic::Era::mortal(period, current_block)
1443        }),
1444        frame_system::CheckNonce::<Runtime>::from(nonce.into()),
1445        frame_system::CheckWeight::<Runtime>::new(),
1446        pallet_transaction_payment::ChargeTransactionPayment::<Runtime>::from(tip),
1447        BalanceTransferCheckExtension::<Runtime>::default(),
1448        pallet_subspace::extensions::SubspaceExtension::<Runtime>::new(),
1449        pallet_domains::extensions::DomainsExtension::<Runtime>::new(),
1450        pallet_messenger::extensions::MessengerExtension::<Runtime>::new(),
1451    )
1452}
1453
1454fn construct_extrinsic_raw_payload<Client>(
1455    client: impl AsRef<Client>,
1456    function: <Runtime as frame_system::Config>::RuntimeCall,
1457    immortal: bool,
1458    nonce: u32,
1459    tip: BalanceOf<Runtime>,
1460) -> (
1461    SignedPayload<<Runtime as frame_system::Config>::RuntimeCall, SignedExtra>,
1462    SignedExtra,
1463)
1464where
1465    BalanceOf<Runtime>: Send + Sync + From<u64> + sp_runtime::FixedPointOperand,
1466    u64: From<BlockNumberFor<Runtime>>,
1467    Client: HeaderBackend<subspace_runtime_primitives::opaque::Block>,
1468{
1469    let current_block_hash = client.as_ref().info().best_hash;
1470    let current_block = client.as_ref().info().best_number.saturated_into();
1471    let genesis_block = client.as_ref().hash(0).unwrap().unwrap();
1472    let extra = get_signed_extra(current_block, immortal, nonce, tip);
1473    (
1474        generic::SignedPayload::<
1475            <Runtime as frame_system::Config>::RuntimeCall,
1476            SignedExtra,
1477        >::from_raw(
1478            function,
1479            extra.clone(),
1480            ((), 100, 1, genesis_block, current_block_hash, (), (), (), (), (), (),()),
1481        ),
1482        extra,
1483    )
1484}
1485
1486/// Construct an extrinsic that can be applied to the test runtime.
1487pub fn construct_extrinsic_generic<Client>(
1488    client: impl AsRef<Client>,
1489    function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1490    caller: Sr25519Keyring,
1491    immortal: bool,
1492    nonce: u32,
1493    tip: BalanceOf<Runtime>,
1494) -> UncheckedExtrinsic
1495where
1496    BalanceOf<Runtime>: Send + Sync + From<u64> + sp_runtime::FixedPointOperand,
1497    u64: From<BlockNumberFor<Runtime>>,
1498    Client: HeaderBackend<subspace_runtime_primitives::opaque::Block>,
1499{
1500    let function = function.into();
1501    let (raw_payload, extra) =
1502        construct_extrinsic_raw_payload(client, function.clone(), immortal, nonce, tip);
1503    let signature = raw_payload.using_encoded(|e| caller.sign(e));
1504    UncheckedExtrinsic::new_signed(
1505        function,
1506        MultiAddress::Id(caller.to_account_id()),
1507        Signature::Sr25519(signature),
1508        extra,
1509    )
1510}
1511
1512/// Construct a general unsigned extrinsic that can be applied to the test runtime.
1513fn construct_unsigned_extrinsic<Client>(
1514    client: impl AsRef<Client>,
1515    function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1516) -> UncheckedExtrinsic
1517where
1518    BalanceOf<Runtime>: Send + Sync + From<u64> + sp_runtime::FixedPointOperand,
1519    u64: From<BlockNumberFor<Runtime>>,
1520    Client: HeaderBackend<subspace_runtime_primitives::opaque::Block>,
1521{
1522    let function = function.into();
1523    let current_block = client.as_ref().info().best_number.saturated_into();
1524    let extra = get_signed_extra(current_block, true, 0, 0);
1525    UncheckedExtrinsic::new_transaction(function, extra)
1526}