1use crate::ExecutionReceiptFor;
2use crate::domain_bundle_producer::BundleHeaderFor;
3use domain_runtime_primitives::CheckExtrinsicsValidityError;
4use futures::{FutureExt, select};
5use parity_scale_codec::Encode;
6use sc_client_api::{AuxStore, BlockBackend};
7use sc_transaction_pool_api::InPoolTransaction;
8use sp_api::{ApiExt, ProvideRuntimeApi};
9use sp_block_builder::BlockBuilder;
10use sp_blockchain::HeaderBackend;
11use sp_domains::bundle::BundleVersion;
12use sp_domains::bundle::bundle_v0::BundleHeaderV0;
13use sp_domains::core_api::DomainCoreApi;
14use sp_domains::execution_receipt::ExecutionReceiptVersion;
15use sp_domains::{DomainId, DomainsApi, HeaderHashingFor, OperatorId, ProofOfElection};
16use sp_messenger::MessengerApi;
17use sp_runtime::Percent;
18use sp_runtime::traits::{Block as BlockT, Hash as HashT, Header as HeaderT, NumberFor, One, Zero};
19use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
20use sp_weights::Weight;
21use std::collections::HashSet;
22use std::marker::PhantomData;
23use std::sync::Arc;
24use std::time;
25use subspace_core_primitives::U256;
26use subspace_runtime_primitives::ExtrinsicFor;
27
/// Maximum number of transactions that may be skipped for not fitting into the bundle.
///
/// While the bundle utilization is still below `BUNDLE_UTILIZATION_THRESHOLD`, a transaction
/// that would push the bundle over its weight or size limit is skipped and the next one is
/// tried. Once this many transactions have been skipped, selection stops instead.
const MAX_SKIPPED_TRANSACTIONS: usize = 8;

/// Bundle utilization (relative to the weight or size limit) at or above which transaction
/// selection stops as soon as a transaction does not fit, rather than skipping it.
const BUNDLE_UTILIZATION_THRESHOLD: Percent = Percent::from_percent(95);
33
34struct PreviousBundledTx<Block: BlockT, CBlock: BlockT> {
37 bundled_at: CBlock::Hash,
38 tx_hashes: HashSet<Block::Hash>,
39}
40
41impl<Block: BlockT, CBlock: BlockT> PreviousBundledTx<Block, CBlock> {
42 fn new() -> Self {
43 PreviousBundledTx {
44 bundled_at: Default::default(),
45 tx_hashes: HashSet::new(),
46 }
47 }
48
49 fn already_bundled(&self, tx_hash: &Block::Hash) -> bool {
50 self.tx_hashes.contains(tx_hash)
51 }
52
53 fn maybe_clear(&mut self, consensus_hash: CBlock::Hash) {
54 if self.bundled_at != consensus_hash {
55 self.bundled_at = consensus_hash;
56 self.tx_hashes.clear();
57 }
58 }
59
60 fn add_bundled(&mut self, tx_hash: Block::Hash) {
61 self.tx_hashes.insert(tx_hash);
62 }
63}
64
65pub struct DomainBundleProposer<Block: BlockT, Client, CBlock: BlockT, CClient, TransactionPool> {
66 domain_id: DomainId,
67 client: Arc<Client>,
68 consensus_client: Arc<CClient>,
69 transaction_pool: Arc<TransactionPool>,
70 previous_bundled_tx: PreviousBundledTx<Block, CBlock>,
71 _phantom_data: PhantomData<(Block, CBlock)>,
72}
73
74impl<Block: BlockT, Client, CBlock: BlockT, CClient, TransactionPool> Clone
75 for DomainBundleProposer<Block, Client, CBlock, CClient, TransactionPool>
76{
77 fn clone(&self) -> Self {
78 Self {
79 domain_id: self.domain_id,
80 client: self.client.clone(),
81 consensus_client: self.consensus_client.clone(),
82 transaction_pool: self.transaction_pool.clone(),
83 previous_bundled_tx: PreviousBundledTx::new(),
84 _phantom_data: self._phantom_data,
85 }
86 }
87}
88
/// Result of a successful bundle proposal: the bundle header together with the extrinsics
/// that make up the bundle body.
pub(super) type ProposeBundleOutput<Block, CBlock> =
    (BundleHeaderFor<Block, CBlock>, Vec<ExtrinsicFor<Block>>);
91
92impl<Block, Client, CBlock, CClient, TransactionPool>
93 DomainBundleProposer<Block, Client, CBlock, CClient, TransactionPool>
94where
95 Block: BlockT,
96 CBlock: BlockT,
97 NumberFor<Block>: Into<NumberFor<CBlock>>,
98 Client: HeaderBackend<Block> + BlockBackend<Block> + AuxStore + ProvideRuntimeApi<Block>,
99 Client::Api: BlockBuilder<Block>
100 + DomainCoreApi<Block>
101 + TaggedTransactionQueue<Block>
102 + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>,
103 CClient: HeaderBackend<CBlock> + ProvideRuntimeApi<CBlock>,
104 CClient::Api: DomainsApi<CBlock, Block::Header>,
105 TransactionPool: sc_transaction_pool_api::TransactionPool<Block = Block, Hash = Block::Hash>,
106{
107 pub fn new(
108 domain_id: DomainId,
109 client: Arc<Client>,
110 consensus_client: Arc<CClient>,
111 transaction_pool: Arc<TransactionPool>,
112 ) -> Self {
113 Self {
114 domain_id,
115 client,
116 consensus_client,
117 transaction_pool,
118 previous_bundled_tx: PreviousBundledTx::new(),
119 _phantom_data: PhantomData,
120 }
121 }
122
123 pub(crate) async fn propose_bundle_at(
124 &mut self,
125 proof_of_election: ProofOfElection,
126 tx_range: U256,
127 operator_id: OperatorId,
128 receipt: ExecutionReceiptFor<Block, CBlock>,
129 bundle_version: BundleVersion,
130 ) -> sp_blockchain::Result<ProposeBundleOutput<Block, CBlock>> {
131 let (parent_number, parent_hash) =
134 (*receipt.domain_block_number(), *receipt.domain_block_hash());
135 let consensus_best_hash = self.consensus_client.info().best_hash;
136
137 let mut t1 = self.transaction_pool.ready_at(parent_hash).fuse();
138 let mut t2 = futures_timer::Delay::new(time::Duration::from_micros(100)).fuse();
140
141 let pending_iterator = select! {
142 res = t1 => res,
143 _ = t2 => {
144 tracing::warn!(
145 "Timeout fired waiting for transaction pool at #{parent_number},{parent_hash}, \
146 proceeding with bundle production."
147 );
148 self.transaction_pool.ready()
149 }
150 };
151
152 self.previous_bundled_tx.maybe_clear(consensus_best_hash);
156
157 let (domain_bundle_limit, storage_fund_balance, transaction_byte_fee) = {
158 let consensus_runtime_api = self.consensus_client.runtime_api();
159
160 let domain_bundle_limit = consensus_runtime_api
161 .domain_bundle_limit(consensus_best_hash, self.domain_id)?
162 .ok_or_else(|| {
163 sp_blockchain::Error::Application(
164 format!("Domain bundle limit for {:?} not found", self.domain_id).into(),
165 )
166 })?;
167
168 let storage_fund_balance = consensus_runtime_api
169 .storage_fund_account_balance(consensus_best_hash, operator_id)?;
170
171 let transaction_byte_fee =
172 consensus_runtime_api.consensus_transaction_byte_fee(consensus_best_hash)?;
173
174 (
175 domain_bundle_limit,
176 storage_fund_balance,
177 transaction_byte_fee,
178 )
179 };
180
181 let bundle_vrf_hash = U256::from_be_bytes(*proof_of_election.vrf_hash());
182
183 let header_size = receipt.encoded_size()
184 + proof_of_election.encoded_size()
185 + domain_bundle_limit.max_bundle_weight.encoded_size()
186 + 32
188 + 64;
190
191 let mut extrinsics = Vec::new();
192 let mut estimated_bundle_weight = Weight::default();
193 let mut bundle_size = 0u32;
194 let mut skipped = 0;
195
196 {
198 let runtime_api_instance = self.client.runtime_api();
201 for pending_tx in pending_iterator {
202 let pending_tx_data = pending_tx.data();
203
204 let is_within_tx_range = runtime_api_instance
205 .is_within_tx_range(parent_hash, pending_tx_data, &bundle_vrf_hash, &tx_range)
206 .map_err(|err| {
207 tracing::error!(
208 ?err,
209 ?pending_tx_data,
210 "Error occurred in locating the tx range"
211 );
212 })
213 .unwrap_or(false);
214 if !is_within_tx_range {
215 continue;
216 }
217
218 if self
220 .previous_bundled_tx
221 .already_bundled(&self.transaction_pool.hash_of(pending_tx_data))
222 {
223 continue;
224 }
225
226 let tx_weight = runtime_api_instance
227 .extrinsic_weight(parent_hash, pending_tx_data)
228 .map_err(|error| {
229 sp_blockchain::Error::Application(Box::from(format!(
230 "Error getting extrinsic weight: {error}"
231 )))
232 })?;
233 let next_estimated_bundle_weight =
234 estimated_bundle_weight.saturating_add(tx_weight);
235 if next_estimated_bundle_weight.any_gt(domain_bundle_limit.max_bundle_weight) {
236 if skipped < MAX_SKIPPED_TRANSACTIONS
237 && Percent::from_rational(
238 estimated_bundle_weight.ref_time(),
239 domain_bundle_limit.max_bundle_weight.ref_time(),
240 ) < BUNDLE_UTILIZATION_THRESHOLD
241 {
242 skipped += 1;
243 continue;
244 } else {
245 break;
246 }
247 }
248
249 let next_bundle_size = bundle_size + pending_tx_data.encoded_size() as u32;
250 if next_bundle_size > domain_bundle_limit.max_bundle_size {
251 if skipped < MAX_SKIPPED_TRANSACTIONS
252 && Percent::from_rational(bundle_size, domain_bundle_limit.max_bundle_size)
253 < BUNDLE_UTILIZATION_THRESHOLD
254 {
255 skipped += 1;
256 continue;
257 } else {
258 break;
259 }
260 }
261
262 let next_bundle_storage_fee =
263 (header_size as u32 + next_bundle_size) as u128 * transaction_byte_fee;
264 if next_bundle_storage_fee > storage_fund_balance {
265 tracing::warn!(
266 ?next_bundle_storage_fee,
267 ?storage_fund_balance,
268 "Insufficient storage fund balance to pay for the bundle storage fee"
269 );
270 break;
271 }
272
273 if let Some(false) =
275 runtime_api_instance.is_xdm_mmr_proof_valid(parent_hash, pending_tx_data)?
276 {
277 continue;
278 }
279
280 let transaction_validity_result =
290 runtime_api_instance.execute_in_transaction(|api| {
291 let transaction_validity_result = api.check_extrinsics_and_do_pre_dispatch(
292 parent_hash,
293 vec![pending_tx_data.as_ref().clone()],
298 parent_number,
299 parent_hash,
300 );
301 if let Ok(Ok(_)) = transaction_validity_result {
303 sp_api::TransactionOutcome::Commit(transaction_validity_result)
304 } else {
305 sp_api::TransactionOutcome::Rollback(transaction_validity_result)
306 }
307 })?;
308 if transaction_validity_result.is_err() {
309 continue;
310 }
311
312 estimated_bundle_weight = next_estimated_bundle_weight;
313 bundle_size = next_bundle_size;
314 extrinsics.push(pending_tx_data.as_ref().clone());
315
316 self.previous_bundled_tx
317 .add_bundled(self.transaction_pool.hash_of(pending_tx_data));
318 }
319 }
320
321 if let Err(CheckExtrinsicsValidityError {
324 extrinsic_index,
325 transaction_validity_error,
326 }) = self
327 .client
328 .runtime_api()
329 .check_extrinsics_and_do_pre_dispatch(
330 parent_hash,
331 extrinsics.clone(),
332 parent_number,
333 parent_hash,
334 )?
335 {
336 tracing::warn!(
337 "Unexpected error when validating all the extrinsics at once: {transaction_validity_error:?}"
338 );
339
340 extrinsics.truncate(extrinsic_index as usize);
343 }
344
345 let extrinsics_root = HeaderHashingFor::<Block::Header>::ordered_trie_root(
346 extrinsics.iter().map(|xt| xt.encode()).collect(),
347 sp_core::storage::StateVersion::V1,
348 );
349
350 let header = match bundle_version {
351 BundleVersion::V0 => BundleHeaderFor::V0(BundleHeaderV0 {
352 proof_of_election,
353 receipt,
354 estimated_bundle_weight,
355 bundle_extrinsics_root: extrinsics_root,
356 }),
357 };
358
359 Ok((header, extrinsics))
360 }
361
362 pub fn load_next_receipt(
364 &self,
365 domain_best_number_onchain: NumberFor<Block>,
366 head_receipt_number: NumberFor<Block>,
367 execution_receipt_version: ExecutionReceiptVersion,
368 ) -> sp_blockchain::Result<ExecutionReceiptFor<Block, CBlock>> {
369 tracing::trace!(
370 ?domain_best_number_onchain,
371 ?head_receipt_number,
372 "Collecting receipt"
373 );
374
375 if domain_best_number_onchain.is_zero() && head_receipt_number.is_zero() {
378 let genesis_hash = self.client.info().genesis_hash;
379 let genesis_header = self.client.header(genesis_hash)?.ok_or_else(|| {
380 sp_blockchain::Error::Backend(format!(
381 "Domain block header for #{genesis_hash:?} not found",
382 ))
383 })?;
384
385 return Ok(ExecutionReceiptFor::<Block, CBlock>::genesis(
386 *genesis_header.state_root(),
387 *genesis_header.extrinsics_root(),
388 genesis_hash,
389 execution_receipt_version,
390 ));
391 }
392
393 let receipt_number = head_receipt_number + One::one();
395
396 let domain_hash = self.client.hash(receipt_number)?.ok_or_else(|| {
398 sp_blockchain::Error::Backend(format!(
399 "Domain block hash for #{receipt_number:?} not found"
400 ))
401 })?;
402
403 crate::load_execution_receipt_by_domain_hash::<Block, CBlock, _>(
404 &*self.client,
405 domain_hash,
406 receipt_number,
407 )
408 }
409}