subspace_malicious_operator/
malicious_domain_instance_starter.rs

1use crate::malicious_bundle_producer::MaliciousBundleProducer;
2use crate::DomainCli;
3use cross_domain_message_gossip::{ChainMsg, Message};
4use domain_client_operator::snap_sync::ConsensusChainSyncParams;
5use domain_client_operator::{BootstrapResult, OperatorStreams};
6use domain_eth_service::provider::EthProvider;
7use domain_eth_service::DefaultEthConfig;
8use domain_runtime_primitives::opaque::Block as DomainBlock;
9use domain_service::providers::DefaultProvider;
10use domain_service::{FullBackend, FullClient};
11use evm_domain_runtime::AccountId as AccountId20;
12use futures::StreamExt;
13use sc_consensus_subspace::block_import::BlockImportingNotification;
14use sc_consensus_subspace::notification::SubspaceNotificationStream;
15use sc_consensus_subspace::slot_worker::NewSlotNotification;
16use sc_network::NetworkPeers;
17use sc_service::Configuration;
18use sc_transaction_pool_api::OffchainTransactionPoolFactory;
19use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender};
20use sp_api::ProvideRuntimeApi;
21use sp_blockchain::HeaderBackend;
22use sp_consensus_subspace::SubspaceApi;
23use sp_core::crypto::AccountId32;
24use sp_core::traits::SpawnEssentialNamed;
25use sp_domains::{DomainInstanceData, RuntimeType};
26use sp_keystore::KeystorePtr;
27use std::sync::Arc;
28use subspace_runtime::RuntimeApi as CRuntimeApi;
29use subspace_runtime_primitives::opaque::Block as CBlock;
30use subspace_runtime_primitives::{AccountId, HeaderFor, DOMAINS_BLOCK_PRUNING_DEPTH};
31use subspace_service::FullClient as CFullClient;
32
33/// `DomainInstanceStarter` used to start a domain instance node based on the given
34/// bootstrap result
35pub struct DomainInstanceStarter {
36    pub domain_cli: DomainCli,
37    pub consensus_client: Arc<CFullClient<CRuntimeApi>>,
38    pub consensus_keystore: KeystorePtr,
39    pub consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory<CBlock>,
40    pub block_importing_notification_stream:
41        SubspaceNotificationStream<BlockImportingNotification<CBlock>>,
42    pub new_slot_notification_stream: SubspaceNotificationStream<NewSlotNotification>,
43    pub consensus_sync_service: Arc<sc_network_sync::SyncingService<CBlock>>,
44    pub domain_message_receiver: TracingUnboundedReceiver<ChainMsg>,
45    pub gossip_message_sink: TracingUnboundedSender<Message>,
46    pub consensus_network: Arc<dyn NetworkPeers + Send + Sync>,
47    pub domain_backend: Arc<FullBackend<DomainBlock>>,
48    pub domain_config: Configuration,
49}
50
51impl DomainInstanceStarter {
52    pub async fn start(
53        self,
54        bootstrap_result: BootstrapResult<CBlock>,
55        sudo_account: AccountId,
56    ) -> Result<Arc<sc_domains::RuntimeExecutor>, Box<dyn std::error::Error>> {
57        let BootstrapResult {
58            domain_instance_data,
59            domain_created_at,
60            imported_block_notification_stream,
61        } = bootstrap_result;
62
63        let DomainInstanceData {
64            runtime_type,
65            raw_genesis,
66        } = domain_instance_data;
67
68        let DomainInstanceStarter {
69            domain_cli,
70            consensus_client,
71            consensus_keystore,
72            consensus_offchain_tx_pool_factory,
73            block_importing_notification_stream,
74            new_slot_notification_stream,
75            consensus_sync_service,
76            domain_message_receiver,
77            gossip_message_sink,
78            consensus_network,
79            domain_backend,
80            mut domain_config,
81        } = self;
82
83        let domain_id = domain_cli.domain_id.into();
84        domain_config
85            .chain_spec
86            .set_storage(raw_genesis.into_storage());
87
88        let block_importing_notification_stream = block_importing_notification_stream
89            .subscribe()
90            .then(|block_importing_notification| async move {
91                (
92                    block_importing_notification.block_number,
93                    block_importing_notification.acknowledgement_sender,
94                )
95            })
96            .boxed();
97
98        let new_slot_notification_stream = || {
99            new_slot_notification_stream
100                .subscribe()
101                .then(|slot_notification| async move {
102                    (
103                        slot_notification.new_slot_info.slot,
104                        slot_notification.new_slot_info.proof_of_time,
105                    )
106                })
107        };
108
109        let operator_streams = OperatorStreams {
110            // TODO: proper value
111            consensus_block_import_throttling_buffer_size: 10,
112            block_importing_notification_stream,
113            imported_block_notification_stream,
114            new_slot_notification_stream: new_slot_notification_stream(),
115            acknowledgement_sender_stream: futures::stream::empty(),
116            _phantom: Default::default(),
117        };
118
119        let consensus_best_hash = consensus_client.info().best_hash;
120        let chain_constants = consensus_client
121            .runtime_api()
122            .chain_constants(consensus_best_hash)?;
123
124        match runtime_type {
125            RuntimeType::Evm => {
126                let evm_base_path = domain_config
127                    .base_path
128                    .config_dir(domain_config.chain_spec.id());
129
130                let eth_provider =
131                    EthProvider::<
132                        evm_domain_runtime::TransactionConverter,
133                        DefaultEthConfig<
134                            FullClient<DomainBlock, evm_domain_runtime::RuntimeApi>,
135                            FullBackend<DomainBlock>,
136                        >,
137                    >::new(Some(&evm_base_path), domain_cli.additional_args());
138
139                let domain_params = domain_service::DomainParams {
140                    domain_id,
141                    domain_config,
142                    domain_created_at,
143                    consensus_client: consensus_client.clone(),
144                    consensus_offchain_tx_pool_factory: consensus_offchain_tx_pool_factory.clone(),
145                    consensus_network,
146                    domain_sync_oracle: consensus_sync_service.clone(),
147                    operator_streams,
148                    gossip_message_sink,
149                    domain_message_receiver,
150                    provider: eth_provider,
151                    skip_empty_bundle_production: true,
152                    skip_out_of_order_slot: false,
153                    // Always set it to `None` to not running the normal bundle producer
154                    maybe_operator_id: None,
155                    confirmation_depth_k: chain_constants.confirmation_depth_k(),
156                    consensus_chain_sync_params: None::<
157                        ConsensusChainSyncParams<_, HeaderFor<DomainBlock>>,
158                    >,
159                    challenge_period: DOMAINS_BLOCK_PRUNING_DEPTH,
160                    domain_backend,
161                };
162
163                let mut domain_node = domain_service::new_full::<
164                    _,
165                    _,
166                    _,
167                    _,
168                    _,
169                    _,
170                    evm_domain_runtime::RuntimeApi,
171                    AccountId20,
172                    _,
173                >(domain_params)
174                .await?;
175
176                let malicious_bundle_producer = MaliciousBundleProducer::new(
177                    domain_id,
178                    domain_node.client.clone(),
179                    consensus_client,
180                    consensus_keystore,
181                    consensus_offchain_tx_pool_factory,
182                    domain_node.transaction_pool.clone(),
183                    sudo_account,
184                );
185
186                domain_node
187                    .task_manager
188                    .spawn_essential_handle()
189                    .spawn_essential_blocking(
190                        "malicious-bundle-producer",
191                        None,
192                        Box::pin(malicious_bundle_producer.start(new_slot_notification_stream())),
193                    );
194
195                domain_node.network_starter.start_network();
196
197                domain_node.task_manager.future().await?;
198
199                Ok(domain_node.code_executor.clone())
200            }
201            RuntimeType::AutoId => {
202                let domain_params = domain_service::DomainParams {
203                    domain_id,
204                    domain_config,
205                    domain_created_at,
206                    consensus_client: consensus_client.clone(),
207                    consensus_offchain_tx_pool_factory: consensus_offchain_tx_pool_factory.clone(),
208                    consensus_network,
209                    domain_sync_oracle: consensus_sync_service.clone(),
210                    operator_streams,
211                    gossip_message_sink,
212                    domain_message_receiver,
213                    provider: DefaultProvider,
214                    skip_empty_bundle_production: true,
215                    skip_out_of_order_slot: false,
216                    // Always set it to `None` to not running the normal bundle producer
217                    maybe_operator_id: None,
218                    confirmation_depth_k: chain_constants.confirmation_depth_k(),
219                    consensus_chain_sync_params: None::<
220                        ConsensusChainSyncParams<_, HeaderFor<DomainBlock>>,
221                    >,
222                    challenge_period: DOMAINS_BLOCK_PRUNING_DEPTH,
223                    domain_backend,
224                };
225
226                let mut domain_node = domain_service::new_full::<
227                    _,
228                    _,
229                    _,
230                    _,
231                    _,
232                    _,
233                    auto_id_domain_runtime::RuntimeApi,
234                    AccountId32,
235                    _,
236                >(domain_params)
237                .await?;
238
239                let malicious_bundle_producer = MaliciousBundleProducer::new(
240                    domain_id,
241                    domain_node.client.clone(),
242                    consensus_client,
243                    consensus_keystore,
244                    consensus_offchain_tx_pool_factory,
245                    domain_node.transaction_pool.clone(),
246                    sudo_account,
247                );
248
249                domain_node
250                    .task_manager
251                    .spawn_essential_handle()
252                    .spawn_essential_blocking(
253                        "malicious-bundle-producer",
254                        None,
255                        Box::pin(malicious_bundle_producer.start(new_slot_notification_stream())),
256                    );
257
258                domain_node.network_starter.start_network();
259
260                domain_node.task_manager.future().await?;
261
262                Ok(domain_node.code_executor.clone())
263            }
264        }
265    }
266}