1use crate::bundle_processor::BundleProcessor;
18use crate::domain_bundle_producer::{BundleProducer, DomainProposal};
19use crate::utils::{BlockInfo, OperatorSlotInfo};
20use crate::{NewSlotNotification, OperatorStreams};
21use futures::channel::mpsc;
22use futures::{SinkExt, Stream, StreamExt};
23use sc_client_api::{
24 AuxStore, BlockBackend, BlockImportNotification, BlockchainEvents, ExecutorProvider, Finalizer,
25 ProofProvider,
26};
27use sc_executor::RuntimeVersionOf;
28use sc_transaction_pool_api::OffchainTransactionPoolFactory;
29use sp_api::{ApiExt, ProvideRuntimeApi};
30use sp_block_builder::BlockBuilder;
31use sp_blockchain::{HeaderBackend, HeaderMetadata};
32use sp_core::traits::{CodeExecutor, SpawnEssentialNamed};
33use sp_core::H256;
34use sp_domains::core_api::DomainCoreApi;
35use sp_domains::{BundleProducerElectionApi, DomainsApi, OpaqueBundle, OperatorId};
36use sp_domains_fraud_proof::FraudProofApi;
37use sp_messenger::MessengerApi;
38use sp_mmr_primitives::MmrApi;
39use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
40use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
41use std::pin::{pin, Pin};
42use std::sync::Arc;
43use std::task::{Context, Poll};
44use subspace_runtime_primitives::{Balance, BlockHashFor, HeaderFor};
45use tracing::{info, Instrument};
46
47pub type OpaqueBundleFor<Block, CBlock> =
48 OpaqueBundle<NumberFor<CBlock>, BlockHashFor<CBlock>, HeaderFor<Block>, Balance>;
49
50#[allow(clippy::type_complexity, clippy::too_many_arguments)]
51pub(super) async fn start_worker<
52 Block,
53 CBlock,
54 Client,
55 CClient,
56 Backend,
57 IBNS,
58 CIBNS,
59 NSNS,
60 ASS,
61 E,
62>(
63 spawn_essential: Box<dyn SpawnEssentialNamed>,
64 consensus_client: Arc<CClient>,
65 consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory<CBlock>,
66 maybe_operator_id: Option<OperatorId>,
67 mut bundle_producer: Box<dyn BundleProducer<Block, CBlock> + Send>,
68 bundle_processor: BundleProcessor<Block, CBlock, Client, CClient, Backend, E>,
69 operator_streams: OperatorStreams<CBlock, IBNS, CIBNS, NSNS, ASS>,
70) where
71 Block: BlockT,
72 Block::Hash: Into<H256>,
73 CBlock: BlockT,
74 NumberFor<CBlock>: From<NumberFor<Block>> + Into<NumberFor<Block>>,
75 CBlock::Hash: From<Block::Hash>,
76 Client: HeaderBackend<Block>
77 + BlockBackend<Block>
78 + AuxStore
79 + ProvideRuntimeApi<Block>
80 + ProofProvider<Block>
81 + Finalizer<Block, Backend>
82 + ExecutorProvider<Block>
83 + 'static,
84 Client::Api: DomainCoreApi<Block>
85 + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>
86 + BlockBuilder<Block>
87 + sp_api::ApiExt<Block>
88 + TaggedTransactionQueue<Block>,
89 CClient: HeaderBackend<CBlock>
90 + HeaderMetadata<CBlock, Error = sp_blockchain::Error>
91 + BlockBackend<CBlock>
92 + ProofProvider<CBlock>
93 + ProvideRuntimeApi<CBlock>
94 + BlockchainEvents<CBlock>
95 + 'static,
96 CClient::Api: DomainsApi<CBlock, Block::Header>
97 + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>
98 + BundleProducerElectionApi<CBlock, Balance>
99 + FraudProofApi<CBlock, Block::Header>
100 + MmrApi<CBlock, H256, NumberFor<CBlock>>,
101 Backend: sc_client_api::Backend<Block> + 'static,
102 IBNS: Stream<Item = (NumberFor<CBlock>, mpsc::Sender<()>)> + Send + 'static,
103 CIBNS: Stream<Item = BlockImportNotification<CBlock>> + Send + 'static,
104 NSNS: Stream<Item = NewSlotNotification> + Send + 'static,
105 ASS: Stream<Item = mpsc::Sender<()>> + Send + 'static,
106 E: CodeExecutor + RuntimeVersionOf,
107{
108 let span = tracing::Span::current();
109
110 let OperatorStreams {
111 consensus_block_import_throttling_buffer_size,
112 block_importing_notification_stream,
113 imported_block_notification_stream,
114 new_slot_notification_stream,
115 acknowledgement_sender_stream,
116 _phantom,
117 } = operator_streams;
118
119 let mut throttled_block_import_notification_stream =
120 throttling_block_import_notifications::<Block, _, _, _, _>(
121 spawn_essential,
122 consensus_client.clone(),
123 Box::pin(block_importing_notification_stream),
124 Box::pin(imported_block_notification_stream),
125 consensus_block_import_throttling_buffer_size,
126 );
127
128 if let Some(operator_id) = maybe_operator_id {
129 info!("👷 Running as Operator[{operator_id}]...");
130 let mut latest_slot_notification_stream =
131 LatestItemStream::new(new_slot_notification_stream);
132 let mut acknowledgement_sender_stream = pin!(acknowledgement_sender_stream);
133 loop {
134 tokio::select! {
135 biased;
138
139 Some((slot, proof_of_time)) = latest_slot_notification_stream.next() => {
140 let res = bundle_producer
141 .produce_bundle(
142 operator_id,
143 OperatorSlotInfo {
144 slot,
145 proof_of_time,
146 },
147 )
148 .instrument(span.clone())
149 .await;
150 match res {
151 Err(err) => {
152 tracing::error!(?slot, ?err, "Error at producing bundle.");
153 }
154 Ok(Some(domain_proposal)) => {
155 let best_hash = consensus_client.info().best_hash;
156 let mut runtime_api = consensus_client.runtime_api();
157 runtime_api.register_extension(consensus_offchain_tx_pool_factory.offchain_transaction_pool(best_hash));
158
159 match domain_proposal {
160 DomainProposal::Bundle(opaque_bundle) => {
161 if let Err(err) = runtime_api.submit_bundle_unsigned(best_hash, opaque_bundle) {
162 tracing::error!(?slot, ?err, "Error at submitting bundle.");
163 }
164 },
165 DomainProposal::Receipt(singleton_receipt) => {
166 if let Err(err) = runtime_api.submit_receipt_unsigned(best_hash, singleton_receipt) {
167 tracing::error!(?slot, ?err, "Error at submitting receipt.");
168 }
169 },
170 }
171 }
172 Ok(None) => {}
173 }
174 }
175 Some(maybe_block_info) = throttled_block_import_notification_stream.next() => {
176 if let Some(block_info) = maybe_block_info {
177 if let Err(error) = bundle_processor
178 .clone()
179 .process_bundles((
180 block_info.hash,
181 block_info.number,
182 block_info.is_new_best,
183 ))
184 .instrument(span.clone())
185 .await
186 {
187 tracing::error!(?error, "Failed to process consensus block");
188 break;
191 }
192 }
193 }
194 Some(mut acknowledgement_sender) = acknowledgement_sender_stream.next() => {
197 if let Err(err) = acknowledgement_sender.send(()).await {
198 tracing::error!(
199 ?err,
200 "Failed to send acknowledgement"
201 );
202 }
203 }
204 else => { break }
205 }
206 }
207 } else {
208 info!("🧑 Running as Full node...");
209 drop(new_slot_notification_stream);
210 drop(acknowledgement_sender_stream);
211 while let Some(maybe_block_info) = throttled_block_import_notification_stream.next().await {
212 if let Some(block_info) = maybe_block_info {
213 if let Err(error) = bundle_processor
214 .clone()
215 .process_bundles((block_info.hash, block_info.number, block_info.is_new_best))
216 .instrument(span.clone())
217 .await
218 {
219 tracing::error!(?error, "Failed to process consensus block");
220 break;
223 }
224 }
225 }
226 }
227}
228
229#[allow(clippy::too_many_arguments)]
234fn throttling_block_import_notifications<Block, CBlock, CClient, BlocksImporting, BlocksImported>(
235 spawn_essential: Box<dyn SpawnEssentialNamed>,
236 consensus_client: Arc<CClient>,
237 mut blocks_importing: BlocksImporting,
238 mut blocks_imported: BlocksImported,
239 consensus_block_import_throttling_buffer_size: u32,
240) -> mpsc::Receiver<Option<BlockInfo<CBlock>>>
241where
242 Block: BlockT,
243 CBlock: BlockT,
244 CClient: HeaderBackend<CBlock>
245 + BlockBackend<CBlock>
246 + ProvideRuntimeApi<CBlock>
247 + BlockchainEvents<CBlock>
248 + 'static,
249 CClient::Api: DomainsApi<CBlock, Block::Header>,
250 BlocksImporting: Stream<Item = (NumberFor<CBlock>, mpsc::Sender<()>)> + Unpin + Send + 'static,
251 BlocksImported: Stream<Item = BlockImportNotification<CBlock>> + Unpin + Send + 'static,
252{
253 let (mut block_info_sender, block_info_receiver) = mpsc::channel::<Option<BlockInfo<CBlock>>>(
258 consensus_block_import_throttling_buffer_size as usize,
259 );
260
261 spawn_essential.spawn_essential(
262 "consensus-block-import-throttler",
263 None,
264 Box::pin(async move {
265 loop {
266 tokio::select! {
267 biased;
274
275 maybe_block_imported = blocks_imported.next() => {
276 let block_imported = match maybe_block_imported {
277 Some(block_imported) => block_imported,
278 None => {
279 break;
281 }
282 };
283 let header = match consensus_client.header(block_imported.hash) {
284 Ok(Some(header)) => header,
285 res => {
286 tracing::error!(
287 result = ?res,
288 header = ?block_imported.header,
289 "Imported consensus block header not found",
290 );
291 return;
292 }
293 };
294 let block_info = BlockInfo {
295 hash: header.hash(),
296 number: *header.number(),
297 is_new_best: block_imported.is_new_best,
298 };
299 let _ = block_info_sender.feed(Some(block_info)).await;
300 }
301 maybe_block_importing = blocks_importing.next() => {
302 let (_block_number, mut acknowledgement_sender) =
304 match maybe_block_importing {
305 Some(block_importing) => block_importing,
306 None => {
307 break;
309 }
310 };
311 let _ = block_info_sender.feed(None).await;
313 let _ = acknowledgement_sender.send(()).await;
314 }
315 }
316 }
317 }),
318 );
319
320 block_info_receiver
321}
322
323struct LatestItemStream<S: Stream> {
324 inner: Pin<Box<S>>,
325}
326
327impl<S: Stream> LatestItemStream<S> {
328 fn new(stream: S) -> Self {
329 Self {
330 inner: Box::pin(stream),
331 }
332 }
333}
334
335impl<S> Stream for LatestItemStream<S>
336where
337 S: Stream,
338{
339 type Item = S::Item;
340
341 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
342 let mut last_item = None;
343 while let Poll::Ready(poll) = self.inner.as_mut().poll_next(cx) {
344 match poll {
345 Some(item) => {
346 last_item = Some(item);
347 }
348 None => {
349 return Poll::Ready(last_item);
350 }
351 }
352 }
353
354 if last_item.is_some() {
355 Poll::Ready(last_item)
356 } else {
357 Poll::Pending
358 }
359 }
360}