subspace_service/
lib.rs

//! Service and ServiceFactory implementation. Specialized wrapper over substrate service.
#![feature(
    duration_constructors_lite,
    impl_trait_in_assoc_type,
    int_roundings,
    type_alias_impl_trait,
    type_changing_struct_update
)]

pub mod config;
pub mod dsn;
mod metrics;
pub(crate) mod mmr;
pub mod rpc;
pub mod sync_from_dsn;
mod task_spawner;
mod utils;

use crate::config::{ChainSyncMode, SubspaceConfiguration, SubspaceNetworking};
use crate::dsn::{DsnConfigurationError, create_dsn_instance};
use crate::metrics::NodeMetrics;
use crate::mmr::request_handler::MmrRequestHandler;
use crate::sync_from_dsn::DsnPieceGetter;
use crate::sync_from_dsn::piece_validator::SegmentCommitmentPieceValidator;
use crate::sync_from_dsn::snap_sync::snap_sync;
use async_lock::Semaphore;
use core::sync::atomic::{AtomicU32, Ordering};
use cross_domain_message_gossip::xdm_gossip_peers_set_config;
use domain_runtime_primitives::opaque::{Block as DomainBlock, Header as DomainHeader};
use frame_system_rpc_runtime_api::AccountNonceApi;
use futures::FutureExt;
use futures::channel::oneshot;
use jsonrpsee::RpcModule;
use pallet_transaction_payment_rpc_runtime_api::TransactionPaymentApi;
use parity_scale_codec::Decode;
use parking_lot::Mutex;
use prometheus_client::registry::Registry;
use sc_basic_authorship::ProposerFactory;
use sc_chain_spec::{ChainSpec, GenesisBlockBuilder};
use sc_client_api::execution_extensions::ExtensionsFactory;
use sc_client_api::{
    AuxStore, Backend, BlockBackend, BlockchainEvents, ExecutorProvider, HeaderBackend,
};
use sc_consensus::{
    BasicQueue, BlockCheckParams, BlockImport, BlockImportParams, BoxBlockImport,
    DefaultImportQueue, ImportQueue, ImportResult,
};
use sc_consensus_slots::SlotProportion;
use sc_consensus_subspace::SubspaceLink;
use sc_consensus_subspace::archiver::{
    ArchivedSegmentNotification, ObjectMappingNotification, SegmentHeadersStore,
    create_subspace_archiver,
};
use sc_consensus_subspace::block_import::{BlockImportingNotification, SubspaceBlockImport};
use sc_consensus_subspace::notification::SubspaceNotificationStream;
use sc_consensus_subspace::slot_worker::{
    NewSlotNotification, RewardSigningNotification, SubspaceSlotWorker, SubspaceSlotWorkerOptions,
    SubspaceSyncOracle,
};
use sc_consensus_subspace::verifier::{SubspaceVerifier, SubspaceVerifierOptions};
use sc_domains::ExtensionsFactory as DomainsExtensionFactory;
use sc_domains::domain_block_er::execution_receipt_protocol::DomainBlockERRequestHandler;
use sc_network::service::traits::NetworkService;
use sc_network::{NetworkWorker, NotificationMetrics, NotificationService, Roles};
use sc_network_sync::block_relay_protocol::BlockRelayParams;
use sc_network_sync::engine::SyncingEngine;
use sc_network_sync::service::network::NetworkServiceProvider;
use sc_proof_of_time::source::gossip::pot_gossip_peers_set_config;
use sc_proof_of_time::source::{PotSlotInfo, PotSourceWorker};
use sc_proof_of_time::verifier::PotVerifier;
use sc_service::error::Error as ServiceError;
use sc_service::{
    BuildNetworkAdvancedParams, Configuration, NetworkStarter, SpawnTasksParams, TaskManager,
    build_network_advanced, build_polkadot_syncing_strategy,
};
use sc_subspace_block_relay::{
    BlockRelayConfigurationError, NetworkWrapper, build_consensus_relay,
};
use sc_telemetry::{Telemetry, TelemetryWorker};
use sc_transaction_pool::TransactionPoolHandle;
use sc_transaction_pool_api::OffchainTransactionPoolFactory;
use sp_api::{ApiExt, ConstructRuntimeApi, Metadata, ProvideRuntimeApi};
use sp_block_builder::BlockBuilder;
use sp_blockchain::HeaderMetadata;
use sp_consensus::block_validation::DefaultBlockAnnounceValidator;
use sp_consensus_slots::Slot;
use sp_consensus_subspace::digests::extract_pre_digest;
use sp_consensus_subspace::{
    KzgExtension, PosExtension, PotExtension, PotNextSlotInput, SubspaceApi,
};
use sp_core::H256;
use sp_core::offchain::OffchainDbExt;
use sp_core::offchain::storage::OffchainDb;
use sp_core::traits::SpawnEssentialNamed;
use sp_domains::storage::StorageKey;
use sp_domains::{BundleProducerElectionApi, DomainsApi};
use sp_domains_fraud_proof::{FraudProofApi, FraudProofExtension, FraudProofHostFunctionsImpl};
use sp_externalities::Extensions;
use sp_messenger::MessengerApi;
use sp_messenger_host_functions::{MessengerExtension, MessengerHostFunctionsImpl};
use sp_mmr_primitives::MmrApi;
use sp_objects::ObjectsApi;
use sp_offchain::OffchainWorkerApi;
use sp_runtime::traits::{Block as BlockT, BlockIdTo, Header, NumberFor, Zero};
use sp_session::SessionKeys;
use sp_subspace_mmr::host_functions::{SubspaceMmrExtension, SubspaceMmrHostFunctionsImpl};
use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
use static_assertions::const_assert;
use std::marker::PhantomData;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;
use subspace_core_primitives::pieces::Record;
use subspace_core_primitives::pot::PotSeed;
use subspace_core_primitives::{BlockNumber, PublicKey, REWARD_SIGNING_CONTEXT};
use subspace_erasure_coding::ErasureCoding;
use subspace_kzg::Kzg;
use subspace_networking::libp2p::multiaddr::Protocol;
use subspace_networking::utils::piece_provider::PieceProvider;
use subspace_proof_of_space::Table;
use subspace_runtime_primitives::opaque::Block;
use subspace_runtime_primitives::{AccountId, Balance, BlockHashFor, Hash, Nonce};
use tokio::sync::broadcast;
use tracing::{Instrument, debug, error, info};
pub use utils::wait_for_block_import;

// There are multiple places where it is assumed that the node is running on a 64-bit system;
// refuse to compile otherwise
const_assert!(std::mem::size_of::<usize>() >= std::mem::size_of::<u64>());

/// This is over 15 minutes of slots assuming there are no forks; it should be both sufficient and
/// not too large to handle
const POT_VERIFIER_CACHE_SIZE: u32 = 30_000;
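/// How often the approximate sync target block number is refreshed from the sync service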
const SYNC_TARGET_UPDATE_INTERVAL: Duration = Duration::from_secs(1);
/// Multiplier on top of outgoing connections number for piece downloading purposes
const PIECE_PROVIDER_MULTIPLIER: usize = 10;

/// Error type for Subspace service.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// IO error.
    #[error(transparent)]
    Io(#[from] std::io::Error),

    /// Address parsing error.
    #[error(transparent)]
    AddrFormatInvalid(#[from] std::net::AddrParseError),

    /// Substrate service error.
    #[error(transparent)]
    Sub(#[from] sc_service::Error),

    /// Substrate consensus error.
    #[error(transparent)]
    Consensus(#[from] sp_consensus::Error),

    /// Telemetry error.
    #[error(transparent)]
    Telemetry(#[from] sc_telemetry::Error),

    /// Subspace networking (DSN) error.
    #[error(transparent)]
    SubspaceDsn(#[from] DsnConfigurationError),

    /// Failed to set up block relay.
    #[error(transparent)]
    BlockRelay(#[from] BlockRelayConfigurationError),

    /// Other.
    #[error(transparent)]
    Other(Box<dyn std::error::Error + Send + Sync>),
}

// Simple wrapper whose only purpose is to convert the error type
#[derive(Clone)]
struct BlockImportWrapper<BI>(BI);

#[async_trait::async_trait]
impl<Block, BI> BlockImport<Block> for BlockImportWrapper<BI>
where
    Block: BlockT,
    BI: BlockImport<Block, Error = sc_consensus_subspace::block_import::Error<Block::Header>>
        + Send
        + Sync,
{
    type Error = sp_consensus::Error;

    async fn check_block(
        &self,
        block: BlockCheckParams<Block>,
    ) -> Result<ImportResult, Self::Error> {
        self.0
            .check_block(block)
            .await
            .map_err(|error| sp_consensus::Error::Other(error.into()))
    }

    async fn import_block(
        &self,
        block: BlockImportParams<Block>,
    ) -> Result<ImportResult, Self::Error> {
        self.0
            .import_block(block)
            .await
            .map_err(|error| sp_consensus::Error::Other(error.into()))
    }
}

/// Host functions required for Subspace
#[cfg(not(feature = "runtime-benchmarks"))]
pub type HostFunctions = (
    sp_io::SubstrateHostFunctions,
    sp_consensus_subspace::consensus::HostFunctions,
    sp_domains_fraud_proof::HostFunctions,
    sp_subspace_mmr::HostFunctions,
    sp_messenger_host_functions::HostFunctions,
);

/// Host functions required for Subspace
#[cfg(feature = "runtime-benchmarks")]
pub type HostFunctions = (
    sp_io::SubstrateHostFunctions,
    frame_benchmarking::benchmarking::HostFunctions,
    sp_consensus_subspace::consensus::HostFunctions,
    sp_domains_fraud_proof::HostFunctions,
    sp_subspace_mmr::HostFunctions,
    sp_messenger_host_functions::HostFunctions,
);

/// Runtime executor for Subspace
pub type RuntimeExecutor = sc_executor::WasmExecutor<HostFunctions>;

/// Subspace-like full client.
pub type FullClient<RuntimeApi> = sc_service::TFullClient<Block, RuntimeApi, RuntimeExecutor>;

pub type FullBackend = sc_service::TFullBackend<Block>;
pub type FullSelectChain = sc_consensus::LongestChain<FullBackend, Block>;

struct SubspaceExtensionsFactory<PosTable, Client, DomainBlock> {
    kzg: Kzg,
    client: Arc<Client>,
    backend: Arc<FullBackend>,
    pot_verifier: PotVerifier,
    domains_executor: Arc<sc_domains::RuntimeExecutor>,
    confirmation_depth_k: BlockNumber,
    _pos_table: PhantomData<(PosTable, DomainBlock)>,
}

impl<PosTable, Block, Client, DomainBlock> ExtensionsFactory<Block>
    for SubspaceExtensionsFactory<PosTable, Client, DomainBlock>
where
    PosTable: Table,
    Block: BlockT,
    Block::Hash: From<H256> + Into<H256>,
    DomainBlock: BlockT,
    DomainBlock::Hash: Into<H256> + From<H256>,
    Client: BlockBackend<Block>
        + HeaderBackend<Block>
        + ProvideRuntimeApi<Block>
        + Send
        + Sync
        + 'static,
    Client::Api: SubspaceApi<Block, PublicKey>
        + DomainsApi<Block, DomainBlock::Header>
        + BundleProducerElectionApi<Block, Balance>
        + MmrApi<Block, H256, NumberFor<Block>>
        + MessengerApi<Block, NumberFor<Block>, Block::Hash>,
{
    fn extensions_for(
        &self,
        _block_hash: Block::Hash,
        _block_number: NumberFor<Block>,
    ) -> Extensions {
        let confirmation_depth_k = self.confirmation_depth_k;
        let mut exts = Extensions::new();
        exts.register(KzgExtension::new(self.kzg.clone()));
        exts.register(PosExtension::new::<PosTable>());
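        // The PoT extension lets the runtime verify proofs of time: the closure receives the
        // parent block hash, the claimed slot, the proof of time and a `quick_verification` flag,
        // and returns whether the proof is valid for the slots elapsed since the parent block.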
        exts.register(PotExtension::new({
            let client = Arc::clone(&self.client);
            let pot_verifier = self.pot_verifier.clone();

            Box::new(
                move |parent_hash, slot, proof_of_time, quick_verification| {
                    let parent_hash = {
                        let mut converted_parent_hash = Block::Hash::default();
                        converted_parent_hash.as_mut().copy_from_slice(&parent_hash);
                        converted_parent_hash
                    };

                    let parent_header = match client.header(parent_hash) {
                        Ok(Some(parent_header)) => parent_header,
                        Ok(None) => {
                            if quick_verification {
                                error!(
                                    %parent_hash,
                                    "Header not found during proof of time verification"
                                );

                                return false;
                            } else {
                                debug!(
                                    %parent_hash,
                                    "Header not found during proof of time verification"
                                );

                                // This can only happen during special sync modes; there are no other
                                // cases where the parent header may not be available, hence allow it
                                return true;
                            }
                        }
                        Err(error) => {
                            error!(
                                %error,
                                %parent_hash,
                                "Failed to retrieve header during proof of time verification"
                            );

                            return false;
                        }
                    };

                    let parent_pre_digest = match extract_pre_digest(&parent_header) {
                        Ok(parent_pre_digest) => parent_pre_digest,
                        Err(error) => {
                            error!(
                                %error,
                                %parent_hash,
                                parent_number = %parent_header.number(),
                                "Failed to extract pre-digest from parent header during proof of \
                                time verification, this must never happen"
                            );

                            return false;
                        }
                    };

                    let parent_slot = parent_pre_digest.slot();
                    if slot <= *parent_slot {
                        return false;
                    }

                    let pot_parameters = match client.runtime_api().pot_parameters(parent_hash) {
                        Ok(pot_parameters) => pot_parameters,
                        Err(error) => {
                            debug!(
                                %error,
                                %parent_hash,
                                parent_number = %parent_header.number(),
                                "Failed to retrieve proof of time parameters during proof of time \
                                verification"
                            );

                            return false;
                        }
                    };

                    let pot_input = if parent_header.number().is_zero() {
                        PotNextSlotInput {
                            slot: parent_slot + Slot::from(1),
                            slot_iterations: pot_parameters.slot_iterations(),
                            seed: pot_verifier.genesis_seed(),
                        }
                    } else {
                        let pot_info = parent_pre_digest.pot_info();

                        PotNextSlotInput::derive(
                            pot_parameters.slot_iterations(),
                            parent_slot,
                            pot_info.proof_of_time(),
                            &pot_parameters.next_parameters_change(),
                        )
                    };

                    // Ensure the proof of time and future proof of time included in the upcoming
                    // block are valid

                    if quick_verification {
                        pot_verifier.try_is_output_valid(
                            pot_input,
                            Slot::from(slot) - parent_slot,
                            proof_of_time,
                            pot_parameters.next_parameters_change(),
                        )
                    } else {
                        pot_verifier.is_output_valid(
                            pot_input,
                            Slot::from(slot) - parent_slot,
                            proof_of_time,
                            pot_parameters.next_parameters_change(),
                        )
                    }
                },
            )
        }));

        exts.register(FraudProofExtension::new(Arc::new(
            FraudProofHostFunctionsImpl::<_, _, DomainBlock, _, _>::new(
                self.client.clone(),
                self.domains_executor.clone(),
                move |client, executor| {
                    let extension_factory =
                        DomainsExtensionFactory::<_, Block, DomainBlock, _>::new(
                            client,
                            executor,
                            confirmation_depth_k,
                        );
                    Box::new(extension_factory) as Box<dyn ExtensionsFactory<DomainBlock>>
                },
            ),
        )));

        exts.register(SubspaceMmrExtension::new(Arc::new(
            SubspaceMmrHostFunctionsImpl::<Block, _>::new(
                self.client.clone(),
                confirmation_depth_k,
            ),
        )));

        exts.register(MessengerExtension::new(Arc::new(
            MessengerHostFunctionsImpl::<Block, _, DomainBlock, _>::new(
                self.client.clone(),
                self.domains_executor.clone(),
            ),
        )));

        // If offchain storage is available, add the offchain extension
        // used to generate and verify MMR proofs
        if let Some(offchain_storage) = self.backend.offchain_storage() {
            let offchain_db = OffchainDb::new(offchain_storage);
            exts.register(OffchainDbExt::new(offchain_db));
        }

        exts
    }
}

/// Other partial components returned by [`new_partial()`]
pub struct OtherPartialComponents<RuntimeApi>
where
    RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
{
    /// Subspace block import
    pub block_import: BoxBlockImport<Block>,
    /// Subspace link
    pub subspace_link: SubspaceLink<Block>,
    /// Segment headers store
    pub segment_headers_store: SegmentHeadersStore<FullClient<RuntimeApi>>,
    /// Proof of time verifier
    pub pot_verifier: PotVerifier,
    /// Approximate target block number for syncing purposes
    pub sync_target_block_number: Arc<AtomicU32>,
    /// Telemetry
    pub telemetry: Option<Telemetry>,
}

type PartialComponents<RuntimeApi> = sc_service::PartialComponents<
    FullClient<RuntimeApi>,
    FullBackend,
    FullSelectChain,
    DefaultImportQueue<Block>,
    TransactionPoolHandle<Block, FullClient<RuntimeApi>>,
    OtherPartialComponents<RuntimeApi>,
>;

/// Creates `PartialComponents` for Subspace client.
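///
/// This wires up the client, backend, Subspace block import and verifier, import queue and
/// transaction pool; networking-dependent pieces are created later in [`new_full()`].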
#[expect(clippy::result_large_err, reason = "Comes from Substrate")]
pub fn new_partial<PosTable, RuntimeApi>(
    // TODO: Stop using `Configuration` once
    //  https://github.com/paritytech/polkadot-sdk/pull/5364 is in our fork
    config: &Configuration,
    // TODO: Replace with check for `ChainSyncMode` once we get rid of ^ `Configuration`
    snap_sync: bool,
    pot_external_entropy: &[u8],
) -> Result<PartialComponents<RuntimeApi>, ServiceError>
where
    PosTable: Table,
    RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
    RuntimeApi::RuntimeApi: ApiExt<Block>
        + Metadata<Block>
        + BlockBuilder<Block>
        + OffchainWorkerApi<Block>
        + SessionKeys<Block>
        + TaggedTransactionQueue<Block>
        + SubspaceApi<Block, PublicKey>
        + DomainsApi<Block, DomainHeader>
        + FraudProofApi<Block, DomainHeader>
        + BundleProducerElectionApi<Block, Balance>
        + ObjectsApi<Block>
        + MmrApi<Block, H256, NumberFor<Block>>
        + MessengerApi<Block, NumberFor<Block>, BlockHashFor<Block>>,
{
    let telemetry = config
        .telemetry_endpoints
        .clone()
        .filter(|x| !x.is_empty())
        .map(|endpoints| -> Result<_, sc_telemetry::Error> {
            let worker = TelemetryWorker::new(16)?;
            let telemetry = worker.handle().new_telemetry(endpoints);
            Ok((worker, telemetry))
        })
        .transpose()?;

    let executor = sc_service::new_wasm_executor(&config.executor);
    let domains_executor = sc_service::new_wasm_executor(&config.executor);

    let confirmation_depth_k = extract_confirmation_depth(config.chain_spec.as_ref()).ok_or(
        ServiceError::Other("Failed to extract confirmation depth from chain spec".to_string()),
    )?;

    let backend = Arc::new(sc_client_db::Backend::new(
        config.db_config(),
        confirmation_depth_k.into(),
    )?);

    let genesis_block_builder = GenesisBlockBuilder::new(
        config.chain_spec.as_storage_builder(),
        !snap_sync,
        backend.clone(),
        executor.clone(),
    )?;

    let (client, backend, keystore_container, task_manager) =
        sc_service::new_full_parts_with_genesis_builder::<Block, RuntimeApi, _, _>(
            config,
            telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
            executor.clone(),
            backend,
            genesis_block_builder,
            false,
        )?;

    // TODO: Make these explicit arguments once we no longer use Substrate's `Configuration`
    let (kzg, maybe_erasure_coding) = tokio::task::block_in_place(|| {
        rayon::join(Kzg::new, || {
            ErasureCoding::new(
                NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize)
                    .expect("Not zero; qed"),
            )
            .map_err(|error| format!("Failed to instantiate erasure coding: {error}"))
        })
    });
    let erasure_coding = maybe_erasure_coding?;

    let client = Arc::new(client);
    let client_info = client.info();
    let chain_constants = client
        .runtime_api()
        .chain_constants(client_info.best_hash)
        .map_err(|error| ServiceError::Application(error.into()))?;

    let pot_verifier = PotVerifier::new(
        PotSeed::from_genesis(client_info.genesis_hash.as_ref(), pot_external_entropy),
        POT_VERIFIER_CACHE_SIZE,
    );

    // Ensure the extracted confirmation depth matches the one from the runtime
    assert_eq!(confirmation_depth_k, chain_constants.confirmation_depth_k());

    client
        .execution_extensions()
        .set_extensions_factory(SubspaceExtensionsFactory::<PosTable, _, DomainBlock> {
            kzg: kzg.clone(),
            client: Arc::clone(&client),
            pot_verifier: pot_verifier.clone(),
            domains_executor: Arc::new(domains_executor),
            backend: backend.clone(),
            confirmation_depth_k: chain_constants.confirmation_depth_k(),
            _pos_table: PhantomData,
        });

    let telemetry = telemetry.map(|(worker, telemetry)| {
        task_manager
            .spawn_handle()
            .spawn("telemetry", None, worker.run());
        telemetry
    });

    let select_chain = sc_consensus::LongestChain::new(backend.clone());

    let segment_headers_store = tokio::task::block_in_place(|| {
        SegmentHeadersStore::new(client.clone(), chain_constants.confirmation_depth_k())
    })
    .map_err(|error| ServiceError::Application(error.into()))?;

    let subspace_link = SubspaceLink::new(chain_constants, kzg.clone(), erasure_coding);
    let segment_headers_store = segment_headers_store.clone();

    let block_import = SubspaceBlockImport::<PosTable, _, _, _, _, _>::new(
        client.clone(),
        client.clone(),
        subspace_link.clone(),
        {
            let client = client.clone();
            let segment_headers_store = segment_headers_store.clone();

            move |parent_hash, ()| {
                let client = client.clone();
                let segment_headers_store = segment_headers_store.clone();

                async move {
                    let timestamp = sp_timestamp::InherentDataProvider::from_system_time();

                    let parent_header = client
                        .header(parent_hash)?
                        .expect("Parent header must always exist when block is created; qed");

                    let parent_block_number = parent_header.number;

                    let subspace_inherents =
                        sp_consensus_subspace::inherents::InherentDataProvider::new(
                            segment_headers_store
                                .segment_headers_for_block(parent_block_number + 1),
                        );

                    Ok((timestamp, subspace_inherents))
                }
            }
        },
        segment_headers_store.clone(),
        pot_verifier.clone(),
    );

    let sync_target_block_number = Arc::new(AtomicU32::new(0));
    let transaction_pool = Arc::from(
        sc_transaction_pool::Builder::new(
            task_manager.spawn_essential_handle(),
            client.clone(),
            config.role.is_authority().into(),
        )
        .with_options(config.transaction_pool.clone())
        .with_prometheus(config.prometheus_registry())
        .build(),
    );

    let verifier = SubspaceVerifier::<PosTable, _, _>::new(SubspaceVerifierOptions {
        client: client.clone(),
        chain_constants,
        kzg,
        telemetry: telemetry.as_ref().map(|x| x.handle()),
        reward_signing_context: schnorrkel::context::signing_context(REWARD_SIGNING_CONTEXT),
        sync_target_block_number: Arc::clone(&sync_target_block_number),
        is_authoring_blocks: config.role.is_authority(),
        pot_verifier: pot_verifier.clone(),
    });

    let import_queue = BasicQueue::new(
        verifier,
        Box::new(BlockImportWrapper(block_import.clone())),
        None,
        &task_manager.spawn_essential_handle(),
        config.prometheus_registry(),
    );

    let other = OtherPartialComponents {
        block_import: Box::new(BlockImportWrapper(block_import.clone())),
        subspace_link,
        segment_headers_store,
        pot_verifier,
        sync_target_block_number,
        telemetry,
    };

    Ok(PartialComponents {
        client,
        backend,
        task_manager,
        import_queue,
        keystore_container,
        select_chain,
        transaction_pool,
        other,
    })
}

/// Full node along with some other components.
pub struct NewFull<Client>
where
    Client: ProvideRuntimeApi<Block>
        + AuxStore
        + BlockBackend<Block>
        + BlockIdTo<Block>
        + HeaderBackend<Block>
        + HeaderMetadata<Block, Error = sp_blockchain::Error>
        + 'static,
    Client::Api: TaggedTransactionQueue<Block>
        + DomainsApi<Block, DomainHeader>
        + FraudProofApi<Block, DomainHeader>
        + SubspaceApi<Block, PublicKey>
        + MmrApi<Block, H256, NumberFor<Block>>
        + MessengerApi<Block, NumberFor<Block>, BlockHashFor<Block>>,
{
    /// Task manager.
    pub task_manager: TaskManager,
    /// Full client.
    pub client: Arc<Client>,
    /// Chain selection rule.
    pub select_chain: FullSelectChain,
    /// Network service.
    pub network_service: Arc<dyn NetworkService + Send + Sync>,
    /// Cross-domain gossip notification service.
    pub xdm_gossip_notification_service: Box<dyn NotificationService>,
    /// Sync service.
    pub sync_service: Arc<sc_network_sync::SyncingService<Block>>,
    /// RPC handlers.
    pub rpc_handlers: sc_service::RpcHandlers,
    /// Full client backend.
    pub backend: Arc<FullBackend>,
    /// Pot slot info stream.
    pub pot_slot_info_stream: broadcast::Receiver<PotSlotInfo>,
    /// New slot stream.
    /// Note: this is currently used to send solutions from the farmer during tests.
    pub new_slot_notification_stream: SubspaceNotificationStream<NewSlotNotification>,
    /// Block signing stream.
    pub reward_signing_notification_stream: SubspaceNotificationStream<RewardSigningNotification>,
    /// Stream of notifications about blocks about to be imported.
    pub block_importing_notification_stream:
        SubspaceNotificationStream<BlockImportingNotification<Block>>,
    /// Archived object mapping stream.
    pub object_mapping_notification_stream: SubspaceNotificationStream<ObjectMappingNotification>,
    /// Archived segment stream.
    pub archived_segment_notification_stream:
        SubspaceNotificationStream<ArchivedSegmentNotification>,
    /// Network starter.
    pub network_starter: NetworkStarter,
    /// Transaction pool.
    pub transaction_pool: Arc<TransactionPoolHandle<Block, Client>>,
}

type FullNode<RuntimeApi> = NewFull<FullClient<RuntimeApi>>;

/// Builds a new service for a full client.
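///
/// A minimal wiring sketch, not a doctest: `subspace_config`, `snap_sync`, `pot_external_entropy`
/// and the concrete `PosTable`/`RuntimeApi` types are placeholders supplied by the caller, and the
/// literal arguments are illustrative only.
///
/// ```ignore
/// let partial_components =
///     new_partial::<PosTable, RuntimeApi>(&subspace_config.base, snap_sync, &pot_external_entropy)?;
/// let full_node = new_full::<PosTable, RuntimeApi>(
///     subspace_config,
///     partial_components,
///     None,                             // no extra Prometheus registry
///     true,                             // enable RPC extensions
///     SlotProportion::new(3f32 / 4f32), // block proposal slot portion
///     None,                             // no snap sync target block receiver
/// )
/// .await?;
/// ```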
pub async fn new_full<PosTable, RuntimeApi>(
    mut config: SubspaceConfiguration,
    partial_components: PartialComponents<RuntimeApi>,
    prometheus_registry: Option<&mut Registry>,
    enable_rpc_extensions: bool,
    block_proposal_slot_portion: SlotProportion,
    consensus_snap_sync_target_block_receiver: Option<broadcast::Receiver<BlockNumber>>,
) -> Result<FullNode<RuntimeApi>, Error>
where
    PosTable: Table,
    RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
    RuntimeApi::RuntimeApi: ApiExt<Block>
        + Metadata<Block>
        + AccountNonceApi<Block, AccountId, Nonce>
        + BlockBuilder<Block>
        + OffchainWorkerApi<Block>
        + SessionKeys<Block>
        + TaggedTransactionQueue<Block>
        + TransactionPaymentApi<Block, Balance>
        + SubspaceApi<Block, PublicKey>
        + DomainsApi<Block, DomainHeader>
        + FraudProofApi<Block, DomainHeader>
        + ObjectsApi<Block>
        + MmrApi<Block, Hash, BlockNumber>
        + MessengerApi<Block, NumberFor<Block>, BlockHashFor<Block>>,
{
    let PartialComponents {
        client,
        backend,
        mut task_manager,
        import_queue,
        keystore_container,
        select_chain,
        transaction_pool,
        other,
    } = partial_components;
    let OtherPartialComponents {
        block_import,
        subspace_link,
        segment_headers_store,
        pot_verifier,
        sync_target_block_number,
        mut telemetry,
    } = other;

    let offchain_indexing_enabled = config.base.offchain_worker.indexing_enabled;
    let (node, bootstrap_nodes, piece_getter) = match config.subspace_networking {
        SubspaceNetworking::Reuse {
            node,
            bootstrap_nodes,
            piece_getter,
        } => (node, bootstrap_nodes, piece_getter),
        SubspaceNetworking::Create { config: dsn_config } => {
            let dsn_protocol_version = hex::encode(client.chain_info().genesis_hash);

            debug!(
                chain_type=?config.base.chain_spec.chain_type(),
                genesis_hash=%hex::encode(client.chain_info().genesis_hash),
                "Setting DSN protocol version..."
            );

            let out_connections = dsn_config.max_out_connections;
            let (node, mut node_runner) = create_dsn_instance(
                dsn_protocol_version,
                dsn_config.clone(),
                prometheus_registry,
            )?;

            info!("Subspace networking initialized: Node ID is {}", node.id());

            node.on_new_listener(Arc::new({
                let node = node.clone();

                move |address| {
                    info!(
                        "DSN listening on {}",
                        address.clone().with(Protocol::P2p(node.id()))
                    );
                }
            }))
            .detach();

            task_manager
                .spawn_essential_handle()
                .spawn_essential_blocking(
                    "node-runner",
                    Some("subspace-networking"),
                    Box::pin(
                        async move {
                            node_runner.run().await;
                        }
                        .in_current_span(),
                    ),
                );

            let piece_provider = PieceProvider::new(
                node.clone(),
                SegmentCommitmentPieceValidator::new(
                    node.clone(),
                    subspace_link.kzg().clone(),
                    segment_headers_store.clone(),
                ),
                Arc::new(Semaphore::new(
                    out_connections as usize * PIECE_PROVIDER_MULTIPLIER,
                )),
            );

            (
                node,
                dsn_config.bootstrap_nodes,
                Arc::new(DsnPieceGetter::new(piece_provider)) as _,
            )
        }
    };

    let dsn_bootstrap_nodes = {
        // Fall back to the node itself as a DSN bootstrap node so the farmer always has someone to
        // connect to
        if bootstrap_nodes.is_empty() {
            let (node_address_sender, node_address_receiver) = oneshot::channel();
            let _handler = node.on_new_listener(Arc::new({
                let node_address_sender = Mutex::new(Some(node_address_sender));

                move |address| {
                    if matches!(address.iter().next(), Some(Protocol::Ip4(_)))
                        && let Some(node_address_sender) = node_address_sender.lock().take()
                        && let Err(err) = node_address_sender.send(address.clone())
                    {
                        debug!(?err, "Couldn't send a node address to the channel.");
                    }
                }
            }));

            let mut node_listeners = node.listeners();

            if node_listeners.is_empty() {
                let Ok(listener) = node_address_receiver.await else {
                    return Err(Error::Other(
                        "Oneshot receiver dropped before DSN node listener was ready"
                            .to_string()
                            .into(),
                    ));
                };

                node_listeners = vec![listener];
            }

            node_listeners.iter_mut().for_each(|multiaddr| {
                multiaddr.push(Protocol::P2p(node.id()));
            });

            node_listeners
        } else {
            bootstrap_nodes
        }
    };

    let substrate_prometheus_registry = config
        .base
        .prometheus_config
        .as_ref()
        .map(|prometheus_config| prometheus_config.registry.clone());
    let import_queue_service1 = import_queue.service();
    let import_queue_service2 = import_queue.service();
    let network_wrapper = Arc::new(NetworkWrapper::default());
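    // The consensus block relay supplies the block downloader used by the syncing strategy below;
    // `network_wrapper` is a late-bound handle that is populated with the real network service
    // once it has been built.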
    let block_relay = build_consensus_relay(
        network_wrapper.clone(),
        client.clone(),
        transaction_pool.clone(),
        substrate_prometheus_registry.as_ref(),
    )
    .map_err(Error::BlockRelay)?;
    let mut net_config = sc_network::config::FullNetworkConfiguration::new(
        &config.base.network,
        substrate_prometheus_registry.clone(),
    );
    let (xdm_gossip_notification_config, xdm_gossip_notification_service) =
        xdm_gossip_peers_set_config();
    net_config.add_notification_protocol(xdm_gossip_notification_config);
    let (pot_gossip_notification_config, pot_gossip_notification_service) =
        pot_gossip_peers_set_config();
    net_config.add_notification_protocol(pot_gossip_notification_config);
    let pause_sync = Arc::clone(&net_config.network_config.pause_sync);

    let num_peer_hint = net_config.network_config.default_peers_set_num_full as usize
        + net_config
            .network_config
            .default_peers_set
            .reserved_nodes
            .len();

    let protocol_id = config.base.protocol_id();
    let fork_id = config.base.chain_spec.fork_id();

    // Enable the domain block ER (execution receipt) request handler
    let (handler, protocol_config) = DomainBlockERRequestHandler::new::<
        NetworkWorker<Block, BlockHashFor<Block>>,
    >(fork_id, client.clone(), num_peer_hint);
    task_manager.spawn_handle().spawn(
        "domain-block-er-request-handler",
        Some("networking"),
        handler.run(),
    );
    net_config.add_request_response_protocol(protocol_config);

    if let Some(offchain_storage) = backend.offchain_storage() {
        // Allow both outgoing and incoming requests.
        let (handler, protocol_config) =
            MmrRequestHandler::new::<NetworkWorker<Block, BlockHashFor<Block>>>(
                &config.base.protocol_id(),
                fork_id,
                client.clone(),
                num_peer_hint,
                offchain_storage,
            );
        task_manager
            .spawn_handle()
            .spawn("mmr-request-handler", Some("networking"), handler.run());

        net_config.add_request_response_protocol(protocol_config);
    }

    let network_service_provider = NetworkServiceProvider::new();
    let network_service_handle = network_service_provider.handle();
    let (network_service, system_rpc_tx, tx_handler_controller, network_starter, sync_service) = {
        let spawn_handle = task_manager.spawn_handle();
        let metrics = NotificationMetrics::new(substrate_prometheus_registry.as_ref());

        // TODO: Remove BlockRelayParams here and simplify relay initialization
        let block_downloader = {
            let BlockRelayParams {
                mut server,
                downloader,
                request_response_config,
            } = block_relay;
            net_config.add_request_response_protocol(request_response_config);

            spawn_handle.spawn("block-request-handler", Some("networking"), async move {
                server.run().await;
            });

            downloader
        };

        let syncing_strategy = build_polkadot_syncing_strategy(
            protocol_id.clone(),
            fork_id,
            &mut net_config,
            None,
            block_downloader,
            client.clone(),
            &spawn_handle,
            substrate_prometheus_registry.as_ref(),
        )?;

        let (syncing_engine, sync_service, block_announce_config) =
            SyncingEngine::new::<NetworkWorker<_, _>>(
                Roles::from(&config.base.role),
                Arc::clone(&client),
                substrate_prometheus_registry.as_ref(),
                metrics.clone(),
                &net_config,
                protocol_id.clone(),
                fork_id,
                Box::new(DefaultBlockAnnounceValidator),
                syncing_strategy,
                network_service_provider.handle(),
                import_queue.service(),
                net_config.peer_store_handle(),
                config.base.network.force_synced,
            )
            .map_err(sc_service::Error::from)?;

        spawn_handle.spawn_blocking("syncing", None, syncing_engine.run());

        build_network_advanced(BuildNetworkAdvancedParams {
            role: config.base.role,
            protocol_id,
            fork_id,
            ipfs_server: config.base.network.ipfs_server,
            announce_block: config.base.announce_block,
            net_config,
            client: Arc::clone(&client),
            transaction_pool: Arc::clone(&transaction_pool),
            spawn_handle,
            import_queue,
            sync_service,
            block_announce_config,
            network_service_provider,
            metrics_registry: substrate_prometheus_registry.as_ref(),
            metrics,
        })?
    };

    task_manager.spawn_handle().spawn(
        "sync-target-follower",
        None,
        Box::pin({
            let sync_service = sync_service.clone();
            let sync_target_block_number = Arc::clone(&sync_target_block_number);

            async move {
                loop {
                    let best_seen_block = sync_service
                        .status()
                        .await
                        .map(|status| status.best_seen_block.unwrap_or_default())
                        .unwrap_or_default();
                    sync_target_block_number.store(best_seen_block, Ordering::Relaxed);

                    tokio::time::sleep(SYNC_TARGET_UPDATE_INTERVAL).await;
                }
            }
        }),
    );

    let sync_oracle = SubspaceSyncOracle::new(
        config.base.force_authoring,
        Arc::clone(&pause_sync),
        sync_service.clone(),
    );

    let subspace_archiver = tokio::task::block_in_place(|| {
        create_subspace_archiver(
            segment_headers_store.clone(),
            subspace_link.clone(),
            client.clone(),
            sync_oracle.clone(),
            telemetry.as_ref().map(|telemetry| telemetry.handle()),
            config.create_object_mappings,
        )
    })
    .map_err(ServiceError::Client)?;

    task_manager
        .spawn_essential_handle()
        .spawn_essential_blocking(
            "subspace-archiver",
            None,
            Box::pin(async move {
                if let Err(error) = subspace_archiver.await {
                    error!(%error, "Archiver exited with error");
                }
            }),
        );

    network_wrapper.set(network_service.clone());

    if !config.base.network.force_synced {
        // Start with DSN sync in this case
        pause_sync.store(true, Ordering::Release);
    }

    let snap_sync_task = snap_sync(
        segment_headers_store.clone(),
        node.clone(),
        fork_id.map(|fork_id| fork_id.to_string()),
        Arc::clone(&client),
        import_queue_service1,
        pause_sync.clone(),
        piece_getter.clone(),
        sync_service.clone(),
        network_service_handle,
        subspace_link.erasure_coding().clone(),
        consensus_snap_sync_target_block_receiver,
        backend.offchain_storage(),
        network_service.clone(),
    );

    let (observer, worker) = sync_from_dsn::create_observer_and_worker(
        segment_headers_store.clone(),
        Arc::clone(&network_service),
        node.clone(),
        Arc::clone(&client),
        import_queue_service2,
        sync_service.clone(),
        sync_target_block_number,
        pause_sync,
        piece_getter,
        subspace_link.erasure_coding().clone(),
    );
    task_manager
        .spawn_handle()
        .spawn("observer", Some("sync-from-dsn"), observer);
    task_manager
        .spawn_essential_handle()
        .spawn_essential_blocking(
            "worker",
            Some("sync-from-dsn"),
            Box::pin(async move {
                // Run snap-sync before DSN-sync.
                if config.sync == ChainSyncMode::Snap
                    && let Err(error) = snap_sync_task.in_current_span().await
                {
                    error!(%error, "Snap sync exited with a fatal error");
                    return;
                }

                if let Err(error) = worker.await {
                    error!(%error, "Sync from DSN exited with an error");
                }
            }),
        );

    if let Some(registry) = substrate_prometheus_registry.as_ref() {
        match NodeMetrics::new(
            client.clone(),
            client.every_import_notification_stream(),
            registry,
        ) {
            Ok(node_metrics) => {
                task_manager.spawn_handle().spawn(
                    "node_metrics",
                    None,
                    Box::pin(async move {
                        node_metrics.run().await;
                    }),
                );
            }
            Err(err) => {
                error!("Failed to initialize node metrics: {err:?}");
            }
        }
    }

    let offchain_tx_pool_factory = OffchainTransactionPoolFactory::new(transaction_pool.clone());

    if config.base.offchain_worker.enabled {
        let offchain_workers =
            sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
                runtime_api_provider: client.clone(),
                is_validator: config.base.role.is_authority(),
                keystore: Some(keystore_container.keystore()),
                offchain_db: backend.offchain_storage(),
                transaction_pool: Some(offchain_tx_pool_factory.clone()),
                network_provider: Arc::new(network_service.clone()),
                enable_http_requests: true,
                custom_extensions: |_| vec![],
            })?;
        task_manager.spawn_handle().spawn(
            "offchain-workers-runner",
            "offchain-worker",
            offchain_workers
                .run(client.clone(), task_manager.spawn_handle())
                .boxed(),
        );
    }

    // MMR offchain indexer
    if offchain_indexing_enabled {
        task_manager.spawn_essential_handle().spawn_blocking(
            "mmr-gadget",
            None,
            mmr_gadget::MmrGadget::start(
                client.clone(),
                backend.clone(),
                sp_mmr_primitives::INDEXING_PREFIX.to_vec(),
            ),
        );
    }

    let backoff_authoring_blocks: Option<()> = None;

    let new_slot_notification_stream = subspace_link.new_slot_notification_stream();
    let reward_signing_notification_stream = subspace_link.reward_signing_notification_stream();
    let block_importing_notification_stream = subspace_link.block_importing_notification_stream();
    let object_mapping_notification_stream = subspace_link.object_mapping_notification_stream();
    let archived_segment_notification_stream = subspace_link.archived_segment_notification_stream();

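    // The proof-of-time source worker drives the PoT slot info stream (from a local timekeeper
    // and/or gossip); an additional subscription is handed back to the caller via
    // `NewFull::pot_slot_info_stream`.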
    let (pot_source_worker, pot_gossip_worker, pot_slot_info_stream) = PotSourceWorker::new(
        config.is_timekeeper,
        config.timekeeper_cpu_cores,
        client.clone(),
        pot_verifier.clone(),
        Arc::clone(&network_service),
        pot_gossip_notification_service,
        sync_service.clone(),
        sync_oracle.clone(),
    )
    .map_err(|error| Error::Other(error.into()))?;

    let additional_pot_slot_info_stream = pot_source_worker.subscribe_pot_slot_info_stream();

    task_manager
        .spawn_essential_handle()
        .spawn("pot-source", Some("pot"), pot_source_worker.run());
    task_manager
        .spawn_essential_handle()
        .spawn("pot-gossip", Some("pot"), pot_gossip_worker.run());

    if config.base.role.is_authority() || config.force_new_slot_notifications {
        let proposer_factory = ProposerFactory::new(
            task_manager.spawn_handle(),
            client.clone(),
            transaction_pool.clone(),
            substrate_prometheus_registry.as_ref(),
            telemetry.as_ref().map(|x| x.handle()),
        );

        let subspace_slot_worker =
            SubspaceSlotWorker::<PosTable, _, _, _, _, _, _, _>::new(SubspaceSlotWorkerOptions {
                client: client.clone(),
                env: proposer_factory,
                block_import,
                sync_oracle: sync_oracle.clone(),
                justification_sync_link: sync_service.clone(),
                force_authoring: config.base.force_authoring,
                backoff_authoring_blocks,
                subspace_link: subspace_link.clone(),
                segment_headers_store: segment_headers_store.clone(),
                block_proposal_slot_portion,
                max_block_proposal_slot_portion: None,
                telemetry: telemetry.as_ref().map(|x| x.handle()),
                offchain_tx_pool_factory,
                pot_verifier,
            });

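        // Inherent data providers for authored blocks: the wall-clock timestamp plus the segment
        // headers that must be included in the block built on top of `parent_hash`.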
        let create_inherent_data_providers = {
            let client = client.clone();
            let segment_headers_store = segment_headers_store.clone();

            move |parent_hash, ()| {
                let client = client.clone();
                let segment_headers_store = segment_headers_store.clone();

                async move {
                    let timestamp = sp_timestamp::InherentDataProvider::from_system_time();

                    let parent_header = client
                        .header(parent_hash)?
                        .expect("Parent header must always exist when block is created; qed");

                    let parent_block_number = parent_header.number;

                    let subspace_inherents =
                        sp_consensus_subspace::inherents::InherentDataProvider::new(
                            segment_headers_store
                                .segment_headers_for_block(parent_block_number + 1),
                        );

                    Ok((timestamp, subspace_inherents))
                }
            }
        };

        info!("🧑‍🌾 Starting Subspace Authorship worker");
        let slot_worker_task = sc_proof_of_time::start_slot_worker(
            subspace_link.chain_constants().slot_duration(),
            client.clone(),
            select_chain.clone(),
            subspace_slot_worker,
            sync_oracle.clone(),
            create_inherent_data_providers,
            pot_slot_info_stream,
        );

        // Subspace authoring task is considered essential, i.e. if it fails we take down the
        // service with it.
        task_manager.spawn_essential_handle().spawn_blocking(
            "subspace-proposer",
            Some("block-authoring"),
            slot_worker_task,
        );
    }

    // We replace the Substrate implementation of the metrics server with our own.
    config.base.prometheus_config.take();

    let rpc_handlers = task_spawner::spawn_tasks(SpawnTasksParams {
        network: network_service.clone(),
        client: client.clone(),
        keystore: keystore_container.keystore(),
        task_manager: &mut task_manager,
        transaction_pool: transaction_pool.clone(),
        rpc_builder: if enable_rpc_extensions {
            let client = client.clone();
            let new_slot_notification_stream = new_slot_notification_stream.clone();
            let reward_signing_notification_stream = reward_signing_notification_stream.clone();
            let object_mapping_notification_stream = object_mapping_notification_stream.clone();
            let archived_segment_notification_stream = archived_segment_notification_stream.clone();
            let transaction_pool = transaction_pool.clone();
            let backend = backend.clone();

            Box::new(move |subscription_executor| {
                let deps = rpc::FullDeps {
                    client: client.clone(),
                    pool: transaction_pool.clone(),
                    subscription_executor,
                    new_slot_notification_stream: new_slot_notification_stream.clone(),
                    reward_signing_notification_stream: reward_signing_notification_stream.clone(),
                    object_mapping_notification_stream: object_mapping_notification_stream.clone(),
                    archived_segment_notification_stream: archived_segment_notification_stream
                        .clone(),
                    dsn_bootstrap_nodes: dsn_bootstrap_nodes.clone(),
                    segment_headers_store: segment_headers_store.clone(),
                    sync_oracle: sync_oracle.clone(),
                    kzg: subspace_link.kzg().clone(),
                    erasure_coding: subspace_link.erasure_coding().clone(),
                    backend: backend.clone(),
                };

                rpc::create_full(deps).map_err(Into::into)
            })
        } else {
            Box::new(|_| Ok(RpcModule::new(())))
        },
        backend: backend.clone(),
        system_rpc_tx,
        config: config.base,
        telemetry: telemetry.as_mut(),
        tx_handler_controller,
        sync_service: sync_service.clone(),
    })?;

    Ok(NewFull {
        task_manager,
        client,
        select_chain,
        network_service,
        xdm_gossip_notification_service,
        sync_service,
        rpc_handlers,
        backend,
        pot_slot_info_stream: additional_pot_slot_info_stream,
        new_slot_notification_stream,
        reward_signing_notification_stream,
        block_importing_notification_stream,
        object_mapping_notification_stream,
        archived_segment_notification_stream,
        network_starter,
        transaction_pool,
    })
}

/// The storage key of the `ConfirmationDepthK` storage item in `pallet-runtime-configs`
fn confirmation_depth_storage_key() -> StorageKey {
    StorageKey(
        frame_support::storage::storage_prefix(
            // This is the name used for `pallet-runtime-configs` in the `construct_runtime` macro
            // i.e. `RuntimeConfigs: pallet_runtime_configs = 14`
            "RuntimeConfigs".as_bytes(),
            // This is the storage item name used inside `pallet-runtime-configs`
            "ConfirmationDepthK".as_bytes(),
        )
        .to_vec(),
    )
}

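/// Extracts the `ConfirmationDepthK` value from the raw genesis storage of the chain spec.
///
/// Returns `None` if the chain spec cannot be rendered as raw JSON, the storage key is absent,
/// or the value fails to SCALE-decode as a `u32`.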
fn extract_confirmation_depth(chain_spec: &dyn ChainSpec) -> Option<u32> {
    let storage_key = format!("0x{}", hex::encode(confirmation_depth_storage_key().0));
    let spec: serde_json::Value =
        serde_json::from_str(chain_spec.as_json(true).ok()?.as_str()).ok()?;
    let encoded_confirmation_depth = hex::decode(
        spec.pointer(format!("/genesis/raw/top/{storage_key}").as_str())?
            .as_str()?
            .trim_start_matches("0x"),
    )
    .ok()?;
    u32::decode(&mut encoded_confirmation_depth.as_slice()).ok()
}

#[cfg(test)]
mod test {
    use static_assertions::const_assert_eq;
    use subspace_data_retrieval::object_fetcher::MAX_BLOCK_LENGTH as ARCHIVER_MAX_BLOCK_LENGTH;
    use subspace_runtime_primitives::MAX_BLOCK_LENGTH as CONSENSUS_RUNTIME_MAX_BLOCK_LENGTH;

    /// Runtime and archiver code must agree on the consensus block length.
    /// (This avoids importing all the runtime primitives code into the farmer and gateway.)
    #[test]
    fn max_block_length_consistent() {
        const_assert_eq!(
            CONSENSUS_RUNTIME_MAX_BLOCK_LENGTH,
            ARCHIVER_MAX_BLOCK_LENGTH,
        );
    }
}