domain_client_operator/
domain_bundle_proposer.rs

1use crate::ExecutionReceiptFor;
2use crate::domain_bundle_producer::BundleHeaderFor;
3use domain_runtime_primitives::CheckExtrinsicsValidityError;
4use futures::{FutureExt, select};
5use parity_scale_codec::Encode;
6use sc_client_api::{AuxStore, BlockBackend};
7use sc_transaction_pool_api::InPoolTransaction;
8use sp_api::{ApiExt, ProvideRuntimeApi};
9use sp_block_builder::BlockBuilder;
10use sp_blockchain::HeaderBackend;
11use sp_domains::bundle::BundleVersion;
12use sp_domains::bundle::bundle_v0::BundleHeaderV0;
13use sp_domains::core_api::DomainCoreApi;
14use sp_domains::execution_receipt::ExecutionReceiptVersion;
15use sp_domains::{DomainId, DomainsApi, HeaderHashingFor, OperatorId, ProofOfElection};
16use sp_messenger::MessengerApi;
17use sp_runtime::Percent;
18use sp_runtime::traits::{Block as BlockT, Hash as HashT, Header as HeaderT, NumberFor, One, Zero};
19use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
20use sp_weights::Weight;
21use std::collections::HashSet;
22use std::marker::PhantomData;
23use std::sync::Arc;
24use std::time;
25use subspace_core_primitives::U256;
26use subspace_runtime_primitives::ExtrinsicFor;
27
28/// If the bundle utilization is below `BUNDLE_UTILIZATION_THRESHOLD` we will attempt to push
29/// at most `MAX_SKIPPED_TRANSACTIONS` number of transactions before quitting for real.
30const MAX_SKIPPED_TRANSACTIONS: usize = 8;
31
32const BUNDLE_UTILIZATION_THRESHOLD: Percent = Percent::from_percent(95);
33
34// `PreviousBundledTx` used to keep track of tx that have included in previous bundle and avoid
35// to re-including these transactions in the next bundle if the consensus hash did not change.
36struct PreviousBundledTx<Block: BlockT, CBlock: BlockT> {
37    bundled_at: CBlock::Hash,
38    tx_hashes: HashSet<Block::Hash>,
39}
40
41impl<Block: BlockT, CBlock: BlockT> PreviousBundledTx<Block, CBlock> {
42    fn new() -> Self {
43        PreviousBundledTx {
44            bundled_at: Default::default(),
45            tx_hashes: HashSet::new(),
46        }
47    }
48
49    fn already_bundled(&self, tx_hash: &Block::Hash) -> bool {
50        self.tx_hashes.contains(tx_hash)
51    }
52
53    fn maybe_clear(&mut self, consensus_hash: CBlock::Hash) {
54        if self.bundled_at != consensus_hash {
55            self.bundled_at = consensus_hash;
56            self.tx_hashes.clear();
57        }
58    }
59
60    fn add_bundled(&mut self, tx_hash: Block::Hash) {
61        self.tx_hashes.insert(tx_hash);
62    }
63}
64
65pub struct DomainBundleProposer<Block: BlockT, Client, CBlock: BlockT, CClient, TransactionPool> {
66    domain_id: DomainId,
67    client: Arc<Client>,
68    consensus_client: Arc<CClient>,
69    transaction_pool: Arc<TransactionPool>,
70    previous_bundled_tx: PreviousBundledTx<Block, CBlock>,
71    _phantom_data: PhantomData<(Block, CBlock)>,
72}
73
74impl<Block: BlockT, Client, CBlock: BlockT, CClient, TransactionPool> Clone
75    for DomainBundleProposer<Block, Client, CBlock, CClient, TransactionPool>
76{
77    fn clone(&self) -> Self {
78        Self {
79            domain_id: self.domain_id,
80            client: self.client.clone(),
81            consensus_client: self.consensus_client.clone(),
82            transaction_pool: self.transaction_pool.clone(),
83            previous_bundled_tx: PreviousBundledTx::new(),
84            _phantom_data: self._phantom_data,
85        }
86    }
87}
88
89pub(super) type ProposeBundleOutput<Block, CBlock> =
90    (BundleHeaderFor<Block, CBlock>, Vec<ExtrinsicFor<Block>>);
91
92impl<Block, Client, CBlock, CClient, TransactionPool>
93    DomainBundleProposer<Block, Client, CBlock, CClient, TransactionPool>
94where
95    Block: BlockT,
96    CBlock: BlockT,
97    NumberFor<Block>: Into<NumberFor<CBlock>>,
98    Client: HeaderBackend<Block> + BlockBackend<Block> + AuxStore + ProvideRuntimeApi<Block>,
99    Client::Api: BlockBuilder<Block>
100        + DomainCoreApi<Block>
101        + TaggedTransactionQueue<Block>
102        + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>,
103    CClient: HeaderBackend<CBlock> + ProvideRuntimeApi<CBlock>,
104    CClient::Api: DomainsApi<CBlock, Block::Header>,
105    TransactionPool: sc_transaction_pool_api::TransactionPool<Block = Block, Hash = Block::Hash>,
106{
107    pub fn new(
108        domain_id: DomainId,
109        client: Arc<Client>,
110        consensus_client: Arc<CClient>,
111        transaction_pool: Arc<TransactionPool>,
112    ) -> Self {
113        Self {
114            domain_id,
115            client,
116            consensus_client,
117            transaction_pool,
118            previous_bundled_tx: PreviousBundledTx::new(),
119            _phantom_data: PhantomData,
120        }
121    }
122
123    pub(crate) async fn propose_bundle_at(
124        &mut self,
125        proof_of_election: ProofOfElection,
126        tx_range: U256,
127        operator_id: OperatorId,
128        receipt: ExecutionReceiptFor<Block, CBlock>,
129        bundle_version: BundleVersion,
130    ) -> sp_blockchain::Result<ProposeBundleOutput<Block, CBlock>> {
131        // NOTE: use the domain block that derive the ER to validate the extrinsic to be included
132        // in the bundle, so the validity of the extrinsic is committed to the ER that submited together.
133        let (parent_number, parent_hash) =
134            (*receipt.domain_block_number(), *receipt.domain_block_hash());
135        let consensus_best_hash = self.consensus_client.info().best_hash;
136
137        let mut t1 = self.transaction_pool.ready_at(parent_hash).fuse();
138        // TODO: proper timeout
139        let mut t2 = futures_timer::Delay::new(time::Duration::from_micros(100)).fuse();
140
141        let pending_iterator = select! {
142            res = t1 => res,
143            _ = t2 => {
144                tracing::warn!(
145                    "Timeout fired waiting for transaction pool at #{parent_number},{parent_hash}, \
146                    proceeding with bundle production."
147                );
148                self.transaction_pool.ready()
149            }
150        };
151
152        // Clear the previous bundled tx info whenever the consensus chain tip is changed,
153        // this allow the operator to retry for the previous bundled tx in case the previous
154        // bundle fail to submit to the consensus chain due to any reason.
155        self.previous_bundled_tx.maybe_clear(consensus_best_hash);
156
157        let (domain_bundle_limit, storage_fund_balance, transaction_byte_fee) = {
158            let consensus_runtime_api = self.consensus_client.runtime_api();
159
160            let domain_bundle_limit = consensus_runtime_api
161                .domain_bundle_limit(consensus_best_hash, self.domain_id)?
162                .ok_or_else(|| {
163                    sp_blockchain::Error::Application(
164                        format!("Domain bundle limit for {:?} not found", self.domain_id).into(),
165                    )
166                })?;
167
168            let storage_fund_balance = consensus_runtime_api
169                .storage_fund_account_balance(consensus_best_hash, operator_id)?;
170
171            let transaction_byte_fee =
172                consensus_runtime_api.consensus_transaction_byte_fee(consensus_best_hash)?;
173
174            (
175                domain_bundle_limit,
176                storage_fund_balance,
177                transaction_byte_fee,
178            )
179        };
180
181        let bundle_vrf_hash = U256::from_be_bytes(*proof_of_election.vrf_hash());
182
183        let header_size = receipt.encoded_size()
184            + proof_of_election.encoded_size()
185            + domain_bundle_limit.max_bundle_weight.encoded_size()
186            // Extrinsics root size
187            + 32
188            // Header signature size
189            + 64;
190
191        let mut extrinsics = Vec::new();
192        let mut estimated_bundle_weight = Weight::default();
193        let mut bundle_size = 0u32;
194        let mut skipped = 0;
195
196        // Separate code block to make sure that runtime api instance is dropped after validation is done.
197        {
198            // We are using one runtime api instance here to maintain storage changes in the instance's internal buffer
199            // between runtime calls done in this loop.
200            let runtime_api_instance = self.client.runtime_api();
201            for pending_tx in pending_iterator {
202                let pending_tx_data = pending_tx.data();
203
204                let is_within_tx_range = runtime_api_instance
205                    .is_within_tx_range(parent_hash, pending_tx_data, &bundle_vrf_hash, &tx_range)
206                    .map_err(|err| {
207                        tracing::error!(
208                            ?err,
209                            ?pending_tx_data,
210                            "Error occurred in locating the tx range"
211                        );
212                    })
213                    .unwrap_or(false);
214                if !is_within_tx_range {
215                    continue;
216                }
217
218                // Skip the tx if it is already bundled by a recent bundle
219                if self
220                    .previous_bundled_tx
221                    .already_bundled(&self.transaction_pool.hash_of(pending_tx_data))
222                {
223                    continue;
224                }
225
226                let tx_weight = runtime_api_instance
227                    .extrinsic_weight(parent_hash, pending_tx_data)
228                    .map_err(|error| {
229                        sp_blockchain::Error::Application(Box::from(format!(
230                            "Error getting extrinsic weight: {error}"
231                        )))
232                    })?;
233                let next_estimated_bundle_weight =
234                    estimated_bundle_weight.saturating_add(tx_weight);
235                if next_estimated_bundle_weight.any_gt(domain_bundle_limit.max_bundle_weight) {
236                    if skipped < MAX_SKIPPED_TRANSACTIONS
237                        && Percent::from_rational(
238                            estimated_bundle_weight.ref_time(),
239                            domain_bundle_limit.max_bundle_weight.ref_time(),
240                        ) < BUNDLE_UTILIZATION_THRESHOLD
241                    {
242                        skipped += 1;
243                        continue;
244                    } else {
245                        break;
246                    }
247                }
248
249                let next_bundle_size = bundle_size + pending_tx_data.encoded_size() as u32;
250                if next_bundle_size > domain_bundle_limit.max_bundle_size {
251                    if skipped < MAX_SKIPPED_TRANSACTIONS
252                        && Percent::from_rational(bundle_size, domain_bundle_limit.max_bundle_size)
253                            < BUNDLE_UTILIZATION_THRESHOLD
254                    {
255                        skipped += 1;
256                        continue;
257                    } else {
258                        break;
259                    }
260                }
261
262                let next_bundle_storage_fee =
263                    (header_size as u32 + next_bundle_size) as u128 * transaction_byte_fee;
264                if next_bundle_storage_fee > storage_fund_balance {
265                    tracing::warn!(
266                        ?next_bundle_storage_fee,
267                        ?storage_fund_balance,
268                        "Insufficient storage fund balance to pay for the bundle storage fee"
269                    );
270                    break;
271                }
272
273                // Double check XDM before adding it to the bundle
274                if let Some(false) =
275                    runtime_api_instance.is_xdm_mmr_proof_valid(parent_hash, pending_tx_data)?
276                {
277                    continue;
278                }
279
280                // Double check the transaction validity, because the tx pool are re-validate the transaction
281                // in pool asynchronously so there is race condition that the operator imported a domain block
282                // and start producing bundle immediately before the re-validation based on the latest block
283                // is finished, cause the bundle contains illegal tx accidentally and being considered as invalid
284                // bundle and slashing on the honest operator.
285                //
286                // This check is done in similar fashion to block builder's build block.
287                // This check needs to be the last check as otherwise if the tx won't be part of bundle due to
288                // some other checks, its side effect will still be part of RuntimeApiImpl's changes buffer.
289                let transaction_validity_result =
290                    runtime_api_instance.execute_in_transaction(|api| {
291                        let transaction_validity_result = api.check_extrinsics_and_do_pre_dispatch(
292                            parent_hash,
293                            // Ideally, we should pass the whole `extrinsics` to keep consistency with ER derivation
294                            // and FP verification but it will be constly, so instead we do another final check that
295                            // pass the whole `extrinsics` to `check_extrinsics_and_do_pre_dispatch` before returning
296                            // the `extrinsics` to construct bundle.
297                            vec![pending_tx_data.as_ref().clone()],
298                            parent_number,
299                            parent_hash,
300                        );
301                        // Only commit, if there are no errors (both ApiError and CheckTxValidityError)
302                        if let Ok(Ok(_)) = transaction_validity_result {
303                            sp_api::TransactionOutcome::Commit(transaction_validity_result)
304                        } else {
305                            sp_api::TransactionOutcome::Rollback(transaction_validity_result)
306                        }
307                    })?;
308                if transaction_validity_result.is_err() {
309                    continue;
310                }
311
312                estimated_bundle_weight = next_estimated_bundle_weight;
313                bundle_size = next_bundle_size;
314                extrinsics.push(pending_tx_data.as_ref().clone());
315
316                self.previous_bundled_tx
317                    .add_bundled(self.transaction_pool.hash_of(pending_tx_data));
318            }
319        }
320
321        // As a final check, call `check_extrinsics_and_do_pre_dispatch` with all extrinsics,
322        // this is consistent with ER derivation and FP verification
323        if let Err(CheckExtrinsicsValidityError {
324            extrinsic_index,
325            transaction_validity_error,
326        }) = self
327            .client
328            .runtime_api()
329            .check_extrinsics_and_do_pre_dispatch(
330                parent_hash,
331                extrinsics.clone(),
332                parent_number,
333                parent_hash,
334            )?
335        {
336            tracing::warn!(
337                "Unexpected error when validating all the extrinsics at once: {transaction_validity_error:?}"
338            );
339
340            // Truncate to remove the invalid extrinsic (and any extrinsic after it), so only
341            // the valid exrinsic will be used to construct bundle.
342            extrinsics.truncate(extrinsic_index as usize);
343        }
344
345        let extrinsics_root = HeaderHashingFor::<Block::Header>::ordered_trie_root(
346            extrinsics.iter().map(|xt| xt.encode()).collect(),
347            sp_core::storage::StateVersion::V1,
348        );
349
350        let header = match bundle_version {
351            BundleVersion::V0 => BundleHeaderFor::V0(BundleHeaderV0 {
352                proof_of_election,
353                receipt,
354                estimated_bundle_weight,
355                bundle_extrinsics_root: extrinsics_root,
356            }),
357        };
358
359        Ok((header, extrinsics))
360    }
361
362    /// Returns the receipt in the next domain bundle.
363    pub fn load_next_receipt(
364        &self,
365        domain_best_number_onchain: NumberFor<Block>,
366        head_receipt_number: NumberFor<Block>,
367        execution_receipt_version: ExecutionReceiptVersion,
368    ) -> sp_blockchain::Result<ExecutionReceiptFor<Block, CBlock>> {
369        tracing::trace!(
370            ?domain_best_number_onchain,
371            ?head_receipt_number,
372            "Collecting receipt"
373        );
374
375        // Both `domain_best_number_onchain` and `head_receipt_number` are zero means the domain just
376        // instantiated and nothing have submitted yet so submit the genesis receipt
377        if domain_best_number_onchain.is_zero() && head_receipt_number.is_zero() {
378            let genesis_hash = self.client.info().genesis_hash;
379            let genesis_header = self.client.header(genesis_hash)?.ok_or_else(|| {
380                sp_blockchain::Error::Backend(format!(
381                    "Domain block header for #{genesis_hash:?} not found",
382                ))
383            })?;
384
385            return Ok(ExecutionReceiptFor::<Block, CBlock>::genesis(
386                *genesis_header.state_root(),
387                *genesis_header.extrinsics_root(),
388                genesis_hash,
389                execution_receipt_version,
390            ));
391        }
392
393        // The next receipt must extend the current head receipt
394        let receipt_number = head_receipt_number + One::one();
395
396        // Get the domain block hash corresponding to `receipt_number` in the domain canonical chain
397        let domain_hash = self.client.hash(receipt_number)?.ok_or_else(|| {
398            sp_blockchain::Error::Backend(format!(
399                "Domain block hash for #{receipt_number:?} not found"
400            ))
401        })?;
402
403        crate::load_execution_receipt_by_domain_hash::<Block, CBlock, _>(
404            &*self.client,
405            domain_hash,
406            receipt_number,
407        )
408    }
409}