1use crate::malicious_bundle_tamper::MaliciousBundleTamper;
2use domain_client_operator::domain_bundle_producer::{BundleProducer, TestBundleProducer};
3use domain_client_operator::domain_bundle_proposer::DomainBundleProposer;
4use domain_client_operator::{OpaqueBundleFor, OperatorSlotInfo};
5use domain_runtime_primitives::opaque::Block as DomainBlock;
6use frame_system_rpc_runtime_api::AccountNonceApi;
7use futures::{Stream, StreamExt, TryFutureExt};
8use pallet_domains::OperatorConfig;
9use parity_scale_codec::Encode;
10use sc_client_api::{AuxStore, BlockBackend, HeaderBackend};
11use sc_service::KeystoreContainer;
12use sc_service::config::KeystoreConfig;
13use sc_transaction_pool_api::OffchainTransactionPoolFactory;
14use sc_utils::mpsc::tracing_unbounded;
15use sp_api::ProvideRuntimeApi;
16use sp_block_builder::BlockBuilder;
17use sp_blockchain::Info;
18use sp_consensus_slots::Slot;
19use sp_core::crypto::UncheckedFrom;
20use sp_domains::core_api::DomainCoreApi;
21use sp_domains::{BundleProducerElectionApi, DomainId, DomainsApi, OperatorId, OperatorPublicKey};
22use sp_keyring::Sr25519Keyring;
23use sp_keystore::{Keystore, KeystorePtr};
24use sp_messenger::MessengerApi;
25use sp_runtime::traits::NumberFor;
26use sp_runtime::{RuntimeAppPublic, generic};
27use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
28use std::error::Error;
29use std::sync::Arc;
30use subspace_core_primitives::pot::PotOutput;
31use subspace_runtime::{DisablePallets, Runtime, RuntimeCall, SignedExtra, UncheckedExtrinsic};
32use subspace_runtime_primitives::opaque::Block as CBlock;
33use subspace_runtime_primitives::{AccountId, Balance, BlockHashFor, HeaderFor, Nonce};
34
/// Factor applied to the summed stake of the current operator set to obtain the amount staked
/// when a new malicious operator is registered.
const MALICIOUS_OPR_STAKE_MULTIPLIER: Balance = 3;
36
/// Registration state of the malicious operator driven by `MaliciousBundleProducer`.
enum MaliciousOperatorStatus {
    /// A signing key has been generated for a new operator, but an operator bound to that key
    /// has not yet been observed in both the current and the next operator set.
    Registering(OperatorPublicKey),
    /// An operator bound to `signing_key` has been observed in both the current and the next
    /// operator set; bundles are produced on its behalf.
    Registered {
        /// Id of the registered operator.
        operator_id: OperatorId,
        /// Signing key of the registered operator.
        signing_key: OperatorPublicKey,
    },
    /// Neither a registered operator nor a pending registration; this is the initial state.
    NoStatus,
}
45
46impl MaliciousOperatorStatus {
47 fn registering(&mut self, signing_key: OperatorPublicKey) {
48 *self = MaliciousOperatorStatus::Registering(signing_key)
49 }
50
51 fn registered(&mut self, operator_id: OperatorId, signing_key: OperatorPublicKey) {
52 *self = MaliciousOperatorStatus::Registered {
53 operator_id,
54 signing_key,
55 }
56 }
57
58 fn no_status(&mut self) {
59 *self = MaliciousOperatorStatus::NoStatus
60 }
61
62 fn registered_operator(&self) -> Option<(OperatorId, OperatorPublicKey)> {
63 match self {
64 MaliciousOperatorStatus::Registered {
65 operator_id,
66 signing_key,
67 } => Some((*operator_id, signing_key.clone())),
68 _ => None,
69 }
70 }
71
72 fn registering_signing_key(&self) -> Option<OperatorPublicKey> {
73 match self {
74 MaliciousOperatorStatus::Registering(key) => Some(key.clone()),
75 _ => None,
76 }
77 }
78}
79
/// Produces domain bundles on behalf of a self-registered operator, optionally tampers with
/// them, and submits them to the consensus chain.
pub struct MaliciousBundleProducer<Client, CClient, TransactionPool> {
    /// Domain whose operator set is queried and for which operators are registered.
    domain_id: DomainId,
    /// Account whose nonce is used for, and which is the caller of, the signed consensus
    /// extrinsics (operator registration and forced staking epoch transition).
    sudo_account: AccountId,
    /// Keystore handed to `construct_signed_extrinsic` for signing consensus extrinsics.
    consensus_keystore: KeystorePtr,
    /// In-memory keystore in which new operator signing keys are generated.
    operator_keystore: KeystorePtr,
    /// Consensus chain client used for chain info and runtime API calls.
    consensus_client: Arc<CClient>,
    /// Factory for the offchain transaction pool through which extrinsics are submitted.
    consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory<CBlock>,
    /// Inner producer that builds the bundle for a given operator and slot.
    bundle_producer: TestBundleProducer<DomainBlock, CBlock, Client, CClient, TransactionPool>,
    /// Component given the chance to tamper with each produced bundle before submission.
    malicious_bundle_tamper: MaliciousBundleTamper<DomainBlock, CBlock, Client>,
    /// Current registration state of the malicious operator.
    malicious_operator_status: MaliciousOperatorStatus,
}
91
92impl<Client, CClient, TransactionPool> MaliciousBundleProducer<Client, CClient, TransactionPool>
93where
94 Client: HeaderBackend<DomainBlock>
95 + BlockBackend<DomainBlock>
96 + AuxStore
97 + ProvideRuntimeApi<DomainBlock>
98 + 'static,
99 Client::Api: BlockBuilder<DomainBlock>
100 + DomainCoreApi<DomainBlock>
101 + MessengerApi<DomainBlock, NumberFor<CBlock>, BlockHashFor<CBlock>>
102 + TaggedTransactionQueue<DomainBlock>,
103 CClient: HeaderBackend<CBlock> + ProvideRuntimeApi<CBlock> + 'static,
104 CClient::Api: DomainsApi<CBlock, HeaderFor<DomainBlock>>
105 + BundleProducerElectionApi<CBlock, Balance>
106 + AccountNonceApi<CBlock, AccountId, Nonce>,
107 TransactionPool: sc_transaction_pool_api::TransactionPool<
108 Block = DomainBlock,
109 Hash = BlockHashFor<DomainBlock>,
110 > + 'static,
111{
112 pub fn new(
113 domain_id: DomainId,
114 domain_client: Arc<Client>,
115 consensus_client: Arc<CClient>,
116 consensus_keystore: KeystorePtr,
117 consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory<CBlock>,
118 domain_transaction_pool: Arc<TransactionPool>,
119 sudo_account: AccountId,
120 ) -> Self {
121 let operator_keystore = KeystoreContainer::new(&KeystoreConfig::InMemory)
122 .expect("create in-memory keystore container must succeed")
123 .keystore();
124
125 let domain_bundle_proposer = DomainBundleProposer::new(
126 domain_id,
127 domain_client.clone(),
128 consensus_client.clone(),
129 domain_transaction_pool,
130 );
131
132 let (bundle_sender, _bundle_receiver) = tracing_unbounded("domain_bundle_stream", 100);
133 let bundle_producer = TestBundleProducer::new(
134 domain_id,
135 consensus_client.clone(),
136 domain_client.clone(),
137 domain_bundle_proposer,
138 Arc::new(bundle_sender),
139 operator_keystore.clone(),
140 false,
142 false,
143 );
144
145 let malicious_bundle_tamper =
146 MaliciousBundleTamper::new(domain_client, operator_keystore.clone());
147
148 Self {
149 domain_id,
150 consensus_client,
151 consensus_keystore,
152 operator_keystore,
153 bundle_producer,
154 malicious_bundle_tamper,
155 malicious_operator_status: MaliciousOperatorStatus::NoStatus,
156 sudo_account,
157 consensus_offchain_tx_pool_factory,
158 }
159 }
160
161 async fn handle_new_slot(
162 &mut self,
163 operator_id: OperatorId,
164 new_slot_info: OperatorSlotInfo,
165 ) -> Option<OpaqueBundleFor<DomainBlock, CBlock>> {
166 let slot = new_slot_info.slot;
167 self.bundle_producer
168 .produce_bundle(operator_id, new_slot_info)
169 .unwrap_or_else(move |error| {
170 tracing::error!(
171 ?slot,
172 ?operator_id,
173 ?error,
174 "Error at malicious operator producing bundle"
175 );
176 None
177 })
178 .await
179 .and_then(|res| res.into_opaque_bundle())
180 }
181
182 pub async fn start<NSNS: Stream<Item = (Slot, PotOutput)> + Send + 'static>(
183 mut self,
184 new_slot_notification_stream: NSNS,
185 ) {
186 let mut new_slot_notification_stream = Box::pin(new_slot_notification_stream);
187 while let Some((slot, proof_of_time)) = new_slot_notification_stream.next().await {
188 if let Some((operator_id, signing_key)) =
189 self.malicious_operator_status.registered_operator()
190 {
191 let maybe_opaque_bundle = self
192 .handle_new_slot(
193 operator_id,
194 OperatorSlotInfo {
195 slot,
196 proof_of_time,
197 },
198 )
199 .await;
200
201 if let Some(mut opaque_bundle) = maybe_opaque_bundle {
202 if let Err(err) = self
203 .malicious_bundle_tamper
204 .maybe_tamper_bundle(&mut opaque_bundle, &signing_key)
205 {
206 tracing::error!(?err, "Got error when try to tamper bundle");
207 }
208 if let Err(err) = self.submit_bundle(opaque_bundle) {
209 tracing::info!(?err, "Malicious operator failed to submit bundle");
210 }
211 }
212 }
213
214 if u64::from(slot) % 10 == 0
216 && let Err(err) = self.update_malicious_operator_status()
217 {
218 tracing::error!(?err, "Failed to update malicious operator status");
219 }
220 }
221 }
222
223 fn update_malicious_operator_status(&mut self) -> Result<(), Box<dyn Error>> {
224 let consensus_best_hash = self.consensus_client.info().best_hash;
225 let (mut current_operators, next_operators) = self
226 .consensus_client
227 .runtime_api()
228 .domain_operators(consensus_best_hash, self.domain_id)?
229 .ok_or_else(|| {
230 sp_blockchain::Error::Application(
231 format!("Operator set for domain {} not found", self.domain_id).into(),
232 )
233 })?;
234
235 if let Some((malicious_operator_id, _)) =
236 self.malicious_operator_status.registered_operator()
237 {
238 if next_operators.contains(&malicious_operator_id) {
239 return Ok(());
240 } else {
241 tracing::info!(
242 ?malicious_operator_id,
243 "Current malicious operator is missing from next operator set, probably got slashed"
244 );
245 current_operators.remove(&malicious_operator_id);
249 self.malicious_operator_status.no_status();
250 }
251 }
252
253 let signing_key = match &self.malicious_operator_status.registering_signing_key() {
254 Some(k) => k.clone(),
255 None => {
256 let public_key: OperatorPublicKey = self
257 .operator_keystore
258 .sr25519_generate_new(OperatorPublicKey::ID, None)?
259 .into();
260
261 self.malicious_operator_status
262 .registering(public_key.clone());
263
264 tracing::info!(?public_key, "Start register new malicious operator");
265
266 public_key
267 }
268 };
269
270 let mut maybe_operator_id = None;
271 for operator_id in current_operators.keys().chain(next_operators.iter()) {
272 if let Some((operator_signing_key, _)) = self
273 .consensus_client
274 .runtime_api()
275 .operator(consensus_best_hash, *operator_id)?
276 && operator_signing_key == signing_key
277 {
278 maybe_operator_id = Some(*operator_id);
279 break;
280 }
281 }
282
283 match maybe_operator_id {
286 None => {
287 let nonce = self.sudo_acccount_nonce()?;
288 let current_total_stake: Balance = current_operators.into_values().sum();
289 self.submit_register_operator(
290 nonce,
291 signing_key,
292 MALICIOUS_OPR_STAKE_MULTIPLIER * current_total_stake,
294 )?;
295 self.submit_force_staking_epoch_transition(nonce + 1)?;
296 }
297 Some(operator_id) => {
298 if !next_operators.contains(&operator_id) {
299 self.malicious_operator_status.no_status();
303 } else if !current_operators.contains_key(&operator_id) {
304 self.submit_force_staking_epoch_transition(self.sudo_acccount_nonce()?)?;
305 } else {
306 tracing::info!(
307 ?operator_id,
308 ?signing_key,
309 "Registered a new malicious operator"
310 );
311 self.malicious_operator_status
312 .registered(operator_id, signing_key);
313 }
314 }
315 }
316
317 Ok(())
318 }
319
320 fn sudo_acccount_nonce(&self) -> Result<Nonce, Box<dyn Error>> {
321 Ok(self.consensus_client.runtime_api().account_nonce(
322 self.consensus_client.info().best_hash,
323 self.sudo_account.clone(),
324 )?)
325 }
326
327 fn submit_bundle(
328 &self,
329 opaque_bundle: OpaqueBundleFor<DomainBlock, CBlock>,
330 ) -> Result<(), Box<dyn Error>> {
331 let call = pallet_domains::Call::submit_bundle { opaque_bundle };
332 self.submit_consensus_extrinsic(None, call.into())
333 }
334
335 fn submit_register_operator(
336 &self,
337 nonce: Nonce,
338 signing_key: OperatorPublicKey,
339 staking_amount: Balance,
340 ) -> Result<(), Box<dyn Error>> {
341 let call = pallet_domains::Call::register_operator {
342 domain_id: self.domain_id,
343 amount: staking_amount,
344 config: OperatorConfig {
345 signing_key,
346 minimum_nominator_stake: Balance::MAX,
347 nomination_tax: Default::default(),
348 },
349 };
350 self.submit_consensus_extrinsic(Some(nonce), call.into())
351 }
352
353 fn submit_force_staking_epoch_transition(&self, nonce: Nonce) -> Result<(), Box<dyn Error>> {
354 let call = pallet_sudo::Call::sudo {
355 call: Box::new(RuntimeCall::Domains(
356 pallet_domains::Call::force_staking_epoch_transition {
357 domain_id: self.domain_id,
358 },
359 )),
360 };
361 self.submit_consensus_extrinsic(Some(nonce), call.into())
362 }
363
364 fn submit_consensus_extrinsic(
365 &self,
366 maybe_nonce: Option<Nonce>,
367 call: RuntimeCall,
368 ) -> Result<(), Box<dyn Error>> {
369 let etx = match maybe_nonce {
370 Some(nonce) => construct_signed_extrinsic(
371 &self.consensus_keystore,
372 self.consensus_client.info(),
373 call.clone(),
374 self.sudo_account.clone(),
375 nonce,
376 )?,
377 None => {
378 let extra =
380 get_singed_extra(self.consensus_client.info().best_number.into(), true, 0);
381 UncheckedExtrinsic::new_transaction(call.clone(), extra)
382 }
383 };
384
385 self.consensus_offchain_tx_pool_factory
386 .offchain_transaction_pool(self.consensus_client.info().best_hash)
387 .submit_transaction(etx.encode())
388 .map_err(|err| {
389 sp_blockchain::Error::Application(
390 format!("Failed to submit consensus extrinsic, call {call:?}, err {err:?}")
391 .into(),
392 )
393 })?;
394
395 Ok(())
396 }
397}
398
399fn get_singed_extra(best_number: u64, immortal: bool, nonce: Nonce) -> SignedExtra {
400 let period = u64::from(<<Runtime as frame_system::Config>::BlockHashCount>::get())
401 .checked_next_power_of_two()
402 .map(|c| c / 2)
403 .unwrap_or(2);
404 (
405 frame_system::CheckNonZeroSender::<Runtime>::new(),
406 frame_system::CheckSpecVersion::<Runtime>::new(),
407 frame_system::CheckTxVersion::<Runtime>::new(),
408 frame_system::CheckGenesis::<Runtime>::new(),
409 frame_system::CheckMortality::<Runtime>::from(if immortal {
410 generic::Era::Immortal
411 } else {
412 generic::Era::mortal(period, best_number)
413 }),
414 frame_system::CheckNonce::<Runtime>::from(nonce.into()),
415 frame_system::CheckWeight::<Runtime>::new(),
416 pallet_transaction_payment::ChargeTransactionPayment::<Runtime>::from(0u128),
417 DisablePallets,
418 pallet_subspace::extensions::SubspaceExtension::<Runtime>::new(),
419 pallet_domains::extensions::DomainsExtension::<Runtime>::new(),
420 pallet_messenger::extensions::MessengerExtension::<Runtime>::new(),
421 )
422}
423
424pub fn construct_signed_extrinsic(
425 consensus_keystore: &KeystorePtr,
426 consensus_chain_info: Info<CBlock>,
427 call: RuntimeCall,
428 caller: AccountId,
429 nonce: Nonce,
430) -> Result<UncheckedExtrinsic, Box<dyn Error>> {
431 let extra = get_singed_extra(consensus_chain_info.best_number.into(), false, nonce);
432 let raw_payload = generic::SignedPayload::<RuntimeCall, SignedExtra>::from_raw(
433 call.clone(),
434 extra.clone(),
435 (
436 (),
437 subspace_runtime::VERSION.spec_version,
438 subspace_runtime::VERSION.transaction_version,
439 consensus_chain_info.genesis_hash,
440 consensus_chain_info.best_hash,
441 (),
442 (),
443 (),
444 (),
445 (),
446 (),
447 (),
448 ),
449 );
450
451 let signature = match Sr25519Keyring::from_account_id(&caller) {
452 Some(keyring) => raw_payload.using_encoded(|e| keyring.sign(e)),
453 None => {
454 let public_key =
455 sp_core::sr25519::Public::unchecked_from(<AccountId as Into<[u8; 32]>>::into(
456 caller.clone(),
457 ));
458 raw_payload
459 .using_encoded(|e| {
460 consensus_keystore
461 .sr25519_sign(OperatorPublicKey::ID, &public_key, e)
462 })?
463 .ok_or(format!(
464 "Failed to sign extrinsic, sudo key pair missing from keystore?, public_key {public_key:?}"
465 ))?
466 }
467 };
468
469 Ok(UncheckedExtrinsic::new_signed(
470 call,
471 sp_runtime::MultiAddress::Id(caller),
472 signature.into(),
473 extra,
474 ))
475}