1use crate::network::build_network;
2use crate::providers::{BlockImportProvider, RpcProvider};
3use crate::{FullBackend, FullClient};
4use cross_domain_message_gossip::ChainMsg;
5use domain_block_builder::CustomGenesisBlockBuilder;
6use domain_block_preprocessor::inherents::CreateInherentDataProvider;
7use domain_client_message_relayer::GossipMessageSink;
8use domain_client_operator::snap_sync::ConsensusChainSyncParams;
9use domain_client_operator::{Operator, OperatorParams, OperatorStreams};
10use domain_runtime_primitives::opaque::{Block, Header};
11use domain_runtime_primitives::{Balance, Hash};
12use futures::channel::mpsc;
13use futures::Stream;
14use pallet_transaction_payment_rpc::TransactionPaymentRuntimeApi;
15use sc_client_api::{
16 AuxStore, BlockBackend, BlockImportNotification, BlockchainEvents, ExecutorProvider,
17 ProofProvider,
18};
19use sc_consensus::{BasicQueue, BoxBlockImport};
20use sc_domains::{ExtensionsFactory, RuntimeExecutor};
21use sc_network::service::traits::NetworkService;
22use sc_network::{NetworkPeers, NetworkWorker, NotificationMetrics};
23use sc_service::{
24 BuildNetworkParams, Configuration as ServiceConfiguration, NetworkStarter, PartialComponents,
25 SpawnTasksParams, TFullBackend, TaskManager,
26};
27use sc_telemetry::{Telemetry, TelemetryWorker, TelemetryWorkerHandle};
28use sc_transaction_pool::{BasicPool, FullChainApi};
29use sc_transaction_pool_api::OffchainTransactionPoolFactory;
30use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
31use serde::de::DeserializeOwned;
32use sp_api::{ApiExt, ConstructRuntimeApi, Metadata, ProvideRuntimeApi};
33use sp_block_builder::BlockBuilder;
34use sp_blockchain::{HeaderBackend, HeaderMetadata};
35use sp_consensus::SyncOracle;
36use sp_consensus_slots::Slot;
37use sp_core::traits::SpawnEssentialNamed;
38use sp_core::{Decode, Encode, H256};
39use sp_domains::core_api::DomainCoreApi;
40use sp_domains::{BundleProducerElectionApi, DomainId, DomainsApi, OperatorId};
41use sp_domains_fraud_proof::FraudProofApi;
42use sp_messenger::messages::ChainId;
43use sp_messenger::{MessengerApi, RelayerApi};
44use sp_mmr_primitives::MmrApi;
45use sp_offchain::OffchainWorkerApi;
46use sp_runtime::traits::{Block as BlockT, NumberFor};
47use sp_session::SessionKeys;
48use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
49use std::fmt::{Debug, Display};
50use std::marker::PhantomData;
51use std::str::FromStr;
52use std::sync::Arc;
53use subspace_core_primitives::pot::PotOutput;
54use subspace_runtime_primitives::Nonce;
55use substrate_frame_rpc_system::AccountNonceApi;
56
57pub type DomainOperator<Block, CBlock, CClient, RuntimeApi> = Operator<
58 Block,
59 CBlock,
60 FullClient<Block, RuntimeApi>,
61 CClient,
62 FullPool<RuntimeApi>,
63 FullBackend<Block>,
64 RuntimeExecutor,
65>;
66
67pub struct NewFull<C, CodeExecutor, CBlock, CClient, RuntimeApi, AccountId>
69where
70 Block: BlockT,
71 CBlock: BlockT,
72 NumberFor<CBlock>: From<NumberFor<Block>>,
73 CBlock::Hash: From<Hash>,
74 CClient: HeaderBackend<CBlock>
75 + BlockBackend<CBlock>
76 + ProvideRuntimeApi<CBlock>
77 + Send
78 + Sync
79 + 'static,
80 CClient::Api:
81 DomainsApi<CBlock, Header> + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>,
82 RuntimeApi: ConstructRuntimeApi<Block, FullClient<Block, RuntimeApi>> + Send + Sync + 'static,
83 RuntimeApi::RuntimeApi: ApiExt<Block>
84 + Metadata<Block>
85 + AccountNonceApi<Block, AccountId, Nonce>
86 + BlockBuilder<Block>
87 + OffchainWorkerApi<Block>
88 + SessionKeys<Block>
89 + TaggedTransactionQueue<Block>
90 + TransactionPaymentRuntimeApi<Block, Balance>
91 + DomainCoreApi<Block>
92 + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>
93 + RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
94 AccountId: Encode + Decode,
95{
96 pub task_manager: TaskManager,
98 pub client: C,
100 pub backend: Arc<FullBackend<Block>>,
102 pub code_executor: Arc<CodeExecutor>,
104 pub network_service: Arc<dyn NetworkService>,
106 pub sync_service: Arc<sc_network_sync::SyncingService<Block>>,
108 pub rpc_handlers: sc_service::RpcHandlers,
110 pub network_starter: NetworkStarter,
112 pub operator: DomainOperator<Block, CBlock, CClient, RuntimeApi>,
114 pub transaction_pool: Arc<FullPool<RuntimeApi>>,
116
117 _phantom_data: PhantomData<AccountId>,
118}
119
120pub type FullPool<RuntimeApi> =
122 BasicPool<FullChainApi<FullClient<Block, RuntimeApi>, Block>, Block>;
123
124#[allow(clippy::type_complexity)]
126#[expect(clippy::result_large_err, reason = "Comes from Substrate")]
127fn new_partial<RuntimeApi, CBlock, CClient, BIMP>(
128 domain_id: DomainId,
129 config: &ServiceConfiguration,
130 consensus_client: Arc<CClient>,
131 domain_backend: Arc<FullBackend<Block>>,
132 block_import_provider: &BIMP,
133 confirmation_depth_k: NumberFor<CBlock>,
134 snap_sync: bool,
135) -> Result<
136 PartialComponents<
137 FullClient<Block, RuntimeApi>,
138 FullBackend<Block>,
139 (),
140 sc_consensus::DefaultImportQueue<Block>,
141 FullPool<RuntimeApi>,
142 (
143 Option<Telemetry>,
144 Option<TelemetryWorkerHandle>,
145 Arc<RuntimeExecutor>,
146 BoxBlockImport<Block>,
147 ),
148 >,
149 sc_service::Error,
150>
151where
152 CBlock: BlockT,
153 NumberFor<CBlock>: From<NumberFor<Block>> + Into<u32>,
154 CBlock::Hash: From<Hash> + Into<Hash>,
155 CClient: HeaderBackend<CBlock>
156 + BlockBackend<CBlock>
157 + ProvideRuntimeApi<CBlock>
158 + Send
159 + Sync
160 + 'static,
161 CClient::Api: DomainsApi<CBlock, Header>
162 + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>
163 + MmrApi<CBlock, H256, NumberFor<CBlock>>,
164 RuntimeApi: ConstructRuntimeApi<Block, FullClient<Block, RuntimeApi>> + Send + Sync + 'static,
165 RuntimeApi::RuntimeApi: TaggedTransactionQueue<Block>
166 + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>
167 + ApiExt<Block>,
168 BIMP: BlockImportProvider<Block, FullClient<Block, RuntimeApi>>,
169{
170 let telemetry = config
171 .telemetry_endpoints
172 .clone()
173 .filter(|x| !x.is_empty())
174 .map(|endpoints| -> Result<_, sc_telemetry::Error> {
175 let worker = TelemetryWorker::new(16)?;
176 let telemetry = worker.handle().new_telemetry(endpoints);
177 Ok((worker, telemetry))
178 })
179 .transpose()?;
180
181 let executor = sc_service::new_wasm_executor(&config.executor);
182
183 let genesis_block_builder = CustomGenesisBlockBuilder::<_, CBlock, _, _, _>::new(
184 domain_id,
185 consensus_client.clone(),
186 config.chain_spec.as_storage_builder(),
187 !snap_sync,
188 domain_backend.clone(),
189 executor.clone(),
190 )?;
191
192 let (client, backend, keystore_container, task_manager) =
193 sc_service::new_full_parts_with_genesis_builder(
194 config,
195 telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
196 executor.clone(),
197 domain_backend,
198 genesis_block_builder,
199 false,
200 )?;
201
202 let client = Arc::new(client);
203
204 let executor = Arc::new(executor);
205 client.execution_extensions().set_extensions_factory(
206 ExtensionsFactory::<_, CBlock, Block, _>::new(
207 consensus_client.clone(),
208 executor.clone(),
209 confirmation_depth_k.into(),
210 ),
211 );
212
213 let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());
214
215 let telemetry = telemetry.map(|(worker, telemetry)| {
216 task_manager
217 .spawn_handle()
218 .spawn("telemetry", None, worker.run());
219 telemetry
220 });
221
222 let transaction_pool = Arc::from(BasicPool::new_full(
223 Default::default(),
224 config.role.is_authority().into(),
225 config.prometheus_registry(),
226 task_manager.spawn_essential_handle(),
227 client.clone(),
228 ));
229
230 let import_queue = BasicQueue::new(
231 domain_client_consensus_relay_chain::Verifier::default(),
232 Box::new(block_import_provider.block_import(client.clone())),
233 None,
234 &task_manager.spawn_essential_handle(),
235 config.prometheus_registry(),
236 );
237
238 let params = PartialComponents {
239 backend,
240 client: client.clone(),
241 import_queue,
242 keystore_container,
243 task_manager,
244 transaction_pool,
245 select_chain: (),
246 other: (
247 telemetry,
248 telemetry_worker_handle,
249 executor,
250 Box::new(block_import_provider.block_import(client)) as Box<_>,
251 ),
252 };
253
254 Ok(params)
255}
256
257pub struct DomainParams<CBlock, CClient, IBNS, CIBNS, NSNS, ASS, Provider>
258where
259 CBlock: BlockT,
260{
261 pub domain_id: DomainId,
262 pub domain_config: ServiceConfiguration,
263 pub domain_created_at: NumberFor<CBlock>,
264 pub maybe_operator_id: Option<OperatorId>,
265 pub consensus_client: Arc<CClient>,
266 pub consensus_network: Arc<dyn NetworkPeers + Send + Sync>,
267 pub consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory<CBlock>,
268 pub domain_sync_oracle: Arc<dyn SyncOracle + Send + Sync>,
269 pub operator_streams: OperatorStreams<CBlock, IBNS, CIBNS, NSNS, ASS>,
270 pub gossip_message_sink: GossipMessageSink,
271 pub domain_message_receiver: TracingUnboundedReceiver<ChainMsg>,
272 pub provider: Provider,
273 pub skip_empty_bundle_production: bool,
274 pub skip_out_of_order_slot: bool,
275 pub confirmation_depth_k: NumberFor<CBlock>,
276 pub challenge_period: NumberFor<CBlock>,
277 pub consensus_chain_sync_params:
278 Option<ConsensusChainSyncParams<CBlock, <Block as BlockT>::Header>>,
279 pub domain_backend: Arc<FullBackend<Block>>,
280}
281
282pub async fn new_full<CBlock, CClient, IBNS, CIBNS, NSNS, ASS, RuntimeApi, AccountId, Provider>(
284 domain_params: DomainParams<CBlock, CClient, IBNS, CIBNS, NSNS, ASS, Provider>,
285) -> sc_service::error::Result<
286 NewFull<
287 Arc<FullClient<Block, RuntimeApi>>,
288 RuntimeExecutor,
289 CBlock,
290 CClient,
291 RuntimeApi,
292 AccountId,
293 >,
294>
295where
296 CBlock: BlockT,
297 NumberFor<CBlock>: From<NumberFor<Block>> + Into<u32>,
298 CBlock::Hash: From<Hash> + Into<Hash>,
299 CClient: HeaderBackend<CBlock>
300 + HeaderMetadata<CBlock, Error = sp_blockchain::Error>
301 + BlockBackend<CBlock>
302 + ProofProvider<CBlock>
303 + ProvideRuntimeApi<CBlock>
304 + BlockchainEvents<CBlock>
305 + AuxStore
306 + Send
307 + Sync
308 + 'static,
309 CClient::Api: DomainsApi<CBlock, Header>
310 + RelayerApi<CBlock, NumberFor<CBlock>, NumberFor<CBlock>, CBlock::Hash>
311 + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>
312 + BundleProducerElectionApi<CBlock, subspace_runtime_primitives::Balance>
313 + FraudProofApi<CBlock, Header>
314 + MmrApi<CBlock, H256, NumberFor<CBlock>>,
315 IBNS: Stream<Item = (NumberFor<CBlock>, mpsc::Sender<()>)> + Send + Unpin + 'static,
316 CIBNS: Stream<Item = BlockImportNotification<CBlock>> + Send + Unpin + 'static,
317 NSNS: Stream<Item = (Slot, PotOutput)> + Send + 'static,
318 ASS: Stream<Item = mpsc::Sender<()>> + Send + 'static,
319 RuntimeApi: ConstructRuntimeApi<Block, FullClient<Block, RuntimeApi>> + Send + Sync + 'static,
320 RuntimeApi::RuntimeApi: ApiExt<Block>
321 + Metadata<Block>
322 + BlockBuilder<Block>
323 + OffchainWorkerApi<Block>
324 + SessionKeys<Block>
325 + DomainCoreApi<Block>
326 + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>
327 + TaggedTransactionQueue<Block>
328 + AccountNonceApi<Block, AccountId, Nonce>
329 + TransactionPaymentRuntimeApi<Block, Balance>
330 + RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
331 AccountId: DeserializeOwned
332 + Encode
333 + Decode
334 + Clone
335 + Debug
336 + Display
337 + FromStr
338 + Sync
339 + Send
340 + 'static,
341 Provider: RpcProvider<
342 Block,
343 FullClient<Block, RuntimeApi>,
344 FullPool<RuntimeApi>,
345 FullChainApi<FullClient<Block, RuntimeApi>, Block>,
346 TFullBackend<Block>,
347 AccountId,
348 CreateInherentDataProvider<CClient, CBlock>,
349 > + BlockImportProvider<Block, FullClient<Block, RuntimeApi>>
350 + 'static,
351{
352 let DomainParams {
353 domain_id,
354 maybe_operator_id,
355 mut domain_config,
356 domain_created_at,
357 consensus_client,
358 consensus_offchain_tx_pool_factory,
359 domain_sync_oracle,
360 consensus_network,
361 operator_streams,
362 gossip_message_sink,
363 domain_message_receiver,
364 provider,
365 skip_empty_bundle_production,
366 skip_out_of_order_slot,
367 confirmation_depth_k,
368 consensus_chain_sync_params,
369 challenge_period,
370 domain_backend,
371 } = domain_params;
372
373 let params = new_partial(
377 domain_id,
378 &domain_config,
379 consensus_client.clone(),
380 domain_backend,
381 &provider,
382 confirmation_depth_k,
383 consensus_chain_sync_params.is_some(),
384 )?;
385
386 let (mut telemetry, _telemetry_worker_handle, code_executor, block_import) = params.other;
387
388 let client = params.client.clone();
389 let backend = params.backend.clone();
390
391 let transaction_pool = params.transaction_pool.clone();
392 let mut task_manager = params.task_manager;
393 let net_config = sc_network::config::FullNetworkConfiguration::<_, _, NetworkWorker<_, _>>::new(
394 &domain_config.network,
395 domain_config
396 .prometheus_config
397 .as_ref()
398 .map(|cfg| cfg.registry.clone()),
399 );
400
401 let (
402 network_service,
403 system_rpc_tx,
404 tx_handler_controller,
405 network_starter,
406 sync_service,
407 network_service_handle,
408 block_downloader,
409 ) = build_network(BuildNetworkParams {
410 config: &domain_config,
411 net_config,
412 client: client.clone(),
413 transaction_pool: transaction_pool.clone(),
414 spawn_handle: task_manager.spawn_handle(),
415 import_queue: params.import_queue,
416 block_announce_validator_builder: None,
418 warp_sync_config: None,
419 block_relay: None,
420 metrics: NotificationMetrics::new(
421 domain_config
422 .prometheus_config
423 .as_ref()
424 .map(|cfg| &cfg.registry),
425 ),
426 })?;
427
428 let fork_id = domain_config.chain_spec.fork_id().map(String::from);
429
430 let is_authority = domain_config.role.is_authority();
431 domain_config.rpc.id_provider = provider.rpc_id();
432 let rpc_builder = {
433 let deps = crate::rpc::FullDeps {
434 client: client.clone(),
435 pool: transaction_pool.clone(),
436 graph: transaction_pool.pool().clone(),
437 network: network_service.clone(),
438 sync: sync_service.clone(),
439 is_authority,
440 prometheus_registry: domain_config.prometheus_registry().cloned(),
441 database_source: domain_config.database.clone(),
442 task_spawner: task_manager.spawn_handle(),
443 backend: backend.clone(),
444 create_inherent_data_provider: CreateInherentDataProvider::new(
449 consensus_client.clone(),
450 None,
452 domain_id,
453 ),
454 };
455
456 let spawn_essential = task_manager.spawn_essential_handle();
457 let rpc_deps = provider.deps(deps)?;
458 Box::new(move |subscription_task_executor| {
459 let spawn_essential = spawn_essential.clone();
460 provider
461 .rpc_builder(
462 rpc_deps.clone(),
463 subscription_task_executor,
464 spawn_essential,
465 )
466 .map_err(Into::into)
467 })
468 };
469
470 let rpc_handlers = sc_service::spawn_tasks(SpawnTasksParams {
471 rpc_builder,
472 client: client.clone(),
473 transaction_pool: transaction_pool.clone(),
474 task_manager: &mut task_manager,
475 config: domain_config,
476 keystore: params.keystore_container.keystore(),
477 backend: backend.clone(),
478 network: network_service.clone(),
479 system_rpc_tx,
480 tx_handler_controller,
481 sync_service: sync_service.clone(),
482 telemetry: telemetry.as_mut(),
483 })?;
484
485 let spawn_essential = task_manager.spawn_essential_handle();
486 let (bundle_sender, _bundle_receiver) = tracing_unbounded("domain_bundle_stream", 100);
487
488 let operator = Operator::new(
489 Box::new(spawn_essential.clone()),
490 OperatorParams {
491 domain_id,
492 domain_created_at,
493 consensus_client: consensus_client.clone(),
494 consensus_offchain_tx_pool_factory,
495 domain_sync_oracle: domain_sync_oracle.clone(),
496 client: client.clone(),
497 transaction_pool: transaction_pool.clone(),
498 backend: backend.clone(),
499 code_executor: code_executor.clone(),
500 maybe_operator_id,
501 keystore: params.keystore_container.keystore(),
502 bundle_sender: Arc::new(bundle_sender),
503 operator_streams,
504 consensus_confirmation_depth_k: confirmation_depth_k,
505 block_import: Arc::new(block_import),
506 skip_empty_bundle_production,
507 skip_out_of_order_slot,
508 sync_service: sync_service.clone(),
509 network_service: Arc::clone(&network_service),
510 block_downloader,
511 consensus_chain_sync_params,
512 domain_fork_id: fork_id,
513 domain_network_service_handle: network_service_handle,
514 challenge_period,
515 },
516 )
517 .await?;
518
519 if is_authority {
520 let relayer_worker = domain_client_message_relayer::worker::start_relaying_messages(
521 domain_id,
522 consensus_client.clone(),
523 client.clone(),
524 confirmation_depth_k,
525 domain_sync_oracle.clone(),
528 gossip_message_sink.clone(),
529 );
530
531 spawn_essential.spawn_essential_blocking("domain-relayer", None, Box::pin(relayer_worker));
532
533 let channel_update_worker =
534 domain_client_message_relayer::worker::gossip_channel_updates::<_, _, CBlock, _>(
535 ChainId::Domain(domain_id),
536 client.clone(),
537 domain_sync_oracle.clone(),
540 gossip_message_sink,
541 );
542
543 spawn_essential.spawn_essential_blocking(
544 "domain-channel-update-worker",
545 None,
546 Box::pin(channel_update_worker),
547 );
548 }
549
550 let domain_listener =
552 cross_domain_message_gossip::start_cross_chain_message_listener::<_, _, _, _, _, _, _>(
553 ChainId::Domain(domain_id),
554 consensus_client.clone(),
555 client.clone(),
556 params.transaction_pool.clone(),
557 consensus_network,
558 domain_message_receiver,
559 code_executor.clone(),
560 domain_sync_oracle,
561 );
562
563 spawn_essential.spawn_essential_blocking(
564 "domain-message-listener",
565 None,
566 Box::pin(domain_listener),
567 );
568
569 let new_full = NewFull {
570 task_manager,
571 client,
572 backend,
573 code_executor,
574 network_service,
575 sync_service,
576 rpc_handlers,
577 network_starter,
578 operator,
579 transaction_pool: params.transaction_pool,
580 _phantom_data: Default::default(),
581 };
582
583 Ok(new_full)
584}