1use crate::bundle_producer_election_solver::BundleProducerElectionSolver;
2use crate::domain_bundle_proposer::DomainBundleProposer;
3use crate::utils::OperatorSlotInfo;
4use crate::BundleSender;
5use async_trait::async_trait;
6use parity_scale_codec::Decode;
7use sc_client_api::{AuxStore, BlockBackend};
8use sp_api::ProvideRuntimeApi;
9use sp_block_builder::BlockBuilder;
10use sp_blockchain::HeaderBackend;
11use sp_consensus_slots::Slot;
12use sp_domains::core_api::DomainCoreApi;
13use sp_domains::{
14 Bundle, BundleHeader, BundleProducerElectionApi, DomainId, DomainsApi, OperatorId,
15 OperatorPublicKey, OperatorSignature, ProofOfElection, SealedBundleHeader,
16 SealedSingletonReceipt, SingletonReceipt,
17};
18use sp_keystore::KeystorePtr;
19use sp_messenger::MessengerApi;
20use sp_runtime::traits::{Block as BlockT, NumberFor, Zero};
21use sp_runtime::{RuntimeAppPublic, Saturating};
22use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
23use std::sync::Arc;
24use subspace_runtime_primitives::{Balance, BlockHashFor, ExtrinsicFor, HeaderFor};
25use tracing::info;
26
27pub type BundleHeaderFor<Block, CBlock> =
29 BundleHeader<NumberFor<CBlock>, BlockHashFor<CBlock>, HeaderFor<Block>, Balance>;
30
31type OpaqueBundle<Block, CBlock> =
32 sp_domains::OpaqueBundle<NumberFor<CBlock>, BlockHashFor<CBlock>, HeaderFor<Block>, Balance>;
33
34type SealedSingletonReceiptFor<Block, CBlock> =
35 SealedSingletonReceipt<NumberFor<CBlock>, BlockHashFor<CBlock>, HeaderFor<Block>, Balance>;
36
37pub enum DomainProposal<Block: BlockT, CBlock: BlockT> {
38 Bundle(OpaqueBundle<Block, CBlock>),
39 Receipt(SealedSingletonReceiptFor<Block, CBlock>),
40}
41
42impl<Block: BlockT, CBlock: BlockT> DomainProposal<Block, CBlock> {
43 pub fn into_opaque_bundle(self) -> Option<OpaqueBundle<Block, CBlock>> {
44 match self {
45 DomainProposal::Bundle(b) => Some(b),
46 DomainProposal::Receipt(_) => None,
47 }
48 }
49}
50
51#[async_trait]
52pub trait BundleProducer<Block, CBlock>
53where
54 Block: BlockT,
55 CBlock: BlockT,
56{
57 async fn produce_bundle(
59 &mut self,
60 operator_id: OperatorId,
61 slot_info: OperatorSlotInfo,
62 ) -> sp_blockchain::Result<Option<DomainProposal<Block, CBlock>>>;
63}
64
65pub struct DomainBundleProducer<Block, CBlock, Client, CClient, TransactionPool>
66where
67 Block: BlockT,
68 CBlock: BlockT,
69{
70 domain_id: DomainId,
71 consensus_client: Arc<CClient>,
72 client: Arc<Client>,
73 bundle_sender: Arc<BundleSender<Block, CBlock>>,
74 keystore: KeystorePtr,
75 bundle_producer_election_solver: BundleProducerElectionSolver<Block, CBlock, CClient>,
76 domain_bundle_proposer: DomainBundleProposer<Block, Client, CBlock, CClient, TransactionPool>,
77}
78
79impl<Block, CBlock, Client, CClient, TransactionPool> Clone
80 for DomainBundleProducer<Block, CBlock, Client, CClient, TransactionPool>
81where
82 Block: BlockT,
83 CBlock: BlockT,
84{
85 fn clone(&self) -> Self {
86 Self {
87 domain_id: self.domain_id,
88 consensus_client: self.consensus_client.clone(),
89 client: self.client.clone(),
90 bundle_sender: self.bundle_sender.clone(),
91 keystore: self.keystore.clone(),
92 bundle_producer_election_solver: self.bundle_producer_election_solver.clone(),
93 domain_bundle_proposer: self.domain_bundle_proposer.clone(),
94 }
95 }
96}
97
98impl<Block, CBlock, Client, CClient, TransactionPool>
99 DomainBundleProducer<Block, CBlock, Client, CClient, TransactionPool>
100where
101 Block: BlockT,
102 CBlock: BlockT,
103 NumberFor<Block>: Into<NumberFor<CBlock>>,
104 NumberFor<CBlock>: Into<NumberFor<Block>>,
105 Client: HeaderBackend<Block> + BlockBackend<Block> + AuxStore + ProvideRuntimeApi<Block>,
106 Client::Api: BlockBuilder<Block>
107 + DomainCoreApi<Block>
108 + TaggedTransactionQueue<Block>
109 + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>,
110 CClient: HeaderBackend<CBlock> + ProvideRuntimeApi<CBlock>,
111 CClient::Api: DomainsApi<CBlock, Block::Header> + BundleProducerElectionApi<CBlock, Balance>,
112 TransactionPool: sc_transaction_pool_api::TransactionPool<Block = Block, Hash = Block::Hash>,
113{
114 pub fn new(
115 domain_id: DomainId,
116 consensus_client: Arc<CClient>,
117 client: Arc<Client>,
118 domain_bundle_proposer: DomainBundleProposer<
119 Block,
120 Client,
121 CBlock,
122 CClient,
123 TransactionPool,
124 >,
125 bundle_sender: Arc<BundleSender<Block, CBlock>>,
126 keystore: KeystorePtr,
127 ) -> Self {
128 let bundle_producer_election_solver = BundleProducerElectionSolver::<Block, CBlock, _>::new(
129 keystore.clone(),
130 consensus_client.clone(),
131 );
132 Self {
133 domain_id,
134 consensus_client,
135 client,
136 bundle_sender,
137 keystore,
138 bundle_producer_election_solver,
139 domain_bundle_proposer,
140 }
141 }
142
143 fn sign(
144 &self,
145 operator_signing_key: &OperatorPublicKey,
146 msg: &[u8],
147 ) -> sp_blockchain::Result<OperatorSignature> {
148 let signature = self
149 .keystore
150 .sr25519_sign(OperatorPublicKey::ID, operator_signing_key.as_ref(), msg)
151 .map_err(|error| {
152 sp_blockchain::Error::Application(Box::from(format!(
153 "Error occurred when signing the bundle: {error}"
154 )))
155 })?
156 .ok_or_else(|| {
157 sp_blockchain::Error::Application(Box::from(
158 "This should not happen as the existence of key was just checked",
159 ))
160 })?;
161
162 OperatorSignature::decode(&mut signature.as_ref()).map_err(|err| {
163 sp_blockchain::Error::Application(Box::from(format!(
164 "Failed to decode the signature of bundle: {err}"
165 )))
166 })
167 }
168
169 #[expect(clippy::type_complexity)]
170 fn claim_bundle_slot(
171 &self,
172 operator_id: OperatorId,
173 slot_info: &OperatorSlotInfo,
174 domain_best_number: NumberFor<Block>,
175 consensus_chain_best_hash: BlockHashFor<CBlock>,
176 ) -> sp_blockchain::Result<
177 Option<(
178 NumberFor<Block>,
179 NumberFor<Block>,
180 ProofOfElection,
181 OperatorPublicKey,
182 )>,
183 > {
184 let OperatorSlotInfo {
185 slot,
186 proof_of_time,
187 } = slot_info;
188
189 let domain_best_number_onchain = self
190 .consensus_client
191 .runtime_api()
192 .domain_best_number(consensus_chain_best_hash, self.domain_id)?
193 .ok_or_else(|| {
194 sp_blockchain::Error::Application(
195 format!(
196 "Failed to get the head domain number for domain {:?} at {:?}",
197 self.domain_id, consensus_chain_best_hash
198 )
199 .into(),
200 )
201 })?;
202 let head_receipt_number = self
203 .consensus_client
204 .runtime_api()
205 .head_receipt_number(consensus_chain_best_hash, self.domain_id)?;
206
207 let is_operator_lagging =
212 !domain_best_number.is_zero() && domain_best_number <= head_receipt_number;
213
214 if is_operator_lagging {
215 tracing::warn!(
216 ?domain_best_number,
217 "Skipping bundle production on slot {slot}"
218 );
219 return Ok(None);
220 }
221
222 if let Some((proof_of_election, operator_signing_key)) =
223 self.bundle_producer_election_solver.solve_challenge(
224 *slot,
225 consensus_chain_best_hash,
226 self.domain_id,
227 operator_id,
228 *proof_of_time,
229 )?
230 {
231 tracing::info!("📦 Claimed slot {slot}");
232
233 Ok(Some((
234 domain_best_number_onchain,
235 head_receipt_number,
236 proof_of_election,
237 operator_signing_key,
238 )))
239 } else {
240 Ok(None)
241 }
242 }
243
244 fn prepare_receipt(
245 &self,
246 slot_info: &OperatorSlotInfo,
247 domain_best_number_onchain: NumberFor<Block>,
248 head_receipt_number: NumberFor<Block>,
249 proof_of_election: &ProofOfElection,
250 operator_signing_key: &OperatorPublicKey,
251 ) -> sp_blockchain::Result<Option<DomainProposal<Block, CBlock>>> {
252 if domain_best_number_onchain.saturating_sub(head_receipt_number) > 1u32.into() {
255 info!(
256 ?domain_best_number_onchain,
257 ?head_receipt_number,
258 "🔖 Producing singleton receipt at slot {:?}",
259 slot_info.slot
260 );
261
262 let receipt = self
263 .domain_bundle_proposer
264 .load_next_receipt(domain_best_number_onchain, head_receipt_number)?;
265
266 let singleton_receipt = SingletonReceipt {
267 proof_of_election: proof_of_election.clone(),
268 receipt,
269 };
270
271 let signature = {
272 let to_sign: BlockHashFor<Block> = singleton_receipt.hash();
273 self.sign(operator_signing_key, to_sign.as_ref())?
274 };
275
276 let sealed_singleton_receipt: SealedSingletonReceiptFor<Block, CBlock> =
277 SealedSingletonReceipt {
278 singleton_receipt,
279 signature,
280 };
281
282 Ok(Some(DomainProposal::Receipt(sealed_singleton_receipt)))
283 } else {
284 Ok(None)
285 }
286 }
287
288 async fn prepare_bundle(
289 &mut self,
290 operator_id: OperatorId,
291 consensus_chain_best_hash: BlockHashFor<CBlock>,
292 domain_best_number_onchain: NumberFor<Block>,
293 head_receipt_number: NumberFor<Block>,
294 proof_of_election: ProofOfElection,
295 ) -> sp_blockchain::Result<(BundleHeaderFor<Block, CBlock>, Vec<ExtrinsicFor<Block>>)> {
296 let tx_range = self
297 .consensus_client
298 .runtime_api()
299 .domain_tx_range(consensus_chain_best_hash, self.domain_id)
300 .map_err(|error| {
301 sp_blockchain::Error::Application(Box::from(format!(
302 "Error getting tx range: {error}"
303 )))
304 })?;
305
306 let receipt = self
307 .domain_bundle_proposer
308 .load_next_receipt(domain_best_number_onchain, head_receipt_number)?;
309
310 let (bundle_header, extrinsics) = self
311 .domain_bundle_proposer
312 .propose_bundle_at(proof_of_election.clone(), tx_range, operator_id, receipt)
313 .await?;
314
315 Ok((bundle_header, extrinsics))
316 }
317
318 fn is_bundle_empty(
319 &self,
320 consensus_chain_best_hash: BlockHashFor<CBlock>,
321 extrinsics: &[ExtrinsicFor<Block>],
322 ) -> sp_blockchain::Result<bool> {
323 let is_empty = extrinsics.is_empty()
324 && !self
325 .consensus_client
326 .runtime_api()
327 .non_empty_er_exists(consensus_chain_best_hash, self.domain_id)?;
328
329 Ok(is_empty)
330 }
331
332 fn seal_bundle(
333 &self,
334 bundle_header: BundleHeaderFor<Block, CBlock>,
335 operator_signing_key: &OperatorPublicKey,
336 extrinsics: Vec<ExtrinsicFor<Block>>,
337 ) -> sp_blockchain::Result<DomainProposal<Block, CBlock>> {
338 let signature = {
339 let to_sign = bundle_header.hash();
340 self.sign(operator_signing_key, to_sign.as_ref())?
341 };
342
343 let bundle = Bundle {
344 sealed_header: SealedBundleHeader::new(bundle_header, signature),
345 extrinsics,
346 };
347
348 Ok(DomainProposal::Bundle(bundle.into_opaque_bundle()))
354 }
355}
356
357#[async_trait]
358impl<Block, CBlock, Client, CClient, TransactionPool> BundleProducer<Block, CBlock>
359 for DomainBundleProducer<Block, CBlock, Client, CClient, TransactionPool>
360where
361 Block: BlockT,
362 CBlock: BlockT,
363 NumberFor<Block>: Into<NumberFor<CBlock>>,
364 NumberFor<CBlock>: Into<NumberFor<Block>>,
365 Client: HeaderBackend<Block> + BlockBackend<Block> + AuxStore + ProvideRuntimeApi<Block>,
366 Client::Api: BlockBuilder<Block>
367 + DomainCoreApi<Block>
368 + TaggedTransactionQueue<Block>
369 + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>,
370 CClient: HeaderBackend<CBlock> + ProvideRuntimeApi<CBlock>,
371 CClient::Api: DomainsApi<CBlock, Block::Header> + BundleProducerElectionApi<CBlock, Balance>,
372 TransactionPool: sc_transaction_pool_api::TransactionPool<Block = Block, Hash = Block::Hash>,
373{
374 async fn produce_bundle(
375 &mut self,
376 operator_id: OperatorId,
377 slot_info: OperatorSlotInfo,
378 ) -> sp_blockchain::Result<Option<DomainProposal<Block, CBlock>>> {
379 let domain_best_number = self.client.info().best_number;
380 let consensus_chain_best_hash = self.consensus_client.info().best_hash;
381
382 let Some((
383 domain_best_number_onchain,
384 head_receipt_number,
385 proof_of_election,
386 operator_signing_key,
387 )) = self.claim_bundle_slot(
388 operator_id,
389 &slot_info,
390 domain_best_number,
391 consensus_chain_best_hash,
392 )?
393 else {
394 return Ok(None);
395 };
396
397 if let Some(receipt) = self.prepare_receipt(
398 &slot_info,
399 domain_best_number_onchain,
400 head_receipt_number,
401 &proof_of_election,
402 &operator_signing_key,
403 )? {
404 return Ok(Some(receipt));
405 }
406
407 let (bundle_header, extrinsics) = self
408 .prepare_bundle(
409 operator_id,
410 consensus_chain_best_hash,
411 domain_best_number_onchain,
412 head_receipt_number,
413 proof_of_election,
414 )
415 .await?;
416
417 if self.is_bundle_empty(consensus_chain_best_hash, &extrinsics)? {
420 tracing::warn!(
421 ?domain_best_number,
422 "Skipping empty bundle production on slot {}",
423 slot_info.slot,
424 );
425
426 return Ok(None);
427 }
428
429 info!("🔖 Producing bundle at slot {:?}", slot_info.slot);
430
431 let bundle = self.seal_bundle(bundle_header, &operator_signing_key, extrinsics)?;
432
433 Ok(Some(bundle))
434 }
435}
436
437pub fn uses_default_bundle_producer_params(
441 skip_empty_bundle_production: bool,
442 skip_out_of_order_slot: bool,
443) -> bool {
444 skip_empty_bundle_production && !skip_out_of_order_slot
445}
446
447pub struct TestBundleProducer<Block, CBlock, Client, CClient, TransactionPool>
448where
449 Block: BlockT,
450 CBlock: BlockT,
451{
452 inner: DomainBundleProducer<Block, CBlock, Client, CClient, TransactionPool>,
453 skip_empty_bundle_production: bool,
455 skip_out_of_order_slot: bool,
456 last_processed_slot: Option<Slot>,
457}
458
459impl<Block, CBlock, Client, CClient, TransactionPool> Clone
460 for TestBundleProducer<Block, CBlock, Client, CClient, TransactionPool>
461where
462 Block: BlockT,
463 CBlock: BlockT,
464{
465 fn clone(&self) -> Self {
466 Self {
467 inner: self.inner.clone(),
468 skip_empty_bundle_production: self.skip_empty_bundle_production,
469 skip_out_of_order_slot: self.skip_out_of_order_slot,
470 last_processed_slot: None,
471 }
472 }
473}
474
475impl<Block, CBlock, Client, CClient, TransactionPool>
476 TestBundleProducer<Block, CBlock, Client, CClient, TransactionPool>
477where
478 Block: BlockT,
479 CBlock: BlockT,
480 NumberFor<Block>: Into<NumberFor<CBlock>>,
481 NumberFor<CBlock>: Into<NumberFor<Block>>,
482 Client: HeaderBackend<Block> + BlockBackend<Block> + AuxStore + ProvideRuntimeApi<Block>,
483 Client::Api: BlockBuilder<Block>
484 + DomainCoreApi<Block>
485 + TaggedTransactionQueue<Block>
486 + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>,
487 CClient: HeaderBackend<CBlock> + ProvideRuntimeApi<CBlock>,
488 CClient::Api: DomainsApi<CBlock, Block::Header> + BundleProducerElectionApi<CBlock, Balance>,
489 TransactionPool: sc_transaction_pool_api::TransactionPool<Block = Block, Hash = Block::Hash>,
490{
491 #[expect(clippy::too_many_arguments)]
492 pub fn new(
493 domain_id: DomainId,
494 consensus_client: Arc<CClient>,
495 client: Arc<Client>,
496 domain_bundle_proposer: DomainBundleProposer<
497 Block,
498 Client,
499 CBlock,
500 CClient,
501 TransactionPool,
502 >,
503 bundle_sender: Arc<BundleSender<Block, CBlock>>,
504 keystore: KeystorePtr,
505 skip_empty_bundle_production: bool,
506 skip_out_of_order_slot: bool,
507 ) -> Self {
508 Self {
509 inner: DomainBundleProducer::new(
510 domain_id,
511 consensus_client,
512 client,
513 domain_bundle_proposer,
514 bundle_sender,
515 keystore,
516 ),
517 skip_empty_bundle_production,
518 skip_out_of_order_slot,
519 last_processed_slot: None,
520 }
521 }
522}
523
524#[async_trait]
525impl<Block, CBlock, Client, CClient, TransactionPool> BundleProducer<Block, CBlock>
526 for TestBundleProducer<Block, CBlock, Client, CClient, TransactionPool>
527where
528 Block: BlockT,
529 CBlock: BlockT,
530 NumberFor<Block>: Into<NumberFor<CBlock>>,
531 NumberFor<CBlock>: Into<NumberFor<Block>>,
532 Client: HeaderBackend<Block> + BlockBackend<Block> + AuxStore + ProvideRuntimeApi<Block>,
533 Client::Api: BlockBuilder<Block>
534 + DomainCoreApi<Block>
535 + TaggedTransactionQueue<Block>
536 + MessengerApi<Block, NumberFor<CBlock>, CBlock::Hash>,
537 CClient: HeaderBackend<CBlock> + ProvideRuntimeApi<CBlock>,
538 CClient::Api: DomainsApi<CBlock, Block::Header> + BundleProducerElectionApi<CBlock, Balance>,
539 TransactionPool: sc_transaction_pool_api::TransactionPool<Block = Block, Hash = Block::Hash>,
540{
541 async fn produce_bundle(
542 &mut self,
543 operator_id: OperatorId,
544 slot_info: OperatorSlotInfo,
545 ) -> sp_blockchain::Result<Option<DomainProposal<Block, CBlock>>> {
546 let domain_best_number = self.inner.client.info().best_number;
547 let consensus_chain_best_hash = self.inner.consensus_client.info().best_hash;
548
549 let skip_out_of_order_slot = self.skip_out_of_order_slot
551 && self
552 .last_processed_slot
553 .map(|last_slot| last_slot >= slot_info.slot)
554 .unwrap_or(false);
555
556 if skip_out_of_order_slot {
557 tracing::warn!(
558 ?domain_best_number,
559 "Skipping out of order bundle production on slot {}",
560 slot_info.slot,
561 );
562 return Ok(None);
563 }
564
565 let Some((
566 domain_best_number_onchain,
567 head_receipt_number,
568 proof_of_election,
569 operator_signing_key,
570 )) = self.inner.claim_bundle_slot(
571 operator_id,
572 &slot_info,
573 domain_best_number,
574 consensus_chain_best_hash,
575 )?
576 else {
577 return Ok(None);
578 };
579
580 if let Some(receipt) = self.inner.prepare_receipt(
581 &slot_info,
582 domain_best_number_onchain,
583 head_receipt_number,
584 &proof_of_election,
585 &operator_signing_key,
586 )? {
587 return Ok(Some(receipt));
588 }
589
590 let (bundle_header, extrinsics) = self
591 .inner
592 .prepare_bundle(
593 operator_id,
594 consensus_chain_best_hash,
595 domain_best_number_onchain,
596 head_receipt_number,
597 proof_of_election,
598 )
599 .await?;
600
601 if self.skip_empty_bundle_production
604 && self
605 .inner
606 .is_bundle_empty(consensus_chain_best_hash, &extrinsics)?
607 {
608 tracing::warn!(
609 ?domain_best_number,
610 "Skipping empty bundle production on slot {}",
611 slot_info.slot,
612 );
613
614 return Ok(None);
615 }
616
617 self.last_processed_slot.replace(slot_info.slot);
618
619 info!("🔖 Producing bundle at slot {:?}", slot_info.slot);
620
621 let bundle = self
622 .inner
623 .seal_bundle(bundle_header, &operator_signing_key, extrinsics)?;
624
625 Ok(Some(bundle))
626 }
627}