1#![feature(
3 impl_trait_in_assoc_type,
4 int_roundings,
5 type_alias_impl_trait,
6 type_changing_struct_update
7)]
8
9pub mod config;
10pub mod dsn;
11mod metrics;
12pub(crate) mod mmr;
13pub mod rpc;
14pub mod sync_from_dsn;
15mod task_spawner;
16mod utils;
17
18use crate::config::{ChainSyncMode, SubspaceConfiguration, SubspaceNetworking};
19use crate::dsn::{DsnConfigurationError, create_dsn_instance};
20use crate::metrics::NodeMetrics;
21use crate::mmr::request_handler::MmrRequestHandler;
22use crate::sync_from_dsn::DsnPieceGetter;
23use crate::sync_from_dsn::piece_validator::SegmentCommitmentPieceValidator;
24use crate::sync_from_dsn::snap_sync::snap_sync;
25use async_lock::Semaphore;
26use core::sync::atomic::{AtomicU32, Ordering};
27use cross_domain_message_gossip::xdm_gossip_peers_set_config;
28use domain_runtime_primitives::opaque::{Block as DomainBlock, Header as DomainHeader};
29use frame_system_rpc_runtime_api::AccountNonceApi;
30use futures::FutureExt;
31use futures::channel::oneshot;
32use jsonrpsee::RpcModule;
33use pallet_transaction_payment_rpc_runtime_api::TransactionPaymentApi;
34use parity_scale_codec::Decode;
35use parking_lot::Mutex;
36use prometheus_client::registry::Registry;
37use sc_basic_authorship::ProposerFactory;
38use sc_chain_spec::{ChainSpec, GenesisBlockBuilder};
39use sc_client_api::execution_extensions::ExtensionsFactory;
40use sc_client_api::{
41 AuxStore, Backend, BlockBackend, BlockchainEvents, ExecutorProvider, HeaderBackend,
42};
43use sc_consensus::{
44 BasicQueue, BlockCheckParams, BlockImport, BlockImportParams, BoxBlockImport,
45 DefaultImportQueue, ImportQueue, ImportResult,
46};
47use sc_consensus_slots::SlotProportion;
48use sc_consensus_subspace::SubspaceLink;
49use sc_consensus_subspace::archiver::{
50 ArchivedSegmentNotification, ObjectMappingNotification, SegmentHeadersStore,
51 create_subspace_archiver,
52};
53use sc_consensus_subspace::block_import::{BlockImportingNotification, SubspaceBlockImport};
54use sc_consensus_subspace::notification::SubspaceNotificationStream;
55use sc_consensus_subspace::slot_worker::{
56 NewSlotNotification, RewardSigningNotification, SubspaceSlotWorker, SubspaceSlotWorkerOptions,
57 SubspaceSyncOracle,
58};
59use sc_consensus_subspace::verifier::{SubspaceVerifier, SubspaceVerifierOptions};
60use sc_domains::ExtensionsFactory as DomainsExtensionFactory;
61use sc_domains::domain_block_er::execution_receipt_protocol::DomainBlockERRequestHandler;
62use sc_network::service::traits::NetworkService;
63use sc_network::{NetworkWorker, NotificationMetrics, NotificationService, Roles};
64use sc_network_sync::block_relay_protocol::BlockRelayParams;
65use sc_network_sync::engine::SyncingEngine;
66use sc_network_sync::service::network::NetworkServiceProvider;
67use sc_proof_of_time::source::gossip::pot_gossip_peers_set_config;
68use sc_proof_of_time::source::{PotSlotInfo, PotSourceWorker};
69use sc_proof_of_time::verifier::PotVerifier;
70use sc_service::error::Error as ServiceError;
71use sc_service::{
72 BuildNetworkAdvancedParams, Configuration, SpawnTasksParams, TaskManager,
73 build_network_advanced, build_polkadot_syncing_strategy,
74};
75use sc_subspace_block_relay::{
76 BlockRelayConfigurationError, NetworkWrapper, build_consensus_relay,
77};
78use sc_telemetry::{Telemetry, TelemetryWorker};
79use sc_transaction_pool::TransactionPoolHandle;
80use sc_transaction_pool_api::OffchainTransactionPoolFactory;
81use sp_api::{ApiExt, ConstructRuntimeApi, Metadata, ProvideRuntimeApi};
82use sp_block_builder::BlockBuilder;
83use sp_blockchain::HeaderMetadata;
84use sp_consensus::block_validation::DefaultBlockAnnounceValidator;
85use sp_consensus_slots::Slot;
86use sp_consensus_subspace::digests::extract_pre_digest;
87use sp_consensus_subspace::{
88 KzgExtension, PosExtension, PotExtension, PotNextSlotInput, SubspaceApi,
89};
90use sp_core::H256;
91use sp_core::offchain::OffchainDbExt;
92use sp_core::offchain::storage::OffchainDb;
93use sp_core::traits::SpawnEssentialNamed;
94use sp_domains::storage::StorageKey;
95use sp_domains::{BundleProducerElectionApi, DomainsApi};
96use sp_domains_fraud_proof::{FraudProofApi, FraudProofExtension, FraudProofHostFunctionsImpl};
97use sp_externalities::Extensions;
98use sp_messenger::MessengerApi;
99use sp_messenger_host_functions::{MessengerExtension, MessengerHostFunctionsImpl};
100use sp_mmr_primitives::MmrApi;
101use sp_objects::ObjectsApi;
102use sp_offchain::OffchainWorkerApi;
103use sp_runtime::traits::{Block as BlockT, BlockIdTo, Header, NumberFor, Zero};
104use sp_session::SessionKeys;
105use sp_subspace_mmr::host_functions::{SubspaceMmrExtension, SubspaceMmrHostFunctionsImpl};
106use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
107use static_assertions::const_assert;
108use std::marker::PhantomData;
109use std::num::NonZeroUsize;
110use std::sync::Arc;
111use std::time::Duration;
112use subspace_core_primitives::pieces::Record;
113use subspace_core_primitives::pot::PotSeed;
114use subspace_core_primitives::{BlockNumber, PublicKey, REWARD_SIGNING_CONTEXT};
115use subspace_erasure_coding::ErasureCoding;
116use subspace_kzg::Kzg;
117use subspace_networking::libp2p::multiaddr::Protocol;
118use subspace_networking::utils::piece_provider::PieceProvider;
119use subspace_proof_of_space::Table;
120use subspace_runtime_primitives::opaque::Block;
121use subspace_runtime_primitives::{AccountId, Balance, BlockHashFor, Hash, Nonce};
122use tokio::sync::broadcast;
123use tracing::{Instrument, debug, error, info};
124pub use utils::wait_for_block_import;
125
// Compile-time guard: refuse to build on targets where `usize` is narrower than `u64`.
const_assert!(std::mem::size_of::<usize>() >= std::mem::size_of::<u64>());

/// Cache size handed to [`PotVerifier::new`] when the proof-of-time verifier is created.
const POT_VERIFIER_CACHE_SIZE: u32 = 30_000;
/// Pause between two refreshes of the sync target block number performed by the
/// `sync-target-follower` task.
const SYNC_TARGET_UPDATE_INTERVAL: Duration = Duration::from_secs(1);
/// Factor by which the DSN outgoing connection limit is multiplied to size the semaphore given
/// to the piece provider.
const PIECE_PROVIDER_MULTIPLIER: usize = 10;
136
/// Errors that can be returned while building the node service.
///
/// Every variant is `transparent`, so `Display` and `source()` are forwarded to the wrapped
/// error.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// I/O error.
    #[error(transparent)]
    Io(#[from] std::io::Error),

    /// Socket address could not be parsed.
    #[error(transparent)]
    AddrFormatInvalid(#[from] std::net::AddrParseError),

    /// Substrate service error.
    #[error(transparent)]
    Sub(#[from] sc_service::Error),

    /// Substrate consensus error.
    #[error(transparent)]
    Consensus(#[from] sp_consensus::Error),

    /// Telemetry error.
    #[error(transparent)]
    Telemetry(#[from] sc_telemetry::Error),

    /// DSN configuration error.
    #[error(transparent)]
    SubspaceDsn(#[from] DsnConfigurationError),

    /// Block relay configuration error.
    #[error(transparent)]
    BlockRelay(#[from] BlockRelayConfigurationError),

    /// Any other error. There is no `#[from]` conversion for this variant, so it has to be
    /// constructed explicitly.
    #[error(transparent)]
    Other(Box<dyn std::error::Error + Send + Sync>),
}
172
/// Newtype around a block import whose [`BlockImport`] implementation reports failures as
/// [`sp_consensus::Error`] instead of the wrapped import's own error type.
#[derive(Clone)]
struct BlockImportWrapper<BI>(BI);
176
177#[async_trait::async_trait]
178impl<Block, BI> BlockImport<Block> for BlockImportWrapper<BI>
179where
180 Block: BlockT,
181 BI: BlockImport<Block, Error = sc_consensus_subspace::block_import::Error<Block::Header>>
182 + Send
183 + Sync,
184{
185 type Error = sp_consensus::Error;
186
187 async fn check_block(
188 &self,
189 block: BlockCheckParams<Block>,
190 ) -> Result<ImportResult, Self::Error> {
191 self.0
192 .check_block(block)
193 .await
194 .map_err(|error| sp_consensus::Error::Other(error.into()))
195 }
196
197 async fn import_block(
198 &self,
199 block: BlockImportParams<Block>,
200 ) -> Result<ImportResult, Self::Error> {
201 self.0
202 .import_block(block)
203 .await
204 .map_err(|error| sp_consensus::Error::Other(error.into()))
205 }
206}
207
/// Host functions made available to the runtime executor when the `runtime-benchmarks` feature
/// is disabled.
#[cfg(not(feature = "runtime-benchmarks"))]
pub type HostFunctions = (
    sp_io::SubstrateHostFunctions,
    sp_consensus_subspace::consensus::HostFunctions,
    sp_domains_fraud_proof::HostFunctions,
    sp_subspace_mmr::HostFunctions,
    sp_messenger_host_functions::HostFunctions,
);
217
/// Host functions made available to the runtime executor when the `runtime-benchmarks` feature
/// is enabled; same set as the regular build plus the frame benchmarking host functions.
#[cfg(feature = "runtime-benchmarks")]
pub type HostFunctions = (
    sp_io::SubstrateHostFunctions,
    frame_benchmarking::benchmarking::HostFunctions,
    sp_consensus_subspace::consensus::HostFunctions,
    sp_domains_fraud_proof::HostFunctions,
    sp_subspace_mmr::HostFunctions,
    sp_messenger_host_functions::HostFunctions,
);
228
/// WASM executor parameterised with this crate's [`HostFunctions`].
pub type RuntimeExecutor = sc_executor::WasmExecutor<HostFunctions>;

/// Full client type for the given runtime API, using [`RuntimeExecutor`].
pub type FullClient<RuntimeApi> = sc_service::TFullClient<Block, RuntimeApi, RuntimeExecutor>;

/// Full backend type for the consensus chain block.
pub type FullBackend = sc_service::TFullBackend<Block>;
/// Longest-chain selection rule over [`FullBackend`].
pub type FullSelectChain = sc_consensus::LongestChain<FullBackend, Block>;
237
/// Factory of the externalities extensions registered by its [`ExtensionsFactory`]
/// implementation.
struct SubspaceExtensionsFactory<PosTable, Client, DomainBlock> {
    /// KZG instance cloned into the KZG extension.
    kzg: Kzg,
    /// Client used to look up headers and call runtime APIs from within the extensions.
    client: Arc<Client>,
    /// Backend queried for offchain storage when building extensions.
    backend: Arc<FullBackend>,
    /// Proof-of-time verifier used by the proof-of-time extension.
    pot_verifier: PotVerifier,
    /// Executor passed to the fraud proof and messenger host functions.
    domains_executor: Arc<sc_domains::RuntimeExecutor>,
    /// Confirmation depth passed to the domains extension factory and MMR host functions.
    confirmation_depth_k: BlockNumber,
    /// Marker tying the otherwise unused `PosTable` and `DomainBlock` type parameters to the
    /// struct.
    _pos_table: PhantomData<(PosTable, DomainBlock)>,
}
247
248impl<PosTable, Block, Client, DomainBlock> ExtensionsFactory<Block>
249 for SubspaceExtensionsFactory<PosTable, Client, DomainBlock>
250where
251 PosTable: Table,
252 Block: BlockT,
253 Block::Hash: From<H256> + Into<H256>,
254 DomainBlock: BlockT,
255 DomainBlock::Hash: Into<H256> + From<H256>,
256 Client: BlockBackend<Block>
257 + HeaderBackend<Block>
258 + ProvideRuntimeApi<Block>
259 + Send
260 + Sync
261 + 'static,
262 Client::Api: SubspaceApi<Block, PublicKey>
263 + DomainsApi<Block, DomainBlock::Header>
264 + BundleProducerElectionApi<Block, Balance>
265 + MmrApi<Block, H256, NumberFor<Block>>
266 + MessengerApi<Block, NumberFor<Block>, Block::Hash>,
267{
268 fn extensions_for(
269 &self,
270 _block_hash: Block::Hash,
271 _block_number: NumberFor<Block>,
272 ) -> Extensions {
273 let confirmation_depth_k = self.confirmation_depth_k;
274 let mut exts = Extensions::new();
275 exts.register(KzgExtension::new(self.kzg.clone()));
276 exts.register(PosExtension::new::<PosTable>());
277 exts.register(PotExtension::new({
278 let client = Arc::clone(&self.client);
279 let pot_verifier = self.pot_verifier.clone();
280
281 Box::new(
282 move |parent_hash, slot, proof_of_time, quick_verification| {
283 let parent_hash = {
284 let mut converted_parent_hash = Block::Hash::default();
285 converted_parent_hash.as_mut().copy_from_slice(&parent_hash);
286 converted_parent_hash
287 };
288
289 let parent_header = match client.header(parent_hash) {
290 Ok(Some(parent_header)) => parent_header,
291 Ok(None) => {
292 if quick_verification {
293 error!(
294 %parent_hash,
295 "Header not found during proof of time verification"
296 );
297
298 return false;
299 } else {
300 debug!(
301 %parent_hash,
302 "Header not found during proof of time verification"
303 );
304
305 return true;
308 }
309 }
310 Err(error) => {
311 error!(
312 %error,
313 %parent_hash,
314 "Failed to retrieve header during proof of time verification"
315 );
316
317 return false;
318 }
319 };
320
321 let parent_pre_digest = match extract_pre_digest(&parent_header) {
322 Ok(parent_pre_digest) => parent_pre_digest,
323 Err(error) => {
324 error!(
325 %error,
326 %parent_hash,
327 parent_number = %parent_header.number(),
328 "Failed to extract pre-digest from parent header during proof of \
329 time verification, this must never happen"
330 );
331
332 return false;
333 }
334 };
335
336 let parent_slot = parent_pre_digest.slot();
337 if slot <= *parent_slot {
338 return false;
339 }
340
341 let pot_parameters = match client.runtime_api().pot_parameters(parent_hash) {
342 Ok(pot_parameters) => pot_parameters,
343 Err(error) => {
344 debug!(
345 %error,
346 %parent_hash,
347 parent_number = %parent_header.number(),
348 "Failed to retrieve proof of time parameters during proof of time \
349 verification"
350 );
351
352 return false;
353 }
354 };
355
356 let pot_input = if parent_header.number().is_zero() {
357 PotNextSlotInput {
358 slot: parent_slot + Slot::from(1),
359 slot_iterations: pot_parameters.slot_iterations(),
360 seed: pot_verifier.genesis_seed(),
361 }
362 } else {
363 let pot_info = parent_pre_digest.pot_info();
364
365 PotNextSlotInput::derive(
366 pot_parameters.slot_iterations(),
367 parent_slot,
368 pot_info.proof_of_time(),
369 &pot_parameters.next_parameters_change(),
370 )
371 };
372
373 if quick_verification {
377 pot_verifier.try_is_output_valid(
378 pot_input,
379 Slot::from(slot) - parent_slot,
380 proof_of_time,
381 pot_parameters.next_parameters_change(),
382 )
383 } else {
384 pot_verifier.is_output_valid(
385 pot_input,
386 Slot::from(slot) - parent_slot,
387 proof_of_time,
388 pot_parameters.next_parameters_change(),
389 )
390 }
391 },
392 )
393 }));
394
395 exts.register(FraudProofExtension::new(Arc::new(
396 FraudProofHostFunctionsImpl::<_, _, DomainBlock, _, _>::new(
397 self.client.clone(),
398 self.domains_executor.clone(),
399 move |client, executor| {
400 let extension_factory =
401 DomainsExtensionFactory::<_, Block, DomainBlock, _>::new(
402 client,
403 executor,
404 confirmation_depth_k,
405 );
406 Box::new(extension_factory) as Box<dyn ExtensionsFactory<DomainBlock>>
407 },
408 ),
409 )));
410
411 exts.register(SubspaceMmrExtension::new(Arc::new(
412 SubspaceMmrHostFunctionsImpl::<Block, _>::new(
413 self.client.clone(),
414 confirmation_depth_k,
415 ),
416 )));
417
418 exts.register(MessengerExtension::new(Arc::new(
419 MessengerHostFunctionsImpl::<Block, _, DomainBlock, _>::new(
420 self.client.clone(),
421 self.domains_executor.clone(),
422 ),
423 )));
424
425 if let Some(offchain_storage) = self.backend.offchain_storage() {
428 let offchain_db = OffchainDb::new(offchain_storage);
429 exts.register(OffchainDbExt::new(offchain_db));
430 }
431
432 exts
433 }
434}
435
/// Subspace-specific components carried in the `other` slot of [`PartialComponents`].
pub struct OtherPartialComponents<RuntimeApi>
where
    RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
{
    /// Subspace block import, boxed and reporting errors as `sp_consensus::Error`.
    pub block_import: BoxBlockImport<Block>,
    /// Subspace link.
    pub subspace_link: SubspaceLink<Block>,
    /// Segment headers store.
    pub segment_headers_store: SegmentHeadersStore<FullClient<RuntimeApi>>,
    /// Proof-of-time verifier.
    pub pot_verifier: PotVerifier,
    /// Sync target block number, shared with the block verifier.
    pub sync_target_block_number: Arc<AtomicU32>,
    /// Telemetry handle; `None` when no telemetry endpoints are configured.
    pub telemetry: Option<Telemetry>,
}
454
/// Partial components of the node service as returned by [`new_partial`].
type PartialComponents<RuntimeApi> = sc_service::PartialComponents<
    FullClient<RuntimeApi>,
    FullBackend,
    FullSelectChain,
    DefaultImportQueue<Block>,
    TransactionPoolHandle<Block, FullClient<RuntimeApi>>,
    OtherPartialComponents<RuntimeApi>,
>;
463
464#[expect(clippy::result_large_err, reason = "Comes from Substrate")]
466pub fn new_partial<PosTable, RuntimeApi>(
467 config: &Configuration,
470 snap_sync: bool,
472 pot_external_entropy: &[u8],
473) -> Result<PartialComponents<RuntimeApi>, ServiceError>
474where
475 PosTable: Table,
476 RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
477 RuntimeApi::RuntimeApi: ApiExt<Block>
478 + Metadata<Block>
479 + BlockBuilder<Block>
480 + OffchainWorkerApi<Block>
481 + SessionKeys<Block>
482 + TaggedTransactionQueue<Block>
483 + SubspaceApi<Block, PublicKey>
484 + DomainsApi<Block, DomainHeader>
485 + FraudProofApi<Block, DomainHeader>
486 + BundleProducerElectionApi<Block, Balance>
487 + ObjectsApi<Block>
488 + MmrApi<Block, H256, NumberFor<Block>>
489 + MessengerApi<Block, NumberFor<Block>, BlockHashFor<Block>>,
490{
491 let telemetry = config
492 .telemetry_endpoints
493 .clone()
494 .filter(|x| !x.is_empty())
495 .map(|endpoints| -> Result<_, sc_telemetry::Error> {
496 let worker = TelemetryWorker::new(16)?;
497 let telemetry = worker.handle().new_telemetry(endpoints);
498 Ok((worker, telemetry))
499 })
500 .transpose()?;
501
502 let executor = sc_service::new_wasm_executor(&config.executor);
503 let domains_executor = sc_service::new_wasm_executor(&config.executor);
504
505 let confirmation_depth_k = extract_confirmation_depth(config.chain_spec.as_ref()).ok_or(
506 ServiceError::Other("Failed to extract confirmation depth from chain spec".to_string()),
507 )?;
508
509 let backend = Arc::new(sc_client_db::Backend::new(
510 config.db_config(),
511 confirmation_depth_k.into(),
512 )?);
513
514 let genesis_block_builder = GenesisBlockBuilder::new(
515 config.chain_spec.as_storage_builder(),
516 !snap_sync,
517 backend.clone(),
518 executor.clone(),
519 )?;
520
521 let (client, backend, keystore_container, task_manager) =
522 sc_service::new_full_parts_with_genesis_builder::<Block, RuntimeApi, _, _>(
523 config,
524 telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
525 executor.clone(),
526 backend,
527 genesis_block_builder,
528 false,
529 )?;
530
531 let (kzg, maybe_erasure_coding) = tokio::task::block_in_place(|| {
533 rayon::join(Kzg::new, || {
534 ErasureCoding::new(
535 NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize)
536 .expect("Not zero; qed"),
537 )
538 .map_err(|error| format!("Failed to instantiate erasure coding: {error}"))
539 })
540 });
541 let erasure_coding = maybe_erasure_coding?;
542
543 let client = Arc::new(client);
544 let client_info = client.info();
545 let chain_constants = client
546 .runtime_api()
547 .chain_constants(client_info.best_hash)
548 .map_err(|error| ServiceError::Application(error.into()))?;
549
550 let pot_verifier = PotVerifier::new(
551 PotSeed::from_genesis(client_info.genesis_hash.as_ref(), pot_external_entropy),
552 POT_VERIFIER_CACHE_SIZE,
553 );
554
555 assert_eq!(confirmation_depth_k, chain_constants.confirmation_depth_k());
557
558 client
559 .execution_extensions()
560 .set_extensions_factory(SubspaceExtensionsFactory::<PosTable, _, DomainBlock> {
561 kzg: kzg.clone(),
562 client: Arc::clone(&client),
563 pot_verifier: pot_verifier.clone(),
564 domains_executor: Arc::new(domains_executor),
565 backend: backend.clone(),
566 confirmation_depth_k: chain_constants.confirmation_depth_k(),
567 _pos_table: PhantomData,
568 });
569
570 let telemetry = telemetry.map(|(worker, telemetry)| {
571 task_manager
572 .spawn_handle()
573 .spawn("telemetry", None, worker.run());
574 telemetry
575 });
576
577 let select_chain = sc_consensus::LongestChain::new(backend.clone());
578
579 let segment_headers_store = tokio::task::block_in_place(|| {
580 SegmentHeadersStore::new(client.clone(), chain_constants.confirmation_depth_k())
581 })
582 .map_err(|error| ServiceError::Application(error.into()))?;
583
584 let subspace_link = SubspaceLink::new(chain_constants, kzg.clone(), erasure_coding);
585 let segment_headers_store = segment_headers_store.clone();
586
587 let block_import = SubspaceBlockImport::<PosTable, _, _, _, _, _>::new(
588 client.clone(),
589 client.clone(),
590 subspace_link.clone(),
591 {
592 let client = client.clone();
593 let segment_headers_store = segment_headers_store.clone();
594
595 move |parent_hash, ()| {
596 let client = client.clone();
597 let segment_headers_store = segment_headers_store.clone();
598
599 async move {
600 let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
601
602 let parent_header = client
603 .header(parent_hash)?
604 .expect("Parent header must always exist when block is created; qed");
605
606 let parent_block_number = parent_header.number;
607
608 let subspace_inherents =
609 sp_consensus_subspace::inherents::InherentDataProvider::new(
610 segment_headers_store
611 .segment_headers_for_block(parent_block_number + 1),
612 );
613
614 Ok((timestamp, subspace_inherents))
615 }
616 }
617 },
618 segment_headers_store.clone(),
619 pot_verifier.clone(),
620 );
621
622 let sync_target_block_number = Arc::new(AtomicU32::new(0));
623 let transaction_pool = Arc::from(
624 sc_transaction_pool::Builder::new(
625 task_manager.spawn_essential_handle(),
626 client.clone(),
627 config.role.is_authority().into(),
628 )
629 .with_options(config.transaction_pool.clone())
630 .with_prometheus(config.prometheus_registry())
631 .build(),
632 );
633
634 let verifier = SubspaceVerifier::<PosTable, _, _>::new(SubspaceVerifierOptions {
635 client: client.clone(),
636 chain_constants,
637 kzg,
638 telemetry: telemetry.as_ref().map(|x| x.handle()),
639 reward_signing_context: schnorrkel::context::signing_context(REWARD_SIGNING_CONTEXT),
640 sync_target_block_number: Arc::clone(&sync_target_block_number),
641 is_authoring_blocks: config.role.is_authority(),
642 pot_verifier: pot_verifier.clone(),
643 });
644
645 let import_queue = BasicQueue::new(
646 verifier,
647 Box::new(BlockImportWrapper(block_import.clone())),
648 None,
649 &task_manager.spawn_essential_handle(),
650 config.prometheus_registry(),
651 );
652
653 let other = OtherPartialComponents {
654 block_import: Box::new(BlockImportWrapper(block_import.clone())),
655 subspace_link,
656 segment_headers_store,
657 pot_verifier,
658 sync_target_block_number,
659 telemetry,
660 };
661
662 Ok(PartialComponents {
663 client,
664 backend,
665 task_manager,
666 import_queue,
667 keystore_container,
668 select_chain,
669 transaction_pool,
670 other,
671 })
672}
673
/// Components of a fully started node.
pub struct NewFull<Client>
where
    Client: ProvideRuntimeApi<Block>
        + AuxStore
        + BlockBackend<Block>
        + BlockIdTo<Block>
        + HeaderBackend<Block>
        + HeaderMetadata<Block, Error = sp_blockchain::Error>
        + 'static,
    Client::Api: TaggedTransactionQueue<Block>
        + DomainsApi<Block, DomainHeader>
        + FraudProofApi<Block, DomainHeader>
        + SubspaceApi<Block, PublicKey>
        + MmrApi<Block, H256, NumberFor<Block>>
        + MessengerApi<Block, NumberFor<Block>, BlockHashFor<Block>>,
{
    /// Task manager.
    pub task_manager: TaskManager,
    /// Client.
    pub client: Arc<Client>,
    /// Chain selection rule.
    pub select_chain: FullSelectChain,
    /// Network service.
    pub network_service: Arc<dyn NetworkService + Send + Sync>,
    /// Notification service of the cross-domain message gossip protocol.
    pub xdm_gossip_notification_service: Box<dyn NotificationService>,
    /// Syncing service.
    pub sync_service: Arc<sc_network_sync::SyncingService<Block>>,
    /// RPC handlers.
    pub rpc_handlers: sc_service::RpcHandlers,
    /// Client backend.
    pub backend: Arc<FullBackend>,
    /// Broadcast receiver of proof-of-time slot info.
    pub pot_slot_info_stream: broadcast::Receiver<PotSlotInfo>,
    /// Stream of new slot notifications.
    pub new_slot_notification_stream: SubspaceNotificationStream<NewSlotNotification>,
    /// Stream of reward signing notifications.
    pub reward_signing_notification_stream: SubspaceNotificationStream<RewardSigningNotification>,
    /// Stream of block importing notifications.
    pub block_importing_notification_stream:
        SubspaceNotificationStream<BlockImportingNotification<Block>>,
    /// Stream of object mapping notifications.
    pub object_mapping_notification_stream: SubspaceNotificationStream<ObjectMappingNotification>,
    /// Stream of archived segment notifications.
    pub archived_segment_notification_stream:
        SubspaceNotificationStream<ArchivedSegmentNotification>,
    /// Transaction pool.
    pub transaction_pool: Arc<TransactionPoolHandle<Block, Client>>,
}
725
/// [`NewFull`] specialised to [`FullClient`]; this is what [`new_full`] returns on success.
type FullNode<RuntimeApi> = NewFull<FullClient<RuntimeApi>>;
727
728pub async fn new_full<PosTable, RuntimeApi>(
730 mut config: SubspaceConfiguration,
731 partial_components: PartialComponents<RuntimeApi>,
732 prometheus_registry: Option<&mut Registry>,
733 enable_rpc_extensions: bool,
734 block_proposal_slot_portion: SlotProportion,
735 consensus_snap_sync_target_block_receiver: Option<broadcast::Receiver<BlockNumber>>,
736) -> Result<FullNode<RuntimeApi>, Error>
737where
738 PosTable: Table,
739 RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
740 RuntimeApi::RuntimeApi: ApiExt<Block>
741 + Metadata<Block>
742 + AccountNonceApi<Block, AccountId, Nonce>
743 + BlockBuilder<Block>
744 + OffchainWorkerApi<Block>
745 + SessionKeys<Block>
746 + TaggedTransactionQueue<Block>
747 + TransactionPaymentApi<Block, Balance>
748 + SubspaceApi<Block, PublicKey>
749 + DomainsApi<Block, DomainHeader>
750 + FraudProofApi<Block, DomainHeader>
751 + ObjectsApi<Block>
752 + MmrApi<Block, Hash, BlockNumber>
753 + MessengerApi<Block, NumberFor<Block>, BlockHashFor<Block>>,
754{
755 let PartialComponents {
756 client,
757 backend,
758 mut task_manager,
759 import_queue,
760 keystore_container,
761 select_chain,
762 transaction_pool,
763 other,
764 } = partial_components;
765 let OtherPartialComponents {
766 block_import,
767 subspace_link,
768 segment_headers_store,
769 pot_verifier,
770 sync_target_block_number,
771 mut telemetry,
772 } = other;
773
774 let offchain_indexing_enabled = config.base.offchain_worker.indexing_enabled;
775 let (node, bootstrap_nodes, piece_getter) = match config.subspace_networking {
776 SubspaceNetworking::Reuse {
777 node,
778 bootstrap_nodes,
779 piece_getter,
780 } => (node, bootstrap_nodes, piece_getter),
781 SubspaceNetworking::Create { config: dsn_config } => {
782 let dsn_protocol_version = hex::encode(client.chain_info().genesis_hash);
783
784 debug!(
785 chain_type=?config.base.chain_spec.chain_type(),
786 genesis_hash=%hex::encode(client.chain_info().genesis_hash),
787 "Setting DSN protocol version..."
788 );
789
790 let out_connections = dsn_config.max_out_connections;
791 let (node, mut node_runner) = create_dsn_instance(
792 dsn_protocol_version,
793 dsn_config.clone(),
794 prometheus_registry,
795 )?;
796
797 info!("Subspace networking initialized: Node ID is {}", node.id());
798
799 node.on_new_listener(Arc::new({
800 let node = node.clone();
801
802 move |address| {
803 info!(
804 "DSN listening on {}",
805 address.clone().with(Protocol::P2p(node.id()))
806 );
807 }
808 }))
809 .detach();
810
811 task_manager
812 .spawn_essential_handle()
813 .spawn_essential_blocking(
814 "node-runner",
815 Some("subspace-networking"),
816 Box::pin(
817 async move {
818 node_runner.run().await;
819 }
820 .in_current_span(),
821 ),
822 );
823
824 let piece_provider = PieceProvider::new(
825 node.clone(),
826 SegmentCommitmentPieceValidator::new(
827 node.clone(),
828 subspace_link.kzg().clone(),
829 segment_headers_store.clone(),
830 ),
831 Arc::new(Semaphore::new(
832 out_connections as usize * PIECE_PROVIDER_MULTIPLIER,
833 )),
834 );
835
836 (
837 node,
838 dsn_config.bootstrap_nodes,
839 Arc::new(DsnPieceGetter::new(piece_provider)) as _,
840 )
841 }
842 };
843
844 let dsn_bootstrap_nodes = {
845 if bootstrap_nodes.is_empty() {
848 let (node_address_sender, node_address_receiver) = oneshot::channel();
849 let _handler = node.on_new_listener(Arc::new({
850 let node_address_sender = Mutex::new(Some(node_address_sender));
851
852 move |address| {
853 if matches!(address.iter().next(), Some(Protocol::Ip4(_)))
854 && let Some(node_address_sender) = node_address_sender.lock().take()
855 && let Err(err) = node_address_sender.send(address.clone())
856 {
857 debug!(?err, "Couldn't send a node address to the channel.");
858 }
859 }
860 }));
861
862 let mut node_listeners = node.listeners();
863
864 if node_listeners.is_empty() {
865 let Ok(listener) = node_address_receiver.await else {
866 return Err(Error::Other(
867 "Oneshot receiver dropped before DSN node listener was ready"
868 .to_string()
869 .into(),
870 ));
871 };
872
873 node_listeners = vec![listener];
874 }
875
876 node_listeners.iter_mut().for_each(|multiaddr| {
877 multiaddr.push(Protocol::P2p(node.id()));
878 });
879
880 node_listeners
881 } else {
882 bootstrap_nodes
883 }
884 };
885
886 let substrate_prometheus_registry = config
887 .base
888 .prometheus_config
889 .as_ref()
890 .map(|prometheus_config| prometheus_config.registry.clone());
891 let import_queue_service1 = import_queue.service();
892 let import_queue_service2 = import_queue.service();
893 let network_wrapper = Arc::new(NetworkWrapper::default());
894 let block_relay = build_consensus_relay(
895 network_wrapper.clone(),
896 client.clone(),
897 transaction_pool.clone(),
898 substrate_prometheus_registry.as_ref(),
899 )
900 .map_err(Error::BlockRelay)?;
901 let mut net_config = sc_network::config::FullNetworkConfiguration::new(
902 &config.base.network,
903 substrate_prometheus_registry.clone(),
904 );
905 let (xdm_gossip_notification_config, xdm_gossip_notification_service) =
906 xdm_gossip_peers_set_config();
907 net_config.add_notification_protocol(xdm_gossip_notification_config);
908 let (pot_gossip_notification_config, pot_gossip_notification_service) =
909 pot_gossip_peers_set_config();
910 net_config.add_notification_protocol(pot_gossip_notification_config);
911 let pause_sync = Arc::clone(&net_config.network_config.pause_sync);
912
913 let num_peer_hint = net_config.network_config.default_peers_set_num_full as usize
914 + net_config
915 .network_config
916 .default_peers_set
917 .reserved_nodes
918 .len();
919
920 let protocol_id = config.base.protocol_id();
921 let fork_id = config.base.chain_spec.fork_id();
922
923 let (handler, protocol_config) = DomainBlockERRequestHandler::new::<
925 NetworkWorker<Block, BlockHashFor<Block>>,
926 >(fork_id, client.clone(), num_peer_hint);
927 task_manager.spawn_handle().spawn(
928 "domain-block-er-request-handler",
929 Some("networking"),
930 handler.run(),
931 );
932 net_config.add_request_response_protocol(protocol_config);
933
934 if let Some(offchain_storage) = backend.offchain_storage() {
935 let (handler, protocol_config) =
937 MmrRequestHandler::new::<NetworkWorker<Block, BlockHashFor<Block>>>(
938 &config.base.protocol_id(),
939 fork_id,
940 client.clone(),
941 num_peer_hint,
942 offchain_storage,
943 );
944 task_manager
945 .spawn_handle()
946 .spawn("mmr-request-handler", Some("networking"), handler.run());
947
948 net_config.add_request_response_protocol(protocol_config);
949 }
950
951 let network_service_provider = NetworkServiceProvider::new();
952 let network_service_handle = network_service_provider.handle();
953 let (network_service, system_rpc_tx, tx_handler_controller, sync_service) = {
954 let spawn_handle = task_manager.spawn_handle();
955 let metrics = NotificationMetrics::new(substrate_prometheus_registry.as_ref());
956
957 let block_downloader = {
959 let BlockRelayParams {
960 mut server,
961 downloader,
962 request_response_config,
963 } = block_relay;
964 net_config.add_request_response_protocol(request_response_config);
965
966 spawn_handle.spawn("block-request-handler", Some("networking"), async move {
967 server.run().await;
968 });
969
970 downloader
971 };
972
973 let syncing_strategy = build_polkadot_syncing_strategy(
974 protocol_id.clone(),
975 fork_id,
976 &mut net_config,
977 None,
978 block_downloader,
979 client.clone(),
980 &spawn_handle,
981 substrate_prometheus_registry.as_ref(),
982 )?;
983
984 let (syncing_engine, sync_service, block_announce_config) =
985 SyncingEngine::new::<NetworkWorker<_, _>>(
986 Roles::from(&config.base.role),
987 Arc::clone(&client),
988 substrate_prometheus_registry.as_ref(),
989 metrics.clone(),
990 &net_config,
991 protocol_id.clone(),
992 fork_id,
993 Box::new(DefaultBlockAnnounceValidator),
994 syncing_strategy,
995 network_service_provider.handle(),
996 import_queue.service(),
997 net_config.peer_store_handle(),
998 config.base.network.force_synced,
999 )
1000 .map_err(sc_service::Error::from)?;
1001
1002 spawn_handle.spawn_blocking("syncing", None, syncing_engine.run());
1003
1004 build_network_advanced(BuildNetworkAdvancedParams {
1005 role: config.base.role,
1006 protocol_id,
1007 fork_id,
1008 ipfs_server: config.base.network.ipfs_server,
1009 announce_block: config.base.announce_block,
1010 net_config,
1011 client: Arc::clone(&client),
1012 transaction_pool: Arc::clone(&transaction_pool),
1013 spawn_handle,
1014 import_queue,
1015 sync_service,
1016 block_announce_config,
1017 network_service_provider,
1018 metrics_registry: substrate_prometheus_registry.as_ref(),
1019 metrics,
1020 })?
1021 };
1022
1023 task_manager.spawn_handle().spawn(
1024 "sync-target-follower",
1025 None,
1026 Box::pin({
1027 let sync_service = sync_service.clone();
1028 let sync_target_block_number = Arc::clone(&sync_target_block_number);
1029
1030 async move {
1031 loop {
1032 let best_seen_block = sync_service
1033 .status()
1034 .await
1035 .map(|status| status.best_seen_block.unwrap_or_default())
1036 .unwrap_or_default();
1037 sync_target_block_number.store(best_seen_block, Ordering::Relaxed);
1038
1039 tokio::time::sleep(SYNC_TARGET_UPDATE_INTERVAL).await;
1040 }
1041 }
1042 }),
1043 );
1044
1045 let sync_oracle = SubspaceSyncOracle::new(
1046 config.base.force_authoring,
1047 Arc::clone(&pause_sync),
1048 sync_service.clone(),
1049 );
1050
1051 let subspace_archiver = tokio::task::block_in_place(|| {
1052 create_subspace_archiver(
1053 segment_headers_store.clone(),
1054 subspace_link.clone(),
1055 client.clone(),
1056 sync_oracle.clone(),
1057 telemetry.as_ref().map(|telemetry| telemetry.handle()),
1058 config.create_object_mappings,
1059 )
1060 })
1061 .map_err(ServiceError::Client)?;
1062
1063 task_manager
1064 .spawn_essential_handle()
1065 .spawn_essential_blocking(
1066 "subspace-archiver",
1067 None,
1068 Box::pin(async move {
1069 if let Err(error) = subspace_archiver.await {
1070 error!(%error, "Archiver exited with error");
1071 }
1072 }),
1073 );
1074
1075 network_wrapper.set(network_service.clone());
1076
1077 if !config.base.network.force_synced {
1078 pause_sync.store(true, Ordering::Release);
1080 }
1081
1082 let snap_sync_task = snap_sync(
1083 segment_headers_store.clone(),
1084 node.clone(),
1085 fork_id.map(|fork_id| fork_id.to_string()),
1086 Arc::clone(&client),
1087 import_queue_service1,
1088 pause_sync.clone(),
1089 piece_getter.clone(),
1090 sync_service.clone(),
1091 network_service_handle,
1092 subspace_link.erasure_coding().clone(),
1093 consensus_snap_sync_target_block_receiver,
1094 backend.offchain_storage(),
1095 network_service.clone(),
1096 );
1097
1098 let (observer, worker) = sync_from_dsn::create_observer_and_worker(
1099 segment_headers_store.clone(),
1100 Arc::clone(&network_service),
1101 node.clone(),
1102 Arc::clone(&client),
1103 import_queue_service2,
1104 sync_service.clone(),
1105 sync_target_block_number,
1106 pause_sync,
1107 piece_getter,
1108 subspace_link.erasure_coding().clone(),
1109 );
1110 task_manager
1111 .spawn_handle()
1112 .spawn("observer", Some("sync-from-dsn"), observer);
1113 task_manager
1114 .spawn_essential_handle()
1115 .spawn_essential_blocking(
1116 "worker",
1117 Some("sync-from-dsn"),
1118 Box::pin(async move {
1119 if config.sync == ChainSyncMode::Snap
1121 && let Err(error) = snap_sync_task.in_current_span().await
1122 {
1123 error!(%error, "Snap sync exited with a fatal error");
1124 return;
1125 }
1126
1127 if let Err(error) = worker.await {
1128 error!(%error, "Sync from DSN exited with an error");
1129 }
1130 }),
1131 );
1132
1133 if let Some(registry) = substrate_prometheus_registry.as_ref() {
1134 match NodeMetrics::new(
1135 client.clone(),
1136 client.every_import_notification_stream(),
1137 registry,
1138 ) {
1139 Ok(node_metrics) => {
1140 task_manager.spawn_handle().spawn(
1141 "node_metrics",
1142 None,
1143 Box::pin(async move {
1144 node_metrics.run().await;
1145 }),
1146 );
1147 }
1148 Err(err) => {
1149 error!("Failed to initialize node metrics: {err:?}");
1150 }
1151 }
1152 }
1153
1154 let offchain_tx_pool_factory = OffchainTransactionPoolFactory::new(transaction_pool.clone());
1155
1156 if config.base.offchain_worker.enabled {
1157 let offchain_workers =
1158 sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
1159 runtime_api_provider: client.clone(),
1160 is_validator: config.base.role.is_authority(),
1161 keystore: Some(keystore_container.keystore()),
1162 offchain_db: backend.offchain_storage(),
1163 transaction_pool: Some(offchain_tx_pool_factory.clone()),
1164 network_provider: Arc::new(network_service.clone()),
1165 enable_http_requests: true,
1166 custom_extensions: |_| vec![],
1167 })?;
1168 task_manager.spawn_handle().spawn(
1169 "offchain-workers-runner",
1170 "offchain-worker",
1171 offchain_workers
1172 .run(client.clone(), task_manager.spawn_handle())
1173 .boxed(),
1174 );
1175 }
1176
1177 if offchain_indexing_enabled {
1179 task_manager.spawn_essential_handle().spawn_blocking(
1180 "mmr-gadget",
1181 None,
1182 mmr_gadget::MmrGadget::start(
1183 client.clone(),
1184 backend.clone(),
1185 sp_mmr_primitives::INDEXING_PREFIX.to_vec(),
1186 ),
1187 );
1188 }
1189
1190 let backoff_authoring_blocks: Option<()> = None;
1191
1192 let new_slot_notification_stream = subspace_link.new_slot_notification_stream();
1193 let reward_signing_notification_stream = subspace_link.reward_signing_notification_stream();
1194 let block_importing_notification_stream = subspace_link.block_importing_notification_stream();
1195 let object_mapping_notification_stream = subspace_link.object_mapping_notification_stream();
1196 let archived_segment_notification_stream = subspace_link.archived_segment_notification_stream();
1197
1198 let (pot_source_worker, pot_gossip_worker, pot_slot_info_stream) = PotSourceWorker::new(
1199 config.is_timekeeper,
1200 config.timekeeper_cpu_cores,
1201 client.clone(),
1202 pot_verifier.clone(),
1203 Arc::clone(&network_service),
1204 pot_gossip_notification_service,
1205 sync_service.clone(),
1206 sync_oracle.clone(),
1207 )
1208 .map_err(|error| Error::Other(error.into()))?;
1209
1210 let additional_pot_slot_info_stream = pot_source_worker.subscribe_pot_slot_info_stream();
1211
1212 task_manager
1213 .spawn_essential_handle()
1214 .spawn("pot-source", Some("pot"), pot_source_worker.run());
1215 task_manager
1216 .spawn_essential_handle()
1217 .spawn("pot-gossip", Some("pot"), pot_gossip_worker.run());
1218
1219 if config.base.role.is_authority() || config.force_new_slot_notifications {
1220 let proposer_factory = ProposerFactory::new(
1221 task_manager.spawn_handle(),
1222 client.clone(),
1223 transaction_pool.clone(),
1224 substrate_prometheus_registry.as_ref(),
1225 telemetry.as_ref().map(|x| x.handle()),
1226 );
1227
1228 let subspace_slot_worker =
1229 SubspaceSlotWorker::<PosTable, _, _, _, _, _, _, _>::new(SubspaceSlotWorkerOptions {
1230 client: client.clone(),
1231 env: proposer_factory,
1232 block_import,
1233 sync_oracle: sync_oracle.clone(),
1234 justification_sync_link: sync_service.clone(),
1235 force_authoring: config.base.force_authoring,
1236 backoff_authoring_blocks,
1237 subspace_link: subspace_link.clone(),
1238 segment_headers_store: segment_headers_store.clone(),
1239 block_proposal_slot_portion,
1240 max_block_proposal_slot_portion: None,
1241 telemetry: telemetry.as_ref().map(|x| x.handle()),
1242 offchain_tx_pool_factory,
1243 pot_verifier,
1244 });
1245
1246 let create_inherent_data_providers = {
1247 let client = client.clone();
1248 let segment_headers_store = segment_headers_store.clone();
1249
1250 move |parent_hash, ()| {
1251 let client = client.clone();
1252 let segment_headers_store = segment_headers_store.clone();
1253
1254 async move {
1255 let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
1256
1257 let parent_header = client
1258 .header(parent_hash)?
1259 .expect("Parent header must always exist when block is created; qed");
1260
1261 let parent_block_number = parent_header.number;
1262
1263 let subspace_inherents =
1264 sp_consensus_subspace::inherents::InherentDataProvider::new(
1265 segment_headers_store
1266 .segment_headers_for_block(parent_block_number + 1),
1267 );
1268
1269 Ok((timestamp, subspace_inherents))
1270 }
1271 }
1272 };
1273
1274 info!("🧑🌾 Starting Subspace Authorship worker");
1275 let slot_worker_task = sc_proof_of_time::start_slot_worker(
1276 subspace_link.chain_constants().slot_duration(),
1277 client.clone(),
1278 select_chain.clone(),
1279 subspace_slot_worker,
1280 sync_oracle.clone(),
1281 create_inherent_data_providers,
1282 pot_slot_info_stream,
1283 );
1284
1285 task_manager.spawn_essential_handle().spawn_blocking(
1288 "subspace-proposer",
1289 Some("block-authoring"),
1290 slot_worker_task,
1291 );
1292 }
1293
1294 config.base.prometheus_config.take();
1296
1297 let rpc_handlers = task_spawner::spawn_tasks(SpawnTasksParams {
1298 network: network_service.clone(),
1299 client: client.clone(),
1300 keystore: keystore_container.keystore(),
1301 task_manager: &mut task_manager,
1302 transaction_pool: transaction_pool.clone(),
1303 rpc_builder: if enable_rpc_extensions {
1304 let client = client.clone();
1305 let new_slot_notification_stream = new_slot_notification_stream.clone();
1306 let reward_signing_notification_stream = reward_signing_notification_stream.clone();
1307 let object_mapping_notification_stream = object_mapping_notification_stream.clone();
1308 let archived_segment_notification_stream = archived_segment_notification_stream.clone();
1309 let transaction_pool = transaction_pool.clone();
1310 let backend = backend.clone();
1311
1312 Box::new(move |subscription_executor| {
1313 let deps = rpc::FullDeps {
1314 client: client.clone(),
1315 pool: transaction_pool.clone(),
1316 subscription_executor,
1317 new_slot_notification_stream: new_slot_notification_stream.clone(),
1318 reward_signing_notification_stream: reward_signing_notification_stream.clone(),
1319 object_mapping_notification_stream: object_mapping_notification_stream.clone(),
1320 archived_segment_notification_stream: archived_segment_notification_stream
1321 .clone(),
1322 dsn_bootstrap_nodes: dsn_bootstrap_nodes.clone(),
1323 segment_headers_store: segment_headers_store.clone(),
1324 sync_oracle: sync_oracle.clone(),
1325 kzg: subspace_link.kzg().clone(),
1326 erasure_coding: subspace_link.erasure_coding().clone(),
1327 backend: backend.clone(),
1328 };
1329
1330 rpc::create_full(deps).map_err(Into::into)
1331 })
1332 } else {
1333 Box::new(|_| Ok(RpcModule::new(())))
1334 },
1335 backend: backend.clone(),
1336 system_rpc_tx,
1337 config: config.base,
1338 telemetry: telemetry.as_mut(),
1339 tx_handler_controller,
1340 sync_service: sync_service.clone(),
1341 })?;
1342
1343 Ok(NewFull {
1344 task_manager,
1345 client,
1346 select_chain,
1347 network_service,
1348 xdm_gossip_notification_service,
1349 sync_service,
1350 rpc_handlers,
1351 backend,
1352 pot_slot_info_stream: additional_pot_slot_info_stream,
1353 new_slot_notification_stream,
1354 reward_signing_notification_stream,
1355 block_importing_notification_stream,
1356 object_mapping_notification_stream,
1357 archived_segment_notification_stream,
1358 transaction_pool,
1359 })
1360}
1361
1362fn confirmation_depth_storage_key() -> StorageKey {
1364 StorageKey(
1365 frame_support::storage::storage_prefix(
1366 "RuntimeConfigs".as_bytes(),
1369 "ConfirmationDepthK".as_bytes(),
1371 )
1372 .to_vec(),
1373 )
1374}
1375
1376fn extract_confirmation_depth(chain_spec: &dyn ChainSpec) -> Option<u32> {
1377 let storage_key = format!("0x{}", hex::encode(confirmation_depth_storage_key().0));
1378 let spec: serde_json::Value =
1379 serde_json::from_str(chain_spec.as_json(true).ok()?.as_str()).ok()?;
1380 let encoded_confirmation_depth = hex::decode(
1381 spec.pointer(format!("/genesis/raw/top/{storage_key}").as_str())?
1382 .as_str()?
1383 .trim_start_matches("0x"),
1384 )
1385 .ok()?;
1386 u32::decode(&mut encoded_confirmation_depth.as_slice()).ok()
1387}
1388
#[cfg(test)]
mod test {
    use static_assertions::const_assert_eq;
    use subspace_data_retrieval::object_fetcher;

    /// The consensus runtime and the object fetcher each define their own `MAX_BLOCK_LENGTH`;
    /// this fails to compile if the two constants ever diverge.
    #[test]
    fn max_block_length_consistent() {
        // Evaluated at compile time: the test body has nothing left to do at runtime.
        const_assert_eq!(
            subspace_runtime_primitives::MAX_BLOCK_LENGTH,
            object_fetcher::MAX_BLOCK_LENGTH,
        );
    }
}