1use crate::network::build_network;
2use crate::providers::{BlockImportProvider, RpcProvider};
3use crate::{FullBackend, FullClient};
4use cross_domain_message_gossip::ChainMsg;
5use domain_block_builder::CustomGenesisBlockBuilder;
6use domain_block_preprocessor::inherents::CreateInherentDataProvider;
7use domain_client_message_relayer::GossipMessageSink;
8use domain_client_operator::snap_sync::ConsensusChainSyncParams;
9use domain_client_operator::{Operator, OperatorParams, OperatorStreams};
10use domain_runtime_primitives::opaque::{Block, Header};
11use domain_runtime_primitives::{Balance, Hash};
12use futures::Stream;
13use futures::channel::mpsc;
14use pallet_transaction_payment_rpc::TransactionPaymentRuntimeApi;
15use sc_client_api::{
16 AuxStore, BlockBackend, BlockImportNotification, BlockchainEvents, ExecutorProvider,
17 ProofProvider,
18};
19use sc_consensus::{BasicQueue, BoxBlockImport};
20use sc_domains::{ExtensionsFactory, RuntimeExecutor};
21use sc_network::service::traits::NetworkService;
22use sc_network::{NetworkPeers, NetworkWorker, NotificationMetrics};
23use sc_service::{
24 BuildNetworkParams, Configuration as ServiceConfiguration, PartialComponents, SpawnTasksParams,
25 TFullBackend, TaskManager,
26};
27use sc_telemetry::{Telemetry, TelemetryWorker, TelemetryWorkerHandle};
28use sc_transaction_pool::{BasicPool, FullChainApi};
29use sc_transaction_pool_api::OffchainTransactionPoolFactory;
30use sc_utils::mpsc::{TracingUnboundedReceiver, tracing_unbounded};
31use serde::de::DeserializeOwned;
32use sp_api::{ApiExt, ConstructRuntimeApi, Metadata, ProvideRuntimeApi};
33use sp_block_builder::BlockBuilder;
34use sp_blockchain::{HeaderBackend, HeaderMetadata};
35use sp_consensus::SyncOracle;
36use sp_consensus_slots::Slot;
37use sp_core::traits::SpawnEssentialNamed;
38use sp_core::{Decode, Encode, H256};
39use sp_domains::core_api::DomainCoreApi;
40use sp_domains::{BundleProducerElectionApi, DomainId, DomainsApi, OperatorId};
41use sp_domains_fraud_proof::FraudProofApi;
42use sp_messenger::messages::ChainId;
43use sp_messenger::{MessengerApi, RelayerApi};
44use sp_mmr_primitives::MmrApi;
45use sp_offchain::OffchainWorkerApi;
46use sp_runtime::traits::{Block as BlockT, NumberFor};
47use sp_session::SessionKeys;
48use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
49use std::fmt::{Debug, Display};
50use std::marker::PhantomData;
51use std::str::FromStr;
52use std::sync::Arc;
53use subspace_core_primitives::pot::PotOutput;
54use subspace_runtime_primitives::{HeaderFor, Nonce};
55use substrate_frame_rpc_system::AccountNonceApi;
56
57pub type DomainOperator<Block, CBlock, CClient, RuntimeApi> = Operator<
58 Block,
59 CBlock,
60 FullClient<Block, RuntimeApi>,
61 CClient,
62 FullPool<RuntimeApi>,
63 FullBackend<Block>,
64 RuntimeExecutor,
65>;
66
67pub struct NewFull<C, CodeExecutor, CBlock, CClient, RuntimeApi, AccountId>
69where
70 Block: BlockT,
71 CBlock: BlockT,
72 NumberFor<CBlock>: From<NumberFor<Block>>,
73 CBlock::Hash: From<Hash>,
74 CClient: HeaderBackend<CBlock>
75 + BlockBackend<CBlock>
76 + ProvideRuntimeApi<CBlock>
77 + Send
78 + Sync
79 + 'static,
80 CClient::Api:
81 DomainsApi<CBlock, Header> + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>,
82 RuntimeApi: ConstructRuntimeApi<Block, FullClient<Block, RuntimeApi>> + Send + Sync + 'static,
83 RuntimeApi::RuntimeApi: ApiExt<Block>
84 + Metadata<Block>
85 + AccountNonceApi<Block, AccountId, Nonce>
86 + BlockBuilder<Block>
87 + OffchainWorkerApi<Block>
88 + SessionKeys<Block>
89 + TaggedTransactionQueue<Block>
90 + TransactionPaymentRuntimeApi<Block, Balance>
91 + DomainCoreApi<Block>
92 + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>
93 + RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
94 AccountId: Encode + Decode,
95{
96 pub task_manager: TaskManager,
98 pub client: C,
100 pub backend: Arc<FullBackend<Block>>,
102 pub code_executor: Arc<CodeExecutor>,
104 pub network_service: Arc<dyn NetworkService>,
106 pub sync_service: Arc<sc_network_sync::SyncingService<Block>>,
108 pub rpc_handlers: sc_service::RpcHandlers,
110 pub operator: DomainOperator<Block, CBlock, CClient, RuntimeApi>,
112 pub transaction_pool: Arc<FullPool<RuntimeApi>>,
114
115 _phantom_data: PhantomData<AccountId>,
116}
117
118pub type FullPool<RuntimeApi> =
120 BasicPool<FullChainApi<FullClient<Block, RuntimeApi>, Block>, Block>;
121
122#[allow(clippy::type_complexity)]
124#[expect(clippy::result_large_err, reason = "Comes from Substrate")]
125fn new_partial<RuntimeApi, CBlock, CClient, BIMP>(
126 domain_id: DomainId,
127 config: &ServiceConfiguration,
128 consensus_client: Arc<CClient>,
129 domain_backend: Arc<FullBackend<Block>>,
130 block_import_provider: &BIMP,
131 confirmation_depth_k: NumberFor<CBlock>,
132 snap_sync: bool,
133) -> Result<
134 PartialComponents<
135 FullClient<Block, RuntimeApi>,
136 FullBackend<Block>,
137 (),
138 sc_consensus::DefaultImportQueue<Block>,
139 FullPool<RuntimeApi>,
140 (
141 Option<Telemetry>,
142 Option<TelemetryWorkerHandle>,
143 Arc<RuntimeExecutor>,
144 BoxBlockImport<Block>,
145 ),
146 >,
147 sc_service::Error,
148>
149where
150 CBlock: BlockT,
151 NumberFor<CBlock>: From<NumberFor<Block>> + Into<u32>,
152 CBlock::Hash: From<Hash> + Into<Hash>,
153 CClient: HeaderBackend<CBlock>
154 + BlockBackend<CBlock>
155 + ProvideRuntimeApi<CBlock>
156 + Send
157 + Sync
158 + 'static,
159 CClient::Api: DomainsApi<CBlock, Header>
160 + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>
161 + MmrApi<CBlock, H256, NumberFor<CBlock>>,
162 RuntimeApi: ConstructRuntimeApi<Block, FullClient<Block, RuntimeApi>> + Send + Sync + 'static,
163 RuntimeApi::RuntimeApi: TaggedTransactionQueue<Block>
164 + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>
165 + ApiExt<Block>,
166 BIMP: BlockImportProvider<Block, FullClient<Block, RuntimeApi>>,
167{
168 let telemetry = config
169 .telemetry_endpoints
170 .clone()
171 .filter(|x| !x.is_empty())
172 .map(|endpoints| -> Result<_, sc_telemetry::Error> {
173 let worker = TelemetryWorker::new(16)?;
174 let telemetry = worker.handle().new_telemetry(endpoints);
175 Ok((worker, telemetry))
176 })
177 .transpose()?;
178
179 let executor = sc_service::new_wasm_executor(&config.executor);
180
181 let genesis_block_builder = CustomGenesisBlockBuilder::<_, CBlock, _, _, _>::new(
182 domain_id,
183 consensus_client.clone(),
184 config.chain_spec.as_storage_builder(),
185 !snap_sync,
186 domain_backend.clone(),
187 executor.clone(),
188 )?;
189
190 let (client, backend, keystore_container, task_manager) =
191 sc_service::new_full_parts_with_genesis_builder(
192 config,
193 telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
194 executor.clone(),
195 domain_backend,
196 genesis_block_builder,
197 false,
198 )?;
199
200 let client = Arc::new(client);
201
202 let executor = Arc::new(executor);
203 client.execution_extensions().set_extensions_factory(
204 ExtensionsFactory::<_, CBlock, Block, _>::new(
205 consensus_client.clone(),
206 executor.clone(),
207 confirmation_depth_k.into(),
208 ),
209 );
210
211 let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());
212
213 let telemetry = telemetry.map(|(worker, telemetry)| {
214 task_manager
215 .spawn_handle()
216 .spawn("telemetry", None, worker.run());
217 telemetry
218 });
219
220 let transaction_pool = Arc::from(BasicPool::new_full(
221 Default::default(),
222 config.role.is_authority().into(),
223 config.prometheus_registry(),
224 task_manager.spawn_essential_handle(),
225 client.clone(),
226 ));
227
228 let import_queue = BasicQueue::new(
229 domain_client_consensus_relay_chain::Verifier::default(),
230 Box::new(block_import_provider.block_import(client.clone())),
231 None,
232 &task_manager.spawn_essential_handle(),
233 config.prometheus_registry(),
234 );
235
236 let params = PartialComponents {
237 backend,
238 client: client.clone(),
239 import_queue,
240 keystore_container,
241 task_manager,
242 transaction_pool,
243 select_chain: (),
244 other: (
245 telemetry,
246 telemetry_worker_handle,
247 executor,
248 Box::new(block_import_provider.block_import(client)) as Box<_>,
249 ),
250 };
251
252 Ok(params)
253}
254
255pub struct DomainParams<CBlock, CClient, IBNS, CIBNS, NSNS, ASS, Provider>
256where
257 CBlock: BlockT,
258{
259 pub domain_id: DomainId,
260 pub domain_config: ServiceConfiguration,
261 pub domain_created_at: NumberFor<CBlock>,
262 pub maybe_operator_id: Option<OperatorId>,
263 pub consensus_client: Arc<CClient>,
264 pub consensus_network: Arc<dyn NetworkPeers + Send + Sync>,
265 pub consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory<CBlock>,
266 pub domain_sync_oracle: Arc<dyn SyncOracle + Send + Sync>,
267 pub operator_streams: OperatorStreams<CBlock, IBNS, CIBNS, NSNS, ASS>,
268 pub gossip_message_sink: GossipMessageSink,
269 pub domain_message_receiver: TracingUnboundedReceiver<ChainMsg>,
270 pub provider: Provider,
271 pub skip_empty_bundle_production: bool,
272 pub skip_out_of_order_slot: bool,
273 pub confirmation_depth_k: NumberFor<CBlock>,
274 pub challenge_period: NumberFor<CBlock>,
275 pub consensus_chain_sync_params: Option<ConsensusChainSyncParams<CBlock, HeaderFor<Block>>>,
276 pub domain_backend: Arc<FullBackend<Block>>,
277}
278
279pub async fn new_full<CBlock, CClient, IBNS, CIBNS, NSNS, ASS, RuntimeApi, AccountId, Provider>(
281 domain_params: DomainParams<CBlock, CClient, IBNS, CIBNS, NSNS, ASS, Provider>,
282) -> sc_service::error::Result<
283 NewFull<
284 Arc<FullClient<Block, RuntimeApi>>,
285 RuntimeExecutor,
286 CBlock,
287 CClient,
288 RuntimeApi,
289 AccountId,
290 >,
291>
292where
293 CBlock: BlockT,
294 NumberFor<CBlock>: From<NumberFor<Block>> + Into<u32>,
295 CBlock::Hash: From<Hash> + Into<Hash>,
296 CClient: HeaderBackend<CBlock>
297 + HeaderMetadata<CBlock, Error = sp_blockchain::Error>
298 + BlockBackend<CBlock>
299 + ProofProvider<CBlock>
300 + ProvideRuntimeApi<CBlock>
301 + BlockchainEvents<CBlock>
302 + AuxStore
303 + Send
304 + Sync
305 + 'static,
306 CClient::Api: DomainsApi<CBlock, Header>
307 + RelayerApi<CBlock, NumberFor<CBlock>, NumberFor<CBlock>, CBlock::Hash>
308 + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>
309 + BundleProducerElectionApi<CBlock, subspace_runtime_primitives::Balance>
310 + FraudProofApi<CBlock, Header>
311 + MmrApi<CBlock, H256, NumberFor<CBlock>>,
312 IBNS: Stream<Item = (NumberFor<CBlock>, mpsc::Sender<()>)> + Send + Unpin + 'static,
313 CIBNS: Stream<Item = BlockImportNotification<CBlock>> + Send + Unpin + 'static,
314 NSNS: Stream<Item = (Slot, PotOutput)> + Send + 'static,
315 ASS: Stream<Item = mpsc::Sender<()>> + Send + 'static,
316 RuntimeApi: ConstructRuntimeApi<Block, FullClient<Block, RuntimeApi>> + Send + Sync + 'static,
317 RuntimeApi::RuntimeApi: ApiExt<Block>
318 + Metadata<Block>
319 + BlockBuilder<Block>
320 + OffchainWorkerApi<Block>
321 + SessionKeys<Block>
322 + DomainCoreApi<Block>
323 + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>
324 + TaggedTransactionQueue<Block>
325 + AccountNonceApi<Block, AccountId, Nonce>
326 + TransactionPaymentRuntimeApi<Block, Balance>
327 + RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
328 AccountId: DeserializeOwned
329 + Encode
330 + Decode
331 + Clone
332 + Debug
333 + Display
334 + FromStr
335 + Sync
336 + Send
337 + 'static,
338 Provider: RpcProvider<
339 Block,
340 FullClient<Block, RuntimeApi>,
341 FullPool<RuntimeApi>,
342 TFullBackend<Block>,
343 AccountId,
344 CreateInherentDataProvider<CClient, CBlock>,
345 > + BlockImportProvider<Block, FullClient<Block, RuntimeApi>>
346 + 'static,
347{
348 let DomainParams {
349 domain_id,
350 maybe_operator_id,
351 mut domain_config,
352 domain_created_at,
353 consensus_client,
354 consensus_offchain_tx_pool_factory,
355 domain_sync_oracle,
356 consensus_network,
357 operator_streams,
358 gossip_message_sink,
359 domain_message_receiver,
360 provider,
361 skip_empty_bundle_production,
362 skip_out_of_order_slot,
363 confirmation_depth_k,
364 consensus_chain_sync_params,
365 challenge_period,
366 domain_backend,
367 } = domain_params;
368
369 let params = new_partial(
373 domain_id,
374 &domain_config,
375 consensus_client.clone(),
376 domain_backend,
377 &provider,
378 confirmation_depth_k,
379 consensus_chain_sync_params.is_some(),
380 )?;
381
382 let (mut telemetry, _telemetry_worker_handle, code_executor, block_import) = params.other;
383
384 let client = params.client.clone();
385 let backend = params.backend.clone();
386
387 let transaction_pool = params.transaction_pool.clone();
388 let mut task_manager = params.task_manager;
389 let net_config = sc_network::config::FullNetworkConfiguration::<_, _, NetworkWorker<_, _>>::new(
390 &domain_config.network,
391 domain_config
392 .prometheus_config
393 .as_ref()
394 .map(|cfg| cfg.registry.clone()),
395 );
396
397 let (
398 network_service,
399 system_rpc_tx,
400 tx_handler_controller,
401 sync_service,
402 network_service_handle,
403 block_downloader,
404 ) = build_network(BuildNetworkParams {
405 config: &domain_config,
406 net_config,
407 client: client.clone(),
408 transaction_pool: transaction_pool.clone(),
409 spawn_handle: task_manager.spawn_handle(),
410 import_queue: params.import_queue,
411 block_announce_validator_builder: None,
413 warp_sync_config: None,
414 block_relay: None,
415 metrics: NotificationMetrics::new(
416 domain_config
417 .prometheus_config
418 .as_ref()
419 .map(|cfg| &cfg.registry),
420 ),
421 })?;
422
423 let fork_id = domain_config.chain_spec.fork_id().map(String::from);
424
425 let is_authority = domain_config.role.is_authority();
426 domain_config.rpc.id_provider = provider.rpc_id();
427 let rpc_builder = {
428 let deps = crate::rpc::FullDeps {
429 client: client.clone(),
430 pool: transaction_pool.clone(),
431 network: network_service.clone(),
432 sync: sync_service.clone(),
433 is_authority,
434 prometheus_registry: domain_config.prometheus_registry().cloned(),
435 database_source: domain_config.database.clone(),
436 task_spawner: task_manager.spawn_handle(),
437 backend: backend.clone(),
438 create_inherent_data_provider: CreateInherentDataProvider::new(
443 consensus_client.clone(),
444 None,
446 domain_id,
447 ),
448 };
449
450 let spawn_essential = task_manager.spawn_essential_handle();
451 let rpc_deps = provider.deps(deps)?;
452 Box::new(move |subscription_task_executor| {
453 let spawn_essential = spawn_essential.clone();
454 provider
455 .rpc_builder(
456 rpc_deps.clone(),
457 subscription_task_executor,
458 spawn_essential,
459 )
460 .map_err(Into::into)
461 })
462 };
463
464 let rpc_handlers = sc_service::spawn_tasks(SpawnTasksParams {
465 rpc_builder,
466 client: client.clone(),
467 transaction_pool: transaction_pool.clone(),
468 task_manager: &mut task_manager,
469 config: domain_config,
470 keystore: params.keystore_container.keystore(),
471 backend: backend.clone(),
472 network: network_service.clone(),
473 system_rpc_tx,
474 tx_handler_controller,
475 sync_service: sync_service.clone(),
476 telemetry: telemetry.as_mut(),
477 })?;
478
479 let spawn_essential = task_manager.spawn_essential_handle();
480 let (bundle_sender, _bundle_receiver) = tracing_unbounded("domain_bundle_stream", 100);
481
482 let operator = Operator::new(
483 Box::new(spawn_essential.clone()),
484 OperatorParams {
485 domain_id,
486 domain_created_at,
487 consensus_client: consensus_client.clone(),
488 consensus_offchain_tx_pool_factory,
489 domain_sync_oracle: domain_sync_oracle.clone(),
490 client: client.clone(),
491 transaction_pool: transaction_pool.clone(),
492 backend: backend.clone(),
493 code_executor: code_executor.clone(),
494 maybe_operator_id,
495 keystore: params.keystore_container.keystore(),
496 bundle_sender: Arc::new(bundle_sender),
497 operator_streams,
498 consensus_confirmation_depth_k: confirmation_depth_k,
499 block_import: Arc::new(block_import),
500 skip_empty_bundle_production,
501 skip_out_of_order_slot,
502 sync_service: sync_service.clone(),
503 network_service: Arc::clone(&network_service),
504 block_downloader,
505 consensus_chain_sync_params,
506 domain_fork_id: fork_id,
507 domain_network_service_handle: network_service_handle,
508 challenge_period,
509 },
510 )
511 .await?;
512
513 if is_authority {
514 let relayer_worker = domain_client_message_relayer::worker::start_relaying_messages(
515 domain_id,
516 consensus_client.clone(),
517 client.clone(),
518 confirmation_depth_k,
519 domain_sync_oracle.clone(),
522 gossip_message_sink.clone(),
523 );
524
525 spawn_essential.spawn_essential_blocking("domain-relayer", None, Box::pin(relayer_worker));
526
527 let channel_update_worker =
528 domain_client_message_relayer::worker::gossip_channel_updates::<_, _, CBlock, _>(
529 ChainId::Domain(domain_id),
530 client.clone(),
531 domain_sync_oracle.clone(),
534 gossip_message_sink,
535 );
536
537 spawn_essential.spawn_essential_blocking(
538 "domain-channel-update-worker",
539 None,
540 Box::pin(channel_update_worker),
541 );
542 }
543
544 let domain_listener =
546 cross_domain_message_gossip::start_cross_chain_message_listener::<_, _, _, _, _, _, _>(
547 ChainId::Domain(domain_id),
548 consensus_client.clone(),
549 client.clone(),
550 params.transaction_pool.clone(),
551 consensus_network,
552 domain_message_receiver,
553 code_executor.clone(),
554 domain_sync_oracle,
555 );
556
557 spawn_essential.spawn_essential_blocking(
558 "domain-message-listener",
559 None,
560 Box::pin(domain_listener),
561 );
562
563 let new_full = NewFull {
564 task_manager,
565 client,
566 backend,
567 code_executor,
568 network_service,
569 sync_service,
570 rpc_handlers,
571 operator,
572 transaction_pool: params.transaction_pool,
573 _phantom_data: Default::default(),
574 };
575
576 Ok(new_full)
577}