1use crate::network::build_network;
2use crate::providers::{BlockImportProvider, RpcProvider};
3use crate::{FullBackend, FullClient};
4use cross_domain_message_gossip::ChainMsg;
5use domain_block_builder::CustomGenesisBlockBuilder;
6use domain_block_preprocessor::inherents::CreateInherentDataProvider;
7use domain_client_message_relayer::GossipMessageSink;
8use domain_client_operator::snap_sync::ConsensusChainSyncParams;
9use domain_client_operator::{Operator, OperatorParams, OperatorStreams};
10use domain_runtime_primitives::opaque::{Block, Header};
11use domain_runtime_primitives::{Balance, Hash};
12use futures::Stream;
13use futures::channel::mpsc;
14use pallet_transaction_payment_rpc::TransactionPaymentRuntimeApi;
15use sc_client_api::{
16 AuxStore, BlockBackend, BlockImportNotification, BlockchainEvents, ExecutorProvider,
17 ProofProvider,
18};
19use sc_consensus::{BasicQueue, BoxBlockImport};
20use sc_domains::{ExtensionsFactory, RuntimeExecutor};
21use sc_network::service::traits::NetworkService;
22use sc_network::{NetworkPeers, NetworkWorker, NotificationMetrics};
23use sc_service::{
24 BuildNetworkParams, Configuration as ServiceConfiguration, NetworkStarter, PartialComponents,
25 SpawnTasksParams, TFullBackend, TaskManager,
26};
27use sc_telemetry::{Telemetry, TelemetryWorker, TelemetryWorkerHandle};
28use sc_transaction_pool::{BasicPool, FullChainApi};
29use sc_transaction_pool_api::OffchainTransactionPoolFactory;
30use sc_utils::mpsc::{TracingUnboundedReceiver, tracing_unbounded};
31use serde::de::DeserializeOwned;
32use sp_api::{ApiExt, ConstructRuntimeApi, Metadata, ProvideRuntimeApi};
33use sp_block_builder::BlockBuilder;
34use sp_blockchain::{HeaderBackend, HeaderMetadata};
35use sp_consensus::SyncOracle;
36use sp_consensus_slots::Slot;
37use sp_core::traits::SpawnEssentialNamed;
38use sp_core::{Decode, Encode, H256};
39use sp_domains::core_api::DomainCoreApi;
40use sp_domains::{BundleProducerElectionApi, DomainId, DomainsApi, OperatorId};
41use sp_domains_fraud_proof::FraudProofApi;
42use sp_messenger::messages::ChainId;
43use sp_messenger::{MessengerApi, RelayerApi};
44use sp_mmr_primitives::MmrApi;
45use sp_offchain::OffchainWorkerApi;
46use sp_runtime::traits::{Block as BlockT, NumberFor};
47use sp_session::SessionKeys;
48use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
49use std::fmt::{Debug, Display};
50use std::marker::PhantomData;
51use std::str::FromStr;
52use std::sync::Arc;
53use subspace_core_primitives::pot::PotOutput;
54use subspace_runtime_primitives::{HeaderFor, Nonce};
55use substrate_frame_rpc_system::AccountNonceApi;
56
57pub type DomainOperator<Block, CBlock, CClient, RuntimeApi> = Operator<
58 Block,
59 CBlock,
60 FullClient<Block, RuntimeApi>,
61 CClient,
62 FullPool<RuntimeApi>,
63 FullBackend<Block>,
64 RuntimeExecutor,
65>;
66
67pub struct NewFull<C, CodeExecutor, CBlock, CClient, RuntimeApi, AccountId>
69where
70 Block: BlockT,
71 CBlock: BlockT,
72 NumberFor<CBlock>: From<NumberFor<Block>>,
73 CBlock::Hash: From<Hash>,
74 CClient: HeaderBackend<CBlock>
75 + BlockBackend<CBlock>
76 + ProvideRuntimeApi<CBlock>
77 + Send
78 + Sync
79 + 'static,
80 CClient::Api:
81 DomainsApi<CBlock, Header> + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>,
82 RuntimeApi: ConstructRuntimeApi<Block, FullClient<Block, RuntimeApi>> + Send + Sync + 'static,
83 RuntimeApi::RuntimeApi: ApiExt<Block>
84 + Metadata<Block>
85 + AccountNonceApi<Block, AccountId, Nonce>
86 + BlockBuilder<Block>
87 + OffchainWorkerApi<Block>
88 + SessionKeys<Block>
89 + TaggedTransactionQueue<Block>
90 + TransactionPaymentRuntimeApi<Block, Balance>
91 + DomainCoreApi<Block>
92 + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>
93 + RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
94 AccountId: Encode + Decode,
95{
96 pub task_manager: TaskManager,
98 pub client: C,
100 pub backend: Arc<FullBackend<Block>>,
102 pub code_executor: Arc<CodeExecutor>,
104 pub network_service: Arc<dyn NetworkService>,
106 pub sync_service: Arc<sc_network_sync::SyncingService<Block>>,
108 pub rpc_handlers: sc_service::RpcHandlers,
110 pub network_starter: NetworkStarter,
112 pub operator: DomainOperator<Block, CBlock, CClient, RuntimeApi>,
114 pub transaction_pool: Arc<FullPool<RuntimeApi>>,
116
117 _phantom_data: PhantomData<AccountId>,
118}
119
120pub type FullPool<RuntimeApi> =
122 BasicPool<FullChainApi<FullClient<Block, RuntimeApi>, Block>, Block>;
123
124#[allow(clippy::type_complexity)]
126#[expect(clippy::result_large_err, reason = "Comes from Substrate")]
127fn new_partial<RuntimeApi, CBlock, CClient, BIMP>(
128 domain_id: DomainId,
129 config: &ServiceConfiguration,
130 consensus_client: Arc<CClient>,
131 domain_backend: Arc<FullBackend<Block>>,
132 block_import_provider: &BIMP,
133 confirmation_depth_k: NumberFor<CBlock>,
134 snap_sync: bool,
135) -> Result<
136 PartialComponents<
137 FullClient<Block, RuntimeApi>,
138 FullBackend<Block>,
139 (),
140 sc_consensus::DefaultImportQueue<Block>,
141 FullPool<RuntimeApi>,
142 (
143 Option<Telemetry>,
144 Option<TelemetryWorkerHandle>,
145 Arc<RuntimeExecutor>,
146 BoxBlockImport<Block>,
147 ),
148 >,
149 sc_service::Error,
150>
151where
152 CBlock: BlockT,
153 NumberFor<CBlock>: From<NumberFor<Block>> + Into<u32>,
154 CBlock::Hash: From<Hash> + Into<Hash>,
155 CClient: HeaderBackend<CBlock>
156 + BlockBackend<CBlock>
157 + ProvideRuntimeApi<CBlock>
158 + Send
159 + Sync
160 + 'static,
161 CClient::Api: DomainsApi<CBlock, Header>
162 + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>
163 + MmrApi<CBlock, H256, NumberFor<CBlock>>,
164 RuntimeApi: ConstructRuntimeApi<Block, FullClient<Block, RuntimeApi>> + Send + Sync + 'static,
165 RuntimeApi::RuntimeApi: TaggedTransactionQueue<Block>
166 + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>
167 + ApiExt<Block>,
168 BIMP: BlockImportProvider<Block, FullClient<Block, RuntimeApi>>,
169{
170 let telemetry = config
171 .telemetry_endpoints
172 .clone()
173 .filter(|x| !x.is_empty())
174 .map(|endpoints| -> Result<_, sc_telemetry::Error> {
175 let worker = TelemetryWorker::new(16)?;
176 let telemetry = worker.handle().new_telemetry(endpoints);
177 Ok((worker, telemetry))
178 })
179 .transpose()?;
180
181 let executor = sc_service::new_wasm_executor(&config.executor);
182
183 let genesis_block_builder = CustomGenesisBlockBuilder::<_, CBlock, _, _, _>::new(
184 domain_id,
185 consensus_client.clone(),
186 config.chain_spec.as_storage_builder(),
187 !snap_sync,
188 domain_backend.clone(),
189 executor.clone(),
190 )?;
191
192 let (client, backend, keystore_container, task_manager) =
193 sc_service::new_full_parts_with_genesis_builder(
194 config,
195 telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
196 executor.clone(),
197 domain_backend,
198 genesis_block_builder,
199 false,
200 )?;
201
202 let client = Arc::new(client);
203
204 let executor = Arc::new(executor);
205 client.execution_extensions().set_extensions_factory(
206 ExtensionsFactory::<_, CBlock, Block, _>::new(
207 consensus_client.clone(),
208 executor.clone(),
209 confirmation_depth_k.into(),
210 ),
211 );
212
213 let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());
214
215 let telemetry = telemetry.map(|(worker, telemetry)| {
216 task_manager
217 .spawn_handle()
218 .spawn("telemetry", None, worker.run());
219 telemetry
220 });
221
222 let transaction_pool = Arc::from(BasicPool::new_full(
223 Default::default(),
224 config.role.is_authority().into(),
225 config.prometheus_registry(),
226 task_manager.spawn_essential_handle(),
227 client.clone(),
228 ));
229
230 let import_queue = BasicQueue::new(
231 domain_client_consensus_relay_chain::Verifier::default(),
232 Box::new(block_import_provider.block_import(client.clone())),
233 None,
234 &task_manager.spawn_essential_handle(),
235 config.prometheus_registry(),
236 );
237
238 let params = PartialComponents {
239 backend,
240 client: client.clone(),
241 import_queue,
242 keystore_container,
243 task_manager,
244 transaction_pool,
245 select_chain: (),
246 other: (
247 telemetry,
248 telemetry_worker_handle,
249 executor,
250 Box::new(block_import_provider.block_import(client)) as Box<_>,
251 ),
252 };
253
254 Ok(params)
255}
256
257pub struct DomainParams<CBlock, CClient, IBNS, CIBNS, NSNS, ASS, Provider>
258where
259 CBlock: BlockT,
260{
261 pub domain_id: DomainId,
262 pub domain_config: ServiceConfiguration,
263 pub domain_created_at: NumberFor<CBlock>,
264 pub maybe_operator_id: Option<OperatorId>,
265 pub consensus_client: Arc<CClient>,
266 pub consensus_network: Arc<dyn NetworkPeers + Send + Sync>,
267 pub consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory<CBlock>,
268 pub domain_sync_oracle: Arc<dyn SyncOracle + Send + Sync>,
269 pub operator_streams: OperatorStreams<CBlock, IBNS, CIBNS, NSNS, ASS>,
270 pub gossip_message_sink: GossipMessageSink,
271 pub domain_message_receiver: TracingUnboundedReceiver<ChainMsg>,
272 pub provider: Provider,
273 pub skip_empty_bundle_production: bool,
274 pub skip_out_of_order_slot: bool,
275 pub confirmation_depth_k: NumberFor<CBlock>,
276 pub challenge_period: NumberFor<CBlock>,
277 pub consensus_chain_sync_params: Option<ConsensusChainSyncParams<CBlock, HeaderFor<Block>>>,
278 pub domain_backend: Arc<FullBackend<Block>>,
279}
280
281pub async fn new_full<CBlock, CClient, IBNS, CIBNS, NSNS, ASS, RuntimeApi, AccountId, Provider>(
283 domain_params: DomainParams<CBlock, CClient, IBNS, CIBNS, NSNS, ASS, Provider>,
284) -> sc_service::error::Result<
285 NewFull<
286 Arc<FullClient<Block, RuntimeApi>>,
287 RuntimeExecutor,
288 CBlock,
289 CClient,
290 RuntimeApi,
291 AccountId,
292 >,
293>
294where
295 CBlock: BlockT,
296 NumberFor<CBlock>: From<NumberFor<Block>> + Into<u32>,
297 CBlock::Hash: From<Hash> + Into<Hash>,
298 CClient: HeaderBackend<CBlock>
299 + HeaderMetadata<CBlock, Error = sp_blockchain::Error>
300 + BlockBackend<CBlock>
301 + ProofProvider<CBlock>
302 + ProvideRuntimeApi<CBlock>
303 + BlockchainEvents<CBlock>
304 + AuxStore
305 + Send
306 + Sync
307 + 'static,
308 CClient::Api: DomainsApi<CBlock, Header>
309 + RelayerApi<CBlock, NumberFor<CBlock>, NumberFor<CBlock>, CBlock::Hash>
310 + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>
311 + BundleProducerElectionApi<CBlock, subspace_runtime_primitives::Balance>
312 + FraudProofApi<CBlock, Header>
313 + MmrApi<CBlock, H256, NumberFor<CBlock>>,
314 IBNS: Stream<Item = (NumberFor<CBlock>, mpsc::Sender<()>)> + Send + Unpin + 'static,
315 CIBNS: Stream<Item = BlockImportNotification<CBlock>> + Send + Unpin + 'static,
316 NSNS: Stream<Item = (Slot, PotOutput)> + Send + 'static,
317 ASS: Stream<Item = mpsc::Sender<()>> + Send + 'static,
318 RuntimeApi: ConstructRuntimeApi<Block, FullClient<Block, RuntimeApi>> + Send + Sync + 'static,
319 RuntimeApi::RuntimeApi: ApiExt<Block>
320 + Metadata<Block>
321 + BlockBuilder<Block>
322 + OffchainWorkerApi<Block>
323 + SessionKeys<Block>
324 + DomainCoreApi<Block>
325 + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>
326 + TaggedTransactionQueue<Block>
327 + AccountNonceApi<Block, AccountId, Nonce>
328 + TransactionPaymentRuntimeApi<Block, Balance>
329 + RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
330 AccountId: DeserializeOwned
331 + Encode
332 + Decode
333 + Clone
334 + Debug
335 + Display
336 + FromStr
337 + Sync
338 + Send
339 + 'static,
340 Provider: RpcProvider<
341 Block,
342 FullClient<Block, RuntimeApi>,
343 FullPool<RuntimeApi>,
344 FullChainApi<FullClient<Block, RuntimeApi>, Block>,
345 TFullBackend<Block>,
346 AccountId,
347 CreateInherentDataProvider<CClient, CBlock>,
348 > + BlockImportProvider<Block, FullClient<Block, RuntimeApi>>
349 + 'static,
350{
351 let DomainParams {
352 domain_id,
353 maybe_operator_id,
354 mut domain_config,
355 domain_created_at,
356 consensus_client,
357 consensus_offchain_tx_pool_factory,
358 domain_sync_oracle,
359 consensus_network,
360 operator_streams,
361 gossip_message_sink,
362 domain_message_receiver,
363 provider,
364 skip_empty_bundle_production,
365 skip_out_of_order_slot,
366 confirmation_depth_k,
367 consensus_chain_sync_params,
368 challenge_period,
369 domain_backend,
370 } = domain_params;
371
372 let params = new_partial(
376 domain_id,
377 &domain_config,
378 consensus_client.clone(),
379 domain_backend,
380 &provider,
381 confirmation_depth_k,
382 consensus_chain_sync_params.is_some(),
383 )?;
384
385 let (mut telemetry, _telemetry_worker_handle, code_executor, block_import) = params.other;
386
387 let client = params.client.clone();
388 let backend = params.backend.clone();
389
390 let transaction_pool = params.transaction_pool.clone();
391 let mut task_manager = params.task_manager;
392 let net_config = sc_network::config::FullNetworkConfiguration::<_, _, NetworkWorker<_, _>>::new(
393 &domain_config.network,
394 domain_config
395 .prometheus_config
396 .as_ref()
397 .map(|cfg| cfg.registry.clone()),
398 );
399
400 let (
401 network_service,
402 system_rpc_tx,
403 tx_handler_controller,
404 network_starter,
405 sync_service,
406 network_service_handle,
407 block_downloader,
408 ) = build_network(BuildNetworkParams {
409 config: &domain_config,
410 net_config,
411 client: client.clone(),
412 transaction_pool: transaction_pool.clone(),
413 spawn_handle: task_manager.spawn_handle(),
414 import_queue: params.import_queue,
415 block_announce_validator_builder: None,
417 warp_sync_config: None,
418 block_relay: None,
419 metrics: NotificationMetrics::new(
420 domain_config
421 .prometheus_config
422 .as_ref()
423 .map(|cfg| &cfg.registry),
424 ),
425 })?;
426
427 let fork_id = domain_config.chain_spec.fork_id().map(String::from);
428
429 let is_authority = domain_config.role.is_authority();
430 domain_config.rpc.id_provider = provider.rpc_id();
431 let rpc_builder = {
432 let deps = crate::rpc::FullDeps {
433 client: client.clone(),
434 pool: transaction_pool.clone(),
435 graph: transaction_pool.pool().clone(),
436 network: network_service.clone(),
437 sync: sync_service.clone(),
438 is_authority,
439 prometheus_registry: domain_config.prometheus_registry().cloned(),
440 database_source: domain_config.database.clone(),
441 task_spawner: task_manager.spawn_handle(),
442 backend: backend.clone(),
443 create_inherent_data_provider: CreateInherentDataProvider::new(
448 consensus_client.clone(),
449 None,
451 domain_id,
452 ),
453 };
454
455 let spawn_essential = task_manager.spawn_essential_handle();
456 let rpc_deps = provider.deps(deps)?;
457 Box::new(move |subscription_task_executor| {
458 let spawn_essential = spawn_essential.clone();
459 provider
460 .rpc_builder(
461 rpc_deps.clone(),
462 subscription_task_executor,
463 spawn_essential,
464 )
465 .map_err(Into::into)
466 })
467 };
468
469 let rpc_handlers = sc_service::spawn_tasks(SpawnTasksParams {
470 rpc_builder,
471 client: client.clone(),
472 transaction_pool: transaction_pool.clone(),
473 task_manager: &mut task_manager,
474 config: domain_config,
475 keystore: params.keystore_container.keystore(),
476 backend: backend.clone(),
477 network: network_service.clone(),
478 system_rpc_tx,
479 tx_handler_controller,
480 sync_service: sync_service.clone(),
481 telemetry: telemetry.as_mut(),
482 })?;
483
484 let spawn_essential = task_manager.spawn_essential_handle();
485 let (bundle_sender, _bundle_receiver) = tracing_unbounded("domain_bundle_stream", 100);
486
487 let operator = Operator::new(
488 Box::new(spawn_essential.clone()),
489 OperatorParams {
490 domain_id,
491 domain_created_at,
492 consensus_client: consensus_client.clone(),
493 consensus_offchain_tx_pool_factory,
494 domain_sync_oracle: domain_sync_oracle.clone(),
495 client: client.clone(),
496 transaction_pool: transaction_pool.clone(),
497 backend: backend.clone(),
498 code_executor: code_executor.clone(),
499 maybe_operator_id,
500 keystore: params.keystore_container.keystore(),
501 bundle_sender: Arc::new(bundle_sender),
502 operator_streams,
503 consensus_confirmation_depth_k: confirmation_depth_k,
504 block_import: Arc::new(block_import),
505 skip_empty_bundle_production,
506 skip_out_of_order_slot,
507 sync_service: sync_service.clone(),
508 network_service: Arc::clone(&network_service),
509 block_downloader,
510 consensus_chain_sync_params,
511 domain_fork_id: fork_id,
512 domain_network_service_handle: network_service_handle,
513 challenge_period,
514 },
515 )
516 .await?;
517
518 if is_authority {
519 let relayer_worker = domain_client_message_relayer::worker::start_relaying_messages(
520 domain_id,
521 consensus_client.clone(),
522 client.clone(),
523 confirmation_depth_k,
524 domain_sync_oracle.clone(),
527 gossip_message_sink.clone(),
528 );
529
530 spawn_essential.spawn_essential_blocking("domain-relayer", None, Box::pin(relayer_worker));
531
532 let channel_update_worker =
533 domain_client_message_relayer::worker::gossip_channel_updates::<_, _, CBlock, _>(
534 ChainId::Domain(domain_id),
535 client.clone(),
536 domain_sync_oracle.clone(),
539 gossip_message_sink,
540 );
541
542 spawn_essential.spawn_essential_blocking(
543 "domain-channel-update-worker",
544 None,
545 Box::pin(channel_update_worker),
546 );
547 }
548
549 let domain_listener =
551 cross_domain_message_gossip::start_cross_chain_message_listener::<_, _, _, _, _, _, _>(
552 ChainId::Domain(domain_id),
553 consensus_client.clone(),
554 client.clone(),
555 params.transaction_pool.clone(),
556 consensus_network,
557 domain_message_receiver,
558 code_executor.clone(),
559 domain_sync_oracle,
560 );
561
562 spawn_essential.spawn_essential_blocking(
563 "domain-message-listener",
564 None,
565 Box::pin(domain_listener),
566 );
567
568 let new_full = NewFull {
569 task_manager,
570 client,
571 backend,
572 code_executor,
573 network_service,
574 sync_service,
575 rpc_handlers,
576 network_starter,
577 operator,
578 transaction_pool: params.transaction_pool,
579 _phantom_data: Default::default(),
580 };
581
582 Ok(new_full)
583}