domain_client_operator/
domain_bundle_producer.rs

1use crate::bundle_producer_election_solver::BundleProducerElectionSolver;
2use crate::domain_bundle_proposer::DomainBundleProposer;
3use crate::utils::OperatorSlotInfo;
4use crate::BundleSender;
5use async_trait::async_trait;
6use parity_scale_codec::Decode;
7use sc_client_api::{AuxStore, BlockBackend};
8use sp_api::ProvideRuntimeApi;
9use sp_block_builder::BlockBuilder;
10use sp_blockchain::HeaderBackend;
11use sp_consensus_slots::Slot;
12use sp_domains::core_api::DomainCoreApi;
13use sp_domains::{
14    Bundle, BundleHeader, BundleProducerElectionApi, DomainId, DomainsApi, OperatorId,
15    OperatorPublicKey, OperatorSignature, ProofOfElection, SealedBundleHeader,
16    SealedSingletonReceipt, SingletonReceipt,
17};
18use sp_keystore::KeystorePtr;
19use sp_messenger::MessengerApi;
20use sp_runtime::traits::{Block as BlockT, NumberFor, Zero};
21use sp_runtime::{RuntimeAppPublic, Saturating};
22use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
23use std::sync::Arc;
24use subspace_runtime_primitives::{Balance, BlockHashFor, ExtrinsicFor, HeaderFor};
25use tracing::info;
26
27/// Type alias for bundle header.
28pub type BundleHeaderFor<Block, CBlock> =
29    BundleHeader<NumberFor<CBlock>, BlockHashFor<CBlock>, HeaderFor<Block>, Balance>;
30
31type OpaqueBundle<Block, CBlock> =
32    sp_domains::OpaqueBundle<NumberFor<CBlock>, BlockHashFor<CBlock>, HeaderFor<Block>, Balance>;
33
34type SealedSingletonReceiptFor<Block, CBlock> =
35    SealedSingletonReceipt<NumberFor<CBlock>, BlockHashFor<CBlock>, HeaderFor<Block>, Balance>;
36
37pub enum DomainProposal<Block: BlockT, CBlock: BlockT> {
38    Bundle(OpaqueBundle<Block, CBlock>),
39    Receipt(SealedSingletonReceiptFor<Block, CBlock>),
40}
41
42impl<Block: BlockT, CBlock: BlockT> DomainProposal<Block, CBlock> {
43    pub fn into_opaque_bundle(self) -> Option<OpaqueBundle<Block, CBlock>> {
44        match self {
45            DomainProposal::Bundle(b) => Some(b),
46            DomainProposal::Receipt(_) => None,
47        }
48    }
49}
50
51#[async_trait]
52pub trait BundleProducer<Block, CBlock>
53where
54    Block: BlockT,
55    CBlock: BlockT,
56{
57    /// Produce a bundle for the given operator and slot.
58    async fn produce_bundle(
59        &mut self,
60        operator_id: OperatorId,
61        slot_info: OperatorSlotInfo,
62    ) -> sp_blockchain::Result<Option<DomainProposal<Block, CBlock>>>;
63}
64
65pub struct DomainBundleProducer<Block, CBlock, Client, CClient, TransactionPool>
66where
67    Block: BlockT,
68    CBlock: BlockT,
69{
70    domain_id: DomainId,
71    consensus_client: Arc<CClient>,
72    client: Arc<Client>,
73    bundle_sender: Arc<BundleSender<Block, CBlock>>,
74    keystore: KeystorePtr,
75    bundle_producer_election_solver: BundleProducerElectionSolver<Block, CBlock, CClient>,
76    domain_bundle_proposer: DomainBundleProposer<Block, Client, CBlock, CClient, TransactionPool>,
77}
78
79impl<Block, CBlock, Client, CClient, TransactionPool> Clone
80    for DomainBundleProducer<Block, CBlock, Client, CClient, TransactionPool>
81where
82    Block: BlockT,
83    CBlock: BlockT,
84{
85    fn clone(&self) -> Self {
86        Self {
87            domain_id: self.domain_id,
88            consensus_client: self.consensus_client.clone(),
89            client: self.client.clone(),
90            bundle_sender: self.bundle_sender.clone(),
91            keystore: self.keystore.clone(),
92            bundle_producer_election_solver: self.bundle_producer_election_solver.clone(),
93            domain_bundle_proposer: self.domain_bundle_proposer.clone(),
94        }
95    }
96}
97
98impl<Block, CBlock, Client, CClient, TransactionPool>
99    DomainBundleProducer<Block, CBlock, Client, CClient, TransactionPool>
100where
101    Block: BlockT,
102    CBlock: BlockT,
103    NumberFor<Block>: Into<NumberFor<CBlock>>,
104    NumberFor<CBlock>: Into<NumberFor<Block>>,
105    Client: HeaderBackend<Block> + BlockBackend<Block> + AuxStore + ProvideRuntimeApi<Block>,
106    Client::Api: BlockBuilder<Block>
107        + DomainCoreApi<Block>
108        + TaggedTransactionQueue<Block>
109        + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>,
110    CClient: HeaderBackend<CBlock> + ProvideRuntimeApi<CBlock>,
111    CClient::Api: DomainsApi<CBlock, Block::Header> + BundleProducerElectionApi<CBlock, Balance>,
112    TransactionPool: sc_transaction_pool_api::TransactionPool<Block = Block, Hash = Block::Hash>,
113{
114    pub fn new(
115        domain_id: DomainId,
116        consensus_client: Arc<CClient>,
117        client: Arc<Client>,
118        domain_bundle_proposer: DomainBundleProposer<
119            Block,
120            Client,
121            CBlock,
122            CClient,
123            TransactionPool,
124        >,
125        bundle_sender: Arc<BundleSender<Block, CBlock>>,
126        keystore: KeystorePtr,
127    ) -> Self {
128        let bundle_producer_election_solver = BundleProducerElectionSolver::<Block, CBlock, _>::new(
129            keystore.clone(),
130            consensus_client.clone(),
131        );
132        Self {
133            domain_id,
134            consensus_client,
135            client,
136            bundle_sender,
137            keystore,
138            bundle_producer_election_solver,
139            domain_bundle_proposer,
140        }
141    }
142
143    fn sign(
144        &self,
145        operator_signing_key: &OperatorPublicKey,
146        msg: &[u8],
147    ) -> sp_blockchain::Result<OperatorSignature> {
148        let signature = self
149            .keystore
150            .sr25519_sign(OperatorPublicKey::ID, operator_signing_key.as_ref(), msg)
151            .map_err(|error| {
152                sp_blockchain::Error::Application(Box::from(format!(
153                    "Error occurred when signing the bundle: {error}"
154                )))
155            })?
156            .ok_or_else(|| {
157                sp_blockchain::Error::Application(Box::from(
158                    "This should not happen as the existence of key was just checked",
159                ))
160            })?;
161
162        OperatorSignature::decode(&mut signature.as_ref()).map_err(|err| {
163            sp_blockchain::Error::Application(Box::from(format!(
164                "Failed to decode the signature of bundle: {err}"
165            )))
166        })
167    }
168
169    #[expect(clippy::type_complexity)]
170    fn claim_bundle_slot(
171        &self,
172        operator_id: OperatorId,
173        slot_info: &OperatorSlotInfo,
174        domain_best_number: NumberFor<Block>,
175        consensus_chain_best_hash: BlockHashFor<CBlock>,
176    ) -> sp_blockchain::Result<
177        Option<(
178            NumberFor<Block>,
179            NumberFor<Block>,
180            ProofOfElection,
181            OperatorPublicKey,
182        )>,
183    > {
184        let OperatorSlotInfo {
185            slot,
186            proof_of_time,
187        } = slot_info;
188
189        let domain_best_number_onchain = self
190            .consensus_client
191            .runtime_api()
192            .domain_best_number(consensus_chain_best_hash, self.domain_id)?
193            .ok_or_else(|| {
194                sp_blockchain::Error::Application(
195                    format!(
196                        "Failed to get the head domain number for domain {:?} at {:?}",
197                        self.domain_id, consensus_chain_best_hash
198                    )
199                    .into(),
200                )
201            })?;
202        let head_receipt_number = self
203            .consensus_client
204            .runtime_api()
205            .head_receipt_number(consensus_chain_best_hash, self.domain_id)?;
206
207        // Operator is lagging behind the receipt chain on its parent chain as another operator
208        // already processed a block higher than the local best and submitted the receipt to
209        // the parent chain, we ought to catch up with the consensus block processing before
210        // producing new bundle.
211        let is_operator_lagging =
212            !domain_best_number.is_zero() && domain_best_number <= head_receipt_number;
213
214        if is_operator_lagging {
215            tracing::warn!(
216                ?domain_best_number,
217                "Skipping bundle production on slot {slot}"
218            );
219            return Ok(None);
220        }
221
222        if let Some((proof_of_election, operator_signing_key)) =
223            self.bundle_producer_election_solver.solve_challenge(
224                *slot,
225                consensus_chain_best_hash,
226                self.domain_id,
227                operator_id,
228                *proof_of_time,
229            )?
230        {
231            tracing::info!("📦 Claimed slot {slot}");
232
233            Ok(Some((
234                domain_best_number_onchain,
235                head_receipt_number,
236                proof_of_election,
237                operator_signing_key,
238            )))
239        } else {
240            Ok(None)
241        }
242    }
243
244    fn prepare_receipt(
245        &self,
246        slot_info: &OperatorSlotInfo,
247        domain_best_number_onchain: NumberFor<Block>,
248        head_receipt_number: NumberFor<Block>,
249        proof_of_election: &ProofOfElection,
250        operator_signing_key: &OperatorPublicKey,
251    ) -> sp_blockchain::Result<Option<DomainProposal<Block, CBlock>>> {
252        // When the receipt gap is greater than one, the operator needs to produce a receipt
253        // instead of a bundle
254        if domain_best_number_onchain.saturating_sub(head_receipt_number) > 1u32.into() {
255            info!(
256                ?domain_best_number_onchain,
257                ?head_receipt_number,
258                "🔖 Producing singleton receipt at slot {:?}",
259                slot_info.slot
260            );
261
262            let receipt = self
263                .domain_bundle_proposer
264                .load_next_receipt(domain_best_number_onchain, head_receipt_number)?;
265
266            let singleton_receipt = SingletonReceipt {
267                proof_of_election: proof_of_election.clone(),
268                receipt,
269            };
270
271            let signature = {
272                let to_sign: BlockHashFor<Block> = singleton_receipt.hash();
273                self.sign(operator_signing_key, to_sign.as_ref())?
274            };
275
276            let sealed_singleton_receipt: SealedSingletonReceiptFor<Block, CBlock> =
277                SealedSingletonReceipt {
278                    singleton_receipt,
279                    signature,
280                };
281
282            Ok(Some(DomainProposal::Receipt(sealed_singleton_receipt)))
283        } else {
284            Ok(None)
285        }
286    }
287
288    async fn prepare_bundle(
289        &mut self,
290        operator_id: OperatorId,
291        consensus_chain_best_hash: BlockHashFor<CBlock>,
292        domain_best_number_onchain: NumberFor<Block>,
293        head_receipt_number: NumberFor<Block>,
294        proof_of_election: ProofOfElection,
295    ) -> sp_blockchain::Result<(BundleHeaderFor<Block, CBlock>, Vec<ExtrinsicFor<Block>>)> {
296        let tx_range = self
297            .consensus_client
298            .runtime_api()
299            .domain_tx_range(consensus_chain_best_hash, self.domain_id)
300            .map_err(|error| {
301                sp_blockchain::Error::Application(Box::from(format!(
302                    "Error getting tx range: {error}"
303                )))
304            })?;
305
306        let receipt = self
307            .domain_bundle_proposer
308            .load_next_receipt(domain_best_number_onchain, head_receipt_number)?;
309
310        let (bundle_header, extrinsics) = self
311            .domain_bundle_proposer
312            .propose_bundle_at(proof_of_election.clone(), tx_range, operator_id, receipt)
313            .await?;
314
315        Ok((bundle_header, extrinsics))
316    }
317
318    fn is_bundle_empty(
319        &self,
320        consensus_chain_best_hash: BlockHashFor<CBlock>,
321        extrinsics: &[ExtrinsicFor<Block>],
322    ) -> sp_blockchain::Result<bool> {
323        let is_empty = extrinsics.is_empty()
324            && !self
325                .consensus_client
326                .runtime_api()
327                .non_empty_er_exists(consensus_chain_best_hash, self.domain_id)?;
328
329        Ok(is_empty)
330    }
331
332    fn seal_bundle(
333        &self,
334        bundle_header: BundleHeaderFor<Block, CBlock>,
335        operator_signing_key: &OperatorPublicKey,
336        extrinsics: Vec<ExtrinsicFor<Block>>,
337    ) -> sp_blockchain::Result<DomainProposal<Block, CBlock>> {
338        let signature = {
339            let to_sign = bundle_header.hash();
340            self.sign(operator_signing_key, to_sign.as_ref())?
341        };
342
343        let bundle = Bundle {
344            sealed_header: SealedBundleHeader::new(bundle_header, signature),
345            extrinsics,
346        };
347
348        // TODO: Re-enable the bundle gossip over X-Net when the compact bundle is supported.
349        // if let Err(e) = self.bundle_sender.unbounded_send(signed_bundle.clone()) {
350        // tracing::error!(error = ?e, "Failed to send transaction bundle");
351        // }
352
353        Ok(DomainProposal::Bundle(bundle.into_opaque_bundle()))
354    }
355}
356
357#[async_trait]
358impl<Block, CBlock, Client, CClient, TransactionPool> BundleProducer<Block, CBlock>
359    for DomainBundleProducer<Block, CBlock, Client, CClient, TransactionPool>
360where
361    Block: BlockT,
362    CBlock: BlockT,
363    NumberFor<Block>: Into<NumberFor<CBlock>>,
364    NumberFor<CBlock>: Into<NumberFor<Block>>,
365    Client: HeaderBackend<Block> + BlockBackend<Block> + AuxStore + ProvideRuntimeApi<Block>,
366    Client::Api: BlockBuilder<Block>
367        + DomainCoreApi<Block>
368        + TaggedTransactionQueue<Block>
369        + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>,
370    CClient: HeaderBackend<CBlock> + ProvideRuntimeApi<CBlock>,
371    CClient::Api: DomainsApi<CBlock, Block::Header> + BundleProducerElectionApi<CBlock, Balance>,
372    TransactionPool: sc_transaction_pool_api::TransactionPool<Block = Block, Hash = Block::Hash>,
373{
374    async fn produce_bundle(
375        &mut self,
376        operator_id: OperatorId,
377        slot_info: OperatorSlotInfo,
378    ) -> sp_blockchain::Result<Option<DomainProposal<Block, CBlock>>> {
379        let domain_best_number = self.client.info().best_number;
380        let consensus_chain_best_hash = self.consensus_client.info().best_hash;
381
382        let Some((
383            domain_best_number_onchain,
384            head_receipt_number,
385            proof_of_election,
386            operator_signing_key,
387        )) = self.claim_bundle_slot(
388            operator_id,
389            &slot_info,
390            domain_best_number,
391            consensus_chain_best_hash,
392        )?
393        else {
394            return Ok(None);
395        };
396
397        if let Some(receipt) = self.prepare_receipt(
398            &slot_info,
399            domain_best_number_onchain,
400            head_receipt_number,
401            &proof_of_election,
402            &operator_signing_key,
403        )? {
404            return Ok(Some(receipt));
405        }
406
407        let (bundle_header, extrinsics) = self
408            .prepare_bundle(
409                operator_id,
410                consensus_chain_best_hash,
411                domain_best_number_onchain,
412                head_receipt_number,
413                proof_of_election,
414            )
415            .await?;
416
417        // if there are no extrinsics and no receipts to confirm, skip the bundle
418        // this is the default production behaviour
419        if self.is_bundle_empty(consensus_chain_best_hash, &extrinsics)? {
420            tracing::warn!(
421                ?domain_best_number,
422                "Skipping empty bundle production on slot {}",
423                slot_info.slot,
424            );
425
426            return Ok(None);
427        }
428
429        info!("🔖 Producing bundle at slot {:?}", slot_info.slot);
430
431        let bundle = self.seal_bundle(bundle_header, &operator_signing_key, extrinsics)?;
432
433        Ok(Some(bundle))
434    }
435}
436
// TODO: only compile the test bundle producer in tests (ticket #3162)

/// Returns `true` when both bundle producer parameters have their default values.
///
/// The defaults are: skip empty bundle production (`true`) and do not skip out-of-order
/// slots (`false`).
pub fn uses_default_bundle_producer_params(
    skip_empty_bundle_production: bool,
    skip_out_of_order_slot: bool,
) -> bool {
    skip_empty_bundle_production && !skip_out_of_order_slot
}
446
447pub struct TestBundleProducer<Block, CBlock, Client, CClient, TransactionPool>
448where
449    Block: BlockT,
450    CBlock: BlockT,
451{
452    inner: DomainBundleProducer<Block, CBlock, Client, CClient, TransactionPool>,
453    // Test-only parameters
454    skip_empty_bundle_production: bool,
455    skip_out_of_order_slot: bool,
456    last_processed_slot: Option<Slot>,
457}
458
459impl<Block, CBlock, Client, CClient, TransactionPool> Clone
460    for TestBundleProducer<Block, CBlock, Client, CClient, TransactionPool>
461where
462    Block: BlockT,
463    CBlock: BlockT,
464{
465    fn clone(&self) -> Self {
466        Self {
467            inner: self.inner.clone(),
468            skip_empty_bundle_production: self.skip_empty_bundle_production,
469            skip_out_of_order_slot: self.skip_out_of_order_slot,
470            last_processed_slot: None,
471        }
472    }
473}
474
475impl<Block, CBlock, Client, CClient, TransactionPool>
476    TestBundleProducer<Block, CBlock, Client, CClient, TransactionPool>
477where
478    Block: BlockT,
479    CBlock: BlockT,
480    NumberFor<Block>: Into<NumberFor<CBlock>>,
481    NumberFor<CBlock>: Into<NumberFor<Block>>,
482    Client: HeaderBackend<Block> + BlockBackend<Block> + AuxStore + ProvideRuntimeApi<Block>,
483    Client::Api: BlockBuilder<Block>
484        + DomainCoreApi<Block>
485        + TaggedTransactionQueue<Block>
486        + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>,
487    CClient: HeaderBackend<CBlock> + ProvideRuntimeApi<CBlock>,
488    CClient::Api: DomainsApi<CBlock, Block::Header> + BundleProducerElectionApi<CBlock, Balance>,
489    TransactionPool: sc_transaction_pool_api::TransactionPool<Block = Block, Hash = Block::Hash>,
490{
491    #[expect(clippy::too_many_arguments)]
492    pub fn new(
493        domain_id: DomainId,
494        consensus_client: Arc<CClient>,
495        client: Arc<Client>,
496        domain_bundle_proposer: DomainBundleProposer<
497            Block,
498            Client,
499            CBlock,
500            CClient,
501            TransactionPool,
502        >,
503        bundle_sender: Arc<BundleSender<Block, CBlock>>,
504        keystore: KeystorePtr,
505        skip_empty_bundle_production: bool,
506        skip_out_of_order_slot: bool,
507    ) -> Self {
508        Self {
509            inner: DomainBundleProducer::new(
510                domain_id,
511                consensus_client,
512                client,
513                domain_bundle_proposer,
514                bundle_sender,
515                keystore,
516            ),
517            skip_empty_bundle_production,
518            skip_out_of_order_slot,
519            last_processed_slot: None,
520        }
521    }
522}
523
524#[async_trait]
525impl<Block, CBlock, Client, CClient, TransactionPool> BundleProducer<Block, CBlock>
526    for TestBundleProducer<Block, CBlock, Client, CClient, TransactionPool>
527where
528    Block: BlockT,
529    CBlock: BlockT,
530    NumberFor<Block>: Into<NumberFor<CBlock>>,
531    NumberFor<CBlock>: Into<NumberFor<Block>>,
532    Client: HeaderBackend<Block> + BlockBackend<Block> + AuxStore + ProvideRuntimeApi<Block>,
533    Client::Api: BlockBuilder<Block>
534        + DomainCoreApi<Block>
535        + TaggedTransactionQueue<Block>
536        + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>,
537    CClient: HeaderBackend<CBlock> + ProvideRuntimeApi<CBlock>,
538    CClient::Api: DomainsApi<CBlock, Block::Header> + BundleProducerElectionApi<CBlock, Balance>,
539    TransactionPool: sc_transaction_pool_api::TransactionPool<Block = Block, Hash = Block::Hash>,
540{
541    async fn produce_bundle(
542        &mut self,
543        operator_id: OperatorId,
544        slot_info: OperatorSlotInfo,
545    ) -> sp_blockchain::Result<Option<DomainProposal<Block, CBlock>>> {
546        let domain_best_number = self.inner.client.info().best_number;
547        let consensus_chain_best_hash = self.inner.consensus_client.info().best_hash;
548
549        // Test-only behaviour: skip slot if configured to do so
550        let skip_out_of_order_slot = self.skip_out_of_order_slot
551            && self
552                .last_processed_slot
553                .map(|last_slot| last_slot >= slot_info.slot)
554                .unwrap_or(false);
555
556        if skip_out_of_order_slot {
557            tracing::warn!(
558                ?domain_best_number,
559                "Skipping out of order bundle production on slot {}",
560                slot_info.slot,
561            );
562            return Ok(None);
563        }
564
565        let Some((
566            domain_best_number_onchain,
567            head_receipt_number,
568            proof_of_election,
569            operator_signing_key,
570        )) = self.inner.claim_bundle_slot(
571            operator_id,
572            &slot_info,
573            domain_best_number,
574            consensus_chain_best_hash,
575        )?
576        else {
577            return Ok(None);
578        };
579
580        if let Some(receipt) = self.inner.prepare_receipt(
581            &slot_info,
582            domain_best_number_onchain,
583            head_receipt_number,
584            &proof_of_election,
585            &operator_signing_key,
586        )? {
587            return Ok(Some(receipt));
588        }
589
590        let (bundle_header, extrinsics) = self
591            .inner
592            .prepare_bundle(
593                operator_id,
594                consensus_chain_best_hash,
595                domain_best_number_onchain,
596                head_receipt_number,
597                proof_of_election,
598            )
599            .await?;
600
601        // if there are no extrinsics and no receipts to confirm, skip the bundle
602        // Test-only behaviour: if configured, *don't* skip empty bundles
603        if self.skip_empty_bundle_production
604            && self
605                .inner
606                .is_bundle_empty(consensus_chain_best_hash, &extrinsics)?
607        {
608            tracing::warn!(
609                ?domain_best_number,
610                "Skipping empty bundle production on slot {}",
611                slot_info.slot,
612            );
613
614            return Ok(None);
615        }
616
617        self.last_processed_slot.replace(slot_info.slot);
618
619        info!("🔖 Producing bundle at slot {:?}", slot_info.slot);
620
621        let bundle = self
622            .inner
623            .seal_bundle(bundle_header, &operator_signing_key, extrinsics)?;
624
625        Ok(Some(bundle))
626    }
627}