1use crate::bundle_processor::BundleProcessor;
18use crate::domain_bundle_producer::{BundleProducer, DomainProposal};
19use crate::utils::{BlockInfo, OperatorSlotInfo};
20use crate::{NewSlotNotification, OperatorStreams};
21use futures::channel::mpsc;
22use futures::{SinkExt, Stream, StreamExt};
23use sc_client_api::{
24 AuxStore, BlockBackend, BlockImportNotification, BlockchainEvents, ExecutorProvider, Finalizer,
25 ProofProvider,
26};
27use sc_executor::RuntimeVersionOf;
28use sc_transaction_pool_api::OffchainTransactionPoolFactory;
29use sp_api::{ApiExt, ProvideRuntimeApi};
30use sp_block_builder::BlockBuilder;
31use sp_blockchain::{HeaderBackend, HeaderMetadata};
32use sp_core::H256;
33use sp_core::traits::{CodeExecutor, SpawnEssentialNamed};
34use sp_domains::bundle::OpaqueBundle;
35use sp_domains::core_api::DomainCoreApi;
36use sp_domains::{BundleProducerElectionApi, DomainsApi, OperatorId};
37use sp_domains_fraud_proof::FraudProofApi;
38use sp_messenger::MessengerApi;
39use sp_mmr_primitives::MmrApi;
40use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
41use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
42use std::pin::{Pin, pin};
43use std::sync::Arc;
44use std::task::{Context, Poll};
45use subspace_runtime_primitives::{Balance, BlockHashFor, HeaderFor};
46use tracing::{Instrument, info};
47
/// Shorthand for the [`OpaqueBundle`] instantiation used by the operator worker.
///
/// The block number and block hash parameters are taken from `CBlock`, the header parameter is
/// taken from `Block`, and the balance parameter is fixed to [`Balance`].
pub type OpaqueBundleFor<Block, CBlock> =
    OpaqueBundle<NumberFor<CBlock>, BlockHashFor<CBlock>, HeaderFor<Block>, Balance>;
50
51#[allow(clippy::type_complexity, clippy::too_many_arguments)]
52pub(super) async fn start_worker<
53 Block,
54 CBlock,
55 Client,
56 CClient,
57 Backend,
58 IBNS,
59 CIBNS,
60 NSNS,
61 ASS,
62 E,
63>(
64 spawn_essential: Box<dyn SpawnEssentialNamed>,
65 consensus_client: Arc<CClient>,
66 consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory<CBlock>,
67 maybe_operator_id: Option<OperatorId>,
68 mut bundle_producer: Box<dyn BundleProducer<Block, CBlock> + Send>,
69 bundle_processor: BundleProcessor<Block, CBlock, Client, CClient, Backend, E>,
70 operator_streams: OperatorStreams<CBlock, IBNS, CIBNS, NSNS, ASS>,
71) where
72 Block: BlockT,
73 Block::Hash: Into<H256>,
74 CBlock: BlockT,
75 NumberFor<CBlock>: From<NumberFor<Block>> + Into<NumberFor<Block>>,
76 CBlock::Hash: From<Block::Hash>,
77 Client: HeaderBackend<Block>
78 + BlockBackend<Block>
79 + AuxStore
80 + ProvideRuntimeApi<Block>
81 + ProofProvider<Block>
82 + Finalizer<Block, Backend>
83 + ExecutorProvider<Block>
84 + 'static,
85 Client::Api: DomainCoreApi<Block>
86 + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>
87 + BlockBuilder<Block>
88 + sp_api::ApiExt<Block>
89 + TaggedTransactionQueue<Block>,
90 CClient: HeaderBackend<CBlock>
91 + HeaderMetadata<CBlock, Error = sp_blockchain::Error>
92 + BlockBackend<CBlock>
93 + ProofProvider<CBlock>
94 + ProvideRuntimeApi<CBlock>
95 + BlockchainEvents<CBlock>
96 + 'static,
97 CClient::Api: DomainsApi<CBlock, Block::Header>
98 + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>
99 + BundleProducerElectionApi<CBlock, Balance>
100 + FraudProofApi<CBlock, Block::Header>
101 + MmrApi<CBlock, H256, NumberFor<CBlock>>,
102 Backend: sc_client_api::Backend<Block> + 'static,
103 IBNS: Stream<Item = (NumberFor<CBlock>, mpsc::Sender<()>)> + Send + 'static,
104 CIBNS: Stream<Item = BlockImportNotification<CBlock>> + Send + 'static,
105 NSNS: Stream<Item = NewSlotNotification> + Send + 'static,
106 ASS: Stream<Item = mpsc::Sender<()>> + Send + 'static,
107 E: CodeExecutor + RuntimeVersionOf,
108{
109 let span = tracing::Span::current();
110
111 let OperatorStreams {
112 consensus_block_import_throttling_buffer_size,
113 block_importing_notification_stream,
114 imported_block_notification_stream,
115 new_slot_notification_stream,
116 acknowledgement_sender_stream,
117 _phantom,
118 } = operator_streams;
119
120 let mut throttled_block_import_notification_stream =
121 throttling_block_import_notifications::<Block, _, _, _, _>(
122 spawn_essential,
123 consensus_client.clone(),
124 Box::pin(block_importing_notification_stream),
125 Box::pin(imported_block_notification_stream),
126 consensus_block_import_throttling_buffer_size,
127 );
128
129 if let Some(operator_id) = maybe_operator_id {
130 info!("👷 Running as Operator[{operator_id}]...");
131 let mut latest_slot_notification_stream =
132 LatestItemStream::new(new_slot_notification_stream);
133 let mut acknowledgement_sender_stream = pin!(acknowledgement_sender_stream);
134 loop {
135 tokio::select! {
136 biased;
139
140 Some((slot, proof_of_time)) = latest_slot_notification_stream.next() => {
141 let res = bundle_producer
142 .produce_bundle(
143 operator_id,
144 OperatorSlotInfo {
145 slot,
146 proof_of_time,
147 },
148 )
149 .instrument(span.clone())
150 .await;
151 match res {
152 Err(err) => {
153 tracing::error!(?slot, ?err, "Error at producing bundle.");
154 }
155 Ok(Some(domain_proposal)) => {
156 let best_hash = consensus_client.info().best_hash;
157 let mut runtime_api = consensus_client.runtime_api();
158 runtime_api.register_extension(consensus_offchain_tx_pool_factory.offchain_transaction_pool(best_hash));
159 match domain_proposal {
160 DomainProposal::Bundle(opaque_bundle) => {
161 if let Err(err) = runtime_api.submit_bundle_unsigned(best_hash, opaque_bundle) {
162 tracing::error!(?slot, ?err, "Error at submitting bundle.");
163 }
164 },
165 DomainProposal::Receipt(singleton_receipt) => {
166 if let Err(err) = runtime_api.submit_receipt_unsigned(best_hash, singleton_receipt) {
167 tracing::error!(?slot, ?err, "Error at submitting receipt.");
168 }
169 },
170 }
171 }
172 Ok(None) => {}
173 }
174 }
175 Some(maybe_block_info) = throttled_block_import_notification_stream.next() => {
176 if let Some(block_info) = maybe_block_info
177 && let Err(error) = bundle_processor
178 .clone()
179 .process_bundles((
180 block_info.hash,
181 block_info.number,
182 block_info.is_new_best,
183 ))
184 .instrument(span.clone())
185 .await
186 {
187 tracing::error!(?error, "Failed to process consensus block");
188 break;
191 }
192 }
193 Some(mut acknowledgement_sender) = acknowledgement_sender_stream.next() => {
196 if let Err(err) = acknowledgement_sender.send(()).await {
197 tracing::error!(
198 ?err,
199 "Failed to send acknowledgement"
200 );
201 }
202 }
203 else => { break }
204 }
205 }
206 } else {
207 info!("🧑 Running as Full node...");
208 drop(new_slot_notification_stream);
209 drop(acknowledgement_sender_stream);
210 while let Some(maybe_block_info) = throttled_block_import_notification_stream.next().await {
211 if let Some(block_info) = maybe_block_info
212 && let Err(error) = bundle_processor
213 .clone()
214 .process_bundles((block_info.hash, block_info.number, block_info.is_new_best))
215 .instrument(span.clone())
216 .await
217 {
218 tracing::error!(?error, "Failed to process consensus block");
219 break;
222 }
223 }
224 }
225}
226
227#[allow(clippy::too_many_arguments)]
232fn throttling_block_import_notifications<Block, CBlock, CClient, BlocksImporting, BlocksImported>(
233 spawn_essential: Box<dyn SpawnEssentialNamed>,
234 consensus_client: Arc<CClient>,
235 mut blocks_importing: BlocksImporting,
236 mut blocks_imported: BlocksImported,
237 consensus_block_import_throttling_buffer_size: u32,
238) -> mpsc::Receiver<Option<BlockInfo<CBlock>>>
239where
240 Block: BlockT,
241 CBlock: BlockT,
242 CClient: HeaderBackend<CBlock>
243 + BlockBackend<CBlock>
244 + ProvideRuntimeApi<CBlock>
245 + BlockchainEvents<CBlock>
246 + 'static,
247 CClient::Api: DomainsApi<CBlock, Block::Header>,
248 BlocksImporting: Stream<Item = (NumberFor<CBlock>, mpsc::Sender<()>)> + Unpin + Send + 'static,
249 BlocksImported: Stream<Item = BlockImportNotification<CBlock>> + Unpin + Send + 'static,
250{
251 let (mut block_info_sender, block_info_receiver) = mpsc::channel::<Option<BlockInfo<CBlock>>>(
256 consensus_block_import_throttling_buffer_size as usize,
257 );
258
259 spawn_essential.spawn_essential(
260 "consensus-block-import-throttler",
261 None,
262 Box::pin(async move {
263 loop {
264 tokio::select! {
265 biased;
272
273 maybe_block_imported = blocks_imported.next() => {
274 let block_imported = match maybe_block_imported {
275 Some(block_imported) => block_imported,
276 None => {
277 break;
279 }
280 };
281 let header = match consensus_client.header(block_imported.hash) {
282 Ok(Some(header)) => header,
283 res => {
284 tracing::error!(
285 result = ?res,
286 header = ?block_imported.header,
287 "Imported consensus block header not found",
288 );
289 return;
290 }
291 };
292 let block_info = BlockInfo {
293 hash: header.hash(),
294 number: *header.number(),
295 is_new_best: block_imported.is_new_best,
296 };
297 let _ = block_info_sender.feed(Some(block_info)).await;
298 }
299 maybe_block_importing = blocks_importing.next() => {
300 let (_block_number, mut acknowledgement_sender) =
302 match maybe_block_importing {
303 Some(block_importing) => block_importing,
304 None => {
305 break;
307 }
308 };
309 let _ = block_info_sender.feed(None).await;
311 let _ = acknowledgement_sender.send(()).await;
312 }
313 }
314 }
315 }),
316 );
317
318 block_info_receiver
319}
320
321struct LatestItemStream<S: Stream> {
322 inner: Pin<Box<S>>,
323}
324
325impl<S: Stream> LatestItemStream<S> {
326 fn new(stream: S) -> Self {
327 Self {
328 inner: Box::pin(stream),
329 }
330 }
331}
332
333impl<S> Stream for LatestItemStream<S>
334where
335 S: Stream,
336{
337 type Item = S::Item;
338
339 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
340 let mut last_item = None;
341 while let Poll::Ready(poll) = self.inner.as_mut().poll_next(cx) {
342 match poll {
343 Some(item) => {
344 last_item = Some(item);
345 }
346 None => {
347 return Poll::Ready(last_item);
348 }
349 }
350 }
351
352 if last_item.is_some() {
353 Poll::Ready(last_item)
354 } else {
355 Poll::Pending
356 }
357 }
358}