// subspace_service/lib.rs
1//! Service and ServiceFactory implementation. Specialized wrapper over substrate service.
2
3pub mod config;
4pub mod dsn;
5mod metrics;
6pub(crate) mod mmr;
7pub mod rpc;
8pub mod sync_from_dsn;
9mod task_spawner;
10mod utils;
11
12use crate::config::{ChainSyncMode, SubspaceConfiguration, SubspaceNetworking};
13use crate::dsn::{DsnConfigurationError, create_dsn_instance};
14use crate::metrics::NodeMetrics;
15use crate::mmr::request_handler::MmrRequestHandler;
16use crate::sync_from_dsn::DsnPieceGetter;
17use crate::sync_from_dsn::piece_validator::SegmentCommitmentPieceValidator;
18use crate::sync_from_dsn::snap_sync::snap_sync;
19use async_lock::Semaphore;
20use core::sync::atomic::{AtomicU32, Ordering};
21use cross_domain_message_gossip::xdm_gossip_peers_set_config;
22use domain_runtime_primitives::opaque::{Block as DomainBlock, Header as DomainHeader};
23use frame_system_rpc_runtime_api::AccountNonceApi;
24use futures::FutureExt;
25use futures::channel::oneshot;
26use jsonrpsee::RpcModule;
27use pallet_transaction_payment_rpc_runtime_api::TransactionPaymentApi;
28use parity_scale_codec::Decode;
29use parking_lot::Mutex;
30use prometheus_client::registry::Registry;
31use sc_basic_authorship::ProposerFactory;
32use sc_chain_spec::{ChainSpec, GenesisBlockBuilder};
33use sc_client_api::execution_extensions::ExtensionsFactory;
34use sc_client_api::{
35    AuxStore, Backend, BlockBackend, BlockchainEvents, ExecutorProvider, HeaderBackend,
36};
37use sc_consensus::{
38    BasicQueue, BlockCheckParams, BlockImport, BlockImportParams, BoxBlockImport,
39    DefaultImportQueue, ImportQueue, ImportResult,
40};
41use sc_consensus_slots::SlotProportion;
42use sc_consensus_subspace::SubspaceLink;
43use sc_consensus_subspace::archiver::{
44    ArchivedSegmentNotification, ObjectMappingNotification, SegmentHeadersStore,
45    create_subspace_archiver,
46};
47use sc_consensus_subspace::block_import::{BlockImportingNotification, SubspaceBlockImport};
48use sc_consensus_subspace::notification::SubspaceNotificationStream;
49use sc_consensus_subspace::slot_worker::{
50    NewSlotNotification, RewardSigningNotification, SubspaceSlotWorker, SubspaceSlotWorkerOptions,
51    SubspaceSyncOracle,
52};
53use sc_consensus_subspace::verifier::{SubspaceVerifier, SubspaceVerifierOptions};
54use sc_domains::ExtensionsFactory as DomainsExtensionFactory;
55use sc_domains::domain_block_er::execution_receipt_protocol::DomainBlockERRequestHandler;
56use sc_network::service::traits::NetworkService;
57use sc_network::{NetworkWorker, NotificationMetrics, NotificationService, Roles};
58use sc_network_sync::block_relay_protocol::BlockRelayParams;
59use sc_network_sync::engine::SyncingEngine;
60use sc_network_sync::service::network::NetworkServiceProvider;
61use sc_proof_of_time::source::gossip::pot_gossip_peers_set_config;
62use sc_proof_of_time::source::{PotSlotInfo, PotSourceWorker};
63use sc_proof_of_time::verifier::PotVerifier;
64use sc_service::error::Error as ServiceError;
65use sc_service::{
66    BuildNetworkAdvancedParams, Configuration, SpawnTasksParams, TaskManager,
67    build_network_advanced, build_polkadot_syncing_strategy,
68};
69use sc_subspace_block_relay::{
70    BlockRelayConfigurationError, NetworkWrapper, build_consensus_relay,
71};
72use sc_telemetry::{Telemetry, TelemetryWorker};
73use sc_transaction_pool::TransactionPoolHandle;
74use sc_transaction_pool_api::OffchainTransactionPoolFactory;
75use sp_api::{ApiExt, ConstructRuntimeApi, Metadata, ProvideRuntimeApi};
76use sp_block_builder::BlockBuilder;
77use sp_blockchain::HeaderMetadata;
78use sp_consensus::block_validation::DefaultBlockAnnounceValidator;
79use sp_consensus_slots::Slot;
80use sp_consensus_subspace::digests::extract_pre_digest;
81use sp_consensus_subspace::{
82    KzgExtension, PosExtension, PotExtension, PotNextSlotInput, SubspaceApi,
83};
84use sp_core::H256;
85use sp_core::offchain::OffchainDbExt;
86use sp_core::offchain::storage::OffchainDb;
87use sp_core::traits::SpawnEssentialNamed;
88use sp_domains::storage::StorageKey;
89use sp_domains::{BundleProducerElectionApi, DomainsApi};
90use sp_domains_fraud_proof::{FraudProofApi, FraudProofExtension, FraudProofHostFunctionsImpl};
91use sp_externalities::Extensions;
92use sp_messenger::MessengerApi;
93use sp_messenger_host_functions::{MessengerExtension, MessengerHostFunctionsImpl};
94use sp_mmr_primitives::MmrApi;
95use sp_objects::ObjectsApi;
96use sp_offchain::OffchainWorkerApi;
97use sp_runtime::traits::{Block as BlockT, BlockIdTo, Header, NumberFor, Zero};
98use sp_session::SessionKeys;
99use sp_subspace_mmr::host_functions::{SubspaceMmrExtension, SubspaceMmrHostFunctionsImpl};
100use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
101use static_assertions::const_assert;
102use std::marker::PhantomData;
103use std::num::NonZeroUsize;
104use std::sync::Arc;
105use std::time::Duration;
106use subspace_core_primitives::pieces::Record;
107use subspace_core_primitives::pot::PotSeed;
108use subspace_core_primitives::{BlockNumber, PublicKey, REWARD_SIGNING_CONTEXT};
109use subspace_erasure_coding::ErasureCoding;
110use subspace_kzg::Kzg;
111use subspace_networking::libp2p::multiaddr::Protocol;
112use subspace_networking::utils::piece_provider::PieceProvider;
113use subspace_proof_of_space::Table;
114use subspace_runtime_primitives::opaque::Block;
115use subspace_runtime_primitives::{AccountId, Balance, BlockHashFor, Hash, Nonce};
116use tokio::sync::broadcast;
117use tracing::{Instrument, debug, error, info};
118pub use utils::wait_for_block_import;
119
// There are multiple places where it is assumed that node is running on 64-bit system, refuse to
// compile otherwise
const_assert!(std::mem::size_of::<usize>() >= std::mem::size_of::<u64>());

/// This is over 15 minutes of slots assuming there are no forks, should be both sufficient and not
/// too large to handle
const POT_VERIFIER_CACHE_SIZE: u32 = 30_000;
/// Interval between sync target updates.
// NOTE(review): the use site is outside this chunk — presumably this throttles refreshes of
// `sync_target_block_number`; confirm at the point of use.
const SYNC_TARGET_UPDATE_INTERVAL: Duration = Duration::from_secs(1);
/// Multiplier on top of outgoing connections number for piece downloading purposes
const PIECE_PROVIDER_MULTIPLIER: usize = 10;
130
/// Error type for Subspace service.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// IO error.
    #[error(transparent)]
    Io(#[from] std::io::Error),

    /// Address parsing error.
    #[error(transparent)]
    AddrFormatInvalid(#[from] std::net::AddrParseError),

    /// Substrate service error.
    #[error(transparent)]
    Sub(#[from] sc_service::Error),

    /// Substrate consensus error.
    #[error(transparent)]
    Consensus(#[from] sp_consensus::Error),

    /// Telemetry error.
    #[error(transparent)]
    Telemetry(#[from] sc_telemetry::Error),

    /// Subspace networking (DSN) error.
    #[error(transparent)]
    SubspaceDsn(#[from] DsnConfigurationError),

    /// Failed to set up block relay.
    #[error(transparent)]
    BlockRelay(#[from] BlockRelayConfigurationError),

    /// Any other error, boxed to keep this enum small.
    #[error(transparent)]
    Other(Box<dyn std::error::Error + Send + Sync>),
}
166
// Simple wrapper whose only purpose is to convert the error type of the wrapped block import
// into `sp_consensus::Error`
#[derive(Clone)]
struct BlockImportWrapper<BI>(BI);
170
171#[async_trait::async_trait]
172impl<Block, BI> BlockImport<Block> for BlockImportWrapper<BI>
173where
174    Block: BlockT,
175    BI: BlockImport<Block, Error = sc_consensus_subspace::block_import::Error<Block::Header>>
176        + Send
177        + Sync,
178{
179    type Error = sp_consensus::Error;
180
181    async fn check_block(
182        &self,
183        block: BlockCheckParams<Block>,
184    ) -> Result<ImportResult, Self::Error> {
185        self.0
186            .check_block(block)
187            .await
188            .map_err(|error| sp_consensus::Error::Other(error.into()))
189    }
190
191    async fn import_block(
192        &self,
193        block: BlockImportParams<Block>,
194    ) -> Result<ImportResult, Self::Error> {
195        self.0
196            .import_block(block)
197            .await
198            .map_err(|error| sp_consensus::Error::Other(error.into()))
199    }
200}
201
/// Host functions required for Subspace
#[cfg(not(feature = "runtime-benchmarks"))]
pub type HostFunctions = (
    sp_io::SubstrateHostFunctions,
    sp_consensus_subspace::consensus::HostFunctions,
    sp_domains_fraud_proof::HostFunctions,
    sp_subspace_mmr::HostFunctions,
    sp_messenger_host_functions::HostFunctions,
);

/// Host functions required for Subspace, extended with benchmarking host functions when the
/// `runtime-benchmarks` feature is enabled
#[cfg(feature = "runtime-benchmarks")]
pub type HostFunctions = (
    sp_io::SubstrateHostFunctions,
    frame_benchmarking::benchmarking::HostFunctions,
    sp_consensus_subspace::consensus::HostFunctions,
    sp_domains_fraud_proof::HostFunctions,
    sp_subspace_mmr::HostFunctions,
    sp_messenger_host_functions::HostFunctions,
);

/// Runtime executor for Subspace
pub type RuntimeExecutor = sc_executor::WasmExecutor<HostFunctions>;

/// Subspace-like full client.
pub type FullClient<RuntimeApi> = sc_service::TFullClient<Block, RuntimeApi, RuntimeExecutor>;

/// Full client backend used by the Subspace node.
pub type FullBackend = sc_service::TFullBackend<Block>;
/// Chain selection rule (longest chain) used by the Subspace node.
pub type FullSelectChain = sc_consensus::LongestChain<FullBackend, Block>;
231
/// Extensions factory registering the runtime extensions Subspace needs: KZG, proof-of-space,
/// proof-of-time, fraud proof, MMR, messenger, and (when available) offchain DB extensions.
struct SubspaceExtensionsFactory<PosTable, Client, DomainBlock> {
    /// KZG instance shared with consensus
    kzg: Kzg,
    /// Consensus chain client
    client: Arc<Client>,
    /// Full client backend, used here to access offchain storage
    backend: Arc<FullBackend>,
    /// Proof of time verifier
    pot_verifier: PotVerifier,
    /// Executor for domain runtime calls
    domains_executor: Arc<sc_domains::RuntimeExecutor>,
    /// Number of blocks after which a block is considered confirmed
    confirmation_depth_k: BlockNumber,
    /// Marker for otherwise unused type parameters
    _pos_table: PhantomData<(PosTable, DomainBlock)>,
}
241
impl<PosTable, Block, Client, DomainBlock> ExtensionsFactory<Block>
    for SubspaceExtensionsFactory<PosTable, Client, DomainBlock>
where
    PosTable: Table,
    Block: BlockT,
    Block::Hash: From<H256> + Into<H256>,
    DomainBlock: BlockT,
    DomainBlock::Hash: Into<H256> + From<H256>,
    Client: BlockBackend<Block>
        + HeaderBackend<Block>
        + ProvideRuntimeApi<Block>
        + Send
        + Sync
        + 'static,
    Client::Api: SubspaceApi<Block, PublicKey>
        + DomainsApi<Block, DomainBlock::Header>
        + BundleProducerElectionApi<Block, Balance>
        + MmrApi<Block, H256, NumberFor<Block>>
        + MessengerApi<Block, NumberFor<Block>, Block::Hash>,
{
    /// Builds the set of extensions registered for every runtime call, regardless of the block
    /// the call targets (both arguments are ignored).
    fn extensions_for(
        &self,
        _block_hash: Block::Hash,
        _block_number: NumberFor<Block>,
    ) -> Extensions {
        let confirmation_depth_k = self.confirmation_depth_k;
        let mut exts = Extensions::new();
        exts.register(KzgExtension::new(self.kzg.clone()));
        exts.register(PosExtension::new::<PosTable>());
        // Proof of time extension: the closure receives `(parent_hash, slot, proof_of_time,
        // quick_verification)` and returns whether the proof is valid for `slot` relative to the
        // chain at `parent_hash`
        exts.register(PotExtension::new({
            let client = Arc::clone(&self.client);
            let pot_verifier = self.pot_verifier.clone();

            Box::new(
                move |parent_hash, slot, proof_of_time, quick_verification| {
                    // Convert the raw hash bytes into the block hash type
                    let parent_hash = {
                        let mut converted_parent_hash = Block::Hash::default();
                        converted_parent_hash.as_mut().copy_from_slice(&parent_hash);
                        converted_parent_hash
                    };

                    let parent_header = match client.header(parent_hash) {
                        Ok(Some(parent_header)) => parent_header,
                        Ok(None) => {
                            if quick_verification {
                                error!(
                                    %parent_hash,
                                    "Header not found during proof of time verification"
                                );

                                return false;
                            } else {
                                debug!(
                                    %parent_hash,
                                    "Header not found during proof of time verification"
                                );

                                // This can only happen during special sync modes; there are no
                                // other cases where parent header may not be available, hence
                                // allow it
                                return true;
                            }
                        }
                        Err(error) => {
                            error!(
                                %error,
                                %parent_hash,
                                "Failed to retrieve header during proof of time verification"
                            );

                            return false;
                        }
                    };

                    let parent_pre_digest = match extract_pre_digest(&parent_header) {
                        Ok(parent_pre_digest) => parent_pre_digest,
                        Err(error) => {
                            error!(
                                %error,
                                %parent_hash,
                                parent_number = %parent_header.number(),
                                "Failed to extract pre-digest from parent header during proof of \
                                time verification, this must never happen"
                            );

                            return false;
                        }
                    };

                    // The slot being verified must be strictly after the parent's slot
                    let parent_slot = parent_pre_digest.slot();
                    if slot <= *parent_slot {
                        return false;
                    }

                    let pot_parameters = match client.runtime_api().pot_parameters(parent_hash) {
                        Ok(pot_parameters) => pot_parameters,
                        Err(error) => {
                            debug!(
                                %error,
                                %parent_hash,
                                parent_number = %parent_header.number(),
                                "Failed to retrieve proof of time parameters during proof of time \
                                verification"
                            );

                            return false;
                        }
                    };

                    // For the genesis parent there is no previous proof, so start from the
                    // genesis seed; otherwise derive the next input from the parent's PoT info
                    let pot_input = if parent_header.number().is_zero() {
                        PotNextSlotInput {
                            slot: parent_slot + Slot::from(1),
                            slot_iterations: pot_parameters.slot_iterations(),
                            seed: pot_verifier.genesis_seed(),
                        }
                    } else {
                        let pot_info = parent_pre_digest.pot_info();

                        PotNextSlotInput::derive(
                            pot_parameters.slot_iterations(),
                            parent_slot,
                            pot_info.proof_of_time(),
                            &pot_parameters.next_parameters_change(),
                        )
                    };

                    // Ensure proof of time and future proof of time included in upcoming block are
                    // valid

                    if quick_verification {
                        pot_verifier.try_is_output_valid(
                            pot_input,
                            Slot::from(slot) - parent_slot,
                            proof_of_time,
                            pot_parameters.next_parameters_change(),
                        )
                    } else {
                        pot_verifier.is_output_valid(
                            pot_input,
                            Slot::from(slot) - parent_slot,
                            proof_of_time,
                            pot_parameters.next_parameters_change(),
                        )
                    }
                },
            )
        }));

        exts.register(FraudProofExtension::new(Arc::new(
            FraudProofHostFunctionsImpl::<_, _, DomainBlock, _, _>::new(
                self.client.clone(),
                self.domains_executor.clone(),
                move |client, executor| {
                    let extension_factory =
                        DomainsExtensionFactory::<_, Block, DomainBlock, _>::new(
                            client,
                            executor,
                            confirmation_depth_k,
                        );
                    Box::new(extension_factory) as Box<dyn ExtensionsFactory<DomainBlock>>
                },
            ),
        )));

        exts.register(SubspaceMmrExtension::new(Arc::new(
            SubspaceMmrHostFunctionsImpl::<Block, _>::new(
                self.client.clone(),
                confirmation_depth_k,
            ),
        )));

        exts.register(MessengerExtension::new(Arc::new(
            MessengerHostFunctionsImpl::<Block, _, DomainBlock, _>::new(
                self.client.clone(),
                self.domains_executor.clone(),
            ),
        )));

        // if the offchain storage is available, then add offchain extension
        // to generate and verify MMR proofs
        if let Some(offchain_storage) = self.backend.offchain_storage() {
            let offchain_db = OffchainDb::new(offchain_storage);
            exts.register(OffchainDbExt::new(offchain_db));
        }

        exts
    }
}
429
/// Other partial components returned by [`new_partial()`]
pub struct OtherPartialComponents<RuntimeApi>
where
    RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
{
    /// Subspace block import (already wrapped to produce `sp_consensus::Error`)
    pub block_import: BoxBlockImport<Block>,
    /// Subspace link
    pub subspace_link: SubspaceLink<Block>,
    /// Segment headers store
    pub segment_headers_store: SegmentHeadersStore<FullClient<RuntimeApi>>,
    /// Proof of time verifier
    pub pot_verifier: PotVerifier,
    /// Approximate target block number for syncing purposes
    pub sync_target_block_number: Arc<AtomicU32>,
    /// Telemetry (with its worker already spawned on the task manager)
    pub telemetry: Option<Telemetry>,
}
448
/// [`sc_service::PartialComponents`] with all Subspace-specific type parameters filled in
type PartialComponents<RuntimeApi> = sc_service::PartialComponents<
    FullClient<RuntimeApi>,
    FullBackend,
    FullSelectChain,
    DefaultImportQueue<Block>,
    TransactionPoolHandle<Block, FullClient<RuntimeApi>>,
    OtherPartialComponents<RuntimeApi>,
>;
457
458/// Creates `PartialComponents` for Subspace client.
459#[expect(clippy::result_large_err, reason = "Comes from Substrate")]
460pub fn new_partial<PosTable, RuntimeApi>(
461    // TODO: Stop using `Configuration` once
462    //  https://github.com/paritytech/polkadot-sdk/pull/5364 is in our fork
463    config: &Configuration,
464    // TODO: Replace with check for `ChainSyncMode` once we get rid of ^ `Configuration`
465    snap_sync: bool,
466    pot_external_entropy: &[u8],
467) -> Result<PartialComponents<RuntimeApi>, ServiceError>
468where
469    PosTable: Table,
470    RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
471    RuntimeApi::RuntimeApi: ApiExt<Block>
472        + Metadata<Block>
473        + BlockBuilder<Block>
474        + OffchainWorkerApi<Block>
475        + SessionKeys<Block>
476        + TaggedTransactionQueue<Block>
477        + SubspaceApi<Block, PublicKey>
478        + DomainsApi<Block, DomainHeader>
479        + FraudProofApi<Block, DomainHeader>
480        + BundleProducerElectionApi<Block, Balance>
481        + ObjectsApi<Block>
482        + MmrApi<Block, H256, NumberFor<Block>>
483        + MessengerApi<Block, NumberFor<Block>, BlockHashFor<Block>>,
484{
485    let telemetry = config
486        .telemetry_endpoints
487        .clone()
488        .filter(|x| !x.is_empty())
489        .map(|endpoints| -> Result<_, sc_telemetry::Error> {
490            let worker = TelemetryWorker::new(16)?;
491            let telemetry = worker.handle().new_telemetry(endpoints);
492            Ok((worker, telemetry))
493        })
494        .transpose()?;
495
496    let executor = sc_service::new_wasm_executor(&config.executor);
497    let domains_executor = sc_service::new_wasm_executor(&config.executor);
498
499    let confirmation_depth_k = extract_confirmation_depth(config.chain_spec.as_ref()).ok_or(
500        ServiceError::Other("Failed to extract confirmation depth from chain spec".to_string()),
501    )?;
502
503    let backend = Arc::new(sc_client_db::Backend::new(
504        config.db_config(),
505        confirmation_depth_k.into(),
506    )?);
507
508    let genesis_block_builder = GenesisBlockBuilder::new(
509        config.chain_spec.as_storage_builder(),
510        !snap_sync,
511        backend.clone(),
512        executor.clone(),
513    )?;
514
515    let (client, backend, keystore_container, task_manager) =
516        sc_service::new_full_parts_with_genesis_builder::<Block, RuntimeApi, _, _>(
517            config,
518            telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
519            executor.clone(),
520            backend,
521            genesis_block_builder,
522            false,
523        )?;
524
525    // TODO: Make these explicit arguments we no longer use Substate's `Configuration`
526    let (kzg, maybe_erasure_coding) = tokio::task::block_in_place(|| {
527        rayon::join(Kzg::new, || {
528            ErasureCoding::new(
529                NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize)
530                    .expect("Not zero; qed"),
531            )
532            .map_err(|error| format!("Failed to instantiate erasure coding: {error}"))
533        })
534    });
535    let erasure_coding = maybe_erasure_coding?;
536
537    let client = Arc::new(client);
538    let client_info = client.info();
539    let chain_constants = client
540        .runtime_api()
541        .chain_constants(client_info.best_hash)
542        .map_err(|error| ServiceError::Application(error.into()))?;
543
544    let pot_verifier = PotVerifier::new(
545        PotSeed::from_genesis(client_info.genesis_hash.as_ref(), pot_external_entropy),
546        POT_VERIFIER_CACHE_SIZE,
547    );
548
549    // ensure the extracted confirmation_depth matches the one from runtime
550    assert_eq!(confirmation_depth_k, chain_constants.confirmation_depth_k());
551
552    client
553        .execution_extensions()
554        .set_extensions_factory(SubspaceExtensionsFactory::<PosTable, _, DomainBlock> {
555            kzg: kzg.clone(),
556            client: Arc::clone(&client),
557            pot_verifier: pot_verifier.clone(),
558            domains_executor: Arc::new(domains_executor),
559            backend: backend.clone(),
560            confirmation_depth_k: chain_constants.confirmation_depth_k(),
561            _pos_table: PhantomData,
562        });
563
564    let telemetry = telemetry.map(|(worker, telemetry)| {
565        task_manager
566            .spawn_handle()
567            .spawn("telemetry", None, worker.run());
568        telemetry
569    });
570
571    let select_chain = sc_consensus::LongestChain::new(backend.clone());
572
573    let segment_headers_store = tokio::task::block_in_place(|| {
574        SegmentHeadersStore::new(client.clone(), chain_constants.confirmation_depth_k())
575    })
576    .map_err(|error| ServiceError::Application(error.into()))?;
577
578    let subspace_link = SubspaceLink::new(chain_constants, kzg.clone(), erasure_coding);
579    let segment_headers_store = segment_headers_store.clone();
580
581    let block_import = SubspaceBlockImport::<PosTable, _, _, _, _, _>::new(
582        client.clone(),
583        client.clone(),
584        subspace_link.clone(),
585        {
586            let client = client.clone();
587            let segment_headers_store = segment_headers_store.clone();
588
589            move |parent_hash, ()| {
590                let client = client.clone();
591                let segment_headers_store = segment_headers_store.clone();
592
593                async move {
594                    let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
595
596                    let parent_header = client
597                        .header(parent_hash)?
598                        .expect("Parent header must always exist when block is created; qed");
599
600                    let parent_block_number = parent_header.number;
601
602                    let subspace_inherents =
603                        sp_consensus_subspace::inherents::InherentDataProvider::new(
604                            segment_headers_store
605                                .segment_headers_for_block(parent_block_number + 1),
606                        );
607
608                    Ok((timestamp, subspace_inherents))
609                }
610            }
611        },
612        segment_headers_store.clone(),
613        pot_verifier.clone(),
614    );
615
616    let sync_target_block_number = Arc::new(AtomicU32::new(0));
617    let transaction_pool = Arc::from(
618        sc_transaction_pool::Builder::new(
619            task_manager.spawn_essential_handle(),
620            client.clone(),
621            config.role.is_authority().into(),
622        )
623        .with_options(config.transaction_pool.clone())
624        .with_prometheus(config.prometheus_registry())
625        .build(),
626    );
627
628    let verifier = SubspaceVerifier::<PosTable, _, _>::new(SubspaceVerifierOptions {
629        client: client.clone(),
630        chain_constants,
631        kzg,
632        telemetry: telemetry.as_ref().map(|x| x.handle()),
633        reward_signing_context: schnorrkel::context::signing_context(REWARD_SIGNING_CONTEXT),
634        sync_target_block_number: Arc::clone(&sync_target_block_number),
635        is_authoring_blocks: config.role.is_authority(),
636        pot_verifier: pot_verifier.clone(),
637    });
638
639    let import_queue = BasicQueue::new(
640        verifier,
641        Box::new(BlockImportWrapper(block_import.clone())),
642        None,
643        &task_manager.spawn_essential_handle(),
644        config.prometheus_registry(),
645    );
646
647    let other = OtherPartialComponents {
648        block_import: Box::new(BlockImportWrapper(block_import.clone())),
649        subspace_link,
650        segment_headers_store,
651        pot_verifier,
652        sync_target_block_number,
653        telemetry,
654    };
655
656    Ok(PartialComponents {
657        client,
658        backend,
659        task_manager,
660        import_queue,
661        keystore_container,
662        select_chain,
663        transaction_pool,
664        other,
665    })
666}
667
/// Full node along with some other components.
pub struct NewFull<Client>
where
    Client: ProvideRuntimeApi<Block>
        + AuxStore
        + BlockBackend<Block>
        + BlockIdTo<Block>
        + HeaderBackend<Block>
        + HeaderMetadata<Block, Error = sp_blockchain::Error>
        + 'static,
    Client::Api: TaggedTransactionQueue<Block>
        + DomainsApi<Block, DomainHeader>
        + FraudProofApi<Block, DomainHeader>
        + SubspaceApi<Block, PublicKey>
        + MmrApi<Block, H256, NumberFor<Block>>
        + MessengerApi<Block, NumberFor<Block>, BlockHashFor<Block>>,
{
    /// Task manager.
    pub task_manager: TaskManager,
    /// Full client.
    pub client: Arc<Client>,
    /// Chain selection rule.
    pub select_chain: FullSelectChain,
    /// Network service.
    pub network_service: Arc<dyn NetworkService + Send + Sync>,
    /// Cross-domain gossip notification service.
    pub xdm_gossip_notification_service: Box<dyn NotificationService>,
    /// Sync service.
    pub sync_service: Arc<sc_network_sync::SyncingService<Block>>,
    /// RPC handlers.
    pub rpc_handlers: sc_service::RpcHandlers,
    /// Full client backend.
    pub backend: Arc<FullBackend>,
    /// Pot slot info stream.
    pub pot_slot_info_stream: broadcast::Receiver<PotSlotInfo>,
    /// New slot stream.
    /// Note: this is currently used to send solutions from the farmer during tests.
    pub new_slot_notification_stream: SubspaceNotificationStream<NewSlotNotification>,
    /// Block signing stream.
    pub reward_signing_notification_stream: SubspaceNotificationStream<RewardSigningNotification>,
    /// Stream of notifications about blocks about to be imported.
    pub block_importing_notification_stream:
        SubspaceNotificationStream<BlockImportingNotification<Block>>,
    /// Archived object mapping stream.
    pub object_mapping_notification_stream: SubspaceNotificationStream<ObjectMappingNotification>,
    /// Archived segment stream.
    pub archived_segment_notification_stream:
        SubspaceNotificationStream<ArchivedSegmentNotification>,
    /// Transaction pool.
    pub transaction_pool: Arc<TransactionPoolHandle<Block, Client>>,
}
719
/// [`NewFull`] specialized to the Subspace [`FullClient`]
type FullNode<RuntimeApi> = NewFull<FullClient<RuntimeApi>>;
721
722/// Builds a new service for a full client.
723pub async fn new_full<PosTable, RuntimeApi>(
724    mut config: SubspaceConfiguration,
725    partial_components: PartialComponents<RuntimeApi>,
726    prometheus_registry: Option<&mut Registry>,
727    enable_rpc_extensions: bool,
728    block_proposal_slot_portion: SlotProportion,
729    consensus_snap_sync_target_block_receiver: Option<broadcast::Receiver<BlockNumber>>,
730) -> Result<FullNode<RuntimeApi>, Error>
731where
732    PosTable: Table,
733    RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
734    RuntimeApi::RuntimeApi: ApiExt<Block>
735        + Metadata<Block>
736        + AccountNonceApi<Block, AccountId, Nonce>
737        + BlockBuilder<Block>
738        + OffchainWorkerApi<Block>
739        + SessionKeys<Block>
740        + TaggedTransactionQueue<Block>
741        + TransactionPaymentApi<Block, Balance>
742        + SubspaceApi<Block, PublicKey>
743        + DomainsApi<Block, DomainHeader>
744        + FraudProofApi<Block, DomainHeader>
745        + ObjectsApi<Block>
746        + MmrApi<Block, Hash, BlockNumber>
747        + MessengerApi<Block, NumberFor<Block>, BlockHashFor<Block>>,
748{
749    let PartialComponents {
750        client,
751        backend,
752        mut task_manager,
753        import_queue,
754        keystore_container,
755        select_chain,
756        transaction_pool,
757        other,
758    } = partial_components;
759    let OtherPartialComponents {
760        block_import,
761        subspace_link,
762        segment_headers_store,
763        pot_verifier,
764        sync_target_block_number,
765        mut telemetry,
766    } = other;
767
768    let offchain_indexing_enabled = config.base.offchain_worker.indexing_enabled;
769    let (node, bootstrap_nodes, piece_getter) = match config.subspace_networking {
770        SubspaceNetworking::Reuse {
771            node,
772            bootstrap_nodes,
773            piece_getter,
774        } => (node, bootstrap_nodes, piece_getter),
775        SubspaceNetworking::Create { config: dsn_config } => {
776            let dsn_protocol_version = hex::encode(client.chain_info().genesis_hash);
777
778            debug!(
779                chain_type=?config.base.chain_spec.chain_type(),
780                genesis_hash=%hex::encode(client.chain_info().genesis_hash),
781                "Setting DSN protocol version..."
782            );
783
784            let out_connections = dsn_config.max_out_connections;
785            let (node, mut node_runner) = create_dsn_instance(
786                dsn_protocol_version,
787                dsn_config.clone(),
788                prometheus_registry,
789            )?;
790
791            info!("Subspace networking initialized: Node ID is {}", node.id());
792
793            node.on_new_listener(Arc::new({
794                let node = node.clone();
795
796                move |address| {
797                    info!(
798                        "DSN listening on {}",
799                        address.clone().with(Protocol::P2p(node.id()))
800                    );
801                }
802            }))
803            .detach();
804
805            task_manager
806                .spawn_essential_handle()
807                .spawn_essential_blocking(
808                    "node-runner",
809                    Some("subspace-networking"),
810                    Box::pin(
811                        async move {
812                            node_runner.run().await;
813                        }
814                        .in_current_span(),
815                    ),
816                );
817
818            let piece_provider = PieceProvider::new(
819                node.clone(),
820                SegmentCommitmentPieceValidator::new(
821                    node.clone(),
822                    subspace_link.kzg().clone(),
823                    segment_headers_store.clone(),
824                ),
825                Arc::new(Semaphore::new(
826                    out_connections as usize * PIECE_PROVIDER_MULTIPLIER,
827                )),
828            );
829
830            (
831                node,
832                dsn_config.bootstrap_nodes,
833                Arc::new(DsnPieceGetter::new(piece_provider)) as _,
834            )
835        }
836    };
837
838    let dsn_bootstrap_nodes = {
839        // Fall back to node itself as bootstrap node for DSN so farmer always has someone to
840        // connect to
841        if bootstrap_nodes.is_empty() {
842            let (node_address_sender, node_address_receiver) = oneshot::channel();
843            let _handler = node.on_new_listener(Arc::new({
844                let node_address_sender = Mutex::new(Some(node_address_sender));
845
846                move |address| {
847                    if matches!(address.iter().next(), Some(Protocol::Ip4(_)))
848                        && let Some(node_address_sender) = node_address_sender.lock().take()
849                        && let Err(err) = node_address_sender.send(address.clone())
850                    {
851                        debug!(?err, "Couldn't send a node address to the channel.");
852                    }
853                }
854            }));
855
856            let mut node_listeners = node.listeners();
857
858            if node_listeners.is_empty() {
859                let Ok(listener) = node_address_receiver.await else {
860                    return Err(Error::Other(
861                        "Oneshot receiver dropped before DSN node listener was ready"
862                            .to_string()
863                            .into(),
864                    ));
865                };
866
867                node_listeners = vec![listener];
868            }
869
870            node_listeners.iter_mut().for_each(|multiaddr| {
871                multiaddr.push(Protocol::P2p(node.id()));
872            });
873
874            node_listeners
875        } else {
876            bootstrap_nodes
877        }
878    };
879
880    let substrate_prometheus_registry = config
881        .base
882        .prometheus_config
883        .as_ref()
884        .map(|prometheus_config| prometheus_config.registry.clone());
885    let import_queue_service1 = import_queue.service();
886    let import_queue_service2 = import_queue.service();
887    let network_wrapper = Arc::new(NetworkWrapper::default());
888    let block_relay = build_consensus_relay(
889        network_wrapper.clone(),
890        client.clone(),
891        transaction_pool.clone(),
892        substrate_prometheus_registry.as_ref(),
893    )
894    .map_err(Error::BlockRelay)?;
895    let mut net_config = sc_network::config::FullNetworkConfiguration::new(
896        &config.base.network,
897        substrate_prometheus_registry.clone(),
898    );
899    let (xdm_gossip_notification_config, xdm_gossip_notification_service) =
900        xdm_gossip_peers_set_config();
901    net_config.add_notification_protocol(xdm_gossip_notification_config);
902    let (pot_gossip_notification_config, pot_gossip_notification_service) =
903        pot_gossip_peers_set_config();
904    net_config.add_notification_protocol(pot_gossip_notification_config);
905    let pause_sync = Arc::clone(&net_config.network_config.pause_sync);
906
907    let num_peer_hint = net_config.network_config.default_peers_set_num_full as usize
908        + net_config
909            .network_config
910            .default_peers_set
911            .reserved_nodes
912            .len();
913
914    let protocol_id = config.base.protocol_id();
915    let fork_id = config.base.chain_spec.fork_id();
916
917    // enable domain block ER request handler
918    let (handler, protocol_config) = DomainBlockERRequestHandler::new::<
919        NetworkWorker<Block, BlockHashFor<Block>>,
920    >(fork_id, client.clone(), num_peer_hint);
921    task_manager.spawn_handle().spawn(
922        "domain-block-er-request-handler",
923        Some("networking"),
924        handler.run(),
925    );
926    net_config.add_request_response_protocol(protocol_config);
927
928    if let Some(offchain_storage) = backend.offchain_storage() {
929        // Allow both outgoing and incoming requests.
930        let (handler, protocol_config) =
931            MmrRequestHandler::new::<NetworkWorker<Block, BlockHashFor<Block>>>(
932                &config.base.protocol_id(),
933                fork_id,
934                client.clone(),
935                num_peer_hint,
936                offchain_storage,
937            );
938        task_manager
939            .spawn_handle()
940            .spawn("mmr-request-handler", Some("networking"), handler.run());
941
942        net_config.add_request_response_protocol(protocol_config);
943    }
944
945    let network_service_provider = NetworkServiceProvider::new();
946    let network_service_handle = network_service_provider.handle();
947    let (network_service, system_rpc_tx, tx_handler_controller, sync_service) = {
948        let spawn_handle = task_manager.spawn_handle();
949        let metrics = NotificationMetrics::new(substrate_prometheus_registry.as_ref());
950
951        // TODO: Remove BlockRelayParams here and simplify relay initialization
952        let block_downloader = {
953            let BlockRelayParams {
954                mut server,
955                downloader,
956                request_response_config,
957            } = block_relay;
958            net_config.add_request_response_protocol(request_response_config);
959
960            spawn_handle.spawn("block-request-handler", Some("networking"), async move {
961                server.run().await;
962            });
963
964            downloader
965        };
966
967        let syncing_strategy = build_polkadot_syncing_strategy(
968            protocol_id.clone(),
969            fork_id,
970            &mut net_config,
971            None,
972            block_downloader,
973            client.clone(),
974            &spawn_handle,
975            substrate_prometheus_registry.as_ref(),
976        )?;
977
978        let (syncing_engine, sync_service, block_announce_config) =
979            SyncingEngine::new::<NetworkWorker<_, _>>(
980                Roles::from(&config.base.role),
981                Arc::clone(&client),
982                substrate_prometheus_registry.as_ref(),
983                metrics.clone(),
984                &net_config,
985                protocol_id.clone(),
986                fork_id,
987                Box::new(DefaultBlockAnnounceValidator),
988                syncing_strategy,
989                network_service_provider.handle(),
990                import_queue.service(),
991                net_config.peer_store_handle(),
992                config.base.network.force_synced,
993            )
994            .map_err(sc_service::Error::from)?;
995
996        spawn_handle.spawn_blocking("syncing", None, syncing_engine.run());
997
998        build_network_advanced(BuildNetworkAdvancedParams {
999            role: config.base.role,
1000            protocol_id,
1001            fork_id,
1002            ipfs_server: config.base.network.ipfs_server,
1003            announce_block: config.base.announce_block,
1004            net_config,
1005            client: Arc::clone(&client),
1006            transaction_pool: Arc::clone(&transaction_pool),
1007            spawn_handle,
1008            import_queue,
1009            sync_service,
1010            block_announce_config,
1011            network_service_provider,
1012            metrics_registry: substrate_prometheus_registry.as_ref(),
1013            metrics,
1014        })?
1015    };
1016
1017    task_manager.spawn_handle().spawn(
1018        "sync-target-follower",
1019        None,
1020        Box::pin({
1021            let sync_service = sync_service.clone();
1022            let sync_target_block_number = Arc::clone(&sync_target_block_number);
1023
1024            async move {
1025                loop {
1026                    let best_seen_block = sync_service
1027                        .status()
1028                        .await
1029                        .map(|status| status.best_seen_block.unwrap_or_default())
1030                        .unwrap_or_default();
1031                    sync_target_block_number.store(best_seen_block, Ordering::Relaxed);
1032
1033                    tokio::time::sleep(SYNC_TARGET_UPDATE_INTERVAL).await;
1034                }
1035            }
1036        }),
1037    );
1038
1039    let sync_oracle = SubspaceSyncOracle::new(
1040        config.base.force_authoring,
1041        Arc::clone(&pause_sync),
1042        sync_service.clone(),
1043    );
1044
1045    let subspace_archiver = tokio::task::block_in_place(|| {
1046        create_subspace_archiver(
1047            segment_headers_store.clone(),
1048            subspace_link.clone(),
1049            client.clone(),
1050            sync_oracle.clone(),
1051            telemetry.as_ref().map(|telemetry| telemetry.handle()),
1052            config.create_object_mappings,
1053        )
1054    })
1055    .map_err(ServiceError::Client)?;
1056
1057    task_manager
1058        .spawn_essential_handle()
1059        .spawn_essential_blocking(
1060            "subspace-archiver",
1061            None,
1062            Box::pin(async move {
1063                if let Err(error) = subspace_archiver.await {
1064                    error!(%error, "Archiver exited with error");
1065                }
1066            }),
1067        );
1068
1069    network_wrapper.set(network_service.clone());
1070
1071    if !config.base.network.force_synced {
1072        // Start with DSN sync in this case
1073        pause_sync.store(true, Ordering::Release);
1074    }
1075
1076    let snap_sync_task = snap_sync(
1077        segment_headers_store.clone(),
1078        node.clone(),
1079        fork_id.map(|fork_id| fork_id.to_string()),
1080        Arc::clone(&client),
1081        import_queue_service1,
1082        pause_sync.clone(),
1083        piece_getter.clone(),
1084        sync_service.clone(),
1085        network_service_handle,
1086        subspace_link.erasure_coding().clone(),
1087        consensus_snap_sync_target_block_receiver,
1088        backend.offchain_storage(),
1089        network_service.clone(),
1090    );
1091
1092    let (observer, worker) = sync_from_dsn::create_observer_and_worker(
1093        segment_headers_store.clone(),
1094        Arc::clone(&network_service),
1095        node.clone(),
1096        Arc::clone(&client),
1097        import_queue_service2,
1098        sync_service.clone(),
1099        sync_target_block_number,
1100        pause_sync,
1101        piece_getter,
1102        subspace_link.erasure_coding().clone(),
1103    );
1104    task_manager
1105        .spawn_handle()
1106        .spawn("observer", Some("sync-from-dsn"), observer);
1107    task_manager
1108        .spawn_essential_handle()
1109        .spawn_essential_blocking(
1110            "worker",
1111            Some("sync-from-dsn"),
1112            Box::pin(async move {
1113                // Run snap-sync before DSN-sync.
1114                if config.sync == ChainSyncMode::Snap
1115                    && let Err(error) = snap_sync_task.in_current_span().await
1116                {
1117                    error!(%error, "Snap sync exited with a fatal error");
1118                    return;
1119                }
1120
1121                if let Err(error) = worker.await {
1122                    error!(%error, "Sync from DSN exited with an error");
1123                }
1124            }),
1125        );
1126
1127    if let Some(registry) = substrate_prometheus_registry.as_ref() {
1128        match NodeMetrics::new(
1129            client.clone(),
1130            client.every_import_notification_stream(),
1131            registry,
1132        ) {
1133            Ok(node_metrics) => {
1134                task_manager.spawn_handle().spawn(
1135                    "node_metrics",
1136                    None,
1137                    Box::pin(async move {
1138                        node_metrics.run().await;
1139                    }),
1140                );
1141            }
1142            Err(err) => {
1143                error!("Failed to initialize node metrics: {err:?}");
1144            }
1145        }
1146    }
1147
1148    let offchain_tx_pool_factory = OffchainTransactionPoolFactory::new(transaction_pool.clone());
1149
1150    if config.base.offchain_worker.enabled {
1151        let offchain_workers =
1152            sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
1153                runtime_api_provider: client.clone(),
1154                is_validator: config.base.role.is_authority(),
1155                keystore: Some(keystore_container.keystore()),
1156                offchain_db: backend.offchain_storage(),
1157                transaction_pool: Some(offchain_tx_pool_factory.clone()),
1158                network_provider: Arc::new(network_service.clone()),
1159                enable_http_requests: true,
1160                custom_extensions: |_| vec![],
1161            })?;
1162        task_manager.spawn_handle().spawn(
1163            "offchain-workers-runner",
1164            "offchain-worker",
1165            offchain_workers
1166                .run(client.clone(), task_manager.spawn_handle())
1167                .boxed(),
1168        );
1169    }
1170
1171    // mmr offchain indexer
1172    if offchain_indexing_enabled {
1173        task_manager.spawn_essential_handle().spawn_blocking(
1174            "mmr-gadget",
1175            None,
1176            mmr_gadget::MmrGadget::start(
1177                client.clone(),
1178                backend.clone(),
1179                sp_mmr_primitives::INDEXING_PREFIX.to_vec(),
1180            ),
1181        );
1182    }
1183
1184    let backoff_authoring_blocks: Option<()> = None;
1185
1186    let new_slot_notification_stream = subspace_link.new_slot_notification_stream();
1187    let reward_signing_notification_stream = subspace_link.reward_signing_notification_stream();
1188    let block_importing_notification_stream = subspace_link.block_importing_notification_stream();
1189    let object_mapping_notification_stream = subspace_link.object_mapping_notification_stream();
1190    let archived_segment_notification_stream = subspace_link.archived_segment_notification_stream();
1191
1192    let (pot_source_worker, pot_gossip_worker, pot_slot_info_stream) = PotSourceWorker::new(
1193        config.is_timekeeper,
1194        config.timekeeper_cpu_cores,
1195        client.clone(),
1196        pot_verifier.clone(),
1197        Arc::clone(&network_service),
1198        pot_gossip_notification_service,
1199        sync_service.clone(),
1200        sync_oracle.clone(),
1201    )
1202    .map_err(|error| Error::Other(error.into()))?;
1203
1204    let additional_pot_slot_info_stream = pot_source_worker.subscribe_pot_slot_info_stream();
1205
1206    task_manager
1207        .spawn_essential_handle()
1208        .spawn("pot-source", Some("pot"), pot_source_worker.run());
1209    task_manager
1210        .spawn_essential_handle()
1211        .spawn("pot-gossip", Some("pot"), pot_gossip_worker.run());
1212
1213    if config.base.role.is_authority() || config.force_new_slot_notifications {
1214        let proposer_factory = ProposerFactory::new(
1215            task_manager.spawn_handle(),
1216            client.clone(),
1217            transaction_pool.clone(),
1218            substrate_prometheus_registry.as_ref(),
1219            telemetry.as_ref().map(|x| x.handle()),
1220        );
1221
1222        let subspace_slot_worker =
1223            SubspaceSlotWorker::<PosTable, _, _, _, _, _, _, _>::new(SubspaceSlotWorkerOptions {
1224                client: client.clone(),
1225                env: proposer_factory,
1226                block_import,
1227                sync_oracle: sync_oracle.clone(),
1228                justification_sync_link: sync_service.clone(),
1229                force_authoring: config.base.force_authoring,
1230                backoff_authoring_blocks,
1231                subspace_link: subspace_link.clone(),
1232                segment_headers_store: segment_headers_store.clone(),
1233                block_proposal_slot_portion,
1234                max_block_proposal_slot_portion: None,
1235                telemetry: telemetry.as_ref().map(|x| x.handle()),
1236                offchain_tx_pool_factory,
1237                pot_verifier,
1238            });
1239
1240        let create_inherent_data_providers = {
1241            let client = client.clone();
1242            let segment_headers_store = segment_headers_store.clone();
1243
1244            move |parent_hash, ()| {
1245                let client = client.clone();
1246                let segment_headers_store = segment_headers_store.clone();
1247
1248                async move {
1249                    let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
1250
1251                    let parent_header = client
1252                        .header(parent_hash)?
1253                        .expect("Parent header must always exist when block is created; qed");
1254
1255                    let parent_block_number = parent_header.number;
1256
1257                    let subspace_inherents =
1258                        sp_consensus_subspace::inherents::InherentDataProvider::new(
1259                            segment_headers_store
1260                                .segment_headers_for_block(parent_block_number + 1),
1261                        );
1262
1263                    Ok((timestamp, subspace_inherents))
1264                }
1265            }
1266        };
1267
1268        info!("🧑‍🌾 Starting Subspace Authorship worker");
1269        let slot_worker_task = sc_proof_of_time::start_slot_worker(
1270            subspace_link.chain_constants().slot_duration(),
1271            client.clone(),
1272            select_chain.clone(),
1273            subspace_slot_worker,
1274            sync_oracle.clone(),
1275            create_inherent_data_providers,
1276            pot_slot_info_stream,
1277        );
1278
1279        // Subspace authoring task is considered essential, i.e. if it fails we take down the
1280        // service with it.
1281        task_manager.spawn_essential_handle().spawn_blocking(
1282            "subspace-proposer",
1283            Some("block-authoring"),
1284            slot_worker_task,
1285        );
1286    }
1287
1288    // We replace the Substrate implementation of metrics server with our own.
1289    config.base.prometheus_config.take();
1290
1291    let rpc_handlers = task_spawner::spawn_tasks(SpawnTasksParams {
1292        network: network_service.clone(),
1293        client: client.clone(),
1294        keystore: keystore_container.keystore(),
1295        task_manager: &mut task_manager,
1296        transaction_pool: transaction_pool.clone(),
1297        rpc_builder: if enable_rpc_extensions {
1298            let client = client.clone();
1299            let new_slot_notification_stream = new_slot_notification_stream.clone();
1300            let reward_signing_notification_stream = reward_signing_notification_stream.clone();
1301            let object_mapping_notification_stream = object_mapping_notification_stream.clone();
1302            let archived_segment_notification_stream = archived_segment_notification_stream.clone();
1303            let transaction_pool = transaction_pool.clone();
1304            let backend = backend.clone();
1305
1306            #[allow(clippy::result_large_err)]
1307            Box::new(move |subscription_executor| {
1308                let deps = rpc::FullDeps {
1309                    client: client.clone(),
1310                    pool: transaction_pool.clone(),
1311                    subscription_executor,
1312                    new_slot_notification_stream: new_slot_notification_stream.clone(),
1313                    reward_signing_notification_stream: reward_signing_notification_stream.clone(),
1314                    object_mapping_notification_stream: object_mapping_notification_stream.clone(),
1315                    archived_segment_notification_stream: archived_segment_notification_stream
1316                        .clone(),
1317                    dsn_bootstrap_nodes: dsn_bootstrap_nodes.clone(),
1318                    segment_headers_store: segment_headers_store.clone(),
1319                    sync_oracle: sync_oracle.clone(),
1320                    kzg: subspace_link.kzg().clone(),
1321                    erasure_coding: subspace_link.erasure_coding().clone(),
1322                    backend: backend.clone(),
1323                };
1324
1325                rpc::create_full(deps).map_err(Into::into)
1326            })
1327        } else {
1328            #[allow(clippy::result_large_err)]
1329            Box::new(|_| Ok(RpcModule::new(())))
1330        },
1331        backend: backend.clone(),
1332        system_rpc_tx,
1333        config: config.base,
1334        telemetry: telemetry.as_mut(),
1335        tx_handler_controller,
1336        sync_service: sync_service.clone(),
1337        // None: trace_block RPC not supported, Subspace uses its own tracing
1338        tracing_execute_block: None,
1339    })?;
1340
1341    Ok(NewFull {
1342        task_manager,
1343        client,
1344        select_chain,
1345        network_service,
1346        xdm_gossip_notification_service,
1347        sync_service,
1348        rpc_handlers,
1349        backend,
1350        pot_slot_info_stream: additional_pot_slot_info_stream,
1351        new_slot_notification_stream,
1352        reward_signing_notification_stream,
1353        block_importing_notification_stream,
1354        object_mapping_notification_stream,
1355        archived_segment_notification_stream,
1356        transaction_pool,
1357    })
1358}
1359
1360/// The storage key of the `ConfirmationDepthK` storage item in `pallet-runtime-configs`
1361fn confirmation_depth_storage_key() -> StorageKey {
1362    StorageKey(
1363        frame_support::storage::storage_prefix(
1364            // This is the name used for `pallet-runtime-configs` in the `construct_runtime` macro
1365            // i.e. `RuntimeConfigs: pallet_runtime_configs = 14`
1366            "RuntimeConfigs".as_bytes(),
1367            // This is the storage item name used inside `pallet-runtime-configs`
1368            "ConfirmationDepthK".as_bytes(),
1369        )
1370        .to_vec(),
1371    )
1372}
1373
1374fn extract_confirmation_depth(chain_spec: &dyn ChainSpec) -> Option<u32> {
1375    let storage_key = format!("0x{}", hex::encode(confirmation_depth_storage_key().0));
1376    let spec: serde_json::Value =
1377        serde_json::from_str(chain_spec.as_json(true).ok()?.as_str()).ok()?;
1378    let encoded_confirmation_depth = hex::decode(
1379        spec.pointer(format!("/genesis/raw/top/{storage_key}").as_str())?
1380            .as_str()?
1381            .trim_start_matches("0x"),
1382    )
1383    .ok()?;
1384    u32::decode(&mut encoded_confirmation_depth.as_slice()).ok()
1385}
1386
#[cfg(test)]
mod test {
    use static_assertions::const_assert_eq;
    use subspace_data_retrieval::object_fetcher::MAX_BLOCK_LENGTH as ARCHIVER_MAX_BLOCK_LENGTH;
    use subspace_runtime_primitives::MAX_BLOCK_LENGTH as CONSENSUS_RUNTIME_MAX_BLOCK_LENGTH;

    /// The archiver and the consensus runtime must agree on the maximum block length.
    /// (Checking this here avoids importing all the runtime primitives code into the farmer
    /// and gateway.)
    #[test]
    fn max_block_length_consistent() {
        // Evaluated at compile time: a mismatch fails the build, not merely this test run.
        const_assert_eq!(
            ARCHIVER_MAX_BLOCK_LENGTH,
            CONSENSUS_RUNTIME_MAX_BLOCK_LENGTH,
        );
    }
}