domain_service/
domain.rs

1use crate::network::build_network;
2use crate::providers::{BlockImportProvider, RpcProvider};
3use crate::{FullBackend, FullClient};
4use cross_domain_message_gossip::ChainMsg;
5use domain_block_builder::CustomGenesisBlockBuilder;
6use domain_block_preprocessor::inherents::CreateInherentDataProvider;
7use domain_client_message_relayer::GossipMessageSink;
8use domain_client_operator::snap_sync::ConsensusChainSyncParams;
9use domain_client_operator::{Operator, OperatorParams, OperatorStreams};
10use domain_runtime_primitives::opaque::{Block, Header};
11use domain_runtime_primitives::{Balance, Hash};
12use futures::Stream;
13use futures::channel::mpsc;
14use pallet_transaction_payment_rpc::TransactionPaymentRuntimeApi;
15use sc_client_api::{
16    AuxStore, BlockBackend, BlockImportNotification, BlockchainEvents, ExecutorProvider,
17    ProofProvider,
18};
19use sc_consensus::{BasicQueue, BoxBlockImport};
20use sc_domains::{ExtensionsFactory, RuntimeExecutor};
21use sc_network::service::traits::NetworkService;
22use sc_network::{NetworkPeers, NetworkWorker, NotificationMetrics};
23use sc_service::{
24    BuildNetworkParams, Configuration as ServiceConfiguration, PartialComponents, SpawnTasksParams,
25    TFullBackend, TaskManager,
26};
27use sc_telemetry::{Telemetry, TelemetryWorker, TelemetryWorkerHandle};
28use sc_transaction_pool::{BasicPool, FullChainApi};
29use sc_transaction_pool_api::OffchainTransactionPoolFactory;
30use sc_utils::mpsc::{TracingUnboundedReceiver, tracing_unbounded};
31use serde::de::DeserializeOwned;
32use sp_api::{ApiExt, ConstructRuntimeApi, Metadata, ProvideRuntimeApi};
33use sp_block_builder::BlockBuilder;
34use sp_blockchain::{HeaderBackend, HeaderMetadata};
35use sp_consensus::SyncOracle;
36use sp_consensus_slots::Slot;
37use sp_core::traits::SpawnEssentialNamed;
38use sp_core::{Decode, Encode, H256};
39use sp_domains::core_api::DomainCoreApi;
40use sp_domains::{BundleProducerElectionApi, DomainId, DomainsApi, OperatorId};
41use sp_domains_fraud_proof::FraudProofApi;
42use sp_messenger::messages::ChainId;
43use sp_messenger::{MessengerApi, RelayerApi};
44use sp_mmr_primitives::MmrApi;
45use sp_offchain::OffchainWorkerApi;
46use sp_runtime::traits::{Block as BlockT, NumberFor};
47use sp_session::SessionKeys;
48use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
49use std::fmt::{Debug, Display};
50use std::marker::PhantomData;
51use std::str::FromStr;
52use std::sync::Arc;
53use subspace_core_primitives::pot::PotOutput;
54use subspace_runtime_primitives::{HeaderFor, Nonce};
55use substrate_frame_rpc_system::AccountNonceApi;
56
57pub type DomainOperator<Block, CBlock, CClient, RuntimeApi> = Operator<
58    Block,
59    CBlock,
60    FullClient<Block, RuntimeApi>,
61    CClient,
62    FullPool<RuntimeApi>,
63    FullBackend<Block>,
64    RuntimeExecutor,
65>;
66
67/// Domain full node along with some other components.
68pub struct NewFull<C, CodeExecutor, CBlock, CClient, RuntimeApi, AccountId>
69where
70    Block: BlockT,
71    CBlock: BlockT,
72    NumberFor<CBlock>: From<NumberFor<Block>>,
73    CBlock::Hash: From<Hash>,
74    CClient: HeaderBackend<CBlock>
75        + BlockBackend<CBlock>
76        + ProvideRuntimeApi<CBlock>
77        + Send
78        + Sync
79        + 'static,
80    CClient::Api:
81        DomainsApi<CBlock, Header> + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>,
82    RuntimeApi: ConstructRuntimeApi<Block, FullClient<Block, RuntimeApi>> + Send + Sync + 'static,
83    RuntimeApi::RuntimeApi: ApiExt<Block>
84        + Metadata<Block>
85        + AccountNonceApi<Block, AccountId, Nonce>
86        + BlockBuilder<Block>
87        + OffchainWorkerApi<Block>
88        + SessionKeys<Block>
89        + TaggedTransactionQueue<Block>
90        + TransactionPaymentRuntimeApi<Block, Balance>
91        + DomainCoreApi<Block>
92        + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>
93        + RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
94    AccountId: Encode + Decode,
95{
96    /// Task manager.
97    pub task_manager: TaskManager,
98    /// Full client.
99    pub client: C,
100    /// Backend.
101    pub backend: Arc<FullBackend<Block>>,
102    /// Code executor.
103    pub code_executor: Arc<CodeExecutor>,
104    /// Network service.
105    pub network_service: Arc<dyn NetworkService>,
106    /// Sync service.
107    pub sync_service: Arc<sc_network_sync::SyncingService<Block>>,
108    /// RPCHandlers to make RPC queries.
109    pub rpc_handlers: sc_service::RpcHandlers,
110    /// Operator.
111    pub operator: DomainOperator<Block, CBlock, CClient, RuntimeApi>,
112    /// Transaction pool
113    pub transaction_pool: Arc<FullPool<RuntimeApi>>,
114
115    _phantom_data: PhantomData<AccountId>,
116}
117
118/// A transaction pool for a full node.
119pub type FullPool<RuntimeApi> =
120    BasicPool<FullChainApi<FullClient<Block, RuntimeApi>, Block>, Block>;
121
122/// Constructs a partial domain node.
123#[allow(clippy::type_complexity)]
124#[expect(clippy::result_large_err, reason = "Comes from Substrate")]
125fn new_partial<RuntimeApi, CBlock, CClient, BIMP>(
126    domain_id: DomainId,
127    config: &ServiceConfiguration,
128    consensus_client: Arc<CClient>,
129    domain_backend: Arc<FullBackend<Block>>,
130    block_import_provider: &BIMP,
131    confirmation_depth_k: NumberFor<CBlock>,
132    snap_sync: bool,
133) -> Result<
134    PartialComponents<
135        FullClient<Block, RuntimeApi>,
136        FullBackend<Block>,
137        (),
138        sc_consensus::DefaultImportQueue<Block>,
139        FullPool<RuntimeApi>,
140        (
141            Option<Telemetry>,
142            Option<TelemetryWorkerHandle>,
143            Arc<RuntimeExecutor>,
144            BoxBlockImport<Block>,
145        ),
146    >,
147    sc_service::Error,
148>
149where
150    CBlock: BlockT,
151    NumberFor<CBlock>: From<NumberFor<Block>> + Into<u32>,
152    CBlock::Hash: From<Hash> + Into<Hash>,
153    CClient: HeaderBackend<CBlock>
154        + BlockBackend<CBlock>
155        + ProvideRuntimeApi<CBlock>
156        + Send
157        + Sync
158        + 'static,
159    CClient::Api: DomainsApi<CBlock, Header>
160        + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>
161        + MmrApi<CBlock, H256, NumberFor<CBlock>>,
162    RuntimeApi: ConstructRuntimeApi<Block, FullClient<Block, RuntimeApi>> + Send + Sync + 'static,
163    RuntimeApi::RuntimeApi: TaggedTransactionQueue<Block>
164        + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>
165        + ApiExt<Block>,
166    BIMP: BlockImportProvider<Block, FullClient<Block, RuntimeApi>>,
167{
168    let telemetry = config
169        .telemetry_endpoints
170        .clone()
171        .filter(|x| !x.is_empty())
172        .map(|endpoints| -> Result<_, sc_telemetry::Error> {
173            let worker = TelemetryWorker::new(16)?;
174            let telemetry = worker.handle().new_telemetry(endpoints);
175            Ok((worker, telemetry))
176        })
177        .transpose()?;
178
179    let executor = sc_service::new_wasm_executor(&config.executor);
180
181    let genesis_block_builder = CustomGenesisBlockBuilder::<_, CBlock, _, _, _>::new(
182        domain_id,
183        consensus_client.clone(),
184        config.chain_spec.as_storage_builder(),
185        !snap_sync,
186        domain_backend.clone(),
187        executor.clone(),
188    )?;
189
190    let (client, backend, keystore_container, task_manager) =
191        sc_service::new_full_parts_with_genesis_builder(
192            config,
193            telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
194            executor.clone(),
195            domain_backend,
196            genesis_block_builder,
197            false,
198        )?;
199
200    let client = Arc::new(client);
201
202    let executor = Arc::new(executor);
203    client.execution_extensions().set_extensions_factory(
204        ExtensionsFactory::<_, CBlock, Block, _>::new(
205            consensus_client.clone(),
206            executor.clone(),
207            confirmation_depth_k.into(),
208        ),
209    );
210
211    let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());
212
213    let telemetry = telemetry.map(|(worker, telemetry)| {
214        task_manager
215            .spawn_handle()
216            .spawn("telemetry", None, worker.run());
217        telemetry
218    });
219
220    let transaction_pool = Arc::from(BasicPool::new_full(
221        Default::default(),
222        config.role.is_authority().into(),
223        config.prometheus_registry(),
224        task_manager.spawn_essential_handle(),
225        client.clone(),
226    ));
227
228    let import_queue = BasicQueue::new(
229        domain_client_consensus_relay_chain::Verifier::default(),
230        Box::new(block_import_provider.block_import(client.clone())),
231        None,
232        &task_manager.spawn_essential_handle(),
233        config.prometheus_registry(),
234    );
235
236    let params = PartialComponents {
237        backend,
238        client: client.clone(),
239        import_queue,
240        keystore_container,
241        task_manager,
242        transaction_pool,
243        select_chain: (),
244        other: (
245            telemetry,
246            telemetry_worker_handle,
247            executor,
248            Box::new(block_import_provider.block_import(client)) as Box<_>,
249        ),
250    };
251
252    Ok(params)
253}
254
255pub struct DomainParams<CBlock, CClient, IBNS, CIBNS, NSNS, ASS, Provider>
256where
257    CBlock: BlockT,
258{
259    pub domain_id: DomainId,
260    pub domain_config: ServiceConfiguration,
261    pub domain_created_at: NumberFor<CBlock>,
262    pub maybe_operator_id: Option<OperatorId>,
263    pub consensus_client: Arc<CClient>,
264    pub consensus_network: Arc<dyn NetworkPeers + Send + Sync>,
265    pub consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory<CBlock>,
266    pub domain_sync_oracle: Arc<dyn SyncOracle + Send + Sync>,
267    pub operator_streams: OperatorStreams<CBlock, IBNS, CIBNS, NSNS, ASS>,
268    pub gossip_message_sink: GossipMessageSink,
269    pub domain_message_receiver: TracingUnboundedReceiver<ChainMsg>,
270    pub provider: Provider,
271    pub skip_empty_bundle_production: bool,
272    pub skip_out_of_order_slot: bool,
273    pub confirmation_depth_k: NumberFor<CBlock>,
274    pub challenge_period: NumberFor<CBlock>,
275    pub consensus_chain_sync_params: Option<ConsensusChainSyncParams<CBlock, HeaderFor<Block>>>,
276    pub domain_backend: Arc<FullBackend<Block>>,
277}
278
279/// Builds service for a domain full node.
280pub async fn new_full<CBlock, CClient, IBNS, CIBNS, NSNS, ASS, RuntimeApi, AccountId, Provider>(
281    domain_params: DomainParams<CBlock, CClient, IBNS, CIBNS, NSNS, ASS, Provider>,
282) -> sc_service::error::Result<
283    NewFull<
284        Arc<FullClient<Block, RuntimeApi>>,
285        RuntimeExecutor,
286        CBlock,
287        CClient,
288        RuntimeApi,
289        AccountId,
290    >,
291>
292where
293    CBlock: BlockT,
294    NumberFor<CBlock>: From<NumberFor<Block>> + Into<u32>,
295    CBlock::Hash: From<Hash> + Into<Hash>,
296    CClient: HeaderBackend<CBlock>
297        + HeaderMetadata<CBlock, Error = sp_blockchain::Error>
298        + BlockBackend<CBlock>
299        + ProofProvider<CBlock>
300        + ProvideRuntimeApi<CBlock>
301        + BlockchainEvents<CBlock>
302        + AuxStore
303        + Send
304        + Sync
305        + 'static,
306    CClient::Api: DomainsApi<CBlock, Header>
307        + RelayerApi<CBlock, NumberFor<CBlock>, NumberFor<CBlock>, CBlock::Hash>
308        + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>
309        + BundleProducerElectionApi<CBlock, subspace_runtime_primitives::Balance>
310        + FraudProofApi<CBlock, Header>
311        + MmrApi<CBlock, H256, NumberFor<CBlock>>,
312    IBNS: Stream<Item = (NumberFor<CBlock>, mpsc::Sender<()>)> + Send + Unpin + 'static,
313    CIBNS: Stream<Item = BlockImportNotification<CBlock>> + Send + Unpin + 'static,
314    NSNS: Stream<Item = (Slot, PotOutput)> + Send + 'static,
315    ASS: Stream<Item = mpsc::Sender<()>> + Send + 'static,
316    RuntimeApi: ConstructRuntimeApi<Block, FullClient<Block, RuntimeApi>> + Send + Sync + 'static,
317    RuntimeApi::RuntimeApi: ApiExt<Block>
318        + Metadata<Block>
319        + BlockBuilder<Block>
320        + OffchainWorkerApi<Block>
321        + SessionKeys<Block>
322        + DomainCoreApi<Block>
323        + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>
324        + TaggedTransactionQueue<Block>
325        + AccountNonceApi<Block, AccountId, Nonce>
326        + TransactionPaymentRuntimeApi<Block, Balance>
327        + RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
328    AccountId: DeserializeOwned
329        + Encode
330        + Decode
331        + Clone
332        + Debug
333        + Display
334        + FromStr
335        + Sync
336        + Send
337        + 'static,
338    Provider: RpcProvider<
339            Block,
340            FullClient<Block, RuntimeApi>,
341            FullPool<RuntimeApi>,
342            TFullBackend<Block>,
343            AccountId,
344            CreateInherentDataProvider<CClient, CBlock>,
345        > + BlockImportProvider<Block, FullClient<Block, RuntimeApi>>
346        + 'static,
347{
348    let DomainParams {
349        domain_id,
350        maybe_operator_id,
351        mut domain_config,
352        domain_created_at,
353        consensus_client,
354        consensus_offchain_tx_pool_factory,
355        domain_sync_oracle,
356        consensus_network,
357        operator_streams,
358        gossip_message_sink,
359        domain_message_receiver,
360        provider,
361        skip_empty_bundle_production,
362        skip_out_of_order_slot,
363        confirmation_depth_k,
364        consensus_chain_sync_params,
365        challenge_period,
366        domain_backend,
367    } = domain_params;
368
369    // TODO: Do we even need block announcement on domain node?
370    // domain_config.announce_block = false;
371
372    let params = new_partial(
373        domain_id,
374        &domain_config,
375        consensus_client.clone(),
376        domain_backend,
377        &provider,
378        confirmation_depth_k,
379        consensus_chain_sync_params.is_some(),
380    )?;
381
382    let (mut telemetry, _telemetry_worker_handle, code_executor, block_import) = params.other;
383
384    let client = params.client.clone();
385    let backend = params.backend.clone();
386
387    let transaction_pool = params.transaction_pool.clone();
388    let mut task_manager = params.task_manager;
389    let net_config = sc_network::config::FullNetworkConfiguration::<_, _, NetworkWorker<_, _>>::new(
390        &domain_config.network,
391        domain_config
392            .prometheus_config
393            .as_ref()
394            .map(|cfg| cfg.registry.clone()),
395    );
396
397    let (
398        network_service,
399        system_rpc_tx,
400        tx_handler_controller,
401        sync_service,
402        network_service_handle,
403        block_downloader,
404    ) = build_network(BuildNetworkParams {
405        config: &domain_config,
406        net_config,
407        client: client.clone(),
408        transaction_pool: transaction_pool.clone(),
409        spawn_handle: task_manager.spawn_handle(),
410        import_queue: params.import_queue,
411        // TODO: we might want to re-enable this some day.
412        block_announce_validator_builder: None,
413        warp_sync_config: None,
414        block_relay: None,
415        metrics: NotificationMetrics::new(
416            domain_config
417                .prometheus_config
418                .as_ref()
419                .map(|cfg| &cfg.registry),
420        ),
421    })?;
422
423    let fork_id = domain_config.chain_spec.fork_id().map(String::from);
424
425    let is_authority = domain_config.role.is_authority();
426    domain_config.rpc.id_provider = provider.rpc_id();
427    let rpc_builder = {
428        let deps = crate::rpc::FullDeps {
429            client: client.clone(),
430            pool: transaction_pool.clone(),
431            network: network_service.clone(),
432            sync: sync_service.clone(),
433            is_authority,
434            prometheus_registry: domain_config.prometheus_registry().cloned(),
435            database_source: domain_config.database.clone(),
436            task_spawner: task_manager.spawn_handle(),
437            backend: backend.clone(),
438            // This is required by the eth rpc to create pending state using the underlying
439            // consensus provider. In our case, the consensus provider is empty and
440            // as a result this is not used at all. Providing this just to make the api
441            // compatible
442            create_inherent_data_provider: CreateInherentDataProvider::new(
443                consensus_client.clone(),
444                // It is safe to pass empty consensus hash here as explained above
445                None,
446                domain_id,
447            ),
448        };
449
450        let spawn_essential = task_manager.spawn_essential_handle();
451        let rpc_deps = provider.deps(deps)?;
452        Box::new(move |subscription_task_executor| {
453            let spawn_essential = spawn_essential.clone();
454            provider
455                .rpc_builder(
456                    rpc_deps.clone(),
457                    subscription_task_executor,
458                    spawn_essential,
459                )
460                .map_err(Into::into)
461        })
462    };
463
464    let rpc_handlers = sc_service::spawn_tasks(SpawnTasksParams {
465        rpc_builder,
466        client: client.clone(),
467        transaction_pool: transaction_pool.clone(),
468        task_manager: &mut task_manager,
469        config: domain_config,
470        keystore: params.keystore_container.keystore(),
471        backend: backend.clone(),
472        network: network_service.clone(),
473        system_rpc_tx,
474        tx_handler_controller,
475        sync_service: sync_service.clone(),
476        telemetry: telemetry.as_mut(),
477    })?;
478
479    let spawn_essential = task_manager.spawn_essential_handle();
480    let (bundle_sender, _bundle_receiver) = tracing_unbounded("domain_bundle_stream", 100);
481
482    let operator = Operator::new(
483        Box::new(spawn_essential.clone()),
484        OperatorParams {
485            domain_id,
486            domain_created_at,
487            consensus_client: consensus_client.clone(),
488            consensus_offchain_tx_pool_factory,
489            domain_sync_oracle: domain_sync_oracle.clone(),
490            client: client.clone(),
491            transaction_pool: transaction_pool.clone(),
492            backend: backend.clone(),
493            code_executor: code_executor.clone(),
494            maybe_operator_id,
495            keystore: params.keystore_container.keystore(),
496            bundle_sender: Arc::new(bundle_sender),
497            operator_streams,
498            consensus_confirmation_depth_k: confirmation_depth_k,
499            block_import: Arc::new(block_import),
500            skip_empty_bundle_production,
501            skip_out_of_order_slot,
502            sync_service: sync_service.clone(),
503            network_service: Arc::clone(&network_service),
504            block_downloader,
505            consensus_chain_sync_params,
506            domain_fork_id: fork_id,
507            domain_network_service_handle: network_service_handle,
508            challenge_period,
509        },
510    )
511    .await?;
512
513    if is_authority {
514        let relayer_worker = domain_client_message_relayer::worker::start_relaying_messages(
515            domain_id,
516            consensus_client.clone(),
517            client.clone(),
518            confirmation_depth_k,
519            // domain relayer will use consensus chain sync oracle instead of domain sync oracle
520            // since domain sync oracle will always return `synced` due to force sync being set.
521            domain_sync_oracle.clone(),
522            gossip_message_sink.clone(),
523        );
524
525        spawn_essential.spawn_essential_blocking("domain-relayer", None, Box::pin(relayer_worker));
526
527        let channel_update_worker =
528            domain_client_message_relayer::worker::gossip_channel_updates::<_, _, CBlock, _>(
529                ChainId::Domain(domain_id),
530                client.clone(),
531                // domain will use consensus chain sync oracle instead of domain sync oracle
532                // since domain sync oracle will always return `synced` due to force sync being set.
533                domain_sync_oracle.clone(),
534                gossip_message_sink,
535            );
536
537        spawn_essential.spawn_essential_blocking(
538            "domain-channel-update-worker",
539            None,
540            Box::pin(channel_update_worker),
541        );
542    }
543
544    // Start cross domain message listener for domain
545    let domain_listener =
546        cross_domain_message_gossip::start_cross_chain_message_listener::<_, _, _, _, _, _, _>(
547            ChainId::Domain(domain_id),
548            consensus_client.clone(),
549            client.clone(),
550            params.transaction_pool.clone(),
551            consensus_network,
552            domain_message_receiver,
553            code_executor.clone(),
554            domain_sync_oracle,
555        );
556
557    spawn_essential.spawn_essential_blocking(
558        "domain-message-listener",
559        None,
560        Box::pin(domain_listener),
561    );
562
563    let new_full = NewFull {
564        task_manager,
565        client,
566        backend,
567        code_executor,
568        network_service,
569        sync_service,
570        rpc_handlers,
571        operator,
572        transaction_pool: params.transaction_pool,
573        _phantom_data: Default::default(),
574    };
575
576    Ok(new_full)
577}