domain_service/
domain.rs

1use crate::network::build_network;
2use crate::providers::{BlockImportProvider, RpcProvider};
3use crate::{FullBackend, FullClient};
4use cross_domain_message_gossip::ChainMsg;
5use domain_block_builder::CustomGenesisBlockBuilder;
6use domain_block_preprocessor::inherents::CreateInherentDataProvider;
7use domain_client_message_relayer::GossipMessageSink;
8use domain_client_operator::snap_sync::ConsensusChainSyncParams;
9use domain_client_operator::{Operator, OperatorParams, OperatorStreams};
10use domain_runtime_primitives::opaque::{Block, Header};
11use domain_runtime_primitives::{Balance, Hash};
12use futures::Stream;
13use futures::channel::mpsc;
14use pallet_transaction_payment_rpc::TransactionPaymentRuntimeApi;
15use sc_client_api::{
16    AuxStore, BlockBackend, BlockImportNotification, BlockchainEvents, ExecutorProvider,
17    ProofProvider,
18};
19use sc_consensus::{BasicQueue, BoxBlockImport};
20use sc_domains::{ExtensionsFactory, RuntimeExecutor};
21use sc_network::service::traits::NetworkService;
22use sc_network::{NetworkPeers, NetworkWorker, NotificationMetrics};
23use sc_service::{
24    BuildNetworkParams, Configuration as ServiceConfiguration, NetworkStarter, PartialComponents,
25    SpawnTasksParams, TFullBackend, TaskManager,
26};
27use sc_telemetry::{Telemetry, TelemetryWorker, TelemetryWorkerHandle};
28use sc_transaction_pool::{BasicPool, FullChainApi};
29use sc_transaction_pool_api::OffchainTransactionPoolFactory;
30use sc_utils::mpsc::{TracingUnboundedReceiver, tracing_unbounded};
31use serde::de::DeserializeOwned;
32use sp_api::{ApiExt, ConstructRuntimeApi, Metadata, ProvideRuntimeApi};
33use sp_block_builder::BlockBuilder;
34use sp_blockchain::{HeaderBackend, HeaderMetadata};
35use sp_consensus::SyncOracle;
36use sp_consensus_slots::Slot;
37use sp_core::traits::SpawnEssentialNamed;
38use sp_core::{Decode, Encode, H256};
39use sp_domains::core_api::DomainCoreApi;
40use sp_domains::{BundleProducerElectionApi, DomainId, DomainsApi, OperatorId};
41use sp_domains_fraud_proof::FraudProofApi;
42use sp_messenger::messages::ChainId;
43use sp_messenger::{MessengerApi, RelayerApi};
44use sp_mmr_primitives::MmrApi;
45use sp_offchain::OffchainWorkerApi;
46use sp_runtime::traits::{Block as BlockT, NumberFor};
47use sp_session::SessionKeys;
48use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
49use std::fmt::{Debug, Display};
50use std::marker::PhantomData;
51use std::str::FromStr;
52use std::sync::Arc;
53use subspace_core_primitives::pot::PotOutput;
54use subspace_runtime_primitives::{HeaderFor, Nonce};
55use substrate_frame_rpc_system::AccountNonceApi;
56
57pub type DomainOperator<Block, CBlock, CClient, RuntimeApi> = Operator<
58    Block,
59    CBlock,
60    FullClient<Block, RuntimeApi>,
61    CClient,
62    FullPool<RuntimeApi>,
63    FullBackend<Block>,
64    RuntimeExecutor,
65>;
66
67/// Domain full node along with some other components.
68pub struct NewFull<C, CodeExecutor, CBlock, CClient, RuntimeApi, AccountId>
69where
70    Block: BlockT,
71    CBlock: BlockT,
72    NumberFor<CBlock>: From<NumberFor<Block>>,
73    CBlock::Hash: From<Hash>,
74    CClient: HeaderBackend<CBlock>
75        + BlockBackend<CBlock>
76        + ProvideRuntimeApi<CBlock>
77        + Send
78        + Sync
79        + 'static,
80    CClient::Api:
81        DomainsApi<CBlock, Header> + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>,
82    RuntimeApi: ConstructRuntimeApi<Block, FullClient<Block, RuntimeApi>> + Send + Sync + 'static,
83    RuntimeApi::RuntimeApi: ApiExt<Block>
84        + Metadata<Block>
85        + AccountNonceApi<Block, AccountId, Nonce>
86        + BlockBuilder<Block>
87        + OffchainWorkerApi<Block>
88        + SessionKeys<Block>
89        + TaggedTransactionQueue<Block>
90        + TransactionPaymentRuntimeApi<Block, Balance>
91        + DomainCoreApi<Block>
92        + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>
93        + RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
94    AccountId: Encode + Decode,
95{
96    /// Task manager.
97    pub task_manager: TaskManager,
98    /// Full client.
99    pub client: C,
100    /// Backend.
101    pub backend: Arc<FullBackend<Block>>,
102    /// Code executor.
103    pub code_executor: Arc<CodeExecutor>,
104    /// Network service.
105    pub network_service: Arc<dyn NetworkService>,
106    /// Sync service.
107    pub sync_service: Arc<sc_network_sync::SyncingService<Block>>,
108    /// RPCHandlers to make RPC queries.
109    pub rpc_handlers: sc_service::RpcHandlers,
110    /// Network starter.
111    pub network_starter: NetworkStarter,
112    /// Operator.
113    pub operator: DomainOperator<Block, CBlock, CClient, RuntimeApi>,
114    /// Transaction pool
115    pub transaction_pool: Arc<FullPool<RuntimeApi>>,
116
117    _phantom_data: PhantomData<AccountId>,
118}
119
120/// A transaction pool for a full node.
121pub type FullPool<RuntimeApi> =
122    BasicPool<FullChainApi<FullClient<Block, RuntimeApi>, Block>, Block>;
123
124/// Constructs a partial domain node.
125#[allow(clippy::type_complexity)]
126#[expect(clippy::result_large_err, reason = "Comes from Substrate")]
127fn new_partial<RuntimeApi, CBlock, CClient, BIMP>(
128    domain_id: DomainId,
129    config: &ServiceConfiguration,
130    consensus_client: Arc<CClient>,
131    domain_backend: Arc<FullBackend<Block>>,
132    block_import_provider: &BIMP,
133    confirmation_depth_k: NumberFor<CBlock>,
134    snap_sync: bool,
135) -> Result<
136    PartialComponents<
137        FullClient<Block, RuntimeApi>,
138        FullBackend<Block>,
139        (),
140        sc_consensus::DefaultImportQueue<Block>,
141        FullPool<RuntimeApi>,
142        (
143            Option<Telemetry>,
144            Option<TelemetryWorkerHandle>,
145            Arc<RuntimeExecutor>,
146            BoxBlockImport<Block>,
147        ),
148    >,
149    sc_service::Error,
150>
151where
152    CBlock: BlockT,
153    NumberFor<CBlock>: From<NumberFor<Block>> + Into<u32>,
154    CBlock::Hash: From<Hash> + Into<Hash>,
155    CClient: HeaderBackend<CBlock>
156        + BlockBackend<CBlock>
157        + ProvideRuntimeApi<CBlock>
158        + Send
159        + Sync
160        + 'static,
161    CClient::Api: DomainsApi<CBlock, Header>
162        + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>
163        + MmrApi<CBlock, H256, NumberFor<CBlock>>,
164    RuntimeApi: ConstructRuntimeApi<Block, FullClient<Block, RuntimeApi>> + Send + Sync + 'static,
165    RuntimeApi::RuntimeApi: TaggedTransactionQueue<Block>
166        + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>
167        + ApiExt<Block>,
168    BIMP: BlockImportProvider<Block, FullClient<Block, RuntimeApi>>,
169{
170    let telemetry = config
171        .telemetry_endpoints
172        .clone()
173        .filter(|x| !x.is_empty())
174        .map(|endpoints| -> Result<_, sc_telemetry::Error> {
175            let worker = TelemetryWorker::new(16)?;
176            let telemetry = worker.handle().new_telemetry(endpoints);
177            Ok((worker, telemetry))
178        })
179        .transpose()?;
180
181    let executor = sc_service::new_wasm_executor(&config.executor);
182
183    let genesis_block_builder = CustomGenesisBlockBuilder::<_, CBlock, _, _, _>::new(
184        domain_id,
185        consensus_client.clone(),
186        config.chain_spec.as_storage_builder(),
187        !snap_sync,
188        domain_backend.clone(),
189        executor.clone(),
190    )?;
191
192    let (client, backend, keystore_container, task_manager) =
193        sc_service::new_full_parts_with_genesis_builder(
194            config,
195            telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
196            executor.clone(),
197            domain_backend,
198            genesis_block_builder,
199            false,
200        )?;
201
202    let client = Arc::new(client);
203
204    let executor = Arc::new(executor);
205    client.execution_extensions().set_extensions_factory(
206        ExtensionsFactory::<_, CBlock, Block, _>::new(
207            consensus_client.clone(),
208            executor.clone(),
209            confirmation_depth_k.into(),
210        ),
211    );
212
213    let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());
214
215    let telemetry = telemetry.map(|(worker, telemetry)| {
216        task_manager
217            .spawn_handle()
218            .spawn("telemetry", None, worker.run());
219        telemetry
220    });
221
222    let transaction_pool = Arc::from(BasicPool::new_full(
223        Default::default(),
224        config.role.is_authority().into(),
225        config.prometheus_registry(),
226        task_manager.spawn_essential_handle(),
227        client.clone(),
228    ));
229
230    let import_queue = BasicQueue::new(
231        domain_client_consensus_relay_chain::Verifier::default(),
232        Box::new(block_import_provider.block_import(client.clone())),
233        None,
234        &task_manager.spawn_essential_handle(),
235        config.prometheus_registry(),
236    );
237
238    let params = PartialComponents {
239        backend,
240        client: client.clone(),
241        import_queue,
242        keystore_container,
243        task_manager,
244        transaction_pool,
245        select_chain: (),
246        other: (
247            telemetry,
248            telemetry_worker_handle,
249            executor,
250            Box::new(block_import_provider.block_import(client)) as Box<_>,
251        ),
252    };
253
254    Ok(params)
255}
256
257pub struct DomainParams<CBlock, CClient, IBNS, CIBNS, NSNS, ASS, Provider>
258where
259    CBlock: BlockT,
260{
261    pub domain_id: DomainId,
262    pub domain_config: ServiceConfiguration,
263    pub domain_created_at: NumberFor<CBlock>,
264    pub maybe_operator_id: Option<OperatorId>,
265    pub consensus_client: Arc<CClient>,
266    pub consensus_network: Arc<dyn NetworkPeers + Send + Sync>,
267    pub consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory<CBlock>,
268    pub domain_sync_oracle: Arc<dyn SyncOracle + Send + Sync>,
269    pub operator_streams: OperatorStreams<CBlock, IBNS, CIBNS, NSNS, ASS>,
270    pub gossip_message_sink: GossipMessageSink,
271    pub domain_message_receiver: TracingUnboundedReceiver<ChainMsg>,
272    pub provider: Provider,
273    pub skip_empty_bundle_production: bool,
274    pub skip_out_of_order_slot: bool,
275    pub confirmation_depth_k: NumberFor<CBlock>,
276    pub challenge_period: NumberFor<CBlock>,
277    pub consensus_chain_sync_params: Option<ConsensusChainSyncParams<CBlock, HeaderFor<Block>>>,
278    pub domain_backend: Arc<FullBackend<Block>>,
279}
280
281/// Builds service for a domain full node.
282pub async fn new_full<CBlock, CClient, IBNS, CIBNS, NSNS, ASS, RuntimeApi, AccountId, Provider>(
283    domain_params: DomainParams<CBlock, CClient, IBNS, CIBNS, NSNS, ASS, Provider>,
284) -> sc_service::error::Result<
285    NewFull<
286        Arc<FullClient<Block, RuntimeApi>>,
287        RuntimeExecutor,
288        CBlock,
289        CClient,
290        RuntimeApi,
291        AccountId,
292    >,
293>
294where
295    CBlock: BlockT,
296    NumberFor<CBlock>: From<NumberFor<Block>> + Into<u32>,
297    CBlock::Hash: From<Hash> + Into<Hash>,
298    CClient: HeaderBackend<CBlock>
299        + HeaderMetadata<CBlock, Error = sp_blockchain::Error>
300        + BlockBackend<CBlock>
301        + ProofProvider<CBlock>
302        + ProvideRuntimeApi<CBlock>
303        + BlockchainEvents<CBlock>
304        + AuxStore
305        + Send
306        + Sync
307        + 'static,
308    CClient::Api: DomainsApi<CBlock, Header>
309        + RelayerApi<CBlock, NumberFor<CBlock>, NumberFor<CBlock>, CBlock::Hash>
310        + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>
311        + BundleProducerElectionApi<CBlock, subspace_runtime_primitives::Balance>
312        + FraudProofApi<CBlock, Header>
313        + MmrApi<CBlock, H256, NumberFor<CBlock>>,
314    IBNS: Stream<Item = (NumberFor<CBlock>, mpsc::Sender<()>)> + Send + Unpin + 'static,
315    CIBNS: Stream<Item = BlockImportNotification<CBlock>> + Send + Unpin + 'static,
316    NSNS: Stream<Item = (Slot, PotOutput)> + Send + 'static,
317    ASS: Stream<Item = mpsc::Sender<()>> + Send + 'static,
318    RuntimeApi: ConstructRuntimeApi<Block, FullClient<Block, RuntimeApi>> + Send + Sync + 'static,
319    RuntimeApi::RuntimeApi: ApiExt<Block>
320        + Metadata<Block>
321        + BlockBuilder<Block>
322        + OffchainWorkerApi<Block>
323        + SessionKeys<Block>
324        + DomainCoreApi<Block>
325        + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>
326        + TaggedTransactionQueue<Block>
327        + AccountNonceApi<Block, AccountId, Nonce>
328        + TransactionPaymentRuntimeApi<Block, Balance>
329        + RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
330    AccountId: DeserializeOwned
331        + Encode
332        + Decode
333        + Clone
334        + Debug
335        + Display
336        + FromStr
337        + Sync
338        + Send
339        + 'static,
340    Provider: RpcProvider<
341            Block,
342            FullClient<Block, RuntimeApi>,
343            FullPool<RuntimeApi>,
344            FullChainApi<FullClient<Block, RuntimeApi>, Block>,
345            TFullBackend<Block>,
346            AccountId,
347            CreateInherentDataProvider<CClient, CBlock>,
348        > + BlockImportProvider<Block, FullClient<Block, RuntimeApi>>
349        + 'static,
350{
351    let DomainParams {
352        domain_id,
353        maybe_operator_id,
354        mut domain_config,
355        domain_created_at,
356        consensus_client,
357        consensus_offchain_tx_pool_factory,
358        domain_sync_oracle,
359        consensus_network,
360        operator_streams,
361        gossip_message_sink,
362        domain_message_receiver,
363        provider,
364        skip_empty_bundle_production,
365        skip_out_of_order_slot,
366        confirmation_depth_k,
367        consensus_chain_sync_params,
368        challenge_period,
369        domain_backend,
370    } = domain_params;
371
372    // TODO: Do we even need block announcement on domain node?
373    // domain_config.announce_block = false;
374
375    let params = new_partial(
376        domain_id,
377        &domain_config,
378        consensus_client.clone(),
379        domain_backend,
380        &provider,
381        confirmation_depth_k,
382        consensus_chain_sync_params.is_some(),
383    )?;
384
385    let (mut telemetry, _telemetry_worker_handle, code_executor, block_import) = params.other;
386
387    let client = params.client.clone();
388    let backend = params.backend.clone();
389
390    let transaction_pool = params.transaction_pool.clone();
391    let mut task_manager = params.task_manager;
392    let net_config = sc_network::config::FullNetworkConfiguration::<_, _, NetworkWorker<_, _>>::new(
393        &domain_config.network,
394        domain_config
395            .prometheus_config
396            .as_ref()
397            .map(|cfg| cfg.registry.clone()),
398    );
399
400    let (
401        network_service,
402        system_rpc_tx,
403        tx_handler_controller,
404        network_starter,
405        sync_service,
406        network_service_handle,
407        block_downloader,
408    ) = build_network(BuildNetworkParams {
409        config: &domain_config,
410        net_config,
411        client: client.clone(),
412        transaction_pool: transaction_pool.clone(),
413        spawn_handle: task_manager.spawn_handle(),
414        import_queue: params.import_queue,
415        // TODO: we might want to re-enable this some day.
416        block_announce_validator_builder: None,
417        warp_sync_config: None,
418        block_relay: None,
419        metrics: NotificationMetrics::new(
420            domain_config
421                .prometheus_config
422                .as_ref()
423                .map(|cfg| &cfg.registry),
424        ),
425    })?;
426
427    let fork_id = domain_config.chain_spec.fork_id().map(String::from);
428
429    let is_authority = domain_config.role.is_authority();
430    domain_config.rpc.id_provider = provider.rpc_id();
431    let rpc_builder = {
432        let deps = crate::rpc::FullDeps {
433            client: client.clone(),
434            pool: transaction_pool.clone(),
435            graph: transaction_pool.pool().clone(),
436            network: network_service.clone(),
437            sync: sync_service.clone(),
438            is_authority,
439            prometheus_registry: domain_config.prometheus_registry().cloned(),
440            database_source: domain_config.database.clone(),
441            task_spawner: task_manager.spawn_handle(),
442            backend: backend.clone(),
443            // This is required by the eth rpc to create pending state using the underlying
444            // consensus provider. In our case, the consensus provider is empty and
445            // as a result this is not used at all. Providing this just to make the api
446            // compatible
447            create_inherent_data_provider: CreateInherentDataProvider::new(
448                consensus_client.clone(),
449                // It is safe to pass empty consensus hash here as explained above
450                None,
451                domain_id,
452            ),
453        };
454
455        let spawn_essential = task_manager.spawn_essential_handle();
456        let rpc_deps = provider.deps(deps)?;
457        Box::new(move |subscription_task_executor| {
458            let spawn_essential = spawn_essential.clone();
459            provider
460                .rpc_builder(
461                    rpc_deps.clone(),
462                    subscription_task_executor,
463                    spawn_essential,
464                )
465                .map_err(Into::into)
466        })
467    };
468
469    let rpc_handlers = sc_service::spawn_tasks(SpawnTasksParams {
470        rpc_builder,
471        client: client.clone(),
472        transaction_pool: transaction_pool.clone(),
473        task_manager: &mut task_manager,
474        config: domain_config,
475        keystore: params.keystore_container.keystore(),
476        backend: backend.clone(),
477        network: network_service.clone(),
478        system_rpc_tx,
479        tx_handler_controller,
480        sync_service: sync_service.clone(),
481        telemetry: telemetry.as_mut(),
482    })?;
483
484    let spawn_essential = task_manager.spawn_essential_handle();
485    let (bundle_sender, _bundle_receiver) = tracing_unbounded("domain_bundle_stream", 100);
486
487    let operator = Operator::new(
488        Box::new(spawn_essential.clone()),
489        OperatorParams {
490            domain_id,
491            domain_created_at,
492            consensus_client: consensus_client.clone(),
493            consensus_offchain_tx_pool_factory,
494            domain_sync_oracle: domain_sync_oracle.clone(),
495            client: client.clone(),
496            transaction_pool: transaction_pool.clone(),
497            backend: backend.clone(),
498            code_executor: code_executor.clone(),
499            maybe_operator_id,
500            keystore: params.keystore_container.keystore(),
501            bundle_sender: Arc::new(bundle_sender),
502            operator_streams,
503            consensus_confirmation_depth_k: confirmation_depth_k,
504            block_import: Arc::new(block_import),
505            skip_empty_bundle_production,
506            skip_out_of_order_slot,
507            sync_service: sync_service.clone(),
508            network_service: Arc::clone(&network_service),
509            block_downloader,
510            consensus_chain_sync_params,
511            domain_fork_id: fork_id,
512            domain_network_service_handle: network_service_handle,
513            challenge_period,
514        },
515    )
516    .await?;
517
518    if is_authority {
519        let relayer_worker = domain_client_message_relayer::worker::start_relaying_messages(
520            domain_id,
521            consensus_client.clone(),
522            client.clone(),
523            confirmation_depth_k,
524            // domain relayer will use consensus chain sync oracle instead of domain sync oracle
525            // since domain sync oracle will always return `synced` due to force sync being set.
526            domain_sync_oracle.clone(),
527            gossip_message_sink.clone(),
528        );
529
530        spawn_essential.spawn_essential_blocking("domain-relayer", None, Box::pin(relayer_worker));
531
532        let channel_update_worker =
533            domain_client_message_relayer::worker::gossip_channel_updates::<_, _, CBlock, _>(
534                ChainId::Domain(domain_id),
535                client.clone(),
536                // domain will use consensus chain sync oracle instead of domain sync oracle
537                // since domain sync oracle will always return `synced` due to force sync being set.
538                domain_sync_oracle.clone(),
539                gossip_message_sink,
540            );
541
542        spawn_essential.spawn_essential_blocking(
543            "domain-channel-update-worker",
544            None,
545            Box::pin(channel_update_worker),
546        );
547    }
548
549    // Start cross domain message listener for domain
550    let domain_listener =
551        cross_domain_message_gossip::start_cross_chain_message_listener::<_, _, _, _, _, _, _>(
552            ChainId::Domain(domain_id),
553            consensus_client.clone(),
554            client.clone(),
555            params.transaction_pool.clone(),
556            consensus_network,
557            domain_message_receiver,
558            code_executor.clone(),
559            domain_sync_oracle,
560        );
561
562    spawn_essential.spawn_essential_blocking(
563        "domain-message-listener",
564        None,
565        Box::pin(domain_listener),
566    );
567
568    let new_full = NewFull {
569        task_manager,
570        client,
571        backend,
572        code_executor,
573        network_service,
574        sync_service,
575        rpc_handlers,
576        network_starter,
577        operator,
578        transaction_pool: params.transaction_pool,
579        _phantom_data: Default::default(),
580    };
581
582    Ok(new_full)
583}