1use crate::domain_block_processor::{
2 DomainBlockProcessor, PendingConsensusBlocks, ReceiptsChecker,
3};
4use crate::ExecutionReceiptFor;
5use domain_block_preprocessor::DomainBlockPreprocessor;
6use sc_client_api::{AuxStore, BlockBackend, ExecutorProvider, Finalizer, ProofProvider};
7use sc_consensus::{BlockImportParams, ForkChoiceStrategy, StateAction};
8use sc_executor::RuntimeVersionOf;
9use sp_api::ProvideRuntimeApi;
10use sp_blockchain::{HeaderBackend, HeaderMetadata};
11use sp_consensus::BlockOrigin;
12use sp_core::traits::CodeExecutor;
13use sp_core::H256;
14use sp_domain_digests::AsPredigest;
15use sp_domains::core_api::DomainCoreApi;
16use sp_domains::{DomainId, DomainsApi, ReceiptValidity};
17use sp_domains_fraud_proof::FraudProofApi;
18use sp_messenger::MessengerApi;
19use sp_mmr_primitives::MmrApi;
20use sp_runtime::traits::{Block as BlockT, CheckedSub, NumberFor, Zero};
21use sp_runtime::{Digest, DigestItem};
22use sp_weights::constants::WEIGHT_REF_TIME_PER_MILLIS;
23use std::sync::Arc;
24use std::time::Instant;
25
26const SLOW_PREPROCESS_MILLIS: u64 = 500;
28
29fn slow_domain_block_execution_threshold(reference_duration_ms: u64) -> u64 {
33 reference_duration_ms + (reference_duration_ms / 5) + 200
34}
35
36const SLOW_POST_DOMAIN_BLOCK_EXECUTION_MILLIS: u64 = 250;
38
39type DomainReceiptsChecker<Block, CBlock, Client, CClient, Backend, E> =
40 ReceiptsChecker<Block, Client, CBlock, CClient, Backend, E>;
41
42pub(crate) struct BundleProcessor<Block, CBlock, Client, CClient, Backend, E>
43where
44 Block: BlockT,
45 CBlock: BlockT,
46{
47 domain_id: DomainId,
48 consensus_client: Arc<CClient>,
49 client: Arc<Client>,
50 backend: Arc<Backend>,
51 domain_receipts_checker: DomainReceiptsChecker<Block, CBlock, Client, CClient, Backend, E>,
52 domain_block_preprocessor: DomainBlockPreprocessor<
53 Block,
54 CBlock,
55 Client,
56 CClient,
57 E,
58 Backend,
59 ReceiptValidator<Client>,
60 >,
61 domain_block_processor: DomainBlockProcessor<Block, CBlock, Client, CClient, Backend, E>,
62 confirmation_depth_k: NumberFor<CBlock>,
63}
64
65impl<Block, CBlock, Client, CClient, Backend, E> Clone
66 for BundleProcessor<Block, CBlock, Client, CClient, Backend, E>
67where
68 Block: BlockT,
69 CBlock: BlockT,
70{
71 fn clone(&self) -> Self {
72 Self {
73 domain_id: self.domain_id,
74 consensus_client: self.consensus_client.clone(),
75 client: self.client.clone(),
76 backend: self.backend.clone(),
77 domain_receipts_checker: self.domain_receipts_checker.clone(),
78 domain_block_preprocessor: self.domain_block_preprocessor.clone(),
79 domain_block_processor: self.domain_block_processor.clone(),
80 confirmation_depth_k: self.confirmation_depth_k,
81 }
82 }
83}
84
85struct ReceiptValidator<Client> {
86 client: Arc<Client>,
87}
88
89impl<Client> Clone for ReceiptValidator<Client> {
90 fn clone(&self) -> Self {
91 Self {
92 client: self.client.clone(),
93 }
94 }
95}
96
97impl<Client> ReceiptValidator<Client> {
98 pub fn new(client: Arc<Client>) -> Self {
99 Self { client }
100 }
101}
102
103impl<Block, CBlock, Client> domain_block_preprocessor::ValidateReceipt<Block, CBlock>
104 for ReceiptValidator<Client>
105where
106 Block: BlockT,
107 CBlock: BlockT,
108 Client: AuxStore,
109{
110 fn validate_receipt(
111 &self,
112 receipt: &ExecutionReceiptFor<Block, CBlock>,
113 ) -> sp_blockchain::Result<ReceiptValidity> {
114 if receipt.domain_block_number.is_zero() {
116 return Ok(ReceiptValidity::Valid);
117 }
118
119 let consensus_block_hash = receipt.consensus_block_hash;
120 let _local_receipt = crate::aux_schema::load_execution_receipt::<_, Block, CBlock>(
121 &*self.client,
122 consensus_block_hash,
123 )?
124 .ok_or_else(|| {
125 sp_blockchain::Error::Backend(format!(
126 "Receipt for consensus block {consensus_block_hash} not found"
127 ))
128 })?;
129
130 Ok(ReceiptValidity::Valid)
131 }
132}
133
134impl<Block, CBlock, Client, CClient, Backend, E>
135 BundleProcessor<Block, CBlock, Client, CClient, Backend, E>
136where
137 Block: BlockT,
138 Block::Hash: Into<H256>,
139 CBlock: BlockT,
140 NumberFor<CBlock>: From<NumberFor<Block>> + Into<NumberFor<Block>>,
141 CBlock::Hash: From<Block::Hash>,
142 Client: HeaderBackend<Block>
143 + BlockBackend<Block>
144 + AuxStore
145 + ProvideRuntimeApi<Block>
146 + ProofProvider<Block>
147 + ExecutorProvider<Block>
148 + Finalizer<Block, Backend>
149 + 'static,
150 Client::Api: DomainCoreApi<Block>
151 + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>
152 + sp_block_builder::BlockBuilder<Block>
153 + sp_api::ApiExt<Block>,
154 CClient: HeaderBackend<CBlock>
155 + HeaderMetadata<CBlock, Error = sp_blockchain::Error>
156 + BlockBackend<CBlock>
157 + ProofProvider<CBlock>
158 + ProvideRuntimeApi<CBlock>
159 + 'static,
160 CClient::Api: DomainsApi<CBlock, Block::Header>
161 + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>
162 + FraudProofApi<CBlock, Block::Header>
163 + MmrApi<CBlock, H256, NumberFor<CBlock>>
164 + 'static,
165 Backend: sc_client_api::Backend<Block> + 'static,
166 E: CodeExecutor + RuntimeVersionOf,
167{
168 pub(crate) fn new(
169 domain_id: DomainId,
170 consensus_client: Arc<CClient>,
171 client: Arc<Client>,
172 backend: Arc<Backend>,
173 domain_receipts_checker: DomainReceiptsChecker<Block, CBlock, Client, CClient, Backend, E>,
174 domain_block_processor: DomainBlockProcessor<Block, CBlock, Client, CClient, Backend, E>,
175 confirmation_depth_k: NumberFor<CBlock>,
176 ) -> Self {
177 let domain_block_preprocessor = DomainBlockPreprocessor::new(
178 domain_id,
179 client.clone(),
180 consensus_client.clone(),
181 domain_block_processor.domain_executor.clone(),
182 domain_block_processor.backend.clone(),
183 ReceiptValidator::new(client.clone()),
184 );
185 Self {
186 domain_id,
187 consensus_client,
188 client,
189 backend,
190 domain_receipts_checker,
191 domain_block_preprocessor,
192 domain_block_processor,
193 confirmation_depth_k,
194 }
195 }
196
197 pub(crate) async fn process_bundles(
199 self,
200 consensus_block_info: (CBlock::Hash, NumberFor<CBlock>, bool),
201 ) -> sp_blockchain::Result<()> {
202 let (consensus_block_hash, consensus_block_number, is_new_best) = consensus_block_info;
203
204 if !is_new_best {
207 return Ok(());
208 }
209
210 tracing::debug!(
211 "Processing consensus block #{consensus_block_number},{consensus_block_hash}"
212 );
213
214 let maybe_pending_consensus_blocks = self
215 .domain_block_processor
216 .pending_imported_consensus_blocks(consensus_block_hash, consensus_block_number)?;
217
218 if let Some(PendingConsensusBlocks {
219 initial_parent,
220 consensus_imports,
221 }) = maybe_pending_consensus_blocks
222 {
223 tracing::trace!(
224 ?initial_parent,
225 ?consensus_imports,
226 "Pending consensus blocks to process"
227 );
228
229 let mut domain_parent = initial_parent;
230
231 for consensus_info in consensus_imports {
232 if let Some(next_domain_parent) = self
233 .process_bundles_at((consensus_info.hash, consensus_info.number), domain_parent)
234 .await?
235 {
236 domain_parent = next_domain_parent;
237 }
238 }
239
240 let domain_tip = domain_parent.0;
247 if self.client.info().best_hash != domain_tip {
248 let header = self.client.header(domain_tip)?.ok_or_else(|| {
249 sp_blockchain::Error::Backend(format!("Header for #{:?} not found", domain_tip))
250 })?;
251 let block_origin = if self
252 .domain_block_processor
253 .domain_sync_oracle
254 .is_major_syncing()
255 {
256 BlockOrigin::NetworkInitialSync
259 } else {
260 BlockOrigin::Own
261 };
262 let block_import_params = {
263 let mut import_block = BlockImportParams::new(block_origin, header);
264 import_block.import_existing = true;
265 import_block.fork_choice = Some(ForkChoiceStrategy::Custom(true));
266 import_block.state_action = StateAction::Skip;
267 import_block
268 };
269 self.domain_block_processor
270 .import_domain_block(block_import_params)
271 .await?;
272 assert_eq!(domain_tip, self.client.info().best_hash);
273
274 self.finalize_domain_block((consensus_block_hash, consensus_block_number))?;
276 }
277
278 self.domain_receipts_checker
281 .maybe_submit_fraud_proof(consensus_block_hash)?;
282 }
283
284 Ok(())
285 }
286
287 fn finalize_domain_block(
289 &self,
290 consensus_block_info: (CBlock::Hash, NumberFor<CBlock>),
291 ) -> sp_blockchain::Result<()> {
292 if let Some(confirmed_consensus_block) = consensus_block_info
293 .1
294 .checked_sub(&self.confirmation_depth_k)
295 {
296 let runtime_api = self.consensus_client.runtime_api();
297 let Some(confirmed_consensus_block_hash) =
298 self.consensus_client.hash(confirmed_consensus_block)?
299 else {
300 return Ok(());
304 };
305
306 let finalized_domain_block_number = self.client.info().finalized_number;
307 if let Some(confirmed_domain_block) = runtime_api
308 .latest_confirmed_domain_block(confirmed_consensus_block_hash, self.domain_id)?
309 && confirmed_domain_block.0 > finalized_domain_block_number
310 {
311 self.client
312 .finalize_block(confirmed_domain_block.1, None, true)?;
313 }
314 }
315 Ok(())
316 }
317
318 async fn process_bundles_at(
319 &self,
320 consensus_block_info: (CBlock::Hash, NumberFor<CBlock>),
321 parent_info: (Block::Hash, NumberFor<Block>),
322 ) -> sp_blockchain::Result<Option<(Block::Hash, NumberFor<Block>)>> {
323 let (consensus_block_hash, consensus_block_number) = consensus_block_info;
324 let (parent_hash, parent_number) = parent_info;
325 let start = Instant::now();
326
327 tracing::debug!(
328 "Building a new domain block from consensus block #{consensus_block_number},{consensus_block_hash} \
329 on top of parent block #{parent_number},{parent_hash}"
330 );
331
332 let maybe_preprocess_result = self
333 .domain_block_preprocessor
334 .preprocess_consensus_block(consensus_block_hash, (parent_hash, parent_number))?;
335
336 let preprocess_took = start.elapsed().as_millis();
337 if preprocess_took >= SLOW_PREPROCESS_MILLIS.into() {
338 tracing::warn!(
339 ?consensus_block_info,
340 "Slow consensus block preprocessing, took {preprocess_took}ms"
341 );
342 }
343
344 let Some(preprocess_result) = maybe_preprocess_result else {
345 tracing::debug!(
346 "Skip building new domain block, no bundles and runtime upgrade for this domain \
347 in consensus block #{consensus_block_number:?},{consensus_block_hash}"
348 );
349 self.domain_block_processor
350 .on_consensus_block_processed(consensus_block_hash, None)?;
351 return Ok(None);
352 };
353
354 let inherent_digests = Digest {
355 logs: vec![DigestItem::consensus_block_info(consensus_block_hash)],
356 };
357
358 let domain_block_result = self
359 .domain_block_processor
360 .process_domain_block(
361 (consensus_block_hash, consensus_block_number),
362 (parent_hash, parent_number),
363 preprocess_result,
364 inherent_digests,
365 )
366 .await?;
367
368 let head_receipt_number = self
369 .consensus_client
370 .runtime_api()
371 .head_receipt_number(consensus_block_hash, self.domain_id)?;
372 assert!(
373 domain_block_result.header_number > head_receipt_number,
374 "Domain chain number must larger than the head number of the receipt chain \
375 (which is maintained on the consensus chain) by at least 1"
376 );
377
378 let built_block_info = (
379 domain_block_result.header_hash,
380 domain_block_result.header_number,
381 );
382
383 let block_execution_took = start.elapsed().as_millis().saturating_sub(preprocess_took);
384
385 let reference_block_execution_duration_ms = self
386 .client
387 .runtime_api()
388 .block_weight(domain_block_result.header_hash)?
389 .ref_time()
390 / WEIGHT_REF_TIME_PER_MILLIS;
391 if block_execution_took
392 >= slow_domain_block_execution_threshold(reference_block_execution_duration_ms).into()
393 {
394 tracing::warn!(
395 ?consensus_block_info,
396 ?built_block_info,
397 ?reference_block_execution_duration_ms,
398 "Slow domain block execution, took {block_execution_took}ms"
399 );
400 }
401
402 self.domain_block_processor
403 .on_consensus_block_processed(consensus_block_hash, Some(domain_block_result))?;
404
405 let post_block_execution_took = start
406 .elapsed()
407 .as_millis()
408 .saturating_sub(preprocess_took + block_execution_took);
409 if post_block_execution_took >= SLOW_POST_DOMAIN_BLOCK_EXECUTION_MILLIS.into() {
410 tracing::warn!(
411 ?consensus_block_info,
412 ?built_block_info,
413 "Slow post domain block execution, took {post_block_execution_took}ms"
414 );
415 }
416
417 Ok(Some(built_block_info))
418 }
419}