sc_proof_of_time/
source.rs

1pub mod gossip;
2mod state;
3mod timekeeper;
4
5use crate::source::gossip::{GossipProof, PotGossipWorker, ToGossipMessage};
6use crate::source::state::{PotState, PotStateUpdateOutcome};
7use crate::source::timekeeper::{run_timekeeper, TimekeeperProof};
8use crate::verifier::PotVerifier;
9use core_affinity::CoreId;
10use derive_more::{Deref, DerefMut};
11use futures::channel::mpsc;
12use futures::{select, StreamExt};
13use sc_client_api::BlockchainEvents;
14use sc_network::{NotificationService, PeerId};
15use sc_network_gossip::{Network as GossipNetwork, Syncing as GossipSyncing};
16use sp_api::{ApiError, ProvideRuntimeApi};
17use sp_blockchain::HeaderBackend;
18use sp_consensus::SyncOracle;
19use sp_consensus_slots::Slot;
20use sp_consensus_subspace::digests::{extract_pre_digest, extract_subspace_digest_items};
21use sp_consensus_subspace::{ChainConstants, PotNextSlotInput, SubspaceApi};
22use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Zero};
23use std::collections::HashSet;
24use std::marker::PhantomData;
25use std::sync::Arc;
26use std::thread;
27use subspace_core_primitives::pot::PotCheckpoints;
28use subspace_core_primitives::PublicKey;
29use thread_priority::{set_current_thread_priority, ThreadPriority};
30use tokio::sync::broadcast;
31use tracing::{debug, error, trace, warn, Span};
32
33const LOCAL_PROOFS_CHANNEL_CAPACITY: usize = 10;
34const SLOTS_CHANNEL_CAPACITY: usize = 10;
35const GOSSIP_OUTGOING_CHANNEL_CAPACITY: usize = 10;
36const GOSSIP_INCOMING_CHANNEL_CAPACITY: usize = 10;
37
38/// Proof of time slot information
39#[derive(Clone)]
40pub struct PotSlotInfo {
41    /// Slot number
42    pub slot: Slot,
43    /// Proof of time checkpoints
44    pub checkpoints: PotCheckpoints,
45}
46
47/// Stream with proof of time slots
48#[derive(Debug, Deref, DerefMut)]
49pub struct PotSlotInfoStream(broadcast::Receiver<PotSlotInfo>);
50
51/// Worker producing proofs of time.
52///
53/// Depending on configuration may produce proofs of time locally, send/receive via gossip and keep
54/// up to day with blockchain reorgs.
55#[derive(Debug)]
56#[must_use = "Proof of time source doesn't do anything unless run() method is called"]
57pub struct PotSourceWorker<Block, Client, SO> {
58    client: Arc<Client>,
59    sync_oracle: SO,
60    chain_constants: ChainConstants,
61    timekeeper_proofs_receiver: mpsc::Receiver<TimekeeperProof>,
62    to_gossip_sender: mpsc::Sender<ToGossipMessage>,
63    from_gossip_receiver: mpsc::Receiver<(PeerId, GossipProof)>,
64    last_slot_sent: Slot,
65    slot_sender: broadcast::Sender<PotSlotInfo>,
66    state: Arc<PotState>,
67    _block: PhantomData<Block>,
68}
69
70impl<Block, Client, SO> PotSourceWorker<Block, Client, SO>
71where
72    Block: BlockT,
73    Client: BlockchainEvents<Block> + HeaderBackend<Block> + ProvideRuntimeApi<Block>,
74    Client::Api: SubspaceApi<Block, PublicKey>,
75    SO: SyncOracle + Clone + Send + Sync + 'static,
76{
77    // TODO: Struct for arguments
78    #[allow(clippy::too_many_arguments)]
79    pub fn new<Network, GossipSync>(
80        is_timekeeper: bool,
81        timekeeper_cpu_cores: HashSet<usize>,
82        client: Arc<Client>,
83        pot_verifier: PotVerifier,
84        network: Network,
85        notification_service: Box<dyn NotificationService>,
86        sync: Arc<GossipSync>,
87        sync_oracle: SO,
88    ) -> Result<(Self, PotGossipWorker<Block>, PotSlotInfoStream), ApiError>
89    where
90        Network: GossipNetwork<Block> + Send + Sync + Clone + 'static,
91        GossipSync: GossipSyncing<Block> + 'static,
92    {
93        let best_hash = client.info().best_hash;
94        let runtime_api = client.runtime_api();
95        let chain_constants = runtime_api.chain_constants(best_hash)?;
96
97        let best_header = client
98            .header(best_hash)?
99            .ok_or_else(|| ApiError::UnknownBlock(format!("Parent block {best_hash} not found")))?;
100        let best_pre_digest = extract_pre_digest(&best_header)
101            .map_err(|error| ApiError::Application(error.into()))?;
102
103        let parent_slot = if best_header.number().is_zero() {
104            Slot::from(0)
105        } else {
106            // The best one seen
107            best_pre_digest.slot() + chain_constants.block_authoring_delay()
108        };
109
110        let pot_parameters = runtime_api.pot_parameters(best_hash)?;
111        let maybe_next_parameters_change = pot_parameters.next_parameters_change();
112
113        let pot_input = if best_header.number().is_zero() {
114            PotNextSlotInput {
115                slot: parent_slot + Slot::from(1),
116                slot_iterations: pot_parameters.slot_iterations(),
117                seed: pot_verifier.genesis_seed(),
118            }
119        } else {
120            PotNextSlotInput::derive(
121                pot_parameters.slot_iterations(),
122                parent_slot,
123                best_pre_digest.pot_info().future_proof_of_time(),
124                &maybe_next_parameters_change,
125            )
126        };
127
128        let state = Arc::new(PotState::new(
129            pot_input,
130            maybe_next_parameters_change,
131            pot_verifier.clone(),
132        ));
133
134        let (timekeeper_proofs_sender, timekeeper_proofs_receiver) =
135            mpsc::channel(LOCAL_PROOFS_CHANNEL_CAPACITY);
136        let (slot_sender, slot_receiver) = broadcast::channel(SLOTS_CHANNEL_CAPACITY);
137        if is_timekeeper {
138            let state = Arc::clone(&state);
139            let pot_verifier = pot_verifier.clone();
140            let span = Span::current();
141
142            thread::Builder::new()
143                .name("timekeeper".to_string())
144                .spawn(move || {
145                    let _guard = span.enter();
146
147                    if let Some(core) = timekeeper_cpu_cores.into_iter().next() {
148                        if !core_affinity::set_for_current(CoreId { id: core }) {
149                            warn!(
150                                %core,
151                                "Failed to set core affinity, timekeeper will run on random CPU \
152                                core",
153                            );
154                        }
155                    }
156
157                    if let Err(error) = set_current_thread_priority(ThreadPriority::Max) {
158                        warn!(
159                            %error,
160                            "Failed to set thread priority, timekeeper performance may be \
161                            negatively impacted by other software running on this machine",
162                        );
163                    }
164
165                    if let Err(error) =
166                        run_timekeeper(state, pot_verifier, timekeeper_proofs_sender)
167                    {
168                        error!(%error, "Timekeeper exited with an error");
169                    }
170                })
171                .expect("Thread creation must not panic");
172        }
173
174        let (to_gossip_sender, to_gossip_receiver) =
175            mpsc::channel(GOSSIP_OUTGOING_CHANNEL_CAPACITY);
176        let (from_gossip_sender, from_gossip_receiver) =
177            mpsc::channel(GOSSIP_INCOMING_CHANNEL_CAPACITY);
178        let gossip_worker = PotGossipWorker::new(
179            to_gossip_receiver,
180            from_gossip_sender,
181            pot_verifier,
182            Arc::clone(&state),
183            network,
184            notification_service,
185            sync,
186            sync_oracle.clone(),
187        );
188
189        let source_worker = Self {
190            client,
191            sync_oracle,
192            chain_constants,
193            timekeeper_proofs_receiver,
194            to_gossip_sender,
195            from_gossip_receiver,
196            last_slot_sent: Slot::from(0),
197            slot_sender,
198            state,
199            _block: PhantomData,
200        };
201
202        let pot_slot_info_stream = PotSlotInfoStream(slot_receiver);
203
204        Ok((source_worker, gossip_worker, pot_slot_info_stream))
205    }
206
207    /// Run proof of time source
208    pub async fn run(mut self) {
209        let mut import_notification_stream = self.client.import_notification_stream();
210
211        loop {
212            select! {
213                // List of blocks that the client has finalized.
214                timekeeper_proof = self.timekeeper_proofs_receiver.select_next_some() => {
215                    self.handle_timekeeper_proof(timekeeper_proof);
216                }
217                // List of blocks that the client has finalized.
218                maybe_gossip_proof = self.from_gossip_receiver.next() => {
219                    if let Some((sender, gossip_proof)) = maybe_gossip_proof {
220                        self.handle_gossip_proof(sender, gossip_proof);
221                    } else {
222                        debug!("Incoming gossip messages stream ended, exiting");
223                        return;
224                    }
225                }
226                maybe_import_notification = import_notification_stream.next() => {
227                    if let Some(import_notification) = maybe_import_notification {
228                        if !import_notification.is_new_best {
229                            // Ignore blocks that don't extend the chain
230                            continue;
231                        }
232                        self.handle_block_import_notification(
233                            import_notification.hash,
234                            &import_notification.header,
235                        );
236                    } else {
237                        debug!("Import notifications stream ended, exiting");
238                        return;
239                    }
240                }
241            }
242        }
243    }
244
245    fn handle_timekeeper_proof(&mut self, proof: TimekeeperProof) {
246        let TimekeeperProof {
247            slot,
248            seed,
249            slot_iterations,
250            checkpoints,
251        } = proof;
252
253        if self.sync_oracle.is_major_syncing() {
254            trace!(
255                ?slot,
256                %seed,
257                %slot_iterations,
258                output = %checkpoints.output(),
259                "Ignore timekeeper proof due to major syncing",
260            );
261
262            return;
263        }
264
265        debug!(
266            ?slot,
267            %seed,
268            %slot_iterations,
269            output = %checkpoints.output(),
270            "Received timekeeper proof",
271        );
272
273        if self
274            .to_gossip_sender
275            .try_send(ToGossipMessage::Proof(GossipProof {
276                slot,
277                seed,
278                slot_iterations,
279                checkpoints,
280            }))
281            .is_err()
282        {
283            debug!(
284                %slot,
285                "Gossip is not able to keep-up with slot production (timekeeper)",
286            );
287        }
288
289        if slot > self.last_slot_sent {
290            self.last_slot_sent = slot;
291
292            // We don't care if block production is too slow or block production is not enabled on this
293            // node at all
294            let _ = self.slot_sender.send(PotSlotInfo { slot, checkpoints });
295        }
296    }
297
298    // TODO: Follow both verified and unverified checkpoints to start secondary timekeeper ASAP in
299    //  case verification succeeds
300    fn handle_gossip_proof(&mut self, _sender: PeerId, proof: GossipProof) {
301        let expected_next_slot_input = PotNextSlotInput {
302            slot: proof.slot,
303            slot_iterations: proof.slot_iterations,
304            seed: proof.seed,
305        };
306
307        if let Ok(next_slot_input) = self.state.try_extend(
308            expected_next_slot_input,
309            proof.slot,
310            proof.checkpoints.output(),
311            None,
312        ) {
313            if proof.slot > self.last_slot_sent {
314                self.last_slot_sent = proof.slot;
315
316                // We don't care if block production is too slow or block production is not enabled on
317                // this node at all
318                let _ = self.slot_sender.send(PotSlotInfo {
319                    slot: proof.slot,
320                    checkpoints: proof.checkpoints,
321                });
322            }
323
324            if self
325                .to_gossip_sender
326                .try_send(ToGossipMessage::NextSlotInput(next_slot_input))
327                .is_err()
328            {
329                debug!(
330                    slot = %proof.slot,
331                    next_slot = %next_slot_input.slot,
332                    "Gossip is not able to keep-up with slot production (gossip)",
333                );
334            }
335        }
336    }
337
338    fn handle_block_import_notification(
339        &mut self,
340        block_hash: Block::Hash,
341        header: &Block::Header,
342    ) {
343        let subspace_digest_items =
344            match extract_subspace_digest_items::<Block::Header, PublicKey>(header) {
345                Ok(pre_digest) => pre_digest,
346                Err(error) => {
347                    error!(
348                        %error,
349                        block_number = %header.number(),
350                        %block_hash,
351                        "Failed to extract Subspace digest items from header"
352                    );
353                    return;
354                }
355            };
356
357        let best_slot =
358            subspace_digest_items.pre_digest.slot() + self.chain_constants.block_authoring_delay();
359        let best_proof = subspace_digest_items
360            .pre_digest
361            .pot_info()
362            .future_proof_of_time();
363
364        // This will do one of 3 things depending on circumstances:
365        // * if block import is ahead of timekeeper and gossip, it will update next slot input
366        // * if block import is on a different PoT chain, it will update next slot input to the
367        //   correct fork (reorg)
368        // * if block import is on the same PoT chain this will essentially do nothing
369        match self.state.update(
370            best_slot,
371            best_proof,
372            Some(subspace_digest_items.pot_parameters_change),
373        ) {
374            PotStateUpdateOutcome::NoChange => {
375                trace!(
376                    %best_slot,
377                    "Block import didn't result in proof of time chain changes",
378                );
379            }
380            PotStateUpdateOutcome::Extension { from, to } => {
381                warn!(
382                    from_next_slot = %from.slot,
383                    to_next_slot = %to.slot,
384                    "Proof of time chain was extended from block import",
385                );
386
387                if self
388                    .to_gossip_sender
389                    .try_send(ToGossipMessage::NextSlotInput(to))
390                    .is_err()
391                {
392                    debug!(
393                        next_slot = %to.slot,
394                        "Gossip is not able to keep-up with slot production (block import)",
395                    );
396                }
397            }
398            PotStateUpdateOutcome::Reorg { from, to } => {
399                warn!(
400                    from_next_slot = %from.slot,
401                    to_next_slot = %to.slot,
402                    "Proof of time chain reorg happened",
403                );
404
405                if self
406                    .to_gossip_sender
407                    .try_send(ToGossipMessage::NextSlotInput(to))
408                    .is_err()
409                {
410                    debug!(
411                        next_slot = %to.slot,
412                        "Gossip is not able to keep-up with slot production (block import)",
413                    );
414                }
415            }
416        }
417    }
418
419    /// Subscribe to pot slot notifications.
420    pub fn subscribe_pot_slot_info_stream(&self) -> broadcast::Receiver<PotSlotInfo> {
421        self.slot_sender.subscribe()
422    }
423}