subspace_test_service/
lib.rs

1// Copyright (C) 2021 Subspace Labs, Inc.
2// SPDX-License-Identifier: GPL-3.0-or-later
3
4// This program is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// This program is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with this program. If not, see <https://www.gnu.org/licenses/>.
16
17//! Subspace test service only.
18
19#![warn(missing_docs, unused_crate_dependencies)]
20
21use cross_domain_message_gossip::{xdm_gossip_peers_set_config, GossipWorkerBuilder};
22use domain_runtime_primitives::opaque::{Block as DomainBlock, Header as DomainHeader};
23use frame_system::pallet_prelude::BlockNumberFor;
24use futures::channel::mpsc;
25use futures::{Future, StreamExt};
26use jsonrpsee::RpcModule;
27use parity_scale_codec::{Decode, Encode};
28use parking_lot::Mutex;
29use sc_block_builder::BlockBuilderBuilder;
30use sc_client_api::execution_extensions::ExtensionsFactory;
31use sc_client_api::{Backend as BackendT, BlockBackend, ExecutorProvider, Finalizer};
32use sc_consensus::block_import::{
33    BlockCheckParams, BlockImportParams, ForkChoiceStrategy, ImportResult,
34};
35use sc_consensus::{BasicQueue, BlockImport, StateAction, Verifier as VerifierT};
36use sc_domains::ExtensionsFactory as DomainsExtensionFactory;
37use sc_network::config::{NetworkConfiguration, TransportConfig};
38use sc_network::service::traits::NetworkService;
39use sc_network::{
40    multiaddr, NetworkWorker, NotificationMetrics, NotificationService, ReputationChange,
41};
42use sc_service::config::{
43    DatabaseSource, ExecutorConfiguration, KeystoreConfig, MultiaddrWithPeerId,
44    OffchainWorkerConfig, RpcBatchRequestConfig, RpcConfiguration, WasmExecutionMethod,
45    WasmtimeInstantiationStrategy,
46};
47use sc_service::{
48    BasePath, BlocksPruning, Configuration, NetworkStarter, Role, SpawnTasksParams, TaskManager,
49};
50use sc_transaction_pool::{BasicPool, FullChainApi, Options};
51use sc_transaction_pool_api::error::{Error as TxPoolError, IntoPoolError};
52use sc_transaction_pool_api::{InPoolTransaction, TransactionPool, TransactionSource};
53use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
54use sp_api::{ApiExt, ProvideRuntimeApi};
55use sp_blockchain::{HashAndNumber, HeaderBackend};
56use sp_consensus::{BlockOrigin, Error as ConsensusError};
57use sp_consensus_slots::Slot;
58use sp_consensus_subspace::digests::{
59    extract_pre_digest, CompatibleDigestItem, PreDigest, PreDigestPotInfo,
60};
61use sp_consensus_subspace::{PotExtension, SubspaceApi};
62use sp_core::offchain::storage::OffchainDb;
63use sp_core::offchain::OffchainDbExt;
64use sp_core::traits::{CodeExecutor, SpawnEssentialNamed};
65use sp_core::H256;
66use sp_domains::{BundleProducerElectionApi, ChainId, DomainsApi, OpaqueBundle};
67use sp_domains_fraud_proof::fraud_proof::FraudProof;
68use sp_domains_fraud_proof::{FraudProofExtension, FraudProofHostFunctionsImpl};
69use sp_externalities::Extensions;
70use sp_inherents::{InherentData, InherentDataProvider};
71use sp_keyring::Sr25519Keyring;
72use sp_messenger::MessengerApi;
73use sp_messenger_host_functions::{MessengerExtension, MessengerHostFunctionsImpl};
74use sp_mmr_primitives::MmrApi;
75use sp_runtime::generic::{Digest, SignedPayload};
76use sp_runtime::traits::{
77    BlakeTwo256, Block as BlockT, Hash as HashT, Header as HeaderT, NumberFor,
78};
79use sp_runtime::{generic, DigestItem, MultiAddress, OpaqueExtrinsic, SaturatedConversion};
80use sp_subspace_mmr::host_functions::{SubspaceMmrExtension, SubspaceMmrHostFunctionsImpl};
81use sp_timestamp::Timestamp;
82use std::collections::HashMap;
83use std::error::Error;
84use std::marker::PhantomData;
85use std::pin::Pin;
86use std::sync::Arc;
87use std::time;
88use std::time::Duration;
89use subspace_core_primitives::pot::PotOutput;
90use subspace_core_primitives::solutions::Solution;
91use subspace_core_primitives::{BlockNumber, PublicKey};
92use subspace_runtime_primitives::opaque::Block;
93use subspace_runtime_primitives::{
94    AccountId, Balance, BlockHashFor, ExtrinsicFor, Hash, HeaderFor, Signature,
95};
96use subspace_service::{FullSelectChain, RuntimeExecutor};
97use subspace_test_client::{chain_spec, Backend, Client};
98use subspace_test_primitives::OnchainStateApi;
99use subspace_test_runtime::{
100    DisablePallets, Runtime, RuntimeApi, RuntimeCall, SignedExtra, UncheckedExtrinsic,
101    SLOT_DURATION,
102};
103use substrate_frame_rpc_system::AccountNonceApi;
104use substrate_test_client::{RpcHandlersExt, RpcTransactionError, RpcTransactionOutput};
105use tokio::time::sleep;
106
107type FraudProofFor<Block, DomainBlock> =
108    FraudProof<NumberFor<Block>, BlockHashFor<Block>, HeaderFor<DomainBlock>, H256>;
109
110const MAX_PRODUCE_BUNDLE_TRY: usize = 10;
111
112/// Create a Subspace `Configuration`.
113///
114/// By default an in-memory socket will be used, therefore you need to provide boot
115/// nodes if you want the future node to be connected to other nodes.
116#[expect(clippy::too_many_arguments)]
117pub fn node_config(
118    tokio_handle: tokio::runtime::Handle,
119    key: Sr25519Keyring,
120    boot_nodes: Vec<MultiaddrWithPeerId>,
121    run_farmer: bool,
122    force_authoring: bool,
123    force_synced: bool,
124    private_evm: bool,
125    evm_owner_account: Option<AccountId>,
126    base_path: BasePath,
127) -> Configuration {
128    let root = base_path.path();
129    let role = if run_farmer {
130        Role::Authority
131    } else {
132        Role::Full
133    };
134    let key_seed = key.to_seed();
135    let spec = chain_spec::subspace_local_testnet_config(private_evm, evm_owner_account).unwrap();
136
137    let mut network_config = NetworkConfiguration::new(
138        key_seed.to_string(),
139        "network/test/0.1",
140        Default::default(),
141        None,
142    );
143
144    network_config.boot_nodes = boot_nodes;
145
146    network_config.allow_non_globals_in_dht = true;
147
148    let addr: multiaddr::Multiaddr = multiaddr::Protocol::Memory(rand::random()).into();
149    network_config.listen_addresses.push(addr.clone());
150
151    network_config.public_addresses.push(addr);
152
153    network_config.transport = TransportConfig::MemoryOnly;
154
155    network_config.force_synced = force_synced;
156
157    Configuration {
158        impl_name: "subspace-test-node".to_string(),
159        impl_version: "0.1".to_string(),
160        role,
161        tokio_handle,
162        transaction_pool: Default::default(),
163        network: network_config,
164        keystore: KeystoreConfig::InMemory,
165        database: DatabaseSource::ParityDb {
166            path: root.join("paritydb"),
167        },
168        trie_cache_maximum_size: Some(64 * 1024 * 1024),
169        state_pruning: Default::default(),
170        blocks_pruning: BlocksPruning::KeepAll,
171        chain_spec: Box::new(spec),
172        executor: ExecutorConfiguration {
173            wasm_method: WasmExecutionMethod::Compiled {
174                instantiation_strategy: WasmtimeInstantiationStrategy::PoolingCopyOnWrite,
175            },
176            max_runtime_instances: 8,
177            default_heap_pages: None,
178            runtime_cache_size: 2,
179        },
180        wasm_runtime_overrides: Default::default(),
181        rpc: RpcConfiguration {
182            addr: None,
183            max_request_size: 0,
184            max_response_size: 0,
185            id_provider: None,
186            max_subs_per_conn: 0,
187            port: 0,
188            message_buffer_capacity: 0,
189            batch_config: RpcBatchRequestConfig::Disabled,
190            max_connections: 0,
191            cors: None,
192            methods: Default::default(),
193            rate_limit: None,
194            rate_limit_whitelisted_ips: vec![],
195            rate_limit_trust_proxy_headers: false,
196        },
197        prometheus_config: None,
198        telemetry_endpoints: None,
199        offchain_worker: OffchainWorkerConfig {
200            enabled: false,
201            indexing_enabled: true,
202        },
203        force_authoring,
204        disable_grandpa: false,
205        dev_key_seed: Some(key_seed),
206        tracing_targets: None,
207        tracing_receiver: Default::default(),
208        announce_block: true,
209        data_path: base_path.path().into(),
210        base_path,
211    }
212}
213
214type StorageChanges = sp_api::StorageChanges<Block>;
215
216struct MockExtensionsFactory<Client, DomainBlock, Executor, CBackend> {
217    consensus_client: Arc<Client>,
218    consensus_backend: Arc<CBackend>,
219    executor: Arc<Executor>,
220    mock_pot_verifier: Arc<MockPotVerfier>,
221    confirmation_depth_k: BlockNumber,
222    _phantom: PhantomData<DomainBlock>,
223}
224
225impl<Client, DomainBlock, Executor, CBackend>
226    MockExtensionsFactory<Client, DomainBlock, Executor, CBackend>
227{
228    fn new(
229        consensus_client: Arc<Client>,
230        executor: Arc<Executor>,
231        mock_pot_verifier: Arc<MockPotVerfier>,
232        consensus_backend: Arc<CBackend>,
233        confirmation_depth_k: BlockNumber,
234    ) -> Self {
235        Self {
236            consensus_client,
237            consensus_backend,
238            executor,
239            mock_pot_verifier,
240            confirmation_depth_k,
241            _phantom: Default::default(),
242        }
243    }
244}
245
246#[derive(Default)]
247struct MockPotVerfier(Mutex<HashMap<u64, PotOutput>>);
248
249impl MockPotVerfier {
250    fn is_valid(&self, slot: u64, pot: PotOutput) -> bool {
251        self.0.lock().get(&slot).map(|p| *p == pot).unwrap_or(false)
252    }
253
254    fn inject_pot(&self, slot: u64, pot: PotOutput) {
255        self.0.lock().insert(slot, pot);
256    }
257}
258
259impl<Block, Client, DomainBlock, Executor, CBackend> ExtensionsFactory<Block>
260    for MockExtensionsFactory<Client, DomainBlock, Executor, CBackend>
261where
262    Block: BlockT,
263    Block::Hash: From<H256> + Into<H256>,
264    DomainBlock: BlockT,
265    DomainBlock::Hash: Into<H256> + From<H256>,
266    Client: BlockBackend<Block> + HeaderBackend<Block> + ProvideRuntimeApi<Block> + 'static,
267    Client::Api: DomainsApi<Block, DomainBlock::Header>
268        + BundleProducerElectionApi<Block, Balance>
269        + MessengerApi<Block, NumberFor<Block>, Block::Hash>
270        + MmrApi<Block, H256, NumberFor<Block>>,
271    Executor: CodeExecutor + sc_executor::RuntimeVersionOf,
272    CBackend: BackendT<Block> + 'static,
273{
274    fn extensions_for(
275        &self,
276        _block_hash: Block::Hash,
277        _block_number: NumberFor<Block>,
278    ) -> Extensions {
279        let confirmation_depth_k = self.confirmation_depth_k;
280        let mut exts = Extensions::new();
281        exts.register(FraudProofExtension::new(Arc::new(
282            FraudProofHostFunctionsImpl::<_, _, DomainBlock, Executor, _>::new(
283                self.consensus_client.clone(),
284                self.executor.clone(),
285                move |client, executor| {
286                    let extension_factory =
287                        DomainsExtensionFactory::<_, Block, DomainBlock, _>::new(
288                            client,
289                            executor,
290                            confirmation_depth_k,
291                        );
292                    Box::new(extension_factory) as Box<dyn ExtensionsFactory<DomainBlock>>
293                },
294            ),
295        )));
296        exts.register(SubspaceMmrExtension::new(Arc::new(
297            SubspaceMmrHostFunctionsImpl::<Block, _>::new(
298                self.consensus_client.clone(),
299                confirmation_depth_k,
300            ),
301        )));
302        exts.register(MessengerExtension::new(Arc::new(
303            MessengerHostFunctionsImpl::<Block, _, DomainBlock, _>::new(
304                self.consensus_client.clone(),
305                self.executor.clone(),
306            ),
307        )));
308
309        if let Some(offchain_storage) = self.consensus_backend.offchain_storage() {
310            let offchain_db = OffchainDb::new(offchain_storage);
311            exts.register(OffchainDbExt::new(offchain_db));
312        }
313        exts.register(PotExtension::new({
314            let client = Arc::clone(&self.consensus_client);
315            let mock_pot_verifier = Arc::clone(&self.mock_pot_verifier);
316            Box::new(
317                move |parent_hash, slot, proof_of_time, _quick_verification| {
318                    let parent_hash = {
319                        let mut converted_parent_hash = Block::Hash::default();
320                        converted_parent_hash.as_mut().copy_from_slice(&parent_hash);
321                        converted_parent_hash
322                    };
323
324                    let parent_header = match client.header(parent_hash) {
325                        Ok(Some(parent_header)) => parent_header,
326                        _ => return false,
327                    };
328                    let parent_pre_digest = match extract_pre_digest(&parent_header) {
329                        Ok(parent_pre_digest) => parent_pre_digest,
330                        _ => return false,
331                    };
332
333                    let parent_slot = parent_pre_digest.slot();
334                    if slot <= *parent_slot {
335                        return false;
336                    }
337
338                    mock_pot_verifier.is_valid(slot, proof_of_time)
339                },
340            )
341        }));
342        exts
343    }
344}
345
346type NewSlot = (Slot, PotOutput);
347
348/// A mock Subspace consensus node instance used for testing.
349pub struct MockConsensusNode {
350    /// `TaskManager`'s instance.
351    pub task_manager: TaskManager,
352    /// Client's instance.
353    pub client: Arc<Client>,
354    /// Backend.
355    pub backend: Arc<Backend>,
356    /// Code executor.
357    pub executor: RuntimeExecutor,
358    /// Transaction pool.
359    pub transaction_pool: Arc<BasicPool<FullChainApi<Client, Block>, Block>>,
360    /// The SelectChain Strategy
361    pub select_chain: FullSelectChain,
362    /// Network service.
363    pub network_service: Arc<dyn NetworkService + Send + Sync>,
364    /// Cross-domain gossip notification service.
365    pub xdm_gossip_notification_service: Option<Box<dyn NotificationService>>,
366    /// Sync service.
367    pub sync_service: Arc<sc_network_sync::SyncingService<Block>>,
368    /// RPC handlers.
369    pub rpc_handlers: sc_service::RpcHandlers,
370    /// Network starter
371    pub network_starter: Option<NetworkStarter>,
372    /// The next slot number
373    next_slot: u64,
374    /// The mock pot verifier
375    mock_pot_verifier: Arc<MockPotVerfier>,
376    /// The slot notification subscribers
377    new_slot_notification_subscribers: Vec<mpsc::UnboundedSender<(Slot, PotOutput)>>,
378    /// The acknowledgement sender subscribers
379    acknowledgement_sender_subscribers: Vec<TracingUnboundedSender<mpsc::Sender<()>>>,
380    /// Block import pipeline
381    block_import: MockBlockImport<Client, Block>,
382    xdm_gossip_worker_builder: Option<GossipWorkerBuilder>,
383    /// Mock subspace solution used to mock the subspace `PreDigest`
384    mock_solution: Solution<AccountId>,
385    log_prefix: &'static str,
386    /// Ferdie key
387    pub key: Sr25519Keyring,
388    finalize_block_depth: Option<NumberFor<Block>>,
389    /// The node's base path
390    base_path: BasePath,
391}
392
393impl MockConsensusNode {
394    /// Run a mock consensus node
395    pub fn run(
396        tokio_handle: tokio::runtime::Handle,
397        key: Sr25519Keyring,
398        base_path: BasePath,
399    ) -> MockConsensusNode {
400        Self::run_with_finalization_depth(tokio_handle, key, base_path, None, false, None)
401    }
402
403    /// Run a mock consensus node with a private EVM domain
404    pub fn run_with_private_evm(
405        tokio_handle: tokio::runtime::Handle,
406        key: Sr25519Keyring,
407        evm_owner: Option<Sr25519Keyring>,
408        base_path: BasePath,
409    ) -> MockConsensusNode {
410        Self::run_with_finalization_depth(tokio_handle, key, base_path, None, true, evm_owner)
411    }
412
413    /// Run a mock consensus node with finalization depth
414    pub fn run_with_finalization_depth(
415        tokio_handle: tokio::runtime::Handle,
416        key: Sr25519Keyring,
417        base_path: BasePath,
418        finalize_block_depth: Option<NumberFor<Block>>,
419        private_evm: bool,
420        evm_owner: Option<Sr25519Keyring>,
421    ) -> MockConsensusNode {
422        let log_prefix = key.into();
423
424        let mut config = node_config(
425            tokio_handle,
426            key,
427            vec![],
428            false,
429            false,
430            false,
431            private_evm,
432            evm_owner.map(|key| key.to_account_id()),
433            base_path.clone(),
434        );
435
436        // Set `transaction_pool.ban_time` to 0 such that duplicated tx will not immediately rejected
437        // by `TemporarilyBanned`
438        // rest of the options are taken from default
439        let tx_pool_options = Options {
440            ban_time: Duration::from_millis(0),
441            ..Default::default()
442        };
443
444        config.network.node_name = format!("{} (Consensus)", config.network.node_name);
445        let span = sc_tracing::tracing::info_span!(
446            sc_tracing::logging::PREFIX_LOG_SPAN,
447            name = config.network.node_name.as_str()
448        );
449        let _enter = span.enter();
450
451        let executor = sc_service::new_wasm_executor(&config.executor);
452
453        let (client, backend, keystore_container, mut task_manager) =
454            sc_service::new_full_parts::<Block, RuntimeApi, _>(&config, None, executor.clone())
455                .expect("Fail to new full parts");
456
457        let domain_executor = Arc::new(sc_service::new_wasm_executor(&config.executor));
458        let client = Arc::new(client);
459        let mock_pot_verifier = Arc::new(MockPotVerfier::default());
460        let chain_constants = client
461            .runtime_api()
462            .chain_constants(client.info().best_hash)
463            .expect("Fail to get chain constants");
464        client
465            .execution_extensions()
466            .set_extensions_factory(MockExtensionsFactory::<
467                _,
468                DomainBlock,
469                sc_domains::RuntimeExecutor,
470                _,
471            >::new(
472                client.clone(),
473                domain_executor.clone(),
474                Arc::clone(&mock_pot_verifier),
475                backend.clone(),
476                chain_constants.confirmation_depth_k(),
477            ));
478
479        let select_chain = sc_consensus::LongestChain::new(backend.clone());
480        let transaction_pool = Arc::from(BasicPool::new_full(
481            tx_pool_options,
482            config.role.is_authority().into(),
483            config.prometheus_registry(),
484            task_manager.spawn_essential_handle(),
485            client.clone(),
486        ));
487
488        let block_import = MockBlockImport::<_, _>::new(client.clone());
489
490        let mut net_config = sc_network::config::FullNetworkConfiguration::<
491            _,
492            _,
493            NetworkWorker<_, _>,
494        >::new(&config.network, None);
495        let (xdm_gossip_notification_config, xdm_gossip_notification_service) =
496            xdm_gossip_peers_set_config();
497        net_config.add_notification_protocol(xdm_gossip_notification_config);
498
499        let (network_service, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
500            sc_service::build_network(sc_service::BuildNetworkParams {
501                config: &config,
502                net_config,
503                client: client.clone(),
504                transaction_pool: transaction_pool.clone(),
505                spawn_handle: task_manager.spawn_handle(),
506                import_queue: mock_import_queue(
507                    block_import.clone(),
508                    &task_manager.spawn_essential_handle(),
509                ),
510                block_announce_validator_builder: None,
511                warp_sync_config: None,
512                block_relay: None,
513                metrics: NotificationMetrics::new(None),
514            })
515            .expect("Should be able to build network");
516
517        let rpc_handlers = sc_service::spawn_tasks(SpawnTasksParams {
518            network: network_service.clone(),
519            client: client.clone(),
520            keystore: keystore_container.keystore(),
521            task_manager: &mut task_manager,
522            transaction_pool: transaction_pool.clone(),
523            rpc_builder: Box::new(|_| Ok(RpcModule::new(()))),
524            backend: backend.clone(),
525            system_rpc_tx,
526            config,
527            telemetry: None,
528            tx_handler_controller,
529            sync_service: sync_service.clone(),
530        })
531        .expect("Should be able to spawn tasks");
532
533        let mock_solution =
534            Solution::genesis_solution(PublicKey::from(key.public().0), key.to_account_id());
535
536        let mut gossip_builder = GossipWorkerBuilder::new();
537
538        task_manager
539            .spawn_essential_handle()
540            .spawn_essential_blocking(
541                "consensus-chain-channel-update-worker",
542                None,
543                Box::pin(
544                    domain_client_message_relayer::worker::gossip_channel_updates::<_, _, Block, _>(
545                        ChainId::Consensus,
546                        client.clone(),
547                        sync_service.clone(),
548                        gossip_builder.gossip_msg_sink(),
549                    ),
550                ),
551            );
552
553        let (consensus_msg_sink, consensus_msg_receiver) =
554            tracing_unbounded("consensus_message_channel", 100);
555
556        // Start cross domain message listener for Consensus chain to receive messages from domains in the network
557        let consensus_listener =
558            cross_domain_message_gossip::start_cross_chain_message_listener::<_, _, _, _, _, _, _>(
559                ChainId::Consensus,
560                client.clone(),
561                client.clone(),
562                transaction_pool.clone(),
563                network_service.clone(),
564                consensus_msg_receiver,
565                domain_executor,
566                sync_service.clone(),
567            );
568
569        task_manager
570            .spawn_essential_handle()
571            .spawn_essential_blocking(
572                "consensus-message-listener",
573                None,
574                Box::pin(consensus_listener),
575            );
576
577        gossip_builder.push_chain_sink(ChainId::Consensus, consensus_msg_sink);
578
579        task_manager.spawn_essential_handle().spawn_blocking(
580            "mmr-gadget",
581            None,
582            mmr_gadget::MmrGadget::start(
583                client.clone(),
584                backend.clone(),
585                sp_mmr_primitives::INDEXING_PREFIX.to_vec(),
586            ),
587        );
588
589        MockConsensusNode {
590            task_manager,
591            client,
592            backend,
593            executor,
594            transaction_pool,
595            select_chain,
596            network_service,
597            xdm_gossip_notification_service: Some(xdm_gossip_notification_service),
598            sync_service,
599            rpc_handlers,
600            network_starter: Some(network_starter),
601            next_slot: 1,
602            mock_pot_verifier,
603            new_slot_notification_subscribers: Vec::new(),
604            acknowledgement_sender_subscribers: Vec::new(),
605            block_import,
606            xdm_gossip_worker_builder: Some(gossip_builder),
607            mock_solution,
608            log_prefix,
609            key,
610            finalize_block_depth,
611            base_path,
612        }
613    }
614
615    /// Start the mock consensus node network
616    pub fn start_network(&mut self) {
617        self.network_starter
618            .take()
619            .expect("mock consensus node network have not started yet")
620            .start_network();
621    }
622
623    /// Get the cross domain gossip message worker builder
624    pub fn xdm_gossip_worker_builder(&mut self) -> &mut GossipWorkerBuilder {
625        self.xdm_gossip_worker_builder
626            .as_mut()
627            .expect("gossip message worker have not started yet")
628    }
629
630    /// Start the cross domain gossip message worker.
631    pub fn start_cross_domain_gossip_message_worker(&mut self) {
632        let xdm_gossip_worker_builder = self
633            .xdm_gossip_worker_builder
634            .take()
635            .expect("gossip message worker have not started yet");
636        let cross_domain_message_gossip_worker = xdm_gossip_worker_builder.build::<Block, _, _>(
637            self.network_service.clone(),
638            self.xdm_gossip_notification_service
639                .take()
640                .expect("XDM gossip notification service must be used only once"),
641            self.sync_service.clone(),
642        );
643        self.task_manager
644            .spawn_essential_handle()
645            .spawn_essential_blocking(
646                "cross-domain-gossip-message-worker",
647                None,
648                Box::pin(cross_domain_message_gossip_worker.run()),
649            );
650    }
651
652    /// Return the next slot number
653    pub fn next_slot(&self) -> u64 {
654        self.next_slot
655    }
656
657    /// Set the next slot number
658    pub fn set_next_slot(&mut self, next_slot: u64) {
659        self.next_slot = next_slot;
660    }
661
662    /// Produce a slot only, without waiting for the potential slot handlers.
663    pub fn produce_slot(&mut self) -> NewSlot {
664        let slot = Slot::from(self.next_slot);
665        let proof_of_time = PotOutput::from(
666            <&[u8] as TryInto<[u8; 16]>>::try_into(&Hash::random().to_fixed_bytes()[..16])
667                .expect("slice with length of 16 must able convert into [u8; 16]; qed"),
668        );
669        self.mock_pot_verifier.inject_pot(*slot, proof_of_time);
670        self.next_slot += 1;
671
672        (slot, proof_of_time)
673    }
674
675    /// Notify the executor about the new slot and wait for the bundle produced at this slot.
676    pub async fn notify_new_slot_and_wait_for_bundle(
677        &mut self,
678        new_slot: NewSlot,
679    ) -> Option<OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>> {
680        self.new_slot_notification_subscribers
681            .retain(|subscriber| subscriber.unbounded_send(new_slot).is_ok());
682
683        self.confirm_acknowledgement().await;
684        self.get_bundle_from_tx_pool(new_slot)
685    }
686
687    /// Produce a new slot and wait for a bundle produced at this slot.
688    pub async fn produce_slot_and_wait_for_bundle_submission(
689        &mut self,
690    ) -> (
691        NewSlot,
692        OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>,
693    ) {
694        let slot = self.produce_slot();
695        for _ in 0..MAX_PRODUCE_BUNDLE_TRY {
696            if let Some(bundle) = self.notify_new_slot_and_wait_for_bundle(slot).await {
697                return (slot, bundle);
698            }
699        }
700        panic!("Failed to produce bundle after {MAX_PRODUCE_BUNDLE_TRY:?} tries, something must be wrong");
701    }
702
703    /// Subscribe the new slot notification
704    pub fn new_slot_notification_stream(&mut self) -> mpsc::UnboundedReceiver<(Slot, PotOutput)> {
705        let (tx, rx) = mpsc::unbounded();
706        self.new_slot_notification_subscribers.push(tx);
707        rx
708    }
709
710    /// Subscribe the acknowledgement sender stream
711    pub fn new_acknowledgement_sender_stream(
712        &mut self,
713    ) -> TracingUnboundedReceiver<mpsc::Sender<()>> {
714        let (tx, rx) = tracing_unbounded("subspace_acknowledgement_sender_stream", 100);
715        self.acknowledgement_sender_subscribers.push(tx);
716        rx
717    }
718
719    /// Wait for all the acknowledgements before return
720    ///
721    /// It is used to wait for the acknowledgement of the domain worker to ensure it have
722    /// finish all the previous tasks before return
723    pub async fn confirm_acknowledgement(&mut self) {
724        let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
725
726        // Must drop `acknowledgement_sender` after the notification otherwise the receiver
727        // will block forever as there is still a sender not closed.
728        {
729            self.acknowledgement_sender_subscribers
730                .retain(|subscriber| {
731                    subscriber
732                        .unbounded_send(acknowledgement_sender.clone())
733                        .is_ok()
734                });
735            drop(acknowledgement_sender);
736        }
737
738        // Wait for all the acknowledgements to progress and proactively drop closed subscribers.
739        while acknowledgement_receiver.next().await.is_some() {
740            // Wait for all the acknowledgements to finish.
741        }
742    }
743
744    /// Wait for the operator finish processing the consensus block before return
745    pub async fn confirm_block_import_processed(&mut self) {
746        // Send one more notification to ensure the previous consensus block import notificaion
747        // have received by the operator
748        let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
749        {
750            // Must drop `block_import_acknowledgement_sender` after the notification otherwise
751            // the receiver will block forever as there is still a sender not closed.
752            // NOTE: it is okay to use the default block number since it is ignored in the consumer side.
753            let value = (NumberFor::<Block>::default(), acknowledgement_sender);
754            self.block_import
755                .block_importing_notification_subscribers
756                .lock()
757                .retain(|subscriber| subscriber.unbounded_send(value.clone()).is_ok());
758        }
759        while acknowledgement_receiver.next().await.is_some() {
760            // Wait for all the acknowledgements to finish.
761        }
762
763        // Ensure the operator finish processing the consensus block
764        self.confirm_acknowledgement().await;
765    }
766
767    /// Subscribe the block importing notification
768    pub fn block_importing_notification_stream(
769        &self,
770    ) -> TracingUnboundedReceiver<(NumberFor<Block>, mpsc::Sender<()>)> {
771        self.block_import.block_importing_notification_stream()
772    }
773
774    /// Get the bundle that created at `slot` from the transaction pool
775    pub fn get_bundle_from_tx_pool(
776        &self,
777        new_slot: NewSlot,
778    ) -> Option<OpaqueBundle<NumberFor<Block>, Hash, DomainHeader, Balance>> {
779        for ready_tx in self.transaction_pool.ready() {
780            let ext = UncheckedExtrinsic::decode(&mut ready_tx.data.encode().as_slice())
781                .expect("should be able to decode");
782            if let RuntimeCall::Domains(pallet_domains::Call::submit_bundle { opaque_bundle }) =
783                ext.function
784            {
785                if opaque_bundle.sealed_header.slot_number() == *new_slot.0 {
786                    return Some(opaque_bundle);
787                }
788            }
789        }
790        None
791    }
792
793    /// Submit a tx to the tx pool
794    pub async fn submit_transaction(&self, tx: OpaqueExtrinsic) -> Result<H256, TxPoolError> {
795        self.transaction_pool
796            .submit_one(
797                self.client.info().best_hash,
798                TransactionSource::External,
799                tx,
800            )
801            .await
802            .map_err(|err| {
803                err.into_pool_error()
804                    .expect("should always be a pool error")
805            })
806    }
807
808    /// Remove all tx from the tx pool
809    pub async fn clear_tx_pool(&self) -> Result<(), Box<dyn Error>> {
810        let txs: Vec<_> = self
811            .transaction_pool
812            .ready()
813            .map(|t| self.transaction_pool.hash_of(&t.data))
814            .collect();
815        self.prune_txs_from_pool(txs.as_slice()).await
816    }
817
818    /// Remove a ready transaction from transaction pool.
819    pub async fn prune_tx_from_pool(&self, tx: &OpaqueExtrinsic) -> Result<(), Box<dyn Error>> {
820        self.prune_txs_from_pool(&[self.transaction_pool.hash_of(tx)])
821            .await
822    }
823
824    async fn prune_txs_from_pool(
825        &self,
826        tx_hashes: &[BlockHashFor<Block>],
827    ) -> Result<(), Box<dyn Error>> {
828        let hash_and_number = HashAndNumber {
829            number: self.client.info().best_number,
830            hash: self.client.info().best_hash,
831        };
832        self.transaction_pool
833            .pool()
834            .prune_known(&hash_and_number, tx_hashes);
835        // `ban_time` have set to 0, explicitly wait 1ms here to ensure `clear_stale` will remove
836        // all the bans as the ban time must be passed.
837        tokio::time::sleep(time::Duration::from_millis(1)).await;
838        self.transaction_pool
839            .pool()
840            .validated_pool()
841            .clear_stale(&hash_and_number);
842        Ok(())
843    }
844
845    /// Return if the given ER exist in the consensus state
846    pub fn does_receipt_exist(
847        &self,
848        er_hash: BlockHashFor<DomainBlock>,
849    ) -> Result<bool, Box<dyn Error>> {
850        Ok(self
851            .client
852            .runtime_api()
853            .execution_receipt(self.client.info().best_hash, er_hash)?
854            .is_some())
855    }
856
857    /// Return a future that only resolve if a fraud proof that the given `fraud_proof_predicate`
858    /// return true is submitted to the consensus tx pool
859    pub fn wait_for_fraud_proof<FP>(
860        &self,
861        fraud_proof_predicate: FP,
862    ) -> Pin<Box<dyn Future<Output = ()> + Send>>
863    where
864        FP: Fn(&FraudProofFor<Block, DomainBlock>) -> bool + Send + 'static,
865    {
866        let tx_pool = self.transaction_pool.clone();
867        let mut import_tx_stream = self.transaction_pool.import_notification_stream();
868        Box::pin(async move {
869            while let Some(ready_tx_hash) = import_tx_stream.next().await {
870                let ready_tx = tx_pool
871                    .ready_transaction(&ready_tx_hash)
872                    .expect("Just get the ready tx hash from import stream; qed");
873                let ext = subspace_test_runtime::UncheckedExtrinsic::decode(
874                    &mut ready_tx.data.encode().as_slice(),
875                )
876                .expect("Decode tx must success");
877                if let subspace_test_runtime::RuntimeCall::Domains(
878                    pallet_domains::Call::submit_fraud_proof { fraud_proof },
879                ) = ext.function
880                {
881                    if fraud_proof_predicate(&fraud_proof) {
882                        break;
883                    }
884                }
885            }
886        })
887    }
888
889    /// Get the free balance of the given account
890    pub fn free_balance(&self, account_id: AccountId) -> subspace_runtime_primitives::Balance {
891        self.client
892            .runtime_api()
893            .free_balance(self.client.info().best_hash, account_id)
894            .expect("Fail to get account free balance")
895    }
896
897    /// Give the peer at `addr` the minimum reputation, which will ban it.
898    // TODO: also ban/unban in the DSN
899    pub fn ban_peer(&self, addr: MultiaddrWithPeerId) {
900        // If unban_peer() has been called on the peer, we need to bump it twice
901        // to give it the minimal reputation.
902        self.network_service.report_peer(
903            addr.peer_id,
904            ReputationChange::new_fatal("Peer banned by test (1)"),
905        );
906        self.network_service.report_peer(
907            addr.peer_id,
908            ReputationChange::new_fatal("Peer banned by test (2)"),
909        );
910    }
911
912    /// Give the peer at `addr` a high reputation, which guarantees it is un-banned it.
913    pub fn unban_peer(&self, addr: MultiaddrWithPeerId) {
914        // If ReputationChange::new_fatal() has been called on the peer, we need to bump it twice
915        // to give it a positive reputation.
916        self.network_service.report_peer(
917            addr.peer_id,
918            ReputationChange::new(i32::MAX, "Peer unbanned by test (1)"),
919        );
920        self.network_service.report_peer(
921            addr.peer_id,
922            ReputationChange::new(i32::MAX, "Peer unbanned by test (2)"),
923        );
924    }
925
926    /// Take and stop the `MockConsensusNode` and delete its database lock file.
927    ///
928    /// Stopping and restarting a node can cause weird race conditions, with errors like:
929    /// "The system cannot find the path specified".
930    /// If this happens, try increasing the wait time in this method.
931    pub async fn stop(self) -> Result<(), std::io::Error> {
932        let lock_file_path = self.base_path.path().join("paritydb").join("lock");
933        // On Windows, sometimes open files can’t be deleted so `drop` first then delete
934        std::mem::drop(self);
935
936        // Give the node time to cleanup, exit, and release the lock file.
937        // TODO: fix the underlying issue or wait for the actual shutdown instead
938        sleep(Duration::from_secs(2)).await;
939
940        // The lock file already being deleted is not a fatal test error, so just log it
941        if let Err(err) = std::fs::remove_file(lock_file_path) {
942            tracing::error!("deleting paritydb lock file failed: {err:?}");
943        }
944        Ok(())
945    }
946}
947
948impl MockConsensusNode {
949    async fn collect_txn_from_pool(&self, parent_hash: Hash) -> Vec<ExtrinsicFor<Block>> {
950        self.transaction_pool
951            .ready_at(parent_hash)
952            .await
953            .map(|pending_tx| pending_tx.data().as_ref().clone())
954            .collect()
955    }
956
957    async fn mock_inherent_data(slot: Slot) -> Result<InherentData, Box<dyn Error>> {
958        let timestamp = sp_timestamp::InherentDataProvider::new(Timestamp::new(
959            <Slot as Into<u64>>::into(slot) * SLOT_DURATION,
960        ));
961        let subspace_inherents =
962            sp_consensus_subspace::inherents::InherentDataProvider::new(vec![]);
963
964        let inherent_data = (subspace_inherents, timestamp)
965            .create_inherent_data()
966            .await?;
967
968        Ok(inherent_data)
969    }
970
971    fn mock_subspace_digest(&self, slot: Slot) -> Digest {
972        let pre_digest: PreDigest<AccountId> = PreDigest::V0 {
973            slot,
974            solution: self.mock_solution.clone(),
975            pot_info: PreDigestPotInfo::V0 {
976                proof_of_time: Default::default(),
977                future_proof_of_time: Default::default(),
978            },
979        };
980        let mut digest = Digest::default();
981        digest.push(DigestItem::subspace_pre_digest(&pre_digest));
982        digest
983    }
984
985    /// Build block
986    async fn build_block(
987        &self,
988        slot: Slot,
989        parent_hash: BlockHashFor<Block>,
990        extrinsics: Vec<ExtrinsicFor<Block>>,
991    ) -> Result<(Block, StorageChanges), Box<dyn Error>> {
992        let inherent_digest = self.mock_subspace_digest(slot);
993
994        let inherent_data = Self::mock_inherent_data(slot).await?;
995
996        let mut block_builder = BlockBuilderBuilder::new(self.client.as_ref())
997            .on_parent_block(parent_hash)
998            .fetch_parent_block_number(self.client.as_ref())?
999            .with_inherent_digests(inherent_digest)
1000            .build()
1001            .expect("Creates new block builder");
1002
1003        let inherent_txns = block_builder.create_inherents(inherent_data)?;
1004
1005        for tx in inherent_txns.into_iter().chain(extrinsics) {
1006            if let Err(err) = sc_block_builder::BlockBuilder::push(&mut block_builder, tx) {
1007                tracing::error!("Invalid transaction while building block: {}", err);
1008            }
1009        }
1010
1011        let (block, storage_changes, _) = block_builder.build()?.into_inner();
1012        Ok((block, storage_changes))
1013    }
1014
1015    /// Import block
1016    async fn import_block(
1017        &self,
1018        block: Block,
1019        storage_changes: Option<StorageChanges>,
1020    ) -> Result<BlockHashFor<Block>, Box<dyn Error>> {
1021        let (header, body) = block.deconstruct();
1022
1023        let header_hash = header.hash();
1024        let header_number = header.number;
1025
1026        let block_import_params = {
1027            let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
1028            import_block.body = Some(body);
1029            import_block.state_action = match storage_changes {
1030                Some(changes) => {
1031                    StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(changes))
1032                }
1033                None => StateAction::Execute,
1034            };
1035            import_block
1036        };
1037
1038        let import_result = self.block_import.import_block(block_import_params).await?;
1039
1040        if let Some(finalized_block_hash) = self
1041            .finalize_block_depth
1042            .and_then(|depth| header_number.checked_sub(depth))
1043            .and_then(|block_to_finalize| {
1044                self.client
1045                    .hash(block_to_finalize)
1046                    .expect("Block hash not found for number: {block_to_finalize:?}")
1047            })
1048        {
1049            self.client
1050                .finalize_block(finalized_block_hash, None, true)
1051                .unwrap();
1052        }
1053
1054        match import_result {
1055            ImportResult::Imported(_) | ImportResult::AlreadyInChain => Ok(header_hash),
1056            bad_res => Err(format!("Fail to import block due to {bad_res:?}").into()),
1057        }
1058    }
1059
1060    /// Produce a new block with the slot on top of `parent_hash`, with optional
1061    /// specified extrinsic list.
1062    #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1063    pub async fn produce_block_with_slot_at(
1064        &mut self,
1065        new_slot: NewSlot,
1066        parent_hash: BlockHashFor<Block>,
1067        maybe_extrinsics: Option<Vec<ExtrinsicFor<Block>>>,
1068    ) -> Result<BlockHashFor<Block>, Box<dyn Error>> {
1069        let block_timer = time::Instant::now();
1070
1071        let extrinsics = match maybe_extrinsics {
1072            Some(extrinsics) => extrinsics,
1073            None => self.collect_txn_from_pool(parent_hash).await,
1074        };
1075        let tx_hashes: Vec<_> = extrinsics
1076            .iter()
1077            .map(|t| self.transaction_pool.hash_of(t))
1078            .collect();
1079
1080        let (block, storage_changes) = self
1081            .build_block(new_slot.0, parent_hash, extrinsics)
1082            .await?;
1083
1084        log_new_block(&block, block_timer.elapsed().as_millis());
1085
1086        let res = match self.import_block(block, Some(storage_changes)).await {
1087            Ok(hash) => {
1088                // Remove the tx of the imported block from the tx pool incase re-include them
1089                // in the future block by accident.
1090                self.prune_txs_from_pool(tx_hashes.as_slice()).await?;
1091                Ok(hash)
1092            }
1093            err => err,
1094        };
1095        self.confirm_block_import_processed().await;
1096        res
1097    }
1098
1099    /// Produce a new block on top of the current best block, with the extrinsics collected from
1100    /// the transaction pool.
1101    #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1102    pub async fn produce_block_with_slot(&mut self, slot: NewSlot) -> Result<(), Box<dyn Error>> {
1103        self.produce_block_with_slot_at(slot, self.client.info().best_hash, None)
1104            .await?;
1105        Ok(())
1106    }
1107
1108    /// Produce a new block on top of the current best block, with the specificed extrinsics.
1109    #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1110    pub async fn produce_block_with_extrinsics(
1111        &mut self,
1112        extrinsics: Vec<ExtrinsicFor<Block>>,
1113    ) -> Result<(), Box<dyn Error>> {
1114        let slot = self.produce_slot();
1115        self.produce_block_with_slot_at(slot, self.client.info().best_hash, Some(extrinsics))
1116            .await?;
1117        Ok(())
1118    }
1119
1120    /// Produce `n` number of blocks.
1121    #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1122    pub async fn produce_blocks(&mut self, n: u64) -> Result<(), Box<dyn Error>> {
1123        for _ in 0..n {
1124            let slot = self.produce_slot();
1125            self.produce_block_with_slot(slot).await?;
1126        }
1127        Ok(())
1128    }
1129
1130    /// Produce `n` number of blocks and wait for bundle submitted to the block.
1131    #[sc_tracing::logging::prefix_logs_with(self.log_prefix)]
1132    pub async fn produce_blocks_with_bundles(&mut self, n: u64) -> Result<(), Box<dyn Error>> {
1133        for _ in 0..n {
1134            let (slot, _) = self.produce_slot_and_wait_for_bundle_submission().await;
1135            self.produce_block_with_slot(slot).await?;
1136        }
1137        Ok(())
1138    }
1139
1140    /// Get the nonce of the node account
1141    pub fn account_nonce(&self) -> u32 {
1142        self.client
1143            .runtime_api()
1144            .account_nonce(self.client.info().best_hash, self.key.to_account_id())
1145            .expect("Fail to get account nonce")
1146    }
1147
1148    /// Get the nonce of the given account
1149    pub fn account_nonce_of(&self, account_id: AccountId) -> u32 {
1150        self.client
1151            .runtime_api()
1152            .account_nonce(self.client.info().best_hash, account_id)
1153            .expect("Fail to get account nonce")
1154    }
1155
1156    /// Construct an extrinsic.
1157    pub fn construct_extrinsic(
1158        &self,
1159        nonce: u32,
1160        function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1161    ) -> UncheckedExtrinsic {
1162        construct_extrinsic_generic(&self.client, function, self.key, false, nonce, 0)
1163    }
1164
1165    /// Construct an unsigned general extrinsic.
1166    pub fn construct_unsigned_extrinsic(
1167        &self,
1168        function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1169    ) -> UncheckedExtrinsic {
1170        construct_unsigned_extrinsic(&self.client, function)
1171    }
1172
1173    /// Construct and send extrinsic through rpc
1174    pub async fn construct_and_send_extrinsic_with(
1175        &self,
1176        function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1177    ) -> Result<RpcTransactionOutput, RpcTransactionError> {
1178        let nonce = self.account_nonce();
1179        let extrinsic = self.construct_extrinsic(nonce, function);
1180        self.rpc_handlers.send_transaction(extrinsic.into()).await
1181    }
1182
1183    /// Get the nonce of the given account
1184    pub async fn send_extrinsic(
1185        &self,
1186        extrinsic: impl Into<OpaqueExtrinsic>,
1187    ) -> Result<RpcTransactionOutput, RpcTransactionError> {
1188        self.rpc_handlers.send_transaction(extrinsic.into()).await
1189    }
1190}
1191
1192fn log_new_block(block: &Block, used_time_ms: u128) {
1193    tracing::info!(
1194        "🎁 Prepared block for proposing at {} ({} ms) [hash: {:?}; parent_hash: {}; extrinsics ({}): [{}]]",
1195        block.header().number(),
1196        used_time_ms,
1197        block.header().hash(),
1198        block.header().parent_hash(),
1199        block.extrinsics().len(),
1200        block.extrinsics()
1201            .iter()
1202            .map(|xt| BlakeTwo256::hash_of(xt).to_string())
1203            .collect::<Vec<_>>()
1204            .join(", ")
1205    );
1206}
1207
1208fn mock_import_queue<Block: BlockT, I>(
1209    block_import: I,
1210    spawner: &impl SpawnEssentialNamed,
1211) -> BasicQueue<Block>
1212where
1213    I: BlockImport<Block, Error = ConsensusError> + Send + Sync + 'static,
1214{
1215    BasicQueue::new(
1216        MockVerifier::default(),
1217        Box::new(block_import),
1218        None,
1219        spawner,
1220        None,
1221    )
1222}
1223
1224struct MockVerifier<Block> {
1225    _marker: PhantomData<Block>,
1226}
1227
1228impl<Block> Default for MockVerifier<Block> {
1229    fn default() -> Self {
1230        Self {
1231            _marker: PhantomData,
1232        }
1233    }
1234}
1235
1236#[async_trait::async_trait]
1237impl<Block> VerifierT<Block> for MockVerifier<Block>
1238where
1239    Block: BlockT,
1240{
1241    async fn verify(
1242        &self,
1243        block_params: BlockImportParams<Block>,
1244    ) -> Result<BlockImportParams<Block>, String> {
1245        Ok(block_params)
1246    }
1247}
1248
1249// `MockBlockImport` is mostly port from `sc-consensus-subspace::SubspaceBlockImport` with all
1250// the consensus related logic removed.
1251#[allow(clippy::type_complexity)]
1252struct MockBlockImport<Client, Block: BlockT> {
1253    inner: Arc<Client>,
1254    block_importing_notification_subscribers:
1255        Arc<Mutex<Vec<TracingUnboundedSender<(NumberFor<Block>, mpsc::Sender<()>)>>>>,
1256}
1257
1258impl<Client, Block: BlockT> MockBlockImport<Client, Block> {
1259    fn new(inner: Arc<Client>) -> Self {
1260        MockBlockImport {
1261            inner,
1262            block_importing_notification_subscribers: Arc::new(Mutex::new(Vec::new())),
1263        }
1264    }
1265
1266    // Subscribe the block importing notification
1267    fn block_importing_notification_stream(
1268        &self,
1269    ) -> TracingUnboundedReceiver<(NumberFor<Block>, mpsc::Sender<()>)> {
1270        let (tx, rx) = tracing_unbounded("subspace_new_slot_notification_stream", 100);
1271        self.block_importing_notification_subscribers
1272            .lock()
1273            .push(tx);
1274        rx
1275    }
1276}
1277
1278impl<Client, Block: BlockT> MockBlockImport<Client, Block> {
1279    fn clone(&self) -> Self {
1280        MockBlockImport {
1281            inner: self.inner.clone(),
1282            block_importing_notification_subscribers: self
1283                .block_importing_notification_subscribers
1284                .clone(),
1285        }
1286    }
1287}
1288
1289#[async_trait::async_trait]
1290impl<Client, Block> BlockImport<Block> for MockBlockImport<Client, Block>
1291where
1292    Block: BlockT,
1293    for<'r> &'r Client: BlockImport<Block, Error = ConsensusError> + Send + Sync,
1294    Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
1295    Client::Api: ApiExt<Block>,
1296{
1297    type Error = ConsensusError;
1298
1299    async fn import_block(
1300        &self,
1301        mut block: BlockImportParams<Block>,
1302    ) -> Result<ImportResult, Self::Error> {
1303        let block_number = *block.header.number();
1304        block.fork_choice = Some(ForkChoiceStrategy::LongestChain);
1305
1306        let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
1307
1308        // Must drop `block_import_acknowledgement_sender` after the notification otherwise the receiver
1309        // will block forever as there is still a sender not closed.
1310        {
1311            let value = (block_number, acknowledgement_sender);
1312            self.block_importing_notification_subscribers
1313                .lock()
1314                .retain(|subscriber| subscriber.unbounded_send(value.clone()).is_ok());
1315        }
1316
1317        while acknowledgement_receiver.next().await.is_some() {
1318            // Wait for all the acknowledgements to finish.
1319        }
1320
1321        self.inner.import_block(block).await
1322    }
1323
1324    async fn check_block(
1325        &self,
1326        block: BlockCheckParams<Block>,
1327    ) -> Result<ImportResult, Self::Error> {
1328        self.inner.check_block(block).await
1329    }
1330}
1331
1332/// Produce the given number of blocks for both the primary node and the domain nodes
1333#[macro_export]
1334macro_rules! produce_blocks {
1335    ($primary_node:ident, $operator_node:ident, $count: literal $(, $domain_node:ident)*) => {
1336        {
1337            async {
1338                let domain_fut = {
1339                    let mut futs: Vec<std::pin::Pin<Box<dyn futures::Future<Output = ()>>>> = Vec::new();
1340                    futs.push(Box::pin($operator_node.wait_for_blocks($count)));
1341                    $( futs.push( Box::pin( $domain_node.wait_for_blocks($count) ) ); )*
1342                    futures::future::join_all(futs)
1343                };
1344                $primary_node.produce_blocks_with_bundles($count).await?;
1345                domain_fut.await;
1346                Ok::<(), Box<dyn std::error::Error>>(())
1347            }
1348        }
1349    };
1350}
1351
1352/// Producing one block for both the primary node and the domain nodes, where the primary node can
1353/// use the `produce_block_with_xxx` function (i.e. `produce_block_with_slot`) to produce block
1354#[macro_export]
1355macro_rules! produce_block_with {
1356    ($primary_node_produce_block:expr, $operator_node:ident $(, $domain_node:ident)*) => {
1357        {
1358            async {
1359                let domain_fut = {
1360                    let mut futs: Vec<std::pin::Pin<Box<dyn futures::Future<Output = ()>>>> = Vec::new();
1361                    futs.push(Box::pin($operator_node.wait_for_blocks(1)));
1362                    $( futs.push( Box::pin( $domain_node.wait_for_blocks(1) ) ); )*
1363                    futures::future::join_all(futs)
1364                };
1365                $primary_node_produce_block.await?;
1366                domain_fut.await;
1367                Ok::<(), Box<dyn std::error::Error>>(())
1368            }
1369        }
1370    };
1371}
1372
1373/// Keep producing block with a fixed interval until the given condition become `true`
1374#[macro_export]
1375macro_rules! produce_blocks_until {
1376    ($primary_node:ident, $operator_node:ident, $condition: block $(, $domain_node:ident)*) => {
1377        async {
1378            while !$condition {
1379                produce_blocks!($primary_node, $operator_node, 1 $(, $domain_node),*).await?;
1380                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1381            }
1382            Ok::<(), Box<dyn std::error::Error>>(())
1383        }
1384    };
1385}
1386
1387type BalanceOf<T> = <<T as pallet_transaction_payment::Config>::OnChargeTransaction as pallet_transaction_payment::OnChargeTransaction<T>>::Balance;
1388
1389fn get_signed_extra(
1390    current_block: u64,
1391    immortal: bool,
1392    nonce: u32,
1393    tip: BalanceOf<Runtime>,
1394) -> SignedExtra {
1395    let period = u64::from(<<Runtime as frame_system::Config>::BlockHashCount>::get())
1396        .checked_next_power_of_two()
1397        .map(|c| c / 2)
1398        .unwrap_or(2);
1399    (
1400        frame_system::CheckNonZeroSender::<Runtime>::new(),
1401        frame_system::CheckSpecVersion::<Runtime>::new(),
1402        frame_system::CheckTxVersion::<Runtime>::new(),
1403        frame_system::CheckGenesis::<Runtime>::new(),
1404        frame_system::CheckMortality::<Runtime>::from(if immortal {
1405            generic::Era::Immortal
1406        } else {
1407            generic::Era::mortal(period, current_block)
1408        }),
1409        frame_system::CheckNonce::<Runtime>::from(nonce.into()),
1410        frame_system::CheckWeight::<Runtime>::new(),
1411        pallet_transaction_payment::ChargeTransactionPayment::<Runtime>::from(tip),
1412        DisablePallets,
1413        pallet_subspace::extensions::SubspaceExtension::<Runtime>::new(),
1414        pallet_domains::extensions::DomainsExtension::<Runtime>::new(),
1415        pallet_messenger::extensions::MessengerExtension::<Runtime>::new(),
1416    )
1417}
1418
1419fn construct_extrinsic_raw_payload<Client>(
1420    client: impl AsRef<Client>,
1421    function: <Runtime as frame_system::Config>::RuntimeCall,
1422    immortal: bool,
1423    nonce: u32,
1424    tip: BalanceOf<Runtime>,
1425) -> (
1426    SignedPayload<<Runtime as frame_system::Config>::RuntimeCall, SignedExtra>,
1427    SignedExtra,
1428)
1429where
1430    BalanceOf<Runtime>: Send + Sync + From<u64> + sp_runtime::FixedPointOperand,
1431    u64: From<BlockNumberFor<Runtime>>,
1432    Client: HeaderBackend<subspace_runtime_primitives::opaque::Block>,
1433{
1434    let current_block_hash = client.as_ref().info().best_hash;
1435    let current_block = client.as_ref().info().best_number.saturated_into();
1436    let genesis_block = client.as_ref().hash(0).unwrap().unwrap();
1437    let extra = get_signed_extra(current_block, immortal, nonce, tip);
1438    (
1439        generic::SignedPayload::<
1440            <Runtime as frame_system::Config>::RuntimeCall,
1441            SignedExtra,
1442        >::from_raw(
1443            function,
1444            extra.clone(),
1445            ((), 100, 1, genesis_block, current_block_hash, (), (), (), (), (), (),()),
1446        ),
1447        extra,
1448    )
1449}
1450
1451/// Construct an extrinsic that can be applied to the test runtime.
1452pub fn construct_extrinsic_generic<Client>(
1453    client: impl AsRef<Client>,
1454    function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1455    caller: Sr25519Keyring,
1456    immortal: bool,
1457    nonce: u32,
1458    tip: BalanceOf<Runtime>,
1459) -> UncheckedExtrinsic
1460where
1461    BalanceOf<Runtime>: Send + Sync + From<u64> + sp_runtime::FixedPointOperand,
1462    u64: From<BlockNumberFor<Runtime>>,
1463    Client: HeaderBackend<subspace_runtime_primitives::opaque::Block>,
1464{
1465    let function = function.into();
1466    let (raw_payload, extra) =
1467        construct_extrinsic_raw_payload(client, function.clone(), immortal, nonce, tip);
1468    let signature = raw_payload.using_encoded(|e| caller.sign(e));
1469    UncheckedExtrinsic::new_signed(
1470        function,
1471        MultiAddress::Id(caller.to_account_id()),
1472        Signature::Sr25519(signature),
1473        extra,
1474    )
1475}
1476
1477/// Construct a general unsigned extrinsic that can be applied to the test runtime.
1478fn construct_unsigned_extrinsic<Client>(
1479    client: impl AsRef<Client>,
1480    function: impl Into<<Runtime as frame_system::Config>::RuntimeCall>,
1481) -> UncheckedExtrinsic
1482where
1483    BalanceOf<Runtime>: Send + Sync + From<u64> + sp_runtime::FixedPointOperand,
1484    u64: From<BlockNumberFor<Runtime>>,
1485    Client: HeaderBackend<subspace_runtime_primitives::opaque::Block>,
1486{
1487    let function = function.into();
1488    let current_block = client.as_ref().info().best_number.saturated_into();
1489    let extra = get_signed_extra(current_block, true, 0, 0);
1490    UncheckedExtrinsic::new_transaction(function, extra)
1491}