domain_service/
domain.rs

1use crate::network::build_network;
2use crate::providers::{BlockImportProvider, RpcProvider};
3use crate::{FullBackend, FullClient};
4use cross_domain_message_gossip::ChainMsg;
5use domain_block_builder::CustomGenesisBlockBuilder;
6use domain_block_preprocessor::inherents::CreateInherentDataProvider;
7use domain_client_message_relayer::GossipMessageSink;
8use domain_client_operator::snap_sync::ConsensusChainSyncParams;
9use domain_client_operator::{Operator, OperatorParams, OperatorStreams};
10use domain_runtime_primitives::opaque::{Block, Header};
11use domain_runtime_primitives::{Balance, Hash};
12use futures::channel::mpsc;
13use futures::Stream;
14use pallet_transaction_payment_rpc::TransactionPaymentRuntimeApi;
15use sc_client_api::{
16    AuxStore, BlockBackend, BlockImportNotification, BlockchainEvents, ExecutorProvider,
17    ProofProvider,
18};
19use sc_consensus::{BasicQueue, BoxBlockImport};
20use sc_domains::{ExtensionsFactory, RuntimeExecutor};
21use sc_network::service::traits::NetworkService;
22use sc_network::{NetworkPeers, NetworkWorker, NotificationMetrics};
23use sc_service::{
24    BuildNetworkParams, Configuration as ServiceConfiguration, NetworkStarter, PartialComponents,
25    SpawnTasksParams, TFullBackend, TaskManager,
26};
27use sc_telemetry::{Telemetry, TelemetryWorker, TelemetryWorkerHandle};
28use sc_transaction_pool::{BasicPool, FullChainApi};
29use sc_transaction_pool_api::OffchainTransactionPoolFactory;
30use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
31use serde::de::DeserializeOwned;
32use sp_api::{ApiExt, ConstructRuntimeApi, Metadata, ProvideRuntimeApi};
33use sp_block_builder::BlockBuilder;
34use sp_blockchain::{HeaderBackend, HeaderMetadata};
35use sp_consensus::SyncOracle;
36use sp_consensus_slots::Slot;
37use sp_core::traits::SpawnEssentialNamed;
38use sp_core::{Decode, Encode, H256};
39use sp_domains::core_api::DomainCoreApi;
40use sp_domains::{BundleProducerElectionApi, DomainId, DomainsApi, OperatorId};
41use sp_domains_fraud_proof::FraudProofApi;
42use sp_messenger::messages::ChainId;
43use sp_messenger::{MessengerApi, RelayerApi};
44use sp_mmr_primitives::MmrApi;
45use sp_offchain::OffchainWorkerApi;
46use sp_runtime::traits::{Block as BlockT, NumberFor};
47use sp_session::SessionKeys;
48use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
49use std::fmt::{Debug, Display};
50use std::marker::PhantomData;
51use std::str::FromStr;
52use std::sync::Arc;
53use subspace_core_primitives::pot::PotOutput;
54use subspace_runtime_primitives::Nonce;
55use substrate_frame_rpc_system::AccountNonceApi;
56
57pub type DomainOperator<Block, CBlock, CClient, RuntimeApi> = Operator<
58    Block,
59    CBlock,
60    FullClient<Block, RuntimeApi>,
61    CClient,
62    FullPool<RuntimeApi>,
63    FullBackend<Block>,
64    RuntimeExecutor,
65>;
66
67/// Domain full node along with some other components.
68pub struct NewFull<C, CodeExecutor, CBlock, CClient, RuntimeApi, AccountId>
69where
70    Block: BlockT,
71    CBlock: BlockT,
72    NumberFor<CBlock>: From<NumberFor<Block>>,
73    CBlock::Hash: From<Hash>,
74    CClient: HeaderBackend<CBlock>
75        + BlockBackend<CBlock>
76        + ProvideRuntimeApi<CBlock>
77        + Send
78        + Sync
79        + 'static,
80    CClient::Api:
81        DomainsApi<CBlock, Header> + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>,
82    RuntimeApi: ConstructRuntimeApi<Block, FullClient<Block, RuntimeApi>> + Send + Sync + 'static,
83    RuntimeApi::RuntimeApi: ApiExt<Block>
84        + Metadata<Block>
85        + AccountNonceApi<Block, AccountId, Nonce>
86        + BlockBuilder<Block>
87        + OffchainWorkerApi<Block>
88        + SessionKeys<Block>
89        + TaggedTransactionQueue<Block>
90        + TransactionPaymentRuntimeApi<Block, Balance>
91        + DomainCoreApi<Block>
92        + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>
93        + RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
94    AccountId: Encode + Decode,
95{
96    /// Task manager.
97    pub task_manager: TaskManager,
98    /// Full client.
99    pub client: C,
100    /// Backend.
101    pub backend: Arc<FullBackend<Block>>,
102    /// Code executor.
103    pub code_executor: Arc<CodeExecutor>,
104    /// Network service.
105    pub network_service: Arc<dyn NetworkService>,
106    /// Sync service.
107    pub sync_service: Arc<sc_network_sync::SyncingService<Block>>,
108    /// RPCHandlers to make RPC queries.
109    pub rpc_handlers: sc_service::RpcHandlers,
110    /// Network starter.
111    pub network_starter: NetworkStarter,
112    /// Operator.
113    pub operator: DomainOperator<Block, CBlock, CClient, RuntimeApi>,
114    /// Transaction pool
115    pub transaction_pool: Arc<FullPool<RuntimeApi>>,
116
117    _phantom_data: PhantomData<AccountId>,
118}
119
120/// A transaction pool for a full node.
121pub type FullPool<RuntimeApi> =
122    BasicPool<FullChainApi<FullClient<Block, RuntimeApi>, Block>, Block>;
123
124/// Constructs a partial domain node.
125#[allow(clippy::type_complexity)]
126#[expect(clippy::result_large_err, reason = "Comes from Substrate")]
127fn new_partial<RuntimeApi, CBlock, CClient, BIMP>(
128    domain_id: DomainId,
129    config: &ServiceConfiguration,
130    consensus_client: Arc<CClient>,
131    domain_backend: Arc<FullBackend<Block>>,
132    block_import_provider: &BIMP,
133    confirmation_depth_k: NumberFor<CBlock>,
134    snap_sync: bool,
135) -> Result<
136    PartialComponents<
137        FullClient<Block, RuntimeApi>,
138        FullBackend<Block>,
139        (),
140        sc_consensus::DefaultImportQueue<Block>,
141        FullPool<RuntimeApi>,
142        (
143            Option<Telemetry>,
144            Option<TelemetryWorkerHandle>,
145            Arc<RuntimeExecutor>,
146            BoxBlockImport<Block>,
147        ),
148    >,
149    sc_service::Error,
150>
151where
152    CBlock: BlockT,
153    NumberFor<CBlock>: From<NumberFor<Block>> + Into<u32>,
154    CBlock::Hash: From<Hash> + Into<Hash>,
155    CClient: HeaderBackend<CBlock>
156        + BlockBackend<CBlock>
157        + ProvideRuntimeApi<CBlock>
158        + Send
159        + Sync
160        + 'static,
161    CClient::Api: DomainsApi<CBlock, Header>
162        + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>
163        + MmrApi<CBlock, H256, NumberFor<CBlock>>,
164    RuntimeApi: ConstructRuntimeApi<Block, FullClient<Block, RuntimeApi>> + Send + Sync + 'static,
165    RuntimeApi::RuntimeApi: TaggedTransactionQueue<Block>
166        + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>
167        + ApiExt<Block>,
168    BIMP: BlockImportProvider<Block, FullClient<Block, RuntimeApi>>,
169{
170    let telemetry = config
171        .telemetry_endpoints
172        .clone()
173        .filter(|x| !x.is_empty())
174        .map(|endpoints| -> Result<_, sc_telemetry::Error> {
175            let worker = TelemetryWorker::new(16)?;
176            let telemetry = worker.handle().new_telemetry(endpoints);
177            Ok((worker, telemetry))
178        })
179        .transpose()?;
180
181    let executor = sc_service::new_wasm_executor(&config.executor);
182
183    let genesis_block_builder = CustomGenesisBlockBuilder::<_, CBlock, _, _, _>::new(
184        domain_id,
185        consensus_client.clone(),
186        config.chain_spec.as_storage_builder(),
187        !snap_sync,
188        domain_backend.clone(),
189        executor.clone(),
190    )?;
191
192    let (client, backend, keystore_container, task_manager) =
193        sc_service::new_full_parts_with_genesis_builder(
194            config,
195            telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
196            executor.clone(),
197            domain_backend,
198            genesis_block_builder,
199            false,
200        )?;
201
202    let client = Arc::new(client);
203
204    let executor = Arc::new(executor);
205    client.execution_extensions().set_extensions_factory(
206        ExtensionsFactory::<_, CBlock, Block, _>::new(
207            consensus_client.clone(),
208            executor.clone(),
209            confirmation_depth_k.into(),
210        ),
211    );
212
213    let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());
214
215    let telemetry = telemetry.map(|(worker, telemetry)| {
216        task_manager
217            .spawn_handle()
218            .spawn("telemetry", None, worker.run());
219        telemetry
220    });
221
222    let transaction_pool = Arc::from(BasicPool::new_full(
223        Default::default(),
224        config.role.is_authority().into(),
225        config.prometheus_registry(),
226        task_manager.spawn_essential_handle(),
227        client.clone(),
228    ));
229
230    let import_queue = BasicQueue::new(
231        domain_client_consensus_relay_chain::Verifier::default(),
232        Box::new(block_import_provider.block_import(client.clone())),
233        None,
234        &task_manager.spawn_essential_handle(),
235        config.prometheus_registry(),
236    );
237
238    let params = PartialComponents {
239        backend,
240        client: client.clone(),
241        import_queue,
242        keystore_container,
243        task_manager,
244        transaction_pool,
245        select_chain: (),
246        other: (
247            telemetry,
248            telemetry_worker_handle,
249            executor,
250            Box::new(block_import_provider.block_import(client)) as Box<_>,
251        ),
252    };
253
254    Ok(params)
255}
256
257pub struct DomainParams<CBlock, CClient, IBNS, CIBNS, NSNS, ASS, Provider>
258where
259    CBlock: BlockT,
260{
261    pub domain_id: DomainId,
262    pub domain_config: ServiceConfiguration,
263    pub domain_created_at: NumberFor<CBlock>,
264    pub maybe_operator_id: Option<OperatorId>,
265    pub consensus_client: Arc<CClient>,
266    pub consensus_network: Arc<dyn NetworkPeers + Send + Sync>,
267    pub consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory<CBlock>,
268    pub domain_sync_oracle: Arc<dyn SyncOracle + Send + Sync>,
269    pub operator_streams: OperatorStreams<CBlock, IBNS, CIBNS, NSNS, ASS>,
270    pub gossip_message_sink: GossipMessageSink,
271    pub domain_message_receiver: TracingUnboundedReceiver<ChainMsg>,
272    pub provider: Provider,
273    pub skip_empty_bundle_production: bool,
274    pub skip_out_of_order_slot: bool,
275    pub confirmation_depth_k: NumberFor<CBlock>,
276    pub challenge_period: NumberFor<CBlock>,
277    pub consensus_chain_sync_params:
278        Option<ConsensusChainSyncParams<CBlock, <Block as BlockT>::Header>>,
279    pub domain_backend: Arc<FullBackend<Block>>,
280}
281
282/// Builds service for a domain full node.
283pub async fn new_full<CBlock, CClient, IBNS, CIBNS, NSNS, ASS, RuntimeApi, AccountId, Provider>(
284    domain_params: DomainParams<CBlock, CClient, IBNS, CIBNS, NSNS, ASS, Provider>,
285) -> sc_service::error::Result<
286    NewFull<
287        Arc<FullClient<Block, RuntimeApi>>,
288        RuntimeExecutor,
289        CBlock,
290        CClient,
291        RuntimeApi,
292        AccountId,
293    >,
294>
295where
296    CBlock: BlockT,
297    NumberFor<CBlock>: From<NumberFor<Block>> + Into<u32>,
298    CBlock::Hash: From<Hash> + Into<Hash>,
299    CClient: HeaderBackend<CBlock>
300        + HeaderMetadata<CBlock, Error = sp_blockchain::Error>
301        + BlockBackend<CBlock>
302        + ProofProvider<CBlock>
303        + ProvideRuntimeApi<CBlock>
304        + BlockchainEvents<CBlock>
305        + AuxStore
306        + Send
307        + Sync
308        + 'static,
309    CClient::Api: DomainsApi<CBlock, Header>
310        + RelayerApi<CBlock, NumberFor<CBlock>, NumberFor<CBlock>, CBlock::Hash>
311        + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>
312        + BundleProducerElectionApi<CBlock, subspace_runtime_primitives::Balance>
313        + FraudProofApi<CBlock, Header>
314        + MmrApi<CBlock, H256, NumberFor<CBlock>>,
315    IBNS: Stream<Item = (NumberFor<CBlock>, mpsc::Sender<()>)> + Send + Unpin + 'static,
316    CIBNS: Stream<Item = BlockImportNotification<CBlock>> + Send + Unpin + 'static,
317    NSNS: Stream<Item = (Slot, PotOutput)> + Send + 'static,
318    ASS: Stream<Item = mpsc::Sender<()>> + Send + 'static,
319    RuntimeApi: ConstructRuntimeApi<Block, FullClient<Block, RuntimeApi>> + Send + Sync + 'static,
320    RuntimeApi::RuntimeApi: ApiExt<Block>
321        + Metadata<Block>
322        + BlockBuilder<Block>
323        + OffchainWorkerApi<Block>
324        + SessionKeys<Block>
325        + DomainCoreApi<Block>
326        + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>
327        + TaggedTransactionQueue<Block>
328        + AccountNonceApi<Block, AccountId, Nonce>
329        + TransactionPaymentRuntimeApi<Block, Balance>
330        + RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
331    AccountId: DeserializeOwned
332        + Encode
333        + Decode
334        + Clone
335        + Debug
336        + Display
337        + FromStr
338        + Sync
339        + Send
340        + 'static,
341    Provider: RpcProvider<
342            Block,
343            FullClient<Block, RuntimeApi>,
344            FullPool<RuntimeApi>,
345            FullChainApi<FullClient<Block, RuntimeApi>, Block>,
346            TFullBackend<Block>,
347            AccountId,
348            CreateInherentDataProvider<CClient, CBlock>,
349        > + BlockImportProvider<Block, FullClient<Block, RuntimeApi>>
350        + 'static,
351{
352    let DomainParams {
353        domain_id,
354        maybe_operator_id,
355        mut domain_config,
356        domain_created_at,
357        consensus_client,
358        consensus_offchain_tx_pool_factory,
359        domain_sync_oracle,
360        consensus_network,
361        operator_streams,
362        gossip_message_sink,
363        domain_message_receiver,
364        provider,
365        skip_empty_bundle_production,
366        skip_out_of_order_slot,
367        confirmation_depth_k,
368        consensus_chain_sync_params,
369        challenge_period,
370        domain_backend,
371    } = domain_params;
372
373    // TODO: Do we even need block announcement on domain node?
374    // domain_config.announce_block = false;
375
376    let params = new_partial(
377        domain_id,
378        &domain_config,
379        consensus_client.clone(),
380        domain_backend,
381        &provider,
382        confirmation_depth_k,
383        consensus_chain_sync_params.is_some(),
384    )?;
385
386    let (mut telemetry, _telemetry_worker_handle, code_executor, block_import) = params.other;
387
388    let client = params.client.clone();
389    let backend = params.backend.clone();
390
391    let transaction_pool = params.transaction_pool.clone();
392    let mut task_manager = params.task_manager;
393    let net_config = sc_network::config::FullNetworkConfiguration::<_, _, NetworkWorker<_, _>>::new(
394        &domain_config.network,
395        domain_config
396            .prometheus_config
397            .as_ref()
398            .map(|cfg| cfg.registry.clone()),
399    );
400
401    let (
402        network_service,
403        system_rpc_tx,
404        tx_handler_controller,
405        network_starter,
406        sync_service,
407        network_service_handle,
408        block_downloader,
409    ) = build_network(BuildNetworkParams {
410        config: &domain_config,
411        net_config,
412        client: client.clone(),
413        transaction_pool: transaction_pool.clone(),
414        spawn_handle: task_manager.spawn_handle(),
415        import_queue: params.import_queue,
416        // TODO: we might want to re-enable this some day.
417        block_announce_validator_builder: None,
418        warp_sync_config: None,
419        block_relay: None,
420        metrics: NotificationMetrics::new(
421            domain_config
422                .prometheus_config
423                .as_ref()
424                .map(|cfg| &cfg.registry),
425        ),
426    })?;
427
428    let fork_id = domain_config.chain_spec.fork_id().map(String::from);
429
430    let is_authority = domain_config.role.is_authority();
431    domain_config.rpc.id_provider = provider.rpc_id();
432    let rpc_builder = {
433        let deps = crate::rpc::FullDeps {
434            client: client.clone(),
435            pool: transaction_pool.clone(),
436            graph: transaction_pool.pool().clone(),
437            network: network_service.clone(),
438            sync: sync_service.clone(),
439            is_authority,
440            prometheus_registry: domain_config.prometheus_registry().cloned(),
441            database_source: domain_config.database.clone(),
442            task_spawner: task_manager.spawn_handle(),
443            backend: backend.clone(),
444            // This is required by the eth rpc to create pending state using the underlying
445            // consensus provider. In our case, the consensus provider is empty and
446            // as a result this is not used at all. Providing this just to make the api
447            // compatible
448            create_inherent_data_provider: CreateInherentDataProvider::new(
449                consensus_client.clone(),
450                // It is safe to pass empty consensus hash here as explained above
451                None,
452                domain_id,
453            ),
454        };
455
456        let spawn_essential = task_manager.spawn_essential_handle();
457        let rpc_deps = provider.deps(deps)?;
458        Box::new(move |subscription_task_executor| {
459            let spawn_essential = spawn_essential.clone();
460            provider
461                .rpc_builder(
462                    rpc_deps.clone(),
463                    subscription_task_executor,
464                    spawn_essential,
465                )
466                .map_err(Into::into)
467        })
468    };
469
470    let rpc_handlers = sc_service::spawn_tasks(SpawnTasksParams {
471        rpc_builder,
472        client: client.clone(),
473        transaction_pool: transaction_pool.clone(),
474        task_manager: &mut task_manager,
475        config: domain_config,
476        keystore: params.keystore_container.keystore(),
477        backend: backend.clone(),
478        network: network_service.clone(),
479        system_rpc_tx,
480        tx_handler_controller,
481        sync_service: sync_service.clone(),
482        telemetry: telemetry.as_mut(),
483    })?;
484
485    let spawn_essential = task_manager.spawn_essential_handle();
486    let (bundle_sender, _bundle_receiver) = tracing_unbounded("domain_bundle_stream", 100);
487
488    let operator = Operator::new(
489        Box::new(spawn_essential.clone()),
490        OperatorParams {
491            domain_id,
492            domain_created_at,
493            consensus_client: consensus_client.clone(),
494            consensus_offchain_tx_pool_factory,
495            domain_sync_oracle: domain_sync_oracle.clone(),
496            client: client.clone(),
497            transaction_pool: transaction_pool.clone(),
498            backend: backend.clone(),
499            code_executor: code_executor.clone(),
500            maybe_operator_id,
501            keystore: params.keystore_container.keystore(),
502            bundle_sender: Arc::new(bundle_sender),
503            operator_streams,
504            consensus_confirmation_depth_k: confirmation_depth_k,
505            block_import: Arc::new(block_import),
506            skip_empty_bundle_production,
507            skip_out_of_order_slot,
508            sync_service: sync_service.clone(),
509            network_service: Arc::clone(&network_service),
510            block_downloader,
511            consensus_chain_sync_params,
512            domain_fork_id: fork_id,
513            domain_network_service_handle: network_service_handle,
514            challenge_period,
515        },
516    )
517    .await?;
518
519    if is_authority {
520        let relayer_worker = domain_client_message_relayer::worker::start_relaying_messages(
521            domain_id,
522            consensus_client.clone(),
523            client.clone(),
524            confirmation_depth_k,
525            // domain relayer will use consensus chain sync oracle instead of domain sync oracle
526            // since domain sync oracle will always return `synced` due to force sync being set.
527            domain_sync_oracle.clone(),
528            gossip_message_sink.clone(),
529        );
530
531        spawn_essential.spawn_essential_blocking("domain-relayer", None, Box::pin(relayer_worker));
532
533        let channel_update_worker =
534            domain_client_message_relayer::worker::gossip_channel_updates::<_, _, CBlock, _>(
535                ChainId::Domain(domain_id),
536                client.clone(),
537                // domain will use consensus chain sync oracle instead of domain sync oracle
538                // since domain sync oracle will always return `synced` due to force sync being set.
539                domain_sync_oracle.clone(),
540                gossip_message_sink,
541            );
542
543        spawn_essential.spawn_essential_blocking(
544            "domain-channel-update-worker",
545            None,
546            Box::pin(channel_update_worker),
547        );
548    }
549
550    // Start cross domain message listener for domain
551    let domain_listener =
552        cross_domain_message_gossip::start_cross_chain_message_listener::<_, _, _, _, _, _, _>(
553            ChainId::Domain(domain_id),
554            consensus_client.clone(),
555            client.clone(),
556            params.transaction_pool.clone(),
557            consensus_network,
558            domain_message_receiver,
559            code_executor.clone(),
560            domain_sync_oracle,
561        );
562
563    spawn_essential.spawn_essential_blocking(
564        "domain-message-listener",
565        None,
566        Box::pin(domain_listener),
567    );
568
569    let new_full = NewFull {
570        task_manager,
571        client,
572        backend,
573        code_executor,
574        network_service,
575        sync_service,
576        rpc_handlers,
577        network_starter,
578        operator,
579        transaction_pool: params.transaction_pool,
580        _phantom_data: Default::default(),
581    };
582
583    Ok(new_full)
584}