sc_proof_of_time/
source.rs

1pub mod gossip;
2mod state;
3mod timekeeper;
4
5use crate::source::gossip::{GossipProof, PotGossipWorker, ToGossipMessage};
6use crate::source::state::{PotState, PotStateUpdateOutcome};
7use crate::source::timekeeper::{TimekeeperProof, run_timekeeper};
8use crate::verifier::PotVerifier;
9use core_affinity::CoreId;
10use derive_more::{Deref, DerefMut};
11use futures::channel::mpsc;
12use futures::{StreamExt, select};
13use sc_client_api::BlockchainEvents;
14use sc_network::{NotificationService, PeerId};
15use sc_network_gossip::{Network as GossipNetwork, Syncing as GossipSyncing};
16use sp_api::{ApiError, ProvideRuntimeApi};
17use sp_blockchain::HeaderBackend;
18use sp_consensus::SyncOracle;
19use sp_consensus_slots::Slot;
20use sp_consensus_subspace::digests::{extract_pre_digest, extract_subspace_digest_items};
21use sp_consensus_subspace::{ChainConstants, PotNextSlotInput, PotParameters, SubspaceApi};
22use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor, Zero};
23use std::collections::HashSet;
24use std::marker::PhantomData;
25use std::sync::Arc;
26use std::thread;
27use subspace_core_primitives::PublicKey;
28use subspace_core_primitives::pot::{PotCheckpoints, PotOutput, PotSeed};
29use thread_priority::{ThreadPriority, set_current_thread_priority};
30use tokio::sync::broadcast;
31use tracing::{Span, debug, error, trace, warn};
32
33const LOCAL_PROOFS_CHANNEL_CAPACITY: usize = 10;
34const SLOTS_CHANNEL_CAPACITY: usize = 10;
35const GOSSIP_OUTGOING_CHANNEL_CAPACITY: usize = 10;
36const GOSSIP_INCOMING_CHANNEL_CAPACITY: usize = 10;
37
38/// Proof of time slot information
39#[derive(Clone)]
40pub struct PotSlotInfo {
41    /// Slot number
42    pub slot: Slot,
43    /// Proof of time checkpoints
44    pub checkpoints: PotCheckpoints,
45}
46
47/// Stream with proof of time slots
48#[derive(Debug, Deref, DerefMut)]
49pub struct PotSlotInfoStream(broadcast::Receiver<PotSlotInfo>);
50
51/// Worker producing proofs of time.
52///
53/// Depending on configuration may produce proofs of time locally, send/receive via gossip and keep
54/// up to day with blockchain reorgs.
55#[derive(Debug)]
56#[must_use = "Proof of time source doesn't do anything unless run() method is called"]
57pub struct PotSourceWorker<Block, Client, SO> {
58    client: Arc<Client>,
59    sync_oracle: SO,
60    chain_constants: ChainConstants,
61    timekeeper_proofs_receiver: mpsc::Receiver<TimekeeperProof>,
62    to_gossip_sender: mpsc::Sender<ToGossipMessage>,
63    from_gossip_receiver: mpsc::Receiver<(PeerId, GossipProof)>,
64    last_slot_sent: Slot,
65    slot_sender: broadcast::Sender<PotSlotInfo>,
66    state: Arc<PotState>,
67    _block: PhantomData<Block>,
68}
69
70impl<Block, Client, SO> PotSourceWorker<Block, Client, SO>
71where
72    Block: BlockT,
73    Client: BlockchainEvents<Block> + HeaderBackend<Block> + ProvideRuntimeApi<Block>,
74    Client::Api: SubspaceApi<Block, PublicKey>,
75    SO: SyncOracle + Clone + Send + Sync + 'static,
76{
77    // TODO: Struct for arguments
78    #[allow(clippy::too_many_arguments)]
79    pub fn new<Network, GossipSync>(
80        is_timekeeper: bool,
81        timekeeper_cpu_cores: HashSet<usize>,
82        client: Arc<Client>,
83        pot_verifier: PotVerifier,
84        network: Network,
85        notification_service: Box<dyn NotificationService>,
86        sync: Arc<GossipSync>,
87        sync_oracle: SO,
88    ) -> Result<(Self, PotGossipWorker<Block>, PotSlotInfoStream), ApiError>
89    where
90        Network: GossipNetwork<Block> + Send + Sync + Clone + 'static,
91        GossipSync: GossipSyncing<Block> + 'static,
92    {
93        let best_hash = client.info().best_hash;
94        let runtime_api = client.runtime_api();
95        let chain_constants = runtime_api.chain_constants(best_hash)?;
96
97        let best_header = client
98            .header(best_hash)?
99            .ok_or_else(|| ApiError::UnknownBlock(format!("Parent block {best_hash} not found")))?;
100        let best_pre_digest = extract_pre_digest(&best_header)
101            .map_err(|error| ApiError::Application(error.into()))?;
102
103        let parent_slot = if best_header.number().is_zero() {
104            Slot::from(0)
105        } else {
106            // The best one seen
107            best_pre_digest.slot() + chain_constants.block_authoring_delay()
108        };
109
110        let pot_parameters = runtime_api.pot_parameters(best_hash)?;
111        let maybe_next_parameters_change = pot_parameters.next_parameters_change();
112
113        let pot_input = pot_next_slot_input::<Block>(
114            best_header.number(),
115            parent_slot,
116            &pot_parameters,
117            pot_verifier.genesis_seed(),
118            best_pre_digest.pot_info().future_proof_of_time(),
119        );
120
121        let state = Arc::new(PotState::new(
122            pot_input,
123            maybe_next_parameters_change,
124            pot_verifier.clone(),
125        ));
126
127        let (timekeeper_proofs_sender, timekeeper_proofs_receiver) =
128            mpsc::channel(LOCAL_PROOFS_CHANNEL_CAPACITY);
129        let (slot_sender, slot_receiver) = broadcast::channel(SLOTS_CHANNEL_CAPACITY);
130        if is_timekeeper {
131            let state = Arc::clone(&state);
132            let pot_verifier = pot_verifier.clone();
133            let span = Span::current();
134
135            thread::Builder::new()
136                .name("timekeeper".to_string())
137                .spawn(move || {
138                    let _guard = span.enter();
139
140                    if let Some(core) = timekeeper_cpu_cores.into_iter().next()
141                        && !core_affinity::set_for_current(CoreId { id: core })
142                    {
143                        warn!(
144                            %core,
145                            "Failed to set core affinity, timekeeper will run on random CPU \
146                            core",
147                        );
148                    }
149
150                    if let Err(error) = set_current_thread_priority(ThreadPriority::Max) {
151                        warn!(
152                            %error,
153                            "Failed to set thread priority, timekeeper performance may be \
154                            negatively impacted by other software running on this machine",
155                        );
156                    }
157
158                    if let Err(error) =
159                        run_timekeeper(state, pot_verifier, timekeeper_proofs_sender)
160                    {
161                        error!(%error, "Timekeeper exited with an error");
162                    }
163                })
164                .expect("Thread creation must not panic");
165        }
166
167        let (to_gossip_sender, to_gossip_receiver) =
168            mpsc::channel(GOSSIP_OUTGOING_CHANNEL_CAPACITY);
169        let (from_gossip_sender, from_gossip_receiver) =
170            mpsc::channel(GOSSIP_INCOMING_CHANNEL_CAPACITY);
171        let gossip_worker = PotGossipWorker::new(
172            to_gossip_receiver,
173            from_gossip_sender,
174            pot_verifier,
175            Arc::clone(&state),
176            network,
177            notification_service,
178            sync,
179            sync_oracle.clone(),
180        );
181
182        let source_worker = Self {
183            client,
184            sync_oracle,
185            chain_constants,
186            timekeeper_proofs_receiver,
187            to_gossip_sender,
188            from_gossip_receiver,
189            last_slot_sent: Slot::from(0),
190            slot_sender,
191            state,
192            _block: PhantomData,
193        };
194
195        let pot_slot_info_stream = PotSlotInfoStream(slot_receiver);
196
197        Ok((source_worker, gossip_worker, pot_slot_info_stream))
198    }
199
200    /// Run proof of time source
201    pub async fn run(mut self) {
202        let mut import_notification_stream = self.client.import_notification_stream();
203
204        loop {
205            select! {
206                // List of blocks that the client has finalized.
207                timekeeper_proof = self.timekeeper_proofs_receiver.select_next_some() => {
208                    self.handle_timekeeper_proof(timekeeper_proof);
209                }
210                // List of blocks that the client has finalized.
211                maybe_gossip_proof = self.from_gossip_receiver.next() => {
212                    if let Some((sender, gossip_proof)) = maybe_gossip_proof {
213                        self.handle_gossip_proof(sender, gossip_proof);
214                    } else {
215                        debug!("Incoming gossip messages stream ended, exiting");
216                        return;
217                    }
218                }
219                maybe_import_notification = import_notification_stream.next() => {
220                    if let Some(import_notification) = maybe_import_notification {
221                        if !import_notification.is_new_best {
222                            // Ignore blocks that don't extend the chain
223                            continue;
224                        }
225                        self.handle_block_import_notification(
226                            import_notification.hash,
227                            &import_notification.header,
228                        );
229                    } else {
230                        debug!("Import notifications stream ended, exiting");
231                        return;
232                    }
233                }
234            }
235        }
236    }
237
238    fn handle_timekeeper_proof(&mut self, proof: TimekeeperProof) {
239        let TimekeeperProof {
240            slot,
241            seed,
242            slot_iterations,
243            checkpoints,
244        } = proof;
245
246        if self.sync_oracle.is_major_syncing() {
247            trace!(
248                ?slot,
249                %seed,
250                %slot_iterations,
251                output = %checkpoints.output(),
252                "Ignore timekeeper proof due to major syncing",
253            );
254
255            return;
256        }
257
258        debug!(
259            ?slot,
260            %seed,
261            %slot_iterations,
262            output = %checkpoints.output(),
263            "Received timekeeper proof",
264        );
265
266        if self
267            .to_gossip_sender
268            .try_send(ToGossipMessage::Proof(GossipProof {
269                slot,
270                seed,
271                slot_iterations,
272                checkpoints,
273            }))
274            .is_err()
275        {
276            debug!(
277                %slot,
278                "Gossip is not able to keep-up with slot production (timekeeper)",
279            );
280        }
281
282        if slot > self.last_slot_sent {
283            self.last_slot_sent = slot;
284
285            // We don't care if block production is too slow or block production is not enabled on this
286            // node at all
287            let _ = self.slot_sender.send(PotSlotInfo { slot, checkpoints });
288        }
289    }
290
291    // TODO: Follow both verified and unverified checkpoints to start secondary timekeeper ASAP in
292    //  case verification succeeds
293    fn handle_gossip_proof(&mut self, _sender: PeerId, proof: GossipProof) {
294        let expected_next_slot_input = PotNextSlotInput {
295            slot: proof.slot,
296            slot_iterations: proof.slot_iterations,
297            seed: proof.seed,
298        };
299
300        if let Ok(next_slot_input) = self.state.try_extend(
301            expected_next_slot_input,
302            proof.slot,
303            proof.checkpoints.output(),
304            None,
305        ) {
306            if proof.slot > self.last_slot_sent {
307                self.last_slot_sent = proof.slot;
308
309                // We don't care if block production is too slow or block production is not enabled on
310                // this node at all
311                let _ = self.slot_sender.send(PotSlotInfo {
312                    slot: proof.slot,
313                    checkpoints: proof.checkpoints,
314                });
315            }
316
317            if self
318                .to_gossip_sender
319                .try_send(ToGossipMessage::NextSlotInput(next_slot_input))
320                .is_err()
321            {
322                debug!(
323                    slot = %proof.slot,
324                    next_slot = %next_slot_input.slot,
325                    "Gossip is not able to keep-up with slot production (gossip)",
326                );
327            }
328        }
329    }
330
331    fn handle_block_import_notification(
332        &mut self,
333        block_hash: Block::Hash,
334        header: &Block::Header,
335    ) {
336        let subspace_digest_items =
337            match extract_subspace_digest_items::<Block::Header, PublicKey>(header) {
338                Ok(pre_digest) => pre_digest,
339                Err(error) => {
340                    error!(
341                        %error,
342                        block_number = %header.number(),
343                        %block_hash,
344                        "Failed to extract Subspace digest items from header"
345                    );
346                    return;
347                }
348            };
349
350        let best_slot =
351            subspace_digest_items.pre_digest.slot() + self.chain_constants.block_authoring_delay();
352        let best_proof = subspace_digest_items
353            .pre_digest
354            .pot_info()
355            .future_proof_of_time();
356
357        // This will do one of 3 things depending on circumstances:
358        // * if block import is ahead of timekeeper and gossip, it will update next slot input
359        // * if block import is on a different PoT chain, it will update next slot input to the
360        //   correct fork (reorg)
361        // * if block import is on the same PoT chain this will essentially do nothing
362        match self.state.update(
363            best_slot,
364            best_proof,
365            Some(subspace_digest_items.pot_parameters_change),
366        ) {
367            PotStateUpdateOutcome::NoChange => {
368                trace!(
369                    %best_slot,
370                    "Block import didn't result in proof of time chain changes",
371                );
372            }
373            PotStateUpdateOutcome::Extension { from, to } => {
374                warn!(
375                    from_next_slot = %from.slot,
376                    to_next_slot = %to.slot,
377                    "Proof of time chain was extended from block import",
378                );
379
380                if self
381                    .to_gossip_sender
382                    .try_send(ToGossipMessage::NextSlotInput(to))
383                    .is_err()
384                {
385                    debug!(
386                        next_slot = %to.slot,
387                        "Gossip is not able to keep-up with slot production (block import)",
388                    );
389                }
390            }
391            PotStateUpdateOutcome::Reorg { from, to } => {
392                warn!(
393                    from_next_slot = %from.slot,
394                    to_next_slot = %to.slot,
395                    "Proof of time chain reorg happened",
396                );
397
398                if self
399                    .to_gossip_sender
400                    .try_send(ToGossipMessage::NextSlotInput(to))
401                    .is_err()
402                {
403                    debug!(
404                        next_slot = %to.slot,
405                        "Gossip is not able to keep-up with slot production (block import)",
406                    );
407                }
408            }
409        }
410    }
411
412    /// Subscribe to pot slot notifications.
413    pub fn subscribe_pot_slot_info_stream(&self) -> broadcast::Receiver<PotSlotInfo> {
414        self.slot_sender.subscribe()
415    }
416}
417
418/// Derives the next PoT slot input from the parent.
419pub fn pot_next_slot_input<Block: BlockT>(
420    parent_block_number: &NumberFor<Block>,
421    parent_slot: Slot,
422    parent_pot_parameters: &PotParameters,
423    genesis_pot_seed: PotSeed,
424    parent_pot_output: PotOutput,
425) -> PotNextSlotInput {
426    if parent_block_number.is_zero() {
427        PotNextSlotInput {
428            slot: parent_slot + Slot::from(1),
429            slot_iterations: parent_pot_parameters.slot_iterations(),
430            seed: genesis_pot_seed,
431        }
432    } else {
433        PotNextSlotInput::derive(
434            parent_pot_parameters.slot_iterations(),
435            parent_slot,
436            parent_pot_output,
437            &parent_pot_parameters.next_parameters_change(),
438        )
439    }
440}