1#![feature(
3 duration_constructors_lite,
4 impl_trait_in_assoc_type,
5 int_roundings,
6 type_alias_impl_trait,
7 type_changing_struct_update
8)]
9
10pub mod config;
11pub mod dsn;
12mod metrics;
13pub(crate) mod mmr;
14pub mod rpc;
15pub mod sync_from_dsn;
16mod task_spawner;
17mod utils;
18
19use crate::config::{ChainSyncMode, SubspaceConfiguration, SubspaceNetworking};
20use crate::dsn::{DsnConfigurationError, create_dsn_instance};
21use crate::metrics::NodeMetrics;
22use crate::mmr::request_handler::MmrRequestHandler;
23use crate::sync_from_dsn::DsnPieceGetter;
24use crate::sync_from_dsn::piece_validator::SegmentCommitmentPieceValidator;
25use crate::sync_from_dsn::snap_sync::snap_sync;
26use async_lock::Semaphore;
27use core::sync::atomic::{AtomicU32, Ordering};
28use cross_domain_message_gossip::xdm_gossip_peers_set_config;
29use domain_runtime_primitives::opaque::{Block as DomainBlock, Header as DomainHeader};
30use frame_system_rpc_runtime_api::AccountNonceApi;
31use futures::FutureExt;
32use futures::channel::oneshot;
33use jsonrpsee::RpcModule;
34use pallet_transaction_payment_rpc_runtime_api::TransactionPaymentApi;
35use parity_scale_codec::Decode;
36use parking_lot::Mutex;
37use prometheus_client::registry::Registry;
38use sc_basic_authorship::ProposerFactory;
39use sc_chain_spec::{ChainSpec, GenesisBlockBuilder};
40use sc_client_api::execution_extensions::ExtensionsFactory;
41use sc_client_api::{
42 AuxStore, Backend, BlockBackend, BlockchainEvents, ExecutorProvider, HeaderBackend,
43};
44use sc_consensus::{
45 BasicQueue, BlockCheckParams, BlockImport, BlockImportParams, BoxBlockImport,
46 DefaultImportQueue, ImportQueue, ImportResult,
47};
48use sc_consensus_slots::SlotProportion;
49use sc_consensus_subspace::SubspaceLink;
50use sc_consensus_subspace::archiver::{
51 ArchivedSegmentNotification, ObjectMappingNotification, SegmentHeadersStore,
52 create_subspace_archiver,
53};
54use sc_consensus_subspace::block_import::{BlockImportingNotification, SubspaceBlockImport};
55use sc_consensus_subspace::notification::SubspaceNotificationStream;
56use sc_consensus_subspace::slot_worker::{
57 NewSlotNotification, RewardSigningNotification, SubspaceSlotWorker, SubspaceSlotWorkerOptions,
58 SubspaceSyncOracle,
59};
60use sc_consensus_subspace::verifier::{SubspaceVerifier, SubspaceVerifierOptions};
61use sc_domains::ExtensionsFactory as DomainsExtensionFactory;
62use sc_domains::domain_block_er::execution_receipt_protocol::DomainBlockERRequestHandler;
63use sc_network::service::traits::NetworkService;
64use sc_network::{NetworkWorker, NotificationMetrics, NotificationService, Roles};
65use sc_network_sync::block_relay_protocol::BlockRelayParams;
66use sc_network_sync::engine::SyncingEngine;
67use sc_network_sync::service::network::NetworkServiceProvider;
68use sc_proof_of_time::source::gossip::pot_gossip_peers_set_config;
69use sc_proof_of_time::source::{PotSlotInfo, PotSourceWorker};
70use sc_proof_of_time::verifier::PotVerifier;
71use sc_service::error::Error as ServiceError;
72use sc_service::{
73 BuildNetworkAdvancedParams, Configuration, NetworkStarter, SpawnTasksParams, TaskManager,
74 build_network_advanced, build_polkadot_syncing_strategy,
75};
76use sc_subspace_block_relay::{
77 BlockRelayConfigurationError, NetworkWrapper, build_consensus_relay,
78};
79use sc_telemetry::{Telemetry, TelemetryWorker};
80use sc_transaction_pool::TransactionPoolHandle;
81use sc_transaction_pool_api::OffchainTransactionPoolFactory;
82use sp_api::{ApiExt, ConstructRuntimeApi, Metadata, ProvideRuntimeApi};
83use sp_block_builder::BlockBuilder;
84use sp_blockchain::HeaderMetadata;
85use sp_consensus::block_validation::DefaultBlockAnnounceValidator;
86use sp_consensus_slots::Slot;
87use sp_consensus_subspace::digests::extract_pre_digest;
88use sp_consensus_subspace::{
89 KzgExtension, PosExtension, PotExtension, PotNextSlotInput, SubspaceApi,
90};
91use sp_core::H256;
92use sp_core::offchain::OffchainDbExt;
93use sp_core::offchain::storage::OffchainDb;
94use sp_core::traits::SpawnEssentialNamed;
95use sp_domains::storage::StorageKey;
96use sp_domains::{BundleProducerElectionApi, DomainsApi};
97use sp_domains_fraud_proof::{FraudProofApi, FraudProofExtension, FraudProofHostFunctionsImpl};
98use sp_externalities::Extensions;
99use sp_messenger::MessengerApi;
100use sp_messenger_host_functions::{MessengerExtension, MessengerHostFunctionsImpl};
101use sp_mmr_primitives::MmrApi;
102use sp_objects::ObjectsApi;
103use sp_offchain::OffchainWorkerApi;
104use sp_runtime::traits::{Block as BlockT, BlockIdTo, Header, NumberFor, Zero};
105use sp_session::SessionKeys;
106use sp_subspace_mmr::host_functions::{SubspaceMmrExtension, SubspaceMmrHostFunctionsImpl};
107use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
108use static_assertions::const_assert;
109use std::marker::PhantomData;
110use std::num::NonZeroUsize;
111use std::sync::Arc;
112use std::time::Duration;
113use subspace_core_primitives::pieces::Record;
114use subspace_core_primitives::pot::PotSeed;
115use subspace_core_primitives::{BlockNumber, PublicKey, REWARD_SIGNING_CONTEXT};
116use subspace_erasure_coding::ErasureCoding;
117use subspace_kzg::Kzg;
118use subspace_networking::libp2p::multiaddr::Protocol;
119use subspace_networking::utils::piece_provider::PieceProvider;
120use subspace_proof_of_space::Table;
121use subspace_runtime_primitives::opaque::Block;
122use subspace_runtime_primitives::{AccountId, Balance, BlockHashFor, Hash, Nonce};
123use tokio::sync::broadcast;
124use tracing::{Instrument, debug, error, info};
125pub use utils::wait_for_block_import;
126
127const_assert!(std::mem::size_of::<usize>() >= std::mem::size_of::<u64>());
130
131const POT_VERIFIER_CACHE_SIZE: u32 = 30_000;
134const SYNC_TARGET_UPDATE_INTERVAL: Duration = Duration::from_secs(1);
135const PIECE_PROVIDER_MULTIPLIER: usize = 10;
137
138#[derive(thiserror::Error, Debug)]
140pub enum Error {
141 #[error(transparent)]
143 Io(#[from] std::io::Error),
144
145 #[error(transparent)]
147 AddrFormatInvalid(#[from] std::net::AddrParseError),
148
149 #[error(transparent)]
151 Sub(#[from] sc_service::Error),
152
153 #[error(transparent)]
155 Consensus(#[from] sp_consensus::Error),
156
157 #[error(transparent)]
159 Telemetry(#[from] sc_telemetry::Error),
160
161 #[error(transparent)]
163 SubspaceDsn(#[from] DsnConfigurationError),
164
165 #[error(transparent)]
167 BlockRelay(#[from] BlockRelayConfigurationError),
168
169 #[error(transparent)]
171 Other(Box<dyn std::error::Error + Send + Sync>),
172}
173
174#[derive(Clone)]
176struct BlockImportWrapper<BI>(BI);
177
178#[async_trait::async_trait]
179impl<Block, BI> BlockImport<Block> for BlockImportWrapper<BI>
180where
181 Block: BlockT,
182 BI: BlockImport<Block, Error = sc_consensus_subspace::block_import::Error<Block::Header>>
183 + Send
184 + Sync,
185{
186 type Error = sp_consensus::Error;
187
188 async fn check_block(
189 &self,
190 block: BlockCheckParams<Block>,
191 ) -> Result<ImportResult, Self::Error> {
192 self.0
193 .check_block(block)
194 .await
195 .map_err(|error| sp_consensus::Error::Other(error.into()))
196 }
197
198 async fn import_block(
199 &self,
200 block: BlockImportParams<Block>,
201 ) -> Result<ImportResult, Self::Error> {
202 self.0
203 .import_block(block)
204 .await
205 .map_err(|error| sp_consensus::Error::Other(error.into()))
206 }
207}
208
209#[cfg(not(feature = "runtime-benchmarks"))]
211pub type HostFunctions = (
212 sp_io::SubstrateHostFunctions,
213 sp_consensus_subspace::consensus::HostFunctions,
214 sp_domains_fraud_proof::HostFunctions,
215 sp_subspace_mmr::HostFunctions,
216 sp_messenger_host_functions::HostFunctions,
217);
218
219#[cfg(feature = "runtime-benchmarks")]
221pub type HostFunctions = (
222 sp_io::SubstrateHostFunctions,
223 frame_benchmarking::benchmarking::HostFunctions,
224 sp_consensus_subspace::consensus::HostFunctions,
225 sp_domains_fraud_proof::HostFunctions,
226 sp_subspace_mmr::HostFunctions,
227 sp_messenger_host_functions::HostFunctions,
228);
229
230pub type RuntimeExecutor = sc_executor::WasmExecutor<HostFunctions>;
232
233pub type FullClient<RuntimeApi> = sc_service::TFullClient<Block, RuntimeApi, RuntimeExecutor>;
235
236pub type FullBackend = sc_service::TFullBackend<Block>;
237pub type FullSelectChain = sc_consensus::LongestChain<FullBackend, Block>;
238
239struct SubspaceExtensionsFactory<PosTable, Client, DomainBlock> {
240 kzg: Kzg,
241 client: Arc<Client>,
242 backend: Arc<FullBackend>,
243 pot_verifier: PotVerifier,
244 domains_executor: Arc<sc_domains::RuntimeExecutor>,
245 confirmation_depth_k: BlockNumber,
246 _pos_table: PhantomData<(PosTable, DomainBlock)>,
247}
248
249impl<PosTable, Block, Client, DomainBlock> ExtensionsFactory<Block>
250 for SubspaceExtensionsFactory<PosTable, Client, DomainBlock>
251where
252 PosTable: Table,
253 Block: BlockT,
254 Block::Hash: From<H256> + Into<H256>,
255 DomainBlock: BlockT,
256 DomainBlock::Hash: Into<H256> + From<H256>,
257 Client: BlockBackend<Block>
258 + HeaderBackend<Block>
259 + ProvideRuntimeApi<Block>
260 + Send
261 + Sync
262 + 'static,
263 Client::Api: SubspaceApi<Block, PublicKey>
264 + DomainsApi<Block, DomainBlock::Header>
265 + BundleProducerElectionApi<Block, Balance>
266 + MmrApi<Block, H256, NumberFor<Block>>
267 + MessengerApi<Block, NumberFor<Block>, Block::Hash>,
268{
269 fn extensions_for(
270 &self,
271 _block_hash: Block::Hash,
272 _block_number: NumberFor<Block>,
273 ) -> Extensions {
274 let confirmation_depth_k = self.confirmation_depth_k;
275 let mut exts = Extensions::new();
276 exts.register(KzgExtension::new(self.kzg.clone()));
277 exts.register(PosExtension::new::<PosTable>());
278 exts.register(PotExtension::new({
279 let client = Arc::clone(&self.client);
280 let pot_verifier = self.pot_verifier.clone();
281
282 Box::new(
283 move |parent_hash, slot, proof_of_time, quick_verification| {
284 let parent_hash = {
285 let mut converted_parent_hash = Block::Hash::default();
286 converted_parent_hash.as_mut().copy_from_slice(&parent_hash);
287 converted_parent_hash
288 };
289
290 let parent_header = match client.header(parent_hash) {
291 Ok(Some(parent_header)) => parent_header,
292 Ok(None) => {
293 if quick_verification {
294 error!(
295 %parent_hash,
296 "Header not found during proof of time verification"
297 );
298
299 return false;
300 } else {
301 debug!(
302 %parent_hash,
303 "Header not found during proof of time verification"
304 );
305
306 return true;
309 }
310 }
311 Err(error) => {
312 error!(
313 %error,
314 %parent_hash,
315 "Failed to retrieve header during proof of time verification"
316 );
317
318 return false;
319 }
320 };
321
322 let parent_pre_digest = match extract_pre_digest(&parent_header) {
323 Ok(parent_pre_digest) => parent_pre_digest,
324 Err(error) => {
325 error!(
326 %error,
327 %parent_hash,
328 parent_number = %parent_header.number(),
329 "Failed to extract pre-digest from parent header during proof of \
330 time verification, this must never happen"
331 );
332
333 return false;
334 }
335 };
336
337 let parent_slot = parent_pre_digest.slot();
338 if slot <= *parent_slot {
339 return false;
340 }
341
342 let pot_parameters = match client.runtime_api().pot_parameters(parent_hash) {
343 Ok(pot_parameters) => pot_parameters,
344 Err(error) => {
345 debug!(
346 %error,
347 %parent_hash,
348 parent_number = %parent_header.number(),
349 "Failed to retrieve proof of time parameters during proof of time \
350 verification"
351 );
352
353 return false;
354 }
355 };
356
357 let pot_input = if parent_header.number().is_zero() {
358 PotNextSlotInput {
359 slot: parent_slot + Slot::from(1),
360 slot_iterations: pot_parameters.slot_iterations(),
361 seed: pot_verifier.genesis_seed(),
362 }
363 } else {
364 let pot_info = parent_pre_digest.pot_info();
365
366 PotNextSlotInput::derive(
367 pot_parameters.slot_iterations(),
368 parent_slot,
369 pot_info.proof_of_time(),
370 &pot_parameters.next_parameters_change(),
371 )
372 };
373
374 if quick_verification {
378 pot_verifier.try_is_output_valid(
379 pot_input,
380 Slot::from(slot) - parent_slot,
381 proof_of_time,
382 pot_parameters.next_parameters_change(),
383 )
384 } else {
385 pot_verifier.is_output_valid(
386 pot_input,
387 Slot::from(slot) - parent_slot,
388 proof_of_time,
389 pot_parameters.next_parameters_change(),
390 )
391 }
392 },
393 )
394 }));
395
396 exts.register(FraudProofExtension::new(Arc::new(
397 FraudProofHostFunctionsImpl::<_, _, DomainBlock, _, _>::new(
398 self.client.clone(),
399 self.domains_executor.clone(),
400 move |client, executor| {
401 let extension_factory =
402 DomainsExtensionFactory::<_, Block, DomainBlock, _>::new(
403 client,
404 executor,
405 confirmation_depth_k,
406 );
407 Box::new(extension_factory) as Box<dyn ExtensionsFactory<DomainBlock>>
408 },
409 ),
410 )));
411
412 exts.register(SubspaceMmrExtension::new(Arc::new(
413 SubspaceMmrHostFunctionsImpl::<Block, _>::new(
414 self.client.clone(),
415 confirmation_depth_k,
416 ),
417 )));
418
419 exts.register(MessengerExtension::new(Arc::new(
420 MessengerHostFunctionsImpl::<Block, _, DomainBlock, _>::new(
421 self.client.clone(),
422 self.domains_executor.clone(),
423 ),
424 )));
425
426 if let Some(offchain_storage) = self.backend.offchain_storage() {
429 let offchain_db = OffchainDb::new(offchain_storage);
430 exts.register(OffchainDbExt::new(offchain_db));
431 }
432
433 exts
434 }
435}
436
437pub struct OtherPartialComponents<RuntimeApi>
439where
440 RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
441{
442 pub block_import: BoxBlockImport<Block>,
444 pub subspace_link: SubspaceLink<Block>,
446 pub segment_headers_store: SegmentHeadersStore<FullClient<RuntimeApi>>,
448 pub pot_verifier: PotVerifier,
450 pub sync_target_block_number: Arc<AtomicU32>,
452 pub telemetry: Option<Telemetry>,
454}
455
456type PartialComponents<RuntimeApi> = sc_service::PartialComponents<
457 FullClient<RuntimeApi>,
458 FullBackend,
459 FullSelectChain,
460 DefaultImportQueue<Block>,
461 TransactionPoolHandle<Block, FullClient<RuntimeApi>>,
462 OtherPartialComponents<RuntimeApi>,
463>;
464
465#[expect(clippy::result_large_err, reason = "Comes from Substrate")]
467pub fn new_partial<PosTable, RuntimeApi>(
468 config: &Configuration,
471 snap_sync: bool,
473 pot_external_entropy: &[u8],
474) -> Result<PartialComponents<RuntimeApi>, ServiceError>
475where
476 PosTable: Table,
477 RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
478 RuntimeApi::RuntimeApi: ApiExt<Block>
479 + Metadata<Block>
480 + BlockBuilder<Block>
481 + OffchainWorkerApi<Block>
482 + SessionKeys<Block>
483 + TaggedTransactionQueue<Block>
484 + SubspaceApi<Block, PublicKey>
485 + DomainsApi<Block, DomainHeader>
486 + FraudProofApi<Block, DomainHeader>
487 + BundleProducerElectionApi<Block, Balance>
488 + ObjectsApi<Block>
489 + MmrApi<Block, H256, NumberFor<Block>>
490 + MessengerApi<Block, NumberFor<Block>, BlockHashFor<Block>>,
491{
492 let telemetry = config
493 .telemetry_endpoints
494 .clone()
495 .filter(|x| !x.is_empty())
496 .map(|endpoints| -> Result<_, sc_telemetry::Error> {
497 let worker = TelemetryWorker::new(16)?;
498 let telemetry = worker.handle().new_telemetry(endpoints);
499 Ok((worker, telemetry))
500 })
501 .transpose()?;
502
503 let executor = sc_service::new_wasm_executor(&config.executor);
504 let domains_executor = sc_service::new_wasm_executor(&config.executor);
505
506 let confirmation_depth_k = extract_confirmation_depth(config.chain_spec.as_ref()).ok_or(
507 ServiceError::Other("Failed to extract confirmation depth from chain spec".to_string()),
508 )?;
509
510 let backend = Arc::new(sc_client_db::Backend::new(
511 config.db_config(),
512 confirmation_depth_k.into(),
513 )?);
514
515 let genesis_block_builder = GenesisBlockBuilder::new(
516 config.chain_spec.as_storage_builder(),
517 !snap_sync,
518 backend.clone(),
519 executor.clone(),
520 )?;
521
522 let (client, backend, keystore_container, task_manager) =
523 sc_service::new_full_parts_with_genesis_builder::<Block, RuntimeApi, _, _>(
524 config,
525 telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
526 executor.clone(),
527 backend,
528 genesis_block_builder,
529 false,
530 )?;
531
532 let (kzg, maybe_erasure_coding) = tokio::task::block_in_place(|| {
534 rayon::join(Kzg::new, || {
535 ErasureCoding::new(
536 NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize)
537 .expect("Not zero; qed"),
538 )
539 .map_err(|error| format!("Failed to instantiate erasure coding: {error}"))
540 })
541 });
542 let erasure_coding = maybe_erasure_coding?;
543
544 let client = Arc::new(client);
545 let client_info = client.info();
546 let chain_constants = client
547 .runtime_api()
548 .chain_constants(client_info.best_hash)
549 .map_err(|error| ServiceError::Application(error.into()))?;
550
551 let pot_verifier = PotVerifier::new(
552 PotSeed::from_genesis(client_info.genesis_hash.as_ref(), pot_external_entropy),
553 POT_VERIFIER_CACHE_SIZE,
554 );
555
556 assert_eq!(confirmation_depth_k, chain_constants.confirmation_depth_k());
558
559 client
560 .execution_extensions()
561 .set_extensions_factory(SubspaceExtensionsFactory::<PosTable, _, DomainBlock> {
562 kzg: kzg.clone(),
563 client: Arc::clone(&client),
564 pot_verifier: pot_verifier.clone(),
565 domains_executor: Arc::new(domains_executor),
566 backend: backend.clone(),
567 confirmation_depth_k: chain_constants.confirmation_depth_k(),
568 _pos_table: PhantomData,
569 });
570
571 let telemetry = telemetry.map(|(worker, telemetry)| {
572 task_manager
573 .spawn_handle()
574 .spawn("telemetry", None, worker.run());
575 telemetry
576 });
577
578 let select_chain = sc_consensus::LongestChain::new(backend.clone());
579
580 let segment_headers_store = tokio::task::block_in_place(|| {
581 SegmentHeadersStore::new(client.clone(), chain_constants.confirmation_depth_k())
582 })
583 .map_err(|error| ServiceError::Application(error.into()))?;
584
585 let subspace_link = SubspaceLink::new(chain_constants, kzg.clone(), erasure_coding);
586 let segment_headers_store = segment_headers_store.clone();
587
588 let block_import = SubspaceBlockImport::<PosTable, _, _, _, _, _>::new(
589 client.clone(),
590 client.clone(),
591 subspace_link.clone(),
592 {
593 let client = client.clone();
594 let segment_headers_store = segment_headers_store.clone();
595
596 move |parent_hash, ()| {
597 let client = client.clone();
598 let segment_headers_store = segment_headers_store.clone();
599
600 async move {
601 let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
602
603 let parent_header = client
604 .header(parent_hash)?
605 .expect("Parent header must always exist when block is created; qed");
606
607 let parent_block_number = parent_header.number;
608
609 let subspace_inherents =
610 sp_consensus_subspace::inherents::InherentDataProvider::new(
611 segment_headers_store
612 .segment_headers_for_block(parent_block_number + 1),
613 );
614
615 Ok((timestamp, subspace_inherents))
616 }
617 }
618 },
619 segment_headers_store.clone(),
620 pot_verifier.clone(),
621 );
622
623 let sync_target_block_number = Arc::new(AtomicU32::new(0));
624 let transaction_pool = Arc::from(
625 sc_transaction_pool::Builder::new(
626 task_manager.spawn_essential_handle(),
627 client.clone(),
628 config.role.is_authority().into(),
629 )
630 .with_options(config.transaction_pool.clone())
631 .with_prometheus(config.prometheus_registry())
632 .build(),
633 );
634
635 let verifier = SubspaceVerifier::<PosTable, _, _>::new(SubspaceVerifierOptions {
636 client: client.clone(),
637 chain_constants,
638 kzg,
639 telemetry: telemetry.as_ref().map(|x| x.handle()),
640 reward_signing_context: schnorrkel::context::signing_context(REWARD_SIGNING_CONTEXT),
641 sync_target_block_number: Arc::clone(&sync_target_block_number),
642 is_authoring_blocks: config.role.is_authority(),
643 pot_verifier: pot_verifier.clone(),
644 });
645
646 let import_queue = BasicQueue::new(
647 verifier,
648 Box::new(BlockImportWrapper(block_import.clone())),
649 None,
650 &task_manager.spawn_essential_handle(),
651 config.prometheus_registry(),
652 );
653
654 let other = OtherPartialComponents {
655 block_import: Box::new(BlockImportWrapper(block_import.clone())),
656 subspace_link,
657 segment_headers_store,
658 pot_verifier,
659 sync_target_block_number,
660 telemetry,
661 };
662
663 Ok(PartialComponents {
664 client,
665 backend,
666 task_manager,
667 import_queue,
668 keystore_container,
669 select_chain,
670 transaction_pool,
671 other,
672 })
673}
674
675pub struct NewFull<Client>
677where
678 Client: ProvideRuntimeApi<Block>
679 + AuxStore
680 + BlockBackend<Block>
681 + BlockIdTo<Block>
682 + HeaderBackend<Block>
683 + HeaderMetadata<Block, Error = sp_blockchain::Error>
684 + 'static,
685 Client::Api: TaggedTransactionQueue<Block>
686 + DomainsApi<Block, DomainHeader>
687 + FraudProofApi<Block, DomainHeader>
688 + SubspaceApi<Block, PublicKey>
689 + MmrApi<Block, H256, NumberFor<Block>>
690 + MessengerApi<Block, NumberFor<Block>, BlockHashFor<Block>>,
691{
692 pub task_manager: TaskManager,
694 pub client: Arc<Client>,
696 pub select_chain: FullSelectChain,
698 pub network_service: Arc<dyn NetworkService + Send + Sync>,
700 pub xdm_gossip_notification_service: Box<dyn NotificationService>,
702 pub sync_service: Arc<sc_network_sync::SyncingService<Block>>,
704 pub rpc_handlers: sc_service::RpcHandlers,
706 pub backend: Arc<FullBackend>,
708 pub pot_slot_info_stream: broadcast::Receiver<PotSlotInfo>,
710 pub new_slot_notification_stream: SubspaceNotificationStream<NewSlotNotification>,
713 pub reward_signing_notification_stream: SubspaceNotificationStream<RewardSigningNotification>,
715 pub block_importing_notification_stream:
717 SubspaceNotificationStream<BlockImportingNotification<Block>>,
718 pub object_mapping_notification_stream: SubspaceNotificationStream<ObjectMappingNotification>,
720 pub archived_segment_notification_stream:
722 SubspaceNotificationStream<ArchivedSegmentNotification>,
723 pub network_starter: NetworkStarter,
725 pub transaction_pool: Arc<TransactionPoolHandle<Block, Client>>,
727}
728
729type FullNode<RuntimeApi> = NewFull<FullClient<RuntimeApi>>;
730
731pub async fn new_full<PosTable, RuntimeApi>(
733 mut config: SubspaceConfiguration,
734 partial_components: PartialComponents<RuntimeApi>,
735 prometheus_registry: Option<&mut Registry>,
736 enable_rpc_extensions: bool,
737 block_proposal_slot_portion: SlotProportion,
738 consensus_snap_sync_target_block_receiver: Option<broadcast::Receiver<BlockNumber>>,
739) -> Result<FullNode<RuntimeApi>, Error>
740where
741 PosTable: Table,
742 RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
743 RuntimeApi::RuntimeApi: ApiExt<Block>
744 + Metadata<Block>
745 + AccountNonceApi<Block, AccountId, Nonce>
746 + BlockBuilder<Block>
747 + OffchainWorkerApi<Block>
748 + SessionKeys<Block>
749 + TaggedTransactionQueue<Block>
750 + TransactionPaymentApi<Block, Balance>
751 + SubspaceApi<Block, PublicKey>
752 + DomainsApi<Block, DomainHeader>
753 + FraudProofApi<Block, DomainHeader>
754 + ObjectsApi<Block>
755 + MmrApi<Block, Hash, BlockNumber>
756 + MessengerApi<Block, NumberFor<Block>, BlockHashFor<Block>>,
757{
758 let PartialComponents {
759 client,
760 backend,
761 mut task_manager,
762 import_queue,
763 keystore_container,
764 select_chain,
765 transaction_pool,
766 other,
767 } = partial_components;
768 let OtherPartialComponents {
769 block_import,
770 subspace_link,
771 segment_headers_store,
772 pot_verifier,
773 sync_target_block_number,
774 mut telemetry,
775 } = other;
776
777 let offchain_indexing_enabled = config.base.offchain_worker.indexing_enabled;
778 let (node, bootstrap_nodes, piece_getter) = match config.subspace_networking {
779 SubspaceNetworking::Reuse {
780 node,
781 bootstrap_nodes,
782 piece_getter,
783 } => (node, bootstrap_nodes, piece_getter),
784 SubspaceNetworking::Create { config: dsn_config } => {
785 let dsn_protocol_version = hex::encode(client.chain_info().genesis_hash);
786
787 debug!(
788 chain_type=?config.base.chain_spec.chain_type(),
789 genesis_hash=%hex::encode(client.chain_info().genesis_hash),
790 "Setting DSN protocol version..."
791 );
792
793 let out_connections = dsn_config.max_out_connections;
794 let (node, mut node_runner) = create_dsn_instance(
795 dsn_protocol_version,
796 dsn_config.clone(),
797 prometheus_registry,
798 )?;
799
800 info!("Subspace networking initialized: Node ID is {}", node.id());
801
802 node.on_new_listener(Arc::new({
803 let node = node.clone();
804
805 move |address| {
806 info!(
807 "DSN listening on {}",
808 address.clone().with(Protocol::P2p(node.id()))
809 );
810 }
811 }))
812 .detach();
813
814 task_manager
815 .spawn_essential_handle()
816 .spawn_essential_blocking(
817 "node-runner",
818 Some("subspace-networking"),
819 Box::pin(
820 async move {
821 node_runner.run().await;
822 }
823 .in_current_span(),
824 ),
825 );
826
827 let piece_provider = PieceProvider::new(
828 node.clone(),
829 SegmentCommitmentPieceValidator::new(
830 node.clone(),
831 subspace_link.kzg().clone(),
832 segment_headers_store.clone(),
833 ),
834 Arc::new(Semaphore::new(
835 out_connections as usize * PIECE_PROVIDER_MULTIPLIER,
836 )),
837 );
838
839 (
840 node,
841 dsn_config.bootstrap_nodes,
842 Arc::new(DsnPieceGetter::new(piece_provider)) as _,
843 )
844 }
845 };
846
847 let dsn_bootstrap_nodes = {
848 if bootstrap_nodes.is_empty() {
851 let (node_address_sender, node_address_receiver) = oneshot::channel();
852 let _handler = node.on_new_listener(Arc::new({
853 let node_address_sender = Mutex::new(Some(node_address_sender));
854
855 move |address| {
856 if matches!(address.iter().next(), Some(Protocol::Ip4(_)))
857 && let Some(node_address_sender) = node_address_sender.lock().take()
858 && let Err(err) = node_address_sender.send(address.clone())
859 {
860 debug!(?err, "Couldn't send a node address to the channel.");
861 }
862 }
863 }));
864
865 let mut node_listeners = node.listeners();
866
867 if node_listeners.is_empty() {
868 let Ok(listener) = node_address_receiver.await else {
869 return Err(Error::Other(
870 "Oneshot receiver dropped before DSN node listener was ready"
871 .to_string()
872 .into(),
873 ));
874 };
875
876 node_listeners = vec![listener];
877 }
878
879 node_listeners.iter_mut().for_each(|multiaddr| {
880 multiaddr.push(Protocol::P2p(node.id()));
881 });
882
883 node_listeners
884 } else {
885 bootstrap_nodes
886 }
887 };
888
889 let substrate_prometheus_registry = config
890 .base
891 .prometheus_config
892 .as_ref()
893 .map(|prometheus_config| prometheus_config.registry.clone());
894 let import_queue_service1 = import_queue.service();
895 let import_queue_service2 = import_queue.service();
896 let network_wrapper = Arc::new(NetworkWrapper::default());
897 let block_relay = build_consensus_relay(
898 network_wrapper.clone(),
899 client.clone(),
900 transaction_pool.clone(),
901 substrate_prometheus_registry.as_ref(),
902 )
903 .map_err(Error::BlockRelay)?;
904 let mut net_config = sc_network::config::FullNetworkConfiguration::new(
905 &config.base.network,
906 substrate_prometheus_registry.clone(),
907 );
908 let (xdm_gossip_notification_config, xdm_gossip_notification_service) =
909 xdm_gossip_peers_set_config();
910 net_config.add_notification_protocol(xdm_gossip_notification_config);
911 let (pot_gossip_notification_config, pot_gossip_notification_service) =
912 pot_gossip_peers_set_config();
913 net_config.add_notification_protocol(pot_gossip_notification_config);
914 let pause_sync = Arc::clone(&net_config.network_config.pause_sync);
915
916 let num_peer_hint = net_config.network_config.default_peers_set_num_full as usize
917 + net_config
918 .network_config
919 .default_peers_set
920 .reserved_nodes
921 .len();
922
923 let protocol_id = config.base.protocol_id();
924 let fork_id = config.base.chain_spec.fork_id();
925
926 let (handler, protocol_config) = DomainBlockERRequestHandler::new::<
928 NetworkWorker<Block, BlockHashFor<Block>>,
929 >(fork_id, client.clone(), num_peer_hint);
930 task_manager.spawn_handle().spawn(
931 "domain-block-er-request-handler",
932 Some("networking"),
933 handler.run(),
934 );
935 net_config.add_request_response_protocol(protocol_config);
936
937 if let Some(offchain_storage) = backend.offchain_storage() {
938 let (handler, protocol_config) =
940 MmrRequestHandler::new::<NetworkWorker<Block, BlockHashFor<Block>>>(
941 &config.base.protocol_id(),
942 fork_id,
943 client.clone(),
944 num_peer_hint,
945 offchain_storage,
946 );
947 task_manager
948 .spawn_handle()
949 .spawn("mmr-request-handler", Some("networking"), handler.run());
950
951 net_config.add_request_response_protocol(protocol_config);
952 }
953
954 let network_service_provider = NetworkServiceProvider::new();
955 let network_service_handle = network_service_provider.handle();
956 let (network_service, system_rpc_tx, tx_handler_controller, network_starter, sync_service) = {
957 let spawn_handle = task_manager.spawn_handle();
958 let metrics = NotificationMetrics::new(substrate_prometheus_registry.as_ref());
959
960 let block_downloader = {
962 let BlockRelayParams {
963 mut server,
964 downloader,
965 request_response_config,
966 } = block_relay;
967 net_config.add_request_response_protocol(request_response_config);
968
969 spawn_handle.spawn("block-request-handler", Some("networking"), async move {
970 server.run().await;
971 });
972
973 downloader
974 };
975
976 let syncing_strategy = build_polkadot_syncing_strategy(
977 protocol_id.clone(),
978 fork_id,
979 &mut net_config,
980 None,
981 block_downloader,
982 client.clone(),
983 &spawn_handle,
984 substrate_prometheus_registry.as_ref(),
985 )?;
986
987 let (syncing_engine, sync_service, block_announce_config) =
988 SyncingEngine::new::<NetworkWorker<_, _>>(
989 Roles::from(&config.base.role),
990 Arc::clone(&client),
991 substrate_prometheus_registry.as_ref(),
992 metrics.clone(),
993 &net_config,
994 protocol_id.clone(),
995 fork_id,
996 Box::new(DefaultBlockAnnounceValidator),
997 syncing_strategy,
998 network_service_provider.handle(),
999 import_queue.service(),
1000 net_config.peer_store_handle(),
1001 config.base.network.force_synced,
1002 )
1003 .map_err(sc_service::Error::from)?;
1004
1005 spawn_handle.spawn_blocking("syncing", None, syncing_engine.run());
1006
1007 build_network_advanced(BuildNetworkAdvancedParams {
1008 role: config.base.role,
1009 protocol_id,
1010 fork_id,
1011 ipfs_server: config.base.network.ipfs_server,
1012 announce_block: config.base.announce_block,
1013 net_config,
1014 client: Arc::clone(&client),
1015 transaction_pool: Arc::clone(&transaction_pool),
1016 spawn_handle,
1017 import_queue,
1018 sync_service,
1019 block_announce_config,
1020 network_service_provider,
1021 metrics_registry: substrate_prometheus_registry.as_ref(),
1022 metrics,
1023 })?
1024 };
1025
1026 task_manager.spawn_handle().spawn(
1027 "sync-target-follower",
1028 None,
1029 Box::pin({
1030 let sync_service = sync_service.clone();
1031 let sync_target_block_number = Arc::clone(&sync_target_block_number);
1032
1033 async move {
1034 loop {
1035 let best_seen_block = sync_service
1036 .status()
1037 .await
1038 .map(|status| status.best_seen_block.unwrap_or_default())
1039 .unwrap_or_default();
1040 sync_target_block_number.store(best_seen_block, Ordering::Relaxed);
1041
1042 tokio::time::sleep(SYNC_TARGET_UPDATE_INTERVAL).await;
1043 }
1044 }
1045 }),
1046 );
1047
1048 let sync_oracle = SubspaceSyncOracle::new(
1049 config.base.force_authoring,
1050 Arc::clone(&pause_sync),
1051 sync_service.clone(),
1052 );
1053
1054 let subspace_archiver = tokio::task::block_in_place(|| {
1055 create_subspace_archiver(
1056 segment_headers_store.clone(),
1057 subspace_link.clone(),
1058 client.clone(),
1059 sync_oracle.clone(),
1060 telemetry.as_ref().map(|telemetry| telemetry.handle()),
1061 config.create_object_mappings,
1062 )
1063 })
1064 .map_err(ServiceError::Client)?;
1065
1066 task_manager
1067 .spawn_essential_handle()
1068 .spawn_essential_blocking(
1069 "subspace-archiver",
1070 None,
1071 Box::pin(async move {
1072 if let Err(error) = subspace_archiver.await {
1073 error!(%error, "Archiver exited with error");
1074 }
1075 }),
1076 );
1077
1078 network_wrapper.set(network_service.clone());
1079
1080 if !config.base.network.force_synced {
1081 pause_sync.store(true, Ordering::Release);
1083 }
1084
1085 let snap_sync_task = snap_sync(
1086 segment_headers_store.clone(),
1087 node.clone(),
1088 fork_id.map(|fork_id| fork_id.to_string()),
1089 Arc::clone(&client),
1090 import_queue_service1,
1091 pause_sync.clone(),
1092 piece_getter.clone(),
1093 sync_service.clone(),
1094 network_service_handle,
1095 subspace_link.erasure_coding().clone(),
1096 consensus_snap_sync_target_block_receiver,
1097 backend.offchain_storage(),
1098 network_service.clone(),
1099 );
1100
1101 let (observer, worker) = sync_from_dsn::create_observer_and_worker(
1102 segment_headers_store.clone(),
1103 Arc::clone(&network_service),
1104 node.clone(),
1105 Arc::clone(&client),
1106 import_queue_service2,
1107 sync_service.clone(),
1108 sync_target_block_number,
1109 pause_sync,
1110 piece_getter,
1111 subspace_link.erasure_coding().clone(),
1112 );
1113 task_manager
1114 .spawn_handle()
1115 .spawn("observer", Some("sync-from-dsn"), observer);
1116 task_manager
1117 .spawn_essential_handle()
1118 .spawn_essential_blocking(
1119 "worker",
1120 Some("sync-from-dsn"),
1121 Box::pin(async move {
1122 if config.sync == ChainSyncMode::Snap
1124 && let Err(error) = snap_sync_task.in_current_span().await
1125 {
1126 error!(%error, "Snap sync exited with a fatal error");
1127 return;
1128 }
1129
1130 if let Err(error) = worker.await {
1131 error!(%error, "Sync from DSN exited with an error");
1132 }
1133 }),
1134 );
1135
1136 if let Some(registry) = substrate_prometheus_registry.as_ref() {
1137 match NodeMetrics::new(
1138 client.clone(),
1139 client.every_import_notification_stream(),
1140 registry,
1141 ) {
1142 Ok(node_metrics) => {
1143 task_manager.spawn_handle().spawn(
1144 "node_metrics",
1145 None,
1146 Box::pin(async move {
1147 node_metrics.run().await;
1148 }),
1149 );
1150 }
1151 Err(err) => {
1152 error!("Failed to initialize node metrics: {err:?}");
1153 }
1154 }
1155 }
1156
1157 let offchain_tx_pool_factory = OffchainTransactionPoolFactory::new(transaction_pool.clone());
1158
1159 if config.base.offchain_worker.enabled {
1160 let offchain_workers =
1161 sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
1162 runtime_api_provider: client.clone(),
1163 is_validator: config.base.role.is_authority(),
1164 keystore: Some(keystore_container.keystore()),
1165 offchain_db: backend.offchain_storage(),
1166 transaction_pool: Some(offchain_tx_pool_factory.clone()),
1167 network_provider: Arc::new(network_service.clone()),
1168 enable_http_requests: true,
1169 custom_extensions: |_| vec![],
1170 })?;
1171 task_manager.spawn_handle().spawn(
1172 "offchain-workers-runner",
1173 "offchain-worker",
1174 offchain_workers
1175 .run(client.clone(), task_manager.spawn_handle())
1176 .boxed(),
1177 );
1178 }
1179
1180 if offchain_indexing_enabled {
1182 task_manager.spawn_essential_handle().spawn_blocking(
1183 "mmr-gadget",
1184 None,
1185 mmr_gadget::MmrGadget::start(
1186 client.clone(),
1187 backend.clone(),
1188 sp_mmr_primitives::INDEXING_PREFIX.to_vec(),
1189 ),
1190 );
1191 }
1192
1193 let backoff_authoring_blocks: Option<()> = None;
1194
1195 let new_slot_notification_stream = subspace_link.new_slot_notification_stream();
1196 let reward_signing_notification_stream = subspace_link.reward_signing_notification_stream();
1197 let block_importing_notification_stream = subspace_link.block_importing_notification_stream();
1198 let object_mapping_notification_stream = subspace_link.object_mapping_notification_stream();
1199 let archived_segment_notification_stream = subspace_link.archived_segment_notification_stream();
1200
1201 let (pot_source_worker, pot_gossip_worker, pot_slot_info_stream) = PotSourceWorker::new(
1202 config.is_timekeeper,
1203 config.timekeeper_cpu_cores,
1204 client.clone(),
1205 pot_verifier.clone(),
1206 Arc::clone(&network_service),
1207 pot_gossip_notification_service,
1208 sync_service.clone(),
1209 sync_oracle.clone(),
1210 )
1211 .map_err(|error| Error::Other(error.into()))?;
1212
1213 let additional_pot_slot_info_stream = pot_source_worker.subscribe_pot_slot_info_stream();
1214
1215 task_manager
1216 .spawn_essential_handle()
1217 .spawn("pot-source", Some("pot"), pot_source_worker.run());
1218 task_manager
1219 .spawn_essential_handle()
1220 .spawn("pot-gossip", Some("pot"), pot_gossip_worker.run());
1221
1222 if config.base.role.is_authority() || config.force_new_slot_notifications {
1223 let proposer_factory = ProposerFactory::new(
1224 task_manager.spawn_handle(),
1225 client.clone(),
1226 transaction_pool.clone(),
1227 substrate_prometheus_registry.as_ref(),
1228 telemetry.as_ref().map(|x| x.handle()),
1229 );
1230
1231 let subspace_slot_worker =
1232 SubspaceSlotWorker::<PosTable, _, _, _, _, _, _, _>::new(SubspaceSlotWorkerOptions {
1233 client: client.clone(),
1234 env: proposer_factory,
1235 block_import,
1236 sync_oracle: sync_oracle.clone(),
1237 justification_sync_link: sync_service.clone(),
1238 force_authoring: config.base.force_authoring,
1239 backoff_authoring_blocks,
1240 subspace_link: subspace_link.clone(),
1241 segment_headers_store: segment_headers_store.clone(),
1242 block_proposal_slot_portion,
1243 max_block_proposal_slot_portion: None,
1244 telemetry: telemetry.as_ref().map(|x| x.handle()),
1245 offchain_tx_pool_factory,
1246 pot_verifier,
1247 });
1248
1249 let create_inherent_data_providers = {
1250 let client = client.clone();
1251 let segment_headers_store = segment_headers_store.clone();
1252
1253 move |parent_hash, ()| {
1254 let client = client.clone();
1255 let segment_headers_store = segment_headers_store.clone();
1256
1257 async move {
1258 let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
1259
1260 let parent_header = client
1261 .header(parent_hash)?
1262 .expect("Parent header must always exist when block is created; qed");
1263
1264 let parent_block_number = parent_header.number;
1265
1266 let subspace_inherents =
1267 sp_consensus_subspace::inherents::InherentDataProvider::new(
1268 segment_headers_store
1269 .segment_headers_for_block(parent_block_number + 1),
1270 );
1271
1272 Ok((timestamp, subspace_inherents))
1273 }
1274 }
1275 };
1276
1277 info!("🧑🌾 Starting Subspace Authorship worker");
1278 let slot_worker_task = sc_proof_of_time::start_slot_worker(
1279 subspace_link.chain_constants().slot_duration(),
1280 client.clone(),
1281 select_chain.clone(),
1282 subspace_slot_worker,
1283 sync_oracle.clone(),
1284 create_inherent_data_providers,
1285 pot_slot_info_stream,
1286 );
1287
1288 task_manager.spawn_essential_handle().spawn_blocking(
1291 "subspace-proposer",
1292 Some("block-authoring"),
1293 slot_worker_task,
1294 );
1295 }
1296
1297 config.base.prometheus_config.take();
1299
1300 let rpc_handlers = task_spawner::spawn_tasks(SpawnTasksParams {
1301 network: network_service.clone(),
1302 client: client.clone(),
1303 keystore: keystore_container.keystore(),
1304 task_manager: &mut task_manager,
1305 transaction_pool: transaction_pool.clone(),
1306 rpc_builder: if enable_rpc_extensions {
1307 let client = client.clone();
1308 let new_slot_notification_stream = new_slot_notification_stream.clone();
1309 let reward_signing_notification_stream = reward_signing_notification_stream.clone();
1310 let object_mapping_notification_stream = object_mapping_notification_stream.clone();
1311 let archived_segment_notification_stream = archived_segment_notification_stream.clone();
1312 let transaction_pool = transaction_pool.clone();
1313 let backend = backend.clone();
1314
1315 Box::new(move |subscription_executor| {
1316 let deps = rpc::FullDeps {
1317 client: client.clone(),
1318 pool: transaction_pool.clone(),
1319 subscription_executor,
1320 new_slot_notification_stream: new_slot_notification_stream.clone(),
1321 reward_signing_notification_stream: reward_signing_notification_stream.clone(),
1322 object_mapping_notification_stream: object_mapping_notification_stream.clone(),
1323 archived_segment_notification_stream: archived_segment_notification_stream
1324 .clone(),
1325 dsn_bootstrap_nodes: dsn_bootstrap_nodes.clone(),
1326 segment_headers_store: segment_headers_store.clone(),
1327 sync_oracle: sync_oracle.clone(),
1328 kzg: subspace_link.kzg().clone(),
1329 erasure_coding: subspace_link.erasure_coding().clone(),
1330 backend: backend.clone(),
1331 };
1332
1333 rpc::create_full(deps).map_err(Into::into)
1334 })
1335 } else {
1336 Box::new(|_| Ok(RpcModule::new(())))
1337 },
1338 backend: backend.clone(),
1339 system_rpc_tx,
1340 config: config.base,
1341 telemetry: telemetry.as_mut(),
1342 tx_handler_controller,
1343 sync_service: sync_service.clone(),
1344 })?;
1345
1346 Ok(NewFull {
1347 task_manager,
1348 client,
1349 select_chain,
1350 network_service,
1351 xdm_gossip_notification_service,
1352 sync_service,
1353 rpc_handlers,
1354 backend,
1355 pot_slot_info_stream: additional_pot_slot_info_stream,
1356 new_slot_notification_stream,
1357 reward_signing_notification_stream,
1358 block_importing_notification_stream,
1359 object_mapping_notification_stream,
1360 archived_segment_notification_stream,
1361 network_starter,
1362 transaction_pool,
1363 })
1364}
1365
1366fn confirmation_depth_storage_key() -> StorageKey {
1368 StorageKey(
1369 frame_support::storage::storage_prefix(
1370 "RuntimeConfigs".as_bytes(),
1373 "ConfirmationDepthK".as_bytes(),
1375 )
1376 .to_vec(),
1377 )
1378}
1379
1380fn extract_confirmation_depth(chain_spec: &dyn ChainSpec) -> Option<u32> {
1381 let storage_key = format!("0x{}", hex::encode(confirmation_depth_storage_key().0));
1382 let spec: serde_json::Value =
1383 serde_json::from_str(chain_spec.as_json(true).ok()?.as_str()).ok()?;
1384 let encoded_confirmation_depth = hex::decode(
1385 spec.pointer(format!("/genesis/raw/top/{storage_key}").as_str())?
1386 .as_str()?
1387 .trim_start_matches("0x"),
1388 )
1389 .ok()?;
1390 u32::decode(&mut encoded_confirmation_depth.as_slice()).ok()
1391}
1392
1393#[cfg(test)]
1394mod test {
1395 use static_assertions::const_assert_eq;
1396 use subspace_data_retrieval::object_fetcher::MAX_BLOCK_LENGTH as ARCHIVER_MAX_BLOCK_LENGTH;
1397 use subspace_runtime_primitives::MAX_BLOCK_LENGTH as CONSENSUS_RUNTIME_MAX_BLOCK_LENGTH;
1398
1399 #[test]
1402 fn max_block_length_consistent() {
1403 const_assert_eq!(
1404 CONSENSUS_RUNTIME_MAX_BLOCK_LENGTH,
1405 ARCHIVER_MAX_BLOCK_LENGTH,
1406 );
1407 }
1408}