1#![feature(
3 duration_constructors,
4 impl_trait_in_assoc_type,
5 int_roundings,
6 let_chains,
7 type_alias_impl_trait,
8 type_changing_struct_update
9)]
10
11pub mod config;
12pub mod dsn;
13mod metrics;
14pub(crate) mod mmr;
15pub mod rpc;
16pub mod sync_from_dsn;
17mod task_spawner;
18mod utils;
19
20use crate::config::{ChainSyncMode, SubspaceConfiguration, SubspaceNetworking};
21use crate::dsn::{create_dsn_instance, DsnConfigurationError};
22use crate::metrics::NodeMetrics;
23use crate::mmr::request_handler::MmrRequestHandler;
24use crate::sync_from_dsn::piece_validator::SegmentCommitmentPieceValidator;
25use crate::sync_from_dsn::snap_sync::snap_sync;
26use crate::sync_from_dsn::DsnPieceGetter;
27use async_lock::Semaphore;
28use core::sync::atomic::{AtomicU32, Ordering};
29use cross_domain_message_gossip::xdm_gossip_peers_set_config;
30use domain_runtime_primitives::opaque::{Block as DomainBlock, Header as DomainHeader};
31use frame_system_rpc_runtime_api::AccountNonceApi;
32use futures::channel::oneshot;
33use futures::FutureExt;
34use jsonrpsee::RpcModule;
35use pallet_transaction_payment_rpc_runtime_api::TransactionPaymentApi;
36use parity_scale_codec::Decode;
37use parking_lot::Mutex;
38use prometheus_client::registry::Registry;
39use sc_basic_authorship::ProposerFactory;
40use sc_chain_spec::{ChainSpec, GenesisBlockBuilder};
41use sc_client_api::execution_extensions::ExtensionsFactory;
42use sc_client_api::{
43 AuxStore, Backend, BlockBackend, BlockchainEvents, ExecutorProvider, HeaderBackend,
44};
45use sc_consensus::{
46 BasicQueue, BlockCheckParams, BlockImport, BlockImportParams, BoxBlockImport,
47 DefaultImportQueue, ImportQueue, ImportResult,
48};
49use sc_consensus_slots::SlotProportion;
50use sc_consensus_subspace::archiver::{
51 create_subspace_archiver, ArchivedSegmentNotification, ObjectMappingNotification,
52 SegmentHeadersStore,
53};
54use sc_consensus_subspace::block_import::{BlockImportingNotification, SubspaceBlockImport};
55use sc_consensus_subspace::notification::SubspaceNotificationStream;
56use sc_consensus_subspace::slot_worker::{
57 NewSlotNotification, RewardSigningNotification, SubspaceSlotWorker, SubspaceSlotWorkerOptions,
58 SubspaceSyncOracle,
59};
60use sc_consensus_subspace::verifier::{SubspaceVerifier, SubspaceVerifierOptions};
61use sc_consensus_subspace::SubspaceLink;
62use sc_domains::domain_block_er::execution_receipt_protocol::DomainBlockERRequestHandler;
63use sc_domains::ExtensionsFactory as DomainsExtensionFactory;
64use sc_network::service::traits::NetworkService;
65use sc_network::{NetworkWorker, NotificationMetrics, NotificationService, Roles};
66use sc_network_sync::block_relay_protocol::BlockRelayParams;
67use sc_network_sync::engine::SyncingEngine;
68use sc_network_sync::service::network::NetworkServiceProvider;
69use sc_proof_of_time::source::gossip::pot_gossip_peers_set_config;
70use sc_proof_of_time::source::{PotSlotInfo, PotSourceWorker};
71use sc_proof_of_time::verifier::PotVerifier;
72use sc_service::error::Error as ServiceError;
73use sc_service::{
74 build_network_advanced, build_polkadot_syncing_strategy, BuildNetworkAdvancedParams,
75 Configuration, NetworkStarter, SpawnTasksParams, TaskManager,
76};
77use sc_subspace_block_relay::{
78 build_consensus_relay, BlockRelayConfigurationError, NetworkWrapper,
79};
80use sc_telemetry::{Telemetry, TelemetryWorker};
81use sc_transaction_pool::TransactionPoolHandle;
82use sc_transaction_pool_api::OffchainTransactionPoolFactory;
83use sp_api::{ApiExt, ConstructRuntimeApi, Metadata, ProvideRuntimeApi};
84use sp_block_builder::BlockBuilder;
85use sp_blockchain::HeaderMetadata;
86use sp_consensus::block_validation::DefaultBlockAnnounceValidator;
87use sp_consensus_slots::Slot;
88use sp_consensus_subspace::digests::extract_pre_digest;
89use sp_consensus_subspace::{
90 KzgExtension, PosExtension, PotExtension, PotNextSlotInput, SubspaceApi,
91};
92use sp_core::offchain::storage::OffchainDb;
93use sp_core::offchain::OffchainDbExt;
94use sp_core::traits::SpawnEssentialNamed;
95use sp_core::H256;
96use sp_domains::storage::StorageKey;
97use sp_domains::{BundleProducerElectionApi, DomainsApi};
98use sp_domains_fraud_proof::{FraudProofApi, FraudProofExtension, FraudProofHostFunctionsImpl};
99use sp_externalities::Extensions;
100use sp_messenger::MessengerApi;
101use sp_messenger_host_functions::{MessengerExtension, MessengerHostFunctionsImpl};
102use sp_mmr_primitives::MmrApi;
103use sp_objects::ObjectsApi;
104use sp_offchain::OffchainWorkerApi;
105use sp_runtime::traits::{Block as BlockT, BlockIdTo, Header, NumberFor, Zero};
106use sp_session::SessionKeys;
107use sp_subspace_mmr::host_functions::{SubspaceMmrExtension, SubspaceMmrHostFunctionsImpl};
108use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
109use static_assertions::const_assert;
110use std::marker::PhantomData;
111use std::num::NonZeroUsize;
112use std::sync::Arc;
113use std::time::Duration;
114use subspace_core_primitives::pieces::Record;
115use subspace_core_primitives::pot::PotSeed;
116use subspace_core_primitives::{BlockNumber, PublicKey, REWARD_SIGNING_CONTEXT};
117use subspace_erasure_coding::ErasureCoding;
118use subspace_kzg::Kzg;
119use subspace_networking::libp2p::multiaddr::Protocol;
120use subspace_networking::utils::piece_provider::PieceProvider;
121use subspace_proof_of_space::Table;
122use subspace_runtime_primitives::opaque::Block;
123use subspace_runtime_primitives::{AccountId, Balance, Hash, Nonce};
124use tokio::sync::broadcast;
125use tracing::{debug, error, info, Instrument};
126pub use utils::wait_for_block_import;
127
// Compile-time guard: the build fails unless `usize` is at least as wide as `u64`.
const_assert!(std::mem::size_of::<usize>() >= std::mem::size_of::<u64>());

/// Cache size handed to `PotVerifier::new` when the proof-of-time verifier is created in
/// `new_partial`.
const POT_VERIFIER_CACHE_SIZE: u32 = 30_000;
/// Pause between two consecutive refreshes of the sync target block number performed by the
/// `sync-target-follower` task spawned in `new_full`.
const SYNC_TARGET_UPDATE_INTERVAL: Duration = Duration::from_secs(1);
/// Factor applied to the configured maximum number of outgoing DSN connections in order to size
/// the semaphore that is passed to `PieceProvider::new`.
const PIECE_PROVIDER_MULTIPLIER: usize = 10;
138
/// Error type of this crate's service-building functions.
///
/// Every variant is `#[error(transparent)]`, so both the displayed message and the error source
/// are taken from the wrapped error.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// I/O error ([`std::io::Error`]).
    #[error(transparent)]
    Io(#[from] std::io::Error),

    /// Network address parsing error ([`std::net::AddrParseError`]).
    #[error(transparent)]
    AddrFormatInvalid(#[from] std::net::AddrParseError),

    /// Error coming from `sc_service`.
    #[error(transparent)]
    Sub(#[from] sc_service::Error),

    /// Error coming from `sp_consensus`.
    #[error(transparent)]
    Consensus(#[from] sp_consensus::Error),

    /// Error coming from `sc_telemetry`.
    #[error(transparent)]
    Telemetry(#[from] sc_telemetry::Error),

    /// DSN configuration error ([`DsnConfigurationError`]).
    #[error(transparent)]
    SubspaceDsn(#[from] DsnConfigurationError),

    /// Block relay configuration error ([`BlockRelayConfigurationError`]).
    #[error(transparent)]
    BlockRelay(#[from] BlockRelayConfigurationError),

    /// Any other boxed error. There is no `#[from]` conversion for this variant, so it has to be
    /// constructed explicitly.
    #[error(transparent)]
    Other(Box<dyn std::error::Error + Send + Sync>),
}
174
/// Thin newtype around an inner block import `BI`.
///
/// Its [`BlockImport`] implementation forwards every call to the inner value and converts the
/// inner error type into [`sp_consensus::Error`].
#[derive(Clone)]
struct BlockImportWrapper<BI>(BI);
178
179#[async_trait::async_trait]
180impl<Block, BI> BlockImport<Block> for BlockImportWrapper<BI>
181where
182 Block: BlockT,
183 BI: BlockImport<Block, Error = sc_consensus_subspace::block_import::Error<Block::Header>>
184 + Send
185 + Sync,
186{
187 type Error = sp_consensus::Error;
188
189 async fn check_block(
190 &self,
191 block: BlockCheckParams<Block>,
192 ) -> Result<ImportResult, Self::Error> {
193 self.0
194 .check_block(block)
195 .await
196 .map_err(|error| sp_consensus::Error::Other(error.into()))
197 }
198
199 async fn import_block(
200 &self,
201 block: BlockImportParams<Block>,
202 ) -> Result<ImportResult, Self::Error> {
203 self.0
204 .import_block(block)
205 .await
206 .map_err(|error| sp_consensus::Error::Other(error.into()))
207 }
208}
209
/// Host functions made available to the runtime when the `runtime-benchmarks` feature is
/// disabled.
#[cfg(not(feature = "runtime-benchmarks"))]
pub type HostFunctions = (
    sp_io::SubstrateHostFunctions,
    sp_consensus_subspace::consensus::HostFunctions,
    sp_domains_fraud_proof::HostFunctions,
    sp_subspace_mmr::HostFunctions,
    sp_messenger_host_functions::HostFunctions,
);

/// Host functions made available to the runtime when the `runtime-benchmarks` feature is
/// enabled: the same set as without the feature, plus the `frame_benchmarking` host functions.
#[cfg(feature = "runtime-benchmarks")]
pub type HostFunctions = (
    sp_io::SubstrateHostFunctions,
    frame_benchmarking::benchmarking::HostFunctions,
    sp_consensus_subspace::consensus::HostFunctions,
    sp_domains_fraud_proof::HostFunctions,
    sp_subspace_mmr::HostFunctions,
    sp_messenger_host_functions::HostFunctions,
);
230
/// Wasm executor parameterized with [`HostFunctions`].
pub type RuntimeExecutor = sc_executor::WasmExecutor<HostFunctions>;

/// Full client for the given `RuntimeApi`, operating on [`Block`] and executing the runtime with
/// [`RuntimeExecutor`].
pub type FullClient<RuntimeApi> = sc_service::TFullClient<Block, RuntimeApi, RuntimeExecutor>;

/// Full backend for [`Block`].
pub type FullBackend = sc_service::TFullBackend<Block>;
/// Longest-chain selection rule on top of [`FullBackend`].
pub type FullSelectChain = sc_consensus::LongestChain<FullBackend, Block>;
239
/// Factory that produces the externalities extensions registered for runtime execution.
///
/// `PosTable` and `DomainBlock` are only used at the type level, which is why they are carried in
/// a [`PhantomData`] field.
struct SubspaceExtensionsFactory<PosTable, Client, DomainBlock> {
    // KZG instance cloned into `KzgExtension`.
    kzg: Kzg,
    // Client used for header lookups, runtime API calls and by the host function implementations.
    client: Arc<Client>,
    // Backend queried for offchain storage when registering `OffchainDbExt`.
    backend: Arc<FullBackend>,
    // Proof-of-time verifier used by the `PotExtension` callback.
    pot_verifier: PotVerifier,
    // Executor handed to the fraud proof and messenger host function implementations.
    domains_executor: Arc<sc_domains::RuntimeExecutor>,
    // Confirmation depth forwarded to the domains extension factory and the MMR host functions.
    confirmation_depth_k: BlockNumber,
    // Marker for the otherwise unused `PosTable` and `DomainBlock` type parameters.
    _pos_table: PhantomData<(PosTable, DomainBlock)>,
}
249
/// Registers the KZG, proof-of-space, proof-of-time, fraud proof, MMR, messenger and (when
/// offchain storage is available) offchain DB extensions for runtime execution.
impl<PosTable, Block, Client, DomainBlock> ExtensionsFactory<Block>
    for SubspaceExtensionsFactory<PosTable, Client, DomainBlock>
where
    PosTable: Table,
    Block: BlockT,
    Block::Hash: From<H256> + Into<H256>,
    DomainBlock: BlockT,
    DomainBlock::Hash: Into<H256> + From<H256>,
    Client: BlockBackend<Block>
        + HeaderBackend<Block>
        + ProvideRuntimeApi<Block>
        + Send
        + Sync
        + 'static,
    Client::Api: SubspaceApi<Block, PublicKey>
        + DomainsApi<Block, DomainBlock::Header>
        + BundleProducerElectionApi<Block, Balance>
        + MmrApi<Block, H256, NumberFor<Block>>
        + MessengerApi<Block, NumberFor<Block>, Block::Hash>,
{
    /// Builds a fresh [`Extensions`] set.
    ///
    /// The block hash and number arguments are ignored: the same extensions are produced for
    /// every block.
    fn extensions_for(
        &self,
        _block_hash: Block::Hash,
        _block_number: NumberFor<Block>,
    ) -> Extensions {
        let confirmation_depth_k = self.confirmation_depth_k;
        let mut exts = Extensions::new();
        exts.register(KzgExtension::new(self.kzg.clone()));
        exts.register(PosExtension::new::<PosTable>());
        // Proof-of-time extension: the boxed callback returns `true` when the supplied proof of
        // time is accepted for `slot` on top of the block identified by `parent_hash`.
        exts.register(PotExtension::new({
            let client = Arc::clone(&self.client);
            let pot_verifier = self.pot_verifier.clone();

            Box::new(
                move |parent_hash, slot, proof_of_time, quick_verification| {
                    // Rebuild a typed block hash from the raw bytes handed to the callback
                    // (`copy_from_slice` panics if the lengths differ).
                    let parent_hash = {
                        let mut converted_parent_hash = Block::Hash::default();
                        converted_parent_hash.as_mut().copy_from_slice(&parent_hash);
                        converted_parent_hash
                    };

                    let parent_header = match client.header(parent_hash) {
                        Ok(Some(parent_header)) => parent_header,
                        Ok(None) => {
                            if quick_verification {
                                error!(
                                    %parent_hash,
                                    "Header not found during proof of time verification"
                                );

                                return false;
                            } else {
                                debug!(
                                    %parent_hash,
                                    "Header not found during proof of time verification"
                                );

                                // NOTE(review): a missing parent header is tolerated outside of
                                // quick verification and the proof is reported as valid -
                                // presumably to support sync modes where the parent is not
                                // available locally; confirm.
                                return true;
                            }
                        }
                        Err(error) => {
                            error!(
                                %error,
                                %parent_hash,
                                "Failed to retrieve header during proof of time verification"
                            );

                            return false;
                        }
                    };

                    let parent_pre_digest = match extract_pre_digest(&parent_header) {
                        Ok(parent_pre_digest) => parent_pre_digest,
                        Err(error) => {
                            error!(
                                %error,
                                %parent_hash,
                                parent_number = %parent_header.number(),
                                "Failed to extract pre-digest from parent header during proof of \
                                time verification, this must never happen"
                            );

                            return false;
                        }
                    };

                    // The slot being verified must be strictly greater than the parent's slot.
                    let parent_slot = parent_pre_digest.slot();
                    if slot <= *parent_slot {
                        return false;
                    }

                    let pot_parameters = match client.runtime_api().pot_parameters(parent_hash) {
                        Ok(pot_parameters) => pot_parameters,
                        Err(error) => {
                            debug!(
                                %error,
                                %parent_hash,
                                parent_number = %parent_header.number(),
                                "Failed to retrieve proof of time parameters during proof of time \
                                verification"
                            );

                            return false;
                        }
                    };

                    // When the parent is block number zero the input is built from the verifier's
                    // genesis seed; otherwise it is derived from the parent's proof of time.
                    let pot_input = if parent_header.number().is_zero() {
                        PotNextSlotInput {
                            slot: parent_slot + Slot::from(1),
                            slot_iterations: pot_parameters.slot_iterations(),
                            seed: pot_verifier.genesis_seed(),
                        }
                    } else {
                        let pot_info = parent_pre_digest.pot_info();

                        PotNextSlotInput::derive(
                            pot_parameters.slot_iterations(),
                            parent_slot,
                            pot_info.proof_of_time(),
                            &pot_parameters.next_parameters_change(),
                        )
                    };

                    // Both branches pass identical arguments (the number of slots since the
                    // parent, the proof and the upcoming parameters change); they only differ in
                    // which verifier method is called.
                    // NOTE(review): `try_is_output_valid` presumably avoids the expensive
                    // verification path - confirm against the `PotVerifier` documentation.
                    if quick_verification {
                        pot_verifier.try_is_output_valid(
                            pot_input,
                            Slot::from(slot) - parent_slot,
                            proof_of_time,
                            pot_parameters.next_parameters_change(),
                        )
                    } else {
                        pot_verifier.is_output_valid(
                            pot_input,
                            Slot::from(slot) - parent_slot,
                            proof_of_time,
                            pot_parameters.next_parameters_change(),
                        )
                    }
                },
            )
        }));

        // Fraud proof host functions; the closure builds the extensions factory used for domain
        // block execution from the client and executor it is given.
        exts.register(FraudProofExtension::new(Arc::new(
            FraudProofHostFunctionsImpl::<_, _, DomainBlock, _, _>::new(
                self.client.clone(),
                self.domains_executor.clone(),
                move |client, executor| {
                    let extension_factory =
                        DomainsExtensionFactory::<_, Block, DomainBlock, _>::new(
                            client,
                            executor,
                            confirmation_depth_k,
                        );
                    Box::new(extension_factory) as Box<dyn ExtensionsFactory<DomainBlock>>
                },
            ),
        )));

        exts.register(SubspaceMmrExtension::new(Arc::new(
            SubspaceMmrHostFunctionsImpl::<Block, _>::new(
                self.client.clone(),
                confirmation_depth_k,
            ),
        )));

        exts.register(MessengerExtension::new(Arc::new(
            MessengerHostFunctionsImpl::<Block, _, DomainBlock, _>::new(
                self.client.clone(),
                self.domains_executor.clone(),
            ),
        )));

        // The offchain DB extension is only registered when the backend exposes offchain storage.
        if let Some(offchain_storage) = self.backend.offchain_storage() {
            let offchain_db = OffchainDb::new(offchain_storage);
            exts.register(OffchainDbExt::new(offchain_db));
        }

        exts
    }
}
437
/// Subspace-specific components produced by [`new_partial`] in addition to the generic ones held
/// by `sc_service::PartialComponents`.
pub struct OtherPartialComponents<RuntimeApi>
where
    RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
{
    /// Boxed Subspace block import.
    pub block_import: BoxBlockImport<Block>,
    /// Subspace link.
    pub subspace_link: SubspaceLink<Block>,
    /// Segment headers store.
    pub segment_headers_store: SegmentHeadersStore<FullClient<RuntimeApi>>,
    /// Proof-of-time verifier.
    pub pot_verifier: PotVerifier,
    /// Shared block number of the sync target; the same atomic is handed to the block verifier
    /// created in [`new_partial`].
    pub sync_target_block_number: Arc<AtomicU32>,
    /// Telemetry handle, present only when telemetry endpoints are configured.
    pub telemetry: Option<Telemetry>,
}
456
/// `sc_service::PartialComponents` specialized for the Subspace full client, backend, chain
/// selection, import queue and transaction pool, carrying [`OtherPartialComponents`] as its
/// `other` payload.
type PartialComponents<RuntimeApi> = sc_service::PartialComponents<
    FullClient<RuntimeApi>,
    FullBackend,
    FullSelectChain,
    DefaultImportQueue<Block>,
    TransactionPoolHandle<Block, FullClient<RuntimeApi>>,
    OtherPartialComponents<RuntimeApi>,
>;
465
466#[expect(clippy::result_large_err, reason = "Comes from Substrate")]
468pub fn new_partial<PosTable, RuntimeApi>(
469 config: &Configuration,
472 snap_sync: bool,
474 pot_external_entropy: &[u8],
475) -> Result<PartialComponents<RuntimeApi>, ServiceError>
476where
477 PosTable: Table,
478 RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
479 RuntimeApi::RuntimeApi: ApiExt<Block>
480 + Metadata<Block>
481 + BlockBuilder<Block>
482 + OffchainWorkerApi<Block>
483 + SessionKeys<Block>
484 + TaggedTransactionQueue<Block>
485 + SubspaceApi<Block, PublicKey>
486 + DomainsApi<Block, DomainHeader>
487 + FraudProofApi<Block, DomainHeader>
488 + BundleProducerElectionApi<Block, Balance>
489 + ObjectsApi<Block>
490 + MmrApi<Block, H256, NumberFor<Block>>
491 + MessengerApi<Block, NumberFor<Block>, <Block as BlockT>::Hash>,
492{
493 let telemetry = config
494 .telemetry_endpoints
495 .clone()
496 .filter(|x| !x.is_empty())
497 .map(|endpoints| -> Result<_, sc_telemetry::Error> {
498 let worker = TelemetryWorker::new(16)?;
499 let telemetry = worker.handle().new_telemetry(endpoints);
500 Ok((worker, telemetry))
501 })
502 .transpose()?;
503
504 let executor = sc_service::new_wasm_executor(&config.executor);
505 let domains_executor = sc_service::new_wasm_executor(&config.executor);
506
507 let confirmation_depth_k = extract_confirmation_depth(config.chain_spec.as_ref()).ok_or(
508 ServiceError::Other("Failed to extract confirmation depth from chain spec".to_string()),
509 )?;
510
511 let backend = Arc::new(sc_client_db::Backend::new(
512 config.db_config(),
513 confirmation_depth_k.into(),
514 )?);
515
516 let genesis_block_builder = GenesisBlockBuilder::new(
517 config.chain_spec.as_storage_builder(),
518 !snap_sync,
519 backend.clone(),
520 executor.clone(),
521 )?;
522
523 let (client, backend, keystore_container, task_manager) =
524 sc_service::new_full_parts_with_genesis_builder::<Block, RuntimeApi, _, _>(
525 config,
526 telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
527 executor.clone(),
528 backend,
529 genesis_block_builder,
530 false,
531 )?;
532
533 let (kzg, maybe_erasure_coding) = tokio::task::block_in_place(|| {
535 rayon::join(Kzg::new, || {
536 ErasureCoding::new(
537 NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize)
538 .expect("Not zero; qed"),
539 )
540 .map_err(|error| format!("Failed to instantiate erasure coding: {error}"))
541 })
542 });
543 let erasure_coding = maybe_erasure_coding?;
544
545 let client = Arc::new(client);
546 let client_info = client.info();
547 let chain_constants = client
548 .runtime_api()
549 .chain_constants(client_info.best_hash)
550 .map_err(|error| ServiceError::Application(error.into()))?;
551
552 let pot_verifier = PotVerifier::new(
553 PotSeed::from_genesis(client_info.genesis_hash.as_ref(), pot_external_entropy),
554 POT_VERIFIER_CACHE_SIZE,
555 );
556
557 assert_eq!(confirmation_depth_k, chain_constants.confirmation_depth_k());
559
560 client
561 .execution_extensions()
562 .set_extensions_factory(SubspaceExtensionsFactory::<PosTable, _, DomainBlock> {
563 kzg: kzg.clone(),
564 client: Arc::clone(&client),
565 pot_verifier: pot_verifier.clone(),
566 domains_executor: Arc::new(domains_executor),
567 backend: backend.clone(),
568 confirmation_depth_k: chain_constants.confirmation_depth_k(),
569 _pos_table: PhantomData,
570 });
571
572 let telemetry = telemetry.map(|(worker, telemetry)| {
573 task_manager
574 .spawn_handle()
575 .spawn("telemetry", None, worker.run());
576 telemetry
577 });
578
579 let select_chain = sc_consensus::LongestChain::new(backend.clone());
580
581 let segment_headers_store = tokio::task::block_in_place(|| {
582 SegmentHeadersStore::new(client.clone(), chain_constants.confirmation_depth_k())
583 })
584 .map_err(|error| ServiceError::Application(error.into()))?;
585
586 let subspace_link = SubspaceLink::new(chain_constants, kzg.clone(), erasure_coding);
587 let segment_headers_store = segment_headers_store.clone();
588
589 let block_import = SubspaceBlockImport::<PosTable, _, _, _, _, _>::new(
590 client.clone(),
591 client.clone(),
592 subspace_link.clone(),
593 {
594 let client = client.clone();
595 let segment_headers_store = segment_headers_store.clone();
596
597 move |parent_hash, ()| {
598 let client = client.clone();
599 let segment_headers_store = segment_headers_store.clone();
600
601 async move {
602 let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
603
604 let parent_header = client
605 .header(parent_hash)?
606 .expect("Parent header must always exist when block is created; qed");
607
608 let parent_block_number = parent_header.number;
609
610 let subspace_inherents =
611 sp_consensus_subspace::inherents::InherentDataProvider::new(
612 segment_headers_store
613 .segment_headers_for_block(parent_block_number + 1),
614 );
615
616 Ok((timestamp, subspace_inherents))
617 }
618 }
619 },
620 segment_headers_store.clone(),
621 pot_verifier.clone(),
622 );
623
624 let sync_target_block_number = Arc::new(AtomicU32::new(0));
625 let transaction_pool = Arc::from(
626 sc_transaction_pool::Builder::new(
627 task_manager.spawn_essential_handle(),
628 client.clone(),
629 config.role.is_authority().into(),
630 )
631 .with_options(config.transaction_pool.clone())
632 .with_prometheus(config.prometheus_registry())
633 .build(),
634 );
635
636 let verifier = SubspaceVerifier::<PosTable, _, _>::new(SubspaceVerifierOptions {
637 client: client.clone(),
638 chain_constants,
639 kzg,
640 telemetry: telemetry.as_ref().map(|x| x.handle()),
641 reward_signing_context: schnorrkel::context::signing_context(REWARD_SIGNING_CONTEXT),
642 sync_target_block_number: Arc::clone(&sync_target_block_number),
643 is_authoring_blocks: config.role.is_authority(),
644 pot_verifier: pot_verifier.clone(),
645 });
646
647 let import_queue = BasicQueue::new(
648 verifier,
649 Box::new(BlockImportWrapper(block_import.clone())),
650 None,
651 &task_manager.spawn_essential_handle(),
652 config.prometheus_registry(),
653 );
654
655 let other = OtherPartialComponents {
656 block_import: Box::new(BlockImportWrapper(block_import.clone())),
657 subspace_link,
658 segment_headers_store,
659 pot_verifier,
660 sync_target_block_number,
661 telemetry,
662 };
663
664 Ok(PartialComponents {
665 client,
666 backend,
667 task_manager,
668 import_queue,
669 keystore_container,
670 select_chain,
671 transaction_pool,
672 other,
673 })
674}
675
/// Handles and services of a fully built node, as returned by `new_full` (through the `FullNode`
/// alias).
pub struct NewFull<Client>
where
    Client: ProvideRuntimeApi<Block>
        + AuxStore
        + BlockBackend<Block>
        + BlockIdTo<Block>
        + HeaderBackend<Block>
        + HeaderMetadata<Block, Error = sp_blockchain::Error>
        + 'static,
    Client::Api: TaggedTransactionQueue<Block>
        + DomainsApi<Block, DomainHeader>
        + FraudProofApi<Block, DomainHeader>
        + SubspaceApi<Block, PublicKey>
        + MmrApi<Block, H256, NumberFor<Block>>
        + MessengerApi<Block, NumberFor<Block>, <Block as BlockT>::Hash>,
{
    /// Task manager.
    pub task_manager: TaskManager,
    /// Client.
    pub client: Arc<Client>,
    /// Chain selection rule.
    pub select_chain: FullSelectChain,
    /// Network service.
    pub network_service: Arc<dyn NetworkService + Send + Sync>,
    /// Notification service of the cross-domain message (XDM) gossip protocol.
    pub xdm_gossip_notification_service: Box<dyn NotificationService>,
    /// Sync service.
    pub sync_service: Arc<sc_network_sync::SyncingService<Block>>,
    /// RPC handlers.
    pub rpc_handlers: sc_service::RpcHandlers,
    /// Client backend.
    pub backend: Arc<FullBackend>,
    /// Broadcast receiver of proof-of-time slot info.
    pub pot_slot_info_stream: broadcast::Receiver<PotSlotInfo>,
    /// Stream of new slot notifications.
    pub new_slot_notification_stream: SubspaceNotificationStream<NewSlotNotification>,
    /// Stream of reward signing notifications.
    pub reward_signing_notification_stream: SubspaceNotificationStream<RewardSigningNotification>,
    /// Stream of block importing notifications.
    pub block_importing_notification_stream:
        SubspaceNotificationStream<BlockImportingNotification<Block>>,
    /// Stream of object mapping notifications.
    pub object_mapping_notification_stream: SubspaceNotificationStream<ObjectMappingNotification>,
    /// Stream of archived segment notifications.
    pub archived_segment_notification_stream:
        SubspaceNotificationStream<ArchivedSegmentNotification>,
    /// Network starter.
    pub network_starter: NetworkStarter,
    /// Transaction pool.
    pub transaction_pool: Arc<TransactionPoolHandle<Block, Client>>,
}
729
/// [`NewFull`] specialized for the [`FullClient`] of the given `RuntimeApi`.
type FullNode<RuntimeApi> = NewFull<FullClient<RuntimeApi>>;
731
732pub async fn new_full<PosTable, RuntimeApi>(
734 mut config: SubspaceConfiguration,
735 partial_components: PartialComponents<RuntimeApi>,
736 prometheus_registry: Option<&mut Registry>,
737 enable_rpc_extensions: bool,
738 block_proposal_slot_portion: SlotProportion,
739 consensus_snap_sync_target_block_receiver: Option<broadcast::Receiver<BlockNumber>>,
740) -> Result<FullNode<RuntimeApi>, Error>
741where
742 PosTable: Table,
743 RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
744 RuntimeApi::RuntimeApi: ApiExt<Block>
745 + Metadata<Block>
746 + AccountNonceApi<Block, AccountId, Nonce>
747 + BlockBuilder<Block>
748 + OffchainWorkerApi<Block>
749 + SessionKeys<Block>
750 + TaggedTransactionQueue<Block>
751 + TransactionPaymentApi<Block, Balance>
752 + SubspaceApi<Block, PublicKey>
753 + DomainsApi<Block, DomainHeader>
754 + FraudProofApi<Block, DomainHeader>
755 + ObjectsApi<Block>
756 + MmrApi<Block, Hash, BlockNumber>
757 + MessengerApi<Block, NumberFor<Block>, <Block as BlockT>::Hash>,
758{
759 let PartialComponents {
760 client,
761 backend,
762 mut task_manager,
763 import_queue,
764 keystore_container,
765 select_chain,
766 transaction_pool,
767 other,
768 } = partial_components;
769 let OtherPartialComponents {
770 block_import,
771 subspace_link,
772 segment_headers_store,
773 pot_verifier,
774 sync_target_block_number,
775 mut telemetry,
776 } = other;
777
778 let offchain_indexing_enabled = config.base.offchain_worker.indexing_enabled;
779 let (node, bootstrap_nodes, piece_getter) = match config.subspace_networking {
780 SubspaceNetworking::Reuse {
781 node,
782 bootstrap_nodes,
783 piece_getter,
784 } => (node, bootstrap_nodes, piece_getter),
785 SubspaceNetworking::Create { config: dsn_config } => {
786 let dsn_protocol_version = hex::encode(client.chain_info().genesis_hash);
787
788 debug!(
789 chain_type=?config.base.chain_spec.chain_type(),
790 genesis_hash=%hex::encode(client.chain_info().genesis_hash),
791 "Setting DSN protocol version..."
792 );
793
794 let out_connections = dsn_config.max_out_connections;
795 let (node, mut node_runner) = create_dsn_instance(
796 dsn_protocol_version,
797 dsn_config.clone(),
798 prometheus_registry,
799 )?;
800
801 info!("Subspace networking initialized: Node ID is {}", node.id());
802
803 node.on_new_listener(Arc::new({
804 let node = node.clone();
805
806 move |address| {
807 info!(
808 "DSN listening on {}",
809 address.clone().with(Protocol::P2p(node.id()))
810 );
811 }
812 }))
813 .detach();
814
815 task_manager
816 .spawn_essential_handle()
817 .spawn_essential_blocking(
818 "node-runner",
819 Some("subspace-networking"),
820 Box::pin(
821 async move {
822 node_runner.run().await;
823 }
824 .in_current_span(),
825 ),
826 );
827
828 let piece_provider = PieceProvider::new(
829 node.clone(),
830 SegmentCommitmentPieceValidator::new(
831 node.clone(),
832 subspace_link.kzg().clone(),
833 segment_headers_store.clone(),
834 ),
835 Arc::new(Semaphore::new(
836 out_connections as usize * PIECE_PROVIDER_MULTIPLIER,
837 )),
838 );
839
840 (
841 node,
842 dsn_config.bootstrap_nodes,
843 Arc::new(DsnPieceGetter::new(piece_provider)) as _,
844 )
845 }
846 };
847
848 let dsn_bootstrap_nodes = {
849 if bootstrap_nodes.is_empty() {
852 let (node_address_sender, node_address_receiver) = oneshot::channel();
853 let _handler = node.on_new_listener(Arc::new({
854 let node_address_sender = Mutex::new(Some(node_address_sender));
855
856 move |address| {
857 if matches!(address.iter().next(), Some(Protocol::Ip4(_))) {
858 if let Some(node_address_sender) = node_address_sender.lock().take() {
859 if let Err(err) = node_address_sender.send(address.clone()) {
860 debug!(?err, "Couldn't send a node address to the channel.");
861 }
862 }
863 }
864 }
865 }));
866
867 let mut node_listeners = node.listeners();
868
869 if node_listeners.is_empty() {
870 let Ok(listener) = node_address_receiver.await else {
871 return Err(Error::Other(
872 "Oneshot receiver dropped before DSN node listener was ready"
873 .to_string()
874 .into(),
875 ));
876 };
877
878 node_listeners = vec![listener];
879 }
880
881 node_listeners.iter_mut().for_each(|multiaddr| {
882 multiaddr.push(Protocol::P2p(node.id()));
883 });
884
885 node_listeners
886 } else {
887 bootstrap_nodes
888 }
889 };
890
891 let substrate_prometheus_registry = config
892 .base
893 .prometheus_config
894 .as_ref()
895 .map(|prometheus_config| prometheus_config.registry.clone());
896 let import_queue_service1 = import_queue.service();
897 let import_queue_service2 = import_queue.service();
898 let network_wrapper = Arc::new(NetworkWrapper::default());
899 let block_relay = build_consensus_relay(
900 network_wrapper.clone(),
901 client.clone(),
902 transaction_pool.clone(),
903 substrate_prometheus_registry.as_ref(),
904 )
905 .map_err(Error::BlockRelay)?;
906 let mut net_config = sc_network::config::FullNetworkConfiguration::new(
907 &config.base.network,
908 substrate_prometheus_registry.clone(),
909 );
910 let (xdm_gossip_notification_config, xdm_gossip_notification_service) =
911 xdm_gossip_peers_set_config();
912 net_config.add_notification_protocol(xdm_gossip_notification_config);
913 let (pot_gossip_notification_config, pot_gossip_notification_service) =
914 pot_gossip_peers_set_config();
915 net_config.add_notification_protocol(pot_gossip_notification_config);
916 let pause_sync = Arc::clone(&net_config.network_config.pause_sync);
917
918 let num_peer_hint = net_config.network_config.default_peers_set_num_full as usize
919 + net_config
920 .network_config
921 .default_peers_set
922 .reserved_nodes
923 .len();
924
925 let protocol_id = config.base.protocol_id();
926 let fork_id = config.base.chain_spec.fork_id();
927
928 let (handler, protocol_config) = DomainBlockERRequestHandler::new::<
930 NetworkWorker<Block, <Block as BlockT>::Hash>,
931 >(fork_id, client.clone(), num_peer_hint);
932 task_manager.spawn_handle().spawn(
933 "domain-block-er-request-handler",
934 Some("networking"),
935 handler.run(),
936 );
937 net_config.add_request_response_protocol(protocol_config);
938
939 if let Some(offchain_storage) = backend.offchain_storage() {
940 let (handler, protocol_config) =
942 MmrRequestHandler::new::<NetworkWorker<Block, <Block as BlockT>::Hash>>(
943 &config.base.protocol_id(),
944 fork_id,
945 client.clone(),
946 num_peer_hint,
947 offchain_storage,
948 );
949 task_manager
950 .spawn_handle()
951 .spawn("mmr-request-handler", Some("networking"), handler.run());
952
953 net_config.add_request_response_protocol(protocol_config);
954 }
955
956 let network_service_provider = NetworkServiceProvider::new();
957 let network_service_handle = network_service_provider.handle();
958 let (network_service, system_rpc_tx, tx_handler_controller, network_starter, sync_service) = {
959 let spawn_handle = task_manager.spawn_handle();
960 let metrics = NotificationMetrics::new(substrate_prometheus_registry.as_ref());
961
962 let block_downloader = {
964 let BlockRelayParams {
965 mut server,
966 downloader,
967 request_response_config,
968 } = block_relay;
969 net_config.add_request_response_protocol(request_response_config);
970
971 spawn_handle.spawn("block-request-handler", Some("networking"), async move {
972 server.run().await;
973 });
974
975 downloader
976 };
977
978 let syncing_strategy = build_polkadot_syncing_strategy(
979 protocol_id.clone(),
980 fork_id,
981 &mut net_config,
982 None,
983 block_downloader,
984 client.clone(),
985 &spawn_handle,
986 substrate_prometheus_registry.as_ref(),
987 )?;
988
989 let (syncing_engine, sync_service, block_announce_config) =
990 SyncingEngine::new::<NetworkWorker<_, _>>(
991 Roles::from(&config.base.role),
992 Arc::clone(&client),
993 substrate_prometheus_registry.as_ref(),
994 metrics.clone(),
995 &net_config,
996 protocol_id.clone(),
997 fork_id,
998 Box::new(DefaultBlockAnnounceValidator),
999 syncing_strategy,
1000 network_service_provider.handle(),
1001 import_queue.service(),
1002 net_config.peer_store_handle(),
1003 config.base.network.force_synced,
1004 )
1005 .map_err(sc_service::Error::from)?;
1006
1007 spawn_handle.spawn_blocking("syncing", None, syncing_engine.run());
1008
1009 build_network_advanced(BuildNetworkAdvancedParams {
1010 role: config.base.role,
1011 protocol_id,
1012 fork_id,
1013 ipfs_server: config.base.network.ipfs_server,
1014 announce_block: config.base.announce_block,
1015 net_config,
1016 client: Arc::clone(&client),
1017 transaction_pool: Arc::clone(&transaction_pool),
1018 spawn_handle,
1019 import_queue,
1020 sync_service,
1021 block_announce_config,
1022 network_service_provider,
1023 metrics_registry: substrate_prometheus_registry.as_ref(),
1024 metrics,
1025 })?
1026 };
1027
1028 task_manager.spawn_handle().spawn(
1029 "sync-target-follower",
1030 None,
1031 Box::pin({
1032 let sync_service = sync_service.clone();
1033 let sync_target_block_number = Arc::clone(&sync_target_block_number);
1034
1035 async move {
1036 loop {
1037 let best_seen_block = sync_service
1038 .status()
1039 .await
1040 .map(|status| status.best_seen_block.unwrap_or_default())
1041 .unwrap_or_default();
1042 sync_target_block_number.store(best_seen_block, Ordering::Relaxed);
1043
1044 tokio::time::sleep(SYNC_TARGET_UPDATE_INTERVAL).await;
1045 }
1046 }
1047 }),
1048 );
1049
1050 let sync_oracle = SubspaceSyncOracle::new(
1051 config.base.force_authoring,
1052 Arc::clone(&pause_sync),
1053 sync_service.clone(),
1054 );
1055
1056 let subspace_archiver = tokio::task::block_in_place(|| {
1057 create_subspace_archiver(
1058 segment_headers_store.clone(),
1059 subspace_link.clone(),
1060 client.clone(),
1061 sync_oracle.clone(),
1062 telemetry.as_ref().map(|telemetry| telemetry.handle()),
1063 config.create_object_mappings,
1064 )
1065 })
1066 .map_err(ServiceError::Client)?;
1067
1068 task_manager
1069 .spawn_essential_handle()
1070 .spawn_essential_blocking(
1071 "subspace-archiver",
1072 None,
1073 Box::pin(async move {
1074 if let Err(error) = subspace_archiver.await {
1075 error!(%error, "Archiver exited with error");
1076 }
1077 }),
1078 );
1079
1080 network_wrapper.set(network_service.clone());
1081
1082 if !config.base.network.force_synced {
1083 pause_sync.store(true, Ordering::Release);
1085 }
1086
1087 let snap_sync_task = snap_sync(
1088 segment_headers_store.clone(),
1089 node.clone(),
1090 fork_id.map(|fork_id| fork_id.to_string()),
1091 Arc::clone(&client),
1092 import_queue_service1,
1093 pause_sync.clone(),
1094 piece_getter.clone(),
1095 sync_service.clone(),
1096 network_service_handle,
1097 subspace_link.erasure_coding().clone(),
1098 consensus_snap_sync_target_block_receiver,
1099 backend.offchain_storage(),
1100 network_service.clone(),
1101 );
1102
1103 let (observer, worker) = sync_from_dsn::create_observer_and_worker(
1104 segment_headers_store.clone(),
1105 Arc::clone(&network_service),
1106 node.clone(),
1107 Arc::clone(&client),
1108 import_queue_service2,
1109 sync_service.clone(),
1110 sync_target_block_number,
1111 pause_sync,
1112 piece_getter,
1113 subspace_link.erasure_coding().clone(),
1114 );
1115 task_manager
1116 .spawn_handle()
1117 .spawn("observer", Some("sync-from-dsn"), observer);
1118 task_manager
1119 .spawn_essential_handle()
1120 .spawn_essential_blocking(
1121 "worker",
1122 Some("sync-from-dsn"),
1123 Box::pin(async move {
1124 if config.sync == ChainSyncMode::Snap {
1126 if let Err(error) = snap_sync_task.in_current_span().await {
1127 error!(%error, "Snap sync exited with a fatal error");
1128 return;
1129 }
1130 }
1131
1132 if let Err(error) = worker.await {
1133 error!(%error, "Sync from DSN exited with an error");
1134 }
1135 }),
1136 );
1137
1138 if let Some(registry) = substrate_prometheus_registry.as_ref() {
1139 match NodeMetrics::new(
1140 client.clone(),
1141 client.every_import_notification_stream(),
1142 registry,
1143 ) {
1144 Ok(node_metrics) => {
1145 task_manager.spawn_handle().spawn(
1146 "node_metrics",
1147 None,
1148 Box::pin(async move {
1149 node_metrics.run().await;
1150 }),
1151 );
1152 }
1153 Err(err) => {
1154 error!("Failed to initialize node metrics: {err:?}");
1155 }
1156 }
1157 }
1158
1159 let offchain_tx_pool_factory = OffchainTransactionPoolFactory::new(transaction_pool.clone());
1160
1161 if config.base.offchain_worker.enabled {
1162 let offchain_workers =
1163 sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
1164 runtime_api_provider: client.clone(),
1165 is_validator: config.base.role.is_authority(),
1166 keystore: Some(keystore_container.keystore()),
1167 offchain_db: backend.offchain_storage(),
1168 transaction_pool: Some(offchain_tx_pool_factory.clone()),
1169 network_provider: Arc::new(network_service.clone()),
1170 enable_http_requests: true,
1171 custom_extensions: |_| vec![],
1172 })?;
1173 task_manager.spawn_handle().spawn(
1174 "offchain-workers-runner",
1175 "offchain-worker",
1176 offchain_workers
1177 .run(client.clone(), task_manager.spawn_handle())
1178 .boxed(),
1179 );
1180 }
1181
1182 if offchain_indexing_enabled {
1184 task_manager.spawn_essential_handle().spawn_blocking(
1185 "mmr-gadget",
1186 None,
1187 mmr_gadget::MmrGadget::start(
1188 client.clone(),
1189 backend.clone(),
1190 sp_mmr_primitives::INDEXING_PREFIX.to_vec(),
1191 ),
1192 );
1193 }
1194
1195 let backoff_authoring_blocks: Option<()> = None;
1196
1197 let new_slot_notification_stream = subspace_link.new_slot_notification_stream();
1198 let reward_signing_notification_stream = subspace_link.reward_signing_notification_stream();
1199 let block_importing_notification_stream = subspace_link.block_importing_notification_stream();
1200 let object_mapping_notification_stream = subspace_link.object_mapping_notification_stream();
1201 let archived_segment_notification_stream = subspace_link.archived_segment_notification_stream();
1202
1203 let (pot_source_worker, pot_gossip_worker, pot_slot_info_stream) = PotSourceWorker::new(
1204 config.is_timekeeper,
1205 config.timekeeper_cpu_cores,
1206 client.clone(),
1207 pot_verifier.clone(),
1208 Arc::clone(&network_service),
1209 pot_gossip_notification_service,
1210 sync_service.clone(),
1211 sync_oracle.clone(),
1212 )
1213 .map_err(|error| Error::Other(error.into()))?;
1214
1215 let additional_pot_slot_info_stream = pot_source_worker.subscribe_pot_slot_info_stream();
1216
1217 task_manager
1218 .spawn_essential_handle()
1219 .spawn("pot-source", Some("pot"), pot_source_worker.run());
1220 task_manager
1221 .spawn_essential_handle()
1222 .spawn("pot-gossip", Some("pot"), pot_gossip_worker.run());
1223
1224 if config.base.role.is_authority() || config.force_new_slot_notifications {
1225 let proposer_factory = ProposerFactory::new(
1226 task_manager.spawn_handle(),
1227 client.clone(),
1228 transaction_pool.clone(),
1229 substrate_prometheus_registry.as_ref(),
1230 telemetry.as_ref().map(|x| x.handle()),
1231 );
1232
1233 let subspace_slot_worker =
1234 SubspaceSlotWorker::<PosTable, _, _, _, _, _, _, _>::new(SubspaceSlotWorkerOptions {
1235 client: client.clone(),
1236 env: proposer_factory,
1237 block_import,
1238 sync_oracle: sync_oracle.clone(),
1239 justification_sync_link: sync_service.clone(),
1240 force_authoring: config.base.force_authoring,
1241 backoff_authoring_blocks,
1242 subspace_link: subspace_link.clone(),
1243 segment_headers_store: segment_headers_store.clone(),
1244 block_proposal_slot_portion,
1245 max_block_proposal_slot_portion: None,
1246 telemetry: telemetry.as_ref().map(|x| x.handle()),
1247 offchain_tx_pool_factory,
1248 pot_verifier,
1249 });
1250
1251 let create_inherent_data_providers = {
1252 let client = client.clone();
1253 let segment_headers_store = segment_headers_store.clone();
1254
1255 move |parent_hash, ()| {
1256 let client = client.clone();
1257 let segment_headers_store = segment_headers_store.clone();
1258
1259 async move {
1260 let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
1261
1262 let parent_header = client
1263 .header(parent_hash)?
1264 .expect("Parent header must always exist when block is created; qed");
1265
1266 let parent_block_number = parent_header.number;
1267
1268 let subspace_inherents =
1269 sp_consensus_subspace::inherents::InherentDataProvider::new(
1270 segment_headers_store
1271 .segment_headers_for_block(parent_block_number + 1),
1272 );
1273
1274 Ok((timestamp, subspace_inherents))
1275 }
1276 }
1277 };
1278
1279 info!(target: "subspace", "🧑🌾 Starting Subspace Authorship worker");
1280 let slot_worker_task = sc_proof_of_time::start_slot_worker(
1281 subspace_link.chain_constants().slot_duration(),
1282 client.clone(),
1283 select_chain.clone(),
1284 subspace_slot_worker,
1285 sync_oracle.clone(),
1286 create_inherent_data_providers,
1287 pot_slot_info_stream,
1288 );
1289
1290 task_manager.spawn_essential_handle().spawn_blocking(
1293 "subspace-proposer",
1294 Some("block-authoring"),
1295 slot_worker_task,
1296 );
1297 }
1298
1299 config.base.prometheus_config.take();
1301
1302 let rpc_handlers = task_spawner::spawn_tasks(SpawnTasksParams {
1303 network: network_service.clone(),
1304 client: client.clone(),
1305 keystore: keystore_container.keystore(),
1306 task_manager: &mut task_manager,
1307 transaction_pool: transaction_pool.clone(),
1308 rpc_builder: if enable_rpc_extensions {
1309 let client = client.clone();
1310 let new_slot_notification_stream = new_slot_notification_stream.clone();
1311 let reward_signing_notification_stream = reward_signing_notification_stream.clone();
1312 let object_mapping_notification_stream = object_mapping_notification_stream.clone();
1313 let archived_segment_notification_stream = archived_segment_notification_stream.clone();
1314 let transaction_pool = transaction_pool.clone();
1315 let backend = backend.clone();
1316
1317 Box::new(move |subscription_executor| {
1318 let deps = rpc::FullDeps {
1319 client: client.clone(),
1320 pool: transaction_pool.clone(),
1321 subscription_executor,
1322 new_slot_notification_stream: new_slot_notification_stream.clone(),
1323 reward_signing_notification_stream: reward_signing_notification_stream.clone(),
1324 object_mapping_notification_stream: object_mapping_notification_stream.clone(),
1325 archived_segment_notification_stream: archived_segment_notification_stream
1326 .clone(),
1327 dsn_bootstrap_nodes: dsn_bootstrap_nodes.clone(),
1328 segment_headers_store: segment_headers_store.clone(),
1329 sync_oracle: sync_oracle.clone(),
1330 kzg: subspace_link.kzg().clone(),
1331 erasure_coding: subspace_link.erasure_coding().clone(),
1332 backend: backend.clone(),
1333 };
1334
1335 rpc::create_full(deps).map_err(Into::into)
1336 })
1337 } else {
1338 Box::new(|_| Ok(RpcModule::new(())))
1339 },
1340 backend: backend.clone(),
1341 system_rpc_tx,
1342 config: config.base,
1343 telemetry: telemetry.as_mut(),
1344 tx_handler_controller,
1345 sync_service: sync_service.clone(),
1346 })?;
1347
1348 Ok(NewFull {
1349 task_manager,
1350 client,
1351 select_chain,
1352 network_service,
1353 xdm_gossip_notification_service,
1354 sync_service,
1355 rpc_handlers,
1356 backend,
1357 pot_slot_info_stream: additional_pot_slot_info_stream,
1358 new_slot_notification_stream,
1359 reward_signing_notification_stream,
1360 block_importing_notification_stream,
1361 object_mapping_notification_stream,
1362 archived_segment_notification_stream,
1363 network_starter,
1364 transaction_pool,
1365 })
1366}
1367
1368fn confirmation_depth_storage_key() -> StorageKey {
1370 StorageKey(
1371 frame_support::storage::storage_prefix(
1372 "RuntimeConfigs".as_bytes(),
1375 "ConfirmationDepthK".as_bytes(),
1377 )
1378 .to_vec(),
1379 )
1380}
1381
1382fn extract_confirmation_depth(chain_spec: &dyn ChainSpec) -> Option<u32> {
1383 let storage_key = format!("0x{}", hex::encode(confirmation_depth_storage_key().0));
1384 let spec: serde_json::Value =
1385 serde_json::from_str(chain_spec.as_json(true).ok()?.as_str()).ok()?;
1386 let encoded_confirmation_depth = hex::decode(
1387 spec.pointer(format!("/genesis/raw/top/{}", storage_key).as_str())?
1388 .as_str()?
1389 .trim_start_matches("0x"),
1390 )
1391 .ok()?;
1392 u32::decode(&mut encoded_confirmation_depth.as_slice()).ok()
1393}
1394
#[cfg(test)]
mod test {
    use static_assertions::const_assert_eq;
    use subspace_data_retrieval::object_fetcher::MAX_BLOCK_LENGTH as OBJECT_FETCHER_MAX_BLOCK_LENGTH;
    use subspace_runtime_primitives::MAX_BLOCK_LENGTH as RUNTIME_MAX_BLOCK_LENGTH;

    /// Guards that the `MAX_BLOCK_LENGTH` constant exported by the object fetcher stays equal to
    /// the one exported by the runtime primitives.
    ///
    /// `const_assert_eq!` is checked at compile time, so a mismatch breaks the test build
    /// instead of failing when the test runs.
    #[test]
    fn max_block_length_consistent() {
        const_assert_eq!(OBJECT_FETCHER_MAX_BLOCK_LENGTH, RUNTIME_MAX_BLOCK_LENGTH);
    }
}