subspace_malicious_operator/
malicious_domain_instance_starter.rs1use crate::DomainCli;
2use crate::malicious_bundle_producer::MaliciousBundleProducer;
3use cross_domain_message_gossip::{ChainMsg, Message};
4use domain_client_operator::snap_sync::ConsensusChainSyncParams;
5use domain_client_operator::{BootstrapResult, OperatorStreams};
6use domain_eth_service::DefaultEthConfig;
7use domain_eth_service::provider::EthProvider;
8use domain_runtime_primitives::opaque::Block as DomainBlock;
9use domain_service::providers::DefaultProvider;
10use domain_service::{FullBackend, FullClient};
11use evm_domain_runtime::AccountId as AccountId20;
12use futures::StreamExt;
13use sc_consensus_subspace::block_import::BlockImportingNotification;
14use sc_consensus_subspace::notification::SubspaceNotificationStream;
15use sc_consensus_subspace::slot_worker::NewSlotNotification;
16use sc_network::NetworkPeers;
17use sc_service::Configuration;
18use sc_transaction_pool_api::OffchainTransactionPoolFactory;
19use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender};
20use sp_api::ProvideRuntimeApi;
21use sp_blockchain::HeaderBackend;
22use sp_consensus_subspace::SubspaceApi;
23use sp_core::crypto::AccountId32;
24use sp_core::traits::SpawnEssentialNamed;
25use sp_domains::{DomainInstanceData, DomainsApi, RuntimeType};
26use sp_keystore::KeystorePtr;
27use std::sync::Arc;
28use subspace_runtime::RuntimeApi as CRuntimeApi;
29use subspace_runtime_primitives::opaque::Block as CBlock;
30use subspace_runtime_primitives::{AccountId, HeaderFor};
31use subspace_service::FullClient as CFullClient;
32
33pub struct DomainInstanceStarter {
36 pub domain_cli: DomainCli,
37 pub consensus_client: Arc<CFullClient<CRuntimeApi>>,
38 pub consensus_keystore: KeystorePtr,
39 pub consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory<CBlock>,
40 pub block_importing_notification_stream:
41 SubspaceNotificationStream<BlockImportingNotification<CBlock>>,
42 pub new_slot_notification_stream: SubspaceNotificationStream<NewSlotNotification>,
43 pub consensus_sync_service: Arc<sc_network_sync::SyncingService<CBlock>>,
44 pub domain_message_receiver: TracingUnboundedReceiver<ChainMsg>,
45 pub gossip_message_sink: TracingUnboundedSender<Message>,
46 pub consensus_network: Arc<dyn NetworkPeers + Send + Sync>,
47 pub domain_backend: Arc<FullBackend<DomainBlock>>,
48 pub domain_config: Configuration,
49}
50
51impl DomainInstanceStarter {
52 pub async fn start(
53 self,
54 bootstrap_result: BootstrapResult<CBlock>,
55 sudo_account: AccountId,
56 ) -> Result<Arc<sc_domains::RuntimeExecutor>, Box<dyn std::error::Error>> {
57 let BootstrapResult {
58 domain_instance_data,
59 domain_created_at,
60 imported_block_notification_stream,
61 } = bootstrap_result;
62
63 let DomainInstanceData {
64 runtime_type,
65 raw_genesis,
66 } = domain_instance_data;
67
68 let DomainInstanceStarter {
69 domain_cli,
70 consensus_client,
71 consensus_keystore,
72 consensus_offchain_tx_pool_factory,
73 block_importing_notification_stream,
74 new_slot_notification_stream,
75 consensus_sync_service,
76 domain_message_receiver,
77 gossip_message_sink,
78 consensus_network,
79 domain_backend,
80 mut domain_config,
81 } = self;
82
83 let domain_id = domain_cli.domain_id.into();
84 domain_config
85 .chain_spec
86 .set_storage(raw_genesis.into_storage());
87
88 let block_importing_notification_stream = block_importing_notification_stream
89 .subscribe()
90 .then(|block_importing_notification| async move {
91 (
92 block_importing_notification.block_number,
93 block_importing_notification.acknowledgement_sender,
94 )
95 })
96 .boxed();
97
98 let new_slot_notification_stream = || {
99 new_slot_notification_stream
100 .subscribe()
101 .then(|slot_notification| async move {
102 (
103 slot_notification.new_slot_info.slot,
104 slot_notification.new_slot_info.proof_of_time,
105 )
106 })
107 };
108
109 let operator_streams = OperatorStreams {
110 consensus_block_import_throttling_buffer_size: 10,
112 block_importing_notification_stream,
113 imported_block_notification_stream,
114 new_slot_notification_stream: new_slot_notification_stream(),
115 acknowledgement_sender_stream: futures::stream::empty(),
116 _phantom: Default::default(),
117 };
118
119 let consensus_best_hash = consensus_client.info().best_hash;
120 let runtime_api = consensus_client.runtime_api();
121 let chain_constants = runtime_api.chain_constants(consensus_best_hash)?;
122
123 let domain_block_pruning_depth = runtime_api.block_pruning_depth(consensus_best_hash)?;
124 match runtime_type {
125 RuntimeType::Evm => {
126 let evm_base_path = domain_config
127 .base_path
128 .config_dir(domain_config.chain_spec.id());
129
130 let eth_provider =
131 EthProvider::<
132 evm_domain_runtime::TransactionConverter,
133 DefaultEthConfig<
134 FullClient<DomainBlock, evm_domain_runtime::RuntimeApi>,
135 FullBackend<DomainBlock>,
136 >,
137 >::new(Some(&evm_base_path), domain_cli.additional_args());
138
139 let domain_params = domain_service::DomainParams {
140 domain_id,
141 domain_config,
142 domain_created_at,
143 consensus_client: consensus_client.clone(),
144 consensus_offchain_tx_pool_factory: consensus_offchain_tx_pool_factory.clone(),
145 consensus_network,
146 domain_sync_oracle: consensus_sync_service.clone(),
147 operator_streams,
148 gossip_message_sink,
149 domain_message_receiver,
150 provider: eth_provider,
151 skip_empty_bundle_production: true,
152 skip_out_of_order_slot: false,
153 maybe_operator_id: None,
155 confirmation_depth_k: chain_constants.confirmation_depth_k(),
156 consensus_chain_sync_params: None::<
157 ConsensusChainSyncParams<_, HeaderFor<DomainBlock>>,
158 >,
159 challenge_period: domain_block_pruning_depth,
160 domain_backend,
161 };
162
163 let mut domain_node = domain_service::new_full::<
164 _,
165 _,
166 _,
167 _,
168 _,
169 _,
170 evm_domain_runtime::RuntimeApi,
171 AccountId20,
172 _,
173 >(domain_params)
174 .await?;
175
176 let malicious_bundle_producer = MaliciousBundleProducer::new(
177 domain_id,
178 domain_node.client.clone(),
179 consensus_client,
180 consensus_keystore,
181 consensus_offchain_tx_pool_factory,
182 domain_node.transaction_pool.clone(),
183 sudo_account,
184 );
185
186 domain_node
187 .task_manager
188 .spawn_essential_handle()
189 .spawn_essential_blocking(
190 "malicious-bundle-producer",
191 None,
192 Box::pin(malicious_bundle_producer.start(new_slot_notification_stream())),
193 );
194
195 domain_node.task_manager.future().await?;
196
197 Ok(domain_node.code_executor.clone())
198 }
199 RuntimeType::AutoId => {
200 let domain_params = domain_service::DomainParams {
201 domain_id,
202 domain_config,
203 domain_created_at,
204 consensus_client: consensus_client.clone(),
205 consensus_offchain_tx_pool_factory: consensus_offchain_tx_pool_factory.clone(),
206 consensus_network,
207 domain_sync_oracle: consensus_sync_service.clone(),
208 operator_streams,
209 gossip_message_sink,
210 domain_message_receiver,
211 provider: DefaultProvider,
212 skip_empty_bundle_production: true,
213 skip_out_of_order_slot: false,
214 maybe_operator_id: None,
216 confirmation_depth_k: chain_constants.confirmation_depth_k(),
217 consensus_chain_sync_params: None::<
218 ConsensusChainSyncParams<_, HeaderFor<DomainBlock>>,
219 >,
220 challenge_period: domain_block_pruning_depth,
221 domain_backend,
222 };
223
224 let mut domain_node = domain_service::new_full::<
225 _,
226 _,
227 _,
228 _,
229 _,
230 _,
231 auto_id_domain_runtime::RuntimeApi,
232 AccountId32,
233 _,
234 >(domain_params)
235 .await?;
236
237 let malicious_bundle_producer = MaliciousBundleProducer::new(
238 domain_id,
239 domain_node.client.clone(),
240 consensus_client,
241 consensus_keystore,
242 consensus_offchain_tx_pool_factory,
243 domain_node.transaction_pool.clone(),
244 sudo_account,
245 );
246
247 domain_node
248 .task_manager
249 .spawn_essential_handle()
250 .spawn_essential_blocking(
251 "malicious-bundle-producer",
252 None,
253 Box::pin(malicious_bundle_producer.start(new_slot_notification_stream())),
254 );
255
256 domain_node.task_manager.future().await?;
257
258 Ok(domain_node.code_executor.clone())
259 }
260 }
261 }
262}