sc_proof_of_time/source/
gossip.rs

//! Proof-of-time (PoT) gossip functionality.
2
3use crate::source::state::PotState;
4use crate::verifier::PotVerifier;
5use futures::channel::mpsc;
6use futures::{FutureExt, SinkExt, StreamExt};
7use parity_scale_codec::{Decode, Encode};
8use parking_lot::Mutex;
9use sc_network::config::NonDefaultSetConfig;
10use sc_network::{NetworkPeers, NotificationService, PeerId};
11use sc_network_gossip::{
12    GossipEngine, MessageIntent, Network as GossipNetwork, Syncing as GossipSyncing,
13    ValidationResult, Validator, ValidatorContext,
14};
15use schnellru::{ByLength, LruMap};
16use sp_consensus::SyncOracle;
17use sp_consensus_slots::Slot;
18use sp_consensus_subspace::PotNextSlotInput;
19use sp_runtime::traits::{Block as BlockT, Hash as HashT, HashingFor};
20use std::cmp;
21use std::collections::{HashMap, VecDeque};
22use std::future::poll_fn;
23use std::num::NonZeroU32;
24use std::pin::pin;
25use std::sync::Arc;
26use subspace_core_primitives::pot::{PotCheckpoints, PotSeed};
27use tracing::{debug, error, trace, warn};
28
/// Maximum number of slots a gossiped proof may be ahead of the current slot before the
/// validator discards it as being too far in the future.
const MAX_SLOTS_IN_THE_FUTURE: u64 = 10;

/// How much faster PoT verification is expected to be compared to PoT proving.
///
/// Once at least this many distinct candidate proofs are queued for the same slot, only the
/// most reputable one is verified and, if it fails, the proof is computed locally instead.
const EXPECTED_POT_VERIFICATION_SPEEDUP: usize = 7;

/// Maximum number of peers tracked in the cache of proofs received for future slots.
const GOSSIP_CACHE_PEER_COUNT: u32 = 1000;

/// Maximum number of future proofs cached per peer; when the queue is full the oldest entry is
/// dropped and the peer is penalized.
const GOSSIP_CACHE_PER_PEER_SIZE: usize = 20;
35
36mod rep {
37    use sc_network::ReputationChange;
38
39    /// Reputation change when a peer sends us a gossip message that can't be decoded.
40    pub(super) const GOSSIP_NOT_DECODABLE: ReputationChange =
41        ReputationChange::new(-(1 << 3), "PoT: not decodable");
42    /// Reputation change when a peer sends us proof that do not match next slot inputs.
43    pub(super) const GOSSIP_NEXT_SLOT_MISMATCH: ReputationChange =
44        ReputationChange::new(-(1 << 5), "PoT: next slot mismatch");
45    /// Reputation change when a peer sends us proof that correspond to old slot.
46    pub(super) const GOSSIP_OLD_SLOT: ReputationChange =
47        ReputationChange::new(-(1 << 5), "PoT: old slot");
48    /// Reputation change when a peer sends us proof that correspond to slot that is too far
49    /// in the future.
50    pub(super) const GOSSIP_TOO_FAR_IN_THE_FUTURE: ReputationChange =
51        ReputationChange::new(-(1 << 5), "PoT: slot too far in the future");
52    /// Reputation change when a peer sends us proof that correspond to slot iterations
53    /// outside of range.
54    pub(super) const GOSSIP_SLOT_ITERATIONS_OUTSIDE_OF_RANGE: ReputationChange =
55        ReputationChange::new(-(1 << 5), "PoT: slot iterations outside of range");
56    /// Reputation change when a peer sends us proof that was unused and ended up becoming
57    /// outdated.
58    pub(super) const GOSSIP_OUTDATED_PROOF: ReputationChange =
59        ReputationChange::new(-(1 << 5), "PoT: outdated proof");
60    /// Reputation change when a peer sends us too many proofs.
61    pub(super) const GOSSIP_TOO_MANY_PROOFS: ReputationChange =
62        ReputationChange::new(-(1 << 5), "PoT: too many proofs");
63    /// Reputation change when a peer sends us proof that were unused and ended up not
64    /// matching slot inputs.
65    pub(super) const GOSSIP_SLOT_INPUT_MISMATCH: ReputationChange =
66        ReputationChange::new(-(1 << 5), "PoT: slot input mismatch");
67    /// Reputation change when a peer sends us an invalid proof.
68    pub(super) const GOSSIP_INVALID_PROOF: ReputationChange =
69        ReputationChange::new_fatal("PoT: Invalid proof");
70}
71
/// Name of the notification protocol used for PoT gossip, shared by the peer set configuration
/// and the gossip engine.
const GOSSIP_PROTOCOL: &str = "/subspace/subspace-proof-of-time/1";
73
74/// Returns the network configuration for PoT gossip.
75pub fn pot_gossip_peers_set_config() -> (
76    NonDefaultSetConfig,
77    Box<dyn sc_network::NotificationService>,
78) {
79    let (mut cfg, notification_service) = NonDefaultSetConfig::new(
80        GOSSIP_PROTOCOL.into(),
81        Vec::new(),
82        1024,
83        None,
84        Default::default(),
85    );
86    cfg.allow_non_reserved(25, 25);
87    (cfg, notification_service)
88}
89
90#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Encode, Decode)]
91pub(super) struct GossipProof {
92    /// Slot number
93    pub(super) slot: Slot,
94    /// Proof of time seed
95    pub(super) seed: PotSeed,
96    /// Iterations per slot
97    pub(super) slot_iterations: NonZeroU32,
98    /// Proof of time checkpoints
99    pub(super) checkpoints: PotCheckpoints,
100}
101
102#[derive(Debug)]
103pub(super) enum ToGossipMessage {
104    Proof(GossipProof),
105    NextSlotInput(PotNextSlotInput),
106}
107
108/// PoT gossip worker
109#[must_use = "Gossip worker doesn't do anything unless run() method is called"]
110pub struct PotGossipWorker<Block>
111where
112    Block: BlockT,
113{
114    engine: Arc<Mutex<GossipEngine<Block>>>,
115    network: Arc<dyn NetworkPeers + Send + Sync>,
116    topic: Block::Hash,
117    state: Arc<PotState>,
118    pot_verifier: PotVerifier,
119    gossip_cache: LruMap<PeerId, VecDeque<GossipProof>>,
120    to_gossip_receiver: mpsc::Receiver<ToGossipMessage>,
121    from_gossip_sender: mpsc::Sender<(PeerId, GossipProof)>,
122}
123
124impl<Block> PotGossipWorker<Block>
125where
126    Block: BlockT,
127{
128    /// Instantiate gossip worker
129    // TODO: Struct for arguments
130    #[allow(clippy::too_many_arguments)]
131    pub(super) fn new<Network, GossipSync, SO>(
132        to_gossip_receiver: mpsc::Receiver<ToGossipMessage>,
133        from_gossip_sender: mpsc::Sender<(PeerId, GossipProof)>,
134        pot_verifier: PotVerifier,
135        state: Arc<PotState>,
136        network: Network,
137        notification_service: Box<dyn NotificationService>,
138        sync: Arc<GossipSync>,
139        sync_oracle: SO,
140    ) -> Self
141    where
142        Network: GossipNetwork<Block> + NetworkPeers + Send + Sync + Clone + 'static,
143        GossipSync: GossipSyncing<Block> + 'static,
144        SO: SyncOracle + Send + Sync + 'static,
145    {
146        let topic = HashingFor::<Block>::hash(b"proofs");
147
148        let validator = Arc::new(PotGossipValidator::new(
149            Arc::clone(&state),
150            topic,
151            sync_oracle,
152            network.clone(),
153        ));
154        let engine = GossipEngine::new(
155            network.clone(),
156            sync,
157            notification_service,
158            GOSSIP_PROTOCOL,
159            validator,
160            None,
161        );
162
163        Self {
164            engine: Arc::new(Mutex::new(engine)),
165            network: Arc::new(network),
166            topic,
167            state,
168            pot_verifier,
169            gossip_cache: LruMap::new(ByLength::new(GOSSIP_CACHE_PEER_COUNT)),
170            to_gossip_receiver,
171            from_gossip_sender,
172        }
173    }
174
175    /// Run gossip engine.
176    ///
177    /// NOTE: Even though this function is async, it might do blocking operations internally and
178    /// should be running on a dedicated thread.
179    pub async fn run(mut self) {
180        let message_receiver = self.engine.lock().messages_for(self.topic);
181        let incoming_unverified_messages =
182            pin!(message_receiver.filter_map(|notification| async move {
183                notification.sender.map(|sender| {
184                    let proof = GossipProof::decode(&mut notification.message.as_ref())
185                        .expect("Only valid messages get here; qed");
186
187                    (sender, proof)
188                })
189            }));
190        let mut incoming_unverified_messages = incoming_unverified_messages.fuse();
191
192        loop {
193            let mut gossip_engine_poll = poll_fn(|cx| self.engine.lock().poll_unpin(cx)).fuse();
194
195            futures::select! {
196                (sender, proof) = incoming_unverified_messages.select_next_some() => {
197                    self.handle_proof_candidate(sender, proof).await;
198                },
199                message = self.to_gossip_receiver.select_next_some() => {
200                    self.handle_to_gossip_messages(message).await
201                },
202                 _ = gossip_engine_poll => {
203                    error!("Gossip engine has terminated");
204                    return;
205                }
206            }
207        }
208    }
209
210    async fn handle_proof_candidate(&mut self, sender: PeerId, proof: GossipProof) {
211        let next_slot_input = self.state.next_slot_input();
212
213        match proof.slot.cmp(&next_slot_input.slot) {
214            cmp::Ordering::Less => {
215                trace!(
216                    %sender,
217                    slot = %proof.slot,
218                    next_slot = %next_slot_input.slot,
219                    "Proof for outdated slot, ignoring",
220                );
221
222                if let Some(verified_checkpoints) = self
223                    .pot_verifier
224                    .try_get_checkpoints(proof.slot_iterations, proof.seed)
225                {
226                    if verified_checkpoints != proof.checkpoints {
227                        trace!(
228                            %sender,
229                            slot = %proof.slot,
230                            "Invalid old proof, punishing sender",
231                        );
232
233                        self.engine.lock().report(sender, rep::GOSSIP_INVALID_PROOF);
234                    }
235                } else {
236                    // We didn't use it, but also didn't bother verifying
237                    self.engine
238                        .lock()
239                        .report(sender, rep::GOSSIP_OUTDATED_PROOF);
240                }
241
242                return;
243            }
244            cmp::Ordering::Equal => {
245                if !(proof.seed == next_slot_input.seed
246                    && proof.slot_iterations == next_slot_input.slot_iterations)
247                {
248                    trace!(
249                        %sender,
250                        slot = %proof.slot,
251                        "Proof with next slot mismatch, ignoring",
252                    );
253
254                    self.engine
255                        .lock()
256                        .report(sender, rep::GOSSIP_NEXT_SLOT_MISMATCH);
257                    return;
258                }
259            }
260            cmp::Ordering::Greater => {
261                trace!(
262                    %sender,
263                    slot = %proof.slot,
264                    next_slot = %next_slot_input.slot,
265                    "Proof from the future",
266                );
267
268                if let Some(proofs) = self.gossip_cache.get_or_insert(sender, Default::default) {
269                    if proofs.len() == GOSSIP_CACHE_PER_PEER_SIZE {
270                        if let Some(proof) = proofs.pop_front() {
271                            trace!(
272                                %sender,
273                                slot = %proof.slot,
274                                next_slot = %next_slot_input.slot,
275                                "Too many proofs stored from peer",
276                            );
277
278                            self.engine
279                                .lock()
280                                .report(sender, rep::GOSSIP_TOO_MANY_PROOFS);
281                        }
282                    }
283                    proofs.push_back(proof);
284                    return;
285                }
286            }
287        }
288
289        if self.pot_verifier.verify_checkpoints(
290            proof.seed,
291            proof.slot_iterations,
292            &proof.checkpoints,
293        ) {
294            debug!(%sender, slot = %proof.slot, "Full verification succeeded");
295
296            self.engine
297                .lock()
298                .gossip_message(self.topic, proof.encode(), false);
299
300            if let Err(error) = self.from_gossip_sender.send((sender, proof)).await {
301                warn!(%error, "Failed to send incoming message");
302            }
303        } else {
304            debug!(%sender, slot = %proof.slot, "Full verification failed");
305            self.engine.lock().report(sender, rep::GOSSIP_INVALID_PROOF);
306        }
307    }
308
309    async fn handle_to_gossip_messages(&mut self, message: ToGossipMessage) {
310        match message {
311            ToGossipMessage::Proof(proof) => {
312                self.engine
313                    .lock()
314                    .gossip_message(self.topic, proof.encode(), false);
315            }
316            ToGossipMessage::NextSlotInput(next_slot_input) => {
317                self.handle_next_slot_input(next_slot_input).await;
318            }
319        }
320    }
321
322    /// Handle next slot input and try to remove outdated proofs information from internal cache as
323    /// well as produce next proof if it was already received out of order before
324    async fn handle_next_slot_input(&mut self, next_slot_input: PotNextSlotInput) {
325        let mut old_proofs = HashMap::<GossipProof, Vec<PeerId>>::new();
326
327        for (sender, proofs) in &mut self.gossip_cache.iter_mut() {
328            proofs.retain(|proof| {
329                if proof.slot > next_slot_input.slot {
330                    true
331                } else {
332                    old_proofs.entry(*proof).or_default().push(*sender);
333                    false
334                }
335            });
336        }
337
338        let mut potentially_matching_proofs = Vec::new();
339
340        for (proof, senders) in old_proofs {
341            if proof.slot != next_slot_input.slot {
342                let invalid_proof = self
343                    .pot_verifier
344                    .try_get_checkpoints(proof.slot_iterations, proof.seed)
345                    .map(|verified_checkpoints| verified_checkpoints != proof.checkpoints)
346                    .unwrap_or_default();
347
348                let engine = self.engine.lock();
349                if invalid_proof {
350                    for sender in senders {
351                        trace!(
352                            %sender,
353                            slot = %proof.slot,
354                            "Proof ended up being invalid",
355                        );
356
357                        engine.report(sender, rep::GOSSIP_INVALID_PROOF);
358                    }
359                } else {
360                    for sender in senders {
361                        trace!(
362                            %sender,
363                            slot = %proof.slot,
364                            "Proof ended up being unused",
365                        );
366
367                        engine.report(sender, rep::GOSSIP_OUTDATED_PROOF);
368                    }
369                }
370
371                continue;
372            }
373
374            if !(proof.seed == next_slot_input.seed
375                && proof.slot_iterations == next_slot_input.slot_iterations)
376            {
377                let engine = self.engine.lock();
378                for sender in senders {
379                    trace!(
380                        %sender,
381                        slot = %proof.slot,
382                        "Proof ended up not matching slot inputs",
383                    );
384
385                    engine.report(sender, rep::GOSSIP_SLOT_INPUT_MISMATCH);
386                }
387
388                continue;
389            }
390
391            potentially_matching_proofs.push((proof, senders));
392        }
393
394        // Avoid blocking gossip for too long
395        rayon::spawn({
396            let engine = Arc::clone(&self.engine);
397            let network = Arc::clone(&self.network);
398            let pot_verifier = self.pot_verifier.clone();
399            let from_gossip_sender = self.from_gossip_sender.clone();
400            let topic = self.topic;
401
402            move || {
403                Self::handle_potentially_matching_proofs(
404                    next_slot_input,
405                    potentially_matching_proofs,
406                    engine,
407                    network.as_ref(),
408                    &pot_verifier,
409                    from_gossip_sender,
410                    topic,
411                );
412            }
413        });
414    }
415
416    fn handle_potentially_matching_proofs(
417        next_slot_input: PotNextSlotInput,
418        mut potentially_matching_proofs: Vec<(GossipProof, Vec<PeerId>)>,
419        engine: Arc<Mutex<GossipEngine<Block>>>,
420        network: &dyn NetworkPeers,
421        pot_verifier: &PotVerifier,
422        mut from_gossip_sender: mpsc::Sender<(PeerId, GossipProof)>,
423        topic: Block::Hash,
424    ) {
425        if potentially_matching_proofs.is_empty() {
426            // Nothing left to do
427            return;
428        }
429
430        // This sorts from lowest reputation to highest
431        potentially_matching_proofs.sort_by_cached_key(|(_proof, peer_ids)| {
432            peer_ids
433                .iter()
434                .map(|peer_id| network.peer_reputation(peer_id))
435                .max()
436        });
437
438        // If we have too many unique proofs to verify it might be cheaper to prove it ourselves
439        let correct_proof = if potentially_matching_proofs.len() < EXPECTED_POT_VERIFICATION_SPEEDUP
440        {
441            let mut correct_proof = None;
442
443            // Verify all proofs, starting with those sent by most reputable peers
444            for (proof, _senders) in potentially_matching_proofs.iter().rev() {
445                if pot_verifier.verify_checkpoints(
446                    proof.seed,
447                    proof.slot_iterations,
448                    &proof.checkpoints,
449                ) {
450                    correct_proof.replace(*proof);
451                    break;
452                }
453            }
454
455            correct_proof
456        } else {
457            // Last proof includes peer with the highest reputation
458            let (proof, _senders) = potentially_matching_proofs
459                .last()
460                .expect("Guaranteed to be non-empty; qed");
461
462            if pot_verifier.verify_checkpoints(
463                proof.seed,
464                proof.slot_iterations,
465                &proof.checkpoints,
466            ) {
467                Some(*proof)
468            } else {
469                match subspace_proof_of_time::prove(
470                    next_slot_input.seed,
471                    next_slot_input.slot_iterations,
472                ) {
473                    Ok(checkpoints) => Some(GossipProof {
474                        slot: next_slot_input.slot,
475                        seed: next_slot_input.seed,
476                        slot_iterations: next_slot_input.slot_iterations,
477                        checkpoints,
478                    }),
479                    Err(error) => {
480                        error!(
481                            %error,
482                            slot = %next_slot_input.slot,
483                            "Failed to run proof of time, this is an implementation bug",
484                        );
485                        return;
486                    }
487                }
488            }
489        };
490
491        for (proof, senders) in potentially_matching_proofs {
492            if Some(proof) == correct_proof {
493                let mut sent = false;
494                for sender in senders {
495                    debug!(%sender, slot = %proof.slot, "Correct future proof");
496
497                    if sent {
498                        continue;
499                    }
500                    sent = true;
501
502                    engine.lock().gossip_message(topic, proof.encode(), false);
503
504                    if let Err(error) =
505                        futures::executor::block_on(from_gossip_sender.send((sender, proof)))
506                    {
507                        warn!(
508                            %error,
509                            slot = %proof.slot,
510                            "Failed to send future proof",
511                        );
512                    }
513                }
514            } else {
515                let engine = engine.lock();
516                for sender in senders {
517                    debug!(%sender, slot = %proof.slot, "Next slot proof is invalid");
518                    engine.report(sender, rep::GOSSIP_INVALID_PROOF);
519                }
520            }
521        }
522    }
523}
524
525/// Validator for gossiped messages
526struct PotGossipValidator<Block, SO, Network>
527where
528    Block: BlockT,
529{
530    state: Arc<PotState>,
531    topic: Block::Hash,
532    sync_oracle: SO,
533    network: Network,
534}
535
536impl<Block, SO, Network> PotGossipValidator<Block, SO, Network>
537where
538    Block: BlockT,
539    SO: SyncOracle,
540{
541    /// Creates the validator.
542    fn new(state: Arc<PotState>, topic: Block::Hash, sync_oracle: SO, network: Network) -> Self {
543        Self {
544            state,
545            topic,
546            sync_oracle,
547            network,
548        }
549    }
550}
551
552impl<Block, SO, Network> Validator<Block> for PotGossipValidator<Block, SO, Network>
553where
554    Block: BlockT,
555    SO: SyncOracle + Send + Sync,
556    Network: NetworkPeers + Send + Sync + 'static,
557{
558    fn validate(
559        &self,
560        _context: &mut dyn ValidatorContext<Block>,
561        sender: &PeerId,
562        mut data: &[u8],
563    ) -> ValidationResult<Block::Hash> {
564        // Ignore gossip while major syncing
565        if self.sync_oracle.is_major_syncing() {
566            return ValidationResult::Discard;
567        }
568
569        match GossipProof::decode(&mut data) {
570            Ok(proof) => {
571                let next_slot_input = self.state.next_slot_input();
572                let current_slot = next_slot_input.slot - Slot::from(1);
573
574                if proof.slot < current_slot {
575                    trace!(
576                        %sender,
577                        slot = %proof.slot,
578                        "Received proof for old slot, ignoring",
579                    );
580
581                    self.network.report_peer(*sender, rep::GOSSIP_OLD_SLOT);
582                    return ValidationResult::Discard;
583                }
584                if proof.slot > current_slot + Slot::from(MAX_SLOTS_IN_THE_FUTURE) {
585                    trace!(
586                        %sender,
587                        slot = %proof.slot,
588                        "Received proof for slot too far in the future, ignoring",
589                    );
590
591                    self.network
592                        .report_peer(*sender, rep::GOSSIP_TOO_FAR_IN_THE_FUTURE);
593                    return ValidationResult::Discard;
594                }
595                // Next slot matches expectations, but other inputs are not
596                if proof.slot == next_slot_input.slot
597                    && !(proof.seed == next_slot_input.seed
598                        && proof.slot_iterations == next_slot_input.slot_iterations)
599                {
600                    trace!(
601                        %sender,
602                        slot = %proof.slot,
603                        "Received proof with next slot mismatch, ignoring",
604                    );
605
606                    self.network
607                        .report_peer(*sender, rep::GOSSIP_NEXT_SLOT_MISMATCH);
608                    return ValidationResult::Discard;
609                }
610
611                let current_slot_iterations = next_slot_input.slot_iterations;
612
613                // Check that number of slot iterations is between current and 1.5 of current slot
614                // iterations
615                if proof.slot_iterations.get() < next_slot_input.slot_iterations.get()
616                    || proof.slot_iterations.get() > current_slot_iterations.get() * 3 / 2
617                {
618                    debug!(
619                        %sender,
620                        slot = %proof.slot,
621                        slot_iterations = %proof.slot_iterations,
622                        current_slot_iterations = %current_slot_iterations,
623                        "Slot iterations outside of reasonable range"
624                    );
625
626                    self.network
627                        .report_peer(*sender, rep::GOSSIP_SLOT_ITERATIONS_OUTSIDE_OF_RANGE);
628                    return ValidationResult::Discard;
629                }
630
631                trace!(%sender, slot = %proof.slot, "Superficial verification succeeded");
632
633                // We will fully validate and re-gossip it explicitly later if necessary
634                ValidationResult::ProcessAndDiscard(self.topic)
635            }
636            Err(error) => {
637                debug!(%error, "Gossip message couldn't be decoded");
638
639                self.network.report_peer(*sender, rep::GOSSIP_NOT_DECODABLE);
640                ValidationResult::Discard
641            }
642        }
643    }
644
645    fn message_expired<'a>(&'a self) -> Box<dyn FnMut(Block::Hash, &[u8]) -> bool + 'a> {
646        let current_slot = u64::from(self.state.next_slot_input().slot) - 1;
647        Box::new(move |_topic, mut data| {
648            if let Ok(proof) = GossipProof::decode(&mut data) {
649                // Slot is the only meaningful expiration policy here
650                if proof.slot >= current_slot {
651                    return false;
652                }
653            }
654
655            true
656        })
657    }
658
659    fn message_allowed<'a>(
660        &'a self,
661    ) -> Box<dyn FnMut(&PeerId, MessageIntent, &Block::Hash, &[u8]) -> bool + 'a> {
662        Box::new(move |_who, intent, _topic, _data| {
663            // We do not need force broadcast or rebroadcasting
664            matches!(intent, MessageIntent::Broadcast)
665        })
666    }
667}