subspace_malicious_operator/
malicious_bundle_producer.rs

1use crate::malicious_bundle_tamper::MaliciousBundleTamper;
2use domain_client_operator::domain_bundle_producer::{BundleProducer, TestBundleProducer};
3use domain_client_operator::domain_bundle_proposer::DomainBundleProposer;
4use domain_client_operator::{OpaqueBundleFor, OperatorSlotInfo};
5use domain_runtime_primitives::opaque::Block as DomainBlock;
6use frame_system_rpc_runtime_api::AccountNonceApi;
7use futures::{Stream, StreamExt, TryFutureExt};
8use pallet_domains::OperatorConfig;
9use parity_scale_codec::Encode;
10use sc_client_api::{AuxStore, BlockBackend, HeaderBackend};
11use sc_service::KeystoreContainer;
12use sc_service::config::KeystoreConfig;
13use sc_transaction_pool_api::OffchainTransactionPoolFactory;
14use sc_utils::mpsc::tracing_unbounded;
15use sp_api::ProvideRuntimeApi;
16use sp_block_builder::BlockBuilder;
17use sp_blockchain::Info;
18use sp_consensus_slots::Slot;
19use sp_core::crypto::UncheckedFrom;
20use sp_domains::core_api::DomainCoreApi;
21use sp_domains::{BundleProducerElectionApi, DomainId, DomainsApi, OperatorId, OperatorPublicKey};
22use sp_keyring::Sr25519Keyring;
23use sp_keystore::{Keystore, KeystorePtr};
24use sp_messenger::MessengerApi;
25use sp_runtime::traits::NumberFor;
26use sp_runtime::{RuntimeAppPublic, generic};
27use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
28use std::error::Error;
29use std::sync::Arc;
30use subspace_core_primitives::pot::PotOutput;
31use subspace_runtime::{DisablePallets, Runtime, RuntimeCall, SignedExtra, UncheckedExtrinsic};
32use subspace_runtime_primitives::opaque::Block as CBlock;
33use subspace_runtime_primitives::{AccountId, Balance, BlockHashFor, HeaderFor, Nonce};
34
/// Factor applied to the domain's current total operator stake to compute the amount staked
/// when a new malicious operator is registered.
const MALICIOUS_OPR_STAKE_MULTIPLIER: Balance = 3;
36
/// Registration state of the malicious operator tracked by `MaliciousBundleProducer`.
enum MaliciousOperatorStatus {
    /// A signing key has been generated and its operator registration is still pending.
    Registering(OperatorPublicKey),
    /// The operator is registered and is used to produce bundles.
    Registered {
        /// Id of the registered operator.
        operator_id: OperatorId,
        /// Signing key linked to the registered operator.
        signing_key: OperatorPublicKey,
    },
    /// No operator is registered and no registration is in progress.
    NoStatus,
}
45
46impl MaliciousOperatorStatus {
47    fn registering(&mut self, signing_key: OperatorPublicKey) {
48        *self = MaliciousOperatorStatus::Registering(signing_key)
49    }
50
51    fn registered(&mut self, operator_id: OperatorId, signing_key: OperatorPublicKey) {
52        *self = MaliciousOperatorStatus::Registered {
53            operator_id,
54            signing_key,
55        }
56    }
57
58    fn no_status(&mut self) {
59        *self = MaliciousOperatorStatus::NoStatus
60    }
61
62    fn registered_operator(&self) -> Option<(OperatorId, OperatorPublicKey)> {
63        match self {
64            MaliciousOperatorStatus::Registered {
65                operator_id,
66                signing_key,
67            } => Some((*operator_id, signing_key.clone())),
68            _ => None,
69        }
70    }
71
72    fn registering_signing_key(&self) -> Option<OperatorPublicKey> {
73        match self {
74            MaliciousOperatorStatus::Registering(key) => Some(key.clone()),
75            _ => None,
76        }
77    }
78}
79
/// Drives a malicious operator: registers it on the consensus chain, produces bundles for the
/// domain, lets the tamper modify them and submits them to the consensus chain.
pub struct MaliciousBundleProducer<Client, CClient, TransactionPool> {
    /// Domain the malicious operator is registered on and produces bundles for.
    domain_id: DomainId,
    /// Account that signs the operator registration and forced epoch transition extrinsics.
    sudo_account: AccountId,
    /// Keystore used to sign consensus chain extrinsics when `sudo_account` is not a
    /// well-known keyring account.
    consensus_keystore: KeystorePtr,
    /// In-memory keystore in which the operator signing keys are generated.
    operator_keystore: KeystorePtr,
    /// Client of the consensus chain, used for chain info and runtime API calls.
    consensus_client: Arc<CClient>,
    /// Factory of the offchain transaction pool through which consensus chain extrinsics are
    /// submitted.
    consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory<CBlock>,
    /// Produces the bundles that are later handed to the tamper.
    bundle_producer: TestBundleProducer<DomainBlock, CBlock, Client, CClient, TransactionPool>,
    /// May tamper with a produced bundle before it is submitted.
    malicious_bundle_tamper: MaliciousBundleTamper<DomainBlock, CBlock, Client>,
    /// Current registration state of the malicious operator.
    malicious_operator_status: MaliciousOperatorStatus,
}
91
92impl<Client, CClient, TransactionPool> MaliciousBundleProducer<Client, CClient, TransactionPool>
93where
94    Client: HeaderBackend<DomainBlock>
95        + BlockBackend<DomainBlock>
96        + AuxStore
97        + ProvideRuntimeApi<DomainBlock>
98        + 'static,
99    Client::Api: BlockBuilder<DomainBlock>
100        + DomainCoreApi<DomainBlock>
101        + MessengerApi<DomainBlock, NumberFor<CBlock>, BlockHashFor<CBlock>>
102        + TaggedTransactionQueue<DomainBlock>,
103    CClient: HeaderBackend<CBlock> + ProvideRuntimeApi<CBlock> + 'static,
104    CClient::Api: DomainsApi<CBlock, HeaderFor<DomainBlock>>
105        + BundleProducerElectionApi<CBlock, Balance>
106        + AccountNonceApi<CBlock, AccountId, Nonce>,
107    TransactionPool: sc_transaction_pool_api::TransactionPool<
108            Block = DomainBlock,
109            Hash = BlockHashFor<DomainBlock>,
110        > + 'static,
111{
112    pub fn new(
113        domain_id: DomainId,
114        domain_client: Arc<Client>,
115        consensus_client: Arc<CClient>,
116        consensus_keystore: KeystorePtr,
117        consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory<CBlock>,
118        domain_transaction_pool: Arc<TransactionPool>,
119        sudo_account: AccountId,
120    ) -> Self {
121        let operator_keystore = KeystoreContainer::new(&KeystoreConfig::InMemory)
122            .expect("create in-memory keystore container must succeed")
123            .keystore();
124
125        let domain_bundle_proposer = DomainBundleProposer::new(
126            domain_id,
127            domain_client.clone(),
128            consensus_client.clone(),
129            domain_transaction_pool,
130        );
131
132        let (bundle_sender, _bundle_receiver) = tracing_unbounded("domain_bundle_stream", 100);
133        let bundle_producer = TestBundleProducer::new(
134            domain_id,
135            consensus_client.clone(),
136            domain_client.clone(),
137            domain_bundle_proposer,
138            Arc::new(bundle_sender),
139            operator_keystore.clone(),
140            // The malicious operator doesn't skip empty bundle
141            false,
142            false,
143        );
144
145        let malicious_bundle_tamper =
146            MaliciousBundleTamper::new(domain_client, operator_keystore.clone());
147
148        Self {
149            domain_id,
150            consensus_client,
151            consensus_keystore,
152            operator_keystore,
153            bundle_producer,
154            malicious_bundle_tamper,
155            malicious_operator_status: MaliciousOperatorStatus::NoStatus,
156            sudo_account,
157            consensus_offchain_tx_pool_factory,
158        }
159    }
160
161    async fn handle_new_slot(
162        &mut self,
163        operator_id: OperatorId,
164        new_slot_info: OperatorSlotInfo,
165    ) -> Option<OpaqueBundleFor<DomainBlock, CBlock>> {
166        let slot = new_slot_info.slot;
167        self.bundle_producer
168            .produce_bundle(operator_id, new_slot_info)
169            .unwrap_or_else(move |error| {
170                tracing::error!(
171                    ?slot,
172                    ?operator_id,
173                    ?error,
174                    "Error at malicious operator producing bundle"
175                );
176                None
177            })
178            .await
179            .and_then(|res| res.into_opaque_bundle())
180    }
181
182    pub async fn start<NSNS: Stream<Item = (Slot, PotOutput)> + Send + 'static>(
183        mut self,
184        new_slot_notification_stream: NSNS,
185    ) {
186        let mut new_slot_notification_stream = Box::pin(new_slot_notification_stream);
187        while let Some((slot, proof_of_time)) = new_slot_notification_stream.next().await {
188            if let Some((operator_id, signing_key)) =
189                self.malicious_operator_status.registered_operator()
190            {
191                let maybe_opaque_bundle = self
192                    .handle_new_slot(
193                        operator_id,
194                        OperatorSlotInfo {
195                            slot,
196                            proof_of_time,
197                        },
198                    )
199                    .await;
200
201                if let Some(mut opaque_bundle) = maybe_opaque_bundle {
202                    if let Err(err) = self
203                        .malicious_bundle_tamper
204                        .maybe_tamper_bundle(&mut opaque_bundle, &signing_key)
205                    {
206                        tracing::error!(?err, "Got error when try to tamper bundle");
207                    }
208                    if let Err(err) = self.submit_bundle(opaque_bundle) {
209                        tracing::info!(?err, "Malicious operator failed to submit bundle");
210                    }
211                }
212            }
213
214            // Periodically check the malicious operator status
215            if u64::from(slot) % 10 == 0
216                && let Err(err) = self.update_malicious_operator_status()
217            {
218                tracing::error!(?err, "Failed to update malicious operator status");
219            }
220        }
221    }
222
223    fn update_malicious_operator_status(&mut self) -> Result<(), Box<dyn Error>> {
224        let consensus_best_hash = self.consensus_client.info().best_hash;
225        let (mut current_operators, next_operators) = self
226            .consensus_client
227            .runtime_api()
228            .domain_operators(consensus_best_hash, self.domain_id)?
229            .ok_or_else(|| {
230                sp_blockchain::Error::Application(
231                    format!("Operator set for domain {} not found", self.domain_id).into(),
232                )
233            })?;
234
235        if let Some((malicious_operator_id, _)) =
236            self.malicious_operator_status.registered_operator()
237        {
238            if next_operators.contains(&malicious_operator_id) {
239                return Ok(());
240            } else {
241                tracing::info!(
242                    ?malicious_operator_id,
243                    "Current malicious operator is missing from next operator set, probably got slashed"
244                );
245                // Remove the current malicious operator to not account its stake toward
246                // `current_total_stake` otherwise the next malicious operator will stake
247                // more and more fund
248                current_operators.remove(&malicious_operator_id);
249                self.malicious_operator_status.no_status();
250            }
251        }
252
253        let signing_key = match &self.malicious_operator_status.registering_signing_key() {
254            Some(k) => k.clone(),
255            None => {
256                let public_key: OperatorPublicKey = self
257                    .operator_keystore
258                    .sr25519_generate_new(OperatorPublicKey::ID, None)?
259                    .into();
260
261                self.malicious_operator_status
262                    .registering(public_key.clone());
263
264                tracing::info!(?public_key, "Start register new malicious operator");
265
266                public_key
267            }
268        };
269
270        let mut maybe_operator_id = None;
271        for operator_id in current_operators.keys().chain(next_operators.iter()) {
272            if let Some((operator_signing_key, _)) = self
273                .consensus_client
274                .runtime_api()
275                .operator(consensus_best_hash, *operator_id)?
276                && operator_signing_key == signing_key
277            {
278                maybe_operator_id = Some(*operator_id);
279                break;
280            }
281        }
282
283        // If the `signing_key` is linked to a operator, the previous registration request succeeded,
284        // otherwise we need to retry
285        match maybe_operator_id {
286            None => {
287                let nonce = self.sudo_acccount_nonce()?;
288                let current_total_stake: Balance = current_operators.into_values().sum();
289                self.submit_register_operator(
290                    nonce,
291                    signing_key,
292                    // Ideally we should use the `next_total_stake` but it is tricky to get
293                    MALICIOUS_OPR_STAKE_MULTIPLIER * current_total_stake,
294                )?;
295                self.submit_force_staking_epoch_transition(nonce + 1)?;
296            }
297            Some(operator_id) => {
298                if !next_operators.contains(&operator_id) {
299                    // The operator id not present in `next_operators` means the operator is deregistered
300                    // or slashed, which should not happen since we haven't use this operator to submit bad
301                    // ER yet. But just set `malicious_operator_status` to `NoStatus` to register a new operator.
302                    self.malicious_operator_status.no_status();
303                } else if !current_operators.contains_key(&operator_id) {
304                    self.submit_force_staking_epoch_transition(self.sudo_acccount_nonce()?)?;
305                } else {
306                    tracing::info!(
307                        ?operator_id,
308                        ?signing_key,
309                        "Registered a new malicious operator"
310                    );
311                    self.malicious_operator_status
312                        .registered(operator_id, signing_key);
313                }
314            }
315        }
316
317        Ok(())
318    }
319
320    fn sudo_acccount_nonce(&self) -> Result<Nonce, Box<dyn Error>> {
321        Ok(self.consensus_client.runtime_api().account_nonce(
322            self.consensus_client.info().best_hash,
323            self.sudo_account.clone(),
324        )?)
325    }
326
327    fn submit_bundle(
328        &self,
329        opaque_bundle: OpaqueBundleFor<DomainBlock, CBlock>,
330    ) -> Result<(), Box<dyn Error>> {
331        let call = pallet_domains::Call::submit_bundle { opaque_bundle };
332        self.submit_consensus_extrinsic(None, call.into())
333    }
334
335    fn submit_register_operator(
336        &self,
337        nonce: Nonce,
338        signing_key: OperatorPublicKey,
339        staking_amount: Balance,
340    ) -> Result<(), Box<dyn Error>> {
341        let call = pallet_domains::Call::register_operator {
342            domain_id: self.domain_id,
343            amount: staking_amount,
344            config: OperatorConfig {
345                signing_key,
346                minimum_nominator_stake: Balance::MAX,
347                nomination_tax: Default::default(),
348            },
349        };
350        self.submit_consensus_extrinsic(Some(nonce), call.into())
351    }
352
353    fn submit_force_staking_epoch_transition(&self, nonce: Nonce) -> Result<(), Box<dyn Error>> {
354        let call = pallet_sudo::Call::sudo {
355            call: Box::new(RuntimeCall::Domains(
356                pallet_domains::Call::force_staking_epoch_transition {
357                    domain_id: self.domain_id,
358                },
359            )),
360        };
361        self.submit_consensus_extrinsic(Some(nonce), call.into())
362    }
363
364    fn submit_consensus_extrinsic(
365        &self,
366        maybe_nonce: Option<Nonce>,
367        call: RuntimeCall,
368    ) -> Result<(), Box<dyn Error>> {
369        let etx = match maybe_nonce {
370            Some(nonce) => construct_signed_extrinsic(
371                &self.consensus_keystore,
372                self.consensus_client.info(),
373                call.clone(),
374                self.sudo_account.clone(),
375                nonce,
376            )?,
377            None => {
378                // for general unsigned, nonce does not matter.
379                let extra =
380                    get_singed_extra(self.consensus_client.info().best_number.into(), true, 0);
381                UncheckedExtrinsic::new_transaction(call.clone(), extra)
382            }
383        };
384
385        self.consensus_offchain_tx_pool_factory
386            .offchain_transaction_pool(self.consensus_client.info().best_hash)
387            .submit_transaction(etx.encode())
388            .map_err(|err| {
389                sp_blockchain::Error::Application(
390                    format!("Failed to submit consensus extrinsic, call {call:?}, err {err:?}")
391                        .into(),
392                )
393            })?;
394
395        Ok(())
396    }
397}
398
399fn get_singed_extra(best_number: u64, immortal: bool, nonce: Nonce) -> SignedExtra {
400    let period = u64::from(<<Runtime as frame_system::Config>::BlockHashCount>::get())
401        .checked_next_power_of_two()
402        .map(|c| c / 2)
403        .unwrap_or(2);
404    (
405        frame_system::CheckNonZeroSender::<Runtime>::new(),
406        frame_system::CheckSpecVersion::<Runtime>::new(),
407        frame_system::CheckTxVersion::<Runtime>::new(),
408        frame_system::CheckGenesis::<Runtime>::new(),
409        frame_system::CheckMortality::<Runtime>::from(if immortal {
410            generic::Era::Immortal
411        } else {
412            generic::Era::mortal(period, best_number)
413        }),
414        frame_system::CheckNonce::<Runtime>::from(nonce.into()),
415        frame_system::CheckWeight::<Runtime>::new(),
416        pallet_transaction_payment::ChargeTransactionPayment::<Runtime>::from(0u128),
417        DisablePallets,
418        pallet_subspace::extensions::SubspaceExtension::<Runtime>::new(),
419        pallet_domains::extensions::DomainsExtension::<Runtime>::new(),
420        pallet_messenger::extensions::MessengerExtension::<Runtime>::new(),
421    )
422}
423
424pub fn construct_signed_extrinsic(
425    consensus_keystore: &KeystorePtr,
426    consensus_chain_info: Info<CBlock>,
427    call: RuntimeCall,
428    caller: AccountId,
429    nonce: Nonce,
430) -> Result<UncheckedExtrinsic, Box<dyn Error>> {
431    let extra = get_singed_extra(consensus_chain_info.best_number.into(), false, nonce);
432    let raw_payload = generic::SignedPayload::<RuntimeCall, SignedExtra>::from_raw(
433        call.clone(),
434        extra.clone(),
435        (
436            (),
437            subspace_runtime::VERSION.spec_version,
438            subspace_runtime::VERSION.transaction_version,
439            consensus_chain_info.genesis_hash,
440            consensus_chain_info.best_hash,
441            (),
442            (),
443            (),
444            (),
445            (),
446            (),
447            (),
448        ),
449    );
450
451    let signature = match Sr25519Keyring::from_account_id(&caller) {
452        Some(keyring) => raw_payload.using_encoded(|e| keyring.sign(e)),
453        None => {
454            let public_key =
455                sp_core::sr25519::Public::unchecked_from(<AccountId as Into<[u8; 32]>>::into(
456                    caller.clone(),
457                ));
458            raw_payload
459                .using_encoded(|e| {
460                    consensus_keystore
461                        .sr25519_sign(OperatorPublicKey::ID, &public_key, e)
462                })?
463                .ok_or(format!(
464                    "Failed to sign extrinsic, sudo key pair missing from keystore?, public_key {public_key:?}"
465                ))?
466        }
467    };
468
469    Ok(UncheckedExtrinsic::new_signed(
470        call,
471        sp_runtime::MultiAddress::Id(caller),
472        signature.into(),
473        extra,
474    ))
475}