1pub mod gossip;
2mod state;
3mod timekeeper;
4
5use crate::source::gossip::{GossipProof, PotGossipWorker, ToGossipMessage};
6use crate::source::state::{PotState, PotStateUpdateOutcome};
7use crate::source::timekeeper::{TimekeeperProof, run_timekeeper};
8use crate::verifier::PotVerifier;
9use core_affinity::CoreId;
10use derive_more::{Deref, DerefMut};
11use futures::channel::mpsc;
12use futures::{StreamExt, select};
13use sc_client_api::BlockchainEvents;
14use sc_network::{NotificationService, PeerId};
15use sc_network_gossip::{Network as GossipNetwork, Syncing as GossipSyncing};
16use sp_api::{ApiError, ProvideRuntimeApi};
17use sp_blockchain::HeaderBackend;
18use sp_consensus::SyncOracle;
19use sp_consensus_slots::Slot;
20use sp_consensus_subspace::digests::{extract_pre_digest, extract_subspace_digest_items};
21use sp_consensus_subspace::{ChainConstants, PotNextSlotInput, PotParameters, SubspaceApi};
22use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor, Zero};
23use std::collections::HashSet;
24use std::marker::PhantomData;
25use std::sync::Arc;
26use std::thread;
27use subspace_core_primitives::PublicKey;
28use subspace_core_primitives::pot::{PotCheckpoints, PotOutput, PotSeed};
29use thread_priority::{ThreadPriority, set_current_thread_priority};
30use tokio::sync::broadcast;
31use tracing::{Span, debug, error, trace, warn};
32
/// Capacity of the channel carrying proofs from the local timekeeper to the source worker.
const LOCAL_PROOFS_CHANNEL_CAPACITY: usize = 10;
/// Capacity of the broadcast channel used to publish slot information to subscribers.
const SLOTS_CHANNEL_CAPACITY: usize = 10;
/// Capacity of the channel carrying messages from the source worker to the gossip worker.
const GOSSIP_OUTGOING_CHANNEL_CAPACITY: usize = 10;
/// Capacity of the channel carrying proofs from the gossip worker to the source worker.
const GOSSIP_INCOMING_CHANNEL_CAPACITY: usize = 10;
37
38#[derive(Clone)]
40pub struct PotSlotInfo {
41 pub slot: Slot,
43 pub checkpoints: PotCheckpoints,
45}
46
47#[derive(Debug, Deref, DerefMut)]
49pub struct PotSlotInfoStream(broadcast::Receiver<PotSlotInfo>);
50
51#[derive(Debug)]
56#[must_use = "Proof of time source doesn't do anything unless run() method is called"]
57pub struct PotSourceWorker<Block, Client, SO> {
58 client: Arc<Client>,
59 sync_oracle: SO,
60 chain_constants: ChainConstants,
61 timekeeper_proofs_receiver: mpsc::Receiver<TimekeeperProof>,
62 to_gossip_sender: mpsc::Sender<ToGossipMessage>,
63 from_gossip_receiver: mpsc::Receiver<(PeerId, GossipProof)>,
64 last_slot_sent: Slot,
65 slot_sender: broadcast::Sender<PotSlotInfo>,
66 state: Arc<PotState>,
67 _block: PhantomData<Block>,
68}
69
70impl<Block, Client, SO> PotSourceWorker<Block, Client, SO>
71where
72 Block: BlockT,
73 Client: BlockchainEvents<Block> + HeaderBackend<Block> + ProvideRuntimeApi<Block>,
74 Client::Api: SubspaceApi<Block, PublicKey>,
75 SO: SyncOracle + Clone + Send + Sync + 'static,
76{
77 #[allow(clippy::too_many_arguments)]
79 pub fn new<Network, GossipSync>(
80 is_timekeeper: bool,
81 timekeeper_cpu_cores: HashSet<usize>,
82 client: Arc<Client>,
83 pot_verifier: PotVerifier,
84 network: Network,
85 notification_service: Box<dyn NotificationService>,
86 sync: Arc<GossipSync>,
87 sync_oracle: SO,
88 ) -> Result<(Self, PotGossipWorker<Block>, PotSlotInfoStream), ApiError>
89 where
90 Network: GossipNetwork<Block> + Send + Sync + Clone + 'static,
91 GossipSync: GossipSyncing<Block> + 'static,
92 {
93 let best_hash = client.info().best_hash;
94 let runtime_api = client.runtime_api();
95 let chain_constants = runtime_api.chain_constants(best_hash)?;
96
97 let best_header = client
98 .header(best_hash)?
99 .ok_or_else(|| ApiError::UnknownBlock(format!("Parent block {best_hash} not found")))?;
100 let best_pre_digest = extract_pre_digest(&best_header)
101 .map_err(|error| ApiError::Application(error.into()))?;
102
103 let parent_slot = if best_header.number().is_zero() {
104 Slot::from(0)
105 } else {
106 best_pre_digest.slot() + chain_constants.block_authoring_delay()
108 };
109
110 let pot_parameters = runtime_api.pot_parameters(best_hash)?;
111 let maybe_next_parameters_change = pot_parameters.next_parameters_change();
112
113 let pot_input = pot_next_slot_input::<Block>(
114 best_header.number(),
115 parent_slot,
116 &pot_parameters,
117 pot_verifier.genesis_seed(),
118 best_pre_digest.pot_info().future_proof_of_time(),
119 );
120
121 let state = Arc::new(PotState::new(
122 pot_input,
123 maybe_next_parameters_change,
124 pot_verifier.clone(),
125 ));
126
127 let (timekeeper_proofs_sender, timekeeper_proofs_receiver) =
128 mpsc::channel(LOCAL_PROOFS_CHANNEL_CAPACITY);
129 let (slot_sender, slot_receiver) = broadcast::channel(SLOTS_CHANNEL_CAPACITY);
130 if is_timekeeper {
131 let state = Arc::clone(&state);
132 let pot_verifier = pot_verifier.clone();
133 let span = Span::current();
134
135 thread::Builder::new()
136 .name("timekeeper".to_string())
137 .spawn(move || {
138 let _guard = span.enter();
139
140 if let Some(core) = timekeeper_cpu_cores.into_iter().next()
141 && !core_affinity::set_for_current(CoreId { id: core })
142 {
143 warn!(
144 %core,
145 "Failed to set core affinity, timekeeper will run on random CPU \
146 core",
147 );
148 }
149
150 if let Err(error) = set_current_thread_priority(ThreadPriority::Max) {
151 warn!(
152 %error,
153 "Failed to set thread priority, timekeeper performance may be \
154 negatively impacted by other software running on this machine",
155 );
156 }
157
158 if let Err(error) =
159 run_timekeeper(state, pot_verifier, timekeeper_proofs_sender)
160 {
161 error!(%error, "Timekeeper exited with an error");
162 }
163 })
164 .expect("Thread creation must not panic");
165 }
166
167 let (to_gossip_sender, to_gossip_receiver) =
168 mpsc::channel(GOSSIP_OUTGOING_CHANNEL_CAPACITY);
169 let (from_gossip_sender, from_gossip_receiver) =
170 mpsc::channel(GOSSIP_INCOMING_CHANNEL_CAPACITY);
171 let gossip_worker = PotGossipWorker::new(
172 to_gossip_receiver,
173 from_gossip_sender,
174 pot_verifier,
175 Arc::clone(&state),
176 network,
177 notification_service,
178 sync,
179 sync_oracle.clone(),
180 );
181
182 let source_worker = Self {
183 client,
184 sync_oracle,
185 chain_constants,
186 timekeeper_proofs_receiver,
187 to_gossip_sender,
188 from_gossip_receiver,
189 last_slot_sent: Slot::from(0),
190 slot_sender,
191 state,
192 _block: PhantomData,
193 };
194
195 let pot_slot_info_stream = PotSlotInfoStream(slot_receiver);
196
197 Ok((source_worker, gossip_worker, pot_slot_info_stream))
198 }
199
200 pub async fn run(mut self) {
202 let mut import_notification_stream = self.client.import_notification_stream();
203
204 loop {
205 select! {
206 timekeeper_proof = self.timekeeper_proofs_receiver.select_next_some() => {
208 self.handle_timekeeper_proof(timekeeper_proof);
209 }
210 maybe_gossip_proof = self.from_gossip_receiver.next() => {
212 if let Some((sender, gossip_proof)) = maybe_gossip_proof {
213 self.handle_gossip_proof(sender, gossip_proof);
214 } else {
215 debug!("Incoming gossip messages stream ended, exiting");
216 return;
217 }
218 }
219 maybe_import_notification = import_notification_stream.next() => {
220 if let Some(import_notification) = maybe_import_notification {
221 if !import_notification.is_new_best {
222 continue;
224 }
225 self.handle_block_import_notification(
226 import_notification.hash,
227 &import_notification.header,
228 );
229 } else {
230 debug!("Import notifications stream ended, exiting");
231 return;
232 }
233 }
234 }
235 }
236 }
237
238 fn handle_timekeeper_proof(&mut self, proof: TimekeeperProof) {
239 let TimekeeperProof {
240 slot,
241 seed,
242 slot_iterations,
243 checkpoints,
244 } = proof;
245
246 if self.sync_oracle.is_major_syncing() {
247 trace!(
248 ?slot,
249 %seed,
250 %slot_iterations,
251 output = %checkpoints.output(),
252 "Ignore timekeeper proof due to major syncing",
253 );
254
255 return;
256 }
257
258 debug!(
259 ?slot,
260 %seed,
261 %slot_iterations,
262 output = %checkpoints.output(),
263 "Received timekeeper proof",
264 );
265
266 if self
267 .to_gossip_sender
268 .try_send(ToGossipMessage::Proof(GossipProof {
269 slot,
270 seed,
271 slot_iterations,
272 checkpoints,
273 }))
274 .is_err()
275 {
276 debug!(
277 %slot,
278 "Gossip is not able to keep-up with slot production (timekeeper)",
279 );
280 }
281
282 if slot > self.last_slot_sent {
283 self.last_slot_sent = slot;
284
285 let _ = self.slot_sender.send(PotSlotInfo { slot, checkpoints });
288 }
289 }
290
291 fn handle_gossip_proof(&mut self, _sender: PeerId, proof: GossipProof) {
294 let expected_next_slot_input = PotNextSlotInput {
295 slot: proof.slot,
296 slot_iterations: proof.slot_iterations,
297 seed: proof.seed,
298 };
299
300 if let Ok(next_slot_input) = self.state.try_extend(
301 expected_next_slot_input,
302 proof.slot,
303 proof.checkpoints.output(),
304 None,
305 ) {
306 if proof.slot > self.last_slot_sent {
307 self.last_slot_sent = proof.slot;
308
309 let _ = self.slot_sender.send(PotSlotInfo {
312 slot: proof.slot,
313 checkpoints: proof.checkpoints,
314 });
315 }
316
317 if self
318 .to_gossip_sender
319 .try_send(ToGossipMessage::NextSlotInput(next_slot_input))
320 .is_err()
321 {
322 debug!(
323 slot = %proof.slot,
324 next_slot = %next_slot_input.slot,
325 "Gossip is not able to keep-up with slot production (gossip)",
326 );
327 }
328 }
329 }
330
331 fn handle_block_import_notification(
332 &mut self,
333 block_hash: Block::Hash,
334 header: &Block::Header,
335 ) {
336 let subspace_digest_items =
337 match extract_subspace_digest_items::<Block::Header, PublicKey>(header) {
338 Ok(pre_digest) => pre_digest,
339 Err(error) => {
340 error!(
341 %error,
342 block_number = %header.number(),
343 %block_hash,
344 "Failed to extract Subspace digest items from header"
345 );
346 return;
347 }
348 };
349
350 let best_slot =
351 subspace_digest_items.pre_digest.slot() + self.chain_constants.block_authoring_delay();
352 let best_proof = subspace_digest_items
353 .pre_digest
354 .pot_info()
355 .future_proof_of_time();
356
357 match self.state.update(
363 best_slot,
364 best_proof,
365 Some(subspace_digest_items.pot_parameters_change),
366 ) {
367 PotStateUpdateOutcome::NoChange => {
368 trace!(
369 %best_slot,
370 "Block import didn't result in proof of time chain changes",
371 );
372 }
373 PotStateUpdateOutcome::Extension { from, to } => {
374 warn!(
375 from_next_slot = %from.slot,
376 to_next_slot = %to.slot,
377 "Proof of time chain was extended from block import",
378 );
379
380 if self
381 .to_gossip_sender
382 .try_send(ToGossipMessage::NextSlotInput(to))
383 .is_err()
384 {
385 debug!(
386 next_slot = %to.slot,
387 "Gossip is not able to keep-up with slot production (block import)",
388 );
389 }
390 }
391 PotStateUpdateOutcome::Reorg { from, to } => {
392 warn!(
393 from_next_slot = %from.slot,
394 to_next_slot = %to.slot,
395 "Proof of time chain reorg happened",
396 );
397
398 if self
399 .to_gossip_sender
400 .try_send(ToGossipMessage::NextSlotInput(to))
401 .is_err()
402 {
403 debug!(
404 next_slot = %to.slot,
405 "Gossip is not able to keep-up with slot production (block import)",
406 );
407 }
408 }
409 }
410 }
411
412 pub fn subscribe_pot_slot_info_stream(&self) -> broadcast::Receiver<PotSlotInfo> {
414 self.slot_sender.subscribe()
415 }
416}
417
418pub fn pot_next_slot_input<Block: BlockT>(
420 parent_block_number: &NumberFor<Block>,
421 parent_slot: Slot,
422 parent_pot_parameters: &PotParameters,
423 genesis_pot_seed: PotSeed,
424 parent_pot_output: PotOutput,
425) -> PotNextSlotInput {
426 if parent_block_number.is_zero() {
427 PotNextSlotInput {
428 slot: parent_slot + Slot::from(1),
429 slot_iterations: parent_pot_parameters.slot_iterations(),
430 seed: genesis_pot_seed,
431 }
432 } else {
433 PotNextSlotInput::derive(
434 parent_pot_parameters.slot_iterations(),
435 parent_slot,
436 parent_pot_output,
437 &parent_pot_parameters.next_parameters_change(),
438 )
439 }
440}