1use crate::bundle_processor::BundleProcessor;
18use crate::domain_bundle_producer::{BundleProducer, DomainProposal};
19use crate::utils::{BlockInfo, OperatorSlotInfo};
20use crate::{NewSlotNotification, OperatorStreams};
21use futures::channel::mpsc;
22use futures::{SinkExt, Stream, StreamExt};
23use sc_client_api::{
24 AuxStore, BlockBackend, BlockImportNotification, BlockchainEvents, ExecutorProvider, Finalizer,
25 ProofProvider,
26};
27use sc_executor::RuntimeVersionOf;
28use sc_transaction_pool_api::OffchainTransactionPoolFactory;
29use sp_api::{ApiExt, ProvideRuntimeApi};
30use sp_block_builder::BlockBuilder;
31use sp_blockchain::{HeaderBackend, HeaderMetadata};
32use sp_core::H256;
33use sp_core::traits::{CodeExecutor, SpawnEssentialNamed};
34use sp_domains::core_api::DomainCoreApi;
35use sp_domains::{BundleProducerElectionApi, DomainsApi, OpaqueBundle, OperatorId};
36use sp_domains_fraud_proof::FraudProofApi;
37use sp_messenger::MessengerApi;
38use sp_mmr_primitives::MmrApi;
39use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
40use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
41use std::pin::{Pin, pin};
42use std::sync::Arc;
43use std::task::{Context, Poll};
44use subspace_runtime_primitives::{Balance, BlockHashFor, HeaderFor};
45use tracing::{Instrument, info};
46
47pub type OpaqueBundleFor<Block, CBlock> =
48 OpaqueBundle<NumberFor<CBlock>, BlockHashFor<CBlock>, HeaderFor<Block>, Balance>;
49
50#[allow(clippy::type_complexity, clippy::too_many_arguments)]
51pub(super) async fn start_worker<
52 Block,
53 CBlock,
54 Client,
55 CClient,
56 Backend,
57 IBNS,
58 CIBNS,
59 NSNS,
60 ASS,
61 E,
62>(
63 spawn_essential: Box<dyn SpawnEssentialNamed>,
64 consensus_client: Arc<CClient>,
65 consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory<CBlock>,
66 maybe_operator_id: Option<OperatorId>,
67 mut bundle_producer: Box<dyn BundleProducer<Block, CBlock> + Send>,
68 bundle_processor: BundleProcessor<Block, CBlock, Client, CClient, Backend, E>,
69 operator_streams: OperatorStreams<CBlock, IBNS, CIBNS, NSNS, ASS>,
70) where
71 Block: BlockT,
72 Block::Hash: Into<H256>,
73 CBlock: BlockT,
74 NumberFor<CBlock>: From<NumberFor<Block>> + Into<NumberFor<Block>>,
75 CBlock::Hash: From<Block::Hash>,
76 Client: HeaderBackend<Block>
77 + BlockBackend<Block>
78 + AuxStore
79 + ProvideRuntimeApi<Block>
80 + ProofProvider<Block>
81 + Finalizer<Block, Backend>
82 + ExecutorProvider<Block>
83 + 'static,
84 Client::Api: DomainCoreApi<Block>
85 + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>
86 + BlockBuilder<Block>
87 + sp_api::ApiExt<Block>
88 + TaggedTransactionQueue<Block>,
89 CClient: HeaderBackend<CBlock>
90 + HeaderMetadata<CBlock, Error = sp_blockchain::Error>
91 + BlockBackend<CBlock>
92 + ProofProvider<CBlock>
93 + ProvideRuntimeApi<CBlock>
94 + BlockchainEvents<CBlock>
95 + 'static,
96 CClient::Api: DomainsApi<CBlock, Block::Header>
97 + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>
98 + BundleProducerElectionApi<CBlock, Balance>
99 + FraudProofApi<CBlock, Block::Header>
100 + MmrApi<CBlock, H256, NumberFor<CBlock>>,
101 Backend: sc_client_api::Backend<Block> + 'static,
102 IBNS: Stream<Item = (NumberFor<CBlock>, mpsc::Sender<()>)> + Send + 'static,
103 CIBNS: Stream<Item = BlockImportNotification<CBlock>> + Send + 'static,
104 NSNS: Stream<Item = NewSlotNotification> + Send + 'static,
105 ASS: Stream<Item = mpsc::Sender<()>> + Send + 'static,
106 E: CodeExecutor + RuntimeVersionOf,
107{
108 let span = tracing::Span::current();
109
110 let OperatorStreams {
111 consensus_block_import_throttling_buffer_size,
112 block_importing_notification_stream,
113 imported_block_notification_stream,
114 new_slot_notification_stream,
115 acknowledgement_sender_stream,
116 _phantom,
117 } = operator_streams;
118
119 let mut throttled_block_import_notification_stream =
120 throttling_block_import_notifications::<Block, _, _, _, _>(
121 spawn_essential,
122 consensus_client.clone(),
123 Box::pin(block_importing_notification_stream),
124 Box::pin(imported_block_notification_stream),
125 consensus_block_import_throttling_buffer_size,
126 );
127
128 if let Some(operator_id) = maybe_operator_id {
129 info!("👷 Running as Operator[{operator_id}]...");
130 let mut latest_slot_notification_stream =
131 LatestItemStream::new(new_slot_notification_stream);
132 let mut acknowledgement_sender_stream = pin!(acknowledgement_sender_stream);
133 loop {
134 tokio::select! {
135 biased;
138
139 Some((slot, proof_of_time)) = latest_slot_notification_stream.next() => {
140 let res = bundle_producer
141 .produce_bundle(
142 operator_id,
143 OperatorSlotInfo {
144 slot,
145 proof_of_time,
146 },
147 )
148 .instrument(span.clone())
149 .await;
150 match res {
151 Err(err) => {
152 tracing::error!(?slot, ?err, "Error at producing bundle.");
153 }
154 Ok(Some(domain_proposal)) => {
155 let best_hash = consensus_client.info().best_hash;
156 let mut runtime_api = consensus_client.runtime_api();
157 runtime_api.register_extension(consensus_offchain_tx_pool_factory.offchain_transaction_pool(best_hash));
158
159 match domain_proposal {
160 DomainProposal::Bundle(opaque_bundle) => {
161 if let Err(err) = runtime_api.submit_bundle_unsigned(best_hash, opaque_bundle) {
162 tracing::error!(?slot, ?err, "Error at submitting bundle.");
163 }
164 },
165 DomainProposal::Receipt(singleton_receipt) => {
166 if let Err(err) = runtime_api.submit_receipt_unsigned(best_hash, singleton_receipt) {
167 tracing::error!(?slot, ?err, "Error at submitting receipt.");
168 }
169 },
170 }
171 }
172 Ok(None) => {}
173 }
174 }
175 Some(maybe_block_info) = throttled_block_import_notification_stream.next() => {
176 if let Some(block_info) = maybe_block_info
177 && let Err(error) = bundle_processor
178 .clone()
179 .process_bundles((
180 block_info.hash,
181 block_info.number,
182 block_info.is_new_best,
183 ))
184 .instrument(span.clone())
185 .await
186 {
187 tracing::error!(?error, "Failed to process consensus block");
188 break;
191 }
192 }
193 Some(mut acknowledgement_sender) = acknowledgement_sender_stream.next() => {
196 if let Err(err) = acknowledgement_sender.send(()).await {
197 tracing::error!(
198 ?err,
199 "Failed to send acknowledgement"
200 );
201 }
202 }
203 else => { break }
204 }
205 }
206 } else {
207 info!("🧑 Running as Full node...");
208 drop(new_slot_notification_stream);
209 drop(acknowledgement_sender_stream);
210 while let Some(maybe_block_info) = throttled_block_import_notification_stream.next().await {
211 if let Some(block_info) = maybe_block_info
212 && let Err(error) = bundle_processor
213 .clone()
214 .process_bundles((block_info.hash, block_info.number, block_info.is_new_best))
215 .instrument(span.clone())
216 .await
217 {
218 tracing::error!(?error, "Failed to process consensus block");
219 break;
222 }
223 }
224 }
225}
226
227#[allow(clippy::too_many_arguments)]
232fn throttling_block_import_notifications<Block, CBlock, CClient, BlocksImporting, BlocksImported>(
233 spawn_essential: Box<dyn SpawnEssentialNamed>,
234 consensus_client: Arc<CClient>,
235 mut blocks_importing: BlocksImporting,
236 mut blocks_imported: BlocksImported,
237 consensus_block_import_throttling_buffer_size: u32,
238) -> mpsc::Receiver<Option<BlockInfo<CBlock>>>
239where
240 Block: BlockT,
241 CBlock: BlockT,
242 CClient: HeaderBackend<CBlock>
243 + BlockBackend<CBlock>
244 + ProvideRuntimeApi<CBlock>
245 + BlockchainEvents<CBlock>
246 + 'static,
247 CClient::Api: DomainsApi<CBlock, Block::Header>,
248 BlocksImporting: Stream<Item = (NumberFor<CBlock>, mpsc::Sender<()>)> + Unpin + Send + 'static,
249 BlocksImported: Stream<Item = BlockImportNotification<CBlock>> + Unpin + Send + 'static,
250{
251 let (mut block_info_sender, block_info_receiver) = mpsc::channel::<Option<BlockInfo<CBlock>>>(
256 consensus_block_import_throttling_buffer_size as usize,
257 );
258
259 spawn_essential.spawn_essential(
260 "consensus-block-import-throttler",
261 None,
262 Box::pin(async move {
263 loop {
264 tokio::select! {
265 biased;
272
273 maybe_block_imported = blocks_imported.next() => {
274 let block_imported = match maybe_block_imported {
275 Some(block_imported) => block_imported,
276 None => {
277 break;
279 }
280 };
281 let header = match consensus_client.header(block_imported.hash) {
282 Ok(Some(header)) => header,
283 res => {
284 tracing::error!(
285 result = ?res,
286 header = ?block_imported.header,
287 "Imported consensus block header not found",
288 );
289 return;
290 }
291 };
292 let block_info = BlockInfo {
293 hash: header.hash(),
294 number: *header.number(),
295 is_new_best: block_imported.is_new_best,
296 };
297 let _ = block_info_sender.feed(Some(block_info)).await;
298 }
299 maybe_block_importing = blocks_importing.next() => {
300 let (_block_number, mut acknowledgement_sender) =
302 match maybe_block_importing {
303 Some(block_importing) => block_importing,
304 None => {
305 break;
307 }
308 };
309 let _ = block_info_sender.feed(None).await;
311 let _ = acknowledgement_sender.send(()).await;
312 }
313 }
314 }
315 }),
316 );
317
318 block_info_receiver
319}
320
321struct LatestItemStream<S: Stream> {
322 inner: Pin<Box<S>>,
323}
324
325impl<S: Stream> LatestItemStream<S> {
326 fn new(stream: S) -> Self {
327 Self {
328 inner: Box::pin(stream),
329 }
330 }
331}
332
333impl<S> Stream for LatestItemStream<S>
334where
335 S: Stream,
336{
337 type Item = S::Item;
338
339 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
340 let mut last_item = None;
341 while let Poll::Ready(poll) = self.inner.as_mut().poll_next(cx) {
342 match poll {
343 Some(item) => {
344 last_item = Some(item);
345 }
346 None => {
347 return Poll::Ready(last_item);
348 }
349 }
350 }
351
352 if last_item.is_some() {
353 Poll::Ready(last_item)
354 } else {
355 Poll::Pending
356 }
357 }
358}