domain_client_operator/
domain_worker.rs

1// Copyright 2020 Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17use crate::bundle_processor::BundleProcessor;
18use crate::domain_bundle_producer::{BundleProducer, DomainProposal};
19use crate::utils::{BlockInfo, OperatorSlotInfo};
20use crate::{NewSlotNotification, OperatorStreams};
21use futures::channel::mpsc;
22use futures::{SinkExt, Stream, StreamExt};
23use sc_client_api::{
24    AuxStore, BlockBackend, BlockImportNotification, BlockchainEvents, ExecutorProvider, Finalizer,
25    ProofProvider,
26};
27use sc_executor::RuntimeVersionOf;
28use sc_transaction_pool_api::OffchainTransactionPoolFactory;
29use sp_api::{ApiExt, ProvideRuntimeApi};
30use sp_block_builder::BlockBuilder;
31use sp_blockchain::{HeaderBackend, HeaderMetadata};
32use sp_core::traits::{CodeExecutor, SpawnEssentialNamed};
33use sp_core::H256;
34use sp_domains::core_api::DomainCoreApi;
35use sp_domains::{BundleProducerElectionApi, DomainsApi, OpaqueBundle, OperatorId};
36use sp_domains_fraud_proof::FraudProofApi;
37use sp_messenger::MessengerApi;
38use sp_mmr_primitives::MmrApi;
39use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
40use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
41use std::pin::{pin, Pin};
42use std::sync::Arc;
43use std::task::{Context, Poll};
44use subspace_runtime_primitives::{Balance, BlockHashFor, HeaderFor};
45use tracing::{info, Instrument};
46
/// Shorthand for an [`OpaqueBundle`] whose block number and block hash types are taken from
/// `CBlock`, whose header type is taken from `Block`, and whose balance type is [`Balance`].
///
/// Elsewhere in this file `CBlock` is the consensus chain block type and `Block` is the domain
/// block type.
47pub type OpaqueBundleFor<Block, CBlock> =
48    OpaqueBundle<NumberFor<CBlock>, BlockHashFor<CBlock>, HeaderFor<Block>, Balance>;
49
50#[allow(clippy::type_complexity, clippy::too_many_arguments)]
51pub(super) async fn start_worker<
52    Block,
53    CBlock,
54    Client,
55    CClient,
56    Backend,
57    IBNS,
58    CIBNS,
59    NSNS,
60    ASS,
61    E,
62>(
63    spawn_essential: Box<dyn SpawnEssentialNamed>,
64    consensus_client: Arc<CClient>,
65    consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory<CBlock>,
66    maybe_operator_id: Option<OperatorId>,
67    mut bundle_producer: Box<dyn BundleProducer<Block, CBlock> + Send>,
68    bundle_processor: BundleProcessor<Block, CBlock, Client, CClient, Backend, E>,
69    operator_streams: OperatorStreams<CBlock, IBNS, CIBNS, NSNS, ASS>,
70) where
71    Block: BlockT,
72    Block::Hash: Into<H256>,
73    CBlock: BlockT,
74    NumberFor<CBlock>: From<NumberFor<Block>> + Into<NumberFor<Block>>,
75    CBlock::Hash: From<Block::Hash>,
76    Client: HeaderBackend<Block>
77        + BlockBackend<Block>
78        + AuxStore
79        + ProvideRuntimeApi<Block>
80        + ProofProvider<Block>
81        + Finalizer<Block, Backend>
82        + ExecutorProvider<Block>
83        + 'static,
84    Client::Api: DomainCoreApi<Block>
85        + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>
86        + BlockBuilder<Block>
87        + sp_api::ApiExt<Block>
88        + TaggedTransactionQueue<Block>,
89    CClient: HeaderBackend<CBlock>
90        + HeaderMetadata<CBlock, Error = sp_blockchain::Error>
91        + BlockBackend<CBlock>
92        + ProofProvider<CBlock>
93        + ProvideRuntimeApi<CBlock>
94        + BlockchainEvents<CBlock>
95        + 'static,
96    CClient::Api: DomainsApi<CBlock, Block::Header>
97        + MessengerApi<CBlock, NumberFor<CBlock>, CBlock::Hash>
98        + BundleProducerElectionApi<CBlock, Balance>
99        + FraudProofApi<CBlock, Block::Header>
100        + MmrApi<CBlock, H256, NumberFor<CBlock>>,
101    Backend: sc_client_api::Backend<Block> + 'static,
102    IBNS: Stream<Item = (NumberFor<CBlock>, mpsc::Sender<()>)> + Send + 'static,
103    CIBNS: Stream<Item = BlockImportNotification<CBlock>> + Send + 'static,
104    NSNS: Stream<Item = NewSlotNotification> + Send + 'static,
105    ASS: Stream<Item = mpsc::Sender<()>> + Send + 'static,
106    E: CodeExecutor + RuntimeVersionOf,
107{
108    let span = tracing::Span::current();
109
110    let OperatorStreams {
111        consensus_block_import_throttling_buffer_size,
112        block_importing_notification_stream,
113        imported_block_notification_stream,
114        new_slot_notification_stream,
115        acknowledgement_sender_stream,
116        _phantom,
117    } = operator_streams;
118
119    let mut throttled_block_import_notification_stream =
120        throttling_block_import_notifications::<Block, _, _, _, _>(
121            spawn_essential,
122            consensus_client.clone(),
123            Box::pin(block_importing_notification_stream),
124            Box::pin(imported_block_notification_stream),
125            consensus_block_import_throttling_buffer_size,
126        );
127
128    if let Some(operator_id) = maybe_operator_id {
129        info!("👷 Running as Operator[{operator_id}]...");
130        let mut latest_slot_notification_stream =
131            LatestItemStream::new(new_slot_notification_stream);
132        let mut acknowledgement_sender_stream = pin!(acknowledgement_sender_stream);
133        loop {
134            tokio::select! {
135                // Ensure any new slot/block import must handle first before the `acknowledgement_sender_stream`
136                // NOTE: this is only necessary for the test.
137                biased;
138
139                Some((slot, proof_of_time)) = latest_slot_notification_stream.next() => {
140                    let res = bundle_producer
141                        .produce_bundle(
142                            operator_id,
143                            OperatorSlotInfo {
144                                slot,
145                                proof_of_time,
146                            },
147                        )
148                        .instrument(span.clone())
149                        .await;
150                    match res {
151                        Err(err) => {
152                            tracing::error!(?slot, ?err, "Error at producing bundle.");
153                        }
154                        Ok(Some(domain_proposal)) => {
155                            let best_hash = consensus_client.info().best_hash;
156                            let mut runtime_api = consensus_client.runtime_api();
157                            runtime_api.register_extension(consensus_offchain_tx_pool_factory.offchain_transaction_pool(best_hash));
158
159                            match domain_proposal {
160                                DomainProposal::Bundle(opaque_bundle) => {
161                                    if let Err(err) = runtime_api.submit_bundle_unsigned(best_hash, opaque_bundle) {
162                                        tracing::error!(?slot, ?err, "Error at submitting bundle.");
163                                    }
164                                },
165                                DomainProposal::Receipt(singleton_receipt) => {
166                                    if let Err(err) = runtime_api.submit_receipt_unsigned(best_hash, singleton_receipt) {
167                                        tracing::error!(?slot, ?err, "Error at submitting receipt.");
168                                    }
169                                },
170                            }
171                        }
172                        Ok(None) => {}
173                    }
174                }
175                Some(maybe_block_info) = throttled_block_import_notification_stream.next() => {
176                    if let Some(block_info) = maybe_block_info {
177                        if let Err(error) = bundle_processor
178                            .clone()
179                            .process_bundles((
180                                block_info.hash,
181                                block_info.number,
182                                block_info.is_new_best,
183                            ))
184                            .instrument(span.clone())
185                            .await
186                        {
187                            tracing::error!(?error, "Failed to process consensus block");
188                            // Bring down the service as bundles processor is an essential task.
189                            // TODO: more graceful shutdown.
190                            break;
191                        }
192                    }
193                }
194                // In production the `acknowledgement_sender_stream` is an empty stream, it only set to
195                // real stream in test
196                Some(mut acknowledgement_sender) = acknowledgement_sender_stream.next() => {
197                    if let Err(err) = acknowledgement_sender.send(()).await {
198                        tracing::error!(
199                            ?err,
200                            "Failed to send acknowledgement"
201                        );
202                    }
203                }
204                else => { break }
205            }
206        }
207    } else {
208        info!("🧑‍ Running as Full node...");
209        drop(new_slot_notification_stream);
210        drop(acknowledgement_sender_stream);
211        while let Some(maybe_block_info) = throttled_block_import_notification_stream.next().await {
212            if let Some(block_info) = maybe_block_info {
213                if let Err(error) = bundle_processor
214                    .clone()
215                    .process_bundles((block_info.hash, block_info.number, block_info.is_new_best))
216                    .instrument(span.clone())
217                    .await
218                {
219                    tracing::error!(?error, "Failed to process consensus block");
220                    // Bring down the service as bundles processor is an essential task.
221                    // TODO: more graceful shutdown.
222                    break;
223                }
224            }
225        }
226    }
227}
228
229/// Throttle the consensus block import notification based on the `consensus_block_import_throttling_buffer_size`
230/// to pause the consensus block import in case the consensus chain runs much faster than the domain.
231///
232/// Return the throttled block import notification stream
233#[allow(clippy::too_many_arguments)]
234fn throttling_block_import_notifications<Block, CBlock, CClient, BlocksImporting, BlocksImported>(
235    spawn_essential: Box<dyn SpawnEssentialNamed>,
236    consensus_client: Arc<CClient>,
237    mut blocks_importing: BlocksImporting,
238    mut blocks_imported: BlocksImported,
239    consensus_block_import_throttling_buffer_size: u32,
240) -> mpsc::Receiver<Option<BlockInfo<CBlock>>>
241where
242    Block: BlockT,
243    CBlock: BlockT,
244    CClient: HeaderBackend<CBlock>
245        + BlockBackend<CBlock>
246        + ProvideRuntimeApi<CBlock>
247        + BlockchainEvents<CBlock>
248        + 'static,
249    CClient::Api: DomainsApi<CBlock, Block::Header>,
250    BlocksImporting: Stream<Item = (NumberFor<CBlock>, mpsc::Sender<()>)> + Unpin + Send + 'static,
251    BlocksImported: Stream<Item = BlockImportNotification<CBlock>> + Unpin + Send + 'static,
252{
253    // The consensus chain can be ahead of the domain by up to `block_import_throttling_buffer_size/2`
254    // blocks, for there are two notifications per block sent to this buffer (one will be actually
255    // consumed by the domain processor, the other from `sc-consensus-subspace` is used to discontinue
256    // the consensus block import in case the consensus chain runs much faster than the domain.).
257    let (mut block_info_sender, block_info_receiver) = mpsc::channel::<Option<BlockInfo<CBlock>>>(
258        consensus_block_import_throttling_buffer_size as usize,
259    );
260
261    spawn_essential.spawn_essential(
262        "consensus-block-import-throttler",
263        None,
264        Box::pin(async move {
265            loop {
266                tokio::select! {
267                    // Ensure the `blocks_imported` branch will be checked before the `blocks_importing` branch.
268                    // Currently this is only necessary for the test to ensure when both `block_imported` notification
269                    // and `blocks_importing` notification are arrived, the `block_imported` notification will be processed
270                    // first, such that we can ensure when the `blocks_importing` acknowledgement is responded, the
271                    // imported block must being processed by the executor.
272                    // Please see https://github.com/autonomys/subspace/pull/1363#discussion_r1162571291 for more details.
273                    biased;
274
275                    maybe_block_imported = blocks_imported.next() => {
276                        let block_imported = match maybe_block_imported {
277                            Some(block_imported) => block_imported,
278                            None => {
279                                // Can be None on graceful shutdown.
280                                break;
281                            }
282                        };
283                        let header = match consensus_client.header(block_imported.hash) {
284                            Ok(Some(header)) => header,
285                            res => {
286                                tracing::error!(
287                                    result = ?res,
288                                    header = ?block_imported.header,
289                                    "Imported consensus block header not found",
290                                );
291                                return;
292                            }
293                        };
294                        let block_info = BlockInfo {
295                            hash: header.hash(),
296                            number: *header.number(),
297                            is_new_best: block_imported.is_new_best,
298                        };
299                        let _ = block_info_sender.feed(Some(block_info)).await;
300                    }
301                    maybe_block_importing = blocks_importing.next() => {
302                        // TODO: remove the `block_number` from the notification since it is not used
303                        let (_block_number, mut acknowledgement_sender) =
304                            match maybe_block_importing {
305                                Some(block_importing) => block_importing,
306                                None => {
307                                    // Can be None on graceful shutdown.
308                                    break;
309                                }
310                            };
311                        // Pause the consensus block import when the sink is full.
312                        let _ = block_info_sender.feed(None).await;
313                        let _ = acknowledgement_sender.send(()).await;
314                    }
315                }
316            }
317        }),
318    );
319
320    block_info_receiver
321}
322
323struct LatestItemStream<S: Stream> {
324    inner: Pin<Box<S>>,
325}
326
327impl<S: Stream> LatestItemStream<S> {
328    fn new(stream: S) -> Self {
329        Self {
330            inner: Box::pin(stream),
331        }
332    }
333}
334
335impl<S> Stream for LatestItemStream<S>
336where
337    S: Stream,
338{
339    type Item = S::Item;
340
341    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
342        let mut last_item = None;
343        while let Poll::Ready(poll) = self.inner.as_mut().poll_next(cx) {
344            match poll {
345                Some(item) => {
346                    last_item = Some(item);
347                }
348                None => {
349                    return Poll::Ready(last_item);
350                }
351            }
352        }
353
354        if last_item.is_some() {
355            Poll::Ready(last_item)
356        } else {
357            Poll::Pending
358        }
359    }
360}