1use crate::bundle_processor::BundleProcessor;
2use crate::domain_block_processor::{DomainBlockProcessor, ReceiptsChecker};
3use crate::domain_bundle_producer::{
4 uses_default_bundle_producer_params, BundleProducer, DomainBundleProducer, TestBundleProducer,
5};
6use crate::domain_bundle_proposer::DomainBundleProposer;
7use crate::fraud_proof::FraudProofGenerator;
8use crate::snap_sync::{snap_sync, SyncParams, LOG_TARGET};
9use crate::{NewSlotNotification, OperatorParams};
10use futures::channel::mpsc;
11use futures::future::pending;
12use futures::{FutureExt, SinkExt, Stream, StreamExt};
13use sc_client_api::{
14 AuxStore, BlockBackend, BlockImportNotification, BlockchainEvents, ExecutorProvider, Finalizer,
15 ProofProvider,
16};
17use sc_consensus::BlockImport;
18use sc_executor::RuntimeVersionOf;
19use sp_api::ProvideRuntimeApi;
20use sp_blockchain::{HeaderBackend, HeaderMetadata};
21use sp_core::traits::{CodeExecutor, SpawnEssentialNamed};
22use sp_core::H256;
23use sp_domains::core_api::DomainCoreApi;
24use sp_domains::{BundleProducerElectionApi, DomainsApi};
25use sp_domains_fraud_proof::FraudProofApi;
26use sp_keystore::KeystorePtr;
27use sp_messenger::MessengerApi;
28use sp_mmr_primitives::MmrApi;
29use sp_runtime::traits::{Block as BlockT, Header, NumberFor};
30use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
31use std::sync::Arc;
32use subspace_runtime_primitives::Balance;
33use tracing::{error, info, trace, warn};
34
35pub struct Operator<Block, CBlock, Client, CClient, TransactionPool, Backend, E>
37where
38 Block: BlockT,
39 CBlock: BlockT,
40{
41 consensus_client: Arc<CClient>,
42 client: Arc<Client>,
43 pub transaction_pool: Arc<TransactionPool>,
44 backend: Arc<Backend>,
45 fraud_proof_generator: FraudProofGenerator<Block, CBlock, Client, CClient, Backend, E>,
46 bundle_processor: BundleProcessor<Block, CBlock, Client, CClient, Backend, E>,
47 domain_block_processor: DomainBlockProcessor<Block, CBlock, Client, CClient, Backend, E>,
48 pub keystore: KeystorePtr,
49}
50
51impl<Block, CBlock, Client, CClient, TransactionPool, Backend, E> Clone
52 for Operator<Block, CBlock, Client, CClient, TransactionPool, Backend, E>
53where
54 Block: BlockT,
55 CBlock: BlockT,
56{
57 fn clone(&self) -> Self {
58 Self {
59 consensus_client: self.consensus_client.clone(),
60 client: self.client.clone(),
61 transaction_pool: self.transaction_pool.clone(),
62 backend: self.backend.clone(),
63 fraud_proof_generator: self.fraud_proof_generator.clone(),
64 bundle_processor: self.bundle_processor.clone(),
65 domain_block_processor: self.domain_block_processor.clone(),
66 keystore: self.keystore.clone(),
67 }
68 }
69}
70
71impl<Block, CBlock, Client, CClient, TransactionPool, Backend, E>
72 Operator<Block, CBlock, Client, CClient, TransactionPool, Backend, E>
73where
74 Block: BlockT,
75 Block::Hash: Into<H256>,
76 CBlock: BlockT,
77 NumberFor<CBlock>: From<NumberFor<Block>> + Into<NumberFor<Block>>,
78 CBlock::Hash: From<Block::Hash>,
79 Client: HeaderBackend<Block>
80 + BlockBackend<Block>
81 + AuxStore
82 + ProvideRuntimeApi<Block>
83 + ProofProvider<Block>
84 + Finalizer<Block, Backend>
85 + BlockImport<Block>
86 + BlockchainEvents<Block>
87 + ExecutorProvider<Block>
88 + 'static,
89 for<'a> &'a Client: BlockImport<Block>,
90 Client::Api: DomainCoreApi<Block>
91 + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>
92 + sp_block_builder::BlockBuilder<Block>
93 + sp_api::ApiExt<Block>
94 + TaggedTransactionQueue<Block>,
95 CClient: HeaderBackend<CBlock>
96 + HeaderMetadata<CBlock, Error = sp_blockchain::Error>
97 + BlockBackend<CBlock>
98 + ProvideRuntimeApi<CBlock>
99 + ProofProvider<CBlock>
100 + BlockchainEvents<CBlock>
101 + Send
102 + Sync
103 + 'static,
104 CClient::Api: DomainsApi<CBlock, Block::Header>
105 + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>
106 + BundleProducerElectionApi<CBlock, Balance>
107 + FraudProofApi<CBlock, Block::Header>
108 + MmrApi<CBlock, H256, NumberFor<CBlock>>,
109 Backend: sc_client_api::Backend<Block> + Send + Sync + 'static,
110 TransactionPool:
111 sc_transaction_pool_api::TransactionPool<Block = Block, Hash = Block::Hash> + 'static,
112 E: CodeExecutor + RuntimeVersionOf,
113{
114 #[allow(clippy::type_complexity)]
116 pub async fn new<IBNS, CIBNS, NSNS, ASS>(
117 spawn_essential: Box<dyn SpawnEssentialNamed>,
118 mut params: OperatorParams<
119 Block,
120 CBlock,
121 Client,
122 CClient,
123 TransactionPool,
124 Backend,
125 E,
126 IBNS,
127 CIBNS,
128 NSNS,
129 ASS,
130 >,
131 ) -> Result<Self, sp_consensus::Error>
132 where
133 IBNS: Stream<Item = (NumberFor<CBlock>, mpsc::Sender<()>)> + Send + Unpin + 'static,
134 CIBNS: Stream<Item = BlockImportNotification<CBlock>> + Send + Unpin + 'static,
135 NSNS: Stream<Item = NewSlotNotification> + Send + 'static,
136 ASS: Stream<Item = mpsc::Sender<()>> + Send + 'static,
137 {
138 let domain_bundle_proposer = DomainBundleProposer::<Block, _, CBlock, _, _>::new(
139 params.domain_id,
140 params.client.clone(),
141 params.consensus_client.clone(),
142 params.transaction_pool.clone(),
143 );
144
145 let bundle_producer = if uses_default_bundle_producer_params(
146 params.skip_empty_bundle_production,
147 params.skip_out_of_order_slot,
148 ) {
149 Box::new(DomainBundleProducer::new(
150 params.domain_id,
151 params.consensus_client.clone(),
152 params.client.clone(),
153 domain_bundle_proposer,
154 params.bundle_sender,
155 params.keystore.clone(),
156 )) as Box<dyn BundleProducer<Block, CBlock> + Send>
157 } else {
158 warn!("Using test bundle producer...");
160 Box::new(TestBundleProducer::new(
161 params.domain_id,
162 params.consensus_client.clone(),
163 params.client.clone(),
164 domain_bundle_proposer,
165 params.bundle_sender,
166 params.keystore.clone(),
167 params.skip_empty_bundle_production,
168 params.skip_out_of_order_slot,
169 )) as Box<dyn BundleProducer<Block, CBlock> + Send>
170 };
171
172 let fraud_proof_generator = FraudProofGenerator::new(
173 params.client.clone(),
174 params.consensus_client.clone(),
175 params.backend.clone(),
176 params.code_executor.clone(),
177 );
178
179 let domain_block_processor = DomainBlockProcessor {
180 domain_id: params.domain_id,
181 domain_created_at: params.domain_created_at,
182 client: params.client.clone(),
183 consensus_client: params.consensus_client.clone(),
184 backend: params.backend.clone(),
185 block_import: params.block_import,
186 import_notification_sinks: Default::default(),
187 domain_sync_oracle: params.domain_sync_oracle.clone(),
188 domain_executor: params.code_executor.clone(),
189 challenge_period: params.challenge_period,
190 };
191
192 let receipts_checker = ReceiptsChecker {
193 domain_id: params.domain_id,
194 client: params.client.clone(),
195 consensus_client: params.consensus_client.clone(),
196 fraud_proof_generator: fraud_proof_generator.clone(),
197 domain_sync_oracle: params.domain_sync_oracle,
198 consensus_offchain_tx_pool_factory: params.consensus_offchain_tx_pool_factory.clone(),
199 };
200
201 let bundle_processor = BundleProcessor::new(
202 params.domain_id,
203 params.consensus_client.clone(),
204 params.client.clone(),
205 params.backend.clone(),
206 receipts_checker,
207 domain_block_processor.clone(),
208 params.consensus_confirmation_depth_k,
209 );
210
211 let target_block_number = params
212 .consensus_chain_sync_params
213 .as_ref()
214 .map(|p| p.last_domain_block_er.consensus_block_number);
215
216 let sync_params = params
217 .consensus_chain_sync_params
218 .map(|consensus_sync_params| SyncParams {
219 domain_client: params.client.clone(),
220 domain_network_service_handle: params.domain_network_service_handle,
221 sync_service: params.sync_service,
222 domain_block_downloader: params.block_downloader.clone(),
223 consensus_chain_sync_params: consensus_sync_params,
224 domain_fork_id: params.domain_fork_id,
225 challenge_period: params.challenge_period,
226 });
227
228 if let Some(sync_params) = sync_params {
229 let domain_sync_task = {
230 async move {
231 let info = sync_params.domain_client.info();
232 if info.best_hash == info.genesis_hash {
236 info!(target: LOG_TARGET, "Starting domain snap sync...");
237
238 let result = snap_sync(sync_params).await;
239
240 match result {
241 Ok(_) => {
242 info!(target: LOG_TARGET, "Domain snap sync completed.");
243 }
244 Err(err) => {
245 error!(target: LOG_TARGET, %err, "Domain snap sync failed.");
246 info!(target: LOG_TARGET, "Wipe the DB and restart the application with --sync=full.");
247
248 return;
250 }
251 };
252 } else {
253 error!(target: LOG_TARGET, "Snap sync can only work with genesis state.");
254 info!(target: LOG_TARGET, "Wipe the DB and restart the application with --sync=full.");
255
256 return;
258 }
259
260 pending().await
262 }
263 };
264
265 spawn_essential.spawn_essential("domain-sync", None, Box::pin(domain_sync_task));
266 }
267
268 let start_worker_task = {
269 let consensus_client = params.consensus_client.clone();
270 let spawn_essential = spawn_essential.clone();
271 let bundle_processor = bundle_processor.clone();
272 async move {
273 if let Some(target_block_number) = target_block_number {
275 let block_importing_notification_stream =
277 &mut params.operator_streams.block_importing_notification_stream;
278
279 while let Some((block_number, mut acknowledgement_sender)) =
280 block_importing_notification_stream.next().await
281 {
282 trace!(%block_number, "Acknowledged block import from consensus chain.");
283 if acknowledgement_sender.send(()).await.is_err() {
284 error!("Can't acknowledge block import #{}", block_number);
285 return Err(());
286 }
287
288 if block_number >= target_block_number {
289 break;
290 }
291 }
292
293 let imported_block_notification_stream =
295 &mut params.operator_streams.imported_block_notification_stream;
296
297 while let Some(import_notification) =
298 imported_block_notification_stream.next().await
299 {
300 let block_number = *import_notification.header.number();
301 trace!(%block_number, "Block imported from consensus chain.");
302
303 if block_number >= target_block_number {
304 break;
305 }
306 }
307 }
308
309 crate::domain_worker::start_worker(
310 spawn_essential.clone(),
311 consensus_client,
312 params.consensus_offchain_tx_pool_factory.clone(),
313 params.maybe_operator_id,
314 bundle_producer,
315 bundle_processor.clone(),
316 params.operator_streams,
317 )
318 .await;
319
320 Ok(())
321 }
322 };
323
324 spawn_essential.spawn_essential_blocking(
325 "domain-operator-worker",
326 None,
327 Box::pin(start_worker_task.map(|_| ())),
328 );
329
330 Ok(Self {
331 consensus_client: params.consensus_client,
332 client: params.client,
333 transaction_pool: params.transaction_pool,
334 backend: params.backend,
335 fraud_proof_generator,
336 bundle_processor,
337 domain_block_processor,
338 keystore: params.keystore,
339 })
340 }
341
342 #[doc(hidden)]
346 pub async fn process_bundles(
347 self,
348 consensus_block_info: (CBlock::Hash, NumberFor<CBlock>, bool),
349 ) {
350 if let Err(err) = self
351 .bundle_processor
352 .process_bundles(consensus_block_info)
353 .await
354 {
355 tracing::error!(?consensus_block_info, ?err, "Error at processing bundles.");
356 }
357 }
358}