1use crate::malicious_bundle_tamper::MaliciousBundleTamper;
2use domain_client_operator::domain_bundle_producer::{BundleProducer, TestBundleProducer};
3use domain_client_operator::domain_bundle_proposer::DomainBundleProposer;
4use domain_client_operator::{OpaqueBundleFor, OperatorSlotInfo};
5use domain_runtime_primitives::opaque::Block as DomainBlock;
6use frame_system_rpc_runtime_api::AccountNonceApi;
7use futures::{Stream, StreamExt, TryFutureExt};
8use pallet_domains::OperatorConfig;
9use parity_scale_codec::Encode;
10use sc_client_api::{AuxStore, BlockBackend, HeaderBackend};
11use sc_service::KeystoreContainer;
12use sc_service::config::KeystoreConfig;
13use sc_transaction_pool_api::OffchainTransactionPoolFactory;
14use sc_utils::mpsc::tracing_unbounded;
15use sp_api::ProvideRuntimeApi;
16use sp_block_builder::BlockBuilder;
17use sp_blockchain::Info;
18use sp_consensus_slots::Slot;
19use sp_core::crypto::UncheckedFrom;
20use sp_domains::core_api::DomainCoreApi;
21use sp_domains::{BundleProducerElectionApi, DomainId, DomainsApi, OperatorId, OperatorPublicKey};
22use sp_keyring::Sr25519Keyring;
23use sp_keystore::{Keystore, KeystorePtr};
24use sp_messenger::MessengerApi;
25use sp_runtime::traits::NumberFor;
26use sp_runtime::{RuntimeAppPublic, generic};
27use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
28use std::error::Error;
29use std::sync::Arc;
30use subspace_core_primitives::pot::PotOutput;
31use subspace_runtime::{Runtime, RuntimeCall, SignedExtra, UncheckedExtrinsic};
32use subspace_runtime_primitives::extension::BalanceTransferCheckExtension;
33use subspace_runtime_primitives::opaque::Block as CBlock;
34use subspace_runtime_primitives::{AccountId, Balance, BlockHashFor, HeaderFor, Nonce};
35
/// Factor applied to the summed stake of the domain's current operators to obtain the
/// amount staked when a new malicious operator is registered.
const MALICIOUS_OPR_STAKE_MULTIPLIER: Balance = 3;
37
/// Registration state of the malicious operator driven by the bundle producer.
enum MaliciousOperatorStatus {
    /// A signing key has been generated and a registration for it is being pursued,
    /// but the key has not yet been confirmed as an active operator.
    Registering(OperatorPublicKey),
    /// The signing key is linked to an operator id that was found in both the current
    /// and the next operator set.
    Registered {
        /// Id of the registered operator.
        operator_id: OperatorId,
        /// Signing key the operator was registered with.
        signing_key: OperatorPublicKey,
    },
    /// No operator is registered and no registration is being pursued.
    NoStatus,
}
46
47impl MaliciousOperatorStatus {
48 fn registering(&mut self, signing_key: OperatorPublicKey) {
49 *self = MaliciousOperatorStatus::Registering(signing_key)
50 }
51
52 fn registered(&mut self, operator_id: OperatorId, signing_key: OperatorPublicKey) {
53 *self = MaliciousOperatorStatus::Registered {
54 operator_id,
55 signing_key,
56 }
57 }
58
59 fn no_status(&mut self) {
60 *self = MaliciousOperatorStatus::NoStatus
61 }
62
63 fn registered_operator(&self) -> Option<(OperatorId, OperatorPublicKey)> {
64 match self {
65 MaliciousOperatorStatus::Registered {
66 operator_id,
67 signing_key,
68 } => Some((*operator_id, signing_key.clone())),
69 _ => None,
70 }
71 }
72
73 fn registering_signing_key(&self) -> Option<OperatorPublicKey> {
74 match self {
75 MaliciousOperatorStatus::Registering(key) => Some(key.clone()),
76 _ => None,
77 }
78 }
79}
80
/// Produces bundles on behalf of a self-registered operator, passes them through a
/// [`MaliciousBundleTamper`] and submits them to the consensus chain.
pub struct MaliciousBundleProducer<Client, CClient, TransactionPool> {
    /// Domain whose operator set is inspected and that the operator is registered on.
    domain_id: DomainId,
    /// Account whose nonce is used, and on whose behalf extrinsics are signed, for the
    /// operator registration and the sudo epoch-transition calls.
    sudo_account: AccountId,
    /// Keystore handed to `construct_signed_extrinsic` for signing consensus extrinsics.
    consensus_keystore: KeystorePtr,
    /// Keystore in which new operator signing keys are generated.
    operator_keystore: KeystorePtr,
    /// Consensus chain client used for chain info and runtime API calls.
    consensus_client: Arc<CClient>,
    /// Factory for the offchain transaction pool through which extrinsics are submitted.
    consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory<CBlock>,
    /// Inner producer that builds the bundle for each slot.
    bundle_producer: TestBundleProducer<DomainBlock, CBlock, Client, CClient, TransactionPool>,
    /// Component given the chance to tamper with each produced bundle before submission.
    malicious_bundle_tamper: MaliciousBundleTamper<DomainBlock, CBlock, Client>,
    /// Current registration state of the malicious operator.
    malicious_operator_status: MaliciousOperatorStatus,
}
92
93impl<Client, CClient, TransactionPool> MaliciousBundleProducer<Client, CClient, TransactionPool>
94where
95 Client: HeaderBackend<DomainBlock>
96 + BlockBackend<DomainBlock>
97 + AuxStore
98 + ProvideRuntimeApi<DomainBlock>
99 + 'static,
100 Client::Api: BlockBuilder<DomainBlock>
101 + DomainCoreApi<DomainBlock>
102 + MessengerApi<DomainBlock, NumberFor<CBlock>, BlockHashFor<CBlock>>
103 + TaggedTransactionQueue<DomainBlock>,
104 CClient: HeaderBackend<CBlock> + ProvideRuntimeApi<CBlock> + 'static,
105 CClient::Api: DomainsApi<CBlock, HeaderFor<DomainBlock>>
106 + BundleProducerElectionApi<CBlock, Balance>
107 + AccountNonceApi<CBlock, AccountId, Nonce>,
108 TransactionPool: sc_transaction_pool_api::TransactionPool<
109 Block = DomainBlock,
110 Hash = BlockHashFor<DomainBlock>,
111 > + 'static,
112{
113 pub fn new(
114 domain_id: DomainId,
115 domain_client: Arc<Client>,
116 consensus_client: Arc<CClient>,
117 consensus_keystore: KeystorePtr,
118 consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory<CBlock>,
119 domain_transaction_pool: Arc<TransactionPool>,
120 sudo_account: AccountId,
121 ) -> Self {
122 let operator_keystore = KeystoreContainer::new(&KeystoreConfig::InMemory)
123 .expect("create in-memory keystore container must succeed")
124 .keystore();
125
126 let domain_bundle_proposer = DomainBundleProposer::new(
127 domain_id,
128 domain_client.clone(),
129 consensus_client.clone(),
130 domain_transaction_pool,
131 );
132
133 let (bundle_sender, _bundle_receiver) = tracing_unbounded("domain_bundle_stream", 100);
134 let bundle_producer = TestBundleProducer::new(
135 domain_id,
136 consensus_client.clone(),
137 domain_client.clone(),
138 domain_bundle_proposer,
139 Arc::new(bundle_sender),
140 operator_keystore.clone(),
141 false,
143 false,
144 );
145
146 let malicious_bundle_tamper =
147 MaliciousBundleTamper::new(domain_client, operator_keystore.clone());
148
149 Self {
150 domain_id,
151 consensus_client,
152 consensus_keystore,
153 operator_keystore,
154 bundle_producer,
155 malicious_bundle_tamper,
156 malicious_operator_status: MaliciousOperatorStatus::NoStatus,
157 sudo_account,
158 consensus_offchain_tx_pool_factory,
159 }
160 }
161
162 async fn handle_new_slot(
163 &mut self,
164 operator_id: OperatorId,
165 new_slot_info: OperatorSlotInfo,
166 ) -> Option<OpaqueBundleFor<DomainBlock, CBlock>> {
167 let slot = new_slot_info.slot;
168 self.bundle_producer
169 .produce_bundle(operator_id, new_slot_info)
170 .unwrap_or_else(move |error| {
171 tracing::error!(
172 ?slot,
173 ?operator_id,
174 ?error,
175 "Error at malicious operator producing bundle"
176 );
177 None
178 })
179 .await
180 .and_then(|res| res.into_opaque_bundle())
181 }
182
183 pub async fn start<NSNS: Stream<Item = (Slot, PotOutput)> + Send + 'static>(
184 mut self,
185 new_slot_notification_stream: NSNS,
186 ) {
187 let mut new_slot_notification_stream = Box::pin(new_slot_notification_stream);
188 while let Some((slot, proof_of_time)) = new_slot_notification_stream.next().await {
189 if let Some((operator_id, signing_key)) =
190 self.malicious_operator_status.registered_operator()
191 {
192 let maybe_opaque_bundle = self
193 .handle_new_slot(
194 operator_id,
195 OperatorSlotInfo {
196 slot,
197 proof_of_time,
198 },
199 )
200 .await;
201
202 if let Some(mut opaque_bundle) = maybe_opaque_bundle {
203 if let Err(err) = self
204 .malicious_bundle_tamper
205 .maybe_tamper_bundle(&mut opaque_bundle, &signing_key)
206 {
207 tracing::error!(?err, "Got error when try to tamper bundle");
208 }
209 if let Err(err) = self.submit_bundle(opaque_bundle) {
210 tracing::info!(?err, "Malicious operator failed to submit bundle");
211 }
212 }
213 }
214
215 if u64::from(slot) % 10 == 0
217 && let Err(err) = self.update_malicious_operator_status()
218 {
219 tracing::error!(?err, "Failed to update malicious operator status");
220 }
221 }
222 }
223
224 fn update_malicious_operator_status(&mut self) -> Result<(), Box<dyn Error>> {
225 let consensus_best_hash = self.consensus_client.info().best_hash;
226 let (mut current_operators, next_operators) = self
227 .consensus_client
228 .runtime_api()
229 .domain_operators(consensus_best_hash, self.domain_id)?
230 .ok_or_else(|| {
231 sp_blockchain::Error::Application(
232 format!("Operator set for domain {} not found", self.domain_id).into(),
233 )
234 })?;
235
236 if let Some((malicious_operator_id, _)) =
237 self.malicious_operator_status.registered_operator()
238 {
239 if next_operators.contains(&malicious_operator_id) {
240 return Ok(());
241 } else {
242 tracing::info!(
243 ?malicious_operator_id,
244 "Current malicious operator is missing from next operator set, probably got slashed"
245 );
246 current_operators.remove(&malicious_operator_id);
250 self.malicious_operator_status.no_status();
251 }
252 }
253
254 let signing_key = match &self.malicious_operator_status.registering_signing_key() {
255 Some(k) => k.clone(),
256 None => {
257 let public_key: OperatorPublicKey = self
258 .operator_keystore
259 .sr25519_generate_new(OperatorPublicKey::ID, None)?
260 .into();
261
262 self.malicious_operator_status
263 .registering(public_key.clone());
264
265 tracing::info!(?public_key, "Start register new malicious operator");
266
267 public_key
268 }
269 };
270
271 let mut maybe_operator_id = None;
272 for operator_id in current_operators.keys().chain(next_operators.iter()) {
273 if let Some((operator_signing_key, _)) = self
274 .consensus_client
275 .runtime_api()
276 .operator(consensus_best_hash, *operator_id)?
277 && operator_signing_key == signing_key
278 {
279 maybe_operator_id = Some(*operator_id);
280 break;
281 }
282 }
283
284 match maybe_operator_id {
287 None => {
288 let nonce = self.sudo_acccount_nonce()?;
289 let current_total_stake: Balance = current_operators.into_values().sum();
290 self.submit_register_operator(
291 nonce,
292 signing_key,
293 MALICIOUS_OPR_STAKE_MULTIPLIER * current_total_stake,
295 )?;
296 self.submit_force_staking_epoch_transition(nonce + 1)?;
297 }
298 Some(operator_id) => {
299 if !next_operators.contains(&operator_id) {
300 self.malicious_operator_status.no_status();
304 } else if !current_operators.contains_key(&operator_id) {
305 self.submit_force_staking_epoch_transition(self.sudo_acccount_nonce()?)?;
306 } else {
307 tracing::info!(
308 ?operator_id,
309 ?signing_key,
310 "Registered a new malicious operator"
311 );
312 self.malicious_operator_status
313 .registered(operator_id, signing_key);
314 }
315 }
316 }
317
318 Ok(())
319 }
320
321 fn sudo_acccount_nonce(&self) -> Result<Nonce, Box<dyn Error>> {
322 Ok(self.consensus_client.runtime_api().account_nonce(
323 self.consensus_client.info().best_hash,
324 self.sudo_account.clone(),
325 )?)
326 }
327
328 fn submit_bundle(
329 &self,
330 opaque_bundle: OpaqueBundleFor<DomainBlock, CBlock>,
331 ) -> Result<(), Box<dyn Error>> {
332 let call = pallet_domains::Call::submit_bundle { opaque_bundle };
333 self.submit_consensus_extrinsic(None, call.into())
334 }
335
336 fn submit_register_operator(
337 &self,
338 nonce: Nonce,
339 signing_key: OperatorPublicKey,
340 staking_amount: Balance,
341 ) -> Result<(), Box<dyn Error>> {
342 let call = pallet_domains::Call::register_operator {
343 domain_id: self.domain_id,
344 amount: staking_amount,
345 config: OperatorConfig {
346 signing_key,
347 minimum_nominator_stake: Balance::MAX,
348 nomination_tax: Default::default(),
349 },
350 };
351 self.submit_consensus_extrinsic(Some(nonce), call.into())
352 }
353
354 fn submit_force_staking_epoch_transition(&self, nonce: Nonce) -> Result<(), Box<dyn Error>> {
355 let call = pallet_sudo::Call::sudo {
356 call: Box::new(RuntimeCall::Domains(
357 pallet_domains::Call::force_staking_epoch_transition {
358 domain_id: self.domain_id,
359 },
360 )),
361 };
362 self.submit_consensus_extrinsic(Some(nonce), call.into())
363 }
364
365 fn submit_consensus_extrinsic(
366 &self,
367 maybe_nonce: Option<Nonce>,
368 call: RuntimeCall,
369 ) -> Result<(), Box<dyn Error>> {
370 let etx = match maybe_nonce {
371 Some(nonce) => construct_signed_extrinsic(
372 &self.consensus_keystore,
373 self.consensus_client.info(),
374 call.clone(),
375 self.sudo_account.clone(),
376 nonce,
377 )?,
378 None => {
379 let extra =
381 get_singed_extra(self.consensus_client.info().best_number.into(), true, 0);
382 UncheckedExtrinsic::new_transaction(call.clone(), extra)
383 }
384 };
385
386 self.consensus_offchain_tx_pool_factory
387 .offchain_transaction_pool(self.consensus_client.info().best_hash)
388 .submit_transaction(etx.encode())
389 .map_err(|err| {
390 sp_blockchain::Error::Application(
391 format!("Failed to submit consensus extrinsic, call {call:?}, err {err:?}")
392 .into(),
393 )
394 })?;
395
396 Ok(())
397 }
398}
399
400fn get_singed_extra(best_number: u64, immortal: bool, nonce: Nonce) -> SignedExtra {
401 let period = u64::from(<<Runtime as frame_system::Config>::BlockHashCount>::get())
402 .checked_next_power_of_two()
403 .map(|c| c / 2)
404 .unwrap_or(2);
405 (
406 frame_system::CheckNonZeroSender::<Runtime>::new(),
407 frame_system::CheckSpecVersion::<Runtime>::new(),
408 frame_system::CheckTxVersion::<Runtime>::new(),
409 frame_system::CheckGenesis::<Runtime>::new(),
410 frame_system::CheckMortality::<Runtime>::from(if immortal {
411 generic::Era::Immortal
412 } else {
413 generic::Era::mortal(period, best_number)
414 }),
415 frame_system::CheckNonce::<Runtime>::from(nonce.into()),
416 frame_system::CheckWeight::<Runtime>::new(),
417 pallet_transaction_payment::ChargeTransactionPayment::<Runtime>::from(0u128),
418 BalanceTransferCheckExtension::<Runtime>::default(),
419 pallet_subspace::extensions::SubspaceExtension::<Runtime>::new(),
420 pallet_domains::extensions::DomainsExtension::<Runtime>::new(),
421 pallet_messenger::extensions::MessengerExtension::<Runtime>::new(),
422 )
423}
424
425pub fn construct_signed_extrinsic(
426 consensus_keystore: &KeystorePtr,
427 consensus_chain_info: Info<CBlock>,
428 call: RuntimeCall,
429 caller: AccountId,
430 nonce: Nonce,
431) -> Result<UncheckedExtrinsic, Box<dyn Error>> {
432 let extra = get_singed_extra(consensus_chain_info.best_number.into(), false, nonce);
433 let raw_payload = generic::SignedPayload::<RuntimeCall, SignedExtra>::from_raw(
434 call.clone(),
435 extra.clone(),
436 (
437 (),
438 subspace_runtime::VERSION.spec_version,
439 subspace_runtime::VERSION.transaction_version,
440 consensus_chain_info.genesis_hash,
441 consensus_chain_info.best_hash,
442 (),
443 (),
444 (),
445 (),
446 (),
447 (),
448 (),
449 ),
450 );
451
452 let signature = match Sr25519Keyring::from_account_id(&caller) {
453 Some(keyring) => raw_payload.using_encoded(|e| keyring.sign(e)),
454 None => {
455 let public_key =
456 sp_core::sr25519::Public::unchecked_from(<AccountId as Into<[u8; 32]>>::into(
457 caller.clone(),
458 ));
459 raw_payload
460 .using_encoded(|e| {
461 consensus_keystore
462 .sr25519_sign(OperatorPublicKey::ID, &public_key, e)
463 })?
464 .ok_or(format!(
465 "Failed to sign extrinsic, sudo key pair missing from keystore?, public_key {public_key:?}"
466 ))?
467 }
468 };
469
470 Ok(UncheckedExtrinsic::new_signed(
471 call,
472 sp_runtime::MultiAddress::Id(caller),
473 signature.into(),
474 extra,
475 ))
476}