subspace_malicious_operator/
malicious_bundle_producer.rs

1use crate::malicious_bundle_tamper::MaliciousBundleTamper;
2use domain_client_operator::domain_bundle_producer::{BundleProducer, TestBundleProducer};
3use domain_client_operator::domain_bundle_proposer::DomainBundleProposer;
4use domain_client_operator::{OpaqueBundleFor, OperatorSlotInfo};
5use domain_runtime_primitives::opaque::Block as DomainBlock;
6use frame_system_rpc_runtime_api::AccountNonceApi;
7use futures::{Stream, StreamExt, TryFutureExt};
8use pallet_domains::OperatorConfig;
9use parity_scale_codec::Encode;
10use sc_client_api::{AuxStore, BlockBackend, HeaderBackend};
11use sc_service::KeystoreContainer;
12use sc_service::config::KeystoreConfig;
13use sc_transaction_pool_api::OffchainTransactionPoolFactory;
14use sc_utils::mpsc::tracing_unbounded;
15use sp_api::ProvideRuntimeApi;
16use sp_block_builder::BlockBuilder;
17use sp_blockchain::Info;
18use sp_consensus_slots::Slot;
19use sp_core::crypto::UncheckedFrom;
20use sp_domains::core_api::DomainCoreApi;
21use sp_domains::{BundleProducerElectionApi, DomainId, DomainsApi, OperatorId, OperatorPublicKey};
22use sp_keyring::Sr25519Keyring;
23use sp_keystore::{Keystore, KeystorePtr};
24use sp_messenger::MessengerApi;
25use sp_runtime::traits::NumberFor;
26use sp_runtime::{RuntimeAppPublic, generic};
27use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
28use std::error::Error;
29use std::sync::Arc;
30use subspace_core_primitives::pot::PotOutput;
31use subspace_runtime::{Runtime, RuntimeCall, SignedExtra, UncheckedExtrinsic};
32use subspace_runtime_primitives::extension::BalanceTransferCheckExtension;
33use subspace_runtime_primitives::opaque::Block as CBlock;
34use subspace_runtime_primitives::{AccountId, Balance, BlockHashFor, HeaderFor, Nonce};
35
/// Factor by which the domain's current total operator stake is multiplied to obtain the
/// amount a newly registered malicious operator stakes (see
/// `update_malicious_operator_status`).
const MALICIOUS_OPR_STAKE_MULTIPLIER: Balance = 3;
37
/// Registration state of the malicious operator controlled by this node.
enum MaliciousOperatorStatus {
    /// A signing key has been generated for an operator whose registration has not been
    /// confirmed yet.
    Registering(OperatorPublicKey),
    /// The operator was found in both the current and the next operator set and is used
    /// to produce bundles.
    Registered {
        /// Id of the registered operator.
        operator_id: OperatorId,
        /// Signing key the operator was registered with.
        signing_key: OperatorPublicKey,
    },
    /// No operator is registered or being registered.
    NoStatus,
}
46
47impl MaliciousOperatorStatus {
48    fn registering(&mut self, signing_key: OperatorPublicKey) {
49        *self = MaliciousOperatorStatus::Registering(signing_key)
50    }
51
52    fn registered(&mut self, operator_id: OperatorId, signing_key: OperatorPublicKey) {
53        *self = MaliciousOperatorStatus::Registered {
54            operator_id,
55            signing_key,
56        }
57    }
58
59    fn no_status(&mut self) {
60        *self = MaliciousOperatorStatus::NoStatus
61    }
62
63    fn registered_operator(&self) -> Option<(OperatorId, OperatorPublicKey)> {
64        match self {
65            MaliciousOperatorStatus::Registered {
66                operator_id,
67                signing_key,
68            } => Some((*operator_id, signing_key.clone())),
69            _ => None,
70        }
71    }
72
73    fn registering_signing_key(&self) -> Option<OperatorPublicKey> {
74        match self {
75            MaliciousOperatorStatus::Registering(key) => Some(key.clone()),
76            _ => None,
77        }
78    }
79}
80
/// Drives a malicious operator: registers it on a domain, produces bundles for it, lets
/// the bundle tamper modify them and submits the result to the consensus chain.
pub struct MaliciousBundleProducer<Client, CClient, TransactionPool> {
    /// Domain the operator is registered on and produces bundles for.
    domain_id: DomainId,
    /// Account whose nonce is queried and that signs the registration and sudo extrinsics.
    sudo_account: AccountId,
    /// Keystore consulted for the signing key pair when `sudo_account` is not a well-known
    /// `Sr25519Keyring` account.
    consensus_keystore: KeystorePtr,
    /// In-memory keystore in which the operator signing keys are generated.
    operator_keystore: KeystorePtr,
    /// Client of the consensus chain, used for chain info and runtime API calls.
    consensus_client: Arc<CClient>,
    /// Factory of the offchain transaction pool through which extrinsics are submitted.
    consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory<CBlock>,
    /// Produces the bundles on every slot once an operator is registered.
    bundle_producer: TestBundleProducer<DomainBlock, CBlock, Client, CClient, TransactionPool>,
    /// Given the chance to tamper with each produced bundle before it is submitted.
    malicious_bundle_tamper: MaliciousBundleTamper<DomainBlock, CBlock, Client>,
    /// Current registration state of the malicious operator.
    malicious_operator_status: MaliciousOperatorStatus,
}
92
93impl<Client, CClient, TransactionPool> MaliciousBundleProducer<Client, CClient, TransactionPool>
94where
95    Client: HeaderBackend<DomainBlock>
96        + BlockBackend<DomainBlock>
97        + AuxStore
98        + ProvideRuntimeApi<DomainBlock>
99        + 'static,
100    Client::Api: BlockBuilder<DomainBlock>
101        + DomainCoreApi<DomainBlock>
102        + MessengerApi<DomainBlock, NumberFor<CBlock>, BlockHashFor<CBlock>>
103        + TaggedTransactionQueue<DomainBlock>,
104    CClient: HeaderBackend<CBlock> + ProvideRuntimeApi<CBlock> + 'static,
105    CClient::Api: DomainsApi<CBlock, HeaderFor<DomainBlock>>
106        + BundleProducerElectionApi<CBlock, Balance>
107        + AccountNonceApi<CBlock, AccountId, Nonce>,
108    TransactionPool: sc_transaction_pool_api::TransactionPool<
109            Block = DomainBlock,
110            Hash = BlockHashFor<DomainBlock>,
111        > + 'static,
112{
113    pub fn new(
114        domain_id: DomainId,
115        domain_client: Arc<Client>,
116        consensus_client: Arc<CClient>,
117        consensus_keystore: KeystorePtr,
118        consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory<CBlock>,
119        domain_transaction_pool: Arc<TransactionPool>,
120        sudo_account: AccountId,
121    ) -> Self {
122        let operator_keystore = KeystoreContainer::new(&KeystoreConfig::InMemory)
123            .expect("create in-memory keystore container must succeed")
124            .keystore();
125
126        let domain_bundle_proposer = DomainBundleProposer::new(
127            domain_id,
128            domain_client.clone(),
129            consensus_client.clone(),
130            domain_transaction_pool,
131        );
132
133        let (bundle_sender, _bundle_receiver) = tracing_unbounded("domain_bundle_stream", 100);
134        let bundle_producer = TestBundleProducer::new(
135            domain_id,
136            consensus_client.clone(),
137            domain_client.clone(),
138            domain_bundle_proposer,
139            Arc::new(bundle_sender),
140            operator_keystore.clone(),
141            // The malicious operator doesn't skip empty bundle
142            false,
143            false,
144        );
145
146        let malicious_bundle_tamper =
147            MaliciousBundleTamper::new(domain_client, operator_keystore.clone());
148
149        Self {
150            domain_id,
151            consensus_client,
152            consensus_keystore,
153            operator_keystore,
154            bundle_producer,
155            malicious_bundle_tamper,
156            malicious_operator_status: MaliciousOperatorStatus::NoStatus,
157            sudo_account,
158            consensus_offchain_tx_pool_factory,
159        }
160    }
161
162    async fn handle_new_slot(
163        &mut self,
164        operator_id: OperatorId,
165        new_slot_info: OperatorSlotInfo,
166    ) -> Option<OpaqueBundleFor<DomainBlock, CBlock>> {
167        let slot = new_slot_info.slot;
168        self.bundle_producer
169            .produce_bundle(operator_id, new_slot_info)
170            .unwrap_or_else(move |error| {
171                tracing::error!(
172                    ?slot,
173                    ?operator_id,
174                    ?error,
175                    "Error at malicious operator producing bundle"
176                );
177                None
178            })
179            .await
180            .and_then(|res| res.into_opaque_bundle())
181    }
182
183    pub async fn start<NSNS: Stream<Item = (Slot, PotOutput)> + Send + 'static>(
184        mut self,
185        new_slot_notification_stream: NSNS,
186    ) {
187        let mut new_slot_notification_stream = Box::pin(new_slot_notification_stream);
188        while let Some((slot, proof_of_time)) = new_slot_notification_stream.next().await {
189            if let Some((operator_id, signing_key)) =
190                self.malicious_operator_status.registered_operator()
191            {
192                let maybe_opaque_bundle = self
193                    .handle_new_slot(
194                        operator_id,
195                        OperatorSlotInfo {
196                            slot,
197                            proof_of_time,
198                        },
199                    )
200                    .await;
201
202                if let Some(mut opaque_bundle) = maybe_opaque_bundle {
203                    if let Err(err) = self
204                        .malicious_bundle_tamper
205                        .maybe_tamper_bundle(&mut opaque_bundle, &signing_key)
206                    {
207                        tracing::error!(?err, "Got error when try to tamper bundle");
208                    }
209                    if let Err(err) = self.submit_bundle(opaque_bundle) {
210                        tracing::info!(?err, "Malicious operator failed to submit bundle");
211                    }
212                }
213            }
214
215            // Periodically check the malicious operator status
216            if u64::from(slot) % 10 == 0
217                && let Err(err) = self.update_malicious_operator_status()
218            {
219                tracing::error!(?err, "Failed to update malicious operator status");
220            }
221        }
222    }
223
224    fn update_malicious_operator_status(&mut self) -> Result<(), Box<dyn Error>> {
225        let consensus_best_hash = self.consensus_client.info().best_hash;
226        let (mut current_operators, next_operators) = self
227            .consensus_client
228            .runtime_api()
229            .domain_operators(consensus_best_hash, self.domain_id)?
230            .ok_or_else(|| {
231                sp_blockchain::Error::Application(
232                    format!("Operator set for domain {} not found", self.domain_id).into(),
233                )
234            })?;
235
236        if let Some((malicious_operator_id, _)) =
237            self.malicious_operator_status.registered_operator()
238        {
239            if next_operators.contains(&malicious_operator_id) {
240                return Ok(());
241            } else {
242                tracing::info!(
243                    ?malicious_operator_id,
244                    "Current malicious operator is missing from next operator set, probably got slashed"
245                );
246                // Remove the current malicious operator to not account its stake toward
247                // `current_total_stake` otherwise the next malicious operator will stake
248                // more and more fund
249                current_operators.remove(&malicious_operator_id);
250                self.malicious_operator_status.no_status();
251            }
252        }
253
254        let signing_key = match &self.malicious_operator_status.registering_signing_key() {
255            Some(k) => k.clone(),
256            None => {
257                let public_key: OperatorPublicKey = self
258                    .operator_keystore
259                    .sr25519_generate_new(OperatorPublicKey::ID, None)?
260                    .into();
261
262                self.malicious_operator_status
263                    .registering(public_key.clone());
264
265                tracing::info!(?public_key, "Start register new malicious operator");
266
267                public_key
268            }
269        };
270
271        let mut maybe_operator_id = None;
272        for operator_id in current_operators.keys().chain(next_operators.iter()) {
273            if let Some((operator_signing_key, _)) = self
274                .consensus_client
275                .runtime_api()
276                .operator(consensus_best_hash, *operator_id)?
277                && operator_signing_key == signing_key
278            {
279                maybe_operator_id = Some(*operator_id);
280                break;
281            }
282        }
283
284        // If the `signing_key` is linked to a operator, the previous registration request succeeded,
285        // otherwise we need to retry
286        match maybe_operator_id {
287            None => {
288                let nonce = self.sudo_acccount_nonce()?;
289                let current_total_stake: Balance = current_operators.into_values().sum();
290                self.submit_register_operator(
291                    nonce,
292                    signing_key,
293                    // Ideally we should use the `next_total_stake` but it is tricky to get
294                    MALICIOUS_OPR_STAKE_MULTIPLIER * current_total_stake,
295                )?;
296                self.submit_force_staking_epoch_transition(nonce + 1)?;
297            }
298            Some(operator_id) => {
299                if !next_operators.contains(&operator_id) {
300                    // The operator id not present in `next_operators` means the operator is deregistered
301                    // or slashed, which should not happen since we haven't use this operator to submit bad
302                    // ER yet. But just set `malicious_operator_status` to `NoStatus` to register a new operator.
303                    self.malicious_operator_status.no_status();
304                } else if !current_operators.contains_key(&operator_id) {
305                    self.submit_force_staking_epoch_transition(self.sudo_acccount_nonce()?)?;
306                } else {
307                    tracing::info!(
308                        ?operator_id,
309                        ?signing_key,
310                        "Registered a new malicious operator"
311                    );
312                    self.malicious_operator_status
313                        .registered(operator_id, signing_key);
314                }
315            }
316        }
317
318        Ok(())
319    }
320
321    fn sudo_acccount_nonce(&self) -> Result<Nonce, Box<dyn Error>> {
322        Ok(self.consensus_client.runtime_api().account_nonce(
323            self.consensus_client.info().best_hash,
324            self.sudo_account.clone(),
325        )?)
326    }
327
328    fn submit_bundle(
329        &self,
330        opaque_bundle: OpaqueBundleFor<DomainBlock, CBlock>,
331    ) -> Result<(), Box<dyn Error>> {
332        let call = pallet_domains::Call::submit_bundle { opaque_bundle };
333        self.submit_consensus_extrinsic(None, call.into())
334    }
335
336    fn submit_register_operator(
337        &self,
338        nonce: Nonce,
339        signing_key: OperatorPublicKey,
340        staking_amount: Balance,
341    ) -> Result<(), Box<dyn Error>> {
342        let call = pallet_domains::Call::register_operator {
343            domain_id: self.domain_id,
344            amount: staking_amount,
345            config: OperatorConfig {
346                signing_key,
347                minimum_nominator_stake: Balance::MAX,
348                nomination_tax: Default::default(),
349            },
350        };
351        self.submit_consensus_extrinsic(Some(nonce), call.into())
352    }
353
354    fn submit_force_staking_epoch_transition(&self, nonce: Nonce) -> Result<(), Box<dyn Error>> {
355        let call = pallet_sudo::Call::sudo {
356            call: Box::new(RuntimeCall::Domains(
357                pallet_domains::Call::force_staking_epoch_transition {
358                    domain_id: self.domain_id,
359                },
360            )),
361        };
362        self.submit_consensus_extrinsic(Some(nonce), call.into())
363    }
364
365    fn submit_consensus_extrinsic(
366        &self,
367        maybe_nonce: Option<Nonce>,
368        call: RuntimeCall,
369    ) -> Result<(), Box<dyn Error>> {
370        let etx = match maybe_nonce {
371            Some(nonce) => construct_signed_extrinsic(
372                &self.consensus_keystore,
373                self.consensus_client.info(),
374                call.clone(),
375                self.sudo_account.clone(),
376                nonce,
377            )?,
378            None => {
379                // for general unsigned, nonce does not matter.
380                let extra =
381                    get_singed_extra(self.consensus_client.info().best_number.into(), true, 0);
382                UncheckedExtrinsic::new_transaction(call.clone(), extra)
383            }
384        };
385
386        self.consensus_offchain_tx_pool_factory
387            .offchain_transaction_pool(self.consensus_client.info().best_hash)
388            .submit_transaction(etx.encode())
389            .map_err(|err| {
390                sp_blockchain::Error::Application(
391                    format!("Failed to submit consensus extrinsic, call {call:?}, err {err:?}")
392                        .into(),
393                )
394            })?;
395
396        Ok(())
397    }
398}
399
400fn get_singed_extra(best_number: u64, immortal: bool, nonce: Nonce) -> SignedExtra {
401    let period = u64::from(<<Runtime as frame_system::Config>::BlockHashCount>::get())
402        .checked_next_power_of_two()
403        .map(|c| c / 2)
404        .unwrap_or(2);
405    (
406        frame_system::CheckNonZeroSender::<Runtime>::new(),
407        frame_system::CheckSpecVersion::<Runtime>::new(),
408        frame_system::CheckTxVersion::<Runtime>::new(),
409        frame_system::CheckGenesis::<Runtime>::new(),
410        frame_system::CheckMortality::<Runtime>::from(if immortal {
411            generic::Era::Immortal
412        } else {
413            generic::Era::mortal(period, best_number)
414        }),
415        frame_system::CheckNonce::<Runtime>::from(nonce.into()),
416        frame_system::CheckWeight::<Runtime>::new(),
417        pallet_transaction_payment::ChargeTransactionPayment::<Runtime>::from(0u128),
418        BalanceTransferCheckExtension::<Runtime>::default(),
419        pallet_subspace::extensions::SubspaceExtension::<Runtime>::new(),
420        pallet_domains::extensions::DomainsExtension::<Runtime>::new(),
421        pallet_messenger::extensions::MessengerExtension::<Runtime>::new(),
422    )
423}
424
425pub fn construct_signed_extrinsic(
426    consensus_keystore: &KeystorePtr,
427    consensus_chain_info: Info<CBlock>,
428    call: RuntimeCall,
429    caller: AccountId,
430    nonce: Nonce,
431) -> Result<UncheckedExtrinsic, Box<dyn Error>> {
432    let extra = get_singed_extra(consensus_chain_info.best_number.into(), false, nonce);
433    let raw_payload = generic::SignedPayload::<RuntimeCall, SignedExtra>::from_raw(
434        call.clone(),
435        extra.clone(),
436        (
437            (),
438            subspace_runtime::VERSION.spec_version,
439            subspace_runtime::VERSION.transaction_version,
440            consensus_chain_info.genesis_hash,
441            consensus_chain_info.best_hash,
442            (),
443            (),
444            (),
445            (),
446            (),
447            (),
448            (),
449        ),
450    );
451
452    let signature = match Sr25519Keyring::from_account_id(&caller) {
453        Some(keyring) => raw_payload.using_encoded(|e| keyring.sign(e)),
454        None => {
455            let public_key =
456                sp_core::sr25519::Public::unchecked_from(<AccountId as Into<[u8; 32]>>::into(
457                    caller.clone(),
458                ));
459            raw_payload
460                .using_encoded(|e| {
461                    consensus_keystore
462                        .sr25519_sign(OperatorPublicKey::ID, &public_key, e)
463                })?
464                .ok_or(format!(
465                    "Failed to sign extrinsic, sudo key pair missing from keystore?, public_key {public_key:?}"
466                ))?
467        }
468    };
469
470    Ok(UncheckedExtrinsic::new_signed(
471        call,
472        sp_runtime::MultiAddress::Id(caller),
473        signature.into(),
474        extra,
475    ))
476}