domain_client_operator/
bundle_processor.rs

1use crate::domain_block_processor::{
2    DomainBlockProcessor, PendingConsensusBlocks, ReceiptsChecker,
3};
4use crate::ExecutionReceiptFor;
5use domain_block_preprocessor::DomainBlockPreprocessor;
6use sc_client_api::{AuxStore, BlockBackend, ExecutorProvider, Finalizer, ProofProvider};
7use sc_consensus::{BlockImportParams, ForkChoiceStrategy, StateAction};
8use sc_executor::RuntimeVersionOf;
9use sp_api::ProvideRuntimeApi;
10use sp_blockchain::{HeaderBackend, HeaderMetadata};
11use sp_consensus::BlockOrigin;
12use sp_core::traits::CodeExecutor;
13use sp_core::H256;
14use sp_domain_digests::AsPredigest;
15use sp_domains::core_api::DomainCoreApi;
16use sp_domains::{DomainId, DomainsApi, ReceiptValidity};
17use sp_domains_fraud_proof::FraudProofApi;
18use sp_messenger::MessengerApi;
19use sp_mmr_primitives::MmrApi;
20use sp_runtime::traits::{Block as BlockT, CheckedSub, NumberFor, Zero};
21use sp_runtime::{Digest, DigestItem};
22use sp_weights::constants::WEIGHT_REF_TIME_PER_MILLIS;
23use std::sync::Arc;
24use std::time::Instant;
25
// Slow-log threshold, in milliseconds, for preprocessing a consensus block.
const SLOW_PREPROCESS_MILLIS: u64 = 500;

// Slow-log threshold, in milliseconds, for domain block execution:
// `reference_duration_ms * 1.2 + 200ms`.
//
// The extra `reference_duration_ms * 0.2` (integer division, so it rounds down) is a buffer for
// slow extrinsic execution, i.e. a machine slower than the reference machine, and the fixed
// 200ms is a buffer for the rest of the processing.
//
// The sum saturates at `u64::MAX` rather than overflowing, so an extreme `reference_duration_ms`
// can neither panic (debug builds) nor wrap around to a tiny threshold (release builds).
fn slow_domain_block_execution_threshold(reference_duration_ms: u64) -> u64 {
    reference_duration_ms
        .saturating_add(reference_duration_ms / 5)
        .saturating_add(200)
}

// Slow-log threshold, in milliseconds, for the work done after domain block execution.
const SLOW_POST_DOMAIN_BLOCK_EXECUTION_MILLIS: u64 = 250;
38
39type DomainReceiptsChecker<Block, CBlock, Client, CClient, Backend, E> =
40    ReceiptsChecker<Block, Client, CBlock, CClient, Backend, E>;
41
42pub(crate) struct BundleProcessor<Block, CBlock, Client, CClient, Backend, E>
43where
44    Block: BlockT,
45    CBlock: BlockT,
46{
47    domain_id: DomainId,
48    consensus_client: Arc<CClient>,
49    client: Arc<Client>,
50    backend: Arc<Backend>,
51    domain_receipts_checker: DomainReceiptsChecker<Block, CBlock, Client, CClient, Backend, E>,
52    domain_block_preprocessor: DomainBlockPreprocessor<
53        Block,
54        CBlock,
55        Client,
56        CClient,
57        E,
58        Backend,
59        ReceiptValidator<Client>,
60    >,
61    domain_block_processor: DomainBlockProcessor<Block, CBlock, Client, CClient, Backend, E>,
62    confirmation_depth_k: NumberFor<CBlock>,
63}
64
65impl<Block, CBlock, Client, CClient, Backend, E> Clone
66    for BundleProcessor<Block, CBlock, Client, CClient, Backend, E>
67where
68    Block: BlockT,
69    CBlock: BlockT,
70{
71    fn clone(&self) -> Self {
72        Self {
73            domain_id: self.domain_id,
74            consensus_client: self.consensus_client.clone(),
75            client: self.client.clone(),
76            backend: self.backend.clone(),
77            domain_receipts_checker: self.domain_receipts_checker.clone(),
78            domain_block_preprocessor: self.domain_block_preprocessor.clone(),
79            domain_block_processor: self.domain_block_processor.clone(),
80            confirmation_depth_k: self.confirmation_depth_k,
81        }
82    }
83}
84
// Receipt validator that holds a shared handle to the domain client.
struct ReceiptValidator<Client> {
    client: Arc<Client>,
}

// Hand-written so that `Client` itself does not have to be `Clone`; only the `Arc` handle is
// duplicated.
impl<Client> Clone for ReceiptValidator<Client> {
    fn clone(&self) -> Self {
        let client = Arc::clone(&self.client);
        Self { client }
    }
}

impl<Client> ReceiptValidator<Client> {
    // Wraps the given client handle.
    pub fn new(client: Arc<Client>) -> Self {
        ReceiptValidator { client }
    }
}
102
103impl<Block, CBlock, Client> domain_block_preprocessor::ValidateReceipt<Block, CBlock>
104    for ReceiptValidator<Client>
105where
106    Block: BlockT,
107    CBlock: BlockT,
108    Client: AuxStore,
109{
110    fn validate_receipt(
111        &self,
112        receipt: &ExecutionReceiptFor<Block, CBlock>,
113    ) -> sp_blockchain::Result<ReceiptValidity> {
114        // Skip genesis receipt as it has been already verified by the consensus chain.
115        if receipt.domain_block_number.is_zero() {
116            return Ok(ReceiptValidity::Valid);
117        }
118
119        let consensus_block_hash = receipt.consensus_block_hash;
120        let _local_receipt = crate::aux_schema::load_execution_receipt::<_, Block, CBlock>(
121            &*self.client,
122            consensus_block_hash,
123        )?
124        .ok_or_else(|| {
125            sp_blockchain::Error::Backend(format!(
126                "Receipt for consensus block {consensus_block_hash} not found"
127            ))
128        })?;
129
130        Ok(ReceiptValidity::Valid)
131    }
132}
133
134impl<Block, CBlock, Client, CClient, Backend, E>
135    BundleProcessor<Block, CBlock, Client, CClient, Backend, E>
136where
137    Block: BlockT,
138    Block::Hash: Into<H256>,
139    CBlock: BlockT,
140    NumberFor<CBlock>: From<NumberFor<Block>> + Into<NumberFor<Block>>,
141    CBlock::Hash: From<Block::Hash>,
142    Client: HeaderBackend<Block>
143        + BlockBackend<Block>
144        + AuxStore
145        + ProvideRuntimeApi<Block>
146        + ProofProvider<Block>
147        + ExecutorProvider<Block>
148        + Finalizer<Block, Backend>
149        + 'static,
150    Client::Api: DomainCoreApi<Block>
151        + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>
152        + sp_block_builder::BlockBuilder<Block>
153        + sp_api::ApiExt<Block>,
154    CClient: HeaderBackend<CBlock>
155        + HeaderMetadata<CBlock, Error = sp_blockchain::Error>
156        + BlockBackend<CBlock>
157        + ProofProvider<CBlock>
158        + ProvideRuntimeApi<CBlock>
159        + 'static,
160    CClient::Api: DomainsApi<CBlock, Block::Header>
161        + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>
162        + FraudProofApi<CBlock, Block::Header>
163        + MmrApi<CBlock, H256, NumberFor<CBlock>>
164        + 'static,
165    Backend: sc_client_api::Backend<Block> + 'static,
166    E: CodeExecutor + RuntimeVersionOf,
167{
168    pub(crate) fn new(
169        domain_id: DomainId,
170        consensus_client: Arc<CClient>,
171        client: Arc<Client>,
172        backend: Arc<Backend>,
173        domain_receipts_checker: DomainReceiptsChecker<Block, CBlock, Client, CClient, Backend, E>,
174        domain_block_processor: DomainBlockProcessor<Block, CBlock, Client, CClient, Backend, E>,
175        confirmation_depth_k: NumberFor<CBlock>,
176    ) -> Self {
177        let domain_block_preprocessor = DomainBlockPreprocessor::new(
178            domain_id,
179            client.clone(),
180            consensus_client.clone(),
181            domain_block_processor.domain_executor.clone(),
182            domain_block_processor.backend.clone(),
183            ReceiptValidator::new(client.clone()),
184        );
185        Self {
186            domain_id,
187            consensus_client,
188            client,
189            backend,
190            domain_receipts_checker,
191            domain_block_preprocessor,
192            domain_block_processor,
193            confirmation_depth_k,
194        }
195    }
196
197    // TODO: Handle the returned error properly, ref to https://github.com/autonomys/subspace/pull/695#discussion_r926721185
198    pub(crate) async fn process_bundles(
199        self,
200        consensus_block_info: (CBlock::Hash, NumberFor<CBlock>, bool),
201    ) -> sp_blockchain::Result<()> {
202        let (consensus_block_hash, consensus_block_number, is_new_best) = consensus_block_info;
203
204        // Skip processing the blocks of the non-canonical chain, these blocks will be processed if
205        // the chain becomes canonical later
206        if !is_new_best {
207            return Ok(());
208        }
209
210        tracing::debug!(
211            "Processing consensus block #{consensus_block_number},{consensus_block_hash}"
212        );
213
214        let maybe_pending_consensus_blocks = self
215            .domain_block_processor
216            .pending_imported_consensus_blocks(consensus_block_hash, consensus_block_number)?;
217
218        if let Some(PendingConsensusBlocks {
219            initial_parent,
220            consensus_imports,
221        }) = maybe_pending_consensus_blocks
222        {
223            tracing::trace!(
224                ?initial_parent,
225                ?consensus_imports,
226                "Pending consensus blocks to process"
227            );
228
229            let mut domain_parent = initial_parent;
230
231            for consensus_info in consensus_imports {
232                if let Some(next_domain_parent) = self
233                    .process_bundles_at((consensus_info.hash, consensus_info.number), domain_parent)
234                    .await?
235                {
236                    domain_parent = next_domain_parent;
237                }
238            }
239
240            // The domain branch driving from the best consensus branch should also be the best domain branch even
241            // if it is no the longest domain branch. Thus re-import the tip of the best domain branch to make it
242            // the new best block if it isn't.
243            //
244            // Note: this may cause the best domain fork switch to a shorter fork or in some case the best domain
245            // block become the ancestor block of the current best block.
246            let domain_tip = domain_parent.0;
247            if self.client.info().best_hash != domain_tip {
248                let header = self.client.header(domain_tip)?.ok_or_else(|| {
249                    sp_blockchain::Error::Backend(format!("Header for #{:?} not found", domain_tip))
250                })?;
251                let block_origin = if self
252                    .domain_block_processor
253                    .domain_sync_oracle
254                    .is_major_syncing()
255                {
256                    // The domain block is derived from the consensus block, if the consensus chain is
257                    // in major sync then we should also consider the domain block is `NetworkInitialSync`
258                    BlockOrigin::NetworkInitialSync
259                } else {
260                    BlockOrigin::Own
261                };
262                let block_import_params = {
263                    let mut import_block = BlockImportParams::new(block_origin, header);
264                    import_block.import_existing = true;
265                    import_block.fork_choice = Some(ForkChoiceStrategy::Custom(true));
266                    import_block.state_action = StateAction::Skip;
267                    import_block
268                };
269                self.domain_block_processor
270                    .import_domain_block(block_import_params)
271                    .await?;
272                assert_eq!(domain_tip, self.client.info().best_hash);
273
274                // finalize the latest confirmed domain block in the finalized Consensus block
275                self.finalize_domain_block((consensus_block_hash, consensus_block_number))?;
276            }
277
278            // Check the ER submitted to consensus chain and submit fraud proof if there is bad ER
279            // NOTE: this have to be done after the recorrect of the best domain fork happen above
280            self.domain_receipts_checker
281                .maybe_submit_fraud_proof(consensus_block_hash)?;
282        }
283
284        Ok(())
285    }
286
287    // finalize the domain block which is confirmed in the finalized consensus block.
288    fn finalize_domain_block(
289        &self,
290        consensus_block_info: (CBlock::Hash, NumberFor<CBlock>),
291    ) -> sp_blockchain::Result<()> {
292        if let Some(confirmed_consensus_block) = consensus_block_info
293            .1
294            .checked_sub(&self.confirmation_depth_k)
295        {
296            let runtime_api = self.consensus_client.runtime_api();
297            let Some(confirmed_consensus_block_hash) =
298                self.consensus_client.hash(confirmed_consensus_block)?
299            else {
300                // when the consensus node snap synced, it is possible that confirmed_consensus_block is less
301                // than the first block imported during the consensus snap sync.
302                // If so, block data is not present. So we simply skip finalization for this block.
303                return Ok(());
304            };
305
306            let finalized_domain_block_number = self.client.info().finalized_number;
307            if let Some(confirmed_domain_block) = runtime_api
308                .latest_confirmed_domain_block(confirmed_consensus_block_hash, self.domain_id)?
309                && confirmed_domain_block.0 > finalized_domain_block_number
310            {
311                self.client
312                    .finalize_block(confirmed_domain_block.1, None, true)?;
313            }
314        }
315        Ok(())
316    }
317
318    async fn process_bundles_at(
319        &self,
320        consensus_block_info: (CBlock::Hash, NumberFor<CBlock>),
321        parent_info: (Block::Hash, NumberFor<Block>),
322    ) -> sp_blockchain::Result<Option<(Block::Hash, NumberFor<Block>)>> {
323        let (consensus_block_hash, consensus_block_number) = consensus_block_info;
324        let (parent_hash, parent_number) = parent_info;
325        let start = Instant::now();
326
327        tracing::debug!(
328            "Building a new domain block from consensus block #{consensus_block_number},{consensus_block_hash} \
329            on top of parent block #{parent_number},{parent_hash}"
330        );
331
332        let maybe_preprocess_result = self
333            .domain_block_preprocessor
334            .preprocess_consensus_block(consensus_block_hash, (parent_hash, parent_number))?;
335
336        let preprocess_took = start.elapsed().as_millis();
337        if preprocess_took >= SLOW_PREPROCESS_MILLIS.into() {
338            tracing::warn!(
339                ?consensus_block_info,
340                "Slow consensus block preprocessing, took {preprocess_took}ms"
341            );
342        }
343
344        let Some(preprocess_result) = maybe_preprocess_result else {
345            tracing::debug!(
346                "Skip building new domain block, no bundles and runtime upgrade for this domain \
347                    in consensus block #{consensus_block_number:?},{consensus_block_hash}"
348            );
349            self.domain_block_processor
350                .on_consensus_block_processed(consensus_block_hash, None)?;
351            return Ok(None);
352        };
353
354        let inherent_digests = Digest {
355            logs: vec![DigestItem::consensus_block_info(consensus_block_hash)],
356        };
357
358        let domain_block_result = self
359            .domain_block_processor
360            .process_domain_block(
361                (consensus_block_hash, consensus_block_number),
362                (parent_hash, parent_number),
363                preprocess_result,
364                inherent_digests,
365            )
366            .await?;
367
368        let head_receipt_number = self
369            .consensus_client
370            .runtime_api()
371            .head_receipt_number(consensus_block_hash, self.domain_id)?;
372        assert!(
373            domain_block_result.header_number > head_receipt_number,
374            "Domain chain number must larger than the head number of the receipt chain \
375            (which is maintained on the consensus chain) by at least 1"
376        );
377
378        let built_block_info = (
379            domain_block_result.header_hash,
380            domain_block_result.header_number,
381        );
382
383        let block_execution_took = start.elapsed().as_millis().saturating_sub(preprocess_took);
384
385        let reference_block_execution_duration_ms = self
386            .client
387            .runtime_api()
388            .block_weight(domain_block_result.header_hash)?
389            .ref_time()
390            / WEIGHT_REF_TIME_PER_MILLIS;
391        if block_execution_took
392            >= slow_domain_block_execution_threshold(reference_block_execution_duration_ms).into()
393        {
394            tracing::warn!(
395                ?consensus_block_info,
396                ?built_block_info,
397                ?reference_block_execution_duration_ms,
398                "Slow domain block execution, took {block_execution_took}ms"
399            );
400        }
401
402        self.domain_block_processor
403            .on_consensus_block_processed(consensus_block_hash, Some(domain_block_result))?;
404
405        let post_block_execution_took = start
406            .elapsed()
407            .as_millis()
408            .saturating_sub(preprocess_took + block_execution_took);
409        if post_block_execution_took >= SLOW_POST_DOMAIN_BLOCK_EXECUTION_MILLIS.into() {
410            tracing::warn!(
411                ?consensus_block_info,
412                ?built_block_info,
413                "Slow post domain block execution, took {post_block_execution_took}ms"
414            );
415        }
416
417        Ok(Some(built_block_info))
418    }
419}