1pub mod config;
4pub mod dsn;
5mod metrics;
6pub(crate) mod mmr;
7pub mod rpc;
8pub mod sync_from_dsn;
9mod task_spawner;
10mod utils;
11
12use crate::config::{ChainSyncMode, SubspaceConfiguration, SubspaceNetworking};
13use crate::dsn::{DsnConfigurationError, create_dsn_instance};
14use crate::metrics::NodeMetrics;
15use crate::mmr::request_handler::MmrRequestHandler;
16use crate::sync_from_dsn::DsnPieceGetter;
17use crate::sync_from_dsn::piece_validator::SegmentCommitmentPieceValidator;
18use crate::sync_from_dsn::snap_sync::snap_sync;
19use async_lock::Semaphore;
20use core::sync::atomic::{AtomicU32, Ordering};
21use cross_domain_message_gossip::xdm_gossip_peers_set_config;
22use domain_runtime_primitives::opaque::{Block as DomainBlock, Header as DomainHeader};
23use frame_system_rpc_runtime_api::AccountNonceApi;
24use futures::FutureExt;
25use futures::channel::oneshot;
26use jsonrpsee::RpcModule;
27use pallet_transaction_payment_rpc_runtime_api::TransactionPaymentApi;
28use parity_scale_codec::Decode;
29use parking_lot::Mutex;
30use prometheus_client::registry::Registry;
31use sc_basic_authorship::ProposerFactory;
32use sc_chain_spec::{ChainSpec, GenesisBlockBuilder};
33use sc_client_api::execution_extensions::ExtensionsFactory;
34use sc_client_api::{
35 AuxStore, Backend, BlockBackend, BlockchainEvents, ExecutorProvider, HeaderBackend,
36};
37use sc_consensus::{
38 BasicQueue, BlockCheckParams, BlockImport, BlockImportParams, BoxBlockImport,
39 DefaultImportQueue, ImportQueue, ImportResult,
40};
41use sc_consensus_slots::SlotProportion;
42use sc_consensus_subspace::SubspaceLink;
43use sc_consensus_subspace::archiver::{
44 ArchivedSegmentNotification, ObjectMappingNotification, SegmentHeadersStore,
45 create_subspace_archiver,
46};
47use sc_consensus_subspace::block_import::{BlockImportingNotification, SubspaceBlockImport};
48use sc_consensus_subspace::notification::SubspaceNotificationStream;
49use sc_consensus_subspace::slot_worker::{
50 NewSlotNotification, RewardSigningNotification, SubspaceSlotWorker, SubspaceSlotWorkerOptions,
51 SubspaceSyncOracle,
52};
53use sc_consensus_subspace::verifier::{SubspaceVerifier, SubspaceVerifierOptions};
54use sc_domains::ExtensionsFactory as DomainsExtensionFactory;
55use sc_domains::domain_block_er::execution_receipt_protocol::DomainBlockERRequestHandler;
56use sc_network::service::traits::NetworkService;
57use sc_network::{NetworkWorker, NotificationMetrics, NotificationService, Roles};
58use sc_network_sync::block_relay_protocol::BlockRelayParams;
59use sc_network_sync::engine::SyncingEngine;
60use sc_network_sync::service::network::NetworkServiceProvider;
61use sc_proof_of_time::source::gossip::pot_gossip_peers_set_config;
62use sc_proof_of_time::source::{PotSlotInfo, PotSourceWorker};
63use sc_proof_of_time::verifier::PotVerifier;
64use sc_service::error::Error as ServiceError;
65use sc_service::{
66 BuildNetworkAdvancedParams, Configuration, SpawnTasksParams, TaskManager,
67 build_network_advanced, build_polkadot_syncing_strategy,
68};
69use sc_subspace_block_relay::{
70 BlockRelayConfigurationError, NetworkWrapper, build_consensus_relay,
71};
72use sc_telemetry::{Telemetry, TelemetryWorker};
73use sc_transaction_pool::TransactionPoolHandle;
74use sc_transaction_pool_api::OffchainTransactionPoolFactory;
75use sp_api::{ApiExt, ConstructRuntimeApi, Metadata, ProvideRuntimeApi};
76use sp_block_builder::BlockBuilder;
77use sp_blockchain::HeaderMetadata;
78use sp_consensus::block_validation::DefaultBlockAnnounceValidator;
79use sp_consensus_slots::Slot;
80use sp_consensus_subspace::digests::extract_pre_digest;
81use sp_consensus_subspace::{
82 KzgExtension, PosExtension, PotExtension, PotNextSlotInput, SubspaceApi,
83};
84use sp_core::H256;
85use sp_core::offchain::OffchainDbExt;
86use sp_core::offchain::storage::OffchainDb;
87use sp_core::traits::SpawnEssentialNamed;
88use sp_domains::storage::StorageKey;
89use sp_domains::{BundleProducerElectionApi, DomainsApi};
90use sp_domains_fraud_proof::{FraudProofApi, FraudProofExtension, FraudProofHostFunctionsImpl};
91use sp_externalities::Extensions;
92use sp_messenger::MessengerApi;
93use sp_messenger_host_functions::{MessengerExtension, MessengerHostFunctionsImpl};
94use sp_mmr_primitives::MmrApi;
95use sp_objects::ObjectsApi;
96use sp_offchain::OffchainWorkerApi;
97use sp_runtime::traits::{Block as BlockT, BlockIdTo, Header, NumberFor, Zero};
98use sp_session::SessionKeys;
99use sp_subspace_mmr::host_functions::{SubspaceMmrExtension, SubspaceMmrHostFunctionsImpl};
100use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
101use static_assertions::const_assert;
102use std::marker::PhantomData;
103use std::num::NonZeroUsize;
104use std::sync::Arc;
105use std::time::Duration;
106use subspace_core_primitives::pieces::Record;
107use subspace_core_primitives::pot::PotSeed;
108use subspace_core_primitives::{BlockNumber, PublicKey, REWARD_SIGNING_CONTEXT};
109use subspace_erasure_coding::ErasureCoding;
110use subspace_kzg::Kzg;
111use subspace_networking::libp2p::multiaddr::Protocol;
112use subspace_networking::utils::piece_provider::PieceProvider;
113use subspace_proof_of_space::Table;
114use subspace_runtime_primitives::opaque::Block;
115use subspace_runtime_primitives::{AccountId, Balance, BlockHashFor, Hash, Nonce};
116use tokio::sync::broadcast;
117use tracing::{Instrument, debug, error, info};
118pub use utils::wait_for_block_import;
119
120const_assert!(std::mem::size_of::<usize>() >= std::mem::size_of::<u64>());
123
124const POT_VERIFIER_CACHE_SIZE: u32 = 30_000;
127const SYNC_TARGET_UPDATE_INTERVAL: Duration = Duration::from_secs(1);
128const PIECE_PROVIDER_MULTIPLIER: usize = 10;
130
131#[derive(thiserror::Error, Debug)]
133pub enum Error {
134 #[error(transparent)]
136 Io(#[from] std::io::Error),
137
138 #[error(transparent)]
140 AddrFormatInvalid(#[from] std::net::AddrParseError),
141
142 #[error(transparent)]
144 Sub(#[from] sc_service::Error),
145
146 #[error(transparent)]
148 Consensus(#[from] sp_consensus::Error),
149
150 #[error(transparent)]
152 Telemetry(#[from] sc_telemetry::Error),
153
154 #[error(transparent)]
156 SubspaceDsn(#[from] DsnConfigurationError),
157
158 #[error(transparent)]
160 BlockRelay(#[from] BlockRelayConfigurationError),
161
162 #[error(transparent)]
164 Other(Box<dyn std::error::Error + Send + Sync>),
165}
166
167#[derive(Clone)]
169struct BlockImportWrapper<BI>(BI);
170
171#[async_trait::async_trait]
172impl<Block, BI> BlockImport<Block> for BlockImportWrapper<BI>
173where
174 Block: BlockT,
175 BI: BlockImport<Block, Error = sc_consensus_subspace::block_import::Error<Block::Header>>
176 + Send
177 + Sync,
178{
179 type Error = sp_consensus::Error;
180
181 async fn check_block(
182 &self,
183 block: BlockCheckParams<Block>,
184 ) -> Result<ImportResult, Self::Error> {
185 self.0
186 .check_block(block)
187 .await
188 .map_err(|error| sp_consensus::Error::Other(error.into()))
189 }
190
191 async fn import_block(
192 &self,
193 block: BlockImportParams<Block>,
194 ) -> Result<ImportResult, Self::Error> {
195 self.0
196 .import_block(block)
197 .await
198 .map_err(|error| sp_consensus::Error::Other(error.into()))
199 }
200}
201
202#[cfg(not(feature = "runtime-benchmarks"))]
204pub type HostFunctions = (
205 sp_io::SubstrateHostFunctions,
206 sp_consensus_subspace::consensus::HostFunctions,
207 sp_domains_fraud_proof::HostFunctions,
208 sp_subspace_mmr::HostFunctions,
209 sp_messenger_host_functions::HostFunctions,
210);
211
212#[cfg(feature = "runtime-benchmarks")]
214pub type HostFunctions = (
215 sp_io::SubstrateHostFunctions,
216 frame_benchmarking::benchmarking::HostFunctions,
217 sp_consensus_subspace::consensus::HostFunctions,
218 sp_domains_fraud_proof::HostFunctions,
219 sp_subspace_mmr::HostFunctions,
220 sp_messenger_host_functions::HostFunctions,
221);
222
223pub type RuntimeExecutor = sc_executor::WasmExecutor<HostFunctions>;
225
226pub type FullClient<RuntimeApi> = sc_service::TFullClient<Block, RuntimeApi, RuntimeExecutor>;
228
229pub type FullBackend = sc_service::TFullBackend<Block>;
230pub type FullSelectChain = sc_consensus::LongestChain<FullBackend, Block>;
231
232struct SubspaceExtensionsFactory<PosTable, Client, DomainBlock> {
233 kzg: Kzg,
234 client: Arc<Client>,
235 backend: Arc<FullBackend>,
236 pot_verifier: PotVerifier,
237 domains_executor: Arc<sc_domains::RuntimeExecutor>,
238 confirmation_depth_k: BlockNumber,
239 _pos_table: PhantomData<(PosTable, DomainBlock)>,
240}
241
242impl<PosTable, Block, Client, DomainBlock> ExtensionsFactory<Block>
243 for SubspaceExtensionsFactory<PosTable, Client, DomainBlock>
244where
245 PosTable: Table,
246 Block: BlockT,
247 Block::Hash: From<H256> + Into<H256>,
248 DomainBlock: BlockT,
249 DomainBlock::Hash: Into<H256> + From<H256>,
250 Client: BlockBackend<Block>
251 + HeaderBackend<Block>
252 + ProvideRuntimeApi<Block>
253 + Send
254 + Sync
255 + 'static,
256 Client::Api: SubspaceApi<Block, PublicKey>
257 + DomainsApi<Block, DomainBlock::Header>
258 + BundleProducerElectionApi<Block, Balance>
259 + MmrApi<Block, H256, NumberFor<Block>>
260 + MessengerApi<Block, NumberFor<Block>, Block::Hash>,
261{
262 fn extensions_for(
263 &self,
264 _block_hash: Block::Hash,
265 _block_number: NumberFor<Block>,
266 ) -> Extensions {
267 let confirmation_depth_k = self.confirmation_depth_k;
268 let mut exts = Extensions::new();
269 exts.register(KzgExtension::new(self.kzg.clone()));
270 exts.register(PosExtension::new::<PosTable>());
271 exts.register(PotExtension::new({
272 let client = Arc::clone(&self.client);
273 let pot_verifier = self.pot_verifier.clone();
274
275 Box::new(
276 move |parent_hash, slot, proof_of_time, quick_verification| {
277 let parent_hash = {
278 let mut converted_parent_hash = Block::Hash::default();
279 converted_parent_hash.as_mut().copy_from_slice(&parent_hash);
280 converted_parent_hash
281 };
282
283 let parent_header = match client.header(parent_hash) {
284 Ok(Some(parent_header)) => parent_header,
285 Ok(None) => {
286 if quick_verification {
287 error!(
288 %parent_hash,
289 "Header not found during proof of time verification"
290 );
291
292 return false;
293 } else {
294 debug!(
295 %parent_hash,
296 "Header not found during proof of time verification"
297 );
298
299 return true;
302 }
303 }
304 Err(error) => {
305 error!(
306 %error,
307 %parent_hash,
308 "Failed to retrieve header during proof of time verification"
309 );
310
311 return false;
312 }
313 };
314
315 let parent_pre_digest = match extract_pre_digest(&parent_header) {
316 Ok(parent_pre_digest) => parent_pre_digest,
317 Err(error) => {
318 error!(
319 %error,
320 %parent_hash,
321 parent_number = %parent_header.number(),
322 "Failed to extract pre-digest from parent header during proof of \
323 time verification, this must never happen"
324 );
325
326 return false;
327 }
328 };
329
330 let parent_slot = parent_pre_digest.slot();
331 if slot <= *parent_slot {
332 return false;
333 }
334
335 let pot_parameters = match client.runtime_api().pot_parameters(parent_hash) {
336 Ok(pot_parameters) => pot_parameters,
337 Err(error) => {
338 debug!(
339 %error,
340 %parent_hash,
341 parent_number = %parent_header.number(),
342 "Failed to retrieve proof of time parameters during proof of time \
343 verification"
344 );
345
346 return false;
347 }
348 };
349
350 let pot_input = if parent_header.number().is_zero() {
351 PotNextSlotInput {
352 slot: parent_slot + Slot::from(1),
353 slot_iterations: pot_parameters.slot_iterations(),
354 seed: pot_verifier.genesis_seed(),
355 }
356 } else {
357 let pot_info = parent_pre_digest.pot_info();
358
359 PotNextSlotInput::derive(
360 pot_parameters.slot_iterations(),
361 parent_slot,
362 pot_info.proof_of_time(),
363 &pot_parameters.next_parameters_change(),
364 )
365 };
366
367 if quick_verification {
371 pot_verifier.try_is_output_valid(
372 pot_input,
373 Slot::from(slot) - parent_slot,
374 proof_of_time,
375 pot_parameters.next_parameters_change(),
376 )
377 } else {
378 pot_verifier.is_output_valid(
379 pot_input,
380 Slot::from(slot) - parent_slot,
381 proof_of_time,
382 pot_parameters.next_parameters_change(),
383 )
384 }
385 },
386 )
387 }));
388
389 exts.register(FraudProofExtension::new(Arc::new(
390 FraudProofHostFunctionsImpl::<_, _, DomainBlock, _, _>::new(
391 self.client.clone(),
392 self.domains_executor.clone(),
393 move |client, executor| {
394 let extension_factory =
395 DomainsExtensionFactory::<_, Block, DomainBlock, _>::new(
396 client,
397 executor,
398 confirmation_depth_k,
399 );
400 Box::new(extension_factory) as Box<dyn ExtensionsFactory<DomainBlock>>
401 },
402 ),
403 )));
404
405 exts.register(SubspaceMmrExtension::new(Arc::new(
406 SubspaceMmrHostFunctionsImpl::<Block, _>::new(
407 self.client.clone(),
408 confirmation_depth_k,
409 ),
410 )));
411
412 exts.register(MessengerExtension::new(Arc::new(
413 MessengerHostFunctionsImpl::<Block, _, DomainBlock, _>::new(
414 self.client.clone(),
415 self.domains_executor.clone(),
416 ),
417 )));
418
419 if let Some(offchain_storage) = self.backend.offchain_storage() {
422 let offchain_db = OffchainDb::new(offchain_storage);
423 exts.register(OffchainDbExt::new(offchain_db));
424 }
425
426 exts
427 }
428}
429
430pub struct OtherPartialComponents<RuntimeApi>
432where
433 RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
434{
435 pub block_import: BoxBlockImport<Block>,
437 pub subspace_link: SubspaceLink<Block>,
439 pub segment_headers_store: SegmentHeadersStore<FullClient<RuntimeApi>>,
441 pub pot_verifier: PotVerifier,
443 pub sync_target_block_number: Arc<AtomicU32>,
445 pub telemetry: Option<Telemetry>,
447}
448
449type PartialComponents<RuntimeApi> = sc_service::PartialComponents<
450 FullClient<RuntimeApi>,
451 FullBackend,
452 FullSelectChain,
453 DefaultImportQueue<Block>,
454 TransactionPoolHandle<Block, FullClient<RuntimeApi>>,
455 OtherPartialComponents<RuntimeApi>,
456>;
457
458#[expect(clippy::result_large_err, reason = "Comes from Substrate")]
460pub fn new_partial<PosTable, RuntimeApi>(
461 config: &Configuration,
464 snap_sync: bool,
466 pot_external_entropy: &[u8],
467) -> Result<PartialComponents<RuntimeApi>, ServiceError>
468where
469 PosTable: Table,
470 RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
471 RuntimeApi::RuntimeApi: ApiExt<Block>
472 + Metadata<Block>
473 + BlockBuilder<Block>
474 + OffchainWorkerApi<Block>
475 + SessionKeys<Block>
476 + TaggedTransactionQueue<Block>
477 + SubspaceApi<Block, PublicKey>
478 + DomainsApi<Block, DomainHeader>
479 + FraudProofApi<Block, DomainHeader>
480 + BundleProducerElectionApi<Block, Balance>
481 + ObjectsApi<Block>
482 + MmrApi<Block, H256, NumberFor<Block>>
483 + MessengerApi<Block, NumberFor<Block>, BlockHashFor<Block>>,
484{
485 let telemetry = config
486 .telemetry_endpoints
487 .clone()
488 .filter(|x| !x.is_empty())
489 .map(|endpoints| -> Result<_, sc_telemetry::Error> {
490 let worker = TelemetryWorker::new(16)?;
491 let telemetry = worker.handle().new_telemetry(endpoints);
492 Ok((worker, telemetry))
493 })
494 .transpose()?;
495
496 let executor = sc_service::new_wasm_executor(&config.executor);
497 let domains_executor = sc_service::new_wasm_executor(&config.executor);
498
499 let confirmation_depth_k = extract_confirmation_depth(config.chain_spec.as_ref()).ok_or(
500 ServiceError::Other("Failed to extract confirmation depth from chain spec".to_string()),
501 )?;
502
503 let backend = Arc::new(sc_client_db::Backend::new(
504 config.db_config(),
505 confirmation_depth_k.into(),
506 )?);
507
508 let genesis_block_builder = GenesisBlockBuilder::new(
509 config.chain_spec.as_storage_builder(),
510 !snap_sync,
511 backend.clone(),
512 executor.clone(),
513 )?;
514
515 let (client, backend, keystore_container, task_manager) =
516 sc_service::new_full_parts_with_genesis_builder::<Block, RuntimeApi, _, _>(
517 config,
518 telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
519 executor.clone(),
520 backend,
521 genesis_block_builder,
522 false,
523 )?;
524
525 let (kzg, maybe_erasure_coding) = tokio::task::block_in_place(|| {
527 rayon::join(Kzg::new, || {
528 ErasureCoding::new(
529 NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize)
530 .expect("Not zero; qed"),
531 )
532 .map_err(|error| format!("Failed to instantiate erasure coding: {error}"))
533 })
534 });
535 let erasure_coding = maybe_erasure_coding?;
536
537 let client = Arc::new(client);
538 let client_info = client.info();
539 let chain_constants = client
540 .runtime_api()
541 .chain_constants(client_info.best_hash)
542 .map_err(|error| ServiceError::Application(error.into()))?;
543
544 let pot_verifier = PotVerifier::new(
545 PotSeed::from_genesis(client_info.genesis_hash.as_ref(), pot_external_entropy),
546 POT_VERIFIER_CACHE_SIZE,
547 );
548
549 assert_eq!(confirmation_depth_k, chain_constants.confirmation_depth_k());
551
552 client
553 .execution_extensions()
554 .set_extensions_factory(SubspaceExtensionsFactory::<PosTable, _, DomainBlock> {
555 kzg: kzg.clone(),
556 client: Arc::clone(&client),
557 pot_verifier: pot_verifier.clone(),
558 domains_executor: Arc::new(domains_executor),
559 backend: backend.clone(),
560 confirmation_depth_k: chain_constants.confirmation_depth_k(),
561 _pos_table: PhantomData,
562 });
563
564 let telemetry = telemetry.map(|(worker, telemetry)| {
565 task_manager
566 .spawn_handle()
567 .spawn("telemetry", None, worker.run());
568 telemetry
569 });
570
571 let select_chain = sc_consensus::LongestChain::new(backend.clone());
572
573 let segment_headers_store = tokio::task::block_in_place(|| {
574 SegmentHeadersStore::new(client.clone(), chain_constants.confirmation_depth_k())
575 })
576 .map_err(|error| ServiceError::Application(error.into()))?;
577
578 let subspace_link = SubspaceLink::new(chain_constants, kzg.clone(), erasure_coding);
579 let segment_headers_store = segment_headers_store.clone();
580
581 let block_import = SubspaceBlockImport::<PosTable, _, _, _, _, _>::new(
582 client.clone(),
583 client.clone(),
584 subspace_link.clone(),
585 {
586 let client = client.clone();
587 let segment_headers_store = segment_headers_store.clone();
588
589 move |parent_hash, ()| {
590 let client = client.clone();
591 let segment_headers_store = segment_headers_store.clone();
592
593 async move {
594 let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
595
596 let parent_header = client
597 .header(parent_hash)?
598 .expect("Parent header must always exist when block is created; qed");
599
600 let parent_block_number = parent_header.number;
601
602 let subspace_inherents =
603 sp_consensus_subspace::inherents::InherentDataProvider::new(
604 segment_headers_store
605 .segment_headers_for_block(parent_block_number + 1),
606 );
607
608 Ok((timestamp, subspace_inherents))
609 }
610 }
611 },
612 segment_headers_store.clone(),
613 pot_verifier.clone(),
614 );
615
616 let sync_target_block_number = Arc::new(AtomicU32::new(0));
617 let transaction_pool = Arc::from(
618 sc_transaction_pool::Builder::new(
619 task_manager.spawn_essential_handle(),
620 client.clone(),
621 config.role.is_authority().into(),
622 )
623 .with_options(config.transaction_pool.clone())
624 .with_prometheus(config.prometheus_registry())
625 .build(),
626 );
627
628 let verifier = SubspaceVerifier::<PosTable, _, _>::new(SubspaceVerifierOptions {
629 client: client.clone(),
630 chain_constants,
631 kzg,
632 telemetry: telemetry.as_ref().map(|x| x.handle()),
633 reward_signing_context: schnorrkel::context::signing_context(REWARD_SIGNING_CONTEXT),
634 sync_target_block_number: Arc::clone(&sync_target_block_number),
635 is_authoring_blocks: config.role.is_authority(),
636 pot_verifier: pot_verifier.clone(),
637 });
638
639 let import_queue = BasicQueue::new(
640 verifier,
641 Box::new(BlockImportWrapper(block_import.clone())),
642 None,
643 &task_manager.spawn_essential_handle(),
644 config.prometheus_registry(),
645 );
646
647 let other = OtherPartialComponents {
648 block_import: Box::new(BlockImportWrapper(block_import.clone())),
649 subspace_link,
650 segment_headers_store,
651 pot_verifier,
652 sync_target_block_number,
653 telemetry,
654 };
655
656 Ok(PartialComponents {
657 client,
658 backend,
659 task_manager,
660 import_queue,
661 keystore_container,
662 select_chain,
663 transaction_pool,
664 other,
665 })
666}
667
668pub struct NewFull<Client>
670where
671 Client: ProvideRuntimeApi<Block>
672 + AuxStore
673 + BlockBackend<Block>
674 + BlockIdTo<Block>
675 + HeaderBackend<Block>
676 + HeaderMetadata<Block, Error = sp_blockchain::Error>
677 + 'static,
678 Client::Api: TaggedTransactionQueue<Block>
679 + DomainsApi<Block, DomainHeader>
680 + FraudProofApi<Block, DomainHeader>
681 + SubspaceApi<Block, PublicKey>
682 + MmrApi<Block, H256, NumberFor<Block>>
683 + MessengerApi<Block, NumberFor<Block>, BlockHashFor<Block>>,
684{
685 pub task_manager: TaskManager,
687 pub client: Arc<Client>,
689 pub select_chain: FullSelectChain,
691 pub network_service: Arc<dyn NetworkService + Send + Sync>,
693 pub xdm_gossip_notification_service: Box<dyn NotificationService>,
695 pub sync_service: Arc<sc_network_sync::SyncingService<Block>>,
697 pub rpc_handlers: sc_service::RpcHandlers,
699 pub backend: Arc<FullBackend>,
701 pub pot_slot_info_stream: broadcast::Receiver<PotSlotInfo>,
703 pub new_slot_notification_stream: SubspaceNotificationStream<NewSlotNotification>,
706 pub reward_signing_notification_stream: SubspaceNotificationStream<RewardSigningNotification>,
708 pub block_importing_notification_stream:
710 SubspaceNotificationStream<BlockImportingNotification<Block>>,
711 pub object_mapping_notification_stream: SubspaceNotificationStream<ObjectMappingNotification>,
713 pub archived_segment_notification_stream:
715 SubspaceNotificationStream<ArchivedSegmentNotification>,
716 pub transaction_pool: Arc<TransactionPoolHandle<Block, Client>>,
718}
719
720type FullNode<RuntimeApi> = NewFull<FullClient<RuntimeApi>>;
721
722pub async fn new_full<PosTable, RuntimeApi>(
724 mut config: SubspaceConfiguration,
725 partial_components: PartialComponents<RuntimeApi>,
726 prometheus_registry: Option<&mut Registry>,
727 enable_rpc_extensions: bool,
728 block_proposal_slot_portion: SlotProportion,
729 consensus_snap_sync_target_block_receiver: Option<broadcast::Receiver<BlockNumber>>,
730) -> Result<FullNode<RuntimeApi>, Error>
731where
732 PosTable: Table,
733 RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
734 RuntimeApi::RuntimeApi: ApiExt<Block>
735 + Metadata<Block>
736 + AccountNonceApi<Block, AccountId, Nonce>
737 + BlockBuilder<Block>
738 + OffchainWorkerApi<Block>
739 + SessionKeys<Block>
740 + TaggedTransactionQueue<Block>
741 + TransactionPaymentApi<Block, Balance>
742 + SubspaceApi<Block, PublicKey>
743 + DomainsApi<Block, DomainHeader>
744 + FraudProofApi<Block, DomainHeader>
745 + ObjectsApi<Block>
746 + MmrApi<Block, Hash, BlockNumber>
747 + MessengerApi<Block, NumberFor<Block>, BlockHashFor<Block>>,
748{
749 let PartialComponents {
750 client,
751 backend,
752 mut task_manager,
753 import_queue,
754 keystore_container,
755 select_chain,
756 transaction_pool,
757 other,
758 } = partial_components;
759 let OtherPartialComponents {
760 block_import,
761 subspace_link,
762 segment_headers_store,
763 pot_verifier,
764 sync_target_block_number,
765 mut telemetry,
766 } = other;
767
768 let offchain_indexing_enabled = config.base.offchain_worker.indexing_enabled;
769 let (node, bootstrap_nodes, piece_getter) = match config.subspace_networking {
770 SubspaceNetworking::Reuse {
771 node,
772 bootstrap_nodes,
773 piece_getter,
774 } => (node, bootstrap_nodes, piece_getter),
775 SubspaceNetworking::Create { config: dsn_config } => {
776 let dsn_protocol_version = hex::encode(client.chain_info().genesis_hash);
777
778 debug!(
779 chain_type=?config.base.chain_spec.chain_type(),
780 genesis_hash=%hex::encode(client.chain_info().genesis_hash),
781 "Setting DSN protocol version..."
782 );
783
784 let out_connections = dsn_config.max_out_connections;
785 let (node, mut node_runner) = create_dsn_instance(
786 dsn_protocol_version,
787 dsn_config.clone(),
788 prometheus_registry,
789 )?;
790
791 info!("Subspace networking initialized: Node ID is {}", node.id());
792
793 node.on_new_listener(Arc::new({
794 let node = node.clone();
795
796 move |address| {
797 info!(
798 "DSN listening on {}",
799 address.clone().with(Protocol::P2p(node.id()))
800 );
801 }
802 }))
803 .detach();
804
805 task_manager
806 .spawn_essential_handle()
807 .spawn_essential_blocking(
808 "node-runner",
809 Some("subspace-networking"),
810 Box::pin(
811 async move {
812 node_runner.run().await;
813 }
814 .in_current_span(),
815 ),
816 );
817
818 let piece_provider = PieceProvider::new(
819 node.clone(),
820 SegmentCommitmentPieceValidator::new(
821 node.clone(),
822 subspace_link.kzg().clone(),
823 segment_headers_store.clone(),
824 ),
825 Arc::new(Semaphore::new(
826 out_connections as usize * PIECE_PROVIDER_MULTIPLIER,
827 )),
828 );
829
830 (
831 node,
832 dsn_config.bootstrap_nodes,
833 Arc::new(DsnPieceGetter::new(piece_provider)) as _,
834 )
835 }
836 };
837
838 let dsn_bootstrap_nodes = {
839 if bootstrap_nodes.is_empty() {
842 let (node_address_sender, node_address_receiver) = oneshot::channel();
843 let _handler = node.on_new_listener(Arc::new({
844 let node_address_sender = Mutex::new(Some(node_address_sender));
845
846 move |address| {
847 if matches!(address.iter().next(), Some(Protocol::Ip4(_)))
848 && let Some(node_address_sender) = node_address_sender.lock().take()
849 && let Err(err) = node_address_sender.send(address.clone())
850 {
851 debug!(?err, "Couldn't send a node address to the channel.");
852 }
853 }
854 }));
855
856 let mut node_listeners = node.listeners();
857
858 if node_listeners.is_empty() {
859 let Ok(listener) = node_address_receiver.await else {
860 return Err(Error::Other(
861 "Oneshot receiver dropped before DSN node listener was ready"
862 .to_string()
863 .into(),
864 ));
865 };
866
867 node_listeners = vec![listener];
868 }
869
870 node_listeners.iter_mut().for_each(|multiaddr| {
871 multiaddr.push(Protocol::P2p(node.id()));
872 });
873
874 node_listeners
875 } else {
876 bootstrap_nodes
877 }
878 };
879
880 let substrate_prometheus_registry = config
881 .base
882 .prometheus_config
883 .as_ref()
884 .map(|prometheus_config| prometheus_config.registry.clone());
885 let import_queue_service1 = import_queue.service();
886 let import_queue_service2 = import_queue.service();
887 let network_wrapper = Arc::new(NetworkWrapper::default());
888 let block_relay = build_consensus_relay(
889 network_wrapper.clone(),
890 client.clone(),
891 transaction_pool.clone(),
892 substrate_prometheus_registry.as_ref(),
893 )
894 .map_err(Error::BlockRelay)?;
895 let mut net_config = sc_network::config::FullNetworkConfiguration::new(
896 &config.base.network,
897 substrate_prometheus_registry.clone(),
898 );
899 let (xdm_gossip_notification_config, xdm_gossip_notification_service) =
900 xdm_gossip_peers_set_config();
901 net_config.add_notification_protocol(xdm_gossip_notification_config);
902 let (pot_gossip_notification_config, pot_gossip_notification_service) =
903 pot_gossip_peers_set_config();
904 net_config.add_notification_protocol(pot_gossip_notification_config);
905 let pause_sync = Arc::clone(&net_config.network_config.pause_sync);
906
907 let num_peer_hint = net_config.network_config.default_peers_set_num_full as usize
908 + net_config
909 .network_config
910 .default_peers_set
911 .reserved_nodes
912 .len();
913
914 let protocol_id = config.base.protocol_id();
915 let fork_id = config.base.chain_spec.fork_id();
916
917 let (handler, protocol_config) = DomainBlockERRequestHandler::new::<
919 NetworkWorker<Block, BlockHashFor<Block>>,
920 >(fork_id, client.clone(), num_peer_hint);
921 task_manager.spawn_handle().spawn(
922 "domain-block-er-request-handler",
923 Some("networking"),
924 handler.run(),
925 );
926 net_config.add_request_response_protocol(protocol_config);
927
928 if let Some(offchain_storage) = backend.offchain_storage() {
929 let (handler, protocol_config) =
931 MmrRequestHandler::new::<NetworkWorker<Block, BlockHashFor<Block>>>(
932 &config.base.protocol_id(),
933 fork_id,
934 client.clone(),
935 num_peer_hint,
936 offchain_storage,
937 );
938 task_manager
939 .spawn_handle()
940 .spawn("mmr-request-handler", Some("networking"), handler.run());
941
942 net_config.add_request_response_protocol(protocol_config);
943 }
944
945 let network_service_provider = NetworkServiceProvider::new();
946 let network_service_handle = network_service_provider.handle();
947 let (network_service, system_rpc_tx, tx_handler_controller, sync_service) = {
948 let spawn_handle = task_manager.spawn_handle();
949 let metrics = NotificationMetrics::new(substrate_prometheus_registry.as_ref());
950
951 let block_downloader = {
953 let BlockRelayParams {
954 mut server,
955 downloader,
956 request_response_config,
957 } = block_relay;
958 net_config.add_request_response_protocol(request_response_config);
959
960 spawn_handle.spawn("block-request-handler", Some("networking"), async move {
961 server.run().await;
962 });
963
964 downloader
965 };
966
967 let syncing_strategy = build_polkadot_syncing_strategy(
968 protocol_id.clone(),
969 fork_id,
970 &mut net_config,
971 None,
972 block_downloader,
973 client.clone(),
974 &spawn_handle,
975 substrate_prometheus_registry.as_ref(),
976 )?;
977
978 let (syncing_engine, sync_service, block_announce_config) =
979 SyncingEngine::new::<NetworkWorker<_, _>>(
980 Roles::from(&config.base.role),
981 Arc::clone(&client),
982 substrate_prometheus_registry.as_ref(),
983 metrics.clone(),
984 &net_config,
985 protocol_id.clone(),
986 fork_id,
987 Box::new(DefaultBlockAnnounceValidator),
988 syncing_strategy,
989 network_service_provider.handle(),
990 import_queue.service(),
991 net_config.peer_store_handle(),
992 config.base.network.force_synced,
993 )
994 .map_err(sc_service::Error::from)?;
995
996 spawn_handle.spawn_blocking("syncing", None, syncing_engine.run());
997
998 build_network_advanced(BuildNetworkAdvancedParams {
999 role: config.base.role,
1000 protocol_id,
1001 fork_id,
1002 ipfs_server: config.base.network.ipfs_server,
1003 announce_block: config.base.announce_block,
1004 net_config,
1005 client: Arc::clone(&client),
1006 transaction_pool: Arc::clone(&transaction_pool),
1007 spawn_handle,
1008 import_queue,
1009 sync_service,
1010 block_announce_config,
1011 network_service_provider,
1012 metrics_registry: substrate_prometheus_registry.as_ref(),
1013 metrics,
1014 })?
1015 };
1016
1017 task_manager.spawn_handle().spawn(
1018 "sync-target-follower",
1019 None,
1020 Box::pin({
1021 let sync_service = sync_service.clone();
1022 let sync_target_block_number = Arc::clone(&sync_target_block_number);
1023
1024 async move {
1025 loop {
1026 let best_seen_block = sync_service
1027 .status()
1028 .await
1029 .map(|status| status.best_seen_block.unwrap_or_default())
1030 .unwrap_or_default();
1031 sync_target_block_number.store(best_seen_block, Ordering::Relaxed);
1032
1033 tokio::time::sleep(SYNC_TARGET_UPDATE_INTERVAL).await;
1034 }
1035 }
1036 }),
1037 );
1038
1039 let sync_oracle = SubspaceSyncOracle::new(
1040 config.base.force_authoring,
1041 Arc::clone(&pause_sync),
1042 sync_service.clone(),
1043 );
1044
1045 let subspace_archiver = tokio::task::block_in_place(|| {
1046 create_subspace_archiver(
1047 segment_headers_store.clone(),
1048 subspace_link.clone(),
1049 client.clone(),
1050 sync_oracle.clone(),
1051 telemetry.as_ref().map(|telemetry| telemetry.handle()),
1052 config.create_object_mappings,
1053 )
1054 })
1055 .map_err(ServiceError::Client)?;
1056
1057 task_manager
1058 .spawn_essential_handle()
1059 .spawn_essential_blocking(
1060 "subspace-archiver",
1061 None,
1062 Box::pin(async move {
1063 if let Err(error) = subspace_archiver.await {
1064 error!(%error, "Archiver exited with error");
1065 }
1066 }),
1067 );
1068
1069 network_wrapper.set(network_service.clone());
1070
1071 if !config.base.network.force_synced {
1072 pause_sync.store(true, Ordering::Release);
1074 }
1075
1076 let snap_sync_task = snap_sync(
1077 segment_headers_store.clone(),
1078 node.clone(),
1079 fork_id.map(|fork_id| fork_id.to_string()),
1080 Arc::clone(&client),
1081 import_queue_service1,
1082 pause_sync.clone(),
1083 piece_getter.clone(),
1084 sync_service.clone(),
1085 network_service_handle,
1086 subspace_link.erasure_coding().clone(),
1087 consensus_snap_sync_target_block_receiver,
1088 backend.offchain_storage(),
1089 network_service.clone(),
1090 );
1091
1092 let (observer, worker) = sync_from_dsn::create_observer_and_worker(
1093 segment_headers_store.clone(),
1094 Arc::clone(&network_service),
1095 node.clone(),
1096 Arc::clone(&client),
1097 import_queue_service2,
1098 sync_service.clone(),
1099 sync_target_block_number,
1100 pause_sync,
1101 piece_getter,
1102 subspace_link.erasure_coding().clone(),
1103 );
1104 task_manager
1105 .spawn_handle()
1106 .spawn("observer", Some("sync-from-dsn"), observer);
1107 task_manager
1108 .spawn_essential_handle()
1109 .spawn_essential_blocking(
1110 "worker",
1111 Some("sync-from-dsn"),
1112 Box::pin(async move {
1113 if config.sync == ChainSyncMode::Snap
1115 && let Err(error) = snap_sync_task.in_current_span().await
1116 {
1117 error!(%error, "Snap sync exited with a fatal error");
1118 return;
1119 }
1120
1121 if let Err(error) = worker.await {
1122 error!(%error, "Sync from DSN exited with an error");
1123 }
1124 }),
1125 );
1126
1127 if let Some(registry) = substrate_prometheus_registry.as_ref() {
1128 match NodeMetrics::new(
1129 client.clone(),
1130 client.every_import_notification_stream(),
1131 registry,
1132 ) {
1133 Ok(node_metrics) => {
1134 task_manager.spawn_handle().spawn(
1135 "node_metrics",
1136 None,
1137 Box::pin(async move {
1138 node_metrics.run().await;
1139 }),
1140 );
1141 }
1142 Err(err) => {
1143 error!("Failed to initialize node metrics: {err:?}");
1144 }
1145 }
1146 }
1147
1148 let offchain_tx_pool_factory = OffchainTransactionPoolFactory::new(transaction_pool.clone());
1149
1150 if config.base.offchain_worker.enabled {
1151 let offchain_workers =
1152 sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
1153 runtime_api_provider: client.clone(),
1154 is_validator: config.base.role.is_authority(),
1155 keystore: Some(keystore_container.keystore()),
1156 offchain_db: backend.offchain_storage(),
1157 transaction_pool: Some(offchain_tx_pool_factory.clone()),
1158 network_provider: Arc::new(network_service.clone()),
1159 enable_http_requests: true,
1160 custom_extensions: |_| vec![],
1161 })?;
1162 task_manager.spawn_handle().spawn(
1163 "offchain-workers-runner",
1164 "offchain-worker",
1165 offchain_workers
1166 .run(client.clone(), task_manager.spawn_handle())
1167 .boxed(),
1168 );
1169 }
1170
1171 if offchain_indexing_enabled {
1173 task_manager.spawn_essential_handle().spawn_blocking(
1174 "mmr-gadget",
1175 None,
1176 mmr_gadget::MmrGadget::start(
1177 client.clone(),
1178 backend.clone(),
1179 sp_mmr_primitives::INDEXING_PREFIX.to_vec(),
1180 ),
1181 );
1182 }
1183
1184 let backoff_authoring_blocks: Option<()> = None;
1185
1186 let new_slot_notification_stream = subspace_link.new_slot_notification_stream();
1187 let reward_signing_notification_stream = subspace_link.reward_signing_notification_stream();
1188 let block_importing_notification_stream = subspace_link.block_importing_notification_stream();
1189 let object_mapping_notification_stream = subspace_link.object_mapping_notification_stream();
1190 let archived_segment_notification_stream = subspace_link.archived_segment_notification_stream();
1191
1192 let (pot_source_worker, pot_gossip_worker, pot_slot_info_stream) = PotSourceWorker::new(
1193 config.is_timekeeper,
1194 config.timekeeper_cpu_cores,
1195 client.clone(),
1196 pot_verifier.clone(),
1197 Arc::clone(&network_service),
1198 pot_gossip_notification_service,
1199 sync_service.clone(),
1200 sync_oracle.clone(),
1201 )
1202 .map_err(|error| Error::Other(error.into()))?;
1203
1204 let additional_pot_slot_info_stream = pot_source_worker.subscribe_pot_slot_info_stream();
1205
1206 task_manager
1207 .spawn_essential_handle()
1208 .spawn("pot-source", Some("pot"), pot_source_worker.run());
1209 task_manager
1210 .spawn_essential_handle()
1211 .spawn("pot-gossip", Some("pot"), pot_gossip_worker.run());
1212
1213 if config.base.role.is_authority() || config.force_new_slot_notifications {
1214 let proposer_factory = ProposerFactory::new(
1215 task_manager.spawn_handle(),
1216 client.clone(),
1217 transaction_pool.clone(),
1218 substrate_prometheus_registry.as_ref(),
1219 telemetry.as_ref().map(|x| x.handle()),
1220 );
1221
1222 let subspace_slot_worker =
1223 SubspaceSlotWorker::<PosTable, _, _, _, _, _, _, _>::new(SubspaceSlotWorkerOptions {
1224 client: client.clone(),
1225 env: proposer_factory,
1226 block_import,
1227 sync_oracle: sync_oracle.clone(),
1228 justification_sync_link: sync_service.clone(),
1229 force_authoring: config.base.force_authoring,
1230 backoff_authoring_blocks,
1231 subspace_link: subspace_link.clone(),
1232 segment_headers_store: segment_headers_store.clone(),
1233 block_proposal_slot_portion,
1234 max_block_proposal_slot_portion: None,
1235 telemetry: telemetry.as_ref().map(|x| x.handle()),
1236 offchain_tx_pool_factory,
1237 pot_verifier,
1238 });
1239
1240 let create_inherent_data_providers = {
1241 let client = client.clone();
1242 let segment_headers_store = segment_headers_store.clone();
1243
1244 move |parent_hash, ()| {
1245 let client = client.clone();
1246 let segment_headers_store = segment_headers_store.clone();
1247
1248 async move {
1249 let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
1250
1251 let parent_header = client
1252 .header(parent_hash)?
1253 .expect("Parent header must always exist when block is created; qed");
1254
1255 let parent_block_number = parent_header.number;
1256
1257 let subspace_inherents =
1258 sp_consensus_subspace::inherents::InherentDataProvider::new(
1259 segment_headers_store
1260 .segment_headers_for_block(parent_block_number + 1),
1261 );
1262
1263 Ok((timestamp, subspace_inherents))
1264 }
1265 }
1266 };
1267
1268 info!("🧑🌾 Starting Subspace Authorship worker");
1269 let slot_worker_task = sc_proof_of_time::start_slot_worker(
1270 subspace_link.chain_constants().slot_duration(),
1271 client.clone(),
1272 select_chain.clone(),
1273 subspace_slot_worker,
1274 sync_oracle.clone(),
1275 create_inherent_data_providers,
1276 pot_slot_info_stream,
1277 );
1278
1279 task_manager.spawn_essential_handle().spawn_blocking(
1282 "subspace-proposer",
1283 Some("block-authoring"),
1284 slot_worker_task,
1285 );
1286 }
1287
1288 config.base.prometheus_config.take();
1290
1291 let rpc_handlers = task_spawner::spawn_tasks(SpawnTasksParams {
1292 network: network_service.clone(),
1293 client: client.clone(),
1294 keystore: keystore_container.keystore(),
1295 task_manager: &mut task_manager,
1296 transaction_pool: transaction_pool.clone(),
1297 rpc_builder: if enable_rpc_extensions {
1298 let client = client.clone();
1299 let new_slot_notification_stream = new_slot_notification_stream.clone();
1300 let reward_signing_notification_stream = reward_signing_notification_stream.clone();
1301 let object_mapping_notification_stream = object_mapping_notification_stream.clone();
1302 let archived_segment_notification_stream = archived_segment_notification_stream.clone();
1303 let transaction_pool = transaction_pool.clone();
1304 let backend = backend.clone();
1305
1306 #[allow(clippy::result_large_err)]
1307 Box::new(move |subscription_executor| {
1308 let deps = rpc::FullDeps {
1309 client: client.clone(),
1310 pool: transaction_pool.clone(),
1311 subscription_executor,
1312 new_slot_notification_stream: new_slot_notification_stream.clone(),
1313 reward_signing_notification_stream: reward_signing_notification_stream.clone(),
1314 object_mapping_notification_stream: object_mapping_notification_stream.clone(),
1315 archived_segment_notification_stream: archived_segment_notification_stream
1316 .clone(),
1317 dsn_bootstrap_nodes: dsn_bootstrap_nodes.clone(),
1318 segment_headers_store: segment_headers_store.clone(),
1319 sync_oracle: sync_oracle.clone(),
1320 kzg: subspace_link.kzg().clone(),
1321 erasure_coding: subspace_link.erasure_coding().clone(),
1322 backend: backend.clone(),
1323 };
1324
1325 rpc::create_full(deps).map_err(Into::into)
1326 })
1327 } else {
1328 #[allow(clippy::result_large_err)]
1329 Box::new(|_| Ok(RpcModule::new(())))
1330 },
1331 backend: backend.clone(),
1332 system_rpc_tx,
1333 config: config.base,
1334 telemetry: telemetry.as_mut(),
1335 tx_handler_controller,
1336 sync_service: sync_service.clone(),
1337 tracing_execute_block: None,
1339 })?;
1340
1341 Ok(NewFull {
1342 task_manager,
1343 client,
1344 select_chain,
1345 network_service,
1346 xdm_gossip_notification_service,
1347 sync_service,
1348 rpc_handlers,
1349 backend,
1350 pot_slot_info_stream: additional_pot_slot_info_stream,
1351 new_slot_notification_stream,
1352 reward_signing_notification_stream,
1353 block_importing_notification_stream,
1354 object_mapping_notification_stream,
1355 archived_segment_notification_stream,
1356 transaction_pool,
1357 })
1358}
1359
1360fn confirmation_depth_storage_key() -> StorageKey {
1362 StorageKey(
1363 frame_support::storage::storage_prefix(
1364 "RuntimeConfigs".as_bytes(),
1367 "ConfirmationDepthK".as_bytes(),
1369 )
1370 .to_vec(),
1371 )
1372}
1373
1374fn extract_confirmation_depth(chain_spec: &dyn ChainSpec) -> Option<u32> {
1375 let storage_key = format!("0x{}", hex::encode(confirmation_depth_storage_key().0));
1376 let spec: serde_json::Value =
1377 serde_json::from_str(chain_spec.as_json(true).ok()?.as_str()).ok()?;
1378 let encoded_confirmation_depth = hex::decode(
1379 spec.pointer(format!("/genesis/raw/top/{storage_key}").as_str())?
1380 .as_str()?
1381 .trim_start_matches("0x"),
1382 )
1383 .ok()?;
1384 u32::decode(&mut encoded_confirmation_depth.as_slice()).ok()
1385}
1386
1387#[cfg(test)]
1388mod test {
1389 use static_assertions::const_assert_eq;
1390 use subspace_data_retrieval::object_fetcher::MAX_BLOCK_LENGTH as ARCHIVER_MAX_BLOCK_LENGTH;
1391 use subspace_runtime_primitives::MAX_BLOCK_LENGTH as CONSENSUS_RUNTIME_MAX_BLOCK_LENGTH;
1392
1393 #[test]
1396 fn max_block_length_consistent() {
1397 const_assert_eq!(
1398 CONSENSUS_RUNTIME_MAX_BLOCK_LENGTH,
1399 ARCHIVER_MAX_BLOCK_LENGTH,
1400 );
1401 }
1402}