subspace_test_service/
lib.rs

1// Copyright (C) 2021 Subspace Labs, Inc.
2// SPDX-License-Identifier: GPL-3.0-or-later
3
4// This program is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// This program is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with this program. If not, see <https://www.gnu.org/licenses/>.
16
17//! Subspace test service only.
18
19#![warn(missing_docs, unused_crate_dependencies)]
20
21use cross_domain_message_gossip::{GossipWorkerBuilder, xdm_gossip_peers_set_config};
22use domain_runtime_primitives::opaque::{Block as DomainBlock, Header as DomainHeader};
23use frame_system::pallet_prelude::BlockNumberFor;
24use futures::channel::mpsc;
25use futures::{Future, StreamExt};
26use jsonrpsee::RpcModule;
27use pallet_domains::staking::StakingSummary;
28use parity_scale_codec::{Decode, Encode};
29use parking_lot::Mutex;
30use sc_block_builder::BlockBuilderBuilder;
31use sc_client_api::execution_extensions::ExtensionsFactory;
32use sc_client_api::{Backend as BackendT, BlockBackend, ExecutorProvider, Finalizer};
33use sc_consensus::block_import::{
34    BlockCheckParams, BlockImportParams, ForkChoiceStrategy, ImportResult,
35};
36use sc_consensus::{BasicQueue, BlockImport, StateAction, Verifier as VerifierT};
37use sc_domains::ExtensionsFactory as DomainsExtensionFactory;
38use sc_network::config::{NetworkConfiguration, TransportConfig};
39use sc_network::service::traits::NetworkService;
40use sc_network::{
41    NetworkWorker, NotificationMetrics, NotificationService, ReputationChange, multiaddr,
42};
43use sc_service::config::{
44    DatabaseSource, ExecutorConfiguration, KeystoreConfig, MultiaddrWithPeerId,
45    OffchainWorkerConfig, RpcBatchRequestConfig, RpcConfiguration, WasmExecutionMethod,
46    WasmtimeInstantiationStrategy,
47};
48use sc_service::{
49    BasePath, BlocksPruning, Configuration, NetworkStarter, Role, SpawnTasksParams, TaskManager,
50};
51use sc_transaction_pool::{BasicPool, FullChainApi, Options};
52use sc_transaction_pool_api::error::{Error as TxPoolError, IntoPoolError};
53use sc_transaction_pool_api::{InPoolTransaction, TransactionPool, TransactionSource};
54use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender, tracing_unbounded};
55use sp_api::{ApiExt, ProvideRuntimeApi};
56use sp_blockchain::{HashAndNumber, HeaderBackend};
57use sp_consensus::{BlockOrigin, Error as ConsensusError};
58use sp_consensus_slots::Slot;
59use sp_consensus_subspace::digests::{
60    CompatibleDigestItem, PreDigest, PreDigestPotInfo, extract_pre_digest,
61};
62use sp_consensus_subspace::{PotExtension, SubspaceApi};
63use sp_core::H256;
64use sp_core::offchain::OffchainDbExt;
65use sp_core::offchain::storage::OffchainDb;
66use sp_core::traits::{CodeExecutor, SpawnEssentialNamed};
67use sp_domains::bundle::OpaqueBundle;
68use sp_domains::{BundleProducerElectionApi, ChainId, DomainId, DomainsApi, OperatorId};
69use sp_domains_fraud_proof::fraud_proof::FraudProof;
70use sp_domains_fraud_proof::{FraudProofExtension, FraudProofHostFunctionsImpl};
71use sp_externalities::Extensions;
72use sp_inherents::{InherentData, InherentDataProvider};
73use sp_keyring::Sr25519Keyring;
74use sp_messenger::MessengerApi;
75use sp_messenger_host_functions::{MessengerExtension, MessengerHostFunctionsImpl};
76use sp_mmr_primitives::MmrApi;
77use sp_runtime::generic::{Digest, SignedPayload};
78use sp_runtime::traits::{
79    BlakeTwo256, Block as BlockT, Hash as HashT, Header as HeaderT, NumberFor,
80};
81use sp_runtime::{DigestItem, MultiAddress, OpaqueExtrinsic, SaturatedConversion, generic};
82use sp_subspace_mmr::host_functions::{SubspaceMmrExtension, SubspaceMmrHostFunctionsImpl};
83use sp_timestamp::Timestamp;
84use std::collections::HashMap;
85use std::error::Error;
86use std::marker::PhantomData;
87use std::pin::Pin;
88use std::sync::Arc;
89use std::time;
90use std::time::Duration;
91use subspace_core_primitives::pot::PotOutput;
92use subspace_core_primitives::solutions::Solution;
93use subspace_core_primitives::{BlockNumber, PublicKey};
94use subspace_runtime_primitives::extension::BalanceTransferCheckExtension;
95use subspace_runtime_primitives::opaque::Block;
96use subspace_runtime_primitives::{
97    AccountId, Balance, BlockHashFor, ExtrinsicFor, Hash, HeaderFor, Signature,
98};
99use subspace_service::{FullSelectChain, RuntimeExecutor};
100use subspace_test_client::{Backend, Client, chain_spec};
101use subspace_test_primitives::OnchainStateApi;
102use subspace_test_runtime::{
103    Runtime, RuntimeApi, RuntimeCall, SLOT_DURATION, SignedExtra, UncheckedExtrinsic,
104};
105use substrate_frame_rpc_system::AccountNonceApi;
106use substrate_test_client::{RpcHandlersExt, RpcTransactionError, RpcTransactionOutput};
107use tokio::time::sleep;
108
109/// Helper type alias
110pub type FraudProofFor<Block, DomainBlock> =
111    FraudProof<NumberFor<Block>, BlockHashFor<Block>, HeaderFor<DomainBlock>, H256>;
112
113const MAX_PRODUCE_BUNDLE_TRY: usize = 10;
114
115/// Create a Subspace `Configuration`.
116///
117/// By default an in-memory socket will be used, therefore you need to provide boot
118/// nodes if you want the future node to be connected to other nodes.
119#[expect(clippy::too_many_arguments)]
120pub fn node_config(
121    tokio_handle: tokio::runtime::Handle,
122    key: Sr25519Keyring,
123    boot_nodes: Vec<MultiaddrWithPeerId>,
124    run_farmer: bool,
125    force_authoring: bool,
126    force_synced: bool,
127    private_evm: bool,
128    evm_owner_account: Option<AccountId>,
129    base_path: BasePath,
130) -> Configuration {
131    let root = base_path.path();
132    let role = if run_farmer {
133        Role::Authority
134    } else {
135        Role::Full
136    };
137    let key_seed = key.to_seed();
138    let spec = chain_spec::subspace_local_testnet_config(private_evm, evm_owner_account).unwrap();
139
140    let mut network_config = NetworkConfiguration::new(
141        key_seed.to_string(),
142        "network/test/0.1",
143        Default::default(),
144        None,
145    );
146
147    network_config.boot_nodes = boot_nodes;
148
149    network_config.allow_non_globals_in_dht = true;
150
151    let addr: multiaddr::Multiaddr = multiaddr::Protocol::Memory(rand::random()).into();
152    network_config.listen_addresses.push(addr.clone());
153
154    network_config.public_addresses.push(addr);
155
156    network_config.transport = TransportConfig::MemoryOnly;
157
158    network_config.force_synced = force_synced;
159
160    Configuration {
161        impl_name: "subspace-test-node".to_string(),
162        impl_version: "0.1".to_string(),
163        role,
164        tokio_handle,
165        transaction_pool: Default::default(),
166        network: network_config,
167        keystore: KeystoreConfig::InMemory,
168        database: DatabaseSource::ParityDb {
169            path: root.join("paritydb"),
170        },
171        trie_cache_maximum_size: Some(64 * 1024 * 1024),
172        state_pruning: Default::default(),
173        blocks_pruning: BlocksPruning::KeepAll,
174        chain_spec: Box::new(spec),
175        executor: ExecutorConfiguration {
176            wasm_method: WasmExecutionMethod::Compiled {
177                instantiation_strategy: WasmtimeInstantiationStrategy::PoolingCopyOnWrite,
178            },
179            max_runtime_instances: 8,
180            default_heap_pages: None,
181            runtime_cache_size: 2,
182        },
183        wasm_runtime_overrides: Default::default(),
184        rpc: RpcConfiguration {
185            addr: None,
186            max_request_size: 0,
187            max_response_size: 0,
188            id_provider: None,
189            max_subs_per_conn: 0,
190            port: 0,
191            message_buffer_capacity: 0,
192            batch_config: RpcBatchRequestConfig::Disabled,
193            max_connections: 0,
194            cors: None,
195            methods: Default::default(),
196            rate_limit: None,
197            rate_limit_whitelisted_ips: vec![],
198            rate_limit_trust_proxy_headers: false,
199        },
200        prometheus_config: None,
201        telemetry_endpoints: None,
202        offchain_worker: OffchainWorkerConfig {
203            enabled: false,
204            indexing_enabled: true,
205        },
206        force_authoring,
207        disable_grandpa: false,
208        dev_key_seed: Some(key_seed),
209        tracing_targets: None,
210        tracing_receiver: Default::default(),
211        announce_block: true,
212        data_path: base_path.path().into(),
213        base_path,
214    }
215}
216
217type StorageChanges = sp_api::StorageChanges<Block>;
218
219struct MockExtensionsFactory<Client, DomainBlock, Executor, CBackend> {
220    consensus_client: Arc<Client>,
221    consensus_backend: Arc<CBackend>,
222    executor: Arc<Executor>,
223    mock_pot_verifier: Arc<MockPotVerfier>,
224    confirmation_depth_k: BlockNumber,
225    _phantom: PhantomData<DomainBlock>,
226}
227
228impl<Client, DomainBlock, Executor, CBackend>
229    MockExtensionsFactory<Client, DomainBlock, Executor, CBackend>
230{
231    fn new(
232        consensus_client: Arc<Client>,
233        executor: Arc<Executor>,
234        mock_pot_verifier: Arc<MockPotVerfier>,
235        consensus_backend: Arc<CBackend>,
236        confirmation_depth_k: BlockNumber,
237    ) -> Self {
238        Self {
239            consensus_client,
240            consensus_backend,
241            executor,
242            mock_pot_verifier,
243            confirmation_depth_k,
244            _phantom: Default::default(),
245        }
246    }
247}
248
249#[derive(Default)]
250struct MockPotVerfier(Mutex<HashMap<u64, PotOutput>>);
251
252impl MockPotVerfier {
253    fn is_valid(&self, slot: u64, pot: PotOutput) -> bool {
254        self.0.lock().get(&slot).map(|p| *p == pot).unwrap_or(false)
255    }
256
257    fn inject_pot(&self, slot: u64, pot: PotOutput) {
258        self.0.lock().insert(slot, pot);
259    }
260}
261
262impl<Block, Client, DomainBlock, Executor, CBackend> ExtensionsFactory<Block>
263    for MockExtensionsFactory<Client, DomainBlock, Executor, CBackend>
264where
265    Block: BlockT,
266    Block::Hash: From<H256> + Into<H256>,
267    DomainBlock: BlockT,
268    DomainBlock::Hash: Into<H256> + From<H256>,
269    Client: BlockBackend<Block> + HeaderBackend<Block> + ProvideRuntimeApi<Block> + 'static,
270    Client::Api: DomainsApi<Block, DomainBlock::Header>
271        + BundleProducerElectionApi<Block, Balance>
272        + MessengerApi<Block, NumberFor<Block>, Block::Hash>
273        + MmrApi<Block, H256, NumberFor<Block>>,
274    Executor: CodeExecutor + sc_executor::RuntimeVersionOf,
275    CBackend: BackendT<Block> + 'static,
276{
277    fn extensions_for(
278        &self,
279        _block_hash: Block::Hash,
280        _block_number: NumberFor<Block>,
281    ) -> Extensions {
282        let confirmation_depth_k = self.confirmation_depth_k;
283        let mut exts = Extensions::new();
284        exts.register(FraudProofExtension::new(Arc::new(
285            FraudProofHostFunctionsImpl::<_, _, DomainBlock, Executor, _>::new(
286                self.consensus_client.clone(),
287                self.executor.clone(),
288                move |client, executor| {
289                    let extension_factory =
290                        DomainsExtensionFactory::<_, Block, DomainBlock, _>::new(
291                            client,
292                            executor,
293                            confirmation_depth_k,
294                        );
295                    Box::new(extension_factory) as Box<dyn ExtensionsFactory<DomainBlock>>
296                },
297            ),
298        )));
299        exts.register(SubspaceMmrExtension::new(Arc::new(
300            SubspaceMmrHostFunctionsImpl::<Block, _>::new(
301                self.consensus_client.clone(),
302                confirmation_depth_k,
303            ),
304        )));
305        exts.register(MessengerExtension::new(Arc::new(
306            MessengerHostFunctionsImpl::<Block, _, DomainBlock, _>::new(
307                self.consensus_client.clone(),
308                self.executor.clone(),
309            ),
310        )));
311
312        if let Some(offchain_storage) = self.consensus_backend.offchain_storage() {
313            let offchain_db = OffchainDb::new(offchain_storage);
314            exts.register(OffchainDbExt::new(offchain_db));
315        }
316        exts.register(PotExtension::new({
317            let client = Arc::clone(&self.consensus_client);
318            let mock_pot_verifier = Arc::clone(&self.mock_pot_verifier);
319            Box::new(
320                move |parent_hash, slot, proof_of_time, _quick_verification| {
321                    let parent_hash = {
322                        let mut converted_parent_hash = Block::Hash::default();
323                        converted_parent_hash.as_mut().copy_from_slice(&parent_hash);
324                        converted_parent_hash
325                    };
326
327                    let parent_header = match client.header(parent_hash) {
328                        Ok(Some(parent_header)) => parent_header,
329                        _ => return false,
330                    };
331                    let parent_pre_digest = match extract_pre_digest(&parent_header) {
332                        Ok(parent_pre_digest) => parent_pre_digest,
333                        _ => return false,
334                    };
335
336                    let parent_slot = parent_pre_digest.slot();
337                    if slot <= *parent_slot {
338                        return false;
339                    }
340
341                    mock_pot_verifier.is_valid(slot, proof_of_time)
342                },
343            )
344        }));
345        exts
346    }
347}
348
349type NewSlot = (Slot, PotOutput);
350
351/// A mock Subspace consensus node instance used for testing.
352pub struct MockConsensusNode {
353    /// `TaskManager`'s instance.
354    pub task_manager: TaskManager,
355    /// Client's instance.
356    pub client: Arc<Client>,
357    /// Backend.
358    pub backend: Arc<Backend>,
359    /// Code executor.
360    pub executor: RuntimeExecutor,
361    /// Transaction pool.
362    pub transaction_pool: Arc<BasicPool<FullChainApi<Client, Block>, Block>>,
363    /// The SelectChain Strategy
364    pub select_chain: FullSelectChain,
365    /// Network service.
366    pub network_service: Arc<dyn NetworkService + Send + Sync>,
367    /// Cross-domain gossip notification service.
368    pub xdm_gossip_notification_service: Option<Box<dyn NotificationService>>,
369    /// Sync service.
370    pub sync_service: Arc<sc_network_sync::SyncingService<Block>>,
371    /// RPC handlers.
372    pub rpc_handlers: sc_service::RpcHandlers,
373    /// Network starter
374    pub network_starter: Option<NetworkStarter>,
375    /// The next slot number
376    next_slot: u64,
377    /// The mock pot verifier
378    mock_pot_verifier: Arc<MockPotVerfier>,
379    /// The slot notification subscribers
380    new_slot_notification_subscribers: Vec<mpsc::UnboundedSender<(Slot, PotOutput)>>,
381    /// The acknowledgement sender subscribers
382    acknowledgement_sender_subscribers: Vec<TracingUnboundedSender<mpsc::Sender<()>>>,
383    /// Block import pipeline
384    block_import: MockBlockImport<Client, Block>,
385    xdm_gossip_worker_builder: Option<GossipWorkerBuilder>,
386    /// Mock subspace solution used to mock the subspace `PreDigest`
387    mock_solution: Solution<AccountId>,
388    log_prefix: &'static str,
389    /// Ferdie key
390    pub key: Sr25519Keyring,
391    finalize_block_depth: Option<NumberFor<Block>>,
392    /// The node's base path
393    base_path: BasePath,
394}
395
396impl MockConsensusNode {
397    /// Run a mock consensus node
398    pub fn run(
399        tokio_handle: tokio::runtime::Handle,
400        key: Sr25519Keyring,
401        base_path: BasePath,
402    ) -> MockConsensusNode {
403        Self::run_with_finalization_depth(tokio_handle, key, base_path, None, false, None)
404    }
405
406    /// Run a mock consensus node with a private EVM domain
407    pub fn run_with_private_evm(
408        tokio_handle: tokio::runtime::Handle,
409        key: Sr25519Keyring,
410        evm_owner: Option<Sr25519Keyring>,
411        base_path: BasePath,
412    ) -> MockConsensusNode {
413        Self::run_with_finalization_depth(tokio_handle, key, base_path, None, true, evm_owner)
414    }
415
416    /// Run a mock consensus node with finalization depth
417    pub fn run_with_finalization_depth(
418        tokio_handle: tokio::runtime::Handle,
419        key: Sr25519Keyring,
420        base_path: BasePath,
421        finalize_block_depth: Option<NumberFor<Block>>,
422        private_evm: bool,
423        evm_owner: Option<Sr25519Keyring>,
424    ) -> MockConsensusNode {
425        let log_prefix = key.into();
426
427        let mut config = node_config(
428            tokio_handle,
429            key,
430            vec![],
431            false,
432            false,
433            false,
434            private_evm,
435            evm_owner.map(|key| key.to_account_id()),
436            base_path.clone(),
437        );
438
439        // Set `transaction_pool.ban_time` to 0 such that duplicated tx will not immediately rejected
440        // by `TemporarilyBanned`
441        // rest of the options are taken from default
442        let tx_pool_options = Options {
443            ban_time: Duration::from_millis(0),
444            ..Default::default()
445        };
446
447        config.network.node_name = format!("{} (Consensus)", config.network.node_name);
448        let span = sc_tracing::tracing::info_span!(
449            sc_tracing::logging::PREFIX_LOG_SPAN,
450            name = config.network.node_name.as_str()
451        );
452        let _enter = span.enter();
453
454        let executor = sc_service::new_wasm_executor(&config.executor);
455
456        let (client, backend, keystore_container, mut task_manager) =
457            sc_service::new_full_parts::<Block, RuntimeApi, _>(&config, None, executor.clone())
458                .expect("Fail to new full parts");
459
460        let domain_executor = Arc::new(sc_service::new_wasm_executor(&config.executor));
461        let client = Arc::new(client);
462        let mock_pot_verifier = Arc::new(MockPotVerfier::default());
463        let chain_constants = client
464            .runtime_api()
465            .chain_constants(client.info().best_hash)
466            .expect("Fail to get chain constants");
467        client
468            .execution_extensions()
469            .set_extensions_factory(MockExtensionsFactory::<
470                _,
471                DomainBlock,
472                sc_domains::RuntimeExecutor,
473                _,
474            >::new(
475                client.clone(),
476                domain_executor.clone(),
477                Arc::clone(&mock_pot_verifier),
478                backend.clone(),
479                chain_constants.confirmation_depth_k(),
480            ));
481
482        let select_chain = sc_consensus::LongestChain::new(backend.clone());
483        let transaction_pool = Arc::from(BasicPool::new_full(
484            tx_pool_options,
485            config.role.is_authority().into(),
486            config.prometheus_registry(),
487            task_manager.spawn_essential_handle(),
488            client.clone(),
489        ));
490
491        let block_import = MockBlockImport::<_, _>::new(client.clone());
492
493        let mut net_config = sc_network::config::FullNetworkConfiguration::<
494            _,
495            _,
496            NetworkWorker<_, _>,
497        >::new(&config.network, None);
498        let (xdm_gossip_notification_config, xdm_gossip_notification_service) =
499            xdm_gossip_peers_set_config();
500        net_config.add_notification_protocol(xdm_gossip_notification_config);
501
502        let (network_service, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
503            sc_service::build_network(sc_service::BuildNetworkParams {
504                config: &config,
505                net_config,
506                client: client.clone(),
507                transaction_pool: transaction_pool.clone(),
508                spawn_handle: task_manager.spawn_handle(),
509                import_queue: mock_import_queue(
510                    block_import.clone(),
511                    &task_manager.spawn_essential_handle(),
512                ),
513                block_announce_validator_builder: None,
514                warp_sync_config: None,
515                block_relay: None,
516                metrics: NotificationMetrics::new(None),
517            })
518            .expect("Should be able to build network");
519
520        let rpc_handlers = sc_service::spawn_tasks(SpawnTasksParams {
521            network: network_service.clone(),
522            client: client.clone(),
523            keystore: keystore_container.keystore(),
524            task_manager: &mut task_manager,
525            transaction_pool: transaction_pool.clone(),
526            rpc_builder: Box::new(|_| Ok(RpcModule::new(()))),
527            backend: backend.clone(),
528            system_rpc_tx,
529            config,
530            telemetry: None,
531            tx_handler_controller,
532            sync_service: sync_service.clone(),
533        })
534        .expect("Should be able to spawn tasks");
535
536        let mock_solution =
537            Solution::genesis_solution(PublicKey::from(key.public().0), key.to_account_id());
538
539        let mut gossip_builder = GossipWorkerBuilder::new();
540
541        task_manager
542            .spawn_essential_handle()
543            .spawn_essential_blocking(
544                "consensus-chain-channel-update-worker",
545                None,
546                Box::pin(
547                    domain_client_message_relayer::worker::gossip_channel_updates::<_, _, Block, _>(
548                        ChainId::Consensus,
549                        client.clone(),
550                        sync_service.clone(),
551                        gossip_builder.gossip_msg_sink(),
552                    ),
553                ),
554            );
555
556        let (consensus_msg_sink, consensus_msg_receiver) =
557            tracing_unbounded("consensus_message_channel", 100);
558
559        // Start cross domain message listener for Consensus chain to receive messages from domains in the network
560        let consensus_listener =
561            cross_domain_message_gossip::start_cross_chain_message_listener::<_, _, _, _, _, _, _>(
562                ChainId::Consensus,
563                client.clone(),
564                client.clone(),
565                transaction_pool.clone(),
566                network_service.clone(),
567                consensus_msg_receiver,
568                domain_executor,
569                sync_service.clone(),
570            );
571
572        task_manager
573            .spawn_essential_handle()
574            .spawn_essential_blocking(
575                "consensus-message-listener",
576                None,
577                Box::pin(consensus_listener),
578            );
579
580        gossip_builder.push_chain_sink(ChainId::Consensus, consensus_msg_sink);
581
582        task_manager.spawn_essential_handle().spawn_blocking(
583            "mmr-gadget",
584            None,
585            mmr_gadget::MmrGadget::start(
586                client.clone(),
587                backend.clone(),
588                sp_mmr_primitives::INDEXING_PREFIX.to_vec(),
589            ),
590        );
591
592        MockConsensusNode {
593            task_manager,
594            client,
595            backend,
596            executor,
597            transaction_pool,
598            select_chain,
599            network_service,
600            xdm_gossip_notification_service: Some(xdm_gossip_notification_service),
601            sync_service,
602            rpc_handlers,
603            network_starter: Some(network_starter),
604            next_slot: 1,
605            mock_pot_verifier,
606            new_slot_notification_subscribers: Vec::new(),
607            acknowledgement_sender_subscribers: Vec::new(),
608            block_import,
609            xdm_gossip_worker_builder: Some(gossip_builder),
610            mock_solution,
611            log_prefix,
612            key,
613            finalize_block_depth,
614            base_path,
615        }
616    }
617
618    /// Start the mock consensus node network
619    pub fn start_network(&mut self) {
620        self.network_starter
621            .take()
622            .expect("mock consensus node network have not started yet")
623            .start_network();
624    }
625
626    /// Get the cross domain gossip message worker builder
627    pub fn xdm_gossip_worker_builder(&mut self) -> &mut GossipWorkerBuilder {
628        self.xdm_gossip_worker_builder
629            .as_mut()
630            .expect("gossip message worker have not started yet")
631    }
632
633    /// Start the cross domain gossip message worker.
634    pub fn start_cross_domain_gossip_message_worker(&mut self) {
635        let xdm_gossip_worker_builder = self
636            .xdm_gossip_worker_builder
637            .take()
638            .expect("gossip message worker have not started yet");
639        let cross_domain_message_gossip_worker = xdm_gossip_worker_builder.build::<Block, _, _>(
640            self.network_service.clone(),
641            self.xdm_gossip_notification_service
642                .take()
643                .expect("XDM gossip notification service must be used only once"),
644            self.sync_service.clone(),
645        );
646        self.task_manager
647            .spawn_essential_handle()
648            .spawn_essential_blocking(
649                "cross-domain-gossip-message-worker",
650                None,
651                Box::pin(cross_domain_message_gossip_worker.run()),
652            );
653    }
654
655    /// Return the next slot number
656    pub fn next_slot(&self) -> u64 {
657        self.next_slot
658    }
659
660    /// Set the next slot number
661    pub fn set_next_slot(&mut self, next_slot: u64) {
662        self.next_slot = next_slot;
663    }
664
665    /// Produce a slot only, without waiting for the potential slot handlers.
666    pub fn produce_slot(&mut self) -> NewSlot {
667        let slot = Slot::from(self.next_slot);
668        let proof_of_time = PotOutput::from(
669            <&[u8] as TryInto<[u8; 16]>>::try_into(&Hash::random().to_fixed_bytes()[..16])
670                .expect("slice with length of 16 must able convert into [u8; 16]; qed"),
671        );
672        self.mock_pot_verifier.inject_pot(*slot, proof_of_time);
673        self.next_slot += 1;
674
675        (slot, proof_of_time)
676    }
677
678    /// Notify the executor about the new slot and wait for the bundle produced at this slot.
679    pub async fn notify_new_slot_and_wait_for_bundle(
680        &mut self,
681        new_slot: NewSlot,
682    ) -> Option<OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>> {
683        self.new_slot_notification_subscribers
684            .retain(|subscriber| subscriber.unbounded_send(new_slot).is_ok());
685
686        self.confirm_acknowledgement().await;
687        self.get_bundle_from_tx_pool(new_slot)
688    }
689
690    /// Produce a new slot and wait for a bundle produced at this slot.
691    pub async fn produce_slot_and_wait_for_bundle_submission(
692        &mut self,
693    ) -> (
694        NewSlot,
695        OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>,
696    ) {
697        let slot = self.produce_slot();
698        for _ in 0..MAX_PRODUCE_BUNDLE_TRY {
699            if let Some(bundle) = self.notify_new_slot_and_wait_for_bundle(slot).await {
700                return (slot, bundle);
701            }
702        }
703        panic!(
704            "Failed to produce bundle after {MAX_PRODUCE_BUNDLE_TRY:?} tries, something must be wrong"
705        );
706    }
707
708    /// Produce a slot and wait for bundle submission from specific operator.
709    pub async fn produce_slot_and_wait_for_bundle_submission_from_operator(
710        &mut self,
711        operator_id: OperatorId,
712    ) -> (
713        NewSlot,
714        OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>,
715    ) {
716        loop {
717            let slot = self.produce_slot();
718            if let Some(bundle) = self.notify_new_slot_and_wait_for_bundle(slot).await
719                && bundle.sealed_header().proof_of_election().operator_id == operator_id
720            {
721                return (slot, bundle);
722            }
723        }
724    }
725
726    /// Subscribe the new slot notification
727    pub fn new_slot_notification_stream(&mut self) -> mpsc::UnboundedReceiver<(Slot, PotOutput)> {
728        let (tx, rx) = mpsc::unbounded();
729        self.new_slot_notification_subscribers.push(tx);
730        rx
731    }
732
733    /// Subscribe the acknowledgement sender stream
734    pub fn new_acknowledgement_sender_stream(
735        &mut self,
736    ) -> TracingUnboundedReceiver<mpsc::Sender<()>> {
737        let (tx, rx) = tracing_unbounded("subspace_acknowledgement_sender_stream", 100);
738        self.acknowledgement_sender_subscribers.push(tx);
739        rx
740    }
741
742    /// Wait for all the acknowledgements before return
743    ///
744    /// It is used to wait for the acknowledgement of the domain worker to ensure it have
745    /// finish all the previous tasks before return
746    pub async fn confirm_acknowledgement(&mut self) {
747        let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
748
749        // Must drop `acknowledgement_sender` after the notification otherwise the receiver
750        // will block forever as there is still a sender not closed.
751        {
752            self.acknowledgement_sender_subscribers
753                .retain(|subscriber| {
754                    subscriber
755                        .unbounded_send(acknowledgement_sender.clone())
756                        .is_ok()
757                });
758            drop(acknowledgement_sender);
759        }
760
761        // Wait for all the acknowledgements to progress and proactively drop closed subscribers.
762        while acknowledgement_receiver.next().await.is_some() {
763            // Wait for all the acknowledgements to finish.
764        }
765    }
766
767    /// Wait for the operator finish processing the consensus block before return
768    pub async fn confirm_block_import_processed(&mut self) {
769        // Send one more notification to ensure the previous consensus block import notificaion
770        // have received by the operator
771        let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
772        {
773            // Must drop `block_import_acknowledgement_sender` after the notification otherwise
774            // the receiver will block forever as there is still a sender not closed.
775            // NOTE: it is okay to use the default block number since it is ignored in the consumer side.
776            let value = (NumberFor::<Block>::default(), acknowledgement_sender);
777            self.block_import
778                .block_importing_notification_subscribers
779                .lock()
780                .retain(|subscriber| subscriber.unbounded_send(value.clone()).is_ok());
781        }
782        while acknowledgement_receiver.next().await.is_some() {
783            // Wait for all the acknowledgements to finish.
784        }
785
786        // Ensure the operator finish processing the consensus block
787        self.confirm_acknowledgement().await;
788    }
789
790    /// Subscribe the block importing notification
791    pub fn block_importing_notification_stream(
792        &self,
793    ) -> TracingUnboundedReceiver<(NumberFor<Block>, mpsc::Sender<()>)> {
794        self.block_import.block_importing_notification_stream()
795    }
796
797    /// Get the bundle that created at `slot` from the transaction pool
798    pub fn get_bundle_from_tx_pool(
799        &self,
800        new_slot: NewSlot,
801    ) -> Option<OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>> {
802        for ready_tx in self.transaction_pool.ready() {
803            let ext = UncheckedExtrinsic::decode(&mut ready_tx.data.encode().as_slice())
804                .expect("should be able to decode");
805            if let RuntimeCall::Domains(pallet_domains::Call::submit_bundle { opaque_bundle }) =
806                ext.function
807                && opaque_bundle.sealed_header().slot_number() == *new_slot.0
808            {
809                return Some(opaque_bundle);
810            }
811        }
812        None
813    }
814
815    /// Submit a tx to the tx pool
816    pub async fn submit_transaction(&self, tx: OpaqueExtrinsic) -> Result<H256, TxPoolError> {
817        self.transaction_pool
818            .submit_one(
819                self.client.info().best_hash,
820                TransactionSource::External,
821                tx,
822            )
823            .await
824            .map_err(|err| {
825                err.into_pool_error()
826                    .expect("should always be a pool error")
827            })
828    }
829
830    /// Remove all tx from the tx pool
831    pub async fn clear_tx_pool(&self) -> Result<(), Box<dyn Error>> {
832        let txs: Vec<_> = self
833            .transaction_pool
834            .ready()
835            .map(|t| self.transaction_pool.hash_of(&t.data))
836            .collect();
837        self.prune_txs_from_pool(txs.as_slice()).await
838    }
839
840    /// Remove a ready transaction from transaction pool.
841    pub async fn prune_tx_from_pool(&self, tx: &OpaqueExtrinsic) -> Result<(), Box<dyn Error>> {
842        self.prune_txs_from_pool(&[self.transaction_pool.hash_of(tx)])
843            .await
844    }
845
846    async fn prune_txs_from_pool(
847        &self,
848        tx_hashes: &[BlockHashFor<Block>],
849    ) -> Result<(), Box<dyn Error>> {
850        let hash_and_number = HashAndNumber {
851            number: self.client.info().best_number,
852            hash: self.client.info().best_hash,
853        };
854        self.transaction_pool
855            .pool()
856            .prune_known(&hash_and_number, tx_hashes);
857        // `ban_time` have set to 0, explicitly wait 1ms here to ensure `clear_stale` will remove
858        // all the bans as the ban time must be passed.
859        tokio::time::sleep(time::Duration::from_millis(1)).await;
860        self.transaction_pool
861            .pool()
862            .validated_pool()
863            .clear_stale(&hash_and_number);
864        Ok(())
865    }
866
867    /// Return if the given ER exist in the consensus state
868    pub fn does_receipt_exist(
869        &self,
870        er_hash: BlockHashFor<DomainBlock>,
871    ) -> Result<bool, Box<dyn Error>> {
872        Ok(self
873            .client
874            .runtime_api()
875            .execution_receipt(self.client.info().best_hash, er_hash)?
876            .is_some())
877    }
878
879    /// Returns the stake summary of the Domain.
880    pub fn get_domain_staking_summary(
881        &self,
882        domain_id: DomainId,
883    ) -> Result<Option<StakingSummary<OperatorId, Balance>>, Box<dyn Error>> {
884        Ok(self
885            .client
886            .runtime_api()
887            .domain_stake_summary(self.client.info().best_hash, domain_id)?)
888    }
889
890    /// Returns the domain block pruning depth.
891    pub fn get_domain_block_pruning_depth(&self) -> Result<BlockNumber, Box<dyn Error>> {
892        Ok(self
893            .client
894            .runtime_api()
895            .block_pruning_depth(self.client.info().best_hash)?)
896    }
897
898    /// Return a future that only resolve if a fraud proof that the given `fraud_proof_predicate`
899    /// return true is submitted to the consensus tx pool
900    pub fn wait_for_fraud_proof<FP>(
901        &self,
902        fraud_proof_predicate: FP,
903    ) -> Pin<Box<dyn Future<Output = FraudProofFor<Block, DomainBlock>> + Send>>
904    where
905        FP: Fn(&FraudProofFor<Block, DomainBlock>) -> bool + Send + 'static,
906    {
907        let tx_pool = self.transaction_pool.clone();
908        let mut import_tx_stream = self.transaction_pool.import_notification_stream();
909        Box::pin(async move {
910            while let Some(ready_tx_hash) = import_tx_stream.next().await {
911                let ready_tx = tx_pool
912                    .ready_transaction(&ready_tx_hash)
913                    .expect("Just get the ready tx hash from import stream; qed");
914                let ext = subspace_test_runtime::UncheckedExtrinsic::decode(
915                    &mut ready_tx.data.encode().as_slice(),
916                )
917                .expect("Decode tx must success");
918                if let subspace_test_runtime::RuntimeCall::Domains(
919                    pallet_domains::Call::submit_fraud_proof { fraud_proof },
920                ) = ext.function
921                    && fraud_proof_predicate(&fraud_proof)
922                {
923                    return *fraud_proof;
924                }
925            }
926            unreachable!()
927        })
928    }
929
930    /// Get the free balance of the given account
931    pub fn free_balance(&self, account_id: AccountId) -> subspace_runtime_primitives::Balance {
932        self.client
933            .runtime_api()
934            .free_balance(self.client.info().best_hash, account_id)
935            .expect("Fail to get account free balance")
936    }
937
938    /// Give the peer at `addr` the minimum reputation, which will ban it.
939    // TODO: also ban/unban in the DSN
940    pub fn ban_peer(&self, addr: MultiaddrWithPeerId) {
941        // If unban_peer() has been called on the peer, we need to bump it twice
942        // to give it the minimal reputation.
943        self.network_service.report_peer(
944            addr.peer_id,
945            ReputationChange::new_fatal("Peer banned by test (1)"),
946        );
947        self.network_service.report_peer(
948            addr.peer_id,
949            ReputationChange::new_fatal("Peer banned by test (2)"),
950        );
951    }
952
953    /// Give the peer at `addr` a high reputation, which guarantees it is un-banned it.
954    pub fn unban_peer(&self, addr: MultiaddrWithPeerId) {
955        // If ReputationChange::new_fatal() has been called on the peer, we need to bump it twice
956        // to give it a positive reputation.
957        self.network_service.report_peer(
958            addr.peer_id,
959            ReputationChange::new(i32::MAX, "Peer unbanned by test (1)"),
960        );
961        self.network_service.report_peer(
962            addr.peer_id,
963            ReputationChange::new(i32::MAX, "Peer unbanned by test (2)"),
964        );
965    }
966
967    /// Take and stop the `MockConsensusNode` and delete its database lock file.
968    ///
969    /// Stopping and restarting a node can cause weird race conditions, with errors like:
970    /// "The system cannot find the path specified".
971    /// If this happens, try increasing the wait time in this method.
972    pub async fn stop(self) -> Result<(), std::io::Error> {
973        let lock_file_path = self.base_path.path().join("paritydb").join("lock");
974        // On Windows, sometimes open files can’t be deleted so `drop` first then delete
975        std::mem::drop(self);
976
977        // Give the node time to cleanup, exit, and release the lock file.
978        // TODO: fix the underlying issue or wait for the actual shutdown instead
979        sleep(Duration::from_secs(2)).await;
980
981        // The lock file already being deleted is not a fatal test error, so just log it
982        if let Err(err) = std::fs::remove_file(lock_file_path) {
983            tracing::error!("deleting paritydb lock file failed: {err:?}");
984        }
985        Ok(())
986    }
987}
988
989impl MockConsensusNode {
990    async fn collect_txn_from_pool(&self, parent_hash: Hash) -> Vec<ExtrinsicFor<Block>> {
991        self.transaction_pool
992            .ready_at(parent_hash)
993            .await
994            .map(|pending_tx| pending_tx.data().as_ref().clone())
995            .collect()
996    }
997
998    async fn mock_inherent_data(slot: Slot) -> Result<InherentData, Box<dyn Error>> {
999        let timestamp = sp_timestamp::InherentDataProvider::new(Timestamp::new(
1000            <Slot as Into<u64>>::into(slot) * SLOT_DURATION,
1001        ));
1002        let subspace_inherents =
1003            sp_consensus_subspace::inherents::InherentDataProvider::new(vec![]);
1004
1005        let inherent_data = (subspace_inherents, timestamp)
1006            .create_inherent_data()
1007            .await?;
1008
1009        Ok(inherent_data)
1010    }
1011
1012    fn mock_subspace_digest(&self, slot: Slot) -> Digest {
1013        let pre_digest: PreDigest<AccountId> = PreDigest::V0 {
1014            slot,
1015            solution: self.mock_solution.clone(),
1016            pot_info: PreDigestPotInfo::V0 {
1017                proof_of_time: Default::default(),
1018                future_proof_of_time: Default::default(),
1019            },
1020        };
1021        let mut digest = Digest::default();
1022        digest.push(DigestItem::subspace_pre_digest(&pre_digest));
1023        digest
1024    }
1025
1026    /// Build block
1027    async fn build_block(
1028        &self,
1029        slot: Slot,
1030        parent_hash: BlockHashFor<Block>,
1031        extrinsics: Vec<ExtrinsicFor<Block>>,
1032    ) -> Result<(Block, StorageChanges), Box<dyn Error>> {
1033        let inherent_digest = self.mock_subspace_digest(slot);
1034
1035        let inherent_data = Self::mock_inherent_data(slot).await?;
1036
1037        let mut block_builder = BlockBuilderBuilder::new(self.client.as_ref())
1038            .on_parent_block(parent_hash)
1039            .fetch_parent_block_number(self.client.as_ref())?
1040            .with_inherent_digests(inherent_digest)
1041            .build()
1042            .expect("Creates new block builder");
1043
1044        let inherent_txns = block_builder.create_inherents(inherent_data)?;
1045
1046        for tx in inherent_txns.into_iter().chain(extrinsics) {
1047            if let Err(err) = sc_block_builder::BlockBuilder::push(&mut block_builder, tx) {
1048                tracing::error!("Invalid transaction while building block: {}", err);
1049            }
1050        }
1051
1052        let (block, storage_changes, _) = block_builder.build()?.into_inner();
1053        Ok((block, storage_changes))
1054    }
1055
1056    /// Import block
1057    async fn import_block(
1058        &self,
1059        block: Block,
1060        storage_changes: Option<StorageChanges>,
1061    ) -> Result<BlockHashFor<Block>, Box<dyn Error>> {
1062        let (header, body) = block.deconstruct();
1063
1064        let header_hash = header.hash();
1065        let header_number = header.number;
1066
1067        let block_import_params = {
1068            let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
1069            import_block.body = Some(body);
1070            import_block.state_action = match storage_changes {
1071                Some(changes) => {
1072                    StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(changes))
1073                }
1074                None => StateAction::Execute,
1075            };
1076            import_block
1077        };
1078
1079        let import_result = self.block_import.import_block(block_import_params).await?;
1080
1081        if let Some(finalized_block_hash) = self
1082            .finalize_block_depth
1083            .and_then(|depth| header_number.checked_sub(depth))
1084            .and_then(|block_to_finalize| {
1085                self.client
1086                    .hash(block_to_finalize)
1087                    .expect("Block hash not found for number: {block_to_finalize:?}")
1088            })
1089        {
1090            self.client
1091                .finalize_block(finalized_block_hash, None, true)
1092                .unwrap();
1093        }
1094
1095        match import_result {
1096            ImportResult::Imported(_) | ImportResult::AlreadyInChain => Ok(header_hash),
1097            bad_res => Err(format!("Fail to import block due to {bad_res:?}").into()),
1098        }
1099    }
1100
1101    /// Produce a new block with the slot on top of `parent_hash`, with optional
1102    /// specified extrinsic list.
1103    #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1104    pub async fn produce_block_with_slot_at(
1105        &mut self,
1106        new_slot: NewSlot,
1107        parent_hash: BlockHashFor<Block>,
1108        maybe_extrinsics: Option<Vec<ExtrinsicFor<Block>>>,
1109    ) -> Result<BlockHashFor<Block>, Box<dyn Error>> {
1110        let block_timer = time::Instant::now();
1111
1112        let extrinsics = match maybe_extrinsics {
1113            Some(extrinsics) => extrinsics,
1114            None => self.collect_txn_from_pool(parent_hash).await,
1115        };
1116        let tx_hashes: Vec<_> = extrinsics
1117            .iter()
1118            .map(|t| self.transaction_pool.hash_of(t))
1119            .collect();
1120
1121        let (block, storage_changes) = self
1122            .build_block(new_slot.0, parent_hash, extrinsics)
1123            .await?;
1124
1125        log_new_block(&block, block_timer.elapsed().as_millis());
1126
1127        let res = match self.import_block(block, Some(storage_changes)).await {
1128            Ok(hash) => {
1129                // Remove the tx of the imported block from the tx pool incase re-include them
1130                // in the future block by accident.
1131                self.prune_txs_from_pool(tx_hashes.as_slice()).await?;
1132                Ok(hash)
1133            }
1134            err => err,
1135        };
1136        self.confirm_block_import_processed().await;
1137        res
1138    }
1139
1140    /// Produce a new block on top of the current best block, with the extrinsics collected from
1141    /// the transaction pool.
1142    #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1143    pub async fn produce_block_with_slot(&mut self, slot: NewSlot) -> Result<(), Box<dyn Error>> {
1144        self.produce_block_with_slot_at(slot, self.client.info().best_hash, None)
1145            .await?;
1146        Ok(())
1147    }
1148
1149    /// Produce a new block on top of the current best block, with the specificed extrinsics.
1150    #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1151    pub async fn produce_block_with_extrinsics(
1152        &mut self,
1153        extrinsics: Vec<ExtrinsicFor<Block>>,
1154    ) -> Result<(), Box<dyn Error>> {
1155        let slot = self.produce_slot();
1156        self.produce_block_with_slot_at(slot, self.client.info().best_hash, Some(extrinsics))
1157            .await?;
1158        Ok(())
1159    }
1160
1161    /// Produce `n` number of blocks.
1162    #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1163    pub async fn produce_blocks(&mut self, n: u64) -> Result<(), Box<dyn Error>> {
1164        for _ in 0..n {
1165            let slot = self.produce_slot();
1166            self.produce_block_with_slot(slot).await?;
1167        }
1168        Ok(())
1169    }
1170
1171    /// Produce `n` number of blocks and wait for bundle submitted to the block.
1172    #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1173    pub async fn produce_blocks_with_bundles(&mut self, n: u64) -> Result<(), Box<dyn Error>> {
1174        for _ in 0..n {
1175            let (slot, _) = self.produce_slot_and_wait_for_bundle_submission().await;
1176            self.produce_block_with_slot(slot).await?;
1177        }
1178        Ok(())
1179    }
1180
1181    /// Get the nonce of the node account
1182    pub fn account_nonce(&self) -> u32 {
1183        self.client
1184            .runtime_api()
1185            .account_nonce(self.client.info().best_hash, self.key.to_account_id())
1186            .expect("Fail to get account nonce")
1187    }
1188
1189    /// Get the nonce of the given account
1190    pub fn account_nonce_of(&self, account_id: AccountId) -> u32 {
1191        self.client
1192            .runtime_api()
1193            .account_nonce(self.client.info().best_hash, account_id)
1194            .expect("Fail to get account nonce")
1195    }
1196
1197    /// Construct an extrinsic.
1198    pub fn construct_extrinsic(
1199        &self,
1200        nonce: u32,
1201        function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1202    ) -> UncheckedExtrinsic {
1203        construct_extrinsic_generic(&self.client, function, self.key, false, nonce, 0)
1204    }
1205
1206    /// Construct an unsigned general extrinsic.
1207    pub fn construct_unsigned_extrinsic(
1208        &self,
1209        function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1210    ) -> UncheckedExtrinsic {
1211        construct_unsigned_extrinsic(&self.client, function)
1212    }
1213
1214    /// Construct and send extrinsic through rpc
1215    pub async fn construct_and_send_extrinsic_with(
1216        &self,
1217        function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1218    ) -> Result<RpcTransactionOutput, RpcTransactionError> {
1219        let nonce = self.account_nonce();
1220        let extrinsic = self.construct_extrinsic(nonce, function);
1221        self.rpc_handlers.send_transaction(extrinsic.into()).await
1222    }
1223
1224    /// Get the nonce of the given account
1225    pub async fn send_extrinsic(
1226        &self,
1227        extrinsic: impl Into<OpaqueExtrinsic>,
1228    ) -> Result<RpcTransactionOutput, RpcTransactionError> {
1229        self.rpc_handlers.send_transaction(extrinsic.into()).await
1230    }
1231}
1232
1233fn log_new_block(block: &Block, used_time_ms: u128) {
1234    tracing::info!(
1235        "🎁 Prepared block for proposing at {} ({} ms) [hash: {:?}; parent_hash: {}; extrinsics ({}): [{}]]",
1236        block.header().number(),
1237        used_time_ms,
1238        block.header().hash(),
1239        block.header().parent_hash(),
1240        block.extrinsics().len(),
1241        block
1242            .extrinsics()
1243            .iter()
1244            .map(|xt| BlakeTwo256::hash_of(xt).to_string())
1245            .collect::<Vec<_>>()
1246            .join(", ")
1247    );
1248}
1249
1250fn mock_import_queue<Block: BlockT, I>(
1251    block_import: I,
1252    spawner: &impl SpawnEssentialNamed,
1253) -> BasicQueue<Block>
1254where
1255    I: BlockImport<Block, Error = ConsensusError> + Send + Sync + 'static,
1256{
1257    BasicQueue::new(
1258        MockVerifier::default(),
1259        Box::new(block_import),
1260        None,
1261        spawner,
1262        None,
1263    )
1264}
1265
1266struct MockVerifier<Block> {
1267    _marker: PhantomData<Block>,
1268}
1269
1270impl<Block> Default for MockVerifier<Block> {
1271    fn default() -> Self {
1272        Self {
1273            _marker: PhantomData,
1274        }
1275    }
1276}
1277
1278#[async_trait::async_trait]
1279impl<Block> VerifierT<Block> for MockVerifier<Block>
1280where
1281    Block: BlockT,
1282{
1283    async fn verify(
1284        &self,
1285        block_params: BlockImportParams<Block>,
1286    ) -> Result<BlockImportParams<Block>, String> {
1287        Ok(block_params)
1288    }
1289}
1290
1291// `MockBlockImport` is mostly port from `sc-consensus-subspace::SubspaceBlockImport` with all
1292// the consensus related logic removed.
1293#[allow(clippy::type_complexity)]
1294struct MockBlockImport<Client, Block: BlockT> {
1295    inner: Arc<Client>,
1296    block_importing_notification_subscribers:
1297        Arc<Mutex<Vec<TracingUnboundedSender<(NumberFor<Block>, mpsc::Sender<()>)>>>>,
1298}
1299
1300impl<Client, Block: BlockT> MockBlockImport<Client, Block> {
1301    fn new(inner: Arc<Client>) -> Self {
1302        MockBlockImport {
1303            inner,
1304            block_importing_notification_subscribers: Arc::new(Mutex::new(Vec::new())),
1305        }
1306    }
1307
1308    // Subscribe the block importing notification
1309    fn block_importing_notification_stream(
1310        &self,
1311    ) -> TracingUnboundedReceiver<(NumberFor<Block>, mpsc::Sender<()>)> {
1312        let (tx, rx) = tracing_unbounded("subspace_new_slot_notification_stream", 100);
1313        self.block_importing_notification_subscribers
1314            .lock()
1315            .push(tx);
1316        rx
1317    }
1318}
1319
1320impl<Client, Block: BlockT> MockBlockImport<Client, Block> {
1321    fn clone(&self) -> Self {
1322        MockBlockImport {
1323            inner: self.inner.clone(),
1324            block_importing_notification_subscribers: self
1325                .block_importing_notification_subscribers
1326                .clone(),
1327        }
1328    }
1329}
1330
1331#[async_trait::async_trait]
1332impl<Client, Block> BlockImport<Block> for MockBlockImport<Client, Block>
1333where
1334    Block: BlockT,
1335    for<'r> &'r Client: BlockImport<Block, Error = ConsensusError> + Send + Sync,
1336    Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
1337    Client::Api: ApiExt<Block>,
1338{
1339    type Error = ConsensusError;
1340
1341    async fn import_block(
1342        &self,
1343        mut block: BlockImportParams<Block>,
1344    ) -> Result<ImportResult, Self::Error> {
1345        let block_number = *block.header.number();
1346        block.fork_choice = Some(ForkChoiceStrategy::LongestChain);
1347
1348        let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
1349
1350        // Must drop `block_import_acknowledgement_sender` after the notification otherwise the receiver
1351        // will block forever as there is still a sender not closed.
1352        {
1353            let value = (block_number, acknowledgement_sender);
1354            self.block_importing_notification_subscribers
1355                .lock()
1356                .retain(|subscriber| subscriber.unbounded_send(value.clone()).is_ok());
1357        }
1358
1359        while acknowledgement_receiver.next().await.is_some() {
1360            // Wait for all the acknowledgements to finish.
1361        }
1362
1363        self.inner.import_block(block).await
1364    }
1365
1366    async fn check_block(
1367        &self,
1368        block: BlockCheckParams<Block>,
1369    ) -> Result<ImportResult, Self::Error> {
1370        self.inner.check_block(block).await
1371    }
1372}
1373
1374/// Produce the given number of blocks for both the primary node and the domain nodes
1375#[macro_export]
1376macro_rules! produce_blocks {
1377    ($primary_node:ident, $operator_node:ident, $count: literal $(, $domain_node:ident)*) => {
1378        {
1379            async {
1380                let domain_fut = {
1381                    let mut futs: Vec<std::pin::Pin<Box<dyn futures::Future<Output = ()>>>> = Vec::new();
1382                    futs.push(Box::pin($operator_node.wait_for_blocks($count)));
1383                    $( futs.push( Box::pin( $domain_node.wait_for_blocks($count) ) ); )*
1384                    futures::future::join_all(futs)
1385                };
1386                $primary_node.produce_blocks_with_bundles($count).await?;
1387                domain_fut.await;
1388                Ok::<(), Box<dyn std::error::Error>>(())
1389            }
1390        }
1391    };
1392}
1393
1394/// Producing one block for both the primary node and the domain nodes, where the primary node can
1395/// use the `produce_block_with_xxx` function (i.e. `produce_block_with_slot`) to produce block
1396#[macro_export]
1397macro_rules! produce_block_with {
1398    ($primary_node_produce_block:expr, $operator_node:ident $(, $domain_node:ident)*) => {
1399        {
1400            async {
1401                let domain_fut = {
1402                    let mut futs: Vec<std::pin::Pin<Box<dyn futures::Future<Output = ()>>>> = Vec::new();
1403                    futs.push(Box::pin($operator_node.wait_for_blocks(1)));
1404                    $( futs.push( Box::pin( $domain_node.wait_for_blocks(1) ) ); )*
1405                    futures::future::join_all(futs)
1406                };
1407                $primary_node_produce_block.await?;
1408                domain_fut.await;
1409                Ok::<(), Box<dyn std::error::Error>>(())
1410            }
1411        }
1412    };
1413}
1414
1415/// Keep producing block with a fixed interval until the given condition become `true`
1416#[macro_export]
1417macro_rules! produce_blocks_until {
1418    ($primary_node:ident, $operator_node:ident, $condition: block $(, $domain_node:ident)*) => {
1419        async {
1420            while !$condition {
1421                produce_blocks!($primary_node, $operator_node, 1 $(, $domain_node),*).await?;
1422                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1423            }
1424            Ok::<(), Box<dyn std::error::Error>>(())
1425        }
1426    };
1427}
1428
1429type BalanceOf<T> = <<T as pallet_transaction_payment::Config>::OnChargeTransaction as pallet_transaction_payment::OnChargeTransaction<T>>::Balance;
1430
1431fn get_signed_extra(
1432    current_block: u64,
1433    immortal: bool,
1434    nonce: u32,
1435    tip: BalanceOf<Runtime>,
1436) -> SignedExtra {
1437    let period = u64::from(<<Runtime as frame_system::Config>::BlockHashCount>::get())
1438        .checked_next_power_of_two()
1439        .map(|c| c / 2)
1440        .unwrap_or(2);
1441    (
1442        frame_system::CheckNonZeroSender::<Runtime>::new(),
1443        frame_system::CheckSpecVersion::<Runtime>::new(),
1444        frame_system::CheckTxVersion::<Runtime>::new(),
1445        frame_system::CheckGenesis::<Runtime>::new(),
1446        frame_system::CheckMortality::<Runtime>::from(if immortal {
1447            generic::Era::Immortal
1448        } else {
1449            generic::Era::mortal(period, current_block)
1450        }),
1451        frame_system::CheckNonce::<Runtime>::from(nonce.into()),
1452        frame_system::CheckWeight::<Runtime>::new(),
1453        pallet_transaction_payment::ChargeTransactionPayment::<Runtime>::from(tip),
1454        BalanceTransferCheckExtension::<Runtime>::default(),
1455        pallet_subspace::extensions::SubspaceExtension::<Runtime>::new(),
1456        pallet_domains::extensions::DomainsExtension::<Runtime>::new(),
1457        pallet_messenger::extensions::MessengerExtension::<Runtime>::new(),
1458    )
1459}
1460
1461fn construct_extrinsic_raw_payload<Client>(
1462    client: impl AsRef<Client>,
1463    function: <Runtime as frame_system::Config>::RuntimeCall,
1464    immortal: bool,
1465    nonce: u32,
1466    tip: BalanceOf<Runtime>,
1467) -> (
1468    SignedPayload<<Runtime as frame_system::Config>::RuntimeCall, SignedExtra>,
1469    SignedExtra,
1470)
1471where
1472    BalanceOf<Runtime>: Send + Sync + From<u64> + sp_runtime::FixedPointOperand,
1473    u64: From<BlockNumberFor<Runtime>>,
1474    Client: HeaderBackend<subspace_runtime_primitives::opaque::Block>,
1475{
1476    let current_block_hash = client.as_ref().info().best_hash;
1477    let current_block = client.as_ref().info().best_number.saturated_into();
1478    let genesis_block = client.as_ref().hash(0).unwrap().unwrap();
1479    let extra = get_signed_extra(current_block, immortal, nonce, tip);
1480    (
1481        generic::SignedPayload::<
1482            <Runtime as frame_system::Config>::RuntimeCall,
1483            SignedExtra,
1484        >::from_raw(
1485            function,
1486            extra.clone(),
1487            ((), 100, 1, genesis_block, current_block_hash, (), (), (), (), (), (),()),
1488        ),
1489        extra,
1490    )
1491}
1492
1493/// Construct an extrinsic that can be applied to the test runtime.
1494pub fn construct_extrinsic_generic<Client>(
1495    client: impl AsRef<Client>,
1496    function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1497    caller: Sr25519Keyring,
1498    immortal: bool,
1499    nonce: u32,
1500    tip: BalanceOf<Runtime>,
1501) -> UncheckedExtrinsic
1502where
1503    BalanceOf<Runtime>: Send + Sync + From<u64> + sp_runtime::FixedPointOperand,
1504    u64: From<BlockNumberFor<Runtime>>,
1505    Client: HeaderBackend<subspace_runtime_primitives::opaque::Block>,
1506{
1507    let function = function.into();
1508    let (raw_payload, extra) =
1509        construct_extrinsic_raw_payload(client, function.clone(), immortal, nonce, tip);
1510    let signature = raw_payload.using_encoded(|e| caller.sign(e));
1511    UncheckedExtrinsic::new_signed(
1512        function,
1513        MultiAddress::Id(caller.to_account_id()),
1514        Signature::Sr25519(signature),
1515        extra,
1516    )
1517}
1518
1519/// Construct a general unsigned extrinsic that can be applied to the test runtime.
1520fn construct_unsigned_extrinsic<Client>(
1521    client: impl AsRef<Client>,
1522    function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1523) -> UncheckedExtrinsic
1524where
1525    BalanceOf<Runtime>: Send + Sync + From<u64> + sp_runtime::FixedPointOperand,
1526    u64: From<BlockNumberFor<Runtime>>,
1527    Client: HeaderBackend<subspace_runtime_primitives::opaque::Block>,
1528{
1529    let function = function.into();
1530    let current_block = client.as_ref().info().best_number.saturated_into();
1531    let extra = get_signed_extra(current_block, true, 0, 0);
1532    UncheckedExtrinsic::new_transaction(function, extra)
1533}