subspace_service/
task_spawner.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
3
4// This program is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// This program is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with this program. If not, see <https://www.gnu.org/licenses/>.
16
17use sc_client_api::{
18    BlockBackend, BlockchainEvents, ExecutorProvider, ProofProvider, StorageProvider, UsageProvider,
19};
20use sc_network::multiaddr::Protocol;
21use sc_network::Multiaddr;
22use sc_rpc_api::DenyUnsafe;
23use sc_service::{
24    gen_rpc_module, init_telemetry, propagate_transaction_notifications, start_rpc_servers, Error,
25    MetricsService, RpcHandlers, SpawnTasksParams,
26};
27use sc_transaction_pool_api::MaintainedTransactionPool;
28use sp_api::{CallApiAt, ProvideRuntimeApi};
29use sp_blockchain::{HeaderBackend, HeaderMetadata};
30use sp_consensus::block_validation::Chain;
31use sp_runtime::traits::{Block as BlockT, BlockIdTo};
32use std::sync::Arc;
33use tracing::info;
34
35/// Spawn the tasks that are required to run a node.
36#[expect(clippy::result_large_err, reason = "Comes from Substrate")]
37pub(super) fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
38    params: SpawnTasksParams<TBl, TCl, TExPool, TRpc, TBackend>,
39) -> Result<RpcHandlers, Error>
40where
41    TCl: ProvideRuntimeApi<TBl>
42        + HeaderMetadata<TBl, Error = sp_blockchain::Error>
43        + Chain<TBl>
44        + BlockBackend<TBl>
45        + BlockIdTo<TBl, Error = sp_blockchain::Error>
46        + ProofProvider<TBl>
47        + HeaderBackend<TBl>
48        + BlockchainEvents<TBl>
49        + ExecutorProvider<TBl>
50        + UsageProvider<TBl>
51        + StorageProvider<TBl, TBackend>
52        + CallApiAt<TBl>
53        + Send
54        + 'static,
55    TCl::Api: sp_api::Metadata<TBl>
56        + sp_transaction_pool::runtime_api::TaggedTransactionQueue<TBl>
57        + sp_session::SessionKeys<TBl>
58        + sp_api::ApiExt<TBl>,
59    TBl: BlockT,
60    TBl::Hash: Unpin,
61    TBl::Header: Unpin,
62    TBackend: 'static + sc_client_api::backend::Backend<TBl> + Send,
63    TExPool: MaintainedTransactionPool<Block = TBl, Hash = TBl::Hash> + 'static,
64{
65    let SpawnTasksParams {
66        // TODO: Stop using `Configuration` once
67        //  https://github.com/paritytech/polkadot-sdk/pull/5364 is in our fork
68        mut config,
69        task_manager,
70        client,
71        backend,
72        keystore,
73        transaction_pool,
74        rpc_builder,
75        network,
76        system_rpc_tx,
77        tx_handler_controller,
78        sync_service,
79        telemetry,
80    } = params;
81
82    let chain_info = client.usage_info().chain;
83
84    let sysinfo = sc_sysinfo::gather_sysinfo();
85    sc_sysinfo::print_sysinfo(&sysinfo);
86
87    let telemetry = telemetry
88        .map(|telemetry| {
89            init_telemetry(
90                config.network.node_name.clone(),
91                config.impl_name.clone(),
92                config.impl_version.clone(),
93                config.chain_spec.name().to_string(),
94                config.role.is_authority(),
95                network.clone(),
96                client.clone(),
97                telemetry,
98                Some(sysinfo),
99            )
100        })
101        .transpose()?;
102
103    info!("📦 Highest known block at #{}", chain_info.best_number);
104
105    let spawn_handle = task_manager.spawn_handle();
106
107    // Inform the tx pool about imported and finalized blocks.
108    spawn_handle.spawn(
109        "txpool-notifications",
110        Some("transaction-pool"),
111        sc_transaction_pool::notification_future(client.clone(), transaction_pool.clone()),
112    );
113
114    spawn_handle.spawn(
115        "on-transaction-imported",
116        Some("transaction-pool"),
117        propagate_transaction_notifications(
118            transaction_pool.clone(),
119            tx_handler_controller,
120            telemetry.clone(),
121        ),
122    );
123
124    // Periodically updated metrics and telemetry updates.
125    spawn_handle.spawn(
126        "telemetry-periodic-send",
127        None,
128        MetricsService::new(telemetry).run(
129            client.clone(),
130            transaction_pool.clone(),
131            network.clone(),
132            sync_service.clone(),
133        ),
134    );
135
136    let rpc_id_provider = config.rpc.id_provider.take();
137
138    // jsonrpsee RPC
139    let gen_rpc_module = || {
140        gen_rpc_module(
141            task_manager.spawn_handle(),
142            client.clone(),
143            transaction_pool.clone(),
144            keystore.clone(),
145            system_rpc_tx.clone(),
146            config.impl_name.clone(),
147            config.impl_version.clone(),
148            config.chain_spec.as_ref(),
149            &config.state_pruning,
150            config.blocks_pruning,
151            backend.clone(),
152            &*rpc_builder,
153        )
154    };
155
156    let rpc_server_handle = start_rpc_servers(
157        &config.rpc,
158        config.prometheus_registry(),
159        &config.tokio_handle,
160        gen_rpc_module,
161        rpc_id_provider,
162    )?;
163
164    let listen_addrs = rpc_server_handle
165        .listen_addrs()
166        .iter()
167        .map(|socket_addr| {
168            let mut multiaddr: Multiaddr = socket_addr.ip().into();
169            multiaddr.push(Protocol::Tcp(socket_addr.port()));
170            multiaddr
171        })
172        .collect();
173
174    let in_memory_rpc = {
175        let mut module = gen_rpc_module()?;
176        module.extensions_mut().insert(DenyUnsafe::No);
177        module
178    };
179
180    let in_memory_rpc_handle = RpcHandlers::new(Arc::new(in_memory_rpc), listen_addrs);
181    // Spawn informant task
182    spawn_handle.spawn(
183        "informant",
184        None,
185        sc_informant::build(client.clone(), network, sync_service.clone()),
186    );
187
188    task_manager.keep_alive((config.base_path, rpc_server_handle));
189
190    Ok(in_memory_rpc_handle)
191}