subspace_service/
lib.rs

//! Service and ServiceFactory implementation. Specialized wrapper over the Substrate service.
2#![feature(
3    impl_trait_in_assoc_type,
4    int_roundings,
5    type_alias_impl_trait,
6    type_changing_struct_update
7)]
8
9pub mod config;
10pub mod dsn;
11mod metrics;
12pub(crate) mod mmr;
13pub mod rpc;
14pub mod sync_from_dsn;
15mod task_spawner;
16mod utils;
17
18use crate::config::{ChainSyncMode, SubspaceConfiguration, SubspaceNetworking};
19use crate::dsn::{DsnConfigurationError, create_dsn_instance};
20use crate::metrics::NodeMetrics;
21use crate::mmr::request_handler::MmrRequestHandler;
22use crate::sync_from_dsn::DsnPieceGetter;
23use crate::sync_from_dsn::piece_validator::SegmentCommitmentPieceValidator;
24use crate::sync_from_dsn::snap_sync::snap_sync;
25use async_lock::Semaphore;
26use core::sync::atomic::{AtomicU32, Ordering};
27use cross_domain_message_gossip::xdm_gossip_peers_set_config;
28use domain_runtime_primitives::opaque::{Block as DomainBlock, Header as DomainHeader};
29use frame_system_rpc_runtime_api::AccountNonceApi;
30use futures::FutureExt;
31use futures::channel::oneshot;
32use jsonrpsee::RpcModule;
33use pallet_transaction_payment_rpc_runtime_api::TransactionPaymentApi;
34use parity_scale_codec::Decode;
35use parking_lot::Mutex;
36use prometheus_client::registry::Registry;
37use sc_basic_authorship::ProposerFactory;
38use sc_chain_spec::{ChainSpec, GenesisBlockBuilder};
39use sc_client_api::execution_extensions::ExtensionsFactory;
40use sc_client_api::{
41    AuxStore, Backend, BlockBackend, BlockchainEvents, ExecutorProvider, HeaderBackend,
42};
43use sc_consensus::{
44    BasicQueue, BlockCheckParams, BlockImport, BlockImportParams, BoxBlockImport,
45    DefaultImportQueue, ImportQueue, ImportResult,
46};
47use sc_consensus_slots::SlotProportion;
48use sc_consensus_subspace::SubspaceLink;
49use sc_consensus_subspace::archiver::{
50    ArchivedSegmentNotification, ObjectMappingNotification, SegmentHeadersStore,
51    create_subspace_archiver,
52};
53use sc_consensus_subspace::block_import::{BlockImportingNotification, SubspaceBlockImport};
54use sc_consensus_subspace::notification::SubspaceNotificationStream;
55use sc_consensus_subspace::slot_worker::{
56    NewSlotNotification, RewardSigningNotification, SubspaceSlotWorker, SubspaceSlotWorkerOptions,
57    SubspaceSyncOracle,
58};
59use sc_consensus_subspace::verifier::{SubspaceVerifier, SubspaceVerifierOptions};
60use sc_domains::ExtensionsFactory as DomainsExtensionFactory;
61use sc_domains::domain_block_er::execution_receipt_protocol::DomainBlockERRequestHandler;
62use sc_network::service::traits::NetworkService;
63use sc_network::{NetworkWorker, NotificationMetrics, NotificationService, Roles};
64use sc_network_sync::block_relay_protocol::BlockRelayParams;
65use sc_network_sync::engine::SyncingEngine;
66use sc_network_sync::service::network::NetworkServiceProvider;
67use sc_proof_of_time::source::gossip::pot_gossip_peers_set_config;
68use sc_proof_of_time::source::{PotSlotInfo, PotSourceWorker};
69use sc_proof_of_time::verifier::PotVerifier;
70use sc_service::error::Error as ServiceError;
71use sc_service::{
72    BuildNetworkAdvancedParams, Configuration, SpawnTasksParams, TaskManager,
73    build_network_advanced, build_polkadot_syncing_strategy,
74};
75use sc_subspace_block_relay::{
76    BlockRelayConfigurationError, NetworkWrapper, build_consensus_relay,
77};
78use sc_telemetry::{Telemetry, TelemetryWorker};
79use sc_transaction_pool::TransactionPoolHandle;
80use sc_transaction_pool_api::OffchainTransactionPoolFactory;
81use sp_api::{ApiExt, ConstructRuntimeApi, Metadata, ProvideRuntimeApi};
82use sp_block_builder::BlockBuilder;
83use sp_blockchain::HeaderMetadata;
84use sp_consensus::block_validation::DefaultBlockAnnounceValidator;
85use sp_consensus_slots::Slot;
86use sp_consensus_subspace::digests::extract_pre_digest;
87use sp_consensus_subspace::{
88    KzgExtension, PosExtension, PotExtension, PotNextSlotInput, SubspaceApi,
89};
90use sp_core::H256;
91use sp_core::offchain::OffchainDbExt;
92use sp_core::offchain::storage::OffchainDb;
93use sp_core::traits::SpawnEssentialNamed;
94use sp_domains::storage::StorageKey;
95use sp_domains::{BundleProducerElectionApi, DomainsApi};
96use sp_domains_fraud_proof::{FraudProofApi, FraudProofExtension, FraudProofHostFunctionsImpl};
97use sp_externalities::Extensions;
98use sp_messenger::MessengerApi;
99use sp_messenger_host_functions::{MessengerExtension, MessengerHostFunctionsImpl};
100use sp_mmr_primitives::MmrApi;
101use sp_objects::ObjectsApi;
102use sp_offchain::OffchainWorkerApi;
103use sp_runtime::traits::{Block as BlockT, BlockIdTo, Header, NumberFor, Zero};
104use sp_session::SessionKeys;
105use sp_subspace_mmr::host_functions::{SubspaceMmrExtension, SubspaceMmrHostFunctionsImpl};
106use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
107use static_assertions::const_assert;
108use std::marker::PhantomData;
109use std::num::NonZeroUsize;
110use std::sync::Arc;
111use std::time::Duration;
112use subspace_core_primitives::pieces::Record;
113use subspace_core_primitives::pot::PotSeed;
114use subspace_core_primitives::{BlockNumber, PublicKey, REWARD_SIGNING_CONTEXT};
115use subspace_erasure_coding::ErasureCoding;
116use subspace_kzg::Kzg;
117use subspace_networking::libp2p::multiaddr::Protocol;
118use subspace_networking::utils::piece_provider::PieceProvider;
119use subspace_proof_of_space::Table;
120use subspace_runtime_primitives::opaque::Block;
121use subspace_runtime_primitives::{AccountId, Balance, BlockHashFor, Hash, Nonce};
122use tokio::sync::broadcast;
123use tracing::{Instrument, debug, error, info};
124pub use utils::wait_for_block_import;
125
// There are multiple places where it is assumed that the node is running on a 64-bit system;
// refuse to compile otherwise
const_assert!(std::mem::size_of::<usize>() >= std::mem::size_of::<u64>());
129
/// Cache size of the proof-of-time verifier. This is over 15 minutes' worth of slots assuming
/// there are no forks; it should be both sufficient and not too large to handle.
const POT_VERIFIER_CACHE_SIZE: u32 = 30_000;
/// How often the approximate sync target block number is refreshed.
const SYNC_TARGET_UPDATE_INTERVAL: Duration = Duration::from_secs(1);
/// Multiplier on top of the number of outgoing connections, used to size the piece downloading
/// semaphore of the piece provider.
const PIECE_PROVIDER_MULTIPLIER: usize = 10;
136
137/// Error type for Subspace service.
138#[derive(thiserror::Error, Debug)]
139pub enum Error {
140    /// IO error.
141    #[error(transparent)]
142    Io(#[from] std::io::Error),
143
144    /// Address parsing error.
145    #[error(transparent)]
146    AddrFormatInvalid(#[from] std::net::AddrParseError),
147
148    /// Substrate service error.
149    #[error(transparent)]
150    Sub(#[from] sc_service::Error),
151
152    /// Substrate consensus error.
153    #[error(transparent)]
154    Consensus(#[from] sp_consensus::Error),
155
156    /// Telemetry error.
157    #[error(transparent)]
158    Telemetry(#[from] sc_telemetry::Error),
159
160    /// Subspace networking (DSN) error.
161    #[error(transparent)]
162    SubspaceDsn(#[from] DsnConfigurationError),
163
164    /// Failed to set up block relay.
165    #[error(transparent)]
166    BlockRelay(#[from] BlockRelayConfigurationError),
167
168    /// Other.
169    #[error(transparent)]
170    Other(Box<dyn std::error::Error + Send + Sync>),
171}
172
// Simple wrapper whose only purpose is to convert the error type into `sp_consensus::Error`
174#[derive(Clone)]
175struct BlockImportWrapper<BI>(BI);
176
177#[async_trait::async_trait]
178impl<Block, BI> BlockImport<Block> for BlockImportWrapper<BI>
179where
180    Block: BlockT,
181    BI: BlockImport<Block, Error = sc_consensus_subspace::block_import::Error<Block::Header>>
182        + Send
183        + Sync,
184{
185    type Error = sp_consensus::Error;
186
187    async fn check_block(
188        &self,
189        block: BlockCheckParams<Block>,
190    ) -> Result<ImportResult, Self::Error> {
191        self.0
192            .check_block(block)
193            .await
194            .map_err(|error| sp_consensus::Error::Other(error.into()))
195    }
196
197    async fn import_block(
198        &self,
199        block: BlockImportParams<Block>,
200    ) -> Result<ImportResult, Self::Error> {
201        self.0
202            .import_block(block)
203            .await
204            .map_err(|error| sp_consensus::Error::Other(error.into()))
205    }
206}
207
208/// Host functions required for Subspace
209#[cfg(not(feature = "runtime-benchmarks"))]
210pub type HostFunctions = (
211    sp_io::SubstrateHostFunctions,
212    sp_consensus_subspace::consensus::HostFunctions,
213    sp_domains_fraud_proof::HostFunctions,
214    sp_subspace_mmr::HostFunctions,
215    sp_messenger_host_functions::HostFunctions,
216);
217
218/// Host functions required for Subspace
219#[cfg(feature = "runtime-benchmarks")]
220pub type HostFunctions = (
221    sp_io::SubstrateHostFunctions,
222    frame_benchmarking::benchmarking::HostFunctions,
223    sp_consensus_subspace::consensus::HostFunctions,
224    sp_domains_fraud_proof::HostFunctions,
225    sp_subspace_mmr::HostFunctions,
226    sp_messenger_host_functions::HostFunctions,
227);
228
229/// Runtime executor for Subspace
230pub type RuntimeExecutor = sc_executor::WasmExecutor<HostFunctions>;
231
232/// Subspace-like full client.
233pub type FullClient<RuntimeApi> = sc_service::TFullClient<Block, RuntimeApi, RuntimeExecutor>;
234
/// Subspace-like full client backend.
pub type FullBackend = sc_service::TFullBackend<Block>;
/// Chain selection rule used by the Subspace full client (longest chain).
pub type FullSelectChain = sc_consensus::LongestChain<FullBackend, Block>;
237
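/// Extensions factory that registers the Subspace-specific host extensions (KZG, proof-of-space,
/// proof-of-time, fraud proof, Subspace MMR, messenger and, when offchain storage is available,
/// the offchain DB) needed by runtime calls.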
238struct SubspaceExtensionsFactory<PosTable, Client, DomainBlock> {
239    kzg: Kzg,
240    client: Arc<Client>,
241    backend: Arc<FullBackend>,
242    pot_verifier: PotVerifier,
243    domains_executor: Arc<sc_domains::RuntimeExecutor>,
244    confirmation_depth_k: BlockNumber,
245    _pos_table: PhantomData<(PosTable, DomainBlock)>,
246}
247
248impl<PosTable, Block, Client, DomainBlock> ExtensionsFactory<Block>
249    for SubspaceExtensionsFactory<PosTable, Client, DomainBlock>
250where
251    PosTable: Table,
252    Block: BlockT,
253    Block::Hash: From<H256> + Into<H256>,
254    DomainBlock: BlockT,
255    DomainBlock::Hash: Into<H256> + From<H256>,
256    Client: BlockBackend<Block>
257        + HeaderBackend<Block>
258        + ProvideRuntimeApi<Block>
259        + Send
260        + Sync
261        + 'static,
262    Client::Api: SubspaceApi<Block, PublicKey>
263        + DomainsApi<Block, DomainBlock::Header>
264        + BundleProducerElectionApi<Block, Balance>
265        + MmrApi<Block, H256, NumberFor<Block>>
266        + MessengerApi<Block, NumberFor<Block>, Block::Hash>,
267{
268    fn extensions_for(
269        &self,
270        _block_hash: Block::Hash,
271        _block_number: NumberFor<Block>,
272    ) -> Extensions {
273        let confirmation_depth_k = self.confirmation_depth_k;
274        let mut exts = Extensions::new();
275        exts.register(KzgExtension::new(self.kzg.clone()));
276        exts.register(PosExtension::new::<PosTable>());
277        exts.register(PotExtension::new({
278            let client = Arc::clone(&self.client);
279            let pot_verifier = self.pot_verifier.clone();
280
281            Box::new(
282                move |parent_hash, slot, proof_of_time, quick_verification| {
283                    let parent_hash = {
284                        let mut converted_parent_hash = Block::Hash::default();
285                        converted_parent_hash.as_mut().copy_from_slice(&parent_hash);
286                        converted_parent_hash
287                    };
288
289                    let parent_header = match client.header(parent_hash) {
290                        Ok(Some(parent_header)) => parent_header,
291                        Ok(None) => {
292                            if quick_verification {
293                                error!(
294                                    %parent_hash,
295                                    "Header not found during proof of time verification"
296                                );
297
298                                return false;
299                            } else {
300                                debug!(
301                                    %parent_hash,
302                                    "Header not found during proof of time verification"
303                                );
304
                                // This can only happen during special sync modes; there are no
                                // other cases where the parent header may not be available, hence
                                // allow it
307                                return true;
308                            }
309                        }
310                        Err(error) => {
311                            error!(
312                                %error,
313                                %parent_hash,
314                                "Failed to retrieve header during proof of time verification"
315                            );
316
317                            return false;
318                        }
319                    };
320
321                    let parent_pre_digest = match extract_pre_digest(&parent_header) {
322                        Ok(parent_pre_digest) => parent_pre_digest,
323                        Err(error) => {
324                            error!(
325                                %error,
326                                %parent_hash,
327                                parent_number = %parent_header.number(),
328                                "Failed to extract pre-digest from parent header during proof of \
329                                time verification, this must never happen"
330                            );
331
332                            return false;
333                        }
334                    };
335
336                    let parent_slot = parent_pre_digest.slot();
337                    if slot <= *parent_slot {
338                        return false;
339                    }
340
341                    let pot_parameters = match client.runtime_api().pot_parameters(parent_hash) {
342                        Ok(pot_parameters) => pot_parameters,
343                        Err(error) => {
344                            debug!(
345                                %error,
346                                %parent_hash,
347                                parent_number = %parent_header.number(),
348                                "Failed to retrieve proof of time parameters during proof of time \
349                                verification"
350                            );
351
352                            return false;
353                        }
354                    };
355
356                    let pot_input = if parent_header.number().is_zero() {
357                        PotNextSlotInput {
358                            slot: parent_slot + Slot::from(1),
359                            slot_iterations: pot_parameters.slot_iterations(),
360                            seed: pot_verifier.genesis_seed(),
361                        }
362                    } else {
363                        let pot_info = parent_pre_digest.pot_info();
364
365                        PotNextSlotInput::derive(
366                            pot_parameters.slot_iterations(),
367                            parent_slot,
368                            pot_info.proof_of_time(),
369                            &pot_parameters.next_parameters_change(),
370                        )
371                    };
372
                    // Ensure the proof of time and future proof of time included in the upcoming
                    // block are valid
375
376                    if quick_verification {
377                        pot_verifier.try_is_output_valid(
378                            pot_input,
379                            Slot::from(slot) - parent_slot,
380                            proof_of_time,
381                            pot_parameters.next_parameters_change(),
382                        )
383                    } else {
384                        pot_verifier.is_output_valid(
385                            pot_input,
386                            Slot::from(slot) - parent_slot,
387                            proof_of_time,
388                            pot_parameters.next_parameters_change(),
389                        )
390                    }
391                },
392            )
393        }));
394
395        exts.register(FraudProofExtension::new(Arc::new(
396            FraudProofHostFunctionsImpl::<_, _, DomainBlock, _, _>::new(
397                self.client.clone(),
398                self.domains_executor.clone(),
399                move |client, executor| {
400                    let extension_factory =
401                        DomainsExtensionFactory::<_, Block, DomainBlock, _>::new(
402                            client,
403                            executor,
404                            confirmation_depth_k,
405                        );
406                    Box::new(extension_factory) as Box<dyn ExtensionsFactory<DomainBlock>>
407                },
408            ),
409        )));
410
411        exts.register(SubspaceMmrExtension::new(Arc::new(
412            SubspaceMmrHostFunctionsImpl::<Block, _>::new(
413                self.client.clone(),
414                confirmation_depth_k,
415            ),
416        )));
417
418        exts.register(MessengerExtension::new(Arc::new(
419            MessengerHostFunctionsImpl::<Block, _, DomainBlock, _>::new(
420                self.client.clone(),
421                self.domains_executor.clone(),
422            ),
423        )));
424
        // If offchain storage is available, add the offchain extension used to generate and
        // verify MMR proofs
427        if let Some(offchain_storage) = self.backend.offchain_storage() {
428            let offchain_db = OffchainDb::new(offchain_storage);
429            exts.register(OffchainDbExt::new(offchain_db));
430        }
431
432        exts
433    }
434}
435
436/// Other partial components returned by [`new_partial()`]
437pub struct OtherPartialComponents<RuntimeApi>
438where
439    RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
440{
441    /// Subspace block import
442    pub block_import: BoxBlockImport<Block>,
443    /// Subspace link
444    pub subspace_link: SubspaceLink<Block>,
445    /// Segment headers store
446    pub segment_headers_store: SegmentHeadersStore<FullClient<RuntimeApi>>,
447    /// Proof of time verifier
448    pub pot_verifier: PotVerifier,
449    /// Approximate target block number for syncing purposes
450    pub sync_target_block_number: Arc<AtomicU32>,
451    /// Telemetry
452    pub telemetry: Option<Telemetry>,
453}
454
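/// Partial components of the Subspace node, as assembled by [`new_partial()`].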
455type PartialComponents<RuntimeApi> = sc_service::PartialComponents<
456    FullClient<RuntimeApi>,
457    FullBackend,
458    FullSelectChain,
459    DefaultImportQueue<Block>,
460    TransactionPoolHandle<Block, FullClient<RuntimeApi>>,
461    OtherPartialComponents<RuntimeApi>,
462>;
463
/// Creates `PartialComponents` for the Subspace client.
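///
/// A minimal usage sketch (illustrative only; `PosTableType`, `RuntimeApiType`, `config`,
/// `snap_sync_enabled` and `pot_external_entropy` are placeholders assumed to be supplied by the
/// caller):
///
/// ```rust,ignore
/// let partial_components = new_partial::<PosTableType, RuntimeApiType>(
///     &config,
///     snap_sync_enabled,
///     &pot_external_entropy,
/// )?;
/// // Subspace-specific pieces (block import, Subspace link, etc.) live in `other`
/// let subspace_link = &partial_components.other.subspace_link;
/// ```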
465#[expect(clippy::result_large_err, reason = "Comes from Substrate")]
466pub fn new_partial<PosTable, RuntimeApi>(
467    // TODO: Stop using `Configuration` once
468    //  https://github.com/paritytech/polkadot-sdk/pull/5364 is in our fork
469    config: &Configuration,
470    // TODO: Replace with check for `ChainSyncMode` once we get rid of ^ `Configuration`
471    snap_sync: bool,
472    pot_external_entropy: &[u8],
473) -> Result<PartialComponents<RuntimeApi>, ServiceError>
474where
475    PosTable: Table,
476    RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
477    RuntimeApi::RuntimeApi: ApiExt<Block>
478        + Metadata<Block>
479        + BlockBuilder<Block>
480        + OffchainWorkerApi<Block>
481        + SessionKeys<Block>
482        + TaggedTransactionQueue<Block>
483        + SubspaceApi<Block, PublicKey>
484        + DomainsApi<Block, DomainHeader>
485        + FraudProofApi<Block, DomainHeader>
486        + BundleProducerElectionApi<Block, Balance>
487        + ObjectsApi<Block>
488        + MmrApi<Block, H256, NumberFor<Block>>
489        + MessengerApi<Block, NumberFor<Block>, BlockHashFor<Block>>,
490{
491    let telemetry = config
492        .telemetry_endpoints
493        .clone()
494        .filter(|x| !x.is_empty())
495        .map(|endpoints| -> Result<_, sc_telemetry::Error> {
496            let worker = TelemetryWorker::new(16)?;
497            let telemetry = worker.handle().new_telemetry(endpoints);
498            Ok((worker, telemetry))
499        })
500        .transpose()?;
501
502    let executor = sc_service::new_wasm_executor(&config.executor);
503    let domains_executor = sc_service::new_wasm_executor(&config.executor);
504
505    let confirmation_depth_k = extract_confirmation_depth(config.chain_spec.as_ref()).ok_or(
506        ServiceError::Other("Failed to extract confirmation depth from chain spec".to_string()),
507    )?;
508
509    let backend = Arc::new(sc_client_db::Backend::new(
510        config.db_config(),
511        confirmation_depth_k.into(),
512    )?);
513
514    let genesis_block_builder = GenesisBlockBuilder::new(
515        config.chain_spec.as_storage_builder(),
516        !snap_sync,
517        backend.clone(),
518        executor.clone(),
519    )?;
520
521    let (client, backend, keystore_container, task_manager) =
522        sc_service::new_full_parts_with_genesis_builder::<Block, RuntimeApi, _, _>(
523            config,
524            telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
525            executor.clone(),
526            backend,
527            genesis_block_builder,
528            false,
529        )?;
530
    // TODO: Make these explicit arguments once we no longer use Substrate's `Configuration`
532    let (kzg, maybe_erasure_coding) = tokio::task::block_in_place(|| {
533        rayon::join(Kzg::new, || {
534            ErasureCoding::new(
535                NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize)
536                    .expect("Not zero; qed"),
537            )
538            .map_err(|error| format!("Failed to instantiate erasure coding: {error}"))
539        })
540    });
541    let erasure_coding = maybe_erasure_coding?;
542
543    let client = Arc::new(client);
544    let client_info = client.info();
545    let chain_constants = client
546        .runtime_api()
547        .chain_constants(client_info.best_hash)
548        .map_err(|error| ServiceError::Application(error.into()))?;
549
550    let pot_verifier = PotVerifier::new(
551        PotSeed::from_genesis(client_info.genesis_hash.as_ref(), pot_external_entropy),
552        POT_VERIFIER_CACHE_SIZE,
553    );
554
    // Ensure the extracted confirmation depth matches the one from the runtime
556    assert_eq!(confirmation_depth_k, chain_constants.confirmation_depth_k());
557
558    client
559        .execution_extensions()
560        .set_extensions_factory(SubspaceExtensionsFactory::<PosTable, _, DomainBlock> {
561            kzg: kzg.clone(),
562            client: Arc::clone(&client),
563            pot_verifier: pot_verifier.clone(),
564            domains_executor: Arc::new(domains_executor),
565            backend: backend.clone(),
566            confirmation_depth_k: chain_constants.confirmation_depth_k(),
567            _pos_table: PhantomData,
568        });
569
570    let telemetry = telemetry.map(|(worker, telemetry)| {
571        task_manager
572            .spawn_handle()
573            .spawn("telemetry", None, worker.run());
574        telemetry
575    });
576
577    let select_chain = sc_consensus::LongestChain::new(backend.clone());
578
579    let segment_headers_store = tokio::task::block_in_place(|| {
580        SegmentHeadersStore::new(client.clone(), chain_constants.confirmation_depth_k())
581    })
582    .map_err(|error| ServiceError::Application(error.into()))?;
583
584    let subspace_link = SubspaceLink::new(chain_constants, kzg.clone(), erasure_coding);
585    let segment_headers_store = segment_headers_store.clone();
586
587    let block_import = SubspaceBlockImport::<PosTable, _, _, _, _, _>::new(
588        client.clone(),
589        client.clone(),
590        subspace_link.clone(),
591        {
592            let client = client.clone();
593            let segment_headers_store = segment_headers_store.clone();
594
595            move |parent_hash, ()| {
596                let client = client.clone();
597                let segment_headers_store = segment_headers_store.clone();
598
599                async move {
600                    let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
601
602                    let parent_header = client
603                        .header(parent_hash)?
604                        .expect("Parent header must always exist when block is created; qed");
605
606                    let parent_block_number = parent_header.number;
607
608                    let subspace_inherents =
609                        sp_consensus_subspace::inherents::InherentDataProvider::new(
610                            segment_headers_store
611                                .segment_headers_for_block(parent_block_number + 1),
612                        );
613
614                    Ok((timestamp, subspace_inherents))
615                }
616            }
617        },
618        segment_headers_store.clone(),
619        pot_verifier.clone(),
620    );
621
622    let sync_target_block_number = Arc::new(AtomicU32::new(0));
623    let transaction_pool = Arc::from(
624        sc_transaction_pool::Builder::new(
625            task_manager.spawn_essential_handle(),
626            client.clone(),
627            config.role.is_authority().into(),
628        )
629        .with_options(config.transaction_pool.clone())
630        .with_prometheus(config.prometheus_registry())
631        .build(),
632    );
633
634    let verifier = SubspaceVerifier::<PosTable, _, _>::new(SubspaceVerifierOptions {
635        client: client.clone(),
636        chain_constants,
637        kzg,
638        telemetry: telemetry.as_ref().map(|x| x.handle()),
639        reward_signing_context: schnorrkel::context::signing_context(REWARD_SIGNING_CONTEXT),
640        sync_target_block_number: Arc::clone(&sync_target_block_number),
641        is_authoring_blocks: config.role.is_authority(),
642        pot_verifier: pot_verifier.clone(),
643    });
644
645    let import_queue = BasicQueue::new(
646        verifier,
647        Box::new(BlockImportWrapper(block_import.clone())),
648        None,
649        &task_manager.spawn_essential_handle(),
650        config.prometheus_registry(),
651    );
652
653    let other = OtherPartialComponents {
654        block_import: Box::new(BlockImportWrapper(block_import.clone())),
655        subspace_link,
656        segment_headers_store,
657        pot_verifier,
658        sync_target_block_number,
659        telemetry,
660    };
661
662    Ok(PartialComponents {
663        client,
664        backend,
665        task_manager,
666        import_queue,
667        keystore_container,
668        select_chain,
669        transaction_pool,
670        other,
671    })
672}
673
674/// Full node along with some other components.
675pub struct NewFull<Client>
676where
677    Client: ProvideRuntimeApi<Block>
678        + AuxStore
679        + BlockBackend<Block>
680        + BlockIdTo<Block>
681        + HeaderBackend<Block>
682        + HeaderMetadata<Block, Error = sp_blockchain::Error>
683        + 'static,
684    Client::Api: TaggedTransactionQueue<Block>
685        + DomainsApi<Block, DomainHeader>
686        + FraudProofApi<Block, DomainHeader>
687        + SubspaceApi<Block, PublicKey>
688        + MmrApi<Block, H256, NumberFor<Block>>
689        + MessengerApi<Block, NumberFor<Block>, BlockHashFor<Block>>,
690{
691    /// Task manager.
692    pub task_manager: TaskManager,
693    /// Full client.
694    pub client: Arc<Client>,
695    /// Chain selection rule.
696    pub select_chain: FullSelectChain,
697    /// Network service.
698    pub network_service: Arc<dyn NetworkService + Send + Sync>,
699    /// Cross-domain gossip notification service.
700    pub xdm_gossip_notification_service: Box<dyn NotificationService>,
701    /// Sync service.
702    pub sync_service: Arc<sc_network_sync::SyncingService<Block>>,
703    /// RPC handlers.
704    pub rpc_handlers: sc_service::RpcHandlers,
705    /// Full client backend.
706    pub backend: Arc<FullBackend>,
707    /// Pot slot info stream.
708    pub pot_slot_info_stream: broadcast::Receiver<PotSlotInfo>,
709    /// New slot stream.
710    /// Note: this is currently used to send solutions from the farmer during tests.
711    pub new_slot_notification_stream: SubspaceNotificationStream<NewSlotNotification>,
712    /// Block signing stream.
713    pub reward_signing_notification_stream: SubspaceNotificationStream<RewardSigningNotification>,
714    /// Stream of notifications about blocks about to be imported.
715    pub block_importing_notification_stream:
716        SubspaceNotificationStream<BlockImportingNotification<Block>>,
717    /// Archived object mapping stream.
718    pub object_mapping_notification_stream: SubspaceNotificationStream<ObjectMappingNotification>,
719    /// Archived segment stream.
720    pub archived_segment_notification_stream:
721        SubspaceNotificationStream<ArchivedSegmentNotification>,
722    /// Transaction pool.
723    pub transaction_pool: Arc<TransactionPoolHandle<Block, Client>>,
724}
725
726type FullNode<RuntimeApi> = NewFull<FullClient<RuntimeApi>>;
727
728/// Builds a new service for a full client.
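///
/// A minimal call sketch (illustrative only; the configuration, partial components and slot
/// proportion are assumed to come from the caller, and the generic parameters are placeholders):
///
/// ```rust,ignore
/// let mut full_node = new_full::<PosTableType, RuntimeApiType>(
///     subspace_configuration,
///     partial_components,
///     None,                             // no Prometheus registry
///     true,                             // enable RPC extensions
///     SlotProportion::new(3f32 / 4f32),
///     None,                             // no snap sync target block receiver
/// )
/// .await?;
/// // Drive the node until an essential task exits
/// full_node.task_manager.future().await?;
/// ```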
729pub async fn new_full<PosTable, RuntimeApi>(
730    mut config: SubspaceConfiguration,
731    partial_components: PartialComponents<RuntimeApi>,
732    prometheus_registry: Option<&mut Registry>,
733    enable_rpc_extensions: bool,
734    block_proposal_slot_portion: SlotProportion,
735    consensus_snap_sync_target_block_receiver: Option<broadcast::Receiver<BlockNumber>>,
736) -> Result<FullNode<RuntimeApi>, Error>
737where
738    PosTable: Table,
739    RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
740    RuntimeApi::RuntimeApi: ApiExt<Block>
741        + Metadata<Block>
742        + AccountNonceApi<Block, AccountId, Nonce>
743        + BlockBuilder<Block>
744        + OffchainWorkerApi<Block>
745        + SessionKeys<Block>
746        + TaggedTransactionQueue<Block>
747        + TransactionPaymentApi<Block, Balance>
748        + SubspaceApi<Block, PublicKey>
749        + DomainsApi<Block, DomainHeader>
750        + FraudProofApi<Block, DomainHeader>
751        + ObjectsApi<Block>
752        + MmrApi<Block, Hash, BlockNumber>
753        + MessengerApi<Block, NumberFor<Block>, BlockHashFor<Block>>,
754{
755    let PartialComponents {
756        client,
757        backend,
758        mut task_manager,
759        import_queue,
760        keystore_container,
761        select_chain,
762        transaction_pool,
763        other,
764    } = partial_components;
765    let OtherPartialComponents {
766        block_import,
767        subspace_link,
768        segment_headers_store,
769        pot_verifier,
770        sync_target_block_number,
771        mut telemetry,
772    } = other;
773
774    let offchain_indexing_enabled = config.base.offchain_worker.indexing_enabled;
775    let (node, bootstrap_nodes, piece_getter) = match config.subspace_networking {
776        SubspaceNetworking::Reuse {
777            node,
778            bootstrap_nodes,
779            piece_getter,
780        } => (node, bootstrap_nodes, piece_getter),
781        SubspaceNetworking::Create { config: dsn_config } => {
782            let dsn_protocol_version = hex::encode(client.chain_info().genesis_hash);
783
784            debug!(
785                chain_type=?config.base.chain_spec.chain_type(),
786                genesis_hash=%hex::encode(client.chain_info().genesis_hash),
787                "Setting DSN protocol version..."
788            );
789
790            let out_connections = dsn_config.max_out_connections;
791            let (node, mut node_runner) = create_dsn_instance(
792                dsn_protocol_version,
793                dsn_config.clone(),
794                prometheus_registry,
795            )?;
796
797            info!("Subspace networking initialized: Node ID is {}", node.id());
798
799            node.on_new_listener(Arc::new({
800                let node = node.clone();
801
802                move |address| {
803                    info!(
804                        "DSN listening on {}",
805                        address.clone().with(Protocol::P2p(node.id()))
806                    );
807                }
808            }))
809            .detach();
810
811            task_manager
812                .spawn_essential_handle()
813                .spawn_essential_blocking(
814                    "node-runner",
815                    Some("subspace-networking"),
816                    Box::pin(
817                        async move {
818                            node_runner.run().await;
819                        }
820                        .in_current_span(),
821                    ),
822                );
823
824            let piece_provider = PieceProvider::new(
825                node.clone(),
826                SegmentCommitmentPieceValidator::new(
827                    node.clone(),
828                    subspace_link.kzg().clone(),
829                    segment_headers_store.clone(),
830                ),
831                Arc::new(Semaphore::new(
832                    out_connections as usize * PIECE_PROVIDER_MULTIPLIER,
833                )),
834            );
835
836            (
837                node,
838                dsn_config.bootstrap_nodes,
839                Arc::new(DsnPieceGetter::new(piece_provider)) as _,
840            )
841        }
842    };
843
        // Fall back to the node itself as a bootstrap node for DSN so the farmer always has
        // someone to connect to
846        // connect to
847        if bootstrap_nodes.is_empty() {
848            let (node_address_sender, node_address_receiver) = oneshot::channel();
849            let _handler = node.on_new_listener(Arc::new({
850                let node_address_sender = Mutex::new(Some(node_address_sender));
851
852                move |address| {
853                    if matches!(address.iter().next(), Some(Protocol::Ip4(_)))
854                        && let Some(node_address_sender) = node_address_sender.lock().take()
855                        && let Err(err) = node_address_sender.send(address.clone())
856                    {
857                        debug!(?err, "Couldn't send a node address to the channel.");
858                    }
859                }
860            }));
861
862            let mut node_listeners = node.listeners();
863
864            if node_listeners.is_empty() {
865                let Ok(listener) = node_address_receiver.await else {
866                    return Err(Error::Other(
867                        "Oneshot receiver dropped before DSN node listener was ready"
868                            .to_string()
869                            .into(),
870                    ));
871                };
872
873                node_listeners = vec![listener];
874            }
875
876            node_listeners.iter_mut().for_each(|multiaddr| {
877                multiaddr.push(Protocol::P2p(node.id()));
878            });
879
880            node_listeners
881        } else {
882            bootstrap_nodes
883        }
884    };
885
886    let substrate_prometheus_registry = config
887        .base
888        .prometheus_config
889        .as_ref()
890        .map(|prometheus_config| prometheus_config.registry.clone());
891    let import_queue_service1 = import_queue.service();
892    let import_queue_service2 = import_queue.service();
893    let network_wrapper = Arc::new(NetworkWrapper::default());
894    let block_relay = build_consensus_relay(
895        network_wrapper.clone(),
896        client.clone(),
897        transaction_pool.clone(),
898        substrate_prometheus_registry.as_ref(),
899    )
900    .map_err(Error::BlockRelay)?;
901    let mut net_config = sc_network::config::FullNetworkConfiguration::new(
902        &config.base.network,
903        substrate_prometheus_registry.clone(),
904    );
905    let (xdm_gossip_notification_config, xdm_gossip_notification_service) =
906        xdm_gossip_peers_set_config();
907    net_config.add_notification_protocol(xdm_gossip_notification_config);
908    let (pot_gossip_notification_config, pot_gossip_notification_service) =
909        pot_gossip_peers_set_config();
910    net_config.add_notification_protocol(pot_gossip_notification_config);
911    let pause_sync = Arc::clone(&net_config.network_config.pause_sync);
912
913    let num_peer_hint = net_config.network_config.default_peers_set_num_full as usize
914        + net_config
915            .network_config
916            .default_peers_set
917            .reserved_nodes
918            .len();
919
920    let protocol_id = config.base.protocol_id();
921    let fork_id = config.base.chain_spec.fork_id();
922
    // Enable the domain block ER (execution receipt) request handler
924    let (handler, protocol_config) = DomainBlockERRequestHandler::new::<
925        NetworkWorker<Block, BlockHashFor<Block>>,
926    >(fork_id, client.clone(), num_peer_hint);
927    task_manager.spawn_handle().spawn(
928        "domain-block-er-request-handler",
929        Some("networking"),
930        handler.run(),
931    );
932    net_config.add_request_response_protocol(protocol_config);
933
934    if let Some(offchain_storage) = backend.offchain_storage() {
935        // Allow both outgoing and incoming requests.
936        let (handler, protocol_config) =
937            MmrRequestHandler::new::<NetworkWorker<Block, BlockHashFor<Block>>>(
938                &config.base.protocol_id(),
939                fork_id,
940                client.clone(),
941                num_peer_hint,
942                offchain_storage,
943            );
944        task_manager
945            .spawn_handle()
946            .spawn("mmr-request-handler", Some("networking"), handler.run());
947
948        net_config.add_request_response_protocol(protocol_config);
949    }
950
951    let network_service_provider = NetworkServiceProvider::new();
952    let network_service_handle = network_service_provider.handle();
953    let (network_service, system_rpc_tx, tx_handler_controller, sync_service) = {
954        let spawn_handle = task_manager.spawn_handle();
955        let metrics = NotificationMetrics::new(substrate_prometheus_registry.as_ref());
956
957        // TODO: Remove BlockRelayParams here and simplify relay initialization
958        let block_downloader = {
959            let BlockRelayParams {
960                mut server,
961                downloader,
962                request_response_config,
963            } = block_relay;
964            net_config.add_request_response_protocol(request_response_config);
965
966            spawn_handle.spawn("block-request-handler", Some("networking"), async move {
967                server.run().await;
968            });
969
970            downloader
971        };
972
973        let syncing_strategy = build_polkadot_syncing_strategy(
974            protocol_id.clone(),
975            fork_id,
976            &mut net_config,
977            None,
978            block_downloader,
979            client.clone(),
980            &spawn_handle,
981            substrate_prometheus_registry.as_ref(),
982        )?;
983
984        let (syncing_engine, sync_service, block_announce_config) =
985            SyncingEngine::new::<NetworkWorker<_, _>>(
986                Roles::from(&config.base.role),
987                Arc::clone(&client),
988                substrate_prometheus_registry.as_ref(),
989                metrics.clone(),
990                &net_config,
991                protocol_id.clone(),
992                fork_id,
993                Box::new(DefaultBlockAnnounceValidator),
994                syncing_strategy,
995                network_service_provider.handle(),
996                import_queue.service(),
997                net_config.peer_store_handle(),
998                config.base.network.force_synced,
999            )
1000            .map_err(sc_service::Error::from)?;
1001
1002        spawn_handle.spawn_blocking("syncing", None, syncing_engine.run());
1003
1004        build_network_advanced(BuildNetworkAdvancedParams {
1005            role: config.base.role,
1006            protocol_id,
1007            fork_id,
1008            ipfs_server: config.base.network.ipfs_server,
1009            announce_block: config.base.announce_block,
1010            net_config,
1011            client: Arc::clone(&client),
1012            transaction_pool: Arc::clone(&transaction_pool),
1013            spawn_handle,
1014            import_queue,
1015            sync_service,
1016            block_announce_config,
1017            network_service_provider,
1018            metrics_registry: substrate_prometheus_registry.as_ref(),
1019            metrics,
1020        })?
1021    };
1022
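    // Periodically poll the sync service for the best seen block and publish it via
    // `sync_target_block_number`, which the Subspace verifier uses as an approximate sync target.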
1023    task_manager.spawn_handle().spawn(
1024        "sync-target-follower",
1025        None,
1026        Box::pin({
1027            let sync_service = sync_service.clone();
1028            let sync_target_block_number = Arc::clone(&sync_target_block_number);
1029
1030            async move {
1031                loop {
1032                    let best_seen_block = sync_service
1033                        .status()
1034                        .await
1035                        .map(|status| status.best_seen_block.unwrap_or_default())
1036                        .unwrap_or_default();
1037                    sync_target_block_number.store(best_seen_block, Ordering::Relaxed);
1038
1039                    tokio::time::sleep(SYNC_TARGET_UPDATE_INTERVAL).await;
1040                }
1041            }
1042        }),
1043    );
1044
1045    let sync_oracle = SubspaceSyncOracle::new(
1046        config.base.force_authoring,
1047        Arc::clone(&pause_sync),
1048        sync_service.clone(),
1049    );
1050
1051    let subspace_archiver = tokio::task::block_in_place(|| {
1052        create_subspace_archiver(
1053            segment_headers_store.clone(),
1054            subspace_link.clone(),
1055            client.clone(),
1056            sync_oracle.clone(),
1057            telemetry.as_ref().map(|telemetry| telemetry.handle()),
1058            config.create_object_mappings,
1059        )
1060    })
1061    .map_err(ServiceError::Client)?;
1062
1063    task_manager
1064        .spawn_essential_handle()
1065        .spawn_essential_blocking(
1066            "subspace-archiver",
1067            None,
1068            Box::pin(async move {
1069                if let Err(error) = subspace_archiver.await {
1070                    error!(%error, "Archiver exited with error");
1071                }
1072            }),
1073        );
1074
1075    network_wrapper.set(network_service.clone());
1076
1077    if !config.base.network.force_synced {
1078        // Start with DSN sync in this case
1079        pause_sync.store(true, Ordering::Release);
1080    }
1081
1082    let snap_sync_task = snap_sync(
1083        segment_headers_store.clone(),
1084        node.clone(),
1085        fork_id.map(|fork_id| fork_id.to_string()),
1086        Arc::clone(&client),
1087        import_queue_service1,
1088        pause_sync.clone(),
1089        piece_getter.clone(),
1090        sync_service.clone(),
1091        network_service_handle,
1092        subspace_link.erasure_coding().clone(),
1093        consensus_snap_sync_target_block_receiver,
1094        backend.offchain_storage(),
1095        network_service.clone(),
1096    );
1097
1098    let (observer, worker) = sync_from_dsn::create_observer_and_worker(
1099        segment_headers_store.clone(),
1100        Arc::clone(&network_service),
1101        node.clone(),
1102        Arc::clone(&client),
1103        import_queue_service2,
1104        sync_service.clone(),
1105        sync_target_block_number,
1106        pause_sync,
1107        piece_getter,
1108        subspace_link.erasure_coding().clone(),
1109    );
1110    task_manager
1111        .spawn_handle()
1112        .spawn("observer", Some("sync-from-dsn"), observer);
1113    task_manager
1114        .spawn_essential_handle()
1115        .spawn_essential_blocking(
1116            "worker",
1117            Some("sync-from-dsn"),
1118            Box::pin(async move {
1119                // Run snap-sync before DSN-sync.
1120                if config.sync == ChainSyncMode::Snap
1121                    && let Err(error) = snap_sync_task.in_current_span().await
1122                {
1123                    error!(%error, "Snap sync exited with a fatal error");
1124                    return;
1125                }
1126
1127                if let Err(error) = worker.await {
1128                    error!(%error, "Sync from DSN exited with an error");
1129                }
1130            }),
1131        );
1132
1133    if let Some(registry) = substrate_prometheus_registry.as_ref() {
1134        match NodeMetrics::new(
1135            client.clone(),
1136            client.every_import_notification_stream(),
1137            registry,
1138        ) {
1139            Ok(node_metrics) => {
1140                task_manager.spawn_handle().spawn(
1141                    "node_metrics",
1142                    None,
1143                    Box::pin(async move {
1144                        node_metrics.run().await;
1145                    }),
1146                );
1147            }
1148            Err(err) => {
1149                error!("Failed to initialize node metrics: {err:?}");
1150            }
1151        }
1152    }
1153
1154    let offchain_tx_pool_factory = OffchainTransactionPoolFactory::new(transaction_pool.clone());
1155
1156    if config.base.offchain_worker.enabled {
1157        let offchain_workers =
1158            sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
1159                runtime_api_provider: client.clone(),
1160                is_validator: config.base.role.is_authority(),
1161                keystore: Some(keystore_container.keystore()),
1162                offchain_db: backend.offchain_storage(),
1163                transaction_pool: Some(offchain_tx_pool_factory.clone()),
1164                network_provider: Arc::new(network_service.clone()),
1165                enable_http_requests: true,
1166                custom_extensions: |_| vec![],
1167            })?;
1168        task_manager.spawn_handle().spawn(
1169            "offchain-workers-runner",
1170            "offchain-worker",
1171            offchain_workers
1172                .run(client.clone(), task_manager.spawn_handle())
1173                .boxed(),
1174        );
1175    }
1176
    // MMR offchain indexer
1178    if offchain_indexing_enabled {
1179        task_manager.spawn_essential_handle().spawn_blocking(
1180            "mmr-gadget",
1181            None,
1182            mmr_gadget::MmrGadget::start(
1183                client.clone(),
1184                backend.clone(),
1185                sp_mmr_primitives::INDEXING_PREFIX.to_vec(),
1186            ),
1187        );
1188    }
1189
1190    let backoff_authoring_blocks: Option<()> = None;
1191
1192    let new_slot_notification_stream = subspace_link.new_slot_notification_stream();
1193    let reward_signing_notification_stream = subspace_link.reward_signing_notification_stream();
1194    let block_importing_notification_stream = subspace_link.block_importing_notification_stream();
1195    let object_mapping_notification_stream = subspace_link.object_mapping_notification_stream();
1196    let archived_segment_notification_stream = subspace_link.archived_segment_notification_stream();
1197
1198    let (pot_source_worker, pot_gossip_worker, pot_slot_info_stream) = PotSourceWorker::new(
1199        config.is_timekeeper,
1200        config.timekeeper_cpu_cores,
1201        client.clone(),
1202        pot_verifier.clone(),
1203        Arc::clone(&network_service),
1204        pot_gossip_notification_service,
1205        sync_service.clone(),
1206        sync_oracle.clone(),
1207    )
1208    .map_err(|error| Error::Other(error.into()))?;
1209
1210    let additional_pot_slot_info_stream = pot_source_worker.subscribe_pot_slot_info_stream();
1211
1212    task_manager
1213        .spawn_essential_handle()
1214        .spawn("pot-source", Some("pot"), pot_source_worker.run());
1215    task_manager
1216        .spawn_essential_handle()
1217        .spawn("pot-gossip", Some("pot"), pot_gossip_worker.run());
1218
1219    if config.base.role.is_authority() || config.force_new_slot_notifications {
1220        let proposer_factory = ProposerFactory::new(
1221            task_manager.spawn_handle(),
1222            client.clone(),
1223            transaction_pool.clone(),
1224            substrate_prometheus_registry.as_ref(),
1225            telemetry.as_ref().map(|x| x.handle()),
1226        );
1227
1228        let subspace_slot_worker =
1229            SubspaceSlotWorker::<PosTable, _, _, _, _, _, _, _>::new(SubspaceSlotWorkerOptions {
1230                client: client.clone(),
1231                env: proposer_factory,
1232                block_import,
1233                sync_oracle: sync_oracle.clone(),
1234                justification_sync_link: sync_service.clone(),
1235                force_authoring: config.base.force_authoring,
1236                backoff_authoring_blocks,
1237                subspace_link: subspace_link.clone(),
1238                segment_headers_store: segment_headers_store.clone(),
1239                block_proposal_slot_portion,
1240                max_block_proposal_slot_portion: None,
1241                telemetry: telemetry.as_ref().map(|x| x.handle()),
1242                offchain_tx_pool_factory,
1243                pot_verifier,
1244            });
1245
1246        let create_inherent_data_providers = {
1247            let client = client.clone();
1248            let segment_headers_store = segment_headers_store.clone();
1249
1250            move |parent_hash, ()| {
1251                let client = client.clone();
1252                let segment_headers_store = segment_headers_store.clone();
1253
1254                async move {
1255                    let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
1256
1257                    let parent_header = client
1258                        .header(parent_hash)?
1259                        .expect("Parent header must always exist when block is created; qed");
1260
1261                    let parent_block_number = parent_header.number;
1262
1263                    let subspace_inherents =
1264                        sp_consensus_subspace::inherents::InherentDataProvider::new(
1265                            segment_headers_store
1266                                .segment_headers_for_block(parent_block_number + 1),
1267                        );
1268
1269                    Ok((timestamp, subspace_inherents))
1270                }
1271            }
1272        };
1273
1274        info!("🧑‍🌾 Starting Subspace Authorship worker");
1275        let slot_worker_task = sc_proof_of_time::start_slot_worker(
1276            subspace_link.chain_constants().slot_duration(),
1277            client.clone(),
1278            select_chain.clone(),
1279            subspace_slot_worker,
1280            sync_oracle.clone(),
1281            create_inherent_data_providers,
1282            pot_slot_info_stream,
1283        );
1284
1285        // Subspace authoring task is considered essential, i.e. if it fails we take down the
1286        // service with it.
1287        task_manager.spawn_essential_handle().spawn_blocking(
1288            "subspace-proposer",
1289            Some("block-authoring"),
1290            slot_worker_task,
1291        );
1292    }
1293
1294    // We replace the Substrate implementation of metrics server with our own.
1295    config.base.prometheus_config.take();
1296
1297    let rpc_handlers = task_spawner::spawn_tasks(SpawnTasksParams {
1298        network: network_service.clone(),
1299        client: client.clone(),
1300        keystore: keystore_container.keystore(),
1301        task_manager: &mut task_manager,
1302        transaction_pool: transaction_pool.clone(),
1303        rpc_builder: if enable_rpc_extensions {
1304            let client = client.clone();
1305            let new_slot_notification_stream = new_slot_notification_stream.clone();
1306            let reward_signing_notification_stream = reward_signing_notification_stream.clone();
1307            let object_mapping_notification_stream = object_mapping_notification_stream.clone();
1308            let archived_segment_notification_stream = archived_segment_notification_stream.clone();
1309            let transaction_pool = transaction_pool.clone();
1310            let backend = backend.clone();
1311
1312            Box::new(move |subscription_executor| {
1313                let deps = rpc::FullDeps {
1314                    client: client.clone(),
1315                    pool: transaction_pool.clone(),
1316                    subscription_executor,
1317                    new_slot_notification_stream: new_slot_notification_stream.clone(),
1318                    reward_signing_notification_stream: reward_signing_notification_stream.clone(),
1319                    object_mapping_notification_stream: object_mapping_notification_stream.clone(),
1320                    archived_segment_notification_stream: archived_segment_notification_stream
1321                        .clone(),
1322                    dsn_bootstrap_nodes: dsn_bootstrap_nodes.clone(),
1323                    segment_headers_store: segment_headers_store.clone(),
1324                    sync_oracle: sync_oracle.clone(),
1325                    kzg: subspace_link.kzg().clone(),
1326                    erasure_coding: subspace_link.erasure_coding().clone(),
1327                    backend: backend.clone(),
1328                };
1329
1330                rpc::create_full(deps).map_err(Into::into)
1331            })
1332        } else {
1333            Box::new(|_| Ok(RpcModule::new(())))
1334        },
1335        backend: backend.clone(),
1336        system_rpc_tx,
1337        config: config.base,
1338        telemetry: telemetry.as_mut(),
1339        tx_handler_controller,
1340        sync_service: sync_service.clone(),
1341    })?;
1342
1343    Ok(NewFull {
1344        task_manager,
1345        client,
1346        select_chain,
1347        network_service,
1348        xdm_gossip_notification_service,
1349        sync_service,
1350        rpc_handlers,
1351        backend,
1352        pot_slot_info_stream: additional_pot_slot_info_stream,
1353        new_slot_notification_stream,
1354        reward_signing_notification_stream,
1355        block_importing_notification_stream,
1356        object_mapping_notification_stream,
1357        archived_segment_notification_stream,
1358        transaction_pool,
1359    })
1360}
1361
1362/// The storage key of the `ConfirmationDepthK` storage item in `pallet-runtime-configs`
1363fn confirmation_depth_storage_key() -> StorageKey {
1364    StorageKey(
1365        frame_support::storage::storage_prefix(
1366            // This is the name used for `pallet-runtime-configs` in the `construct_runtime` macro
1367            // i.e. `RuntimeConfigs: pallet_runtime_configs = 14`
1368            "RuntimeConfigs".as_bytes(),
1369            // This is the storage item name used inside `pallet-runtime-configs`
1370            "ConfirmationDepthK".as_bytes(),
1371        )
1372        .to_vec(),
1373    )
1374}
1375
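/// Extracts the `ConfirmationDepthK` value from the raw genesis storage in the chain spec JSON:
/// looks up the hex-encoded value under `/genesis/raw/top/<confirmation_depth_storage_key>` and
/// SCALE-decodes it as a `u32`, returning `None` if the key is missing or malformed.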
1376fn extract_confirmation_depth(chain_spec: &dyn ChainSpec) -> Option<u32> {
1377    let storage_key = format!("0x{}", hex::encode(confirmation_depth_storage_key().0));
1378    let spec: serde_json::Value =
1379        serde_json::from_str(chain_spec.as_json(true).ok()?.as_str()).ok()?;
1380    let encoded_confirmation_depth = hex::decode(
1381        spec.pointer(format!("/genesis/raw/top/{storage_key}").as_str())?
1382            .as_str()?
1383            .trim_start_matches("0x"),
1384    )
1385    .ok()?;
1386    u32::decode(&mut encoded_confirmation_depth.as_slice()).ok()
1387}
1388
1389#[cfg(test)]
1390mod test {
1391    use static_assertions::const_assert_eq;
1392    use subspace_data_retrieval::object_fetcher::MAX_BLOCK_LENGTH as ARCHIVER_MAX_BLOCK_LENGTH;
1393    use subspace_runtime_primitives::MAX_BLOCK_LENGTH as CONSENSUS_RUNTIME_MAX_BLOCK_LENGTH;
1394
1395    /// Runtime and archiver code must agree on the consensus block length.
1396    /// (This avoids importing all the runtime primitives code into the farmer and gateway.)
1397    #[test]
1398    fn max_block_length_consistent() {
1399        const_assert_eq!(
1400            CONSENSUS_RUNTIME_MAX_BLOCK_LENGTH,
1401            ARCHIVER_MAX_BLOCK_LENGTH,
1402        );
1403    }
1404}