domain_client_operator/
operator.rs

1use crate::bundle_processor::BundleProcessor;
2use crate::domain_block_processor::{DomainBlockProcessor, ReceiptsChecker};
3use crate::domain_bundle_producer::{
4    uses_default_bundle_producer_params, BundleProducer, DomainBundleProducer, TestBundleProducer,
5};
6use crate::domain_bundle_proposer::DomainBundleProposer;
7use crate::fraud_proof::FraudProofGenerator;
8use crate::snap_sync::{snap_sync, SyncParams, LOG_TARGET};
9use crate::{NewSlotNotification, OperatorParams};
10use futures::channel::mpsc;
11use futures::future::pending;
12use futures::{FutureExt, SinkExt, Stream, StreamExt};
13use sc_client_api::{
14    AuxStore, BlockBackend, BlockImportNotification, BlockchainEvents, ExecutorProvider, Finalizer,
15    ProofProvider,
16};
17use sc_consensus::BlockImport;
18use sc_executor::RuntimeVersionOf;
19use sp_api::ProvideRuntimeApi;
20use sp_blockchain::{HeaderBackend, HeaderMetadata};
21use sp_core::traits::{CodeExecutor, SpawnEssentialNamed};
22use sp_core::H256;
23use sp_domains::core_api::DomainCoreApi;
24use sp_domains::{BundleProducerElectionApi, DomainsApi};
25use sp_domains_fraud_proof::FraudProofApi;
26use sp_keystore::KeystorePtr;
27use sp_messenger::MessengerApi;
28use sp_mmr_primitives::MmrApi;
29use sp_runtime::traits::{Block as BlockT, Header, NumberFor};
30use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
31use std::sync::Arc;
32use subspace_runtime_primitives::Balance;
33use tracing::{error, info, trace, warn};
34
35/// Domain operator.
36pub struct Operator<Block, CBlock, Client, CClient, TransactionPool, Backend, E>
37where
38    Block: BlockT,
39    CBlock: BlockT,
40{
41    consensus_client: Arc<CClient>,
42    client: Arc<Client>,
43    pub transaction_pool: Arc<TransactionPool>,
44    backend: Arc<Backend>,
45    fraud_proof_generator: FraudProofGenerator<Block, CBlock, Client, CClient, Backend, E>,
46    bundle_processor: BundleProcessor<Block, CBlock, Client, CClient, Backend, E>,
47    domain_block_processor: DomainBlockProcessor<Block, CBlock, Client, CClient, Backend, E>,
48    pub keystore: KeystorePtr,
49}
50
51impl<Block, CBlock, Client, CClient, TransactionPool, Backend, E> Clone
52    for Operator<Block, CBlock, Client, CClient, TransactionPool, Backend, E>
53where
54    Block: BlockT,
55    CBlock: BlockT,
56{
57    fn clone(&self) -> Self {
58        Self {
59            consensus_client: self.consensus_client.clone(),
60            client: self.client.clone(),
61            transaction_pool: self.transaction_pool.clone(),
62            backend: self.backend.clone(),
63            fraud_proof_generator: self.fraud_proof_generator.clone(),
64            bundle_processor: self.bundle_processor.clone(),
65            domain_block_processor: self.domain_block_processor.clone(),
66            keystore: self.keystore.clone(),
67        }
68    }
69}
70
71impl<Block, CBlock, Client, CClient, TransactionPool, Backend, E>
72    Operator<Block, CBlock, Client, CClient, TransactionPool, Backend, E>
73where
74    Block: BlockT,
75    Block::Hash: Into<H256>,
76    CBlock: BlockT,
77    NumberFor<CBlock>: From<NumberFor<Block>> + Into<NumberFor<Block>>,
78    CBlock::Hash: From<Block::Hash>,
79    Client: HeaderBackend<Block>
80        + BlockBackend<Block>
81        + AuxStore
82        + ProvideRuntimeApi<Block>
83        + ProofProvider<Block>
84        + Finalizer<Block, Backend>
85        + BlockImport<Block>
86        + BlockchainEvents<Block>
87        + ExecutorProvider<Block>
88        + 'static,
89    for<'a> &'a Client: BlockImport<Block>,
90    Client::Api: DomainCoreApi<Block>
91        + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>
92        + sp_block_builder::BlockBuilder<Block>
93        + sp_api::ApiExt<Block>
94        + TaggedTransactionQueue<Block>,
95    CClient: HeaderBackend<CBlock>
96        + HeaderMetadata<CBlock, Error = sp_blockchain::Error>
97        + BlockBackend<CBlock>
98        + ProvideRuntimeApi<CBlock>
99        + ProofProvider<CBlock>
100        + BlockchainEvents<CBlock>
101        + Send
102        + Sync
103        + 'static,
104    CClient::Api: DomainsApi<CBlock, Block::Header>
105        + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>
106        + BundleProducerElectionApi<CBlock, Balance>
107        + FraudProofApi<CBlock, Block::Header>
108        + MmrApi<CBlock, H256, NumberFor<CBlock>>,
109    Backend: sc_client_api::Backend<Block> + Send + Sync + 'static,
110    TransactionPool:
111        sc_transaction_pool_api::TransactionPool<Block = Block, Hash = Block::Hash> + 'static,
112    E: CodeExecutor + RuntimeVersionOf,
113{
114    /// Create a new instance.
115    #[allow(clippy::type_complexity)]
116    pub async fn new<IBNS, CIBNS, NSNS, ASS>(
117        spawn_essential: Box<dyn SpawnEssentialNamed>,
118        mut params: OperatorParams<
119            Block,
120            CBlock,
121            Client,
122            CClient,
123            TransactionPool,
124            Backend,
125            E,
126            IBNS,
127            CIBNS,
128            NSNS,
129            ASS,
130        >,
131    ) -> Result<Self, sp_consensus::Error>
132    where
133        IBNS: Stream<Item = (NumberFor<CBlock>, mpsc::Sender<()>)> + Send + Unpin + 'static,
134        CIBNS: Stream<Item = BlockImportNotification<CBlock>> + Send + Unpin + 'static,
135        NSNS: Stream<Item = NewSlotNotification> + Send + 'static,
136        ASS: Stream<Item = mpsc::Sender<()>> + Send + 'static,
137    {
138        let domain_bundle_proposer = DomainBundleProposer::<Block, _, CBlock, _, _>::new(
139            params.domain_id,
140            params.client.clone(),
141            params.consensus_client.clone(),
142            params.transaction_pool.clone(),
143        );
144
145        let bundle_producer = if uses_default_bundle_producer_params(
146            params.skip_empty_bundle_production,
147            params.skip_out_of_order_slot,
148        ) {
149            Box::new(DomainBundleProducer::new(
150                params.domain_id,
151                params.consensus_client.clone(),
152                params.client.clone(),
153                domain_bundle_proposer,
154                params.bundle_sender,
155                params.keystore.clone(),
156            )) as Box<dyn BundleProducer<Block, CBlock> + Send>
157        } else {
158            // TODO: only allow the test bundle producer in tests (ticket #3162)
159            warn!("Using test bundle producer...");
160            Box::new(TestBundleProducer::new(
161                params.domain_id,
162                params.consensus_client.clone(),
163                params.client.clone(),
164                domain_bundle_proposer,
165                params.bundle_sender,
166                params.keystore.clone(),
167                params.skip_empty_bundle_production,
168                params.skip_out_of_order_slot,
169            )) as Box<dyn BundleProducer<Block, CBlock> + Send>
170        };
171
172        let fraud_proof_generator = FraudProofGenerator::new(
173            params.client.clone(),
174            params.consensus_client.clone(),
175            params.backend.clone(),
176            params.code_executor.clone(),
177        );
178
179        let domain_block_processor = DomainBlockProcessor {
180            domain_id: params.domain_id,
181            domain_created_at: params.domain_created_at,
182            client: params.client.clone(),
183            consensus_client: params.consensus_client.clone(),
184            backend: params.backend.clone(),
185            block_import: params.block_import,
186            import_notification_sinks: Default::default(),
187            domain_sync_oracle: params.domain_sync_oracle.clone(),
188            domain_executor: params.code_executor.clone(),
189            challenge_period: params.challenge_period,
190        };
191
192        let receipts_checker = ReceiptsChecker {
193            domain_id: params.domain_id,
194            client: params.client.clone(),
195            consensus_client: params.consensus_client.clone(),
196            fraud_proof_generator: fraud_proof_generator.clone(),
197            domain_sync_oracle: params.domain_sync_oracle,
198            consensus_offchain_tx_pool_factory: params.consensus_offchain_tx_pool_factory.clone(),
199        };
200
201        let bundle_processor = BundleProcessor::new(
202            params.domain_id,
203            params.consensus_client.clone(),
204            params.client.clone(),
205            params.backend.clone(),
206            receipts_checker,
207            domain_block_processor.clone(),
208            params.consensus_confirmation_depth_k,
209        );
210
211        let target_block_number = params
212            .consensus_chain_sync_params
213            .as_ref()
214            .map(|p| p.last_domain_block_er.consensus_block_number);
215
216        let sync_params = params
217            .consensus_chain_sync_params
218            .map(|consensus_sync_params| SyncParams {
219                domain_client: params.client.clone(),
220                domain_network_service_handle: params.domain_network_service_handle,
221                sync_service: params.sync_service,
222                domain_block_downloader: params.block_downloader.clone(),
223                consensus_chain_sync_params: consensus_sync_params,
224                domain_fork_id: params.domain_fork_id,
225                challenge_period: params.challenge_period,
226            });
227
228        if let Some(sync_params) = sync_params {
229            let domain_sync_task = {
230                async move {
231                    let info = sync_params.domain_client.info();
232                    // Only attempt snap sync with genesis state
233                    // TODO: Support snap sync from any state once
234                    //  https://github.com/paritytech/polkadot-sdk/issues/5366 is resolved
235                    if info.best_hash == info.genesis_hash {
236                        info!(target: LOG_TARGET, "Starting domain snap sync...");
237
238                        let result = snap_sync(sync_params).await;
239
240                        match result {
241                            Ok(_) => {
242                                info!(target: LOG_TARGET, "Domain snap sync completed.");
243                            }
244                            Err(err) => {
245                                error!(target: LOG_TARGET, %err, "Domain snap sync failed.");
246                                info!(target: LOG_TARGET, "Wipe the DB and restart the application with --sync=full.");
247
248                                // essential task failed
249                                return;
250                            }
251                        };
252                    } else {
253                        error!(target: LOG_TARGET, "Snap sync can only work with genesis state.");
254                        info!(target: LOG_TARGET, "Wipe the DB and restart the application with --sync=full.");
255
256                        // essential task failed
257                        return;
258                    }
259
260                    // Don't exit essential task.
261                    pending().await
262                }
263            };
264
265            spawn_essential.spawn_essential("domain-sync", None, Box::pin(domain_sync_task));
266        }
267
268        let start_worker_task = {
269            let consensus_client = params.consensus_client.clone();
270            let spawn_essential = spawn_essential.clone();
271            let bundle_processor = bundle_processor.clone();
272            async move {
273                // Wait for the target block to import if we are snap syncing
274                if let Some(target_block_number) = target_block_number {
275                    // Wait for Subspace block importing notifications
276                    let block_importing_notification_stream =
277                        &mut params.operator_streams.block_importing_notification_stream;
278
279                    while let Some((block_number, mut acknowledgement_sender)) =
280                        block_importing_notification_stream.next().await
281                    {
282                        trace!(%block_number, "Acknowledged block import from consensus chain.");
283                        if acknowledgement_sender.send(()).await.is_err() {
284                            error!("Can't acknowledge block import #{}", block_number);
285                            return Err(());
286                        }
287
288                        if block_number >= target_block_number {
289                            break;
290                        }
291                    }
292
293                    // Drain Substrate block imported notifications
294                    let imported_block_notification_stream =
295                        &mut params.operator_streams.imported_block_notification_stream;
296
297                    while let Some(import_notification) =
298                        imported_block_notification_stream.next().await
299                    {
300                        let block_number = *import_notification.header.number();
301                        trace!(%block_number, "Block imported from consensus chain.");
302
303                        if block_number >= target_block_number {
304                            break;
305                        }
306                    }
307                }
308
309                crate::domain_worker::start_worker(
310                    spawn_essential.clone(),
311                    consensus_client,
312                    params.consensus_offchain_tx_pool_factory.clone(),
313                    params.maybe_operator_id,
314                    bundle_producer,
315                    bundle_processor.clone(),
316                    params.operator_streams,
317                )
318                .await;
319
320                Ok(())
321            }
322        };
323
324        spawn_essential.spawn_essential_blocking(
325            "domain-operator-worker",
326            None,
327            Box::pin(start_worker_task.map(|_| ())),
328        );
329
330        Ok(Self {
331            consensus_client: params.consensus_client,
332            client: params.client,
333            transaction_pool: params.transaction_pool,
334            backend: params.backend,
335            fraud_proof_generator,
336            bundle_processor,
337            domain_block_processor,
338            keystore: params.keystore,
339        })
340    }
341
342    /// Processes the bundles extracted from the consensus block.
343    // TODO: Remove this whole method, `self.bundle_processor` as a property and fix
344    // `set_new_code_should_work` test to do an actual runtime upgrade
345    #[doc(hidden)]
346    pub async fn process_bundles(
347        self,
348        consensus_block_info: (CBlock::Hash, NumberFor<CBlock>, bool),
349    ) {
350        if let Err(err) = self
351            .bundle_processor
352            .process_bundles(consensus_block_info)
353            .await
354        {
355            tracing::error!(?consensus_block_info, ?err, "Error at processing bundles.");
356        }
357    }
358}