use crate::utils::{AsyncJoinOnDrop, Handler, HandlerFn};
use async_trait::async_trait;
use event_listener_primitives::HandlerId;
use fs2::FileExt;
use futures::FutureExt;
use futures::future::{Fuse, pending};
use libp2p::multiaddr::Protocol;
use libp2p::{Multiaddr, PeerId};
use memmap2::{MmapMut, MmapOptions};
use parity_scale_codec::{Compact, CompactLen, Decode, Encode};
use parking_lot::Mutex;
use schnellru::{ByLength, LruMap};
use std::collections::HashSet;
use std::fs::OpenOptions;
use std::io::{Read, Seek, SeekFrom};
use std::path::Path;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use std::{io, mem};
use subspace_core_primitives::hashes::{Blake3Hash, blake3_hash};
use thiserror::Error;
use tokio::time::{Sleep, sleep};
use tracing::{debug, error, trace, warn};

/// Failure time for a peer address; `None` means no failure has been recorded.
type FailureTime = Option<SystemTime>;

/// Maximum number of peers kept in the known peers cache.
const KNOWN_PEERS_CACHE_SIZE: u32 = 100;
/// Maximum number of addresses kept per peer.
const ADDRESSES_CACHE_SIZE: u32 = 30;
/// Pause between periodic flushes of known peers to disk, in seconds.
const DATA_FLUSH_DURATION_SECS: u64 = 5;
/// Grace period before a failing address is removed from the persistent cache.
const REMOVE_KNOWN_PEERS_GRACE_PERIOD: Duration = Duration::from_secs(24 * 3600);
/// Grace period before a failing address is marked for removal from Kademlia.
const REMOVE_KNOWN_PEERS_GRACE_PERIOD_FOR_KADEMLIA: Duration = Duration::from_secs(3600);
/// On-disk known peers snapshots older than this are ignored on startup.
const STALE_KNOWN_PEERS_TIMEOUT: Duration = Duration::from_secs(24 * 3600);

/// Event triggered when a known peer address is marked for removal after repeated failures.
#[derive(Debug, Clone)]
pub struct PeerAddressRemovedEvent {
    /// Peer ID.
    pub peer_id: PeerId,
    /// Removed address.
    pub address: Multiaddr,
}

/// Encodable form of a known peer address together with its optional failure time.
#[derive(Debug, Encode, Decode)]
struct EncodableKnownPeerAddress {
    multiaddr: Vec<u8>,
    /// Failure time as seconds since the Unix epoch.
    failure_time: Option<u64>,
}

/// Encodable snapshot of the known peers cache, as persisted on disk.
#[derive(Debug, Encode, Decode)]
struct EncodableKnownPeers {
    cache_size: u32,
    /// Snapshot creation time as seconds since the Unix epoch.
    timestamp: u64,
    /// Peer ID bytes mapped to the encoded addresses of that peer.
    known_peers: Vec<(Vec<u8>, Vec<EncodableKnownPeerAddress>)>,
}

impl EncodableKnownPeers {
    fn into_cache(mut self) -> LruMap<PeerId, LruMap<Multiaddr, FailureTime>> {
        let mut peers_cache = LruMap::new(ByLength::new(self.cache_size));

        // Sort peers by the latest failure time seen across their addresses (no failure sorts
        // as `u64::MAX`) before inserting them into the LRU cache in reverse order
        self.known_peers
            .sort_by_cached_key(|(_peer_id, addresses)| {
                addresses.iter().fold(0u64, |acc, address| {
                    acc.max(address.failure_time.unwrap_or(u64::MAX))
                })
            });

        'peers: for (peer_id, addresses) in self.known_peers.into_iter().rev() {
            let mut peer_cache =
                LruMap::<Multiaddr, FailureTime>::new(ByLength::new(ADDRESSES_CACHE_SIZE));

            let peer_id = match PeerId::from_bytes(&peer_id) {
                Ok(peer_id) => peer_id,
                Err(error) => {
                    debug!(%error, "Failed to decode known peer ID, skipping peer entry");
                    continue;
                }
            };
            for address in addresses {
                let multiaddr = match Multiaddr::try_from(address.multiaddr) {
                    Ok(multiaddr) => multiaddr,
                    Err(error) => {
                        debug!(
                            %error,
                            "Failed to decode known peer multiaddress, skipping peer entry"
                        );
                        continue 'peers;
                    }
                };

                peer_cache.insert(
                    multiaddr,
                    address.failure_time.map(|failure_time| {
                        SystemTime::UNIX_EPOCH + Duration::from_secs(failure_time)
                    }),
                );
            }

            peers_cache.insert(peer_id, peer_cache);
        }

        peers_cache
    }

    fn from_cache(cache: &LruMap<PeerId, LruMap<Multiaddr, FailureTime>>, cache_size: u32) -> Self {
        let single_peer_encoded_address_size =
            KnownPeersManager::single_peer_encoded_address_size();
        Self {
            cache_size,
            timestamp: SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)
                .expect("Never before Unix epoch; qed")
                .as_secs(),
            known_peers: cache
                .iter()
                .map(|(peer_id, addresses)| {
                    (
                        peer_id.to_bytes(),
                        addresses
                            .iter()
                            .filter_map(|(multiaddr, failure_time)| {
                                let multiaddr_bytes = multiaddr.to_vec();

                                // Skip addresses that would not fit into the fixed per-address
                                // budget of the on-disk slot
                                if multiaddr_bytes.encoded_size() > single_peer_encoded_address_size
                                {
                                    debug!(
                                        encoded_multiaddress_size = %multiaddr_bytes.encoded_size(),
                                        limit = %single_peer_encoded_address_size,
                                        ?multiaddr,
                                        "Unexpectedly large multiaddress"
                                    );
                                    return None;
                                }

                                Some(EncodableKnownPeerAddress {
                                    multiaddr: multiaddr_bytes,
                                    failure_time: failure_time.map(|failure_time| {
                                        failure_time
                                            .duration_since(SystemTime::UNIX_EPOCH)
                                            .expect("Never before Unix epoch; qed")
                                            .as_secs()
                                    }),
                                })
                            })
                            .collect(),
                    )
                })
                .collect(),
        }
    }
}
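
/// Double-buffered (A/B) slots backing the known peers cache on disk.
///
/// Each slot holds the SCALE-encoded [`EncodableKnownPeers`] followed by a BLAKE3 checksum of
/// those bytes; the rest of the slot is unused padding. Writes always go to the currently
/// inactive slot, after which the slots are swapped, so a crash mid-write can corrupt at most
/// one of the two copies.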
struct KnownPeersSlots {
    a: MmapMut,
    b: MmapMut,
}

impl KnownPeersSlots {
    fn write_to_inactive_slot(&mut self, encodable_known_peers: &EncodableKnownPeers) {
        let known_peers_bytes = encodable_known_peers.encode();
        let (encoded_bytes, remaining_bytes) = self.a.split_at_mut(known_peers_bytes.len());
        encoded_bytes.copy_from_slice(&known_peers_bytes);
        // Write the checksum of the encoded bytes right after them
        remaining_bytes[..Blake3Hash::SIZE]
            .copy_from_slice(blake3_hash(&known_peers_bytes).as_ref());
        if let Err(error) = self.a.flush() {
            warn!(%error, "Failed to flush known peers to disk");
        }

        // Swap the slots so that the other one is written next time
        mem::swap(&mut self.a, &mut self.b);
    }
}

/// Defines the operations of a known peers registry.
#[async_trait]
pub trait KnownPeersRegistry: Send + Sync {
    /// Registers a peer ID and its addresses in the registry.
    async fn add_known_peer(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>);

    /// Marks the given peer addresses as failing, removing them once the configured grace
    /// periods expire.
    async fn remove_known_peer_addresses(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>);

    /// Removes all addresses of the given peer from the registry.
    fn remove_all_known_peer_addresses(&mut self, peer_id: PeerId);

    /// Returns all known peers and their addresses.
    async fn all_known_peers(&mut self) -> Vec<(PeerId, Vec<Multiaddr>)>;

    /// Returns the number of known peers and the total number of their addresses.
    fn count_known_peers(&mut self) -> (usize, usize);

    /// Runs the registry's background maintenance (e.g. periodically persisting known peers).
    async fn run(&mut self);

    /// Registers a handler triggered when an address is detected as unreachable and marked for
    /// removal.
    fn on_unreachable_address(
        &mut self,
        handler: HandlerFn<PeerAddressRemovedEvent>,
    ) -> Option<HandlerId>;
}

/// A no-op [`KnownPeersRegistry`] that doesn't store or return any peers.
#[derive(Clone, Default)]
pub(crate) struct StubNetworkingParametersManager;

impl StubNetworkingParametersManager {
    /// Returns this instance as a boxed [`KnownPeersRegistry`] trait object.
    pub fn boxed(self) -> Box<dyn KnownPeersRegistry> {
        Box::new(self)
    }
}

#[async_trait]
impl KnownPeersRegistry for StubNetworkingParametersManager {
    async fn add_known_peer(&mut self, _: PeerId, _: Vec<Multiaddr>) {}

    async fn remove_known_peer_addresses(&mut self, _peer_id: PeerId, _addresses: Vec<Multiaddr>) {}

    fn remove_all_known_peer_addresses(&mut self, _peer_id: PeerId) {}

    async fn all_known_peers(&mut self) -> Vec<(PeerId, Vec<Multiaddr>)> {
        Vec::new()
    }

    fn count_known_peers(&mut self) -> (usize, usize) {
        (0, 0)
    }

    async fn run(&mut self) {
        futures::future::pending().await
    }

    fn on_unreachable_address(
        &mut self,
        _handler: HandlerFn<PeerAddressRemovedEvent>,
    ) -> Option<HandlerId> {
        None
    }
}

/// Configuration for [`KnownPeersManager`].
#[derive(Debug, Clone)]
pub struct KnownPeersManagerConfig {
    /// Whether known peers are returned from [`KnownPeersRegistry::all_known_peers`] and counted
    /// in [`KnownPeersRegistry::count_known_peers`].
    pub enable_known_peers_source: bool,
    /// Size of the known peers cache.
    pub cache_size: u32,
    /// Peers whose addresses are never added to the cache.
    pub ignore_peer_list: HashSet<PeerId>,
    /// Path to the persistent cache file; `None` disables persistence.
    pub path: Option<Box<Path>>,
    /// Grace period before a failing address is removed from the persistent cache.
    pub failed_address_cache_removal_interval: Duration,
    /// Grace period before a failing address is marked for removal from Kademlia.
    pub failed_address_kademlia_removal_interval: Duration,
    /// Persistent cache snapshots older than this are ignored on startup.
    pub stale_known_peers_timeout: Duration,
}

impl Default for KnownPeersManagerConfig {
    fn default() -> Self {
        Self {
            enable_known_peers_source: true,
            cache_size: KNOWN_PEERS_CACHE_SIZE,
            ignore_peer_list: Default::default(),
            path: None,
            failed_address_cache_removal_interval: REMOVE_KNOWN_PEERS_GRACE_PERIOD,
            failed_address_kademlia_removal_interval: REMOVE_KNOWN_PEERS_GRACE_PERIOD_FOR_KADEMLIA,
            stale_known_peers_timeout: STALE_KNOWN_PEERS_TIMEOUT,
        }
    }
}

/// Errors that can occur while persisting the known peers cache.
#[derive(Debug, Error)]
pub enum KnownPeersManagerPersistenceError {
    /// I/O error.
    #[error("I/O error: {0}")]
    Io(#[from] io::Error),
    /// The known peers file could not be preallocated.
    #[error("Can't preallocate known peers file, probably not enough space on disk: {0}")]
    CantPreallocateKnownPeersFile(io::Error),
}

/// Persistent implementation of [`KnownPeersRegistry`] backed by an LRU cache and an optional
/// memory-mapped file.
pub struct KnownPeersManager {
    /// Whether the in-memory cache has changes that have not been flushed to disk yet.
    cache_need_saving: bool,
    /// LRU cache of known peers and their addresses, with optional first-failure time.
    known_peers: LruMap<PeerId, LruMap<Multiaddr, FailureTime>>,
    /// Delay before the next flush of the cache to disk.
    networking_parameters_save_delay: Pin<Box<Fuse<Sleep>>>,
    /// Memory-mapped A/B slots backing the cache on disk; `None` when persistence is disabled.
    known_peers_slots: Option<Arc<Mutex<KnownPeersSlots>>>,
    /// Handlers invoked when an address is marked for removal.
    address_removed: Handler<PeerAddressRemovedEvent>,
    /// Configuration.
    config: KnownPeersManagerConfig,
}

impl Drop for KnownPeersManager {
    fn drop(&mut self) {
        // Flush unsaved changes synchronously before the manager goes away
        if self.cache_need_saving
            && let Some(known_peers_slots) = &self.known_peers_slots
        {
            known_peers_slots
                .lock()
                .write_to_inactive_slot(&EncodableKnownPeers::from_cache(
                    &self.known_peers,
                    self.config.cache_size,
                ));
        }
    }
}

impl KnownPeersManager {
    fn init_file(
        path: &Path,
        cache_size: u32,
    ) -> Result<
        (Option<EncodableKnownPeers>, Arc<Mutex<KnownPeersSlots>>),
        KnownPeersManagerPersistenceError,
    > {
        let mut file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(path)?;

        let known_addresses_size = Self::known_addresses_size(cache_size);
        let file_size = Self::file_size(cache_size);
        let mut maybe_newest_known_addresses = None::<EncodableKnownPeers>;

        // Try to decode both existing slots and keep the newest valid snapshot
        {
            let mut file_contents = Vec::with_capacity(file_size);
            file.read_to_end(&mut file_contents)?;
            if !file_contents.is_empty() {
                for known_addresses_bytes in file_contents.chunks_exact(file_contents.len() / 2) {
                    let known_addresses =
                        match EncodableKnownPeers::decode(&mut &*known_addresses_bytes) {
                            Ok(known_addresses) => known_addresses,
                            Err(error) => {
                                debug!(%error, "Failed to decode encodable known peers");
                                continue;
                            }
                        };

                    let (encoded_bytes, remaining_bytes) =
                        known_addresses_bytes.split_at(known_addresses.encoded_size());
                    if remaining_bytes.len() < Blake3Hash::SIZE {
                        debug!(
                            remaining_bytes = %remaining_bytes.len(),
                            "Not enough bytes to decode checksum, file was likely corrupted"
                        );
                        continue;
                    }

                    let actual_checksum = blake3_hash(encoded_bytes);
                    let expected_checksum = &remaining_bytes[..Blake3Hash::SIZE];
                    if *actual_checksum != *expected_checksum {
                        debug!(
                            encoded_bytes_len = %encoded_bytes.len(),
                            actual_checksum = %hex::encode(actual_checksum),
                            expected_checksum = %hex::encode(expected_checksum),
                            "Hash doesn't match, possible disk corruption or file was just \
                            created, ignoring"
                        );
                        continue;
                    }

                    match &mut maybe_newest_known_addresses {
                        Some(newest_known_addresses) => {
                            if newest_known_addresses.timestamp < known_addresses.timestamp {
                                *newest_known_addresses = known_addresses;
                            }
                        }
                        None => {
                            maybe_newest_known_addresses.replace(known_addresses);
                        }
                    }
                }
            }
        }

        let file_resized = if file.seek(SeekFrom::End(0))? != file_size as u64 {
            // Preallocate the whole file first (`set_len` below may otherwise create a sparse
            // file), so that later writes don't fail due to lack of disk space
            file.allocate(file_size as u64)
                .map_err(KnownPeersManagerPersistenceError::CantPreallocateKnownPeersFile)?;
            file.set_len(file_size as u64)?;
            true
        } else {
            false
        };

        let mut a_mmap = unsafe {
            MmapOptions::new()
                .len(known_addresses_size)
                .map_mut(&file)?
        };
        let mut b_mmap = unsafe {
            MmapOptions::new()
                .offset(known_addresses_size as u64)
                .len(known_addresses_size)
                .map_mut(&file)?
        };

        if file_resized {
            // The file was just resized, carry the newest decoded snapshot (if any) over into
            // both slots
            if let Some(newest_known_addresses) = &maybe_newest_known_addresses {
                let bytes = newest_known_addresses.encode();
                a_mmap[..bytes.len()].copy_from_slice(&bytes);
                a_mmap.flush()?;
                b_mmap[..bytes.len()].copy_from_slice(&bytes);
                b_mmap.flush()?;
            }
        }

        let known_peers_slots = Arc::new(Mutex::new(KnownPeersSlots {
            a: a_mmap,
            b: b_mmap,
        }));

        Ok((maybe_newest_known_addresses, known_peers_slots))
    }
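
    /// Creates a new [`KnownPeersManager`], loading the persistent cache from `config.path` when
    /// it is set.
    ///
    /// A minimal construction sketch, assuming an in-memory only registry (`path: None`) and
    /// eliding error handling:
    ///
    /// ```ignore
    /// let manager = KnownPeersManager::new(KnownPeersManagerConfig {
    ///     path: None,
    ///     ..KnownPeersManagerConfig::default()
    /// })?;
    /// let registry = manager.boxed();
    /// ```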
    pub fn new(config: KnownPeersManagerConfig) -> Result<Self, KnownPeersManagerPersistenceError> {
        let (maybe_newest_known_addresses, known_peers_slots) = if let Some(path) = &config.path {
            Self::init_file(path, config.cache_size)
                .map(|(known_addresses, slots)| (known_addresses, Some(slots)))?
        } else {
            (None, None)
        };

        let known_peers = maybe_newest_known_addresses
            .filter(|newest_known_addresses| {
                // Ignore the on-disk snapshot if it is too old
                let time_since_unix_epoch = SystemTime::now()
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .expect("Never before Unix epoch; qed");
                let known_peers_age = time_since_unix_epoch
                    .saturating_sub(Duration::from_secs(newest_known_addresses.timestamp));

                known_peers_age <= config.stale_known_peers_timeout
            })
            .map(EncodableKnownPeers::into_cache)
            .unwrap_or_else(|| LruMap::new(ByLength::new(config.cache_size)));

        Ok(Self {
            cache_need_saving: false,
            known_peers,
            networking_parameters_save_delay: Self::default_delay(),
            known_peers_slots,
            address_removed: Default::default(),
            config,
        })
    }

    /// Size of the backing file on disk: two slots, each holding an encoded snapshot plus its
    /// checksum.
    pub fn file_size(cache_size: u32) -> usize {
        Self::known_addresses_size(cache_size) * 2
    }

    /// Returns this instance as a boxed [`KnownPeersRegistry`] trait object.
    pub fn boxed(self) -> Box<dyn KnownPeersRegistry> {
        Box::new(self)
    }

    fn default_delay() -> Pin<Box<Fuse<Sleep>>> {
        Box::pin(sleep(Duration::from_secs(DATA_FLUSH_DURATION_SECS)).fuse())
    }

    /// Upper bound for the encoded size of a single peer address.
    fn single_peer_encoded_address_size() -> usize {
        let multiaddr = Multiaddr::from_str(
            "/ip4/127.0.0.1/tcp/1234/p2p/12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp",
        )
        .expect("Valid multiaddr; qed");
        // Leave a generous margin (3x a typical address) for longer addresses
        multiaddr.to_vec().encoded_size() * 3
    }

    /// Upper bound for the encoded size of a single peer entry (peer ID plus its address list).
    fn single_peer_encoded_size() -> usize {
        PeerId::random().to_bytes().encoded_size()
            + Compact::compact_len(&ADDRESSES_CACHE_SIZE)
            + (Self::single_peer_encoded_address_size() + Some(0u64).encoded_size())
                * ADDRESSES_CACHE_SIZE as usize
    }

    /// Upper bound for the encoded size of one on-disk slot: timestamp, compact length prefix,
    /// peer entries and the trailing checksum.
    fn known_addresses_size(cache_size: u32) -> usize {
        mem::size_of::<u64>()
            + Compact::compact_len(&cache_size)
            + Self::single_peer_encoded_size() * cache_size as usize
            + Blake3Hash::SIZE
    }

    fn persistent_enabled(&self) -> bool {
        self.config.path.is_some()
    }

    #[cfg(test)]
    pub(crate) fn contains_address(&self, peer_id: &PeerId, address: &Multiaddr) -> bool {
        self.known_peers
            .peek(peer_id)
            .map(|addresses| addresses.peek(address).is_some())
            .unwrap_or_default()
    }
}

#[async_trait]
impl KnownPeersRegistry for KnownPeersManager {
    async fn add_known_peer(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>) {
        if self.config.ignore_peer_list.contains(&peer_id) {
            debug!(
                %peer_id,
                addr_num = addresses.len(),
                "Adding new peer addresses canceled (ignore list): {:?}",
                addresses
            );

            return;
        }

        debug!(
            %peer_id,
            addr_num = addresses.len(),
            "Add new peer addresses to the networking parameters registry: {:?}",
            addresses
        );

        // Skip in-memory transport addresses and strip the `/p2p/...` suffix before caching
        addresses
            .iter()
            .filter(|addr| {
                !addr
                    .into_iter()
                    .any(|protocol| matches!(protocol, Protocol::Memory(..)))
            })
            .cloned()
            .map(remove_p2p_suffix)
            .for_each(|addr| {
                self.known_peers
                    .get_or_insert(peer_id, || LruMap::new(ByLength::new(ADDRESSES_CACHE_SIZE)));

                if let Some(addresses) = self.known_peers.get(&peer_id) {
                    let previous_entry = addresses.peek(&addr).cloned().flatten();
                    addresses.insert(addr, None);
                    if let Some(previous_entry) = previous_entry {
                        trace!(%peer_id, "Address cache entry replaced: {:?}", previous_entry);
                    }
                }
            });

        self.cache_need_saving = true;
    }

    async fn remove_known_peer_addresses(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>) {
        trace!(%peer_id, "Remove peer addresses from the networking parameters registry: {:?}", addresses);

        let removed_addresses = remove_known_peer_addresses_internal(
            &mut self.known_peers,
            peer_id,
            addresses,
            self.config.failed_address_cache_removal_interval,
            self.config.failed_address_kademlia_removal_interval,
        );

        for event in removed_addresses {
            self.address_removed.call_simple(&event);
        }

        self.cache_need_saving = true;
    }

    fn remove_all_known_peer_addresses(&mut self, peer_id: PeerId) {
        trace!(%peer_id, "Remove all peer addresses from the networking parameters registry");

        self.known_peers.remove(&peer_id);

        self.cache_need_saving = true;
    }

    async fn all_known_peers(&mut self) -> Vec<(PeerId, Vec<Multiaddr>)> {
        if !self.config.enable_known_peers_source {
            return Vec::new();
        }

        self.known_peers
            .iter()
            .map(|(&peer_id, addresses)| {
                (
                    peer_id,
                    addresses
                        .iter()
                        .map(|(addr, _failure_time)| addr.clone())
                        .collect(),
                )
            })
            .collect()
    }

    fn count_known_peers(&mut self) -> (usize, usize) {
        if !self.config.enable_known_peers_source {
            return (0, 0);
        }

        (
            self.known_peers.len(),
            self.known_peers
                .iter()
                .map(|(_peer, addresses)| addresses.len())
                .sum(),
        )
    }

    async fn run(&mut self) {
        if !self.persistent_enabled() {
            pending().await
        }

        loop {
            (&mut self.networking_parameters_save_delay).await;

            if let Some(known_peers_slots) = &self.known_peers_slots
                && self.cache_need_saving
            {
                let known_peers =
                    EncodableKnownPeers::from_cache(&self.known_peers, self.config.cache_size);
                let known_peers_slots = Arc::clone(known_peers_slots);
                // Write to the memory-mapped file on a blocking thread to avoid stalling the
                // async executor
                let write_known_peers_fut =
                    AsyncJoinOnDrop::new(tokio::task::spawn_blocking(move || {
                        known_peers_slots
                            .lock()
                            .write_to_inactive_slot(&known_peers);
                    }));

                if let Err(error) = write_known_peers_fut.await {
                    error!(%error, "Failed to write known peers");
                }

                self.cache_need_saving = false;
            }
            self.networking_parameters_save_delay = KnownPeersManager::default_delay();
        }
    }

    fn on_unreachable_address(
        &mut self,
        handler: HandlerFn<PeerAddressRemovedEvent>,
    ) -> Option<HandlerId> {
        let handler_id = self.address_removed.add(handler);

        Some(handler_id)
    }
}

/// Removes a trailing `/p2p/...` protocol component from the address, if present.
pub(crate) fn remove_p2p_suffix(mut address: Multiaddr) -> Multiaddr {
    let last_protocol = address.pop();

    if let Some(Protocol::P2p(_)) = &last_protocol {
        return address;
    }

    if let Some(protocol) = last_protocol {
        address.push(protocol)
    }

    address
}

/// Appends a `/p2p/<peer id>` protocol component to the address, replacing an existing one.
pub(crate) fn append_p2p_suffix(peer_id: PeerId, mut address: Multiaddr) -> Multiaddr {
    let last_protocol = address.pop();

    if let Some(protocol) = last_protocol
        && !matches!(protocol, Protocol::P2p(..))
    {
        address.push(protocol)
    }
    address.push(Protocol::P2p(peer_id));

    address
}
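
/// Removes failing peer addresses from the in-memory cache using a two-step policy: the first
/// reported failure only records the failure time; on later failures, once
/// `expired_address_duration_kademlia` has passed the address is reported via a
/// [`PeerAddressRemovedEvent`], and once `expired_address_duration_persistent_storage` has passed
/// it is dropped from the cache entirely (removing the peer if it was its last address).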
pub(super) fn remove_known_peer_addresses_internal(
    known_peers: &mut LruMap<PeerId, LruMap<Multiaddr, FailureTime>>,
    peer_id: PeerId,
    addresses: Vec<Multiaddr>,
    expired_address_duration_persistent_storage: Duration,
    expired_address_duration_kademlia: Duration,
) -> Vec<PeerAddressRemovedEvent> {
    let mut address_removed_events = Vec::new();
    let now = SystemTime::now();

    addresses
        .into_iter()
        .map(remove_p2p_suffix)
        .for_each(|addr| {
            // Check whether the peer is present in the in-memory cache
            if let Some(addresses) = known_peers.peek_mut(&peer_id) {
                let last_address = addresses.peek(&addr).is_some() && addresses.len() == 1;
                if let Some(first_failed_time) = addresses.peek_mut(&addr) {
                    if let Some(time) = first_failed_time {
                        // Report the address for removal from Kademlia once its grace period has
                        // expired
                        if *time + expired_address_duration_kademlia < now {
                            let address_removed = PeerAddressRemovedEvent {
                                peer_id,
                                address: addr.clone(),
                            };

                            address_removed_events.push(address_removed);

                            trace!(%peer_id, "Address was marked for removal from Kademlia: {:?}", addr);
                        }

                        // Drop the address from the persistent cache once its grace period has
                        // expired
                        if *time + expired_address_duration_persistent_storage < now {
                            addresses.remove(&addr);

                            if last_address {
                                known_peers.remove(&peer_id);

                                trace!(%peer_id, "Peer removed from the cache");
                            }

                            trace!(%peer_id, "Address removed from the persistent cache: {:?}", addr);
                        } else {
                            trace!(
                                %peer_id, "Saving failed connection attempt to a peer: {:?}",
                                addr
                            );
                        }
                    } else {
                        // First failure: just record the failure time
                        first_failed_time.replace(now);

                        trace!(%peer_id, "Address marked for removal from the cache: {:?}", addr);
                    }
                }
            }
        });

    address_removed_events
}
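
// A minimal, hedged sketch (not part of the original test suite) illustrating the expected
// behavior of the p2p-suffix helpers above: `append_p2p_suffix` adds a trailing `/p2p/<peer id>`
// component (replacing an existing one) and `remove_p2p_suffix` strips it again. Module and test
// names here are illustrative.
#[cfg(test)]
mod p2p_suffix_sketch {
    use super::{append_p2p_suffix, remove_p2p_suffix};
    use libp2p::{Multiaddr, PeerId};
    use std::str::FromStr;

    #[test]
    fn p2p_suffix_round_trip() {
        let peer_id = PeerId::random();
        let plain = Multiaddr::from_str("/ip4/127.0.0.1/tcp/1234").expect("Valid multiaddr; qed");

        // Appending adds exactly one trailing `/p2p/...` component
        let with_suffix = append_p2p_suffix(peer_id, plain.clone());
        assert_ne!(with_suffix, plain);

        // Removing the suffix restores the original address, and is a no-op for addresses that
        // never had one
        assert_eq!(remove_p2p_suffix(with_suffix), plain);
        assert_eq!(remove_p2p_suffix(plain.clone()), plain);
    }
}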