1#![feature(let_chains)]
4
5mod slots;
6pub mod source;
7pub mod verifier;
8
9use crate::slots::SlotInfoProducer;
10use crate::source::{PotSlotInfo, PotSlotInfoStream};
11use sc_consensus_slots::{SimpleSlotWorker, SimpleSlotWorkerToSlotWorker, SlotWorker};
12use sp_api::ProvideRuntimeApi;
13use sp_blockchain::HeaderBackend;
14use sp_consensus::{SelectChain, SyncOracle};
15use sp_consensus_slots::{Slot, SlotDuration};
16use sp_consensus_subspace::SubspaceApi;
17use sp_inherents::CreateInherentDataProviders;
18use sp_runtime::traits::Block as BlockT;
19use std::sync::Arc;
20use subspace_core_primitives::pot::PotCheckpoints;
21use subspace_core_primitives::PublicKey;
22use tokio::sync::broadcast::error::RecvError;
23use tracing::{debug, error, info, trace};
24
25pub trait PotSlotWorker<Block>
26where
27 Block: BlockT,
28{
29 fn on_proof(&mut self, slot: Slot, checkpoints: PotCheckpoints);
33}
34
35pub async fn start_slot_worker<Block, Client, SC, Worker, SO, CIDP>(
40 slot_duration: SlotDuration,
41 client: Arc<Client>,
42 select_chain: SC,
43 worker: Worker,
44 sync_oracle: SO,
45 create_inherent_data_providers: CIDP,
46 mut slot_info_stream: PotSlotInfoStream,
47) where
48 Block: BlockT,
49 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
50 Client::Api: SubspaceApi<Block, PublicKey>,
51 SC: SelectChain<Block>,
52 Worker: PotSlotWorker<Block> + SimpleSlotWorker<Block> + Send + Sync,
53 SO: SyncOracle + Send,
54 CIDP: CreateInherentDataProviders<Block, ()> + Send + 'static,
55{
56 let best_hash = client.info().best_hash;
57 let runtime_api = client.runtime_api();
58 let block_authoring_delay = match runtime_api.chain_constants(best_hash) {
59 Ok(chain_constants) => chain_constants.block_authoring_delay(),
60 Err(error) => {
61 error!(%error, "Failed to retrieve chain constants from runtime API");
62 return;
63 }
64 };
65
66 let slot_info_producer = SlotInfoProducer::new(
67 slot_duration.as_duration(),
68 create_inherent_data_providers,
69 select_chain,
70 );
71
72 let mut worker = SimpleSlotWorkerToSlotWorker(worker);
73
74 let mut maybe_last_proven_slot = None;
75
76 loop {
77 let PotSlotInfo { slot, checkpoints } = match slot_info_stream.recv().await {
78 Ok(slot_info) => slot_info,
79 Err(err) => match err {
80 RecvError::Closed => {
81 info!("No Slot info senders available. Exiting slot worker.");
82 return;
83 }
84 RecvError::Lagged(skipped_notifications) => {
85 debug!(
86 "Slot worker is lagging. Skipped {} slot notification(s)",
87 skipped_notifications
88 );
89 continue;
90 }
91 },
92 };
93 if let Some(last_proven_slot) = maybe_last_proven_slot {
94 if last_proven_slot >= slot {
95 continue;
97 }
98 }
99 maybe_last_proven_slot.replace(slot);
100
101 worker.0.on_proof(slot, checkpoints);
102
103 if sync_oracle.is_major_syncing() {
104 debug!(%slot, "Skipping proposal slot due to sync");
105 continue;
106 }
107
108 let Some(slot_to_claim) = slot.checked_sub(*block_authoring_delay).map(Slot::from) else {
110 trace!("Skipping very early slot during chain start");
111 continue;
112 };
113
114 if let Some(slot_info) = slot_info_producer.produce_slot_info(slot_to_claim).await {
115 let _ = worker.on_slot(slot_info).await;
116 }
117 }
118}