1use crate::BundleSender;
2use crate::bundle_producer_election_solver::BundleProducerElectionSolver;
3use crate::domain_bundle_proposer::DomainBundleProposer;
4use crate::utils::OperatorSlotInfo;
5use async_trait::async_trait;
6use parity_scale_codec::Decode;
7use sc_client_api::{AuxStore, BlockBackend};
8use sp_api::ProvideRuntimeApi;
9use sp_block_builder::BlockBuilder;
10use sp_blockchain::HeaderBackend;
11use sp_consensus_slots::Slot;
12use sp_domains::bundle::bundle_v0::{BundleHeaderV0, BundleV0, SealedBundleHeaderV0};
13use sp_domains::core_api::DomainCoreApi;
14use sp_domains::execution_receipt::{
15 ExecutionReceiptVersion, SealedSingletonReceipt, SingletonReceipt,
16};
17use sp_domains::{
18 BundleAndExecutionReceiptVersion, BundleProducerElectionApi, DomainId, DomainsApi, OperatorId,
19 OperatorPublicKey, OperatorSignature, ProofOfElection,
20};
21use sp_keystore::KeystorePtr;
22use sp_messenger::MessengerApi;
23use sp_runtime::traits::{Block as BlockT, NumberFor, Zero};
24use sp_runtime::{RuntimeAppPublic, Saturating};
25use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
26use std::sync::Arc;
27use subspace_runtime_primitives::{Balance, BlockHashFor, ExtrinsicFor, HeaderFor};
28use tracing::info;
29
30pub enum BundleHeaderFor<Block: BlockT, CBlock: BlockT> {
32 V0(BundleHeaderV0<NumberFor<CBlock>, BlockHashFor<CBlock>, HeaderFor<Block>, Balance>),
33}
34
35pub type BundleHeaderV0For<Block, CBlock> =
37 BundleHeaderV0<NumberFor<CBlock>, BlockHashFor<CBlock>, HeaderFor<Block>, Balance>;
38
39type OpaqueBundle<Block, CBlock> = sp_domains::bundle::OpaqueBundle<
40 NumberFor<CBlock>,
41 BlockHashFor<CBlock>,
42 HeaderFor<Block>,
43 Balance,
44>;
45
46pub type SealedSingletonReceiptFor<Block, CBlock> =
47 SealedSingletonReceipt<NumberFor<CBlock>, BlockHashFor<CBlock>, HeaderFor<Block>, Balance>;
48
49pub enum DomainProposal<Block: BlockT, CBlock: BlockT> {
50 Bundle(OpaqueBundle<Block, CBlock>),
51 Receipt(SealedSingletonReceiptFor<Block, CBlock>),
52}
53
54impl<Block: BlockT, CBlock: BlockT> DomainProposal<Block, CBlock> {
55 pub fn into_opaque_bundle(self) -> Option<OpaqueBundle<Block, CBlock>> {
56 match self {
57 DomainProposal::Bundle(b) => Some(b),
58 DomainProposal::Receipt(_) => None,
59 }
60 }
61}
62
63#[async_trait]
64pub trait BundleProducer<Block, CBlock>
65where
66 Block: BlockT,
67 CBlock: BlockT,
68{
69 async fn produce_bundle(
71 &mut self,
72 operator_id: OperatorId,
73 slot_info: OperatorSlotInfo,
74 ) -> sp_blockchain::Result<Option<DomainProposal<Block, CBlock>>>;
75}
76
77pub struct DomainBundleProducer<Block, CBlock, Client, CClient, TransactionPool>
78where
79 Block: BlockT,
80 CBlock: BlockT,
81{
82 domain_id: DomainId,
83 consensus_client: Arc<CClient>,
84 client: Arc<Client>,
85 bundle_sender: Arc<BundleSender<Block, CBlock>>,
86 keystore: KeystorePtr,
87 bundle_producer_election_solver: BundleProducerElectionSolver<Block, CBlock, CClient>,
88 domain_bundle_proposer: DomainBundleProposer<Block, Client, CBlock, CClient, TransactionPool>,
89}
90
91impl<Block, CBlock, Client, CClient, TransactionPool> Clone
92 for DomainBundleProducer<Block, CBlock, Client, CClient, TransactionPool>
93where
94 Block: BlockT,
95 CBlock: BlockT,
96{
97 fn clone(&self) -> Self {
98 Self {
99 domain_id: self.domain_id,
100 consensus_client: self.consensus_client.clone(),
101 client: self.client.clone(),
102 bundle_sender: self.bundle_sender.clone(),
103 keystore: self.keystore.clone(),
104 bundle_producer_election_solver: self.bundle_producer_election_solver.clone(),
105 domain_bundle_proposer: self.domain_bundle_proposer.clone(),
106 }
107 }
108}
109
110impl<Block, CBlock, Client, CClient, TransactionPool>
111 DomainBundleProducer<Block, CBlock, Client, CClient, TransactionPool>
112where
113 Block: BlockT,
114 CBlock: BlockT,
115 NumberFor<Block>: Into<NumberFor<CBlock>>,
116 NumberFor<CBlock>: Into<NumberFor<Block>>,
117 Client: HeaderBackend<Block> + BlockBackend<Block> + AuxStore + ProvideRuntimeApi<Block>,
118 Client::Api: BlockBuilder<Block>
119 + DomainCoreApi<Block>
120 + TaggedTransactionQueue<Block>
121 + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>,
122 CClient: HeaderBackend<CBlock> + ProvideRuntimeApi<CBlock>,
123 CClient::Api: DomainsApi<CBlock, Block::Header> + BundleProducerElectionApi<CBlock, Balance>,
124 TransactionPool: sc_transaction_pool_api::TransactionPool<Block = Block, Hash = Block::Hash>,
125{
126 pub fn new(
127 domain_id: DomainId,
128 consensus_client: Arc<CClient>,
129 client: Arc<Client>,
130 domain_bundle_proposer: DomainBundleProposer<
131 Block,
132 Client,
133 CBlock,
134 CClient,
135 TransactionPool,
136 >,
137 bundle_sender: Arc<BundleSender<Block, CBlock>>,
138 keystore: KeystorePtr,
139 ) -> Self {
140 let bundle_producer_election_solver = BundleProducerElectionSolver::<Block, CBlock, _>::new(
141 keystore.clone(),
142 consensus_client.clone(),
143 );
144 Self {
145 domain_id,
146 consensus_client,
147 client,
148 bundle_sender,
149 keystore,
150 bundle_producer_election_solver,
151 domain_bundle_proposer,
152 }
153 }
154
155 fn sign(
156 &self,
157 operator_signing_key: &OperatorPublicKey,
158 msg: &[u8],
159 ) -> sp_blockchain::Result<OperatorSignature> {
160 let signature = self
161 .keystore
162 .sr25519_sign(OperatorPublicKey::ID, operator_signing_key.as_ref(), msg)
163 .map_err(|error| {
164 sp_blockchain::Error::Application(Box::from(format!(
165 "Error occurred when signing the bundle: {error}"
166 )))
167 })?
168 .ok_or_else(|| {
169 sp_blockchain::Error::Application(Box::from(
170 "This should not happen as the existence of key was just checked",
171 ))
172 })?;
173
174 OperatorSignature::decode(&mut signature.as_ref()).map_err(|err| {
175 sp_blockchain::Error::Application(Box::from(format!(
176 "Failed to decode the signature of bundle: {err}"
177 )))
178 })
179 }
180
181 #[expect(clippy::type_complexity)]
182 fn claim_bundle_slot(
183 &self,
184 operator_id: OperatorId,
185 slot_info: &OperatorSlotInfo,
186 domain_best_number: NumberFor<Block>,
187 consensus_chain_best_hash: BlockHashFor<CBlock>,
188 ) -> sp_blockchain::Result<
189 Option<(
190 NumberFor<Block>,
191 NumberFor<Block>,
192 ProofOfElection,
193 OperatorPublicKey,
194 )>,
195 > {
196 let OperatorSlotInfo {
197 slot,
198 proof_of_time,
199 } = slot_info;
200
201 let domain_best_number_onchain = self
202 .consensus_client
203 .runtime_api()
204 .domain_best_number(consensus_chain_best_hash, self.domain_id)?
205 .ok_or_else(|| {
206 sp_blockchain::Error::Application(
207 format!(
208 "Failed to get the head domain number for domain {:?} at {:?}",
209 self.domain_id, consensus_chain_best_hash
210 )
211 .into(),
212 )
213 })?;
214 let head_receipt_number = self
215 .consensus_client
216 .runtime_api()
217 .head_receipt_number(consensus_chain_best_hash, self.domain_id)?;
218
219 let is_operator_lagging =
224 !domain_best_number.is_zero() && domain_best_number <= head_receipt_number;
225
226 if is_operator_lagging {
227 tracing::warn!(
228 ?domain_best_number,
229 "Skipping bundle production on slot {slot}"
230 );
231 return Ok(None);
232 }
233
234 if let Some((proof_of_election, operator_signing_key)) =
235 self.bundle_producer_election_solver.solve_challenge(
236 *slot,
237 consensus_chain_best_hash,
238 self.domain_id,
239 operator_id,
240 *proof_of_time,
241 )?
242 {
243 tracing::info!("📦 Claimed slot {slot}");
244
245 Ok(Some((
246 domain_best_number_onchain,
247 head_receipt_number,
248 proof_of_election,
249 operator_signing_key,
250 )))
251 } else {
252 Ok(None)
253 }
254 }
255
256 #[allow(clippy::too_many_arguments)]
257 fn prepare_receipt(
258 &self,
259 slot_info: &OperatorSlotInfo,
260 domain_best_number_onchain: NumberFor<Block>,
261 head_receipt_number: NumberFor<Block>,
262 proof_of_election: &ProofOfElection,
263 operator_signing_key: &OperatorPublicKey,
264 execution_receipt_version: ExecutionReceiptVersion,
265 ) -> sp_blockchain::Result<Option<DomainProposal<Block, CBlock>>> {
266 if domain_best_number_onchain.saturating_sub(head_receipt_number) > 1u32.into() {
269 info!(
270 ?domain_best_number_onchain,
271 ?head_receipt_number,
272 "🔖 Producing singleton receipt at slot {:?}",
273 slot_info.slot
274 );
275
276 let receipt = self.domain_bundle_proposer.load_next_receipt(
277 domain_best_number_onchain,
278 head_receipt_number,
279 execution_receipt_version,
280 )?;
281
282 let singleton_receipt = SingletonReceipt {
283 proof_of_election: proof_of_election.clone(),
284 receipt,
285 };
286
287 let signature = {
288 let to_sign: BlockHashFor<Block> = singleton_receipt.hash();
289 self.sign(operator_signing_key, to_sign.as_ref())?
290 };
291
292 let sealed_singleton_receipt = SealedSingletonReceipt {
293 singleton_receipt,
294 signature,
295 };
296
297 Ok(Some(DomainProposal::Receipt(sealed_singleton_receipt)))
298 } else {
299 Ok(None)
300 }
301 }
302
303 async fn prepare_bundle(
304 &mut self,
305 operator_id: OperatorId,
306 consensus_chain_best_hash: BlockHashFor<CBlock>,
307 domain_best_number_onchain: NumberFor<Block>,
308 head_receipt_number: NumberFor<Block>,
309 proof_of_election: ProofOfElection,
310 bundle_and_execution_receipt_version: BundleAndExecutionReceiptVersion,
311 ) -> sp_blockchain::Result<(BundleHeaderFor<Block, CBlock>, Vec<ExtrinsicFor<Block>>)> {
312 let tx_range = self
313 .consensus_client
314 .runtime_api()
315 .domain_tx_range(consensus_chain_best_hash, self.domain_id)
316 .map_err(|error| {
317 sp_blockchain::Error::Application(Box::from(format!(
318 "Error getting tx range: {error}"
319 )))
320 })?;
321
322 let receipt = self.domain_bundle_proposer.load_next_receipt(
323 domain_best_number_onchain,
324 head_receipt_number,
325 bundle_and_execution_receipt_version.execution_receipt_version,
326 )?;
327
328 let (bundle_header, extrinsics) = self
329 .domain_bundle_proposer
330 .propose_bundle_at(
331 proof_of_election.clone(),
332 tx_range,
333 operator_id,
334 receipt,
335 bundle_and_execution_receipt_version.bundle_version,
336 )
337 .await?;
338
339 Ok((bundle_header, extrinsics))
340 }
341
342 fn is_bundle_empty(
343 &self,
344 consensus_chain_best_hash: BlockHashFor<CBlock>,
345 extrinsics: &[ExtrinsicFor<Block>],
346 ) -> sp_blockchain::Result<bool> {
347 let is_empty = extrinsics.is_empty()
348 && !self
349 .consensus_client
350 .runtime_api()
351 .non_empty_er_exists(consensus_chain_best_hash, self.domain_id)?;
352
353 Ok(is_empty)
354 }
355
356 fn seal_bundle(
357 &self,
358 bundle_header: BundleHeaderFor<Block, CBlock>,
359 operator_signing_key: &OperatorPublicKey,
360 extrinsics: Vec<ExtrinsicFor<Block>>,
361 ) -> sp_blockchain::Result<DomainProposal<Block, CBlock>> {
362 let bundle = match bundle_header {
363 BundleHeaderFor::V0(header) => {
364 let to_sign = header.hash();
365 let signature = self.sign(operator_signing_key, to_sign.as_ref())?;
366 BundleV0 {
367 sealed_header: SealedBundleHeaderV0::new(header, signature),
368 extrinsics,
369 }
370 .into_opaque_bundle()
371 }
372 };
373
374 Ok(DomainProposal::Bundle(bundle))
380 }
381
382 fn current_bundle_and_execution_receipt_version(
384 &self,
385 consensus_chain_best_hash: CBlock::Hash,
386 ) -> sp_blockchain::Result<BundleAndExecutionReceiptVersion> {
387 let runtime_api = self.consensus_client.runtime_api();
388
389 runtime_api
390 .current_bundle_and_execution_receipt_version(consensus_chain_best_hash)
391 .map_err(sp_blockchain::Error::RuntimeApiError)
392 }
393}
394
395#[async_trait]
396impl<Block, CBlock, Client, CClient, TransactionPool> BundleProducer<Block, CBlock>
397 for DomainBundleProducer<Block, CBlock, Client, CClient, TransactionPool>
398where
399 Block: BlockT,
400 CBlock: BlockT,
401 NumberFor<Block>: Into<NumberFor<CBlock>>,
402 NumberFor<CBlock>: Into<NumberFor<Block>>,
403 Client: HeaderBackend<Block> + BlockBackend<Block> + AuxStore + ProvideRuntimeApi<Block>,
404 Client::Api: BlockBuilder<Block>
405 + DomainCoreApi<Block>
406 + TaggedTransactionQueue<Block>
407 + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>,
408 CClient: HeaderBackend<CBlock> + ProvideRuntimeApi<CBlock>,
409 CClient::Api: DomainsApi<CBlock, Block::Header> + BundleProducerElectionApi<CBlock, Balance>,
410 TransactionPool: sc_transaction_pool_api::TransactionPool<Block = Block, Hash = Block::Hash>,
411{
412 async fn produce_bundle(
413 &mut self,
414 operator_id: OperatorId,
415 slot_info: OperatorSlotInfo,
416 ) -> sp_blockchain::Result<Option<DomainProposal<Block, CBlock>>> {
417 let domain_best_number = self.client.info().best_number;
418 let consensus_chain_best_hash = self.consensus_client.info().best_hash;
419
420 let bundle_and_execution_receipt_version =
421 self.current_bundle_and_execution_receipt_version(consensus_chain_best_hash)?;
422
423 let Some((
424 domain_best_number_onchain,
425 head_receipt_number,
426 proof_of_election,
427 operator_signing_key,
428 )) = self.claim_bundle_slot(
429 operator_id,
430 &slot_info,
431 domain_best_number,
432 consensus_chain_best_hash,
433 )?
434 else {
435 return Ok(None);
436 };
437
438 if let Some(receipt) = self.prepare_receipt(
439 &slot_info,
440 domain_best_number_onchain,
441 head_receipt_number,
442 &proof_of_election,
443 &operator_signing_key,
444 bundle_and_execution_receipt_version.execution_receipt_version,
445 )? {
446 return Ok(Some(receipt));
447 }
448
449 let (bundle_header, extrinsics) = self
450 .prepare_bundle(
451 operator_id,
452 consensus_chain_best_hash,
453 domain_best_number_onchain,
454 head_receipt_number,
455 proof_of_election,
456 bundle_and_execution_receipt_version,
457 )
458 .await?;
459
460 if self.is_bundle_empty(consensus_chain_best_hash, &extrinsics)? {
463 tracing::warn!(
464 ?domain_best_number,
465 "Skipping empty bundle production on slot {}",
466 slot_info.slot,
467 );
468
469 return Ok(None);
470 }
471
472 info!("🔖 Producing bundle at slot {:?}", slot_info.slot);
473
474 let bundle = self.seal_bundle(bundle_header, &operator_signing_key, extrinsics)?;
475
476 Ok(Some(bundle))
477 }
478}
479
480pub fn uses_default_bundle_producer_params(
484 skip_empty_bundle_production: bool,
485 skip_out_of_order_slot: bool,
486) -> bool {
487 skip_empty_bundle_production && !skip_out_of_order_slot
488}
489
490pub struct TestBundleProducer<Block, CBlock, Client, CClient, TransactionPool>
491where
492 Block: BlockT,
493 CBlock: BlockT,
494{
495 inner: DomainBundleProducer<Block, CBlock, Client, CClient, TransactionPool>,
496 skip_empty_bundle_production: bool,
498 skip_out_of_order_slot: bool,
499 last_processed_slot: Option<Slot>,
500}
501
502impl<Block, CBlock, Client, CClient, TransactionPool> Clone
503 for TestBundleProducer<Block, CBlock, Client, CClient, TransactionPool>
504where
505 Block: BlockT,
506 CBlock: BlockT,
507{
508 fn clone(&self) -> Self {
509 Self {
510 inner: self.inner.clone(),
511 skip_empty_bundle_production: self.skip_empty_bundle_production,
512 skip_out_of_order_slot: self.skip_out_of_order_slot,
513 last_processed_slot: None,
514 }
515 }
516}
517
518impl<Block, CBlock, Client, CClient, TransactionPool>
519 TestBundleProducer<Block, CBlock, Client, CClient, TransactionPool>
520where
521 Block: BlockT,
522 CBlock: BlockT,
523 NumberFor<Block>: Into<NumberFor<CBlock>>,
524 NumberFor<CBlock>: Into<NumberFor<Block>>,
525 Client: HeaderBackend<Block> + BlockBackend<Block> + AuxStore + ProvideRuntimeApi<Block>,
526 Client::Api: BlockBuilder<Block>
527 + DomainCoreApi<Block>
528 + TaggedTransactionQueue<Block>
529 + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>,
530 CClient: HeaderBackend<CBlock> + ProvideRuntimeApi<CBlock>,
531 CClient::Api: DomainsApi<CBlock, Block::Header> + BundleProducerElectionApi<CBlock, Balance>,
532 TransactionPool: sc_transaction_pool_api::TransactionPool<Block = Block, Hash = Block::Hash>,
533{
534 #[expect(clippy::too_many_arguments)]
535 pub fn new(
536 domain_id: DomainId,
537 consensus_client: Arc<CClient>,
538 client: Arc<Client>,
539 domain_bundle_proposer: DomainBundleProposer<
540 Block,
541 Client,
542 CBlock,
543 CClient,
544 TransactionPool,
545 >,
546 bundle_sender: Arc<BundleSender<Block, CBlock>>,
547 keystore: KeystorePtr,
548 skip_empty_bundle_production: bool,
549 skip_out_of_order_slot: bool,
550 ) -> Self {
551 Self {
552 inner: DomainBundleProducer::new(
553 domain_id,
554 consensus_client,
555 client,
556 domain_bundle_proposer,
557 bundle_sender,
558 keystore,
559 ),
560 skip_empty_bundle_production,
561 skip_out_of_order_slot,
562 last_processed_slot: None,
563 }
564 }
565}
566
567#[async_trait]
568impl<Block, CBlock, Client, CClient, TransactionPool> BundleProducer<Block, CBlock>
569 for TestBundleProducer<Block, CBlock, Client, CClient, TransactionPool>
570where
571 Block: BlockT,
572 CBlock: BlockT,
573 NumberFor<Block>: Into<NumberFor<CBlock>>,
574 NumberFor<CBlock>: Into<NumberFor<Block>>,
575 Client: HeaderBackend<Block> + BlockBackend<Block> + AuxStore + ProvideRuntimeApi<Block>,
576 Client::Api: BlockBuilder<Block>
577 + DomainCoreApi<Block>
578 + TaggedTransactionQueue<Block>
579 + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>,
580 CClient: HeaderBackend<CBlock> + ProvideRuntimeApi<CBlock>,
581 CClient::Api: DomainsApi<CBlock, Block::Header> + BundleProducerElectionApi<CBlock, Balance>,
582 TransactionPool: sc_transaction_pool_api::TransactionPool<Block = Block, Hash = Block::Hash>,
583{
584 async fn produce_bundle(
585 &mut self,
586 operator_id: OperatorId,
587 slot_info: OperatorSlotInfo,
588 ) -> sp_blockchain::Result<Option<DomainProposal<Block, CBlock>>> {
589 let domain_best_number = self.inner.client.info().best_number;
590 let consensus_chain_best_hash = self.inner.consensus_client.info().best_hash;
591
592 let bundle_and_execution_receipt_version = self
593 .inner
594 .current_bundle_and_execution_receipt_version(consensus_chain_best_hash)?;
595
596 let skip_out_of_order_slot = self.skip_out_of_order_slot
598 && self
599 .last_processed_slot
600 .map(|last_slot| last_slot >= slot_info.slot)
601 .unwrap_or(false);
602
603 if skip_out_of_order_slot {
604 tracing::warn!(
605 ?domain_best_number,
606 "Skipping out of order bundle production on slot {}",
607 slot_info.slot,
608 );
609 return Ok(None);
610 }
611
612 let Some((
613 domain_best_number_onchain,
614 head_receipt_number,
615 proof_of_election,
616 operator_signing_key,
617 )) = self.inner.claim_bundle_slot(
618 operator_id,
619 &slot_info,
620 domain_best_number,
621 consensus_chain_best_hash,
622 )?
623 else {
624 return Ok(None);
625 };
626
627 if let Some(receipt) = self.inner.prepare_receipt(
628 &slot_info,
629 domain_best_number_onchain,
630 head_receipt_number,
631 &proof_of_election,
632 &operator_signing_key,
633 bundle_and_execution_receipt_version.execution_receipt_version,
634 )? {
635 return Ok(Some(receipt));
636 }
637
638 let (bundle_header, extrinsics) = self
639 .inner
640 .prepare_bundle(
641 operator_id,
642 consensus_chain_best_hash,
643 domain_best_number_onchain,
644 head_receipt_number,
645 proof_of_election,
646 bundle_and_execution_receipt_version,
647 )
648 .await?;
649
650 if self.skip_empty_bundle_production
653 && self
654 .inner
655 .is_bundle_empty(consensus_chain_best_hash, &extrinsics)?
656 {
657 tracing::warn!(
658 ?domain_best_number,
659 "Skipping empty bundle production on slot {}",
660 slot_info.slot,
661 );
662
663 return Ok(None);
664 }
665
666 self.last_processed_slot.replace(slot_info.slot);
667
668 info!("🔖 Producing bundle at slot {:?}", slot_info.slot);
669
670 let bundle = self
671 .inner
672 .seal_bundle(bundle_header, &operator_signing_key, extrinsics)?;
673
674 Ok(Some(bundle))
675 }
676}