domain_client_operator/
domain_bundle_proposer.rs

1use crate::ExecutionReceiptFor;
2use domain_runtime_primitives::CheckExtrinsicsValidityError;
3use futures::{select, FutureExt};
4use parity_scale_codec::Encode;
5use sc_client_api::{AuxStore, BlockBackend};
6use sc_transaction_pool_api::InPoolTransaction;
7use sp_api::{ApiExt, ProvideRuntimeApi};
8use sp_block_builder::BlockBuilder;
9use sp_blockchain::HeaderBackend;
10use sp_domains::core_api::DomainCoreApi;
11use sp_domains::{
12    BundleHeader, DomainId, DomainsApi, ExecutionReceipt, HeaderHashingFor, OperatorId,
13    ProofOfElection,
14};
15use sp_messenger::MessengerApi;
16use sp_runtime::traits::{Block as BlockT, Hash as HashT, Header as HeaderT, NumberFor, One, Zero};
17use sp_runtime::Percent;
18use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
19use sp_weights::Weight;
20use std::collections::HashSet;
21use std::marker::PhantomData;
22use std::sync::Arc;
23use std::time;
24use subspace_core_primitives::U256;
25use subspace_runtime_primitives::{Balance, BlockHashFor, ExtrinsicFor, HeaderFor};
26
27/// If the bundle utilization is below `BUNDLE_UTILIZATION_THRESHOLD` we will attempt to push
28/// at most `MAX_SKIPPED_TRANSACTIONS` number of transactions before quitting for real.
29const MAX_SKIPPED_TRANSACTIONS: usize = 8;
30
31const BUNDLE_UTILIZATION_THRESHOLD: Percent = Percent::from_percent(95);
32
33// `PreviousBundledTx` used to keep track of tx that have included in previous bundle and avoid
34// to re-including these transactions in the next bundle if the consensus hash did not change.
35struct PreviousBundledTx<Block: BlockT, CBlock: BlockT> {
36    bundled_at: CBlock::Hash,
37    tx_hashes: HashSet<Block::Hash>,
38}
39
40impl<Block: BlockT, CBlock: BlockT> PreviousBundledTx<Block, CBlock> {
41    fn new() -> Self {
42        PreviousBundledTx {
43            bundled_at: Default::default(),
44            tx_hashes: HashSet::new(),
45        }
46    }
47
48    fn already_bundled(&self, tx_hash: &Block::Hash) -> bool {
49        self.tx_hashes.contains(tx_hash)
50    }
51
52    fn maybe_clear(&mut self, consensus_hash: CBlock::Hash) {
53        if self.bundled_at != consensus_hash {
54            self.bundled_at = consensus_hash;
55            self.tx_hashes.clear();
56        }
57    }
58
59    fn add_bundled(&mut self, tx_hash: Block::Hash) {
60        self.tx_hashes.insert(tx_hash);
61    }
62}
63
64pub struct DomainBundleProposer<Block: BlockT, Client, CBlock: BlockT, CClient, TransactionPool> {
65    domain_id: DomainId,
66    client: Arc<Client>,
67    consensus_client: Arc<CClient>,
68    transaction_pool: Arc<TransactionPool>,
69    previous_bundled_tx: PreviousBundledTx<Block, CBlock>,
70    _phantom_data: PhantomData<(Block, CBlock)>,
71}
72
73impl<Block: BlockT, Client, CBlock: BlockT, CClient, TransactionPool> Clone
74    for DomainBundleProposer<Block, Client, CBlock, CClient, TransactionPool>
75{
76    fn clone(&self) -> Self {
77        Self {
78            domain_id: self.domain_id,
79            client: self.client.clone(),
80            consensus_client: self.consensus_client.clone(),
81            transaction_pool: self.transaction_pool.clone(),
82            previous_bundled_tx: PreviousBundledTx::new(),
83            _phantom_data: self._phantom_data,
84        }
85    }
86}
87
88pub(super) type ProposeBundleOutput<Block, CBlock> = (
89    BundleHeader<NumberFor<CBlock>, BlockHashFor<CBlock>, HeaderFor<Block>, Balance>,
90    Vec<ExtrinsicFor<Block>>,
91);
92
93impl<Block, Client, CBlock, CClient, TransactionPool>
94    DomainBundleProposer<Block, Client, CBlock, CClient, TransactionPool>
95where
96    Block: BlockT,
97    CBlock: BlockT,
98    NumberFor<Block>: Into<NumberFor<CBlock>>,
99    Client: HeaderBackend<Block> + BlockBackend<Block> + AuxStore + ProvideRuntimeApi<Block>,
100    Client::Api: BlockBuilder<Block>
101        + DomainCoreApi<Block>
102        + TaggedTransactionQueue<Block>
103        + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>,
104    CClient: HeaderBackend<CBlock> + ProvideRuntimeApi<CBlock>,
105    CClient::Api: DomainsApi<CBlock, Block::Header>,
106    TransactionPool: sc_transaction_pool_api::TransactionPool<Block = Block, Hash = Block::Hash>,
107{
108    pub fn new(
109        domain_id: DomainId,
110        client: Arc<Client>,
111        consensus_client: Arc<CClient>,
112        transaction_pool: Arc<TransactionPool>,
113    ) -> Self {
114        Self {
115            domain_id,
116            client,
117            consensus_client,
118            transaction_pool,
119            previous_bundled_tx: PreviousBundledTx::new(),
120            _phantom_data: PhantomData,
121        }
122    }
123
124    pub(crate) async fn propose_bundle_at(
125        &mut self,
126        proof_of_election: ProofOfElection,
127        tx_range: U256,
128        operator_id: OperatorId,
129        receipt: ExecutionReceiptFor<Block, CBlock>,
130    ) -> sp_blockchain::Result<ProposeBundleOutput<Block, CBlock>> {
131        // NOTE: use the domain block that derive the ER to validate the extrinsic to be included
132        // in the bundle, so the validity of the extrinsic is committed to the ER that submited together.
133        let (parent_number, parent_hash) = (receipt.domain_block_number, receipt.domain_block_hash);
134        let consensus_best_hash = self.consensus_client.info().best_hash;
135
136        let mut t1 = self.transaction_pool.ready_at(parent_hash).fuse();
137        // TODO: proper timeout
138        let mut t2 = futures_timer::Delay::new(time::Duration::from_micros(100)).fuse();
139
140        let pending_iterator = select! {
141            res = t1 => res,
142            _ = t2 => {
143                tracing::warn!(
144                    "Timeout fired waiting for transaction pool at #{parent_number},{parent_hash}, \
145                    proceeding with bundle production."
146                );
147                self.transaction_pool.ready()
148            }
149        };
150
151        // Clear the previous bundled tx info whenever the consensus chain tip is changed,
152        // this allow the operator to retry for the previous bundled tx in case the previous
153        // bundle fail to submit to the consensus chain due to any reason.
154        self.previous_bundled_tx.maybe_clear(consensus_best_hash);
155
156        let (domain_bundle_limit, storage_fund_balance, transaction_byte_fee) = {
157            let consensus_runtime_api = self.consensus_client.runtime_api();
158            // Some APIs are only present in API versions 3 and later. On earlier versions, we need to
159            // call legacy code.
160            // TODO: remove version check before next network
161            let domains_api_version = consensus_runtime_api
162                .api_version::<dyn DomainsApi<CBlock, CBlock::Header>>(consensus_best_hash)?
163                // It is safe to return a default version of 1, since there will always be version 1.
164                .unwrap_or(1);
165
166            let domain_bundle_limit = consensus_runtime_api
167                .domain_bundle_limit(consensus_best_hash, self.domain_id)?
168                .ok_or_else(|| {
169                    sp_blockchain::Error::Application(
170                        format!("Domain bundle limit for {:?} not found", self.domain_id).into(),
171                    )
172                })?;
173
174            let storage_fund_balance = consensus_runtime_api
175                .storage_fund_account_balance(consensus_best_hash, operator_id)?;
176
177            let transaction_byte_fee = if domains_api_version >= 3 {
178                consensus_runtime_api.consensus_transaction_byte_fee(consensus_best_hash)?
179            } else {
180                #[allow(deprecated)]
181                consensus_runtime_api.consensus_chain_byte_fee(consensus_best_hash)?
182            };
183
184            (
185                domain_bundle_limit,
186                storage_fund_balance,
187                transaction_byte_fee,
188            )
189        };
190
191        let bundle_vrf_hash = U256::from_be_bytes(*proof_of_election.vrf_hash());
192
193        let header_size = receipt.encoded_size()
194            + proof_of_election.encoded_size()
195            + domain_bundle_limit.max_bundle_weight.encoded_size()
196            // Extrinsics root size
197            + 32
198            // Header signature size
199            + 64;
200
201        let mut extrinsics = Vec::new();
202        let mut estimated_bundle_weight = Weight::default();
203        let mut bundle_size = 0u32;
204        let mut skipped = 0;
205
206        // Separate code block to make sure that runtime api instance is dropped after validation is done.
207        {
208            // We are using one runtime api instance here to maintain storage changes in the instance's internal buffer
209            // between runtime calls done in this loop.
210            let runtime_api_instance = self.client.runtime_api();
211            for pending_tx in pending_iterator {
212                let pending_tx_data = pending_tx.data();
213
214                let is_within_tx_range = runtime_api_instance
215                    .is_within_tx_range(parent_hash, pending_tx_data, &bundle_vrf_hash, &tx_range)
216                    .map_err(|err| {
217                        tracing::error!(
218                            ?err,
219                            ?pending_tx_data,
220                            "Error occurred in locating the tx range"
221                        );
222                    })
223                    .unwrap_or(false);
224                if !is_within_tx_range {
225                    continue;
226                }
227
228                // Skip the tx if it is already bundled by a recent bundle
229                if self
230                    .previous_bundled_tx
231                    .already_bundled(&self.transaction_pool.hash_of(pending_tx_data))
232                {
233                    continue;
234                }
235
236                let tx_weight = runtime_api_instance
237                    .extrinsic_weight(parent_hash, pending_tx_data)
238                    .map_err(|error| {
239                        sp_blockchain::Error::Application(Box::from(format!(
240                            "Error getting extrinsic weight: {error}"
241                        )))
242                    })?;
243                let next_estimated_bundle_weight =
244                    estimated_bundle_weight.saturating_add(tx_weight);
245                if next_estimated_bundle_weight.any_gt(domain_bundle_limit.max_bundle_weight) {
246                    if skipped < MAX_SKIPPED_TRANSACTIONS
247                        && Percent::from_rational(
248                            estimated_bundle_weight.ref_time(),
249                            domain_bundle_limit.max_bundle_weight.ref_time(),
250                        ) < BUNDLE_UTILIZATION_THRESHOLD
251                    {
252                        skipped += 1;
253                        continue;
254                    } else {
255                        break;
256                    }
257                }
258
259                let next_bundle_size = bundle_size + pending_tx_data.encoded_size() as u32;
260                if next_bundle_size > domain_bundle_limit.max_bundle_size {
261                    if skipped < MAX_SKIPPED_TRANSACTIONS
262                        && Percent::from_rational(bundle_size, domain_bundle_limit.max_bundle_size)
263                            < BUNDLE_UTILIZATION_THRESHOLD
264                    {
265                        skipped += 1;
266                        continue;
267                    } else {
268                        break;
269                    }
270                }
271
272                let next_bundle_storage_fee =
273                    (header_size as u32 + next_bundle_size) as u128 * transaction_byte_fee;
274                if next_bundle_storage_fee > storage_fund_balance {
275                    tracing::warn!(
276                        ?next_bundle_storage_fee,
277                        ?storage_fund_balance,
278                        "Insufficient storage fund balance to pay for the bundle storage fee"
279                    );
280                    break;
281                }
282
283                // Double check XDM before adding it to the bundle
284                if let Some(false) =
285                    runtime_api_instance.is_xdm_mmr_proof_valid(parent_hash, pending_tx_data)?
286                {
287                    continue;
288                }
289
290                // Double check the transaction validity, because the tx pool are re-validate the transaction
291                // in pool asynchronously so there is race condition that the operator imported a domain block
292                // and start producing bundle immediately before the re-validation based on the latest block
293                // is finished, cause the bundle contains illegal tx accidentally and being considered as invalid
294                // bundle and slashing on the honest operator.
295                //
296                // This check is done in similar fashion to block builder's build block.
297                // This check needs to be the last check as otherwise if the tx won't be part of bundle due to
298                // some other checks, its side effect will still be part of RuntimeApiImpl's changes buffer.
299                let transaction_validity_result =
300                    runtime_api_instance.execute_in_transaction(|api| {
301                        let transaction_validity_result = api.check_extrinsics_and_do_pre_dispatch(
302                            parent_hash,
303                            // Ideally, we should pass the whole `extrinsics` to keep consistency with ER derivation
304                            // and FP verification but it will be constly, so instead we do another final check that
305                            // pass the whole `extrinsics` to `check_extrinsics_and_do_pre_dispatch` before returning
306                            // the `extrinsics` to construct bundle.
307                            vec![pending_tx_data.as_ref().clone()],
308                            parent_number,
309                            parent_hash,
310                        );
311                        // Only commit, if there are no errors (both ApiError and CheckTxValidityError)
312                        if let Ok(Ok(_)) = transaction_validity_result {
313                            sp_api::TransactionOutcome::Commit(transaction_validity_result)
314                        } else {
315                            sp_api::TransactionOutcome::Rollback(transaction_validity_result)
316                        }
317                    })?;
318                if transaction_validity_result.is_err() {
319                    continue;
320                }
321
322                estimated_bundle_weight = next_estimated_bundle_weight;
323                bundle_size = next_bundle_size;
324                extrinsics.push(pending_tx_data.as_ref().clone());
325
326                self.previous_bundled_tx
327                    .add_bundled(self.transaction_pool.hash_of(pending_tx_data));
328            }
329        }
330
331        // As a final check, call `check_extrinsics_and_do_pre_dispatch` with all extrinsics,
332        // this is consistent with ER derivation and FP verification
333        if let Err(CheckExtrinsicsValidityError {
334            extrinsic_index,
335            transaction_validity_error,
336        }) = self
337            .client
338            .runtime_api()
339            .check_extrinsics_and_do_pre_dispatch(
340                parent_hash,
341                extrinsics.clone(),
342                parent_number,
343                parent_hash,
344            )?
345        {
346            tracing::warn!("Unexpected error when validating all the extrinsics at once: {transaction_validity_error:?}");
347
348            // Truncate to remove the invalid extrinsic (and any extrinsic after it), so only
349            // the valid exrinsic will be used to construct bundle.
350            extrinsics.truncate(extrinsic_index as usize);
351        }
352
353        let extrinsics_root = HeaderHashingFor::<Block::Header>::ordered_trie_root(
354            extrinsics.iter().map(|xt| xt.encode()).collect(),
355            sp_core::storage::StateVersion::V1,
356        );
357
358        let header = BundleHeader {
359            proof_of_election,
360            receipt,
361            estimated_bundle_weight,
362            bundle_extrinsics_root: extrinsics_root,
363        };
364
365        Ok((header, extrinsics))
366    }
367
368    /// Returns the receipt in the next domain bundle.
369    pub fn load_next_receipt(
370        &self,
371        domain_best_number_onchain: NumberFor<Block>,
372        head_receipt_number: NumberFor<Block>,
373    ) -> sp_blockchain::Result<ExecutionReceiptFor<Block, CBlock>> {
374        tracing::trace!(
375            ?domain_best_number_onchain,
376            ?head_receipt_number,
377            "Collecting receipt"
378        );
379
380        // Both `domain_best_number_onchain` and `head_receipt_number` are zero means the domain just
381        // instantiated and nothing have submitted yet so submit the genesis receipt
382        if domain_best_number_onchain.is_zero() && head_receipt_number.is_zero() {
383            let genesis_hash = self.client.info().genesis_hash;
384            let genesis_header = self.client.header(genesis_hash)?.ok_or_else(|| {
385                sp_blockchain::Error::Backend(format!(
386                    "Domain block header for #{genesis_hash:?} not found",
387                ))
388            })?;
389
390            return Ok(ExecutionReceipt::genesis(
391                *genesis_header.state_root(),
392                *genesis_header.extrinsics_root(),
393                genesis_hash,
394            ));
395        }
396
397        // The next receipt must extend the current head receipt
398        let receipt_number = head_receipt_number + One::one();
399
400        // Get the domain block hash corresponding to `receipt_number` in the domain canonical chain
401        let domain_hash = self.client.hash(receipt_number)?.ok_or_else(|| {
402            sp_blockchain::Error::Backend(format!(
403                "Domain block hash for #{receipt_number:?} not found"
404            ))
405        })?;
406
407        crate::load_execution_receipt_by_domain_hash::<Block, CBlock, _>(
408            &*self.client,
409            domain_hash,
410            receipt_number,
411        )
412    }
413}