subspace_test_service/
lib.rs

1// Copyright (C) 2021 Subspace Labs, Inc.
2// SPDX-License-Identifier: GPL-3.0-or-later
3
4// This program is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// This program is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with this program. If not, see <https://www.gnu.org/licenses/>.
16
17//! Subspace test service only.
18
19#![warn(missing_docs, unused_crate_dependencies)]
20
21use cross_domain_message_gossip::{GossipWorkerBuilder, xdm_gossip_peers_set_config};
22use domain_runtime_primitives::opaque::{Block as DomainBlock, Header as DomainHeader};
23use frame_system::pallet_prelude::BlockNumberFor;
24use futures::channel::mpsc;
25use futures::{Future, StreamExt};
26use jsonrpsee::RpcModule;
27use parity_scale_codec::{Decode, Encode};
28use parking_lot::Mutex;
29use sc_block_builder::BlockBuilderBuilder;
30use sc_client_api::execution_extensions::ExtensionsFactory;
31use sc_client_api::{Backend as BackendT, BlockBackend, ExecutorProvider, Finalizer};
32use sc_consensus::block_import::{
33    BlockCheckParams, BlockImportParams, ForkChoiceStrategy, ImportResult,
34};
35use sc_consensus::{BasicQueue, BlockImport, StateAction, Verifier as VerifierT};
36use sc_domains::ExtensionsFactory as DomainsExtensionFactory;
37use sc_network::config::{NetworkConfiguration, TransportConfig};
38use sc_network::service::traits::NetworkService;
39use sc_network::{
40    NetworkWorker, NotificationMetrics, NotificationService, ReputationChange, multiaddr,
41};
42use sc_service::config::{
43    DatabaseSource, ExecutorConfiguration, KeystoreConfig, MultiaddrWithPeerId,
44    OffchainWorkerConfig, RpcBatchRequestConfig, RpcConfiguration, WasmExecutionMethod,
45    WasmtimeInstantiationStrategy,
46};
47use sc_service::{
48    BasePath, BlocksPruning, Configuration, NetworkStarter, Role, SpawnTasksParams, TaskManager,
49};
50use sc_transaction_pool::{BasicPool, FullChainApi, Options};
51use sc_transaction_pool_api::error::{Error as TxPoolError, IntoPoolError};
52use sc_transaction_pool_api::{InPoolTransaction, TransactionPool, TransactionSource};
53use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender, tracing_unbounded};
54use sp_api::{ApiExt, ProvideRuntimeApi};
55use sp_blockchain::{HashAndNumber, HeaderBackend};
56use sp_consensus::{BlockOrigin, Error as ConsensusError};
57use sp_consensus_slots::Slot;
58use sp_consensus_subspace::digests::{
59    CompatibleDigestItem, PreDigest, PreDigestPotInfo, extract_pre_digest,
60};
61use sp_consensus_subspace::{PotExtension, SubspaceApi};
62use sp_core::H256;
63use sp_core::offchain::OffchainDbExt;
64use sp_core::offchain::storage::OffchainDb;
65use sp_core::traits::{CodeExecutor, SpawnEssentialNamed};
66use sp_domains::{BundleProducerElectionApi, ChainId, DomainsApi, OpaqueBundle};
67use sp_domains_fraud_proof::fraud_proof::FraudProof;
68use sp_domains_fraud_proof::{FraudProofExtension, FraudProofHostFunctionsImpl};
69use sp_externalities::Extensions;
70use sp_inherents::{InherentData, InherentDataProvider};
71use sp_keyring::Sr25519Keyring;
72use sp_messenger::MessengerApi;
73use sp_messenger_host_functions::{MessengerExtension, MessengerHostFunctionsImpl};
74use sp_mmr_primitives::MmrApi;
75use sp_runtime::generic::{Digest, SignedPayload};
76use sp_runtime::traits::{
77    BlakeTwo256, Block as BlockT, Hash as HashT, Header as HeaderT, NumberFor,
78};
79use sp_runtime::{DigestItem, MultiAddress, OpaqueExtrinsic, SaturatedConversion, generic};
80use sp_subspace_mmr::host_functions::{SubspaceMmrExtension, SubspaceMmrHostFunctionsImpl};
81use sp_timestamp::Timestamp;
82use std::collections::HashMap;
83use std::error::Error;
84use std::marker::PhantomData;
85use std::pin::Pin;
86use std::sync::Arc;
87use std::time;
88use std::time::Duration;
89use subspace_core_primitives::pot::PotOutput;
90use subspace_core_primitives::solutions::Solution;
91use subspace_core_primitives::{BlockNumber, PublicKey};
92use subspace_runtime_primitives::opaque::Block;
93use subspace_runtime_primitives::{
94    AccountId, Balance, BlockHashFor, ExtrinsicFor, Hash, HeaderFor, Signature,
95};
96use subspace_service::{FullSelectChain, RuntimeExecutor};
97use subspace_test_client::{Backend, Client, chain_spec};
98use subspace_test_primitives::OnchainStateApi;
99use subspace_test_runtime::{
100    DisablePallets, Runtime, RuntimeApi, RuntimeCall, SLOT_DURATION, SignedExtra,
101    UncheckedExtrinsic,
102};
103use substrate_frame_rpc_system::AccountNonceApi;
104use substrate_test_client::{RpcHandlersExt, RpcTransactionError, RpcTransactionOutput};
105use tokio::time::sleep;
106
107/// Helper type alias
108pub type FraudProofFor<Block, DomainBlock> =
109    FraudProof<NumberFor<Block>, BlockHashFor<Block>, HeaderFor<DomainBlock>, H256>;
110
111const MAX_PRODUCE_BUNDLE_TRY: usize = 10;
112
113/// Create a Subspace `Configuration`.
114///
115/// By default an in-memory socket will be used, therefore you need to provide boot
116/// nodes if you want the future node to be connected to other nodes.
117#[expect(clippy::too_many_arguments)]
118pub fn node_config(
119    tokio_handle: tokio::runtime::Handle,
120    key: Sr25519Keyring,
121    boot_nodes: Vec<MultiaddrWithPeerId>,
122    run_farmer: bool,
123    force_authoring: bool,
124    force_synced: bool,
125    private_evm: bool,
126    evm_owner_account: Option<AccountId>,
127    base_path: BasePath,
128) -> Configuration {
129    let root = base_path.path();
130    let role = if run_farmer {
131        Role::Authority
132    } else {
133        Role::Full
134    };
135    let key_seed = key.to_seed();
136    let spec = chain_spec::subspace_local_testnet_config(private_evm, evm_owner_account).unwrap();
137
138    let mut network_config = NetworkConfiguration::new(
139        key_seed.to_string(),
140        "network/test/0.1",
141        Default::default(),
142        None,
143    );
144
145    network_config.boot_nodes = boot_nodes;
146
147    network_config.allow_non_globals_in_dht = true;
148
149    let addr: multiaddr::Multiaddr = multiaddr::Protocol::Memory(rand::random()).into();
150    network_config.listen_addresses.push(addr.clone());
151
152    network_config.public_addresses.push(addr);
153
154    network_config.transport = TransportConfig::MemoryOnly;
155
156    network_config.force_synced = force_synced;
157
158    Configuration {
159        impl_name: "subspace-test-node".to_string(),
160        impl_version: "0.1".to_string(),
161        role,
162        tokio_handle,
163        transaction_pool: Default::default(),
164        network: network_config,
165        keystore: KeystoreConfig::InMemory,
166        database: DatabaseSource::ParityDb {
167            path: root.join("paritydb"),
168        },
169        trie_cache_maximum_size: Some(64 * 1024 * 1024),
170        state_pruning: Default::default(),
171        blocks_pruning: BlocksPruning::KeepAll,
172        chain_spec: Box::new(spec),
173        executor: ExecutorConfiguration {
174            wasm_method: WasmExecutionMethod::Compiled {
175                instantiation_strategy: WasmtimeInstantiationStrategy::PoolingCopyOnWrite,
176            },
177            max_runtime_instances: 8,
178            default_heap_pages: None,
179            runtime_cache_size: 2,
180        },
181        wasm_runtime_overrides: Default::default(),
182        rpc: RpcConfiguration {
183            addr: None,
184            max_request_size: 0,
185            max_response_size: 0,
186            id_provider: None,
187            max_subs_per_conn: 0,
188            port: 0,
189            message_buffer_capacity: 0,
190            batch_config: RpcBatchRequestConfig::Disabled,
191            max_connections: 0,
192            cors: None,
193            methods: Default::default(),
194            rate_limit: None,
195            rate_limit_whitelisted_ips: vec![],
196            rate_limit_trust_proxy_headers: false,
197        },
198        prometheus_config: None,
199        telemetry_endpoints: None,
200        offchain_worker: OffchainWorkerConfig {
201            enabled: false,
202            indexing_enabled: true,
203        },
204        force_authoring,
205        disable_grandpa: false,
206        dev_key_seed: Some(key_seed),
207        tracing_targets: None,
208        tracing_receiver: Default::default(),
209        announce_block: true,
210        data_path: base_path.path().into(),
211        base_path,
212    }
213}
214
215type StorageChanges = sp_api::StorageChanges<Block>;
216
217struct MockExtensionsFactory<Client, DomainBlock, Executor, CBackend> {
218    consensus_client: Arc<Client>,
219    consensus_backend: Arc<CBackend>,
220    executor: Arc<Executor>,
221    mock_pot_verifier: Arc<MockPotVerfier>,
222    confirmation_depth_k: BlockNumber,
223    _phantom: PhantomData<DomainBlock>,
224}
225
226impl<Client, DomainBlock, Executor, CBackend>
227    MockExtensionsFactory<Client, DomainBlock, Executor, CBackend>
228{
229    fn new(
230        consensus_client: Arc<Client>,
231        executor: Arc<Executor>,
232        mock_pot_verifier: Arc<MockPotVerfier>,
233        consensus_backend: Arc<CBackend>,
234        confirmation_depth_k: BlockNumber,
235    ) -> Self {
236        Self {
237            consensus_client,
238            consensus_backend,
239            executor,
240            mock_pot_verifier,
241            confirmation_depth_k,
242            _phantom: Default::default(),
243        }
244    }
245}
246
247#[derive(Default)]
248struct MockPotVerfier(Mutex<HashMap<u64, PotOutput>>);
249
250impl MockPotVerfier {
251    fn is_valid(&self, slot: u64, pot: PotOutput) -> bool {
252        self.0.lock().get(&slot).map(|p| *p == pot).unwrap_or(false)
253    }
254
255    fn inject_pot(&self, slot: u64, pot: PotOutput) {
256        self.0.lock().insert(slot, pot);
257    }
258}
259
260impl<Block, Client, DomainBlock, Executor, CBackend> ExtensionsFactory<Block>
261    for MockExtensionsFactory<Client, DomainBlock, Executor, CBackend>
262where
263    Block: BlockT,
264    Block::Hash: From<H256> + Into<H256>,
265    DomainBlock: BlockT,
266    DomainBlock::Hash: Into<H256> + From<H256>,
267    Client: BlockBackend<Block> + HeaderBackend<Block> + ProvideRuntimeApi<Block> + 'static,
268    Client::Api: DomainsApi<Block, DomainBlock::Header>
269        + BundleProducerElectionApi<Block, Balance>
270        + MessengerApi<Block, NumberFor<Block>, Block::Hash>
271        + MmrApi<Block, H256, NumberFor<Block>>,
272    Executor: CodeExecutor + sc_executor::RuntimeVersionOf,
273    CBackend: BackendT<Block> + 'static,
274{
275    fn extensions_for(
276        &self,
277        _block_hash: Block::Hash,
278        _block_number: NumberFor<Block>,
279    ) -> Extensions {
280        let confirmation_depth_k = self.confirmation_depth_k;
281        let mut exts = Extensions::new();
282        exts.register(FraudProofExtension::new(Arc::new(
283            FraudProofHostFunctionsImpl::<_, _, DomainBlock, Executor, _>::new(
284                self.consensus_client.clone(),
285                self.executor.clone(),
286                move |client, executor| {
287                    let extension_factory =
288                        DomainsExtensionFactory::<_, Block, DomainBlock, _>::new(
289                            client,
290                            executor,
291                            confirmation_depth_k,
292                        );
293                    Box::new(extension_factory) as Box<dyn ExtensionsFactory<DomainBlock>>
294                },
295            ),
296        )));
297        exts.register(SubspaceMmrExtension::new(Arc::new(
298            SubspaceMmrHostFunctionsImpl::<Block, _>::new(
299                self.consensus_client.clone(),
300                confirmation_depth_k,
301            ),
302        )));
303        exts.register(MessengerExtension::new(Arc::new(
304            MessengerHostFunctionsImpl::<Block, _, DomainBlock, _>::new(
305                self.consensus_client.clone(),
306                self.executor.clone(),
307            ),
308        )));
309
310        if let Some(offchain_storage) = self.consensus_backend.offchain_storage() {
311            let offchain_db = OffchainDb::new(offchain_storage);
312            exts.register(OffchainDbExt::new(offchain_db));
313        }
314        exts.register(PotExtension::new({
315            let client = Arc::clone(&self.consensus_client);
316            let mock_pot_verifier = Arc::clone(&self.mock_pot_verifier);
317            Box::new(
318                move |parent_hash, slot, proof_of_time, _quick_verification| {
319                    let parent_hash = {
320                        let mut converted_parent_hash = Block::Hash::default();
321                        converted_parent_hash.as_mut().copy_from_slice(&parent_hash);
322                        converted_parent_hash
323                    };
324
325                    let parent_header = match client.header(parent_hash) {
326                        Ok(Some(parent_header)) => parent_header,
327                        _ => return false,
328                    };
329                    let parent_pre_digest = match extract_pre_digest(&parent_header) {
330                        Ok(parent_pre_digest) => parent_pre_digest,
331                        _ => return false,
332                    };
333
334                    let parent_slot = parent_pre_digest.slot();
335                    if slot <= *parent_slot {
336                        return false;
337                    }
338
339                    mock_pot_verifier.is_valid(slot, proof_of_time)
340                },
341            )
342        }));
343        exts
344    }
345}
346
347type NewSlot = (Slot, PotOutput);
348
349/// A mock Subspace consensus node instance used for testing.
350pub struct MockConsensusNode {
351    /// `TaskManager`'s instance.
352    pub task_manager: TaskManager,
353    /// Client's instance.
354    pub client: Arc<Client>,
355    /// Backend.
356    pub backend: Arc<Backend>,
357    /// Code executor.
358    pub executor: RuntimeExecutor,
359    /// Transaction pool.
360    pub transaction_pool: Arc<BasicPool<FullChainApi<Client, Block>, Block>>,
361    /// The SelectChain Strategy
362    pub select_chain: FullSelectChain,
363    /// Network service.
364    pub network_service: Arc<dyn NetworkService + Send + Sync>,
365    /// Cross-domain gossip notification service.
366    pub xdm_gossip_notification_service: Option<Box<dyn NotificationService>>,
367    /// Sync service.
368    pub sync_service: Arc<sc_network_sync::SyncingService<Block>>,
369    /// RPC handlers.
370    pub rpc_handlers: sc_service::RpcHandlers,
371    /// Network starter
372    pub network_starter: Option<NetworkStarter>,
373    /// The next slot number
374    next_slot: u64,
375    /// The mock pot verifier
376    mock_pot_verifier: Arc<MockPotVerfier>,
377    /// The slot notification subscribers
378    new_slot_notification_subscribers: Vec<mpsc::UnboundedSender<(Slot, PotOutput)>>,
379    /// The acknowledgement sender subscribers
380    acknowledgement_sender_subscribers: Vec<TracingUnboundedSender<mpsc::Sender<()>>>,
381    /// Block import pipeline
382    block_import: MockBlockImport<Client, Block>,
383    xdm_gossip_worker_builder: Option<GossipWorkerBuilder>,
384    /// Mock subspace solution used to mock the subspace `PreDigest`
385    mock_solution: Solution<AccountId>,
386    log_prefix: &'static str,
387    /// Ferdie key
388    pub key: Sr25519Keyring,
389    finalize_block_depth: Option<NumberFor<Block>>,
390    /// The node's base path
391    base_path: BasePath,
392}
393
394impl MockConsensusNode {
395    /// Run a mock consensus node
396    pub fn run(
397        tokio_handle: tokio::runtime::Handle,
398        key: Sr25519Keyring,
399        base_path: BasePath,
400    ) -> MockConsensusNode {
401        Self::run_with_finalization_depth(tokio_handle, key, base_path, None, false, None)
402    }
403
404    /// Run a mock consensus node with a private EVM domain
405    pub fn run_with_private_evm(
406        tokio_handle: tokio::runtime::Handle,
407        key: Sr25519Keyring,
408        evm_owner: Option<Sr25519Keyring>,
409        base_path: BasePath,
410    ) -> MockConsensusNode {
411        Self::run_with_finalization_depth(tokio_handle, key, base_path, None, true, evm_owner)
412    }
413
414    /// Run a mock consensus node with finalization depth
415    pub fn run_with_finalization_depth(
416        tokio_handle: tokio::runtime::Handle,
417        key: Sr25519Keyring,
418        base_path: BasePath,
419        finalize_block_depth: Option<NumberFor<Block>>,
420        private_evm: bool,
421        evm_owner: Option<Sr25519Keyring>,
422    ) -> MockConsensusNode {
423        let log_prefix = key.into();
424
425        let mut config = node_config(
426            tokio_handle,
427            key,
428            vec![],
429            false,
430            false,
431            false,
432            private_evm,
433            evm_owner.map(|key| key.to_account_id()),
434            base_path.clone(),
435        );
436
437        // Set `transaction_pool.ban_time` to 0 such that duplicated tx will not immediately rejected
438        // by `TemporarilyBanned`
439        // rest of the options are taken from default
440        let tx_pool_options = Options {
441            ban_time: Duration::from_millis(0),
442            ..Default::default()
443        };
444
445        config.network.node_name = format!("{} (Consensus)", config.network.node_name);
446        let span = sc_tracing::tracing::info_span!(
447            sc_tracing::logging::PREFIX_LOG_SPAN,
448            name = config.network.node_name.as_str()
449        );
450        let _enter = span.enter();
451
452        let executor = sc_service::new_wasm_executor(&config.executor);
453
454        let (client, backend, keystore_container, mut task_manager) =
455            sc_service::new_full_parts::<Block, RuntimeApi, _>(&config, None, executor.clone())
456                .expect("Fail to new full parts");
457
458        let domain_executor = Arc::new(sc_service::new_wasm_executor(&config.executor));
459        let client = Arc::new(client);
460        let mock_pot_verifier = Arc::new(MockPotVerfier::default());
461        let chain_constants = client
462            .runtime_api()
463            .chain_constants(client.info().best_hash)
464            .expect("Fail to get chain constants");
465        client
466            .execution_extensions()
467            .set_extensions_factory(MockExtensionsFactory::<
468                _,
469                DomainBlock,
470                sc_domains::RuntimeExecutor,
471                _,
472            >::new(
473                client.clone(),
474                domain_executor.clone(),
475                Arc::clone(&mock_pot_verifier),
476                backend.clone(),
477                chain_constants.confirmation_depth_k(),
478            ));
479
480        let select_chain = sc_consensus::LongestChain::new(backend.clone());
481        let transaction_pool = Arc::from(BasicPool::new_full(
482            tx_pool_options,
483            config.role.is_authority().into(),
484            config.prometheus_registry(),
485            task_manager.spawn_essential_handle(),
486            client.clone(),
487        ));
488
489        let block_import = MockBlockImport::<_, _>::new(client.clone());
490
491        let mut net_config = sc_network::config::FullNetworkConfiguration::<
492            _,
493            _,
494            NetworkWorker<_, _>,
495        >::new(&config.network, None);
496        let (xdm_gossip_notification_config, xdm_gossip_notification_service) =
497            xdm_gossip_peers_set_config();
498        net_config.add_notification_protocol(xdm_gossip_notification_config);
499
500        let (network_service, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
501            sc_service::build_network(sc_service::BuildNetworkParams {
502                config: &config,
503                net_config,
504                client: client.clone(),
505                transaction_pool: transaction_pool.clone(),
506                spawn_handle: task_manager.spawn_handle(),
507                import_queue: mock_import_queue(
508                    block_import.clone(),
509                    &task_manager.spawn_essential_handle(),
510                ),
511                block_announce_validator_builder: None,
512                warp_sync_config: None,
513                block_relay: None,
514                metrics: NotificationMetrics::new(None),
515            })
516            .expect("Should be able to build network");
517
518        let rpc_handlers = sc_service::spawn_tasks(SpawnTasksParams {
519            network: network_service.clone(),
520            client: client.clone(),
521            keystore: keystore_container.keystore(),
522            task_manager: &mut task_manager,
523            transaction_pool: transaction_pool.clone(),
524            rpc_builder: Box::new(|_| Ok(RpcModule::new(()))),
525            backend: backend.clone(),
526            system_rpc_tx,
527            config,
528            telemetry: None,
529            tx_handler_controller,
530            sync_service: sync_service.clone(),
531        })
532        .expect("Should be able to spawn tasks");
533
534        let mock_solution =
535            Solution::genesis_solution(PublicKey::from(key.public().0), key.to_account_id());
536
537        let mut gossip_builder = GossipWorkerBuilder::new();
538
539        task_manager
540            .spawn_essential_handle()
541            .spawn_essential_blocking(
542                "consensus-chain-channel-update-worker",
543                None,
544                Box::pin(
545                    domain_client_message_relayer::worker::gossip_channel_updates::<_, _, Block, _>(
546                        ChainId::Consensus,
547                        client.clone(),
548                        sync_service.clone(),
549                        gossip_builder.gossip_msg_sink(),
550                    ),
551                ),
552            );
553
554        let (consensus_msg_sink, consensus_msg_receiver) =
555            tracing_unbounded("consensus_message_channel", 100);
556
557        // Start cross domain message listener for Consensus chain to receive messages from domains in the network
558        let consensus_listener =
559            cross_domain_message_gossip::start_cross_chain_message_listener::<_, _, _, _, _, _, _>(
560                ChainId::Consensus,
561                client.clone(),
562                client.clone(),
563                transaction_pool.clone(),
564                network_service.clone(),
565                consensus_msg_receiver,
566                domain_executor,
567                sync_service.clone(),
568            );
569
570        task_manager
571            .spawn_essential_handle()
572            .spawn_essential_blocking(
573                "consensus-message-listener",
574                None,
575                Box::pin(consensus_listener),
576            );
577
578        gossip_builder.push_chain_sink(ChainId::Consensus, consensus_msg_sink);
579
580        task_manager.spawn_essential_handle().spawn_blocking(
581            "mmr-gadget",
582            None,
583            mmr_gadget::MmrGadget::start(
584                client.clone(),
585                backend.clone(),
586                sp_mmr_primitives::INDEXING_PREFIX.to_vec(),
587            ),
588        );
589
590        MockConsensusNode {
591            task_manager,
592            client,
593            backend,
594            executor,
595            transaction_pool,
596            select_chain,
597            network_service,
598            xdm_gossip_notification_service: Some(xdm_gossip_notification_service),
599            sync_service,
600            rpc_handlers,
601            network_starter: Some(network_starter),
602            next_slot: 1,
603            mock_pot_verifier,
604            new_slot_notification_subscribers: Vec::new(),
605            acknowledgement_sender_subscribers: Vec::new(),
606            block_import,
607            xdm_gossip_worker_builder: Some(gossip_builder),
608            mock_solution,
609            log_prefix,
610            key,
611            finalize_block_depth,
612            base_path,
613        }
614    }
615
616    /// Start the mock consensus node network
617    pub fn start_network(&mut self) {
618        self.network_starter
619            .take()
620            .expect("mock consensus node network have not started yet")
621            .start_network();
622    }
623
624    /// Get the cross domain gossip message worker builder
625    pub fn xdm_gossip_worker_builder(&mut self) -> &mut GossipWorkerBuilder {
626        self.xdm_gossip_worker_builder
627            .as_mut()
628            .expect("gossip message worker have not started yet")
629    }
630
631    /// Start the cross domain gossip message worker.
632    pub fn start_cross_domain_gossip_message_worker(&mut self) {
633        let xdm_gossip_worker_builder = self
634            .xdm_gossip_worker_builder
635            .take()
636            .expect("gossip message worker have not started yet");
637        let cross_domain_message_gossip_worker = xdm_gossip_worker_builder.build::<Block, _, _>(
638            self.network_service.clone(),
639            self.xdm_gossip_notification_service
640                .take()
641                .expect("XDM gossip notification service must be used only once"),
642            self.sync_service.clone(),
643        );
644        self.task_manager
645            .spawn_essential_handle()
646            .spawn_essential_blocking(
647                "cross-domain-gossip-message-worker",
648                None,
649                Box::pin(cross_domain_message_gossip_worker.run()),
650            );
651    }
652
653    /// Return the next slot number
654    pub fn next_slot(&self) -> u64 {
655        self.next_slot
656    }
657
658    /// Set the next slot number
659    pub fn set_next_slot(&mut self, next_slot: u64) {
660        self.next_slot = next_slot;
661    }
662
663    /// Produce a slot only, without waiting for the potential slot handlers.
664    pub fn produce_slot(&mut self) -> NewSlot {
665        let slot = Slot::from(self.next_slot);
666        let proof_of_time = PotOutput::from(
667            <&[u8] as TryInto<[u8; 16]>>::try_into(&Hash::random().to_fixed_bytes()[..16])
668                .expect("slice with length of 16 must able convert into [u8; 16]; qed"),
669        );
670        self.mock_pot_verifier.inject_pot(*slot, proof_of_time);
671        self.next_slot += 1;
672
673        (slot, proof_of_time)
674    }
675
676    /// Notify the executor about the new slot and wait for the bundle produced at this slot.
677    pub async fn notify_new_slot_and_wait_for_bundle(
678        &mut self,
679        new_slot: NewSlot,
680    ) -> Option<OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>> {
681        self.new_slot_notification_subscribers
682            .retain(|subscriber| subscriber.unbounded_send(new_slot).is_ok());
683
684        self.confirm_acknowledgement().await;
685        self.get_bundle_from_tx_pool(new_slot)
686    }
687
688    /// Produce a new slot and wait for a bundle produced at this slot.
689    pub async fn produce_slot_and_wait_for_bundle_submission(
690        &mut self,
691    ) -> (
692        NewSlot,
693        OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>,
694    ) {
695        let slot = self.produce_slot();
696        for _ in 0..MAX_PRODUCE_BUNDLE_TRY {
697            if let Some(bundle) = self.notify_new_slot_and_wait_for_bundle(slot).await {
698                return (slot, bundle);
699            }
700        }
701        panic!(
702            "Failed to produce bundle after {MAX_PRODUCE_BUNDLE_TRY:?} tries, something must be wrong"
703        );
704    }
705
706    /// Subscribe the new slot notification
707    pub fn new_slot_notification_stream(&mut self) -> mpsc::UnboundedReceiver<(Slot, PotOutput)> {
708        let (tx, rx) = mpsc::unbounded();
709        self.new_slot_notification_subscribers.push(tx);
710        rx
711    }
712
713    /// Subscribe the acknowledgement sender stream
714    pub fn new_acknowledgement_sender_stream(
715        &mut self,
716    ) -> TracingUnboundedReceiver<mpsc::Sender<()>> {
717        let (tx, rx) = tracing_unbounded("subspace_acknowledgement_sender_stream", 100);
718        self.acknowledgement_sender_subscribers.push(tx);
719        rx
720    }
721
722    /// Wait for all the acknowledgements before return
723    ///
724    /// It is used to wait for the acknowledgement of the domain worker to ensure it have
725    /// finish all the previous tasks before return
726    pub async fn confirm_acknowledgement(&mut self) {
727        let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
728
729        // Must drop `acknowledgement_sender` after the notification otherwise the receiver
730        // will block forever as there is still a sender not closed.
731        {
732            self.acknowledgement_sender_subscribers
733                .retain(|subscriber| {
734                    subscriber
735                        .unbounded_send(acknowledgement_sender.clone())
736                        .is_ok()
737                });
738            drop(acknowledgement_sender);
739        }
740
741        // Wait for all the acknowledgements to progress and proactively drop closed subscribers.
742        while acknowledgement_receiver.next().await.is_some() {
743            // Wait for all the acknowledgements to finish.
744        }
745    }
746
747    /// Wait for the operator finish processing the consensus block before return
748    pub async fn confirm_block_import_processed(&mut self) {
749        // Send one more notification to ensure the previous consensus block import notificaion
750        // have received by the operator
751        let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
752        {
753            // Must drop `block_import_acknowledgement_sender` after the notification otherwise
754            // the receiver will block forever as there is still a sender not closed.
755            // NOTE: it is okay to use the default block number since it is ignored in the consumer side.
756            let value = (NumberFor::<Block>::default(), acknowledgement_sender);
757            self.block_import
758                .block_importing_notification_subscribers
759                .lock()
760                .retain(|subscriber| subscriber.unbounded_send(value.clone()).is_ok());
761        }
762        while acknowledgement_receiver.next().await.is_some() {
763            // Wait for all the acknowledgements to finish.
764        }
765
766        // Ensure the operator finish processing the consensus block
767        self.confirm_acknowledgement().await;
768    }
769
770    /// Subscribe the block importing notification
771    pub fn block_importing_notification_stream(
772        &self,
773    ) -> TracingUnboundedReceiver<(NumberFor<Block>, mpsc::Sender<()>)> {
774        self.block_import.block_importing_notification_stream()
775    }
776
777    /// Get the bundle that created at `slot` from the transaction pool
778    pub fn get_bundle_from_tx_pool(
779        &self,
780        new_slot: NewSlot,
781    ) -> Option<OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>> {
782        for ready_tx in self.transaction_pool.ready() {
783            let ext = UncheckedExtrinsic::decode(&mut ready_tx.data.encode().as_slice())
784                .expect("should be able to decode");
785            if let RuntimeCall::Domains(pallet_domains::Call::submit_bundle { opaque_bundle }) =
786                ext.function
787                && opaque_bundle.sealed_header.slot_number() == *new_slot.0
788            {
789                return Some(opaque_bundle);
790            }
791        }
792        None
793    }
794
795    /// Submit a tx to the tx pool
796    pub async fn submit_transaction(&self, tx: OpaqueExtrinsic) -> Result<H256, TxPoolError> {
797        self.transaction_pool
798            .submit_one(
799                self.client.info().best_hash,
800                TransactionSource::External,
801                tx,
802            )
803            .await
804            .map_err(|err| {
805                err.into_pool_error()
806                    .expect("should always be a pool error")
807            })
808    }
809
810    /// Remove all tx from the tx pool
811    pub async fn clear_tx_pool(&self) -> Result<(), Box<dyn Error>> {
812        let txs: Vec<_> = self
813            .transaction_pool
814            .ready()
815            .map(|t| self.transaction_pool.hash_of(&t.data))
816            .collect();
817        self.prune_txs_from_pool(txs.as_slice()).await
818    }
819
820    /// Remove a ready transaction from transaction pool.
821    pub async fn prune_tx_from_pool(&self, tx: &OpaqueExtrinsic) -> Result<(), Box<dyn Error>> {
822        self.prune_txs_from_pool(&[self.transaction_pool.hash_of(tx)])
823            .await
824    }
825
826    async fn prune_txs_from_pool(
827        &self,
828        tx_hashes: &[BlockHashFor<Block>],
829    ) -> Result<(), Box<dyn Error>> {
830        let hash_and_number = HashAndNumber {
831            number: self.client.info().best_number,
832            hash: self.client.info().best_hash,
833        };
834        self.transaction_pool
835            .pool()
836            .prune_known(&hash_and_number, tx_hashes);
837        // `ban_time` have set to 0, explicitly wait 1ms here to ensure `clear_stale` will remove
838        // all the bans as the ban time must be passed.
839        tokio::time::sleep(time::Duration::from_millis(1)).await;
840        self.transaction_pool
841            .pool()
842            .validated_pool()
843            .clear_stale(&hash_and_number);
844        Ok(())
845    }
846
847    /// Return if the given ER exist in the consensus state
848    pub fn does_receipt_exist(
849        &self,
850        er_hash: BlockHashFor<DomainBlock>,
851    ) -> Result<bool, Box<dyn Error>> {
852        Ok(self
853            .client
854            .runtime_api()
855            .execution_receipt(self.client.info().best_hash, er_hash)?
856            .is_some())
857    }
858
859    /// Return a future that only resolve if a fraud proof that the given `fraud_proof_predicate`
860    /// return true is submitted to the consensus tx pool
861    pub fn wait_for_fraud_proof<FP>(
862        &self,
863        fraud_proof_predicate: FP,
864    ) -> Pin<Box<dyn Future<Output = FraudProofFor<Block, DomainBlock>> + Send>>
865    where
866        FP: Fn(&FraudProofFor<Block, DomainBlock>) -> bool + Send + 'static,
867    {
868        let tx_pool = self.transaction_pool.clone();
869        let mut import_tx_stream = self.transaction_pool.import_notification_stream();
870        Box::pin(async move {
871            while let Some(ready_tx_hash) = import_tx_stream.next().await {
872                let ready_tx = tx_pool
873                    .ready_transaction(&ready_tx_hash)
874                    .expect("Just get the ready tx hash from import stream; qed");
875                let ext = subspace_test_runtime::UncheckedExtrinsic::decode(
876                    &mut ready_tx.data.encode().as_slice(),
877                )
878                .expect("Decode tx must success");
879                if let subspace_test_runtime::RuntimeCall::Domains(
880                    pallet_domains::Call::submit_fraud_proof { fraud_proof },
881                ) = ext.function
882                    && fraud_proof_predicate(&fraud_proof)
883                {
884                    return *fraud_proof;
885                }
886            }
887            unreachable!()
888        })
889    }
890
891    /// Get the free balance of the given account
892    pub fn free_balance(&self, account_id: AccountId) -> subspace_runtime_primitives::Balance {
893        self.client
894            .runtime_api()
895            .free_balance(self.client.info().best_hash, account_id)
896            .expect("Fail to get account free balance")
897    }
898
899    /// Give the peer at `addr` the minimum reputation, which will ban it.
900    // TODO: also ban/unban in the DSN
901    pub fn ban_peer(&self, addr: MultiaddrWithPeerId) {
902        // If unban_peer() has been called on the peer, we need to bump it twice
903        // to give it the minimal reputation.
904        self.network_service.report_peer(
905            addr.peer_id,
906            ReputationChange::new_fatal("Peer banned by test (1)"),
907        );
908        self.network_service.report_peer(
909            addr.peer_id,
910            ReputationChange::new_fatal("Peer banned by test (2)"),
911        );
912    }
913
914    /// Give the peer at `addr` a high reputation, which guarantees it is un-banned it.
915    pub fn unban_peer(&self, addr: MultiaddrWithPeerId) {
916        // If ReputationChange::new_fatal() has been called on the peer, we need to bump it twice
917        // to give it a positive reputation.
918        self.network_service.report_peer(
919            addr.peer_id,
920            ReputationChange::new(i32::MAX, "Peer unbanned by test (1)"),
921        );
922        self.network_service.report_peer(
923            addr.peer_id,
924            ReputationChange::new(i32::MAX, "Peer unbanned by test (2)"),
925        );
926    }
927
928    /// Take and stop the `MockConsensusNode` and delete its database lock file.
929    ///
930    /// Stopping and restarting a node can cause weird race conditions, with errors like:
931    /// "The system cannot find the path specified".
932    /// If this happens, try increasing the wait time in this method.
933    pub async fn stop(self) -> Result<(), std::io::Error> {
934        let lock_file_path = self.base_path.path().join("paritydb").join("lock");
935        // On Windows, sometimes open files can’t be deleted so `drop` first then delete
936        std::mem::drop(self);
937
938        // Give the node time to cleanup, exit, and release the lock file.
939        // TODO: fix the underlying issue or wait for the actual shutdown instead
940        sleep(Duration::from_secs(2)).await;
941
942        // The lock file already being deleted is not a fatal test error, so just log it
943        if let Err(err) = std::fs::remove_file(lock_file_path) {
944            tracing::error!("deleting paritydb lock file failed: {err:?}");
945        }
946        Ok(())
947    }
948}
949
950impl MockConsensusNode {
951    async fn collect_txn_from_pool(&self, parent_hash: Hash) -> Vec<ExtrinsicFor<Block>> {
952        self.transaction_pool
953            .ready_at(parent_hash)
954            .await
955            .map(|pending_tx| pending_tx.data().as_ref().clone())
956            .collect()
957    }
958
959    async fn mock_inherent_data(slot: Slot) -> Result<InherentData, Box<dyn Error>> {
960        let timestamp = sp_timestamp::InherentDataProvider::new(Timestamp::new(
961            <Slot as Into<u64>>::into(slot) * SLOT_DURATION,
962        ));
963        let subspace_inherents =
964            sp_consensus_subspace::inherents::InherentDataProvider::new(vec![]);
965
966        let inherent_data = (subspace_inherents, timestamp)
967            .create_inherent_data()
968            .await?;
969
970        Ok(inherent_data)
971    }
972
973    fn mock_subspace_digest(&self, slot: Slot) -> Digest {
974        let pre_digest: PreDigest<AccountId> = PreDigest::V0 {
975            slot,
976            solution: self.mock_solution.clone(),
977            pot_info: PreDigestPotInfo::V0 {
978                proof_of_time: Default::default(),
979                future_proof_of_time: Default::default(),
980            },
981        };
982        let mut digest = Digest::default();
983        digest.push(DigestItem::subspace_pre_digest(&pre_digest));
984        digest
985    }
986
987    /// Build block
988    async fn build_block(
989        &self,
990        slot: Slot,
991        parent_hash: BlockHashFor<Block>,
992        extrinsics: Vec<ExtrinsicFor<Block>>,
993    ) -> Result<(Block, StorageChanges), Box<dyn Error>> {
994        let inherent_digest = self.mock_subspace_digest(slot);
995
996        let inherent_data = Self::mock_inherent_data(slot).await?;
997
998        let mut block_builder = BlockBuilderBuilder::new(self.client.as_ref())
999            .on_parent_block(parent_hash)
1000            .fetch_parent_block_number(self.client.as_ref())?
1001            .with_inherent_digests(inherent_digest)
1002            .build()
1003            .expect("Creates new block builder");
1004
1005        let inherent_txns = block_builder.create_inherents(inherent_data)?;
1006
1007        for tx in inherent_txns.into_iter().chain(extrinsics) {
1008            if let Err(err) = sc_block_builder::BlockBuilder::push(&mut block_builder, tx) {
1009                tracing::error!("Invalid transaction while building block: {}", err);
1010            }
1011        }
1012
1013        let (block, storage_changes, _) = block_builder.build()?.into_inner();
1014        Ok((block, storage_changes))
1015    }
1016
1017    /// Import block
1018    async fn import_block(
1019        &self,
1020        block: Block,
1021        storage_changes: Option<StorageChanges>,
1022    ) -> Result<BlockHashFor<Block>, Box<dyn Error>> {
1023        let (header, body) = block.deconstruct();
1024
1025        let header_hash = header.hash();
1026        let header_number = header.number;
1027
1028        let block_import_params = {
1029            let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
1030            import_block.body = Some(body);
1031            import_block.state_action = match storage_changes {
1032                Some(changes) => {
1033                    StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(changes))
1034                }
1035                None => StateAction::Execute,
1036            };
1037            import_block
1038        };
1039
1040        let import_result = self.block_import.import_block(block_import_params).await?;
1041
1042        if let Some(finalized_block_hash) = self
1043            .finalize_block_depth
1044            .and_then(|depth| header_number.checked_sub(depth))
1045            .and_then(|block_to_finalize| {
1046                self.client
1047                    .hash(block_to_finalize)
1048                    .expect("Block hash not found for number: {block_to_finalize:?}")
1049            })
1050        {
1051            self.client
1052                .finalize_block(finalized_block_hash, None, true)
1053                .unwrap();
1054        }
1055
1056        match import_result {
1057            ImportResult::Imported(_) | ImportResult::AlreadyInChain => Ok(header_hash),
1058            bad_res => Err(format!("Fail to import block due to {bad_res:?}").into()),
1059        }
1060    }
1061
1062    /// Produce a new block with the slot on top of `parent_hash`, with optional
1063    /// specified extrinsic list.
1064    #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1065    pub async fn produce_block_with_slot_at(
1066        &mut self,
1067        new_slot: NewSlot,
1068        parent_hash: BlockHashFor<Block>,
1069        maybe_extrinsics: Option<Vec<ExtrinsicFor<Block>>>,
1070    ) -> Result<BlockHashFor<Block>, Box<dyn Error>> {
1071        let block_timer = time::Instant::now();
1072
1073        let extrinsics = match maybe_extrinsics {
1074            Some(extrinsics) => extrinsics,
1075            None => self.collect_txn_from_pool(parent_hash).await,
1076        };
1077        let tx_hashes: Vec<_> = extrinsics
1078            .iter()
1079            .map(|t| self.transaction_pool.hash_of(t))
1080            .collect();
1081
1082        let (block, storage_changes) = self
1083            .build_block(new_slot.0, parent_hash, extrinsics)
1084            .await?;
1085
1086        log_new_block(&block, block_timer.elapsed().as_millis());
1087
1088        let res = match self.import_block(block, Some(storage_changes)).await {
1089            Ok(hash) => {
1090                // Remove the tx of the imported block from the tx pool incase re-include them
1091                // in the future block by accident.
1092                self.prune_txs_from_pool(tx_hashes.as_slice()).await?;
1093                Ok(hash)
1094            }
1095            err => err,
1096        };
1097        self.confirm_block_import_processed().await;
1098        res
1099    }
1100
1101    /// Produce a new block on top of the current best block, with the extrinsics collected from
1102    /// the transaction pool.
1103    #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1104    pub async fn produce_block_with_slot(&mut self, slot: NewSlot) -> Result<(), Box<dyn Error>> {
1105        self.produce_block_with_slot_at(slot, self.client.info().best_hash, None)
1106            .await?;
1107        Ok(())
1108    }
1109
1110    /// Produce a new block on top of the current best block, with the specificed extrinsics.
1111    #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1112    pub async fn produce_block_with_extrinsics(
1113        &mut self,
1114        extrinsics: Vec<ExtrinsicFor<Block>>,
1115    ) -> Result<(), Box<dyn Error>> {
1116        let slot = self.produce_slot();
1117        self.produce_block_with_slot_at(slot, self.client.info().best_hash, Some(extrinsics))
1118            .await?;
1119        Ok(())
1120    }
1121
1122    /// Produce `n` number of blocks.
1123    #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1124    pub async fn produce_blocks(&mut self, n: u64) -> Result<(), Box<dyn Error>> {
1125        for _ in 0..n {
1126            let slot = self.produce_slot();
1127            self.produce_block_with_slot(slot).await?;
1128        }
1129        Ok(())
1130    }
1131
1132    /// Produce `n` number of blocks and wait for bundle submitted to the block.
1133    #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1134    pub async fn produce_blocks_with_bundles(&mut self, n: u64) -> Result<(), Box<dyn Error>> {
1135        for _ in 0..n {
1136            let (slot, _) = self.produce_slot_and_wait_for_bundle_submission().await;
1137            self.produce_block_with_slot(slot).await?;
1138        }
1139        Ok(())
1140    }
1141
1142    /// Get the nonce of the node account
1143    pub fn account_nonce(&self) -> u32 {
1144        self.client
1145            .runtime_api()
1146            .account_nonce(self.client.info().best_hash, self.key.to_account_id())
1147            .expect("Fail to get account nonce")
1148    }
1149
1150    /// Get the nonce of the given account
1151    pub fn account_nonce_of(&self, account_id: AccountId) -> u32 {
1152        self.client
1153            .runtime_api()
1154            .account_nonce(self.client.info().best_hash, account_id)
1155            .expect("Fail to get account nonce")
1156    }
1157
1158    /// Construct an extrinsic.
1159    pub fn construct_extrinsic(
1160        &self,
1161        nonce: u32,
1162        function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1163    ) -> UncheckedExtrinsic {
1164        construct_extrinsic_generic(&self.client, function, self.key, false, nonce, 0)
1165    }
1166
1167    /// Construct an unsigned general extrinsic.
1168    pub fn construct_unsigned_extrinsic(
1169        &self,
1170        function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1171    ) -> UncheckedExtrinsic {
1172        construct_unsigned_extrinsic(&self.client, function)
1173    }
1174
1175    /// Construct and send extrinsic through rpc
1176    pub async fn construct_and_send_extrinsic_with(
1177        &self,
1178        function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1179    ) -> Result<RpcTransactionOutput, RpcTransactionError> {
1180        let nonce = self.account_nonce();
1181        let extrinsic = self.construct_extrinsic(nonce, function);
1182        self.rpc_handlers.send_transaction(extrinsic.into()).await
1183    }
1184
1185    /// Get the nonce of the given account
1186    pub async fn send_extrinsic(
1187        &self,
1188        extrinsic: impl Into<OpaqueExtrinsic>,
1189    ) -> Result<RpcTransactionOutput, RpcTransactionError> {
1190        self.rpc_handlers.send_transaction(extrinsic.into()).await
1191    }
1192}
1193
1194fn log_new_block(block: &Block, used_time_ms: u128) {
1195    tracing::info!(
1196        "🎁 Prepared block for proposing at {} ({} ms) [hash: {:?}; parent_hash: {}; extrinsics ({}): [{}]]",
1197        block.header().number(),
1198        used_time_ms,
1199        block.header().hash(),
1200        block.header().parent_hash(),
1201        block.extrinsics().len(),
1202        block
1203            .extrinsics()
1204            .iter()
1205            .map(|xt| BlakeTwo256::hash_of(xt).to_string())
1206            .collect::<Vec<_>>()
1207            .join(", ")
1208    );
1209}
1210
1211fn mock_import_queue<Block: BlockT, I>(
1212    block_import: I,
1213    spawner: &impl SpawnEssentialNamed,
1214) -> BasicQueue<Block>
1215where
1216    I: BlockImport<Block, Error = ConsensusError> + Send + Sync + 'static,
1217{
1218    BasicQueue::new(
1219        MockVerifier::default(),
1220        Box::new(block_import),
1221        None,
1222        spawner,
1223        None,
1224    )
1225}
1226
1227struct MockVerifier<Block> {
1228    _marker: PhantomData<Block>,
1229}
1230
1231impl<Block> Default for MockVerifier<Block> {
1232    fn default() -> Self {
1233        Self {
1234            _marker: PhantomData,
1235        }
1236    }
1237}
1238
1239#[async_trait::async_trait]
1240impl<Block> VerifierT<Block> for MockVerifier<Block>
1241where
1242    Block: BlockT,
1243{
1244    async fn verify(
1245        &self,
1246        block_params: BlockImportParams<Block>,
1247    ) -> Result<BlockImportParams<Block>, String> {
1248        Ok(block_params)
1249    }
1250}
1251
1252// `MockBlockImport` is mostly port from `sc-consensus-subspace::SubspaceBlockImport` with all
1253// the consensus related logic removed.
1254#[allow(clippy::type_complexity)]
1255struct MockBlockImport<Client, Block: BlockT> {
1256    inner: Arc<Client>,
1257    block_importing_notification_subscribers:
1258        Arc<Mutex<Vec<TracingUnboundedSender<(NumberFor<Block>, mpsc::Sender<()>)>>>>,
1259}
1260
1261impl<Client, Block: BlockT> MockBlockImport<Client, Block> {
1262    fn new(inner: Arc<Client>) -> Self {
1263        MockBlockImport {
1264            inner,
1265            block_importing_notification_subscribers: Arc::new(Mutex::new(Vec::new())),
1266        }
1267    }
1268
1269    // Subscribe the block importing notification
1270    fn block_importing_notification_stream(
1271        &self,
1272    ) -> TracingUnboundedReceiver<(NumberFor<Block>, mpsc::Sender<()>)> {
1273        let (tx, rx) = tracing_unbounded("subspace_new_slot_notification_stream", 100);
1274        self.block_importing_notification_subscribers
1275            .lock()
1276            .push(tx);
1277        rx
1278    }
1279}
1280
1281impl<Client, Block: BlockT> MockBlockImport<Client, Block> {
1282    fn clone(&self) -> Self {
1283        MockBlockImport {
1284            inner: self.inner.clone(),
1285            block_importing_notification_subscribers: self
1286                .block_importing_notification_subscribers
1287                .clone(),
1288        }
1289    }
1290}
1291
1292#[async_trait::async_trait]
1293impl<Client, Block> BlockImport<Block> for MockBlockImport<Client, Block>
1294where
1295    Block: BlockT,
1296    for<'r> &'r Client: BlockImport<Block, Error = ConsensusError> + Send + Sync,
1297    Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
1298    Client::Api: ApiExt<Block>,
1299{
1300    type Error = ConsensusError;
1301
1302    async fn import_block(
1303        &self,
1304        mut block: BlockImportParams<Block>,
1305    ) -> Result<ImportResult, Self::Error> {
1306        let block_number = *block.header.number();
1307        block.fork_choice = Some(ForkChoiceStrategy::LongestChain);
1308
1309        let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
1310
1311        // Must drop `block_import_acknowledgement_sender` after the notification otherwise the receiver
1312        // will block forever as there is still a sender not closed.
1313        {
1314            let value = (block_number, acknowledgement_sender);
1315            self.block_importing_notification_subscribers
1316                .lock()
1317                .retain(|subscriber| subscriber.unbounded_send(value.clone()).is_ok());
1318        }
1319
1320        while acknowledgement_receiver.next().await.is_some() {
1321            // Wait for all the acknowledgements to finish.
1322        }
1323
1324        self.inner.import_block(block).await
1325    }
1326
1327    async fn check_block(
1328        &self,
1329        block: BlockCheckParams<Block>,
1330    ) -> Result<ImportResult, Self::Error> {
1331        self.inner.check_block(block).await
1332    }
1333}
1334
1335/// Produce the given number of blocks for both the primary node and the domain nodes
1336#[macro_export]
1337macro_rules! produce_blocks {
1338    ($primary_node:ident, $operator_node:ident, $count: literal $(, $domain_node:ident)*) => {
1339        {
1340            async {
1341                let domain_fut = {
1342                    let mut futs: Vec<std::pin::Pin<Box<dyn futures::Future<Output = ()>>>> = Vec::new();
1343                    futs.push(Box::pin($operator_node.wait_for_blocks($count)));
1344                    $( futs.push( Box::pin( $domain_node.wait_for_blocks($count) ) ); )*
1345                    futures::future::join_all(futs)
1346                };
1347                $primary_node.produce_blocks_with_bundles($count).await?;
1348                domain_fut.await;
1349                Ok::<(), Box<dyn std::error::Error>>(())
1350            }
1351        }
1352    };
1353}
1354
1355/// Producing one block for both the primary node and the domain nodes, where the primary node can
1356/// use the `produce_block_with_xxx` function (i.e. `produce_block_with_slot`) to produce block
1357#[macro_export]
1358macro_rules! produce_block_with {
1359    ($primary_node_produce_block:expr, $operator_node:ident $(, $domain_node:ident)*) => {
1360        {
1361            async {
1362                let domain_fut = {
1363                    let mut futs: Vec<std::pin::Pin<Box<dyn futures::Future<Output = ()>>>> = Vec::new();
1364                    futs.push(Box::pin($operator_node.wait_for_blocks(1)));
1365                    $( futs.push( Box::pin( $domain_node.wait_for_blocks(1) ) ); )*
1366                    futures::future::join_all(futs)
1367                };
1368                $primary_node_produce_block.await?;
1369                domain_fut.await;
1370                Ok::<(), Box<dyn std::error::Error>>(())
1371            }
1372        }
1373    };
1374}
1375
1376/// Keep producing block with a fixed interval until the given condition become `true`
1377#[macro_export]
1378macro_rules! produce_blocks_until {
1379    ($primary_node:ident, $operator_node:ident, $condition: block $(, $domain_node:ident)*) => {
1380        async {
1381            while !$condition {
1382                produce_blocks!($primary_node, $operator_node, 1 $(, $domain_node),*).await?;
1383                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1384            }
1385            Ok::<(), Box<dyn std::error::Error>>(())
1386        }
1387    };
1388}
1389
1390type BalanceOf<T> = <<T as pallet_transaction_payment::Config>::OnChargeTransaction as pallet_transaction_payment::OnChargeTransaction<T>>::Balance;
1391
1392fn get_signed_extra(
1393    current_block: u64,
1394    immortal: bool,
1395    nonce: u32,
1396    tip: BalanceOf<Runtime>,
1397) -> SignedExtra {
1398    let period = u64::from(<<Runtime as frame_system::Config>::BlockHashCount>::get())
1399        .checked_next_power_of_two()
1400        .map(|c| c / 2)
1401        .unwrap_or(2);
1402    (
1403        frame_system::CheckNonZeroSender::<Runtime>::new(),
1404        frame_system::CheckSpecVersion::<Runtime>::new(),
1405        frame_system::CheckTxVersion::<Runtime>::new(),
1406        frame_system::CheckGenesis::<Runtime>::new(),
1407        frame_system::CheckMortality::<Runtime>::from(if immortal {
1408            generic::Era::Immortal
1409        } else {
1410            generic::Era::mortal(period, current_block)
1411        }),
1412        frame_system::CheckNonce::<Runtime>::from(nonce.into()),
1413        frame_system::CheckWeight::<Runtime>::new(),
1414        pallet_transaction_payment::ChargeTransactionPayment::<Runtime>::from(tip),
1415        DisablePallets,
1416        pallet_subspace::extensions::SubspaceExtension::<Runtime>::new(),
1417        pallet_domains::extensions::DomainsExtension::<Runtime>::new(),
1418        pallet_messenger::extensions::MessengerExtension::<Runtime>::new(),
1419    )
1420}
1421
1422fn construct_extrinsic_raw_payload<Client>(
1423    client: impl AsRef<Client>,
1424    function: <Runtime as frame_system::Config>::RuntimeCall,
1425    immortal: bool,
1426    nonce: u32,
1427    tip: BalanceOf<Runtime>,
1428) -> (
1429    SignedPayload<<Runtime as frame_system::Config>::RuntimeCall, SignedExtra>,
1430    SignedExtra,
1431)
1432where
1433    BalanceOf<Runtime>: Send + Sync + From<u64> + sp_runtime::FixedPointOperand,
1434    u64: From<BlockNumberFor<Runtime>>,
1435    Client: HeaderBackend<subspace_runtime_primitives::opaque::Block>,
1436{
1437    let current_block_hash = client.as_ref().info().best_hash;
1438    let current_block = client.as_ref().info().best_number.saturated_into();
1439    let genesis_block = client.as_ref().hash(0).unwrap().unwrap();
1440    let extra = get_signed_extra(current_block, immortal, nonce, tip);
1441    (
1442        generic::SignedPayload::<
1443            <Runtime as frame_system::Config>::RuntimeCall,
1444            SignedExtra,
1445        >::from_raw(
1446            function,
1447            extra.clone(),
1448            ((), 100, 1, genesis_block, current_block_hash, (), (), (), (), (), (),()),
1449        ),
1450        extra,
1451    )
1452}
1453
1454/// Construct an extrinsic that can be applied to the test runtime.
1455pub fn construct_extrinsic_generic<Client>(
1456    client: impl AsRef<Client>,
1457    function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1458    caller: Sr25519Keyring,
1459    immortal: bool,
1460    nonce: u32,
1461    tip: BalanceOf<Runtime>,
1462) -> UncheckedExtrinsic
1463where
1464    BalanceOf<Runtime>: Send + Sync + From<u64> + sp_runtime::FixedPointOperand,
1465    u64: From<BlockNumberFor<Runtime>>,
1466    Client: HeaderBackend<subspace_runtime_primitives::opaque::Block>,
1467{
1468    let function = function.into();
1469    let (raw_payload, extra) =
1470        construct_extrinsic_raw_payload(client, function.clone(), immortal, nonce, tip);
1471    let signature = raw_payload.using_encoded(|e| caller.sign(e));
1472    UncheckedExtrinsic::new_signed(
1473        function,
1474        MultiAddress::Id(caller.to_account_id()),
1475        Signature::Sr25519(signature),
1476        extra,
1477    )
1478}
1479
1480/// Construct a general unsigned extrinsic that can be applied to the test runtime.
1481fn construct_unsigned_extrinsic<Client>(
1482    client: impl AsRef<Client>,
1483    function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1484) -> UncheckedExtrinsic
1485where
1486    BalanceOf<Runtime>: Send + Sync + From<u64> + sp_runtime::FixedPointOperand,
1487    u64: From<BlockNumberFor<Runtime>>,
1488    Client: HeaderBackend<subspace_runtime_primitives::opaque::Block>,
1489{
1490    let function = function.into();
1491    let current_block = client.as_ref().info().best_number.saturated_into();
1492    let extra = get_signed_extra(current_block, true, 0, 0);
1493    UncheckedExtrinsic::new_transaction(function, extra)
1494}