1pub mod gossip;
2mod state;
3mod timekeeper;
4
5use crate::source::gossip::{GossipProof, PotGossipWorker, ToGossipMessage};
6use crate::source::state::{PotState, PotStateUpdateOutcome};
7use crate::source::timekeeper::{run_timekeeper, TimekeeperProof};
8use crate::verifier::PotVerifier;
9use core_affinity::CoreId;
10use derive_more::{Deref, DerefMut};
11use futures::channel::mpsc;
12use futures::{select, StreamExt};
13use sc_client_api::BlockchainEvents;
14use sc_network::{NotificationService, PeerId};
15use sc_network_gossip::{Network as GossipNetwork, Syncing as GossipSyncing};
16use sp_api::{ApiError, ProvideRuntimeApi};
17use sp_blockchain::HeaderBackend;
18use sp_consensus::SyncOracle;
19use sp_consensus_slots::Slot;
20use sp_consensus_subspace::digests::{extract_pre_digest, extract_subspace_digest_items};
21use sp_consensus_subspace::{ChainConstants, PotNextSlotInput, SubspaceApi};
22use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Zero};
23use std::collections::HashSet;
24use std::marker::PhantomData;
25use std::sync::Arc;
26use std::thread;
27use subspace_core_primitives::pot::PotCheckpoints;
28use subspace_core_primitives::PublicKey;
29use thread_priority::{set_current_thread_priority, ThreadPriority};
30use tokio::sync::broadcast;
31use tracing::{debug, error, trace, warn, Span};
32
33const LOCAL_PROOFS_CHANNEL_CAPACITY: usize = 10;
34const SLOTS_CHANNEL_CAPACITY: usize = 10;
35const GOSSIP_OUTGOING_CHANNEL_CAPACITY: usize = 10;
36const GOSSIP_INCOMING_CHANNEL_CAPACITY: usize = 10;
37
38#[derive(Clone)]
40pub struct PotSlotInfo {
41 pub slot: Slot,
43 pub checkpoints: PotCheckpoints,
45}
46
47#[derive(Debug, Deref, DerefMut)]
49pub struct PotSlotInfoStream(broadcast::Receiver<PotSlotInfo>);
50
51#[derive(Debug)]
56#[must_use = "Proof of time source doesn't do anything unless run() method is called"]
57pub struct PotSourceWorker<Block, Client, SO> {
58 client: Arc<Client>,
59 sync_oracle: SO,
60 chain_constants: ChainConstants,
61 timekeeper_proofs_receiver: mpsc::Receiver<TimekeeperProof>,
62 to_gossip_sender: mpsc::Sender<ToGossipMessage>,
63 from_gossip_receiver: mpsc::Receiver<(PeerId, GossipProof)>,
64 last_slot_sent: Slot,
65 slot_sender: broadcast::Sender<PotSlotInfo>,
66 state: Arc<PotState>,
67 _block: PhantomData<Block>,
68}
69
70impl<Block, Client, SO> PotSourceWorker<Block, Client, SO>
71where
72 Block: BlockT,
73 Client: BlockchainEvents<Block> + HeaderBackend<Block> + ProvideRuntimeApi<Block>,
74 Client::Api: SubspaceApi<Block, PublicKey>,
75 SO: SyncOracle + Clone + Send + Sync + 'static,
76{
77 #[allow(clippy::too_many_arguments)]
79 pub fn new<Network, GossipSync>(
80 is_timekeeper: bool,
81 timekeeper_cpu_cores: HashSet<usize>,
82 client: Arc<Client>,
83 pot_verifier: PotVerifier,
84 network: Network,
85 notification_service: Box<dyn NotificationService>,
86 sync: Arc<GossipSync>,
87 sync_oracle: SO,
88 ) -> Result<(Self, PotGossipWorker<Block>, PotSlotInfoStream), ApiError>
89 where
90 Network: GossipNetwork<Block> + Send + Sync + Clone + 'static,
91 GossipSync: GossipSyncing<Block> + 'static,
92 {
93 let best_hash = client.info().best_hash;
94 let runtime_api = client.runtime_api();
95 let chain_constants = runtime_api.chain_constants(best_hash)?;
96
97 let best_header = client
98 .header(best_hash)?
99 .ok_or_else(|| ApiError::UnknownBlock(format!("Parent block {best_hash} not found")))?;
100 let best_pre_digest = extract_pre_digest(&best_header)
101 .map_err(|error| ApiError::Application(error.into()))?;
102
103 let parent_slot = if best_header.number().is_zero() {
104 Slot::from(0)
105 } else {
106 best_pre_digest.slot() + chain_constants.block_authoring_delay()
108 };
109
110 let pot_parameters = runtime_api.pot_parameters(best_hash)?;
111 let maybe_next_parameters_change = pot_parameters.next_parameters_change();
112
113 let pot_input = if best_header.number().is_zero() {
114 PotNextSlotInput {
115 slot: parent_slot + Slot::from(1),
116 slot_iterations: pot_parameters.slot_iterations(),
117 seed: pot_verifier.genesis_seed(),
118 }
119 } else {
120 PotNextSlotInput::derive(
121 pot_parameters.slot_iterations(),
122 parent_slot,
123 best_pre_digest.pot_info().future_proof_of_time(),
124 &maybe_next_parameters_change,
125 )
126 };
127
128 let state = Arc::new(PotState::new(
129 pot_input,
130 maybe_next_parameters_change,
131 pot_verifier.clone(),
132 ));
133
134 let (timekeeper_proofs_sender, timekeeper_proofs_receiver) =
135 mpsc::channel(LOCAL_PROOFS_CHANNEL_CAPACITY);
136 let (slot_sender, slot_receiver) = broadcast::channel(SLOTS_CHANNEL_CAPACITY);
137 if is_timekeeper {
138 let state = Arc::clone(&state);
139 let pot_verifier = pot_verifier.clone();
140 let span = Span::current();
141
142 thread::Builder::new()
143 .name("timekeeper".to_string())
144 .spawn(move || {
145 let _guard = span.enter();
146
147 if let Some(core) = timekeeper_cpu_cores.into_iter().next() {
148 if !core_affinity::set_for_current(CoreId { id: core }) {
149 warn!(
150 %core,
151 "Failed to set core affinity, timekeeper will run on random CPU \
152 core",
153 );
154 }
155 }
156
157 if let Err(error) = set_current_thread_priority(ThreadPriority::Max) {
158 warn!(
159 %error,
160 "Failed to set thread priority, timekeeper performance may be \
161 negatively impacted by other software running on this machine",
162 );
163 }
164
165 if let Err(error) =
166 run_timekeeper(state, pot_verifier, timekeeper_proofs_sender)
167 {
168 error!(%error, "Timekeeper exited with an error");
169 }
170 })
171 .expect("Thread creation must not panic");
172 }
173
174 let (to_gossip_sender, to_gossip_receiver) =
175 mpsc::channel(GOSSIP_OUTGOING_CHANNEL_CAPACITY);
176 let (from_gossip_sender, from_gossip_receiver) =
177 mpsc::channel(GOSSIP_INCOMING_CHANNEL_CAPACITY);
178 let gossip_worker = PotGossipWorker::new(
179 to_gossip_receiver,
180 from_gossip_sender,
181 pot_verifier,
182 Arc::clone(&state),
183 network,
184 notification_service,
185 sync,
186 sync_oracle.clone(),
187 );
188
189 let source_worker = Self {
190 client,
191 sync_oracle,
192 chain_constants,
193 timekeeper_proofs_receiver,
194 to_gossip_sender,
195 from_gossip_receiver,
196 last_slot_sent: Slot::from(0),
197 slot_sender,
198 state,
199 _block: PhantomData,
200 };
201
202 let pot_slot_info_stream = PotSlotInfoStream(slot_receiver);
203
204 Ok((source_worker, gossip_worker, pot_slot_info_stream))
205 }
206
207 pub async fn run(mut self) {
209 let mut import_notification_stream = self.client.import_notification_stream();
210
211 loop {
212 select! {
213 timekeeper_proof = self.timekeeper_proofs_receiver.select_next_some() => {
215 self.handle_timekeeper_proof(timekeeper_proof);
216 }
217 maybe_gossip_proof = self.from_gossip_receiver.next() => {
219 if let Some((sender, gossip_proof)) = maybe_gossip_proof {
220 self.handle_gossip_proof(sender, gossip_proof);
221 } else {
222 debug!("Incoming gossip messages stream ended, exiting");
223 return;
224 }
225 }
226 maybe_import_notification = import_notification_stream.next() => {
227 if let Some(import_notification) = maybe_import_notification {
228 if !import_notification.is_new_best {
229 continue;
231 }
232 self.handle_block_import_notification(
233 import_notification.hash,
234 &import_notification.header,
235 );
236 } else {
237 debug!("Import notifications stream ended, exiting");
238 return;
239 }
240 }
241 }
242 }
243 }
244
245 fn handle_timekeeper_proof(&mut self, proof: TimekeeperProof) {
246 let TimekeeperProof {
247 slot,
248 seed,
249 slot_iterations,
250 checkpoints,
251 } = proof;
252
253 if self.sync_oracle.is_major_syncing() {
254 trace!(
255 ?slot,
256 %seed,
257 %slot_iterations,
258 output = %checkpoints.output(),
259 "Ignore timekeeper proof due to major syncing",
260 );
261
262 return;
263 }
264
265 debug!(
266 ?slot,
267 %seed,
268 %slot_iterations,
269 output = %checkpoints.output(),
270 "Received timekeeper proof",
271 );
272
273 if self
274 .to_gossip_sender
275 .try_send(ToGossipMessage::Proof(GossipProof {
276 slot,
277 seed,
278 slot_iterations,
279 checkpoints,
280 }))
281 .is_err()
282 {
283 debug!(
284 %slot,
285 "Gossip is not able to keep-up with slot production (timekeeper)",
286 );
287 }
288
289 if slot > self.last_slot_sent {
290 self.last_slot_sent = slot;
291
292 let _ = self.slot_sender.send(PotSlotInfo { slot, checkpoints });
295 }
296 }
297
298 fn handle_gossip_proof(&mut self, _sender: PeerId, proof: GossipProof) {
301 let expected_next_slot_input = PotNextSlotInput {
302 slot: proof.slot,
303 slot_iterations: proof.slot_iterations,
304 seed: proof.seed,
305 };
306
307 if let Ok(next_slot_input) = self.state.try_extend(
308 expected_next_slot_input,
309 proof.slot,
310 proof.checkpoints.output(),
311 None,
312 ) {
313 if proof.slot > self.last_slot_sent {
314 self.last_slot_sent = proof.slot;
315
316 let _ = self.slot_sender.send(PotSlotInfo {
319 slot: proof.slot,
320 checkpoints: proof.checkpoints,
321 });
322 }
323
324 if self
325 .to_gossip_sender
326 .try_send(ToGossipMessage::NextSlotInput(next_slot_input))
327 .is_err()
328 {
329 debug!(
330 slot = %proof.slot,
331 next_slot = %next_slot_input.slot,
332 "Gossip is not able to keep-up with slot production (gossip)",
333 );
334 }
335 }
336 }
337
338 fn handle_block_import_notification(
339 &mut self,
340 block_hash: Block::Hash,
341 header: &Block::Header,
342 ) {
343 let subspace_digest_items =
344 match extract_subspace_digest_items::<Block::Header, PublicKey>(header) {
345 Ok(pre_digest) => pre_digest,
346 Err(error) => {
347 error!(
348 %error,
349 block_number = %header.number(),
350 %block_hash,
351 "Failed to extract Subspace digest items from header"
352 );
353 return;
354 }
355 };
356
357 let best_slot =
358 subspace_digest_items.pre_digest.slot() + self.chain_constants.block_authoring_delay();
359 let best_proof = subspace_digest_items
360 .pre_digest
361 .pot_info()
362 .future_proof_of_time();
363
364 match self.state.update(
370 best_slot,
371 best_proof,
372 Some(subspace_digest_items.pot_parameters_change),
373 ) {
374 PotStateUpdateOutcome::NoChange => {
375 trace!(
376 %best_slot,
377 "Block import didn't result in proof of time chain changes",
378 );
379 }
380 PotStateUpdateOutcome::Extension { from, to } => {
381 warn!(
382 from_next_slot = %from.slot,
383 to_next_slot = %to.slot,
384 "Proof of time chain was extended from block import",
385 );
386
387 if self
388 .to_gossip_sender
389 .try_send(ToGossipMessage::NextSlotInput(to))
390 .is_err()
391 {
392 debug!(
393 next_slot = %to.slot,
394 "Gossip is not able to keep-up with slot production (block import)",
395 );
396 }
397 }
398 PotStateUpdateOutcome::Reorg { from, to } => {
399 warn!(
400 from_next_slot = %from.slot,
401 to_next_slot = %to.slot,
402 "Proof of time chain reorg happened",
403 );
404
405 if self
406 .to_gossip_sender
407 .try_send(ToGossipMessage::NextSlotInput(to))
408 .is_err()
409 {
410 debug!(
411 next_slot = %to.slot,
412 "Gossip is not able to keep-up with slot production (block import)",
413 );
414 }
415 }
416 }
417 }
418
419 pub fn subscribe_pot_slot_info_stream(&self) -> broadcast::Receiver<PotSlotInfo> {
421 self.slot_sender.subscribe()
422 }
423}