subspace_service/
lib.rs

//! Service and ServiceFactory implementation. Specialized wrapper over substrate service.
#![feature(
    duration_constructors,
    impl_trait_in_assoc_type,
    int_roundings,
    let_chains,
    type_alias_impl_trait,
    type_changing_struct_update
)]

pub mod config;
pub mod dsn;
mod metrics;
pub(crate) mod mmr;
pub mod rpc;
pub mod sync_from_dsn;
mod task_spawner;
mod utils;

use crate::config::{ChainSyncMode, SubspaceConfiguration, SubspaceNetworking};
use crate::dsn::{create_dsn_instance, DsnConfigurationError};
use crate::metrics::NodeMetrics;
use crate::mmr::request_handler::MmrRequestHandler;
use crate::sync_from_dsn::piece_validator::SegmentCommitmentPieceValidator;
use crate::sync_from_dsn::snap_sync::snap_sync;
use crate::sync_from_dsn::DsnPieceGetter;
use async_lock::Semaphore;
use core::sync::atomic::{AtomicU32, Ordering};
use cross_domain_message_gossip::xdm_gossip_peers_set_config;
use domain_runtime_primitives::opaque::{Block as DomainBlock, Header as DomainHeader};
use frame_system_rpc_runtime_api::AccountNonceApi;
use futures::channel::oneshot;
use futures::FutureExt;
use jsonrpsee::RpcModule;
use pallet_transaction_payment_rpc_runtime_api::TransactionPaymentApi;
use parity_scale_codec::Decode;
use parking_lot::Mutex;
use prometheus_client::registry::Registry;
use sc_basic_authorship::ProposerFactory;
use sc_chain_spec::{ChainSpec, GenesisBlockBuilder};
use sc_client_api::execution_extensions::ExtensionsFactory;
use sc_client_api::{
    AuxStore, Backend, BlockBackend, BlockchainEvents, ExecutorProvider, HeaderBackend,
};
use sc_consensus::{
    BasicQueue, BlockCheckParams, BlockImport, BlockImportParams, BoxBlockImport,
    DefaultImportQueue, ImportQueue, ImportResult,
};
use sc_consensus_slots::SlotProportion;
use sc_consensus_subspace::archiver::{
    create_subspace_archiver, ArchivedSegmentNotification, ObjectMappingNotification,
    SegmentHeadersStore,
};
use sc_consensus_subspace::block_import::{BlockImportingNotification, SubspaceBlockImport};
use sc_consensus_subspace::notification::SubspaceNotificationStream;
use sc_consensus_subspace::slot_worker::{
    NewSlotNotification, RewardSigningNotification, SubspaceSlotWorker, SubspaceSlotWorkerOptions,
    SubspaceSyncOracle,
};
use sc_consensus_subspace::verifier::{SubspaceVerifier, SubspaceVerifierOptions};
use sc_consensus_subspace::SubspaceLink;
use sc_domains::domain_block_er::execution_receipt_protocol::DomainBlockERRequestHandler;
use sc_domains::ExtensionsFactory as DomainsExtensionFactory;
use sc_network::service::traits::NetworkService;
use sc_network::{NetworkWorker, NotificationMetrics, NotificationService, Roles};
use sc_network_sync::block_relay_protocol::BlockRelayParams;
use sc_network_sync::engine::SyncingEngine;
use sc_network_sync::service::network::NetworkServiceProvider;
use sc_proof_of_time::source::gossip::pot_gossip_peers_set_config;
use sc_proof_of_time::source::{PotSlotInfo, PotSourceWorker};
use sc_proof_of_time::verifier::PotVerifier;
use sc_service::error::Error as ServiceError;
use sc_service::{
    build_network_advanced, build_polkadot_syncing_strategy, BuildNetworkAdvancedParams,
    Configuration, NetworkStarter, SpawnTasksParams, TaskManager,
};
use sc_subspace_block_relay::{
    build_consensus_relay, BlockRelayConfigurationError, NetworkWrapper,
};
use sc_telemetry::{Telemetry, TelemetryWorker};
use sc_transaction_pool::TransactionPoolHandle;
use sc_transaction_pool_api::OffchainTransactionPoolFactory;
use sp_api::{ApiExt, ConstructRuntimeApi, Metadata, ProvideRuntimeApi};
use sp_block_builder::BlockBuilder;
use sp_blockchain::HeaderMetadata;
use sp_consensus::block_validation::DefaultBlockAnnounceValidator;
use sp_consensus_slots::Slot;
use sp_consensus_subspace::digests::extract_pre_digest;
use sp_consensus_subspace::{
    KzgExtension, PosExtension, PotExtension, PotNextSlotInput, SubspaceApi,
};
use sp_core::offchain::storage::OffchainDb;
use sp_core::offchain::OffchainDbExt;
use sp_core::traits::SpawnEssentialNamed;
use sp_core::H256;
use sp_domains::storage::StorageKey;
use sp_domains::{BundleProducerElectionApi, DomainsApi};
use sp_domains_fraud_proof::{FraudProofApi, FraudProofExtension, FraudProofHostFunctionsImpl};
use sp_externalities::Extensions;
use sp_messenger::MessengerApi;
use sp_messenger_host_functions::{MessengerExtension, MessengerHostFunctionsImpl};
use sp_mmr_primitives::MmrApi;
use sp_objects::ObjectsApi;
use sp_offchain::OffchainWorkerApi;
use sp_runtime::traits::{Block as BlockT, BlockIdTo, Header, NumberFor, Zero};
use sp_session::SessionKeys;
use sp_subspace_mmr::host_functions::{SubspaceMmrExtension, SubspaceMmrHostFunctionsImpl};
use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
use static_assertions::const_assert;
use std::marker::PhantomData;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;
use subspace_core_primitives::pieces::Record;
use subspace_core_primitives::pot::PotSeed;
use subspace_core_primitives::{BlockNumber, PublicKey, REWARD_SIGNING_CONTEXT};
use subspace_erasure_coding::ErasureCoding;
use subspace_kzg::Kzg;
use subspace_networking::libp2p::multiaddr::Protocol;
use subspace_networking::utils::piece_provider::PieceProvider;
use subspace_proof_of_space::Table;
use subspace_runtime_primitives::opaque::Block;
use subspace_runtime_primitives::{AccountId, Balance, Hash, Nonce};
use tokio::sync::broadcast;
use tracing::{debug, error, info, Instrument};
pub use utils::wait_for_block_import;

// There are multiple places where it is assumed that the node is running on a 64-bit system;
// refuse to compile otherwise
const_assert!(std::mem::size_of::<usize>() >= std::mem::size_of::<u64>());

/// This is over 15 minutes of slots assuming there are no forks; it should be both sufficient and
/// not too large to handle
const POT_VERIFIER_CACHE_SIZE: u32 = 30_000;
const SYNC_TARGET_UPDATE_INTERVAL: Duration = Duration::from_secs(1);
/// Multiplier on top of outgoing connections number for piece downloading purposes
const PIECE_PROVIDER_MULTIPLIER: usize = 10;

/// Error type for Subspace service.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// IO error.
    #[error(transparent)]
    Io(#[from] std::io::Error),

    /// Address parsing error.
    #[error(transparent)]
    AddrFormatInvalid(#[from] std::net::AddrParseError),

    /// Substrate service error.
    #[error(transparent)]
    Sub(#[from] sc_service::Error),

    /// Substrate consensus error.
    #[error(transparent)]
    Consensus(#[from] sp_consensus::Error),

    /// Telemetry error.
    #[error(transparent)]
    Telemetry(#[from] sc_telemetry::Error),

    /// Subspace networking (DSN) error.
    #[error(transparent)]
    SubspaceDsn(#[from] DsnConfigurationError),

    /// Failed to set up block relay.
    #[error(transparent)]
    BlockRelay(#[from] BlockRelayConfigurationError),

    /// Other.
    #[error(transparent)]
    Other(Box<dyn std::error::Error + Send + Sync>),
}

// Simple wrapper whose only purpose is to convert the error type
#[derive(Clone)]
struct BlockImportWrapper<BI>(BI);

#[async_trait::async_trait]
impl<Block, BI> BlockImport<Block> for BlockImportWrapper<BI>
where
    Block: BlockT,
    BI: BlockImport<Block, Error = sc_consensus_subspace::block_import::Error<Block::Header>>
        + Send
        + Sync,
{
    type Error = sp_consensus::Error;

    async fn check_block(
        &self,
        block: BlockCheckParams<Block>,
    ) -> Result<ImportResult, Self::Error> {
        self.0
            .check_block(block)
            .await
            .map_err(|error| sp_consensus::Error::Other(error.into()))
    }

    async fn import_block(
        &self,
        block: BlockImportParams<Block>,
    ) -> Result<ImportResult, Self::Error> {
        self.0
            .import_block(block)
            .await
            .map_err(|error| sp_consensus::Error::Other(error.into()))
    }
}

/// Host functions required for Subspace
#[cfg(not(feature = "runtime-benchmarks"))]
pub type HostFunctions = (
    sp_io::SubstrateHostFunctions,
    sp_consensus_subspace::consensus::HostFunctions,
    sp_domains_fraud_proof::HostFunctions,
    sp_subspace_mmr::HostFunctions,
    sp_messenger_host_functions::HostFunctions,
);

/// Host functions required for Subspace
#[cfg(feature = "runtime-benchmarks")]
pub type HostFunctions = (
    sp_io::SubstrateHostFunctions,
    frame_benchmarking::benchmarking::HostFunctions,
    sp_consensus_subspace::consensus::HostFunctions,
    sp_domains_fraud_proof::HostFunctions,
    sp_subspace_mmr::HostFunctions,
    sp_messenger_host_functions::HostFunctions,
);

/// Runtime executor for Subspace
pub type RuntimeExecutor = sc_executor::WasmExecutor<HostFunctions>;

/// Subspace-like full client.
pub type FullClient<RuntimeApi> = sc_service::TFullClient<Block, RuntimeApi, RuntimeExecutor>;

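/// Full client backend used by the Subspace node.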
pub type FullBackend = sc_service::TFullBackend<Block>;
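/// Chain selection rule used by the full client (longest chain).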
pub type FullSelectChain = sc_consensus::LongestChain<FullBackend, Block>;

struct SubspaceExtensionsFactory<PosTable, Client, DomainBlock> {
    kzg: Kzg,
    client: Arc<Client>,
    backend: Arc<FullBackend>,
    pot_verifier: PotVerifier,
    domains_executor: Arc<sc_domains::RuntimeExecutor>,
    confirmation_depth_k: BlockNumber,
    _pos_table: PhantomData<(PosTable, DomainBlock)>,
}

impl<PosTable, Block, Client, DomainBlock> ExtensionsFactory<Block>
    for SubspaceExtensionsFactory<PosTable, Client, DomainBlock>
where
    PosTable: Table,
    Block: BlockT,
    Block::Hash: From<H256> + Into<H256>,
    DomainBlock: BlockT,
    DomainBlock::Hash: Into<H256> + From<H256>,
    Client: BlockBackend<Block>
        + HeaderBackend<Block>
        + ProvideRuntimeApi<Block>
        + Send
        + Sync
        + 'static,
    Client::Api: SubspaceApi<Block, PublicKey>
        + DomainsApi<Block, DomainBlock::Header>
        + BundleProducerElectionApi<Block, Balance>
        + MmrApi<Block, H256, NumberFor<Block>>
        + MessengerApi<Block, NumberFor<Block>, Block::Hash>,
{
    fn extensions_for(
        &self,
        _block_hash: Block::Hash,
        _block_number: NumberFor<Block>,
    ) -> Extensions {
        let confirmation_depth_k = self.confirmation_depth_k;
        let mut exts = Extensions::new();
        exts.register(KzgExtension::new(self.kzg.clone()));
        exts.register(PosExtension::new::<PosTable>());
        exts.register(PotExtension::new({
            let client = Arc::clone(&self.client);
            let pot_verifier = self.pot_verifier.clone();

            Box::new(
                move |parent_hash, slot, proof_of_time, quick_verification| {
                    let parent_hash = {
                        let mut converted_parent_hash = Block::Hash::default();
                        converted_parent_hash.as_mut().copy_from_slice(&parent_hash);
                        converted_parent_hash
                    };

                    let parent_header = match client.header(parent_hash) {
                        Ok(Some(parent_header)) => parent_header,
                        Ok(None) => {
                            if quick_verification {
                                error!(
                                    %parent_hash,
                                    "Header not found during proof of time verification"
                                );

                                return false;
                            } else {
                                debug!(
                                    %parent_hash,
                                    "Header not found during proof of time verification"
                                );

                                // This can only happen during special sync modes; there are no
                                // other cases where the parent header may not be available, hence
                                // allow it
                                return true;
                            }
                        }
                        Err(error) => {
                            error!(
                                %error,
                                %parent_hash,
                                "Failed to retrieve header during proof of time verification"
                            );

                            return false;
                        }
                    };

                    let parent_pre_digest = match extract_pre_digest(&parent_header) {
                        Ok(parent_pre_digest) => parent_pre_digest,
                        Err(error) => {
                            error!(
                                %error,
                                %parent_hash,
                                parent_number = %parent_header.number(),
                                "Failed to extract pre-digest from parent header during proof of \
                                time verification, this must never happen"
                            );

                            return false;
                        }
                    };

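                    // The slot being verified must be strictly after the parent block's slot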
                    let parent_slot = parent_pre_digest.slot();
                    if slot <= *parent_slot {
                        return false;
                    }

                    let pot_parameters = match client.runtime_api().pot_parameters(parent_hash) {
                        Ok(pot_parameters) => pot_parameters,
                        Err(error) => {
                            debug!(
                                %error,
                                %parent_hash,
                                parent_number = %parent_header.number(),
                                "Failed to retrieve proof of time parameters during proof of time \
                                verification"
                            );

                            return false;
                        }
                    };

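                    // A genesis parent has no PoT info in its pre-digest, so start from the
                    // genesis seed; otherwise derive the next slot input from the parent's proof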
                    let pot_input = if parent_header.number().is_zero() {
                        PotNextSlotInput {
                            slot: parent_slot + Slot::from(1),
                            slot_iterations: pot_parameters.slot_iterations(),
                            seed: pot_verifier.genesis_seed(),
                        }
                    } else {
                        let pot_info = parent_pre_digest.pot_info();

                        PotNextSlotInput::derive(
                            pot_parameters.slot_iterations(),
                            parent_slot,
                            pot_info.proof_of_time(),
                            &pot_parameters.next_parameters_change(),
                        )
                    };

                    // Ensure proof of time and future proof of time included in upcoming block are
                    // valid

                    if quick_verification {
                        pot_verifier.try_is_output_valid(
                            pot_input,
                            Slot::from(slot) - parent_slot,
                            proof_of_time,
                            pot_parameters.next_parameters_change(),
                        )
                    } else {
                        pot_verifier.is_output_valid(
                            pot_input,
                            Slot::from(slot) - parent_slot,
                            proof_of_time,
                            pot_parameters.next_parameters_change(),
                        )
                    }
                },
            )
        }));

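        // Fraud proof host functions need access to both the consensus client and the domains
        // executor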
        exts.register(FraudProofExtension::new(Arc::new(
            FraudProofHostFunctionsImpl::<_, _, DomainBlock, _, _>::new(
                self.client.clone(),
                self.domains_executor.clone(),
                move |client, executor| {
                    let extension_factory =
                        DomainsExtensionFactory::<_, Block, DomainBlock, _>::new(
                            client,
                            executor,
                            confirmation_depth_k,
                        );
                    Box::new(extension_factory) as Box<dyn ExtensionsFactory<DomainBlock>>
                },
            ),
        )));

        exts.register(SubspaceMmrExtension::new(Arc::new(
            SubspaceMmrHostFunctionsImpl::<Block, _>::new(
                self.client.clone(),
                confirmation_depth_k,
            ),
        )));

        exts.register(MessengerExtension::new(Arc::new(
            MessengerHostFunctionsImpl::<Block, _, DomainBlock, _>::new(
                self.client.clone(),
                self.domains_executor.clone(),
            ),
        )));

        // If the offchain storage is available, add the offchain extension
        // to generate and verify MMR proofs
        if let Some(offchain_storage) = self.backend.offchain_storage() {
            let offchain_db = OffchainDb::new(offchain_storage);
            exts.register(OffchainDbExt::new(offchain_db));
        }

        exts
    }
}

/// Other partial components returned by [`new_partial()`]
pub struct OtherPartialComponents<RuntimeApi>
where
    RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
{
    /// Subspace block import
    pub block_import: BoxBlockImport<Block>,
    /// Subspace link
    pub subspace_link: SubspaceLink<Block>,
    /// Segment headers store
    pub segment_headers_store: SegmentHeadersStore<FullClient<RuntimeApi>>,
    /// Proof of time verifier
    pub pot_verifier: PotVerifier,
    /// Approximate target block number for syncing purposes
    pub sync_target_block_number: Arc<AtomicU32>,
    /// Telemetry
    pub telemetry: Option<Telemetry>,
}

type PartialComponents<RuntimeApi> = sc_service::PartialComponents<
    FullClient<RuntimeApi>,
    FullBackend,
    FullSelectChain,
    DefaultImportQueue<Block>,
    TransactionPoolHandle<Block, FullClient<RuntimeApi>>,
    OtherPartialComponents<RuntimeApi>,
>;

/// Creates `PartialComponents` for Subspace client.
#[expect(clippy::result_large_err, reason = "Comes from Substrate")]
pub fn new_partial<PosTable, RuntimeApi>(
    // TODO: Stop using `Configuration` once
    //  https://github.com/paritytech/polkadot-sdk/pull/5364 is in our fork
    config: &Configuration,
    // TODO: Replace with check for `ChainSyncMode` once we get rid of ^ `Configuration`
    snap_sync: bool,
    pot_external_entropy: &[u8],
) -> Result<PartialComponents<RuntimeApi>, ServiceError>
where
    PosTable: Table,
    RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
    RuntimeApi::RuntimeApi: ApiExt<Block>
        + Metadata<Block>
        + BlockBuilder<Block>
        + OffchainWorkerApi<Block>
        + SessionKeys<Block>
        + TaggedTransactionQueue<Block>
        + SubspaceApi<Block, PublicKey>
        + DomainsApi<Block, DomainHeader>
        + FraudProofApi<Block, DomainHeader>
        + BundleProducerElectionApi<Block, Balance>
        + ObjectsApi<Block>
        + MmrApi<Block, H256, NumberFor<Block>>
        + MessengerApi<Block, NumberFor<Block>, <Block as BlockT>::Hash>,
{
    let telemetry = config
        .telemetry_endpoints
        .clone()
        .filter(|x| !x.is_empty())
        .map(|endpoints| -> Result<_, sc_telemetry::Error> {
            let worker = TelemetryWorker::new(16)?;
            let telemetry = worker.handle().new_telemetry(endpoints);
            Ok((worker, telemetry))
        })
        .transpose()?;

    let executor = sc_service::new_wasm_executor(&config.executor);
    let domains_executor = sc_service::new_wasm_executor(&config.executor);

    let confirmation_depth_k = extract_confirmation_depth(config.chain_spec.as_ref()).ok_or(
        ServiceError::Other("Failed to extract confirmation depth from chain spec".to_string()),
    )?;

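    // The confirmation depth doubles as the database backend's canonicalization delay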
    let backend = Arc::new(sc_client_db::Backend::new(
        config.db_config(),
        confirmation_depth_k.into(),
    )?);

    let genesis_block_builder = GenesisBlockBuilder::new(
        config.chain_spec.as_storage_builder(),
        !snap_sync,
        backend.clone(),
        executor.clone(),
    )?;

    let (client, backend, keystore_container, task_manager) =
        sc_service::new_full_parts_with_genesis_builder::<Block, RuntimeApi, _, _>(
            config,
            telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
            executor.clone(),
            backend,
            genesis_block_builder,
            false,
        )?;

    // TODO: Make these explicit arguments once we no longer use Substrate's `Configuration`
    let (kzg, maybe_erasure_coding) = tokio::task::block_in_place(|| {
        rayon::join(Kzg::new, || {
            ErasureCoding::new(
                NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize)
                    .expect("Not zero; qed"),
            )
            .map_err(|error| format!("Failed to instantiate erasure coding: {error}"))
        })
    });
    let erasure_coding = maybe_erasure_coding?;

    let client = Arc::new(client);
    let client_info = client.info();
    let chain_constants = client
        .runtime_api()
        .chain_constants(client_info.best_hash)
        .map_err(|error| ServiceError::Application(error.into()))?;

    let pot_verifier = PotVerifier::new(
        PotSeed::from_genesis(client_info.genesis_hash.as_ref(), pot_external_entropy),
        POT_VERIFIER_CACHE_SIZE,
    );

    // Ensure the confirmation depth extracted from the chain spec matches the one from the runtime
    assert_eq!(confirmation_depth_k, chain_constants.confirmation_depth_k());

    client
        .execution_extensions()
        .set_extensions_factory(SubspaceExtensionsFactory::<PosTable, _, DomainBlock> {
            kzg: kzg.clone(),
            client: Arc::clone(&client),
            pot_verifier: pot_verifier.clone(),
            domains_executor: Arc::new(domains_executor),
            backend: backend.clone(),
            confirmation_depth_k: chain_constants.confirmation_depth_k(),
            _pos_table: PhantomData,
        });

    let telemetry = telemetry.map(|(worker, telemetry)| {
        task_manager
            .spawn_handle()
            .spawn("telemetry", None, worker.run());
        telemetry
    });

    let select_chain = sc_consensus::LongestChain::new(backend.clone());

    let segment_headers_store = tokio::task::block_in_place(|| {
        SegmentHeadersStore::new(client.clone(), chain_constants.confirmation_depth_k())
    })
    .map_err(|error| ServiceError::Application(error.into()))?;

    let subspace_link = SubspaceLink::new(chain_constants, kzg.clone(), erasure_coding);
    let segment_headers_store = segment_headers_store.clone();

    let block_import = SubspaceBlockImport::<PosTable, _, _, _, _, _>::new(
        client.clone(),
        client.clone(),
        subspace_link.clone(),
        {
            let client = client.clone();
            let segment_headers_store = segment_headers_store.clone();

            move |parent_hash, ()| {
                let client = client.clone();
                let segment_headers_store = segment_headers_store.clone();

                async move {
                    let timestamp = sp_timestamp::InherentDataProvider::from_system_time();

                    let parent_header = client
                        .header(parent_hash)?
                        .expect("Parent header must always exist when block is created; qed");

                    let parent_block_number = parent_header.number;

                    let subspace_inherents =
                        sp_consensus_subspace::inherents::InherentDataProvider::new(
                            segment_headers_store
                                .segment_headers_for_block(parent_block_number + 1),
                        );

                    Ok((timestamp, subspace_inherents))
                }
            }
        },
        segment_headers_store.clone(),
        pot_verifier.clone(),
    );

    let sync_target_block_number = Arc::new(AtomicU32::new(0));
    let transaction_pool = Arc::from(
        sc_transaction_pool::Builder::new(
            task_manager.spawn_essential_handle(),
            client.clone(),
            config.role.is_authority().into(),
        )
        .with_options(config.transaction_pool.clone())
        .with_prometheus(config.prometheus_registry())
        .build(),
    );

    let verifier = SubspaceVerifier::<PosTable, _, _>::new(SubspaceVerifierOptions {
        client: client.clone(),
        chain_constants,
        kzg,
        telemetry: telemetry.as_ref().map(|x| x.handle()),
        reward_signing_context: schnorrkel::context::signing_context(REWARD_SIGNING_CONTEXT),
        sync_target_block_number: Arc::clone(&sync_target_block_number),
        is_authoring_blocks: config.role.is_authority(),
        pot_verifier: pot_verifier.clone(),
    });

    let import_queue = BasicQueue::new(
        verifier,
        Box::new(BlockImportWrapper(block_import.clone())),
        None,
        &task_manager.spawn_essential_handle(),
        config.prometheus_registry(),
    );

    let other = OtherPartialComponents {
        block_import: Box::new(BlockImportWrapper(block_import.clone())),
        subspace_link,
        segment_headers_store,
        pot_verifier,
        sync_target_block_number,
        telemetry,
    };

    Ok(PartialComponents {
        client,
        backend,
        task_manager,
        import_queue,
        keystore_container,
        select_chain,
        transaction_pool,
        other,
    })
}

/// Full node along with some other components.
pub struct NewFull<Client>
where
    Client: ProvideRuntimeApi<Block>
        + AuxStore
        + BlockBackend<Block>
        + BlockIdTo<Block>
        + HeaderBackend<Block>
        + HeaderMetadata<Block, Error = sp_blockchain::Error>
        + 'static,
    Client::Api: TaggedTransactionQueue<Block>
        + DomainsApi<Block, DomainHeader>
        + FraudProofApi<Block, DomainHeader>
        + SubspaceApi<Block, PublicKey>
        + MmrApi<Block, H256, NumberFor<Block>>
        + MessengerApi<Block, NumberFor<Block>, <Block as BlockT>::Hash>,
{
    /// Task manager.
    pub task_manager: TaskManager,
    /// Full client.
    pub client: Arc<Client>,
    /// Chain selection rule.
    pub select_chain: FullSelectChain,
    /// Network service.
    pub network_service: Arc<dyn NetworkService + Send + Sync>,
    /// Cross-domain gossip notification service.
    pub xdm_gossip_notification_service: Box<dyn NotificationService>,
    /// Sync service.
    pub sync_service: Arc<sc_network_sync::SyncingService<Block>>,
    /// RPC handlers.
    pub rpc_handlers: sc_service::RpcHandlers,
    /// Full client backend.
    pub backend: Arc<FullBackend>,
    /// Pot slot info stream.
    pub pot_slot_info_stream: broadcast::Receiver<PotSlotInfo>,
    /// New slot stream.
    /// Note: this is currently used to send solutions from the farmer during tests.
    pub new_slot_notification_stream: SubspaceNotificationStream<NewSlotNotification>,
    /// Block signing stream.
    pub reward_signing_notification_stream: SubspaceNotificationStream<RewardSigningNotification>,
    /// Stream of notifications about blocks about to be imported.
    pub block_importing_notification_stream:
        SubspaceNotificationStream<BlockImportingNotification<Block>>,
    /// Archived object mapping stream.
    pub object_mapping_notification_stream: SubspaceNotificationStream<ObjectMappingNotification>,
    /// Archived segment stream.
    pub archived_segment_notification_stream:
        SubspaceNotificationStream<ArchivedSegmentNotification>,
    /// Network starter.
    pub network_starter: NetworkStarter,
    /// Transaction pool.
    pub transaction_pool: Arc<TransactionPoolHandle<Block, Client>>,
}

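/// Full Subspace node with the concrete [`FullClient`] type.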
type FullNode<RuntimeApi> = NewFull<FullClient<RuntimeApi>>;

/// Builds a new service for a full client.
pub async fn new_full<PosTable, RuntimeApi>(
    mut config: SubspaceConfiguration,
    partial_components: PartialComponents<RuntimeApi>,
    prometheus_registry: Option<&mut Registry>,
    enable_rpc_extensions: bool,
    block_proposal_slot_portion: SlotProportion,
    consensus_snap_sync_target_block_receiver: Option<broadcast::Receiver<BlockNumber>>,
) -> Result<FullNode<RuntimeApi>, Error>
where
    PosTable: Table,
    RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
    RuntimeApi::RuntimeApi: ApiExt<Block>
        + Metadata<Block>
        + AccountNonceApi<Block, AccountId, Nonce>
        + BlockBuilder<Block>
        + OffchainWorkerApi<Block>
        + SessionKeys<Block>
        + TaggedTransactionQueue<Block>
        + TransactionPaymentApi<Block, Balance>
        + SubspaceApi<Block, PublicKey>
        + DomainsApi<Block, DomainHeader>
        + FraudProofApi<Block, DomainHeader>
        + ObjectsApi<Block>
        + MmrApi<Block, Hash, BlockNumber>
        + MessengerApi<Block, NumberFor<Block>, <Block as BlockT>::Hash>,
{
    let PartialComponents {
        client,
        backend,
        mut task_manager,
        import_queue,
        keystore_container,
        select_chain,
        transaction_pool,
        other,
    } = partial_components;
    let OtherPartialComponents {
        block_import,
        subspace_link,
        segment_headers_store,
        pot_verifier,
        sync_target_block_number,
        mut telemetry,
    } = other;

    let offchain_indexing_enabled = config.base.offchain_worker.indexing_enabled;
    let (node, bootstrap_nodes, piece_getter) = match config.subspace_networking {
        SubspaceNetworking::Reuse {
            node,
            bootstrap_nodes,
            piece_getter,
        } => (node, bootstrap_nodes, piece_getter),
        SubspaceNetworking::Create { config: dsn_config } => {
            let dsn_protocol_version = hex::encode(client.chain_info().genesis_hash);

            debug!(
                chain_type=?config.base.chain_spec.chain_type(),
                genesis_hash=%hex::encode(client.chain_info().genesis_hash),
                "Setting DSN protocol version..."
            );

            let out_connections = dsn_config.max_out_connections;
            let (node, mut node_runner) = create_dsn_instance(
                dsn_protocol_version,
                dsn_config.clone(),
                prometheus_registry,
            )?;

            info!("Subspace networking initialized: Node ID is {}", node.id());

            node.on_new_listener(Arc::new({
                let node = node.clone();

                move |address| {
                    info!(
                        "DSN listening on {}",
                        address.clone().with(Protocol::P2p(node.id()))
                    );
                }
            }))
            .detach();

            task_manager
                .spawn_essential_handle()
                .spawn_essential_blocking(
                    "node-runner",
                    Some("subspace-networking"),
                    Box::pin(
                        async move {
                            node_runner.run().await;
                        }
                        .in_current_span(),
                    ),
                );

            let piece_provider = PieceProvider::new(
                node.clone(),
                SegmentCommitmentPieceValidator::new(
                    node.clone(),
                    subspace_link.kzg().clone(),
                    segment_headers_store.clone(),
                ),
                Arc::new(Semaphore::new(
                    out_connections as usize * PIECE_PROVIDER_MULTIPLIER,
                )),
            );

            (
                node,
                dsn_config.bootstrap_nodes,
                Arc::new(DsnPieceGetter::new(piece_provider)) as _,
            )
        }
    };

    let dsn_bootstrap_nodes = {
        // Fall back to the node itself as a bootstrap node for DSN so the farmer always has
        // someone to connect to
        if bootstrap_nodes.is_empty() {
            let (node_address_sender, node_address_receiver) = oneshot::channel();
            let _handler = node.on_new_listener(Arc::new({
                let node_address_sender = Mutex::new(Some(node_address_sender));

                move |address| {
                    if matches!(address.iter().next(), Some(Protocol::Ip4(_))) {
                        if let Some(node_address_sender) = node_address_sender.lock().take() {
                            if let Err(err) = node_address_sender.send(address.clone()) {
                                debug!(?err, "Couldn't send a node address to the channel.");
                            }
                        }
                    }
                }
            }));

            let mut node_listeners = node.listeners();

            if node_listeners.is_empty() {
                let Ok(listener) = node_address_receiver.await else {
                    return Err(Error::Other(
                        "Oneshot receiver dropped before DSN node listener was ready"
                            .to_string()
                            .into(),
                    ));
                };

                node_listeners = vec![listener];
            }

            node_listeners.iter_mut().for_each(|multiaddr| {
                multiaddr.push(Protocol::P2p(node.id()));
            });

            node_listeners
        } else {
            bootstrap_nodes
        }
    };

    let substrate_prometheus_registry = config
        .base
        .prometheus_config
        .as_ref()
        .map(|prometheus_config| prometheus_config.registry.clone());
    let import_queue_service1 = import_queue.service();
    let import_queue_service2 = import_queue.service();
    let network_wrapper = Arc::new(NetworkWrapper::default());
    let block_relay = build_consensus_relay(
        network_wrapper.clone(),
        client.clone(),
        transaction_pool.clone(),
        substrate_prometheus_registry.as_ref(),
    )
    .map_err(Error::BlockRelay)?;
    let mut net_config = sc_network::config::FullNetworkConfiguration::new(
        &config.base.network,
        substrate_prometheus_registry.clone(),
    );
    let (xdm_gossip_notification_config, xdm_gossip_notification_service) =
        xdm_gossip_peers_set_config();
    net_config.add_notification_protocol(xdm_gossip_notification_config);
    let (pot_gossip_notification_config, pot_gossip_notification_service) =
        pot_gossip_peers_set_config();
    net_config.add_notification_protocol(pot_gossip_notification_config);
    let pause_sync = Arc::clone(&net_config.network_config.pause_sync);

    let num_peer_hint = net_config.network_config.default_peers_set_num_full as usize
        + net_config
            .network_config
            .default_peers_set
            .reserved_nodes
            .len();

    let protocol_id = config.base.protocol_id();
    let fork_id = config.base.chain_spec.fork_id();

    // enable domain block ER request handler
    let (handler, protocol_config) = DomainBlockERRequestHandler::new::<
        NetworkWorker<Block, <Block as BlockT>::Hash>,
    >(fork_id, client.clone(), num_peer_hint);
    task_manager.spawn_handle().spawn(
        "domain-block-er-request-handler",
        Some("networking"),
        handler.run(),
    );
    net_config.add_request_response_protocol(protocol_config);

    if let Some(offchain_storage) = backend.offchain_storage() {
        // Allow both outgoing and incoming requests.
        let (handler, protocol_config) =
            MmrRequestHandler::new::<NetworkWorker<Block, <Block as BlockT>::Hash>>(
                &config.base.protocol_id(),
                fork_id,
                client.clone(),
                num_peer_hint,
                offchain_storage,
            );
        task_manager
            .spawn_handle()
            .spawn("mmr-request-handler", Some("networking"), handler.run());

        net_config.add_request_response_protocol(protocol_config);
    }

    let network_service_provider = NetworkServiceProvider::new();
    let network_service_handle = network_service_provider.handle();
    let (network_service, system_rpc_tx, tx_handler_controller, network_starter, sync_service) = {
        let spawn_handle = task_manager.spawn_handle();
        let metrics = NotificationMetrics::new(substrate_prometheus_registry.as_ref());

        // TODO: Remove BlockRelayParams here and simplify relay initialization
        let block_downloader = {
            let BlockRelayParams {
                mut server,
                downloader,
                request_response_config,
            } = block_relay;
            net_config.add_request_response_protocol(request_response_config);

            spawn_handle.spawn("block-request-handler", Some("networking"), async move {
                server.run().await;
            });

            downloader
        };

        let syncing_strategy = build_polkadot_syncing_strategy(
            protocol_id.clone(),
            fork_id,
            &mut net_config,
            None,
            block_downloader,
            client.clone(),
            &spawn_handle,
            substrate_prometheus_registry.as_ref(),
        )?;

        let (syncing_engine, sync_service, block_announce_config) =
            SyncingEngine::new::<NetworkWorker<_, _>>(
                Roles::from(&config.base.role),
                Arc::clone(&client),
                substrate_prometheus_registry.as_ref(),
                metrics.clone(),
                &net_config,
                protocol_id.clone(),
                fork_id,
                Box::new(DefaultBlockAnnounceValidator),
                syncing_strategy,
                network_service_provider.handle(),
                import_queue.service(),
                net_config.peer_store_handle(),
                config.base.network.force_synced,
            )
            .map_err(sc_service::Error::from)?;

        spawn_handle.spawn_blocking("syncing", None, syncing_engine.run());

        build_network_advanced(BuildNetworkAdvancedParams {
            role: config.base.role,
            protocol_id,
            fork_id,
            ipfs_server: config.base.network.ipfs_server,
            announce_block: config.base.announce_block,
            net_config,
            client: Arc::clone(&client),
            transaction_pool: Arc::clone(&transaction_pool),
            spawn_handle,
            import_queue,
            sync_service,
            block_announce_config,
            network_service_provider,
            metrics_registry: substrate_prometheus_registry.as_ref(),
            metrics,
        })?
    };

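    // Periodically record the best block number seen from peers so the verifier knows roughly how
    // far this node is from the network tip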
    task_manager.spawn_handle().spawn(
        "sync-target-follower",
        None,
        Box::pin({
            let sync_service = sync_service.clone();
            let sync_target_block_number = Arc::clone(&sync_target_block_number);

            async move {
                loop {
                    let best_seen_block = sync_service
                        .status()
                        .await
                        .map(|status| status.best_seen_block.unwrap_or_default())
                        .unwrap_or_default();
                    sync_target_block_number.store(best_seen_block, Ordering::Relaxed);

                    tokio::time::sleep(SYNC_TARGET_UPDATE_INTERVAL).await;
                }
            }
        }),
    );

    let sync_oracle = SubspaceSyncOracle::new(
        config.base.force_authoring,
        Arc::clone(&pause_sync),
        sync_service.clone(),
    );

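    // The archiver turns blocks beyond the confirmation depth into archived segments and emits
    // segment header and object mapping notifications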
    let subspace_archiver = tokio::task::block_in_place(|| {
        create_subspace_archiver(
            segment_headers_store.clone(),
            subspace_link.clone(),
            client.clone(),
            sync_oracle.clone(),
            telemetry.as_ref().map(|telemetry| telemetry.handle()),
            config.create_object_mappings,
        )
    })
    .map_err(ServiceError::Client)?;

    task_manager
        .spawn_essential_handle()
        .spawn_essential_blocking(
            "subspace-archiver",
            None,
            Box::pin(async move {
                if let Err(error) = subspace_archiver.await {
                    error!(%error, "Archiver exited with error");
                }
            }),
        );

    network_wrapper.set(network_service.clone());

    if !config.base.network.force_synced {
        // Start with DSN sync in this case
        pause_sync.store(true, Ordering::Release);
    }

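    // Snap sync task: brings the node close to the tip using data from the DSN instead of syncing
    // from genesis (run below before regular DSN sync)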
    let snap_sync_task = snap_sync(
        segment_headers_store.clone(),
        node.clone(),
        fork_id.map(|fork_id| fork_id.to_string()),
        Arc::clone(&client),
        import_queue_service1,
        pause_sync.clone(),
        piece_getter.clone(),
        sync_service.clone(),
        network_service_handle,
        subspace_link.erasure_coding().clone(),
        consensus_snap_sync_target_block_receiver,
        backend.offchain_storage(),
        network_service.clone(),
    );

    let (observer, worker) = sync_from_dsn::create_observer_and_worker(
        segment_headers_store.clone(),
        Arc::clone(&network_service),
        node.clone(),
        Arc::clone(&client),
        import_queue_service2,
        sync_service.clone(),
        sync_target_block_number,
        pause_sync,
        piece_getter,
        subspace_link.erasure_coding().clone(),
    );
    task_manager
        .spawn_handle()
        .spawn("observer", Some("sync-from-dsn"), observer);
    task_manager
        .spawn_essential_handle()
        .spawn_essential_blocking(
            "worker",
            Some("sync-from-dsn"),
            Box::pin(async move {
                // Run snap-sync before DSN-sync.
                if config.sync == ChainSyncMode::Snap {
                    if let Err(error) = snap_sync_task.in_current_span().await {
                        error!(%error, "Snap sync exited with a fatal error");
                        return;
                    }
                }

                if let Err(error) = worker.await {
                    error!(%error, "Sync from DSN exited with an error");
                }
            }),
        );

    if let Some(registry) = substrate_prometheus_registry.as_ref() {
        match NodeMetrics::new(
            client.clone(),
            client.every_import_notification_stream(),
            registry,
        ) {
            Ok(node_metrics) => {
                task_manager.spawn_handle().spawn(
                    "node_metrics",
                    None,
                    Box::pin(async move {
                        node_metrics.run().await;
                    }),
                );
            }
            Err(err) => {
                error!("Failed to initialize node metrics: {err:?}");
            }
        }
    }

    let offchain_tx_pool_factory = OffchainTransactionPoolFactory::new(transaction_pool.clone());

    if config.base.offchain_worker.enabled {
        let offchain_workers =
            sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
                runtime_api_provider: client.clone(),
                is_validator: config.base.role.is_authority(),
                keystore: Some(keystore_container.keystore()),
                offchain_db: backend.offchain_storage(),
                transaction_pool: Some(offchain_tx_pool_factory.clone()),
                network_provider: Arc::new(network_service.clone()),
                enable_http_requests: true,
                custom_extensions: |_| vec![],
            })?;
        task_manager.spawn_handle().spawn(
            "offchain-workers-runner",
            "offchain-worker",
            offchain_workers
                .run(client.clone(), task_manager.spawn_handle())
                .boxed(),
        );
    }

    // mmr offchain indexer
    if offchain_indexing_enabled {
        task_manager.spawn_essential_handle().spawn_blocking(
            "mmr-gadget",
            None,
            mmr_gadget::MmrGadget::start(
                client.clone(),
                backend.clone(),
                sp_mmr_primitives::INDEXING_PREFIX.to_vec(),
            ),
        );
    }

    let backoff_authoring_blocks: Option<()> = None;

    let new_slot_notification_stream = subspace_link.new_slot_notification_stream();
    let reward_signing_notification_stream = subspace_link.reward_signing_notification_stream();
    let block_importing_notification_stream = subspace_link.block_importing_notification_stream();
    let object_mapping_notification_stream = subspace_link.object_mapping_notification_stream();
    let archived_segment_notification_stream = subspace_link.archived_segment_notification_stream();

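    // The proof-of-time source either produces slots locally (when running as a timekeeper) or
    // receives them from other nodes via gossip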
    let (pot_source_worker, pot_gossip_worker, pot_slot_info_stream) = PotSourceWorker::new(
        config.is_timekeeper,
        config.timekeeper_cpu_cores,
        client.clone(),
        pot_verifier.clone(),
        Arc::clone(&network_service),
        pot_gossip_notification_service,
        sync_service.clone(),
        sync_oracle.clone(),
    )
    .map_err(|error| Error::Other(error.into()))?;

    let additional_pot_slot_info_stream = pot_source_worker.subscribe_pot_slot_info_stream();

    task_manager
        .spawn_essential_handle()
        .spawn("pot-source", Some("pot"), pot_source_worker.run());
    task_manager
        .spawn_essential_handle()
        .spawn("pot-gossip", Some("pot"), pot_gossip_worker.run());

    if config.base.role.is_authority() || config.force_new_slot_notifications {
        let proposer_factory = ProposerFactory::new(
            task_manager.spawn_handle(),
            client.clone(),
            transaction_pool.clone(),
            substrate_prometheus_registry.as_ref(),
            telemetry.as_ref().map(|x| x.handle()),
        );

        let subspace_slot_worker =
            SubspaceSlotWorker::<PosTable, _, _, _, _, _, _, _>::new(SubspaceSlotWorkerOptions {
                client: client.clone(),
                env: proposer_factory,
                block_import,
                sync_oracle: sync_oracle.clone(),
                justification_sync_link: sync_service.clone(),
                force_authoring: config.base.force_authoring,
                backoff_authoring_blocks,
                subspace_link: subspace_link.clone(),
                segment_headers_store: segment_headers_store.clone(),
                block_proposal_slot_portion,
                max_block_proposal_slot_portion: None,
                telemetry: telemetry.as_ref().map(|x| x.handle()),
                offchain_tx_pool_factory,
                pot_verifier,
            });

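        // Inherent data providers supply the timestamp and any segment headers that must be
        // included in the next block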
        let create_inherent_data_providers = {
            let client = client.clone();
            let segment_headers_store = segment_headers_store.clone();

            move |parent_hash, ()| {
                let client = client.clone();
                let segment_headers_store = segment_headers_store.clone();

                async move {
                    let timestamp = sp_timestamp::InherentDataProvider::from_system_time();

                    let parent_header = client
                        .header(parent_hash)?
                        .expect("Parent header must always exist when block is created; qed");

                    let parent_block_number = parent_header.number;

                    let subspace_inherents =
                        sp_consensus_subspace::inherents::InherentDataProvider::new(
                            segment_headers_store
                                .segment_headers_for_block(parent_block_number + 1),
                        );

                    Ok((timestamp, subspace_inherents))
                }
            }
        };

        info!(target: "subspace", "🧑‍🌾 Starting Subspace Authorship worker");
        let slot_worker_task = sc_proof_of_time::start_slot_worker(
            subspace_link.chain_constants().slot_duration(),
            client.clone(),
            select_chain.clone(),
            subspace_slot_worker,
            sync_oracle.clone(),
            create_inherent_data_providers,
            pot_slot_info_stream,
        );

        // Subspace authoring task is considered essential, i.e. if it fails we take down the
        // service with it.
        task_manager.spawn_essential_handle().spawn_blocking(
            "subspace-proposer",
            Some("block-authoring"),
            slot_worker_task,
        );
    }

    // We replace the Substrate implementation of metrics server with our own.
    config.base.prometheus_config.take();

    let rpc_handlers = task_spawner::spawn_tasks(SpawnTasksParams {
        network: network_service.clone(),
        client: client.clone(),
        keystore: keystore_container.keystore(),
        task_manager: &mut task_manager,
        transaction_pool: transaction_pool.clone(),
        rpc_builder: if enable_rpc_extensions {
            let client = client.clone();
            let new_slot_notification_stream = new_slot_notification_stream.clone();
            let reward_signing_notification_stream = reward_signing_notification_stream.clone();
            let object_mapping_notification_stream = object_mapping_notification_stream.clone();
            let archived_segment_notification_stream = archived_segment_notification_stream.clone();
            let transaction_pool = transaction_pool.clone();
            let backend = backend.clone();

            Box::new(move |subscription_executor| {
                let deps = rpc::FullDeps {
                    client: client.clone(),
                    pool: transaction_pool.clone(),
                    subscription_executor,
                    new_slot_notification_stream: new_slot_notification_stream.clone(),
                    reward_signing_notification_stream: reward_signing_notification_stream.clone(),
                    object_mapping_notification_stream: object_mapping_notification_stream.clone(),
                    archived_segment_notification_stream: archived_segment_notification_stream
                        .clone(),
                    dsn_bootstrap_nodes: dsn_bootstrap_nodes.clone(),
                    segment_headers_store: segment_headers_store.clone(),
                    sync_oracle: sync_oracle.clone(),
                    kzg: subspace_link.kzg().clone(),
                    erasure_coding: subspace_link.erasure_coding().clone(),
                    backend: backend.clone(),
                };

                rpc::create_full(deps).map_err(Into::into)
            })
        } else {
            Box::new(|_| Ok(RpcModule::new(())))
        },
        backend: backend.clone(),
        system_rpc_tx,
        config: config.base,
        telemetry: telemetry.as_mut(),
        tx_handler_controller,
        sync_service: sync_service.clone(),
    })?;

    Ok(NewFull {
        task_manager,
        client,
        select_chain,
        network_service,
        xdm_gossip_notification_service,
        sync_service,
        rpc_handlers,
        backend,
        pot_slot_info_stream: additional_pot_slot_info_stream,
        new_slot_notification_stream,
        reward_signing_notification_stream,
        block_importing_notification_stream,
        object_mapping_notification_stream,
        archived_segment_notification_stream,
        network_starter,
        transaction_pool,
    })
}

/// The storage key of the `ConfirmationDepthK` storage item in `pallet-runtime-configs`
fn confirmation_depth_storage_key() -> StorageKey {
    StorageKey(
        frame_support::storage::storage_prefix(
            // This is the name used for `pallet-runtime-configs` in the `construct_runtime` macro
            // i.e. `RuntimeConfigs: pallet_runtime_configs = 14`
            "RuntimeConfigs".as_bytes(),
            // This is the storage item name used inside `pallet-runtime-configs`
            "ConfirmationDepthK".as_bytes(),
        )
        .to_vec(),
    )
}

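/// Extracts the `ConfirmationDepthK` value from the raw genesis storage of the given chain spec.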
fn extract_confirmation_depth(chain_spec: &dyn ChainSpec) -> Option<u32> {
    let storage_key = format!("0x{}", hex::encode(confirmation_depth_storage_key().0));
    let spec: serde_json::Value =
        serde_json::from_str(chain_spec.as_json(true).ok()?.as_str()).ok()?;
    let encoded_confirmation_depth = hex::decode(
        spec.pointer(format!("/genesis/raw/top/{}", storage_key).as_str())?
            .as_str()?
            .trim_start_matches("0x"),
    )
    .ok()?;
    u32::decode(&mut encoded_confirmation_depth.as_slice()).ok()
}

#[cfg(test)]
mod test {
    use static_assertions::const_assert_eq;
    use subspace_data_retrieval::object_fetcher::MAX_BLOCK_LENGTH as ARCHIVER_MAX_BLOCK_LENGTH;
    use subspace_runtime_primitives::MAX_BLOCK_LENGTH as CONSENSUS_RUNTIME_MAX_BLOCK_LENGTH;

    /// Runtime and archiver code must agree on the consensus block length.
    /// (This avoids importing all the runtime primitives code into the farmer and gateway.)
    #[test]
    fn max_block_length_consistent() {
        const_assert_eq!(
            CONSENSUS_RUNTIME_MAX_BLOCK_LENGTH,
            ARCHIVER_MAX_BLOCK_LENGTH,
        );
    }
}