1use crate::source::state::PotState;
4use crate::verifier::PotVerifier;
5use futures::channel::mpsc;
6use futures::{FutureExt, SinkExt, StreamExt};
7use parity_scale_codec::{Decode, Encode};
8use parking_lot::Mutex;
9use sc_network::config::NonDefaultSetConfig;
10use sc_network::{NetworkPeers, NotificationService, PeerId};
11use sc_network_gossip::{
12 GossipEngine, MessageIntent, Network as GossipNetwork, Syncing as GossipSyncing,
13 ValidationResult, Validator, ValidatorContext,
14};
15use schnellru::{ByLength, LruMap};
16use sp_consensus::SyncOracle;
17use sp_consensus_slots::Slot;
18use sp_consensus_subspace::PotNextSlotInput;
19use sp_runtime::traits::{Block as BlockT, Hash as HashT, HashingFor};
20use std::cmp;
21use std::collections::{HashMap, VecDeque};
22use std::future::poll_fn;
23use std::num::NonZeroU32;
24use std::pin::pin;
25use std::sync::Arc;
26use subspace_core_primitives::pot::{PotCheckpoints, PotSeed};
27use tracing::{debug, error, trace, warn};
28
29const MAX_SLOTS_IN_THE_FUTURE: u64 = 10;
31const EXPECTED_POT_VERIFICATION_SPEEDUP: usize = 7;
33const GOSSIP_CACHE_PEER_COUNT: u32 = 1_000;
34const GOSSIP_CACHE_PER_PEER_SIZE: usize = 20;
35
36mod rep {
37 use sc_network::ReputationChange;
38
39 pub(super) const GOSSIP_NOT_DECODABLE: ReputationChange =
41 ReputationChange::new(-(1 << 3), "PoT: not decodable");
42 pub(super) const GOSSIP_NEXT_SLOT_MISMATCH: ReputationChange =
44 ReputationChange::new(-(1 << 5), "PoT: next slot mismatch");
45 pub(super) const GOSSIP_OLD_SLOT: ReputationChange =
47 ReputationChange::new(-(1 << 5), "PoT: old slot");
48 pub(super) const GOSSIP_TOO_FAR_IN_THE_FUTURE: ReputationChange =
51 ReputationChange::new(-(1 << 5), "PoT: slot too far in the future");
52 pub(super) const GOSSIP_SLOT_ITERATIONS_OUTSIDE_OF_RANGE: ReputationChange =
55 ReputationChange::new(-(1 << 5), "PoT: slot iterations outside of range");
56 pub(super) const GOSSIP_OUTDATED_PROOF: ReputationChange =
59 ReputationChange::new(-(1 << 5), "PoT: outdated proof");
60 pub(super) const GOSSIP_TOO_MANY_PROOFS: ReputationChange =
62 ReputationChange::new(-(1 << 5), "PoT: too many proofs");
63 pub(super) const GOSSIP_SLOT_INPUT_MISMATCH: ReputationChange =
66 ReputationChange::new(-(1 << 5), "PoT: slot input mismatch");
67 pub(super) const GOSSIP_INVALID_PROOF: ReputationChange =
69 ReputationChange::new_fatal("PoT: Invalid proof");
70}
71
72const GOSSIP_PROTOCOL: &str = "/subspace/subspace-proof-of-time/1";
73
74pub fn pot_gossip_peers_set_config() -> (
76 NonDefaultSetConfig,
77 Box<dyn sc_network::NotificationService>,
78) {
79 let (mut cfg, notification_service) = NonDefaultSetConfig::new(
80 GOSSIP_PROTOCOL.into(),
81 Vec::new(),
82 1024,
83 None,
84 Default::default(),
85 );
86 cfg.allow_non_reserved(25, 25);
87 (cfg, notification_service)
88}
89
90#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Encode, Decode)]
91pub(super) struct GossipProof {
92 pub(super) slot: Slot,
94 pub(super) seed: PotSeed,
96 pub(super) slot_iterations: NonZeroU32,
98 pub(super) checkpoints: PotCheckpoints,
100}
101
102#[derive(Debug)]
103pub(super) enum ToGossipMessage {
104 Proof(GossipProof),
105 NextSlotInput(PotNextSlotInput),
106}
107
108#[must_use = "Gossip worker doesn't do anything unless run() method is called"]
110pub struct PotGossipWorker<Block>
111where
112 Block: BlockT,
113{
114 engine: Arc<Mutex<GossipEngine<Block>>>,
115 network: Arc<dyn NetworkPeers + Send + Sync>,
116 topic: Block::Hash,
117 state: Arc<PotState>,
118 pot_verifier: PotVerifier,
119 gossip_cache: LruMap<PeerId, VecDeque<GossipProof>>,
120 to_gossip_receiver: mpsc::Receiver<ToGossipMessage>,
121 from_gossip_sender: mpsc::Sender<(PeerId, GossipProof)>,
122}
123
124impl<Block> PotGossipWorker<Block>
125where
126 Block: BlockT,
127{
128 #[allow(clippy::too_many_arguments)]
131 pub(super) fn new<Network, GossipSync, SO>(
132 to_gossip_receiver: mpsc::Receiver<ToGossipMessage>,
133 from_gossip_sender: mpsc::Sender<(PeerId, GossipProof)>,
134 pot_verifier: PotVerifier,
135 state: Arc<PotState>,
136 network: Network,
137 notification_service: Box<dyn NotificationService>,
138 sync: Arc<GossipSync>,
139 sync_oracle: SO,
140 ) -> Self
141 where
142 Network: GossipNetwork<Block> + NetworkPeers + Send + Sync + Clone + 'static,
143 GossipSync: GossipSyncing<Block> + 'static,
144 SO: SyncOracle + Send + Sync + 'static,
145 {
146 let topic = HashingFor::<Block>::hash(b"proofs");
147
148 let validator = Arc::new(PotGossipValidator::new(
149 Arc::clone(&state),
150 topic,
151 sync_oracle,
152 network.clone(),
153 ));
154 let engine = GossipEngine::new(
155 network.clone(),
156 sync,
157 notification_service,
158 GOSSIP_PROTOCOL,
159 validator,
160 None,
161 );
162
163 Self {
164 engine: Arc::new(Mutex::new(engine)),
165 network: Arc::new(network),
166 topic,
167 state,
168 pot_verifier,
169 gossip_cache: LruMap::new(ByLength::new(GOSSIP_CACHE_PEER_COUNT)),
170 to_gossip_receiver,
171 from_gossip_sender,
172 }
173 }
174
175 pub async fn run(mut self) {
180 let message_receiver = self.engine.lock().messages_for(self.topic);
181 let incoming_unverified_messages =
182 pin!(message_receiver.filter_map(|notification| async move {
183 notification.sender.map(|sender| {
184 let proof = GossipProof::decode(&mut notification.message.as_ref())
185 .expect("Only valid messages get here; qed");
186
187 (sender, proof)
188 })
189 }));
190 let mut incoming_unverified_messages = incoming_unverified_messages.fuse();
191
192 loop {
193 let mut gossip_engine_poll = poll_fn(|cx| self.engine.lock().poll_unpin(cx)).fuse();
194
195 futures::select! {
196 (sender, proof) = incoming_unverified_messages.select_next_some() => {
197 self.handle_proof_candidate(sender, proof).await;
198 },
199 message = self.to_gossip_receiver.select_next_some() => {
200 self.handle_to_gossip_messages(message).await
201 },
202 _ = gossip_engine_poll => {
203 error!("Gossip engine has terminated");
204 return;
205 }
206 }
207 }
208 }
209
210 async fn handle_proof_candidate(&mut self, sender: PeerId, proof: GossipProof) {
211 let next_slot_input = self.state.next_slot_input();
212
213 match proof.slot.cmp(&next_slot_input.slot) {
214 cmp::Ordering::Less => {
215 trace!(
216 %sender,
217 slot = %proof.slot,
218 next_slot = %next_slot_input.slot,
219 "Proof for outdated slot, ignoring",
220 );
221
222 if let Some(verified_checkpoints) = self
223 .pot_verifier
224 .try_get_checkpoints(proof.slot_iterations, proof.seed)
225 {
226 if verified_checkpoints != proof.checkpoints {
227 trace!(
228 %sender,
229 slot = %proof.slot,
230 "Invalid old proof, punishing sender",
231 );
232
233 self.engine.lock().report(sender, rep::GOSSIP_INVALID_PROOF);
234 }
235 } else {
236 self.engine
238 .lock()
239 .report(sender, rep::GOSSIP_OUTDATED_PROOF);
240 }
241
242 return;
243 }
244 cmp::Ordering::Equal => {
245 if !(proof.seed == next_slot_input.seed
246 && proof.slot_iterations == next_slot_input.slot_iterations)
247 {
248 trace!(
249 %sender,
250 slot = %proof.slot,
251 "Proof with next slot mismatch, ignoring",
252 );
253
254 self.engine
255 .lock()
256 .report(sender, rep::GOSSIP_NEXT_SLOT_MISMATCH);
257 return;
258 }
259 }
260 cmp::Ordering::Greater => {
261 trace!(
262 %sender,
263 slot = %proof.slot,
264 next_slot = %next_slot_input.slot,
265 "Proof from the future",
266 );
267
268 if let Some(proofs) = self.gossip_cache.get_or_insert(sender, Default::default) {
269 if proofs.len() == GOSSIP_CACHE_PER_PEER_SIZE {
270 if let Some(proof) = proofs.pop_front() {
271 trace!(
272 %sender,
273 slot = %proof.slot,
274 next_slot = %next_slot_input.slot,
275 "Too many proofs stored from peer",
276 );
277
278 self.engine
279 .lock()
280 .report(sender, rep::GOSSIP_TOO_MANY_PROOFS);
281 }
282 }
283 proofs.push_back(proof);
284 return;
285 }
286 }
287 }
288
289 if self.pot_verifier.verify_checkpoints(
290 proof.seed,
291 proof.slot_iterations,
292 &proof.checkpoints,
293 ) {
294 debug!(%sender, slot = %proof.slot, "Full verification succeeded");
295
296 self.engine
297 .lock()
298 .gossip_message(self.topic, proof.encode(), false);
299
300 if let Err(error) = self.from_gossip_sender.send((sender, proof)).await {
301 warn!(%error, "Failed to send incoming message");
302 }
303 } else {
304 debug!(%sender, slot = %proof.slot, "Full verification failed");
305 self.engine.lock().report(sender, rep::GOSSIP_INVALID_PROOF);
306 }
307 }
308
309 async fn handle_to_gossip_messages(&mut self, message: ToGossipMessage) {
310 match message {
311 ToGossipMessage::Proof(proof) => {
312 self.engine
313 .lock()
314 .gossip_message(self.topic, proof.encode(), false);
315 }
316 ToGossipMessage::NextSlotInput(next_slot_input) => {
317 self.handle_next_slot_input(next_slot_input).await;
318 }
319 }
320 }
321
322 async fn handle_next_slot_input(&mut self, next_slot_input: PotNextSlotInput) {
325 let mut old_proofs = HashMap::<GossipProof, Vec<PeerId>>::new();
326
327 for (sender, proofs) in &mut self.gossip_cache.iter_mut() {
328 proofs.retain(|proof| {
329 if proof.slot > next_slot_input.slot {
330 true
331 } else {
332 old_proofs.entry(*proof).or_default().push(*sender);
333 false
334 }
335 });
336 }
337
338 let mut potentially_matching_proofs = Vec::new();
339
340 for (proof, senders) in old_proofs {
341 if proof.slot != next_slot_input.slot {
342 let invalid_proof = self
343 .pot_verifier
344 .try_get_checkpoints(proof.slot_iterations, proof.seed)
345 .map(|verified_checkpoints| verified_checkpoints != proof.checkpoints)
346 .unwrap_or_default();
347
348 let engine = self.engine.lock();
349 if invalid_proof {
350 for sender in senders {
351 trace!(
352 %sender,
353 slot = %proof.slot,
354 "Proof ended up being invalid",
355 );
356
357 engine.report(sender, rep::GOSSIP_INVALID_PROOF);
358 }
359 } else {
360 for sender in senders {
361 trace!(
362 %sender,
363 slot = %proof.slot,
364 "Proof ended up being unused",
365 );
366
367 engine.report(sender, rep::GOSSIP_OUTDATED_PROOF);
368 }
369 }
370
371 continue;
372 }
373
374 if !(proof.seed == next_slot_input.seed
375 && proof.slot_iterations == next_slot_input.slot_iterations)
376 {
377 let engine = self.engine.lock();
378 for sender in senders {
379 trace!(
380 %sender,
381 slot = %proof.slot,
382 "Proof ended up not matching slot inputs",
383 );
384
385 engine.report(sender, rep::GOSSIP_SLOT_INPUT_MISMATCH);
386 }
387
388 continue;
389 }
390
391 potentially_matching_proofs.push((proof, senders));
392 }
393
394 rayon::spawn({
396 let engine = Arc::clone(&self.engine);
397 let network = Arc::clone(&self.network);
398 let pot_verifier = self.pot_verifier.clone();
399 let from_gossip_sender = self.from_gossip_sender.clone();
400 let topic = self.topic;
401
402 move || {
403 Self::handle_potentially_matching_proofs(
404 next_slot_input,
405 potentially_matching_proofs,
406 engine,
407 network.as_ref(),
408 &pot_verifier,
409 from_gossip_sender,
410 topic,
411 );
412 }
413 });
414 }
415
416 fn handle_potentially_matching_proofs(
417 next_slot_input: PotNextSlotInput,
418 mut potentially_matching_proofs: Vec<(GossipProof, Vec<PeerId>)>,
419 engine: Arc<Mutex<GossipEngine<Block>>>,
420 network: &dyn NetworkPeers,
421 pot_verifier: &PotVerifier,
422 mut from_gossip_sender: mpsc::Sender<(PeerId, GossipProof)>,
423 topic: Block::Hash,
424 ) {
425 if potentially_matching_proofs.is_empty() {
426 return;
428 }
429
430 potentially_matching_proofs.sort_by_cached_key(|(_proof, peer_ids)| {
432 peer_ids
433 .iter()
434 .map(|peer_id| network.peer_reputation(peer_id))
435 .max()
436 });
437
438 let correct_proof = if potentially_matching_proofs.len() < EXPECTED_POT_VERIFICATION_SPEEDUP
440 {
441 let mut correct_proof = None;
442
443 for (proof, _senders) in potentially_matching_proofs.iter().rev() {
445 if pot_verifier.verify_checkpoints(
446 proof.seed,
447 proof.slot_iterations,
448 &proof.checkpoints,
449 ) {
450 correct_proof.replace(*proof);
451 break;
452 }
453 }
454
455 correct_proof
456 } else {
457 let (proof, _senders) = potentially_matching_proofs
459 .last()
460 .expect("Guaranteed to be non-empty; qed");
461
462 if pot_verifier.verify_checkpoints(
463 proof.seed,
464 proof.slot_iterations,
465 &proof.checkpoints,
466 ) {
467 Some(*proof)
468 } else {
469 match subspace_proof_of_time::prove(
470 next_slot_input.seed,
471 next_slot_input.slot_iterations,
472 ) {
473 Ok(checkpoints) => Some(GossipProof {
474 slot: next_slot_input.slot,
475 seed: next_slot_input.seed,
476 slot_iterations: next_slot_input.slot_iterations,
477 checkpoints,
478 }),
479 Err(error) => {
480 error!(
481 %error,
482 slot = %next_slot_input.slot,
483 "Failed to run proof of time, this is an implementation bug",
484 );
485 return;
486 }
487 }
488 }
489 };
490
491 for (proof, senders) in potentially_matching_proofs {
492 if Some(proof) == correct_proof {
493 let mut sent = false;
494 for sender in senders {
495 debug!(%sender, slot = %proof.slot, "Correct future proof");
496
497 if sent {
498 continue;
499 }
500 sent = true;
501
502 engine.lock().gossip_message(topic, proof.encode(), false);
503
504 if let Err(error) =
505 futures::executor::block_on(from_gossip_sender.send((sender, proof)))
506 {
507 warn!(
508 %error,
509 slot = %proof.slot,
510 "Failed to send future proof",
511 );
512 }
513 }
514 } else {
515 let engine = engine.lock();
516 for sender in senders {
517 debug!(%sender, slot = %proof.slot, "Next slot proof is invalid");
518 engine.report(sender, rep::GOSSIP_INVALID_PROOF);
519 }
520 }
521 }
522 }
523}
524
525struct PotGossipValidator<Block, SO, Network>
527where
528 Block: BlockT,
529{
530 state: Arc<PotState>,
531 topic: Block::Hash,
532 sync_oracle: SO,
533 network: Network,
534}
535
536impl<Block, SO, Network> PotGossipValidator<Block, SO, Network>
537where
538 Block: BlockT,
539 SO: SyncOracle,
540{
541 fn new(state: Arc<PotState>, topic: Block::Hash, sync_oracle: SO, network: Network) -> Self {
543 Self {
544 state,
545 topic,
546 sync_oracle,
547 network,
548 }
549 }
550}
551
552impl<Block, SO, Network> Validator<Block> for PotGossipValidator<Block, SO, Network>
553where
554 Block: BlockT,
555 SO: SyncOracle + Send + Sync,
556 Network: NetworkPeers + Send + Sync + 'static,
557{
558 fn validate(
559 &self,
560 _context: &mut dyn ValidatorContext<Block>,
561 sender: &PeerId,
562 mut data: &[u8],
563 ) -> ValidationResult<Block::Hash> {
564 if self.sync_oracle.is_major_syncing() {
566 return ValidationResult::Discard;
567 }
568
569 match GossipProof::decode(&mut data) {
570 Ok(proof) => {
571 let next_slot_input = self.state.next_slot_input();
572 let current_slot = next_slot_input.slot - Slot::from(1);
573
574 if proof.slot < current_slot {
575 trace!(
576 %sender,
577 slot = %proof.slot,
578 "Received proof for old slot, ignoring",
579 );
580
581 self.network.report_peer(*sender, rep::GOSSIP_OLD_SLOT);
582 return ValidationResult::Discard;
583 }
584 if proof.slot > current_slot + Slot::from(MAX_SLOTS_IN_THE_FUTURE) {
585 trace!(
586 %sender,
587 slot = %proof.slot,
588 "Received proof for slot too far in the future, ignoring",
589 );
590
591 self.network
592 .report_peer(*sender, rep::GOSSIP_TOO_FAR_IN_THE_FUTURE);
593 return ValidationResult::Discard;
594 }
595 if proof.slot == next_slot_input.slot
597 && !(proof.seed == next_slot_input.seed
598 && proof.slot_iterations == next_slot_input.slot_iterations)
599 {
600 trace!(
601 %sender,
602 slot = %proof.slot,
603 "Received proof with next slot mismatch, ignoring",
604 );
605
606 self.network
607 .report_peer(*sender, rep::GOSSIP_NEXT_SLOT_MISMATCH);
608 return ValidationResult::Discard;
609 }
610
611 let current_slot_iterations = next_slot_input.slot_iterations;
612
613 if proof.slot_iterations.get() < next_slot_input.slot_iterations.get()
616 || proof.slot_iterations.get() > current_slot_iterations.get() * 3 / 2
617 {
618 debug!(
619 %sender,
620 slot = %proof.slot,
621 slot_iterations = %proof.slot_iterations,
622 current_slot_iterations = %current_slot_iterations,
623 "Slot iterations outside of reasonable range"
624 );
625
626 self.network
627 .report_peer(*sender, rep::GOSSIP_SLOT_ITERATIONS_OUTSIDE_OF_RANGE);
628 return ValidationResult::Discard;
629 }
630
631 trace!(%sender, slot = %proof.slot, "Superficial verification succeeded");
632
633 ValidationResult::ProcessAndDiscard(self.topic)
635 }
636 Err(error) => {
637 debug!(%error, "Gossip message couldn't be decoded");
638
639 self.network.report_peer(*sender, rep::GOSSIP_NOT_DECODABLE);
640 ValidationResult::Discard
641 }
642 }
643 }
644
645 fn message_expired<'a>(&'a self) -> Box<dyn FnMut(Block::Hash, &[u8]) -> bool + 'a> {
646 let current_slot = u64::from(self.state.next_slot_input().slot) - 1;
647 Box::new(move |_topic, mut data| {
648 if let Ok(proof) = GossipProof::decode(&mut data) {
649 if proof.slot >= current_slot {
651 return false;
652 }
653 }
654
655 true
656 })
657 }
658
659 fn message_allowed<'a>(
660 &'a self,
661 ) -> Box<dyn FnMut(&PeerId, MessageIntent, &Block::Hash, &[u8]) -> bool + 'a> {
662 Box::new(move |_who, intent, _topic, _data| {
663 matches!(intent, MessageIntent::Broadcast)
665 })
666 }
667}