1#![feature(
3 duration_constructors_lite,
4 impl_trait_in_assoc_type,
5 int_roundings,
6 type_alias_impl_trait,
7 type_changing_struct_update
8)]
9
10pub mod config;
11pub mod dsn;
12mod metrics;
13pub(crate) mod mmr;
14pub mod rpc;
15pub mod sync_from_dsn;
16mod task_spawner;
17mod utils;
18
19use crate::config::{ChainSyncMode, SubspaceConfiguration, SubspaceNetworking};
20use crate::dsn::{DsnConfigurationError, create_dsn_instance};
21use crate::metrics::NodeMetrics;
22use crate::mmr::request_handler::MmrRequestHandler;
23use crate::sync_from_dsn::DsnPieceGetter;
24use crate::sync_from_dsn::piece_validator::SegmentCommitmentPieceValidator;
25use crate::sync_from_dsn::snap_sync::snap_sync;
26use async_lock::Semaphore;
27use core::sync::atomic::{AtomicU32, Ordering};
28use cross_domain_message_gossip::xdm_gossip_peers_set_config;
29use domain_runtime_primitives::opaque::{Block as DomainBlock, Header as DomainHeader};
30use frame_system_rpc_runtime_api::AccountNonceApi;
31use futures::FutureExt;
32use futures::channel::oneshot;
33use jsonrpsee::RpcModule;
34use pallet_transaction_payment_rpc_runtime_api::TransactionPaymentApi;
35use parity_scale_codec::Decode;
36use parking_lot::Mutex;
37use prometheus_client::registry::Registry;
38use sc_basic_authorship::ProposerFactory;
39use sc_chain_spec::{ChainSpec, GenesisBlockBuilder};
40use sc_client_api::execution_extensions::ExtensionsFactory;
41use sc_client_api::{
42 AuxStore, Backend, BlockBackend, BlockchainEvents, ExecutorProvider, HeaderBackend,
43};
44use sc_consensus::{
45 BasicQueue, BlockCheckParams, BlockImport, BlockImportParams, BoxBlockImport,
46 DefaultImportQueue, ImportQueue, ImportResult,
47};
48use sc_consensus_slots::SlotProportion;
49use sc_consensus_subspace::SubspaceLink;
50use sc_consensus_subspace::archiver::{
51 ArchivedSegmentNotification, ObjectMappingNotification, SegmentHeadersStore,
52 create_subspace_archiver,
53};
54use sc_consensus_subspace::block_import::{BlockImportingNotification, SubspaceBlockImport};
55use sc_consensus_subspace::notification::SubspaceNotificationStream;
56use sc_consensus_subspace::slot_worker::{
57 NewSlotNotification, RewardSigningNotification, SubspaceSlotWorker, SubspaceSlotWorkerOptions,
58 SubspaceSyncOracle,
59};
60use sc_consensus_subspace::verifier::{SubspaceVerifier, SubspaceVerifierOptions};
61use sc_domains::ExtensionsFactory as DomainsExtensionFactory;
62use sc_domains::domain_block_er::execution_receipt_protocol::DomainBlockERRequestHandler;
63use sc_network::service::traits::NetworkService;
64use sc_network::{NetworkWorker, NotificationMetrics, NotificationService, Roles};
65use sc_network_sync::block_relay_protocol::BlockRelayParams;
66use sc_network_sync::engine::SyncingEngine;
67use sc_network_sync::service::network::NetworkServiceProvider;
68use sc_proof_of_time::source::gossip::pot_gossip_peers_set_config;
69use sc_proof_of_time::source::{PotSlotInfo, PotSourceWorker};
70use sc_proof_of_time::verifier::PotVerifier;
71use sc_service::error::Error as ServiceError;
72use sc_service::{
73 BuildNetworkAdvancedParams, Configuration, SpawnTasksParams, TaskManager,
74 build_network_advanced, build_polkadot_syncing_strategy,
75};
76use sc_subspace_block_relay::{
77 BlockRelayConfigurationError, NetworkWrapper, build_consensus_relay,
78};
79use sc_telemetry::{Telemetry, TelemetryWorker};
80use sc_transaction_pool::TransactionPoolHandle;
81use sc_transaction_pool_api::OffchainTransactionPoolFactory;
82use sp_api::{ApiExt, ConstructRuntimeApi, Metadata, ProvideRuntimeApi};
83use sp_block_builder::BlockBuilder;
84use sp_blockchain::HeaderMetadata;
85use sp_consensus::block_validation::DefaultBlockAnnounceValidator;
86use sp_consensus_slots::Slot;
87use sp_consensus_subspace::digests::extract_pre_digest;
88use sp_consensus_subspace::{
89 KzgExtension, PosExtension, PotExtension, PotNextSlotInput, SubspaceApi,
90};
91use sp_core::H256;
92use sp_core::offchain::OffchainDbExt;
93use sp_core::offchain::storage::OffchainDb;
94use sp_core::traits::SpawnEssentialNamed;
95use sp_domains::storage::StorageKey;
96use sp_domains::{BundleProducerElectionApi, DomainsApi};
97use sp_domains_fraud_proof::{FraudProofApi, FraudProofExtension, FraudProofHostFunctionsImpl};
98use sp_externalities::Extensions;
99use sp_messenger::MessengerApi;
100use sp_messenger_host_functions::{MessengerExtension, MessengerHostFunctionsImpl};
101use sp_mmr_primitives::MmrApi;
102use sp_objects::ObjectsApi;
103use sp_offchain::OffchainWorkerApi;
104use sp_runtime::traits::{Block as BlockT, BlockIdTo, Header, NumberFor, Zero};
105use sp_session::SessionKeys;
106use sp_subspace_mmr::host_functions::{SubspaceMmrExtension, SubspaceMmrHostFunctionsImpl};
107use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
108use static_assertions::const_assert;
109use std::marker::PhantomData;
110use std::num::NonZeroUsize;
111use std::sync::Arc;
112use std::time::Duration;
113use subspace_core_primitives::pieces::Record;
114use subspace_core_primitives::pot::PotSeed;
115use subspace_core_primitives::{BlockNumber, PublicKey, REWARD_SIGNING_CONTEXT};
116use subspace_erasure_coding::ErasureCoding;
117use subspace_kzg::Kzg;
118use subspace_networking::libp2p::multiaddr::Protocol;
119use subspace_networking::utils::piece_provider::PieceProvider;
120use subspace_proof_of_space::Table;
121use subspace_runtime_primitives::opaque::Block;
122use subspace_runtime_primitives::{AccountId, Balance, BlockHashFor, Hash, Nonce};
123use tokio::sync::broadcast;
124use tracing::{Instrument, debug, error, info};
125pub use utils::wait_for_block_import;
126
127const_assert!(std::mem::size_of::<usize>() >= std::mem::size_of::<u64>());
130
131const POT_VERIFIER_CACHE_SIZE: u32 = 30_000;
134const SYNC_TARGET_UPDATE_INTERVAL: Duration = Duration::from_secs(1);
135const PIECE_PROVIDER_MULTIPLIER: usize = 10;
137
138#[derive(thiserror::Error, Debug)]
140pub enum Error {
141 #[error(transparent)]
143 Io(#[from] std::io::Error),
144
145 #[error(transparent)]
147 AddrFormatInvalid(#[from] std::net::AddrParseError),
148
149 #[error(transparent)]
151 Sub(#[from] sc_service::Error),
152
153 #[error(transparent)]
155 Consensus(#[from] sp_consensus::Error),
156
157 #[error(transparent)]
159 Telemetry(#[from] sc_telemetry::Error),
160
161 #[error(transparent)]
163 SubspaceDsn(#[from] DsnConfigurationError),
164
165 #[error(transparent)]
167 BlockRelay(#[from] BlockRelayConfigurationError),
168
169 #[error(transparent)]
171 Other(Box<dyn std::error::Error + Send + Sync>),
172}
173
174#[derive(Clone)]
176struct BlockImportWrapper<BI>(BI);
177
178#[async_trait::async_trait]
179impl<Block, BI> BlockImport<Block> for BlockImportWrapper<BI>
180where
181 Block: BlockT,
182 BI: BlockImport<Block, Error = sc_consensus_subspace::block_import::Error<Block::Header>>
183 + Send
184 + Sync,
185{
186 type Error = sp_consensus::Error;
187
188 async fn check_block(
189 &self,
190 block: BlockCheckParams<Block>,
191 ) -> Result<ImportResult, Self::Error> {
192 self.0
193 .check_block(block)
194 .await
195 .map_err(|error| sp_consensus::Error::Other(error.into()))
196 }
197
198 async fn import_block(
199 &self,
200 block: BlockImportParams<Block>,
201 ) -> Result<ImportResult, Self::Error> {
202 self.0
203 .import_block(block)
204 .await
205 .map_err(|error| sp_consensus::Error::Other(error.into()))
206 }
207}
208
209#[cfg(not(feature = "runtime-benchmarks"))]
211pub type HostFunctions = (
212 sp_io::SubstrateHostFunctions,
213 sp_consensus_subspace::consensus::HostFunctions,
214 sp_domains_fraud_proof::HostFunctions,
215 sp_subspace_mmr::HostFunctions,
216 sp_messenger_host_functions::HostFunctions,
217);
218
219#[cfg(feature = "runtime-benchmarks")]
221pub type HostFunctions = (
222 sp_io::SubstrateHostFunctions,
223 frame_benchmarking::benchmarking::HostFunctions,
224 sp_consensus_subspace::consensus::HostFunctions,
225 sp_domains_fraud_proof::HostFunctions,
226 sp_subspace_mmr::HostFunctions,
227 sp_messenger_host_functions::HostFunctions,
228);
229
230pub type RuntimeExecutor = sc_executor::WasmExecutor<HostFunctions>;
232
233pub type FullClient<RuntimeApi> = sc_service::TFullClient<Block, RuntimeApi, RuntimeExecutor>;
235
236pub type FullBackend = sc_service::TFullBackend<Block>;
237pub type FullSelectChain = sc_consensus::LongestChain<FullBackend, Block>;
238
239struct SubspaceExtensionsFactory<PosTable, Client, DomainBlock> {
240 kzg: Kzg,
241 client: Arc<Client>,
242 backend: Arc<FullBackend>,
243 pot_verifier: PotVerifier,
244 domains_executor: Arc<sc_domains::RuntimeExecutor>,
245 confirmation_depth_k: BlockNumber,
246 _pos_table: PhantomData<(PosTable, DomainBlock)>,
247}
248
249impl<PosTable, Block, Client, DomainBlock> ExtensionsFactory<Block>
250 for SubspaceExtensionsFactory<PosTable, Client, DomainBlock>
251where
252 PosTable: Table,
253 Block: BlockT,
254 Block::Hash: From<H256> + Into<H256>,
255 DomainBlock: BlockT,
256 DomainBlock::Hash: Into<H256> + From<H256>,
257 Client: BlockBackend<Block>
258 + HeaderBackend<Block>
259 + ProvideRuntimeApi<Block>
260 + Send
261 + Sync
262 + 'static,
263 Client::Api: SubspaceApi<Block, PublicKey>
264 + DomainsApi<Block, DomainBlock::Header>
265 + BundleProducerElectionApi<Block, Balance>
266 + MmrApi<Block, H256, NumberFor<Block>>
267 + MessengerApi<Block, NumberFor<Block>, Block::Hash>,
268{
269 fn extensions_for(
270 &self,
271 _block_hash: Block::Hash,
272 _block_number: NumberFor<Block>,
273 ) -> Extensions {
274 let confirmation_depth_k = self.confirmation_depth_k;
275 let mut exts = Extensions::new();
276 exts.register(KzgExtension::new(self.kzg.clone()));
277 exts.register(PosExtension::new::<PosTable>());
278 exts.register(PotExtension::new({
279 let client = Arc::clone(&self.client);
280 let pot_verifier = self.pot_verifier.clone();
281
282 Box::new(
283 move |parent_hash, slot, proof_of_time, quick_verification| {
284 let parent_hash = {
285 let mut converted_parent_hash = Block::Hash::default();
286 converted_parent_hash.as_mut().copy_from_slice(&parent_hash);
287 converted_parent_hash
288 };
289
290 let parent_header = match client.header(parent_hash) {
291 Ok(Some(parent_header)) => parent_header,
292 Ok(None) => {
293 if quick_verification {
294 error!(
295 %parent_hash,
296 "Header not found during proof of time verification"
297 );
298
299 return false;
300 } else {
301 debug!(
302 %parent_hash,
303 "Header not found during proof of time verification"
304 );
305
306 return true;
309 }
310 }
311 Err(error) => {
312 error!(
313 %error,
314 %parent_hash,
315 "Failed to retrieve header during proof of time verification"
316 );
317
318 return false;
319 }
320 };
321
322 let parent_pre_digest = match extract_pre_digest(&parent_header) {
323 Ok(parent_pre_digest) => parent_pre_digest,
324 Err(error) => {
325 error!(
326 %error,
327 %parent_hash,
328 parent_number = %parent_header.number(),
329 "Failed to extract pre-digest from parent header during proof of \
330 time verification, this must never happen"
331 );
332
333 return false;
334 }
335 };
336
337 let parent_slot = parent_pre_digest.slot();
338 if slot <= *parent_slot {
339 return false;
340 }
341
342 let pot_parameters = match client.runtime_api().pot_parameters(parent_hash) {
343 Ok(pot_parameters) => pot_parameters,
344 Err(error) => {
345 debug!(
346 %error,
347 %parent_hash,
348 parent_number = %parent_header.number(),
349 "Failed to retrieve proof of time parameters during proof of time \
350 verification"
351 );
352
353 return false;
354 }
355 };
356
357 let pot_input = if parent_header.number().is_zero() {
358 PotNextSlotInput {
359 slot: parent_slot + Slot::from(1),
360 slot_iterations: pot_parameters.slot_iterations(),
361 seed: pot_verifier.genesis_seed(),
362 }
363 } else {
364 let pot_info = parent_pre_digest.pot_info();
365
366 PotNextSlotInput::derive(
367 pot_parameters.slot_iterations(),
368 parent_slot,
369 pot_info.proof_of_time(),
370 &pot_parameters.next_parameters_change(),
371 )
372 };
373
374 if quick_verification {
378 pot_verifier.try_is_output_valid(
379 pot_input,
380 Slot::from(slot) - parent_slot,
381 proof_of_time,
382 pot_parameters.next_parameters_change(),
383 )
384 } else {
385 pot_verifier.is_output_valid(
386 pot_input,
387 Slot::from(slot) - parent_slot,
388 proof_of_time,
389 pot_parameters.next_parameters_change(),
390 )
391 }
392 },
393 )
394 }));
395
396 exts.register(FraudProofExtension::new(Arc::new(
397 FraudProofHostFunctionsImpl::<_, _, DomainBlock, _, _>::new(
398 self.client.clone(),
399 self.domains_executor.clone(),
400 move |client, executor| {
401 let extension_factory =
402 DomainsExtensionFactory::<_, Block, DomainBlock, _>::new(
403 client,
404 executor,
405 confirmation_depth_k,
406 );
407 Box::new(extension_factory) as Box<dyn ExtensionsFactory<DomainBlock>>
408 },
409 ),
410 )));
411
412 exts.register(SubspaceMmrExtension::new(Arc::new(
413 SubspaceMmrHostFunctionsImpl::<Block, _>::new(
414 self.client.clone(),
415 confirmation_depth_k,
416 ),
417 )));
418
419 exts.register(MessengerExtension::new(Arc::new(
420 MessengerHostFunctionsImpl::<Block, _, DomainBlock, _>::new(
421 self.client.clone(),
422 self.domains_executor.clone(),
423 ),
424 )));
425
426 if let Some(offchain_storage) = self.backend.offchain_storage() {
429 let offchain_db = OffchainDb::new(offchain_storage);
430 exts.register(OffchainDbExt::new(offchain_db));
431 }
432
433 exts
434 }
435}
436
437pub struct OtherPartialComponents<RuntimeApi>
439where
440 RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
441{
442 pub block_import: BoxBlockImport<Block>,
444 pub subspace_link: SubspaceLink<Block>,
446 pub segment_headers_store: SegmentHeadersStore<FullClient<RuntimeApi>>,
448 pub pot_verifier: PotVerifier,
450 pub sync_target_block_number: Arc<AtomicU32>,
452 pub telemetry: Option<Telemetry>,
454}
455
456type PartialComponents<RuntimeApi> = sc_service::PartialComponents<
457 FullClient<RuntimeApi>,
458 FullBackend,
459 FullSelectChain,
460 DefaultImportQueue<Block>,
461 TransactionPoolHandle<Block, FullClient<RuntimeApi>>,
462 OtherPartialComponents<RuntimeApi>,
463>;
464
465#[expect(clippy::result_large_err, reason = "Comes from Substrate")]
467pub fn new_partial<PosTable, RuntimeApi>(
468 config: &Configuration,
471 snap_sync: bool,
473 pot_external_entropy: &[u8],
474) -> Result<PartialComponents<RuntimeApi>, ServiceError>
475where
476 PosTable: Table,
477 RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
478 RuntimeApi::RuntimeApi: ApiExt<Block>
479 + Metadata<Block>
480 + BlockBuilder<Block>
481 + OffchainWorkerApi<Block>
482 + SessionKeys<Block>
483 + TaggedTransactionQueue<Block>
484 + SubspaceApi<Block, PublicKey>
485 + DomainsApi<Block, DomainHeader>
486 + FraudProofApi<Block, DomainHeader>
487 + BundleProducerElectionApi<Block, Balance>
488 + ObjectsApi<Block>
489 + MmrApi<Block, H256, NumberFor<Block>>
490 + MessengerApi<Block, NumberFor<Block>, BlockHashFor<Block>>,
491{
492 let telemetry = config
493 .telemetry_endpoints
494 .clone()
495 .filter(|x| !x.is_empty())
496 .map(|endpoints| -> Result<_, sc_telemetry::Error> {
497 let worker = TelemetryWorker::new(16)?;
498 let telemetry = worker.handle().new_telemetry(endpoints);
499 Ok((worker, telemetry))
500 })
501 .transpose()?;
502
503 let executor = sc_service::new_wasm_executor(&config.executor);
504 let domains_executor = sc_service::new_wasm_executor(&config.executor);
505
506 let confirmation_depth_k = extract_confirmation_depth(config.chain_spec.as_ref()).ok_or(
507 ServiceError::Other("Failed to extract confirmation depth from chain spec".to_string()),
508 )?;
509
510 let backend = Arc::new(sc_client_db::Backend::new(
511 config.db_config(),
512 confirmation_depth_k.into(),
513 )?);
514
515 let genesis_block_builder = GenesisBlockBuilder::new(
516 config.chain_spec.as_storage_builder(),
517 !snap_sync,
518 backend.clone(),
519 executor.clone(),
520 )?;
521
522 let (client, backend, keystore_container, task_manager) =
523 sc_service::new_full_parts_with_genesis_builder::<Block, RuntimeApi, _, _>(
524 config,
525 telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
526 executor.clone(),
527 backend,
528 genesis_block_builder,
529 false,
530 )?;
531
532 let (kzg, maybe_erasure_coding) = tokio::task::block_in_place(|| {
534 rayon::join(Kzg::new, || {
535 ErasureCoding::new(
536 NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize)
537 .expect("Not zero; qed"),
538 )
539 .map_err(|error| format!("Failed to instantiate erasure coding: {error}"))
540 })
541 });
542 let erasure_coding = maybe_erasure_coding?;
543
544 let client = Arc::new(client);
545 let client_info = client.info();
546 let chain_constants = client
547 .runtime_api()
548 .chain_constants(client_info.best_hash)
549 .map_err(|error| ServiceError::Application(error.into()))?;
550
551 let pot_verifier = PotVerifier::new(
552 PotSeed::from_genesis(client_info.genesis_hash.as_ref(), pot_external_entropy),
553 POT_VERIFIER_CACHE_SIZE,
554 );
555
556 assert_eq!(confirmation_depth_k, chain_constants.confirmation_depth_k());
558
559 client
560 .execution_extensions()
561 .set_extensions_factory(SubspaceExtensionsFactory::<PosTable, _, DomainBlock> {
562 kzg: kzg.clone(),
563 client: Arc::clone(&client),
564 pot_verifier: pot_verifier.clone(),
565 domains_executor: Arc::new(domains_executor),
566 backend: backend.clone(),
567 confirmation_depth_k: chain_constants.confirmation_depth_k(),
568 _pos_table: PhantomData,
569 });
570
571 let telemetry = telemetry.map(|(worker, telemetry)| {
572 task_manager
573 .spawn_handle()
574 .spawn("telemetry", None, worker.run());
575 telemetry
576 });
577
578 let select_chain = sc_consensus::LongestChain::new(backend.clone());
579
580 let segment_headers_store = tokio::task::block_in_place(|| {
581 SegmentHeadersStore::new(client.clone(), chain_constants.confirmation_depth_k())
582 })
583 .map_err(|error| ServiceError::Application(error.into()))?;
584
585 let subspace_link = SubspaceLink::new(chain_constants, kzg.clone(), erasure_coding);
586 let segment_headers_store = segment_headers_store.clone();
587
588 let block_import = SubspaceBlockImport::<PosTable, _, _, _, _, _>::new(
589 client.clone(),
590 client.clone(),
591 subspace_link.clone(),
592 {
593 let client = client.clone();
594 let segment_headers_store = segment_headers_store.clone();
595
596 move |parent_hash, ()| {
597 let client = client.clone();
598 let segment_headers_store = segment_headers_store.clone();
599
600 async move {
601 let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
602
603 let parent_header = client
604 .header(parent_hash)?
605 .expect("Parent header must always exist when block is created; qed");
606
607 let parent_block_number = parent_header.number;
608
609 let subspace_inherents =
610 sp_consensus_subspace::inherents::InherentDataProvider::new(
611 segment_headers_store
612 .segment_headers_for_block(parent_block_number + 1),
613 );
614
615 Ok((timestamp, subspace_inherents))
616 }
617 }
618 },
619 segment_headers_store.clone(),
620 pot_verifier.clone(),
621 );
622
623 let sync_target_block_number = Arc::new(AtomicU32::new(0));
624 let transaction_pool = Arc::from(
625 sc_transaction_pool::Builder::new(
626 task_manager.spawn_essential_handle(),
627 client.clone(),
628 config.role.is_authority().into(),
629 )
630 .with_options(config.transaction_pool.clone())
631 .with_prometheus(config.prometheus_registry())
632 .build(),
633 );
634
635 let verifier = SubspaceVerifier::<PosTable, _, _>::new(SubspaceVerifierOptions {
636 client: client.clone(),
637 chain_constants,
638 kzg,
639 telemetry: telemetry.as_ref().map(|x| x.handle()),
640 reward_signing_context: schnorrkel::context::signing_context(REWARD_SIGNING_CONTEXT),
641 sync_target_block_number: Arc::clone(&sync_target_block_number),
642 is_authoring_blocks: config.role.is_authority(),
643 pot_verifier: pot_verifier.clone(),
644 });
645
646 let import_queue = BasicQueue::new(
647 verifier,
648 Box::new(BlockImportWrapper(block_import.clone())),
649 None,
650 &task_manager.spawn_essential_handle(),
651 config.prometheus_registry(),
652 );
653
654 let other = OtherPartialComponents {
655 block_import: Box::new(BlockImportWrapper(block_import.clone())),
656 subspace_link,
657 segment_headers_store,
658 pot_verifier,
659 sync_target_block_number,
660 telemetry,
661 };
662
663 Ok(PartialComponents {
664 client,
665 backend,
666 task_manager,
667 import_queue,
668 keystore_container,
669 select_chain,
670 transaction_pool,
671 other,
672 })
673}
674
675pub struct NewFull<Client>
677where
678 Client: ProvideRuntimeApi<Block>
679 + AuxStore
680 + BlockBackend<Block>
681 + BlockIdTo<Block>
682 + HeaderBackend<Block>
683 + HeaderMetadata<Block, Error = sp_blockchain::Error>
684 + 'static,
685 Client::Api: TaggedTransactionQueue<Block>
686 + DomainsApi<Block, DomainHeader>
687 + FraudProofApi<Block, DomainHeader>
688 + SubspaceApi<Block, PublicKey>
689 + MmrApi<Block, H256, NumberFor<Block>>
690 + MessengerApi<Block, NumberFor<Block>, BlockHashFor<Block>>,
691{
692 pub task_manager: TaskManager,
694 pub client: Arc<Client>,
696 pub select_chain: FullSelectChain,
698 pub network_service: Arc<dyn NetworkService + Send + Sync>,
700 pub xdm_gossip_notification_service: Box<dyn NotificationService>,
702 pub sync_service: Arc<sc_network_sync::SyncingService<Block>>,
704 pub rpc_handlers: sc_service::RpcHandlers,
706 pub backend: Arc<FullBackend>,
708 pub pot_slot_info_stream: broadcast::Receiver<PotSlotInfo>,
710 pub new_slot_notification_stream: SubspaceNotificationStream<NewSlotNotification>,
713 pub reward_signing_notification_stream: SubspaceNotificationStream<RewardSigningNotification>,
715 pub block_importing_notification_stream:
717 SubspaceNotificationStream<BlockImportingNotification<Block>>,
718 pub object_mapping_notification_stream: SubspaceNotificationStream<ObjectMappingNotification>,
720 pub archived_segment_notification_stream:
722 SubspaceNotificationStream<ArchivedSegmentNotification>,
723 pub transaction_pool: Arc<TransactionPoolHandle<Block, Client>>,
725}
726
727type FullNode<RuntimeApi> = NewFull<FullClient<RuntimeApi>>;
728
729pub async fn new_full<PosTable, RuntimeApi>(
731 mut config: SubspaceConfiguration,
732 partial_components: PartialComponents<RuntimeApi>,
733 prometheus_registry: Option<&mut Registry>,
734 enable_rpc_extensions: bool,
735 block_proposal_slot_portion: SlotProportion,
736 consensus_snap_sync_target_block_receiver: Option<broadcast::Receiver<BlockNumber>>,
737) -> Result<FullNode<RuntimeApi>, Error>
738where
739 PosTable: Table,
740 RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
741 RuntimeApi::RuntimeApi: ApiExt<Block>
742 + Metadata<Block>
743 + AccountNonceApi<Block, AccountId, Nonce>
744 + BlockBuilder<Block>
745 + OffchainWorkerApi<Block>
746 + SessionKeys<Block>
747 + TaggedTransactionQueue<Block>
748 + TransactionPaymentApi<Block, Balance>
749 + SubspaceApi<Block, PublicKey>
750 + DomainsApi<Block, DomainHeader>
751 + FraudProofApi<Block, DomainHeader>
752 + ObjectsApi<Block>
753 + MmrApi<Block, Hash, BlockNumber>
754 + MessengerApi<Block, NumberFor<Block>, BlockHashFor<Block>>,
755{
756 let PartialComponents {
757 client,
758 backend,
759 mut task_manager,
760 import_queue,
761 keystore_container,
762 select_chain,
763 transaction_pool,
764 other,
765 } = partial_components;
766 let OtherPartialComponents {
767 block_import,
768 subspace_link,
769 segment_headers_store,
770 pot_verifier,
771 sync_target_block_number,
772 mut telemetry,
773 } = other;
774
775 let offchain_indexing_enabled = config.base.offchain_worker.indexing_enabled;
776 let (node, bootstrap_nodes, piece_getter) = match config.subspace_networking {
777 SubspaceNetworking::Reuse {
778 node,
779 bootstrap_nodes,
780 piece_getter,
781 } => (node, bootstrap_nodes, piece_getter),
782 SubspaceNetworking::Create { config: dsn_config } => {
783 let dsn_protocol_version = hex::encode(client.chain_info().genesis_hash);
784
785 debug!(
786 chain_type=?config.base.chain_spec.chain_type(),
787 genesis_hash=%hex::encode(client.chain_info().genesis_hash),
788 "Setting DSN protocol version..."
789 );
790
791 let out_connections = dsn_config.max_out_connections;
792 let (node, mut node_runner) = create_dsn_instance(
793 dsn_protocol_version,
794 dsn_config.clone(),
795 prometheus_registry,
796 )?;
797
798 info!("Subspace networking initialized: Node ID is {}", node.id());
799
800 node.on_new_listener(Arc::new({
801 let node = node.clone();
802
803 move |address| {
804 info!(
805 "DSN listening on {}",
806 address.clone().with(Protocol::P2p(node.id()))
807 );
808 }
809 }))
810 .detach();
811
812 task_manager
813 .spawn_essential_handle()
814 .spawn_essential_blocking(
815 "node-runner",
816 Some("subspace-networking"),
817 Box::pin(
818 async move {
819 node_runner.run().await;
820 }
821 .in_current_span(),
822 ),
823 );
824
825 let piece_provider = PieceProvider::new(
826 node.clone(),
827 SegmentCommitmentPieceValidator::new(
828 node.clone(),
829 subspace_link.kzg().clone(),
830 segment_headers_store.clone(),
831 ),
832 Arc::new(Semaphore::new(
833 out_connections as usize * PIECE_PROVIDER_MULTIPLIER,
834 )),
835 );
836
837 (
838 node,
839 dsn_config.bootstrap_nodes,
840 Arc::new(DsnPieceGetter::new(piece_provider)) as _,
841 )
842 }
843 };
844
845 let dsn_bootstrap_nodes = {
846 if bootstrap_nodes.is_empty() {
849 let (node_address_sender, node_address_receiver) = oneshot::channel();
850 let _handler = node.on_new_listener(Arc::new({
851 let node_address_sender = Mutex::new(Some(node_address_sender));
852
853 move |address| {
854 if matches!(address.iter().next(), Some(Protocol::Ip4(_)))
855 && let Some(node_address_sender) = node_address_sender.lock().take()
856 && let Err(err) = node_address_sender.send(address.clone())
857 {
858 debug!(?err, "Couldn't send a node address to the channel.");
859 }
860 }
861 }));
862
863 let mut node_listeners = node.listeners();
864
865 if node_listeners.is_empty() {
866 let Ok(listener) = node_address_receiver.await else {
867 return Err(Error::Other(
868 "Oneshot receiver dropped before DSN node listener was ready"
869 .to_string()
870 .into(),
871 ));
872 };
873
874 node_listeners = vec![listener];
875 }
876
877 node_listeners.iter_mut().for_each(|multiaddr| {
878 multiaddr.push(Protocol::P2p(node.id()));
879 });
880
881 node_listeners
882 } else {
883 bootstrap_nodes
884 }
885 };
886
887 let substrate_prometheus_registry = config
888 .base
889 .prometheus_config
890 .as_ref()
891 .map(|prometheus_config| prometheus_config.registry.clone());
892 let import_queue_service1 = import_queue.service();
893 let import_queue_service2 = import_queue.service();
894 let network_wrapper = Arc::new(NetworkWrapper::default());
895 let block_relay = build_consensus_relay(
896 network_wrapper.clone(),
897 client.clone(),
898 transaction_pool.clone(),
899 substrate_prometheus_registry.as_ref(),
900 )
901 .map_err(Error::BlockRelay)?;
902 let mut net_config = sc_network::config::FullNetworkConfiguration::new(
903 &config.base.network,
904 substrate_prometheus_registry.clone(),
905 );
906 let (xdm_gossip_notification_config, xdm_gossip_notification_service) =
907 xdm_gossip_peers_set_config();
908 net_config.add_notification_protocol(xdm_gossip_notification_config);
909 let (pot_gossip_notification_config, pot_gossip_notification_service) =
910 pot_gossip_peers_set_config();
911 net_config.add_notification_protocol(pot_gossip_notification_config);
912 let pause_sync = Arc::clone(&net_config.network_config.pause_sync);
913
914 let num_peer_hint = net_config.network_config.default_peers_set_num_full as usize
915 + net_config
916 .network_config
917 .default_peers_set
918 .reserved_nodes
919 .len();
920
921 let protocol_id = config.base.protocol_id();
922 let fork_id = config.base.chain_spec.fork_id();
923
924 let (handler, protocol_config) = DomainBlockERRequestHandler::new::<
926 NetworkWorker<Block, BlockHashFor<Block>>,
927 >(fork_id, client.clone(), num_peer_hint);
928 task_manager.spawn_handle().spawn(
929 "domain-block-er-request-handler",
930 Some("networking"),
931 handler.run(),
932 );
933 net_config.add_request_response_protocol(protocol_config);
934
935 if let Some(offchain_storage) = backend.offchain_storage() {
936 let (handler, protocol_config) =
938 MmrRequestHandler::new::<NetworkWorker<Block, BlockHashFor<Block>>>(
939 &config.base.protocol_id(),
940 fork_id,
941 client.clone(),
942 num_peer_hint,
943 offchain_storage,
944 );
945 task_manager
946 .spawn_handle()
947 .spawn("mmr-request-handler", Some("networking"), handler.run());
948
949 net_config.add_request_response_protocol(protocol_config);
950 }
951
952 let network_service_provider = NetworkServiceProvider::new();
953 let network_service_handle = network_service_provider.handle();
954 let (network_service, system_rpc_tx, tx_handler_controller, sync_service) = {
955 let spawn_handle = task_manager.spawn_handle();
956 let metrics = NotificationMetrics::new(substrate_prometheus_registry.as_ref());
957
958 let block_downloader = {
960 let BlockRelayParams {
961 mut server,
962 downloader,
963 request_response_config,
964 } = block_relay;
965 net_config.add_request_response_protocol(request_response_config);
966
967 spawn_handle.spawn("block-request-handler", Some("networking"), async move {
968 server.run().await;
969 });
970
971 downloader
972 };
973
974 let syncing_strategy = build_polkadot_syncing_strategy(
975 protocol_id.clone(),
976 fork_id,
977 &mut net_config,
978 None,
979 block_downloader,
980 client.clone(),
981 &spawn_handle,
982 substrate_prometheus_registry.as_ref(),
983 )?;
984
985 let (syncing_engine, sync_service, block_announce_config) =
986 SyncingEngine::new::<NetworkWorker<_, _>>(
987 Roles::from(&config.base.role),
988 Arc::clone(&client),
989 substrate_prometheus_registry.as_ref(),
990 metrics.clone(),
991 &net_config,
992 protocol_id.clone(),
993 fork_id,
994 Box::new(DefaultBlockAnnounceValidator),
995 syncing_strategy,
996 network_service_provider.handle(),
997 import_queue.service(),
998 net_config.peer_store_handle(),
999 config.base.network.force_synced,
1000 )
1001 .map_err(sc_service::Error::from)?;
1002
1003 spawn_handle.spawn_blocking("syncing", None, syncing_engine.run());
1004
1005 build_network_advanced(BuildNetworkAdvancedParams {
1006 role: config.base.role,
1007 protocol_id,
1008 fork_id,
1009 ipfs_server: config.base.network.ipfs_server,
1010 announce_block: config.base.announce_block,
1011 net_config,
1012 client: Arc::clone(&client),
1013 transaction_pool: Arc::clone(&transaction_pool),
1014 spawn_handle,
1015 import_queue,
1016 sync_service,
1017 block_announce_config,
1018 network_service_provider,
1019 metrics_registry: substrate_prometheus_registry.as_ref(),
1020 metrics,
1021 })?
1022 };
1023
1024 task_manager.spawn_handle().spawn(
1025 "sync-target-follower",
1026 None,
1027 Box::pin({
1028 let sync_service = sync_service.clone();
1029 let sync_target_block_number = Arc::clone(&sync_target_block_number);
1030
1031 async move {
1032 loop {
1033 let best_seen_block = sync_service
1034 .status()
1035 .await
1036 .map(|status| status.best_seen_block.unwrap_or_default())
1037 .unwrap_or_default();
1038 sync_target_block_number.store(best_seen_block, Ordering::Relaxed);
1039
1040 tokio::time::sleep(SYNC_TARGET_UPDATE_INTERVAL).await;
1041 }
1042 }
1043 }),
1044 );
1045
1046 let sync_oracle = SubspaceSyncOracle::new(
1047 config.base.force_authoring,
1048 Arc::clone(&pause_sync),
1049 sync_service.clone(),
1050 );
1051
1052 let subspace_archiver = tokio::task::block_in_place(|| {
1053 create_subspace_archiver(
1054 segment_headers_store.clone(),
1055 subspace_link.clone(),
1056 client.clone(),
1057 sync_oracle.clone(),
1058 telemetry.as_ref().map(|telemetry| telemetry.handle()),
1059 config.create_object_mappings,
1060 )
1061 })
1062 .map_err(ServiceError::Client)?;
1063
1064 task_manager
1065 .spawn_essential_handle()
1066 .spawn_essential_blocking(
1067 "subspace-archiver",
1068 None,
1069 Box::pin(async move {
1070 if let Err(error) = subspace_archiver.await {
1071 error!(%error, "Archiver exited with error");
1072 }
1073 }),
1074 );
1075
1076 network_wrapper.set(network_service.clone());
1077
1078 if !config.base.network.force_synced {
1079 pause_sync.store(true, Ordering::Release);
1081 }
1082
1083 let snap_sync_task = snap_sync(
1084 segment_headers_store.clone(),
1085 node.clone(),
1086 fork_id.map(|fork_id| fork_id.to_string()),
1087 Arc::clone(&client),
1088 import_queue_service1,
1089 pause_sync.clone(),
1090 piece_getter.clone(),
1091 sync_service.clone(),
1092 network_service_handle,
1093 subspace_link.erasure_coding().clone(),
1094 consensus_snap_sync_target_block_receiver,
1095 backend.offchain_storage(),
1096 network_service.clone(),
1097 );
1098
1099 let (observer, worker) = sync_from_dsn::create_observer_and_worker(
1100 segment_headers_store.clone(),
1101 Arc::clone(&network_service),
1102 node.clone(),
1103 Arc::clone(&client),
1104 import_queue_service2,
1105 sync_service.clone(),
1106 sync_target_block_number,
1107 pause_sync,
1108 piece_getter,
1109 subspace_link.erasure_coding().clone(),
1110 );
1111 task_manager
1112 .spawn_handle()
1113 .spawn("observer", Some("sync-from-dsn"), observer);
1114 task_manager
1115 .spawn_essential_handle()
1116 .spawn_essential_blocking(
1117 "worker",
1118 Some("sync-from-dsn"),
1119 Box::pin(async move {
1120 if config.sync == ChainSyncMode::Snap
1122 && let Err(error) = snap_sync_task.in_current_span().await
1123 {
1124 error!(%error, "Snap sync exited with a fatal error");
1125 return;
1126 }
1127
1128 if let Err(error) = worker.await {
1129 error!(%error, "Sync from DSN exited with an error");
1130 }
1131 }),
1132 );
1133
1134 if let Some(registry) = substrate_prometheus_registry.as_ref() {
1135 match NodeMetrics::new(
1136 client.clone(),
1137 client.every_import_notification_stream(),
1138 registry,
1139 ) {
1140 Ok(node_metrics) => {
1141 task_manager.spawn_handle().spawn(
1142 "node_metrics",
1143 None,
1144 Box::pin(async move {
1145 node_metrics.run().await;
1146 }),
1147 );
1148 }
1149 Err(err) => {
1150 error!("Failed to initialize node metrics: {err:?}");
1151 }
1152 }
1153 }
1154
1155 let offchain_tx_pool_factory = OffchainTransactionPoolFactory::new(transaction_pool.clone());
1156
1157 if config.base.offchain_worker.enabled {
1158 let offchain_workers =
1159 sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
1160 runtime_api_provider: client.clone(),
1161 is_validator: config.base.role.is_authority(),
1162 keystore: Some(keystore_container.keystore()),
1163 offchain_db: backend.offchain_storage(),
1164 transaction_pool: Some(offchain_tx_pool_factory.clone()),
1165 network_provider: Arc::new(network_service.clone()),
1166 enable_http_requests: true,
1167 custom_extensions: |_| vec![],
1168 })?;
1169 task_manager.spawn_handle().spawn(
1170 "offchain-workers-runner",
1171 "offchain-worker",
1172 offchain_workers
1173 .run(client.clone(), task_manager.spawn_handle())
1174 .boxed(),
1175 );
1176 }
1177
1178 if offchain_indexing_enabled {
1180 task_manager.spawn_essential_handle().spawn_blocking(
1181 "mmr-gadget",
1182 None,
1183 mmr_gadget::MmrGadget::start(
1184 client.clone(),
1185 backend.clone(),
1186 sp_mmr_primitives::INDEXING_PREFIX.to_vec(),
1187 ),
1188 );
1189 }
1190
1191 let backoff_authoring_blocks: Option<()> = None;
1192
1193 let new_slot_notification_stream = subspace_link.new_slot_notification_stream();
1194 let reward_signing_notification_stream = subspace_link.reward_signing_notification_stream();
1195 let block_importing_notification_stream = subspace_link.block_importing_notification_stream();
1196 let object_mapping_notification_stream = subspace_link.object_mapping_notification_stream();
1197 let archived_segment_notification_stream = subspace_link.archived_segment_notification_stream();
1198
1199 let (pot_source_worker, pot_gossip_worker, pot_slot_info_stream) = PotSourceWorker::new(
1200 config.is_timekeeper,
1201 config.timekeeper_cpu_cores,
1202 client.clone(),
1203 pot_verifier.clone(),
1204 Arc::clone(&network_service),
1205 pot_gossip_notification_service,
1206 sync_service.clone(),
1207 sync_oracle.clone(),
1208 )
1209 .map_err(|error| Error::Other(error.into()))?;
1210
1211 let additional_pot_slot_info_stream = pot_source_worker.subscribe_pot_slot_info_stream();
1212
1213 task_manager
1214 .spawn_essential_handle()
1215 .spawn("pot-source", Some("pot"), pot_source_worker.run());
1216 task_manager
1217 .spawn_essential_handle()
1218 .spawn("pot-gossip", Some("pot"), pot_gossip_worker.run());
1219
1220 if config.base.role.is_authority() || config.force_new_slot_notifications {
1221 let proposer_factory = ProposerFactory::new(
1222 task_manager.spawn_handle(),
1223 client.clone(),
1224 transaction_pool.clone(),
1225 substrate_prometheus_registry.as_ref(),
1226 telemetry.as_ref().map(|x| x.handle()),
1227 );
1228
1229 let subspace_slot_worker =
1230 SubspaceSlotWorker::<PosTable, _, _, _, _, _, _, _>::new(SubspaceSlotWorkerOptions {
1231 client: client.clone(),
1232 env: proposer_factory,
1233 block_import,
1234 sync_oracle: sync_oracle.clone(),
1235 justification_sync_link: sync_service.clone(),
1236 force_authoring: config.base.force_authoring,
1237 backoff_authoring_blocks,
1238 subspace_link: subspace_link.clone(),
1239 segment_headers_store: segment_headers_store.clone(),
1240 block_proposal_slot_portion,
1241 max_block_proposal_slot_portion: None,
1242 telemetry: telemetry.as_ref().map(|x| x.handle()),
1243 offchain_tx_pool_factory,
1244 pot_verifier,
1245 });
1246
1247 let create_inherent_data_providers = {
1248 let client = client.clone();
1249 let segment_headers_store = segment_headers_store.clone();
1250
1251 move |parent_hash, ()| {
1252 let client = client.clone();
1253 let segment_headers_store = segment_headers_store.clone();
1254
1255 async move {
1256 let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
1257
1258 let parent_header = client
1259 .header(parent_hash)?
1260 .expect("Parent header must always exist when block is created; qed");
1261
1262 let parent_block_number = parent_header.number;
1263
1264 let subspace_inherents =
1265 sp_consensus_subspace::inherents::InherentDataProvider::new(
1266 segment_headers_store
1267 .segment_headers_for_block(parent_block_number + 1),
1268 );
1269
1270 Ok((timestamp, subspace_inherents))
1271 }
1272 }
1273 };
1274
1275 info!("🧑🌾 Starting Subspace Authorship worker");
1276 let slot_worker_task = sc_proof_of_time::start_slot_worker(
1277 subspace_link.chain_constants().slot_duration(),
1278 client.clone(),
1279 select_chain.clone(),
1280 subspace_slot_worker,
1281 sync_oracle.clone(),
1282 create_inherent_data_providers,
1283 pot_slot_info_stream,
1284 );
1285
1286 task_manager.spawn_essential_handle().spawn_blocking(
1289 "subspace-proposer",
1290 Some("block-authoring"),
1291 slot_worker_task,
1292 );
1293 }
1294
1295 config.base.prometheus_config.take();
1297
1298 let rpc_handlers = task_spawner::spawn_tasks(SpawnTasksParams {
1299 network: network_service.clone(),
1300 client: client.clone(),
1301 keystore: keystore_container.keystore(),
1302 task_manager: &mut task_manager,
1303 transaction_pool: transaction_pool.clone(),
1304 rpc_builder: if enable_rpc_extensions {
1305 let client = client.clone();
1306 let new_slot_notification_stream = new_slot_notification_stream.clone();
1307 let reward_signing_notification_stream = reward_signing_notification_stream.clone();
1308 let object_mapping_notification_stream = object_mapping_notification_stream.clone();
1309 let archived_segment_notification_stream = archived_segment_notification_stream.clone();
1310 let transaction_pool = transaction_pool.clone();
1311 let backend = backend.clone();
1312
1313 Box::new(move |subscription_executor| {
1314 let deps = rpc::FullDeps {
1315 client: client.clone(),
1316 pool: transaction_pool.clone(),
1317 subscription_executor,
1318 new_slot_notification_stream: new_slot_notification_stream.clone(),
1319 reward_signing_notification_stream: reward_signing_notification_stream.clone(),
1320 object_mapping_notification_stream: object_mapping_notification_stream.clone(),
1321 archived_segment_notification_stream: archived_segment_notification_stream
1322 .clone(),
1323 dsn_bootstrap_nodes: dsn_bootstrap_nodes.clone(),
1324 segment_headers_store: segment_headers_store.clone(),
1325 sync_oracle: sync_oracle.clone(),
1326 kzg: subspace_link.kzg().clone(),
1327 erasure_coding: subspace_link.erasure_coding().clone(),
1328 backend: backend.clone(),
1329 };
1330
1331 rpc::create_full(deps).map_err(Into::into)
1332 })
1333 } else {
1334 Box::new(|_| Ok(RpcModule::new(())))
1335 },
1336 backend: backend.clone(),
1337 system_rpc_tx,
1338 config: config.base,
1339 telemetry: telemetry.as_mut(),
1340 tx_handler_controller,
1341 sync_service: sync_service.clone(),
1342 })?;
1343
1344 Ok(NewFull {
1345 task_manager,
1346 client,
1347 select_chain,
1348 network_service,
1349 xdm_gossip_notification_service,
1350 sync_service,
1351 rpc_handlers,
1352 backend,
1353 pot_slot_info_stream: additional_pot_slot_info_stream,
1354 new_slot_notification_stream,
1355 reward_signing_notification_stream,
1356 block_importing_notification_stream,
1357 object_mapping_notification_stream,
1358 archived_segment_notification_stream,
1359 transaction_pool,
1360 })
1361}
1362
1363fn confirmation_depth_storage_key() -> StorageKey {
1365 StorageKey(
1366 frame_support::storage::storage_prefix(
1367 "RuntimeConfigs".as_bytes(),
1370 "ConfirmationDepthK".as_bytes(),
1372 )
1373 .to_vec(),
1374 )
1375}
1376
1377fn extract_confirmation_depth(chain_spec: &dyn ChainSpec) -> Option<u32> {
1378 let storage_key = format!("0x{}", hex::encode(confirmation_depth_storage_key().0));
1379 let spec: serde_json::Value =
1380 serde_json::from_str(chain_spec.as_json(true).ok()?.as_str()).ok()?;
1381 let encoded_confirmation_depth = hex::decode(
1382 spec.pointer(format!("/genesis/raw/top/{storage_key}").as_str())?
1383 .as_str()?
1384 .trim_start_matches("0x"),
1385 )
1386 .ok()?;
1387 u32::decode(&mut encoded_confirmation_depth.as_slice()).ok()
1388}
1389
1390#[cfg(test)]
1391mod test {
1392 use static_assertions::const_assert_eq;
1393 use subspace_data_retrieval::object_fetcher::MAX_BLOCK_LENGTH as ARCHIVER_MAX_BLOCK_LENGTH;
1394 use subspace_runtime_primitives::MAX_BLOCK_LENGTH as CONSENSUS_RUNTIME_MAX_BLOCK_LENGTH;
1395
1396 #[test]
1399 fn max_block_length_consistent() {
1400 const_assert_eq!(
1401 CONSENSUS_RUNTIME_MAX_BLOCK_LENGTH,
1402 ARCHIVER_MAX_BLOCK_LENGTH,
1403 );
1404 }
1405}