1mod slots;
4pub mod source;
5pub mod verifier;
6
7use crate::slots::SlotInfoProducer;
8use crate::source::{PotSlotInfo, PotSlotInfoStream};
9use sc_consensus_slots::{SimpleSlotWorker, SimpleSlotWorkerToSlotWorker, SlotWorker};
10use sp_api::ProvideRuntimeApi;
11use sp_blockchain::HeaderBackend;
12use sp_consensus::{SelectChain, SyncOracle};
13use sp_consensus_slots::{Slot, SlotDuration};
14use sp_consensus_subspace::SubspaceApi;
15use sp_inherents::CreateInherentDataProviders;
16use sp_runtime::traits::Block as BlockT;
17use std::sync::Arc;
18use subspace_core_primitives::PublicKey;
19use subspace_core_primitives::pot::PotCheckpoints;
20use tokio::sync::broadcast::error::RecvError;
21use tracing::{debug, error, info, trace};
22
23pub trait PotSlotWorker<Block>
24where
25 Block: BlockT,
26{
27 fn on_proof(&mut self, slot: Slot, checkpoints: PotCheckpoints);
31}
32
33pub async fn start_slot_worker<Block, Client, SC, Worker, SO, CIDP>(
38 slot_duration: SlotDuration,
39 client: Arc<Client>,
40 select_chain: SC,
41 worker: Worker,
42 sync_oracle: SO,
43 create_inherent_data_providers: CIDP,
44 mut slot_info_stream: PotSlotInfoStream,
45) where
46 Block: BlockT,
47 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
48 Client::Api: SubspaceApi<Block, PublicKey>,
49 SC: SelectChain<Block>,
50 Worker: PotSlotWorker<Block> + SimpleSlotWorker<Block> + Send + Sync,
51 SO: SyncOracle + Send,
52 CIDP: CreateInherentDataProviders<Block, ()> + Send + 'static,
53{
54 let best_hash = client.info().best_hash;
55 let runtime_api = client.runtime_api();
56 let block_authoring_delay = match runtime_api.chain_constants(best_hash) {
57 Ok(chain_constants) => chain_constants.block_authoring_delay(),
58 Err(error) => {
59 error!(%error, "Failed to retrieve chain constants from runtime API");
60 return;
61 }
62 };
63
64 let slot_info_producer = SlotInfoProducer::new(
65 slot_duration.as_duration(),
66 create_inherent_data_providers,
67 select_chain,
68 );
69
70 let mut worker = SimpleSlotWorkerToSlotWorker(worker);
71
72 let mut maybe_last_proven_slot = None;
73
74 loop {
75 let PotSlotInfo { slot, checkpoints } = match slot_info_stream.recv().await {
76 Ok(slot_info) => slot_info,
77 Err(err) => match err {
78 RecvError::Closed => {
79 info!("No Slot info senders available. Exiting slot worker.");
80 return;
81 }
82 RecvError::Lagged(skipped_notifications) => {
83 debug!(
84 "Slot worker is lagging. Skipped {} slot notification(s)",
85 skipped_notifications
86 );
87 continue;
88 }
89 },
90 };
91 if let Some(last_proven_slot) = maybe_last_proven_slot
92 && last_proven_slot >= slot
93 {
94 continue;
96 }
97 maybe_last_proven_slot.replace(slot);
98
99 worker.0.on_proof(slot, checkpoints);
100
101 if sync_oracle.is_major_syncing() {
102 debug!(%slot, "Skipping proposal slot due to sync");
103 continue;
104 }
105
106 let Some(slot_to_claim) = slot.checked_sub(*block_authoring_delay).map(Slot::from) else {
108 trace!("Skipping very early slot during chain start");
109 continue;
110 };
111
112 if let Some(slot_info) = slot_info_producer.produce_slot_info(slot_to_claim).await {
113 let _ = worker.on_slot(slot_info).await;
114 }
115 }
116}