domain_client_operator/
domain_worker.rs

// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17use crate::bundle_processor::BundleProcessor;
18use crate::domain_bundle_producer::{BundleProducer, DomainProposal};
19use crate::utils::{BlockInfo, OperatorSlotInfo};
20use crate::{NewSlotNotification, OperatorStreams};
21use futures::channel::mpsc;
22use futures::{SinkExt, Stream, StreamExt};
23use sc_client_api::{
24    AuxStore, BlockBackend, BlockImportNotification, BlockchainEvents, ExecutorProvider, Finalizer,
25    ProofProvider,
26};
27use sc_executor::RuntimeVersionOf;
28use sc_transaction_pool_api::OffchainTransactionPoolFactory;
29use sp_api::{ApiExt, ProvideRuntimeApi};
30use sp_block_builder::BlockBuilder;
31use sp_blockchain::{HeaderBackend, HeaderMetadata};
32use sp_core::H256;
33use sp_core::traits::{CodeExecutor, SpawnEssentialNamed};
34use sp_domains::core_api::DomainCoreApi;
35use sp_domains::{BundleProducerElectionApi, DomainsApi, OpaqueBundle, OperatorId};
36use sp_domains_fraud_proof::FraudProofApi;
37use sp_messenger::MessengerApi;
38use sp_mmr_primitives::MmrApi;
39use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
40use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
41use std::pin::{Pin, pin};
42use std::sync::Arc;
43use std::task::{Context, Poll};
44use subspace_runtime_primitives::{Balance, BlockHashFor, HeaderFor};
45use tracing::{Instrument, info};
46
/// Shorthand for an [`OpaqueBundle`] whose number and hash types are taken from `CBlock`
/// (the block type served by the consensus client in this file), whose header type is taken
/// from `Block`, and whose balance type is [`Balance`].
pub type OpaqueBundleFor<Block, CBlock> =
    OpaqueBundle<NumberFor<CBlock>, BlockHashFor<CBlock>, HeaderFor<Block>, Balance>;
49
50#[allow(clippy::type_complexity, clippy::too_many_arguments)]
51pub(super) async fn start_worker<
52    Block,
53    CBlock,
54    Client,
55    CClient,
56    Backend,
57    IBNS,
58    CIBNS,
59    NSNS,
60    ASS,
61    E,
62>(
63    spawn_essential: Box<dyn SpawnEssentialNamed>,
64    consensus_client: Arc<CClient>,
65    consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory<CBlock>,
66    maybe_operator_id: Option<OperatorId>,
67    mut bundle_producer: Box<dyn BundleProducer<Block, CBlock> + Send>,
68    bundle_processor: BundleProcessor<Block, CBlock, Client, CClient, Backend, E>,
69    operator_streams: OperatorStreams<CBlock, IBNS, CIBNS, NSNS, ASS>,
70) where
71    Block: BlockT,
72    Block::Hash: Into<H256>,
73    CBlock: BlockT,
74    NumberFor<CBlock>: From<NumberFor<Block>> + Into<NumberFor<Block>>,
75    CBlock::Hash: From<Block::Hash>,
76    Client: HeaderBackend<Block>
77        + BlockBackend<Block>
78        + AuxStore
79        + ProvideRuntimeApi<Block>
80        + ProofProvider<Block>
81        + Finalizer<Block, Backend>
82        + ExecutorProvider<Block>
83        + 'static,
84    Client::Api: DomainCoreApi<Block>
85        + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>
86        + BlockBuilder<Block>
87        + sp_api::ApiExt<Block>
88        + TaggedTransactionQueue<Block>,
89    CClient: HeaderBackend<CBlock>
90        + HeaderMetadata<CBlock, Error = sp_blockchain::Error>
91        + BlockBackend<CBlock>
92        + ProofProvider<CBlock>
93        + ProvideRuntimeApi<CBlock>
94        + BlockchainEvents<CBlock>
95        + 'static,
96    CClient::Api: DomainsApi<CBlock, Block::Header>
97        + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>
98        + BundleProducerElectionApi<CBlock, Balance>
99        + FraudProofApi<CBlock, Block::Header>
100        + MmrApi<CBlock, H256, NumberFor<CBlock>>,
101    Backend: sc_client_api::Backend<Block> + 'static,
102    IBNS: Stream<Item = (NumberFor<CBlock>, mpsc::Sender<()>)> + Send + 'static,
103    CIBNS: Stream<Item = BlockImportNotification<CBlock>> + Send + 'static,
104    NSNS: Stream<Item = NewSlotNotification> + Send + 'static,
105    ASS: Stream<Item = mpsc::Sender<()>> + Send + 'static,
106    E: CodeExecutor + RuntimeVersionOf,
107{
108    let span = tracing::Span::current();
109
110    let OperatorStreams {
111        consensus_block_import_throttling_buffer_size,
112        block_importing_notification_stream,
113        imported_block_notification_stream,
114        new_slot_notification_stream,
115        acknowledgement_sender_stream,
116        _phantom,
117    } = operator_streams;
118
119    let mut throttled_block_import_notification_stream =
120        throttling_block_import_notifications::<Block, _, _, _, _>(
121            spawn_essential,
122            consensus_client.clone(),
123            Box::pin(block_importing_notification_stream),
124            Box::pin(imported_block_notification_stream),
125            consensus_block_import_throttling_buffer_size,
126        );
127
128    if let Some(operator_id) = maybe_operator_id {
129        info!("👷 Running as Operator[{operator_id}]...");
130        let mut latest_slot_notification_stream =
131            LatestItemStream::new(new_slot_notification_stream);
132        let mut acknowledgement_sender_stream = pin!(acknowledgement_sender_stream);
133        loop {
134            tokio::select! {
135                // Ensure any new slot/block import must handle first before the `acknowledgement_sender_stream`
136                // NOTE: this is only necessary for the test.
137                biased;
138
139                Some((slot, proof_of_time)) = latest_slot_notification_stream.next() => {
140                    let res = bundle_producer
141                        .produce_bundle(
142                            operator_id,
143                            OperatorSlotInfo {
144                                slot,
145                                proof_of_time,
146                            },
147                        )
148                        .instrument(span.clone())
149                        .await;
150                    match res {
151                        Err(err) => {
152                            tracing::error!(?slot, ?err, "Error at producing bundle.");
153                        }
154                        Ok(Some(domain_proposal)) => {
155                            let best_hash = consensus_client.info().best_hash;
156                            let mut runtime_api = consensus_client.runtime_api();
157                            runtime_api.register_extension(consensus_offchain_tx_pool_factory.offchain_transaction_pool(best_hash));
158
159                            match domain_proposal {
160                                DomainProposal::Bundle(opaque_bundle) => {
161                                    if let Err(err) = runtime_api.submit_bundle_unsigned(best_hash, opaque_bundle) {
162                                        tracing::error!(?slot, ?err, "Error at submitting bundle.");
163                                    }
164                                },
165                                DomainProposal::Receipt(singleton_receipt) => {
166                                    if let Err(err) = runtime_api.submit_receipt_unsigned(best_hash, singleton_receipt) {
167                                        tracing::error!(?slot, ?err, "Error at submitting receipt.");
168                                    }
169                                },
170                            }
171                        }
172                        Ok(None) => {}
173                    }
174                }
175                Some(maybe_block_info) = throttled_block_import_notification_stream.next() => {
176                    if let Some(block_info) = maybe_block_info
177                        && let Err(error) = bundle_processor
178                            .clone()
179                            .process_bundles((
180                                block_info.hash,
181                                block_info.number,
182                                block_info.is_new_best,
183                            ))
184                            .instrument(span.clone())
185                            .await
186                        {
187                            tracing::error!(?error, "Failed to process consensus block");
188                            // Bring down the service as bundles processor is an essential task.
189                            // TODO: more graceful shutdown.
190                            break;
191                        }
192                }
193                // In production the `acknowledgement_sender_stream` is an empty stream, it only set to
194                // real stream in test
195                Some(mut acknowledgement_sender) = acknowledgement_sender_stream.next() => {
196                    if let Err(err) = acknowledgement_sender.send(()).await {
197                        tracing::error!(
198                            ?err,
199                            "Failed to send acknowledgement"
200                        );
201                    }
202                }
203                else => { break }
204            }
205        }
206    } else {
207        info!("🧑‍ Running as Full node...");
208        drop(new_slot_notification_stream);
209        drop(acknowledgement_sender_stream);
210        while let Some(maybe_block_info) = throttled_block_import_notification_stream.next().await {
211            if let Some(block_info) = maybe_block_info
212                && let Err(error) = bundle_processor
213                    .clone()
214                    .process_bundles((block_info.hash, block_info.number, block_info.is_new_best))
215                    .instrument(span.clone())
216                    .await
217            {
218                tracing::error!(?error, "Failed to process consensus block");
219                // Bring down the service as bundles processor is an essential task.
220                // TODO: more graceful shutdown.
221                break;
222            }
223        }
224    }
225}
226
227/// Throttle the consensus block import notification based on the `consensus_block_import_throttling_buffer_size`
228/// to pause the consensus block import in case the consensus chain runs much faster than the domain.
229///
230/// Return the throttled block import notification stream
231#[allow(clippy::too_many_arguments)]
232fn throttling_block_import_notifications<Block, CBlock, CClient, BlocksImporting, BlocksImported>(
233    spawn_essential: Box<dyn SpawnEssentialNamed>,
234    consensus_client: Arc<CClient>,
235    mut blocks_importing: BlocksImporting,
236    mut blocks_imported: BlocksImported,
237    consensus_block_import_throttling_buffer_size: u32,
238) -> mpsc::Receiver<Option<BlockInfo<CBlock>>>
239where
240    Block: BlockT,
241    CBlock: BlockT,
242    CClient: HeaderBackend<CBlock>
243        + BlockBackend<CBlock>
244        + ProvideRuntimeApi<CBlock>
245        + BlockchainEvents<CBlock>
246        + 'static,
247    CClient::Api: DomainsApi<CBlock, Block::Header>,
248    BlocksImporting: Stream<Item = (NumberFor<CBlock>, mpsc::Sender<()>)> + Unpin + Send + 'static,
249    BlocksImported: Stream<Item = BlockImportNotification<CBlock>> + Unpin + Send + 'static,
250{
251    // The consensus chain can be ahead of the domain by up to `block_import_throttling_buffer_size/2`
252    // blocks, for there are two notifications per block sent to this buffer (one will be actually
253    // consumed by the domain processor, the other from `sc-consensus-subspace` is used to discontinue
254    // the consensus block import in case the consensus chain runs much faster than the domain.).
255    let (mut block_info_sender, block_info_receiver) = mpsc::channel::<Option<BlockInfo<CBlock>>>(
256        consensus_block_import_throttling_buffer_size as usize,
257    );
258
259    spawn_essential.spawn_essential(
260        "consensus-block-import-throttler",
261        None,
262        Box::pin(async move {
263            loop {
264                tokio::select! {
265                    // Ensure the `blocks_imported` branch will be checked before the `blocks_importing` branch.
266                    // Currently this is only necessary for the test to ensure when both `block_imported` notification
267                    // and `blocks_importing` notification are arrived, the `block_imported` notification will be processed
268                    // first, such that we can ensure when the `blocks_importing` acknowledgement is responded, the
269                    // imported block must being processed by the executor.
270                    // Please see https://github.com/autonomys/subspace/pull/1363#discussion_r1162571291 for more details.
271                    biased;
272
273                    maybe_block_imported = blocks_imported.next() => {
274                        let block_imported = match maybe_block_imported {
275                            Some(block_imported) => block_imported,
276                            None => {
277                                // Can be None on graceful shutdown.
278                                break;
279                            }
280                        };
281                        let header = match consensus_client.header(block_imported.hash) {
282                            Ok(Some(header)) => header,
283                            res => {
284                                tracing::error!(
285                                    result = ?res,
286                                    header = ?block_imported.header,
287                                    "Imported consensus block header not found",
288                                );
289                                return;
290                            }
291                        };
292                        let block_info = BlockInfo {
293                            hash: header.hash(),
294                            number: *header.number(),
295                            is_new_best: block_imported.is_new_best,
296                        };
297                        let _ = block_info_sender.feed(Some(block_info)).await;
298                    }
299                    maybe_block_importing = blocks_importing.next() => {
300                        // TODO: remove the `block_number` from the notification since it is not used
301                        let (_block_number, mut acknowledgement_sender) =
302                            match maybe_block_importing {
303                                Some(block_importing) => block_importing,
304                                None => {
305                                    // Can be None on graceful shutdown.
306                                    break;
307                                }
308                            };
309                        // Pause the consensus block import when the sink is full.
310                        let _ = block_info_sender.feed(None).await;
311                        let _ = acknowledgement_sender.send(()).await;
312                    }
313                }
314            }
315        }),
316    );
317
318    block_info_receiver
319}
320
321struct LatestItemStream<S: Stream> {
322    inner: Pin<Box<S>>,
323}
324
325impl<S: Stream> LatestItemStream<S> {
326    fn new(stream: S) -> Self {
327        Self {
328            inner: Box::pin(stream),
329        }
330    }
331}
332
333impl<S> Stream for LatestItemStream<S>
334where
335    S: Stream,
336{
337    type Item = S::Item;
338
339    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
340        let mut last_item = None;
341        while let Poll::Ready(poll) = self.inner.as_mut().poll_next(cx) {
342            match poll {
343                Some(item) => {
344                    last_item = Some(item);
345                }
346                None => {
347                    return Poll::Ready(last_item);
348                }
349            }
350        }
351
352        if last_item.is_some() {
353            Poll::Ready(last_item)
354        } else {
355            Poll::Pending
356        }
357    }
358}