1use crate::ExecutionReceiptFor;
2use domain_runtime_primitives::CheckExtrinsicsValidityError;
3use futures::{select, FutureExt};
4use parity_scale_codec::Encode;
5use sc_client_api::{AuxStore, BlockBackend};
6use sc_transaction_pool_api::InPoolTransaction;
7use sp_api::{ApiExt, ProvideRuntimeApi};
8use sp_block_builder::BlockBuilder;
9use sp_blockchain::HeaderBackend;
10use sp_domains::core_api::DomainCoreApi;
11use sp_domains::{
12 BundleHeader, DomainId, DomainsApi, ExecutionReceipt, HeaderHashingFor, OperatorId,
13 ProofOfElection,
14};
15use sp_messenger::MessengerApi;
16use sp_runtime::traits::{Block as BlockT, Hash as HashT, Header as HeaderT, NumberFor, One, Zero};
17use sp_runtime::Percent;
18use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
19use sp_weights::Weight;
20use std::collections::HashSet;
21use std::marker::PhantomData;
22use std::sync::Arc;
23use std::time;
24use subspace_core_primitives::U256;
25use subspace_runtime_primitives::{Balance, BlockHashFor, ExtrinsicFor, HeaderFor};
26
/// Maximum number of transactions that may be skipped because they do not fit into the bundle
/// while the bundle utilization is still below [`BUNDLE_UTILIZATION_THRESHOLD`]. Once this many
/// transactions have been skipped, the next transaction that does not fit ends bundle
/// construction.
const MAX_SKIPPED_TRANSACTIONS: usize = 8;

/// Bundle utilization (by weight or by size) at or above which a transaction that does not fit
/// ends bundle construction immediately instead of being skipped.
const BUNDLE_UTILIZATION_THRESHOLD: Percent = Percent::from_percent(95);
32
/// Keeps track of the hashes of domain transactions that have been recorded as bundled, together
/// with the consensus block hash the record belongs to.
struct PreviousBundledTx<Block: BlockT, CBlock: BlockT> {
    /// Consensus block hash the recorded `tx_hashes` are associated with.
    bundled_at: CBlock::Hash,
    /// Hashes of the transactions recorded since `bundled_at` was last updated.
    tx_hashes: HashSet<Block::Hash>,
}
39
40impl<Block: BlockT, CBlock: BlockT> PreviousBundledTx<Block, CBlock> {
41 fn new() -> Self {
42 PreviousBundledTx {
43 bundled_at: Default::default(),
44 tx_hashes: HashSet::new(),
45 }
46 }
47
48 fn already_bundled(&self, tx_hash: &Block::Hash) -> bool {
49 self.tx_hashes.contains(tx_hash)
50 }
51
52 fn maybe_clear(&mut self, consensus_hash: CBlock::Hash) {
53 if self.bundled_at != consensus_hash {
54 self.bundled_at = consensus_hash;
55 self.tx_hashes.clear();
56 }
57 }
58
59 fn add_bundled(&mut self, tx_hash: Block::Hash) {
60 self.tx_hashes.insert(tx_hash);
61 }
62}
63
/// Builds domain bundles from the transactions that are ready in the domain transaction pool.
pub struct DomainBundleProposer<Block: BlockT, Client, CBlock: BlockT, CClient, TransactionPool> {
    /// Domain the bundles are produced for.
    domain_id: DomainId,
    /// Domain chain client.
    client: Arc<Client>,
    /// Consensus chain client.
    consensus_client: Arc<CClient>,
    /// Domain transaction pool the bundle extrinsics are drawn from.
    transaction_pool: Arc<TransactionPool>,
    /// Transactions already recorded as bundled for the current consensus chain tip.
    previous_bundled_tx: PreviousBundledTx<Block, CBlock>,
    _phantom_data: PhantomData<(Block, CBlock)>,
}
72
73impl<Block: BlockT, Client, CBlock: BlockT, CClient, TransactionPool> Clone
74 for DomainBundleProposer<Block, Client, CBlock, CClient, TransactionPool>
75{
76 fn clone(&self) -> Self {
77 Self {
78 domain_id: self.domain_id,
79 client: self.client.clone(),
80 consensus_client: self.consensus_client.clone(),
81 transaction_pool: self.transaction_pool.clone(),
82 previous_bundled_tx: PreviousBundledTx::new(),
83 _phantom_data: self._phantom_data,
84 }
85 }
86}
87
/// Output of bundle proposing: the bundle header together with the extrinsics that make up the
/// bundle body.
pub(super) type ProposeBundleOutput<Block, CBlock> = (
    BundleHeader<NumberFor<CBlock>, BlockHashFor<CBlock>, HeaderFor<Block>, Balance>,
    Vec<ExtrinsicFor<Block>>,
);
92
93impl<Block, Client, CBlock, CClient, TransactionPool>
94 DomainBundleProposer<Block, Client, CBlock, CClient, TransactionPool>
95where
96 Block: BlockT,
97 CBlock: BlockT,
98 NumberFor<Block>: Into<NumberFor<CBlock>>,
99 Client: HeaderBackend<Block> + BlockBackend<Block> + AuxStore + ProvideRuntimeApi<Block>,
100 Client::Api: BlockBuilder<Block>
101 + DomainCoreApi<Block>
102 + TaggedTransactionQueue<Block>
103 + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>,
104 CClient: HeaderBackend<CBlock> + ProvideRuntimeApi<CBlock>,
105 CClient::Api: DomainsApi<CBlock, Block::Header>,
106 TransactionPool: sc_transaction_pool_api::TransactionPool<Block = Block, Hash = Block::Hash>,
107{
108 pub fn new(
109 domain_id: DomainId,
110 client: Arc<Client>,
111 consensus_client: Arc<CClient>,
112 transaction_pool: Arc<TransactionPool>,
113 ) -> Self {
114 Self {
115 domain_id,
116 client,
117 consensus_client,
118 transaction_pool,
119 previous_bundled_tx: PreviousBundledTx::new(),
120 _phantom_data: PhantomData,
121 }
122 }
123
124 pub(crate) async fn propose_bundle_at(
125 &mut self,
126 proof_of_election: ProofOfElection,
127 tx_range: U256,
128 operator_id: OperatorId,
129 receipt: ExecutionReceiptFor<Block, CBlock>,
130 ) -> sp_blockchain::Result<ProposeBundleOutput<Block, CBlock>> {
131 let (parent_number, parent_hash) = (receipt.domain_block_number, receipt.domain_block_hash);
134 let consensus_best_hash = self.consensus_client.info().best_hash;
135
136 let mut t1 = self.transaction_pool.ready_at(parent_hash).fuse();
137 let mut t2 = futures_timer::Delay::new(time::Duration::from_micros(100)).fuse();
139
140 let pending_iterator = select! {
141 res = t1 => res,
142 _ = t2 => {
143 tracing::warn!(
144 "Timeout fired waiting for transaction pool at #{parent_number},{parent_hash}, \
145 proceeding with bundle production."
146 );
147 self.transaction_pool.ready()
148 }
149 };
150
151 self.previous_bundled_tx.maybe_clear(consensus_best_hash);
155
156 let (domain_bundle_limit, storage_fund_balance, transaction_byte_fee) = {
157 let consensus_runtime_api = self.consensus_client.runtime_api();
158 let domains_api_version = consensus_runtime_api
162 .api_version::<dyn DomainsApi<CBlock, CBlock::Header>>(consensus_best_hash)?
163 .unwrap_or(1);
165
166 let domain_bundle_limit = consensus_runtime_api
167 .domain_bundle_limit(consensus_best_hash, self.domain_id)?
168 .ok_or_else(|| {
169 sp_blockchain::Error::Application(
170 format!("Domain bundle limit for {:?} not found", self.domain_id).into(),
171 )
172 })?;
173
174 let storage_fund_balance = consensus_runtime_api
175 .storage_fund_account_balance(consensus_best_hash, operator_id)?;
176
177 let transaction_byte_fee = if domains_api_version >= 3 {
178 consensus_runtime_api.consensus_transaction_byte_fee(consensus_best_hash)?
179 } else {
180 #[allow(deprecated)]
181 consensus_runtime_api.consensus_chain_byte_fee(consensus_best_hash)?
182 };
183
184 (
185 domain_bundle_limit,
186 storage_fund_balance,
187 transaction_byte_fee,
188 )
189 };
190
191 let bundle_vrf_hash = U256::from_be_bytes(*proof_of_election.vrf_hash());
192
193 let header_size = receipt.encoded_size()
194 + proof_of_election.encoded_size()
195 + domain_bundle_limit.max_bundle_weight.encoded_size()
196 + 32
198 + 64;
200
201 let mut extrinsics = Vec::new();
202 let mut estimated_bundle_weight = Weight::default();
203 let mut bundle_size = 0u32;
204 let mut skipped = 0;
205
206 {
208 let runtime_api_instance = self.client.runtime_api();
211 for pending_tx in pending_iterator {
212 let pending_tx_data = pending_tx.data();
213
214 let is_within_tx_range = runtime_api_instance
215 .is_within_tx_range(parent_hash, pending_tx_data, &bundle_vrf_hash, &tx_range)
216 .map_err(|err| {
217 tracing::error!(
218 ?err,
219 ?pending_tx_data,
220 "Error occurred in locating the tx range"
221 );
222 })
223 .unwrap_or(false);
224 if !is_within_tx_range {
225 continue;
226 }
227
228 if self
230 .previous_bundled_tx
231 .already_bundled(&self.transaction_pool.hash_of(pending_tx_data))
232 {
233 continue;
234 }
235
236 let tx_weight = runtime_api_instance
237 .extrinsic_weight(parent_hash, pending_tx_data)
238 .map_err(|error| {
239 sp_blockchain::Error::Application(Box::from(format!(
240 "Error getting extrinsic weight: {error}"
241 )))
242 })?;
243 let next_estimated_bundle_weight =
244 estimated_bundle_weight.saturating_add(tx_weight);
245 if next_estimated_bundle_weight.any_gt(domain_bundle_limit.max_bundle_weight) {
246 if skipped < MAX_SKIPPED_TRANSACTIONS
247 && Percent::from_rational(
248 estimated_bundle_weight.ref_time(),
249 domain_bundle_limit.max_bundle_weight.ref_time(),
250 ) < BUNDLE_UTILIZATION_THRESHOLD
251 {
252 skipped += 1;
253 continue;
254 } else {
255 break;
256 }
257 }
258
259 let next_bundle_size = bundle_size + pending_tx_data.encoded_size() as u32;
260 if next_bundle_size > domain_bundle_limit.max_bundle_size {
261 if skipped < MAX_SKIPPED_TRANSACTIONS
262 && Percent::from_rational(bundle_size, domain_bundle_limit.max_bundle_size)
263 < BUNDLE_UTILIZATION_THRESHOLD
264 {
265 skipped += 1;
266 continue;
267 } else {
268 break;
269 }
270 }
271
272 let next_bundle_storage_fee =
273 (header_size as u32 + next_bundle_size) as u128 * transaction_byte_fee;
274 if next_bundle_storage_fee > storage_fund_balance {
275 tracing::warn!(
276 ?next_bundle_storage_fee,
277 ?storage_fund_balance,
278 "Insufficient storage fund balance to pay for the bundle storage fee"
279 );
280 break;
281 }
282
283 if let Some(false) =
285 runtime_api_instance.is_xdm_mmr_proof_valid(parent_hash, pending_tx_data)?
286 {
287 continue;
288 }
289
290 let transaction_validity_result =
300 runtime_api_instance.execute_in_transaction(|api| {
301 let transaction_validity_result = api.check_extrinsics_and_do_pre_dispatch(
302 parent_hash,
303 vec![pending_tx_data.as_ref().clone()],
308 parent_number,
309 parent_hash,
310 );
311 if let Ok(Ok(_)) = transaction_validity_result {
313 sp_api::TransactionOutcome::Commit(transaction_validity_result)
314 } else {
315 sp_api::TransactionOutcome::Rollback(transaction_validity_result)
316 }
317 })?;
318 if transaction_validity_result.is_err() {
319 continue;
320 }
321
322 estimated_bundle_weight = next_estimated_bundle_weight;
323 bundle_size = next_bundle_size;
324 extrinsics.push(pending_tx_data.as_ref().clone());
325
326 self.previous_bundled_tx
327 .add_bundled(self.transaction_pool.hash_of(pending_tx_data));
328 }
329 }
330
331 if let Err(CheckExtrinsicsValidityError {
334 extrinsic_index,
335 transaction_validity_error,
336 }) = self
337 .client
338 .runtime_api()
339 .check_extrinsics_and_do_pre_dispatch(
340 parent_hash,
341 extrinsics.clone(),
342 parent_number,
343 parent_hash,
344 )?
345 {
346 tracing::warn!("Unexpected error when validating all the extrinsics at once: {transaction_validity_error:?}");
347
348 extrinsics.truncate(extrinsic_index as usize);
351 }
352
353 let extrinsics_root = HeaderHashingFor::<Block::Header>::ordered_trie_root(
354 extrinsics.iter().map(|xt| xt.encode()).collect(),
355 sp_core::storage::StateVersion::V1,
356 );
357
358 let header = BundleHeader {
359 proof_of_election,
360 receipt,
361 estimated_bundle_weight,
362 bundle_extrinsics_root: extrinsics_root,
363 };
364
365 Ok((header, extrinsics))
366 }
367
368 pub fn load_next_receipt(
370 &self,
371 domain_best_number_onchain: NumberFor<Block>,
372 head_receipt_number: NumberFor<Block>,
373 ) -> sp_blockchain::Result<ExecutionReceiptFor<Block, CBlock>> {
374 tracing::trace!(
375 ?domain_best_number_onchain,
376 ?head_receipt_number,
377 "Collecting receipt"
378 );
379
380 if domain_best_number_onchain.is_zero() && head_receipt_number.is_zero() {
383 let genesis_hash = self.client.info().genesis_hash;
384 let genesis_header = self.client.header(genesis_hash)?.ok_or_else(|| {
385 sp_blockchain::Error::Backend(format!(
386 "Domain block header for #{genesis_hash:?} not found",
387 ))
388 })?;
389
390 return Ok(ExecutionReceipt::genesis(
391 *genesis_header.state_root(),
392 *genesis_header.extrinsics_root(),
393 genesis_hash,
394 ));
395 }
396
397 let receipt_number = head_receipt_number + One::one();
399
400 let domain_hash = self.client.hash(receipt_number)?.ok_or_else(|| {
402 sp_blockchain::Error::Backend(format!(
403 "Domain block hash for #{receipt_number:?} not found"
404 ))
405 })?;
406
407 crate::load_execution_receipt_by_domain_hash::<Block, CBlock, _>(
408 &*self.client,
409 domain_hash,
410 receipt_number,
411 )
412 }
413}