1use crate::utils::{Handler, HandlerFn};
2use async_trait::async_trait;
3use event_listener_primitives::HandlerId;
4use fs2::FileExt;
5use futures::FutureExt;
6use futures::future::{Fuse, pending};
7use libp2p::multiaddr::Protocol;
8use libp2p::{Multiaddr, PeerId};
9use memmap2::{MmapMut, MmapOptions};
10use parity_scale_codec::{Compact, CompactLen, Decode, Encode};
11use parking_lot::Mutex;
12use schnellru::{ByLength, LruMap};
13use std::collections::HashSet;
14use std::fs::OpenOptions;
15use std::io::{Read, Seek, SeekFrom};
16use std::path::Path;
17use std::pin::Pin;
18use std::str::FromStr;
19use std::sync::Arc;
20use std::time::{Duration, SystemTime};
21use std::{io, mem};
22use subspace_core_primitives::hashes::{Blake3Hash, blake3_hash};
23use subspace_process::AsyncJoinOnDrop;
24use thiserror::Error;
25use tokio::time::{Sleep, sleep};
26use tracing::{debug, error, trace, warn};
27
/// Time of the first failed connection attempt to an address; `None` while the address has not
/// failed since it was last (re-)added.
type FailureTime = Option<SystemTime>;

/// Default maximum number of peers kept in the known peers cache
/// (default of `KnownPeersManagerConfig::cache_size`).
const KNOWN_PEERS_CACHE_SIZE: u32 = 100;
/// Maximum number of addresses remembered per peer.
const ADDRESSES_CACHE_SIZE: u32 = 30;
/// Delay, in seconds, between attempts to persist the known peers cache to disk.
const DATA_FLUSH_DURATION_SECS: u64 = 5;
/// Default time after the first failure of an address before it is removed from the cache
/// (default of `KnownPeersManagerConfig::failed_address_cache_removal_interval`).
const REMOVE_KNOWN_PEERS_GRACE_PERIOD: Duration = Duration::from_secs(24 * 3600);
/// Default time after the first failure of an address before `PeerAddressRemovedEvent`s start
/// being emitted for it (default of `KnownPeersManagerConfig::failed_address_kademlia_removal_interval`).
const REMOVE_KNOWN_PEERS_GRACE_PERIOD_FOR_KADEMLIA: Duration = Duration::from_secs(3600);
/// Default maximum age of the persisted known peers snapshot; older snapshots are ignored on
/// start (default of `KnownPeersManagerConfig::stale_known_peers_timeout`).
const STALE_KNOWN_PEERS_TIMEOUT: Duration = Duration::from_secs(24 * 3600);
43
/// Event emitted for a known peer address that has been failing for longer than the configured
/// Kademlia removal interval.
#[derive(Debug, Clone)]
pub struct PeerAddressRemovedEvent {
    /// Peer the unreachable address belongs to.
    pub peer_id: PeerId,
    /// The unreachable address, without a trailing `/p2p` component.
    pub address: Multiaddr,
}
52
/// On-disk (SCALE-encoded) representation of a single known peer address.
///
/// Field order defines the encoding; reordering fields breaks existing files.
#[derive(Debug, Encode, Decode)]
struct EncodableKnownPeerAddress {
    /// Multiaddress in its binary form (`Multiaddr::to_vec`).
    multiaddr: Vec<u8>,
    /// Time of the first failed connection attempt, in seconds since the Unix epoch; `None` if
    /// the address has not failed.
    failure_time: Option<u64>,
}
59
/// On-disk (SCALE-encoded) snapshot of the known peers cache, stored in each file slot followed by
/// its BLAKE3 checksum.
///
/// Field order defines the encoding; reordering fields breaks existing files.
#[derive(Debug, Encode, Decode)]
struct EncodableKnownPeers {
    /// Peer cache capacity recorded when the snapshot was written.
    cache_size: u32,
    /// Time the snapshot was created, in seconds since the Unix epoch; on load the newest valid
    /// slot wins.
    timestamp: u64,
    /// Pairs of peer ID bytes (`PeerId::to_bytes`) and that peer's addresses.
    known_peers: Vec<(Vec<u8>, Vec<EncodableKnownPeerAddress>)>,
}
67
68impl EncodableKnownPeers {
69 fn into_cache(mut self) -> LruMap<PeerId, LruMap<Multiaddr, FailureTime>> {
70 let mut peers_cache = LruMap::new(ByLength::new(self.cache_size));
71
72 self.known_peers
74 .sort_by_cached_key(|(_peer_id, addresses)| {
75 addresses.iter().fold(0u64, |acc, address| {
76 acc.max(address.failure_time.unwrap_or(u64::MAX))
77 })
78 });
79
80 'peers: for (peer_id, addresses) in self.known_peers.into_iter().rev() {
82 let mut peer_cache =
83 LruMap::<Multiaddr, FailureTime>::new(ByLength::new(ADDRESSES_CACHE_SIZE));
84
85 let peer_id = match PeerId::from_bytes(&peer_id) {
86 Ok(peer_id) => peer_id,
87 Err(error) => {
88 debug!(%error, "Failed to decode known peer ID, skipping peer entry");
89 continue;
90 }
91 };
92 for address in addresses {
93 let multiaddr = match Multiaddr::try_from(address.multiaddr) {
94 Ok(multiaddr) => multiaddr,
95 Err(error) => {
96 debug!(
97 %error,
98 "Failed to decode known peer multiaddress, skipping peer entry"
99 );
100 continue 'peers;
101 }
102 };
103
104 peer_cache.insert(
105 multiaddr,
106 address.failure_time.map(|failure_time| {
107 SystemTime::UNIX_EPOCH + Duration::from_secs(failure_time)
108 }),
109 );
110 }
111
112 peers_cache.insert(peer_id, peer_cache);
113 }
114
115 peers_cache
116 }
117
118 fn from_cache(cache: &LruMap<PeerId, LruMap<Multiaddr, FailureTime>>, cache_size: u32) -> Self {
119 let single_peer_encoded_address_size =
120 KnownPeersManager::single_peer_encoded_address_size();
121 Self {
122 cache_size,
123 timestamp: SystemTime::now()
124 .duration_since(SystemTime::UNIX_EPOCH)
125 .expect("Never before Unix epoch; qed")
126 .as_secs(),
127 known_peers: cache
128 .iter()
129 .map(|(peer_id, addresses)| {
130 (
131 peer_id.to_bytes(),
132 addresses
133 .iter()
134 .filter_map(|(multiaddr, failure_time)| {
135 let multiaddr_bytes = multiaddr.to_vec();
136
137 if multiaddr_bytes.encoded_size() > single_peer_encoded_address_size
138 {
139 debug!(
141 encoded_multiaddress_size = %multiaddr_bytes.encoded_size(),
142 limit = %single_peer_encoded_address_size,
143 ?multiaddr,
144 "Unexpectedly large multiaddress"
145 );
146 return None;
147 }
148
149 Some(EncodableKnownPeerAddress {
150 multiaddr: multiaddr_bytes,
151 failure_time: failure_time.map(|failure_time| {
152 failure_time
153 .duration_since(SystemTime::UNIX_EPOCH)
154 .expect("Never before Unix epoch; qed")
155 .as_secs()
156 }),
157 })
158 })
159 .collect(),
160 )
161 })
162 .collect(),
163 }
164 }
165}
166
/// The two memory-mapped halves ("slots") of the known peers file.
///
/// Snapshots are written alternately: `a` is always the slot written next and the slots are
/// swapped after each write, so the previously written snapshot stays intact while the other
/// slot is being overwritten.
struct KnownPeersSlots {
    /// Slot that will be written next.
    a: MmapMut,
    /// After a write, the slot holding the snapshot that was just written.
    b: MmapMut,
}
172
173impl KnownPeersSlots {
174 fn write_to_inactive_slot(&mut self, encodable_known_peers: &EncodableKnownPeers) {
175 let known_peers_bytes = encodable_known_peers.encode();
176 let (encoded_bytes, remaining_bytes) = self.a.split_at_mut(known_peers_bytes.len());
177 encoded_bytes.copy_from_slice(&known_peers_bytes);
178 remaining_bytes[..Blake3Hash::SIZE]
180 .copy_from_slice(blake3_hash(&known_peers_bytes).as_ref());
181 if let Err(error) = self.a.flush() {
182 warn!(%error, "Failed to flush known peers to disk");
183 }
184
185 mem::swap(&mut self.a, &mut self.b);
187 }
188}
189
/// Registry of known peers and their addresses.
#[async_trait]
pub trait KnownPeersRegistry: Send + Sync {
    /// Registers `addresses` as known addresses of `peer_id`.
    async fn add_known_peer(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>);

    /// Reports failed connection attempts to `addresses` of `peer_id`; an implementation may
    /// drop addresses that keep failing.
    async fn remove_known_peer_addresses(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>);

    /// Forgets `peer_id` together with all of its addresses.
    fn remove_all_known_peer_addresses(&mut self, peer_id: PeerId);

    /// Returns the known peers with their addresses.
    async fn all_known_peers(&mut self) -> Vec<(PeerId, Vec<Multiaddr>)>;

    /// Returns `(number of known peers, total number of known addresses)`.
    fn count_known_peers(&mut self) -> (usize, usize);

    /// Drives the registry's background work (e.g. periodic persistence). The implementations in
    /// this module never complete.
    async fn run(&mut self);

    /// Registers `handler` to be called with every [`PeerAddressRemovedEvent`].
    ///
    /// Returns `None` when the implementation does not emit such events.
    fn on_unreachable_address(
        &mut self,
        handler: HandlerFn<PeerAddressRemovedEvent>,
    ) -> Option<HandlerId>;
}
220
/// [`KnownPeersRegistry`] that stores nothing, reports no peers and emits no events.
#[derive(Clone, Default)]
pub(crate) struct StubNetworkingParametersManager;
224
225impl StubNetworkingParametersManager {
226 pub fn boxed(self) -> Box<dyn KnownPeersRegistry> {
228 Box::new(self)
229 }
230}
231
232#[async_trait]
233impl KnownPeersRegistry for StubNetworkingParametersManager {
234 async fn add_known_peer(&mut self, _: PeerId, _: Vec<Multiaddr>) {}
235
236 async fn remove_known_peer_addresses(&mut self, _peer_id: PeerId, _addresses: Vec<Multiaddr>) {}
237
238 fn remove_all_known_peer_addresses(&mut self, _peer_id: PeerId) {}
239
240 async fn all_known_peers(&mut self) -> Vec<(PeerId, Vec<Multiaddr>)> {
241 Vec::new()
242 }
243
244 fn count_known_peers(&mut self) -> (usize, usize) {
245 (0, 0)
246 }
247
248 async fn run(&mut self) {
249 futures::future::pending().await
251 }
252
253 fn on_unreachable_address(
254 &mut self,
255 _handler: HandlerFn<PeerAddressRemovedEvent>,
256 ) -> Option<HandlerId> {
257 None
258 }
259}
260
/// Configuration of [`KnownPeersManager`].
#[derive(Debug, Clone)]
pub struct KnownPeersManagerConfig {
    /// When `false`, `all_known_peers` returns no peers and `count_known_peers` returns `(0, 0)`;
    /// peers are still tracked and persisted.
    pub enable_known_peers_source: bool,
    /// Maximum number of peers in the cache; also determines the size of the known peers file.
    pub cache_size: u32,
    /// Peers whose addresses are never added to the cache.
    pub ignore_peer_list: HashSet<PeerId>,
    /// Path of the known peers file; `None` disables persistence.
    pub path: Option<Box<Path>>,
    /// Time after the first failure of an address before a further failure report removes it
    /// from the cache.
    pub failed_address_cache_removal_interval: Duration,
    /// Time after the first failure of an address after which every further failure report
    /// emits a [`PeerAddressRemovedEvent`] for it.
    pub failed_address_kademlia_removal_interval: Duration,
    /// Maximum age of the persisted snapshot; older snapshots are ignored on start.
    pub stale_known_peers_timeout: Duration,
}
279
280impl Default for KnownPeersManagerConfig {
281 fn default() -> Self {
282 Self {
283 enable_known_peers_source: true,
284 cache_size: KNOWN_PEERS_CACHE_SIZE,
285 ignore_peer_list: Default::default(),
286 path: None,
287 failed_address_cache_removal_interval: REMOVE_KNOWN_PEERS_GRACE_PERIOD,
288 failed_address_kademlia_removal_interval: REMOVE_KNOWN_PEERS_GRACE_PERIOD_FOR_KADEMLIA,
289 stale_known_peers_timeout: STALE_KNOWN_PEERS_TIMEOUT,
290 }
291 }
292}
293
/// Errors that can occur while setting up persistence of known peers.
#[derive(Debug, Error)]
pub enum KnownPeersManagerPersistenceError {
    /// Opening, reading, resizing or memory-mapping the known peers file failed.
    #[error("I/O error: {0}")]
    Io(#[from] io::Error),
    /// Preallocating disk space for the known peers file failed.
    #[error("Can't preallocate known peers file, probably not enough space on disk: {0}")]
    CantPreallocateKnownPeersFile(io::Error),
}
304
/// [`KnownPeersRegistry`] that keeps known peers in an LRU cache and, when a path is configured,
/// periodically persists the cache to a memory-mapped file.
pub struct KnownPeersManager {
    /// Whether the cache changed since it was last written to disk.
    cache_need_saving: bool,
    /// Peer ID -> (address -> time of the first failed connection attempt).
    known_peers: LruMap<PeerId, LruMap<Multiaddr, FailureTime>>,
    /// Timer awaited by `run` between save attempts.
    networking_parameters_save_delay: Pin<Box<Fuse<Sleep>>>,
    /// File slots used for persistence; `None` when persistence is disabled.
    known_peers_slots: Option<Arc<Mutex<KnownPeersSlots>>>,
    /// Handlers notified about addresses that became unreachable.
    address_removed: Handler<PeerAddressRemovedEvent>,
    /// Manager configuration.
    config: KnownPeersManagerConfig,
}
320
321impl Drop for KnownPeersManager {
322 fn drop(&mut self) {
323 if self.cache_need_saving
324 && let Some(known_peers_slots) = &self.known_peers_slots
325 {
326 known_peers_slots
327 .lock()
328 .write_to_inactive_slot(&EncodableKnownPeers::from_cache(
329 &self.known_peers,
330 self.config.cache_size,
331 ));
332 }
333 }
334}
335
336impl KnownPeersManager {
337 fn init_file(
338 path: &Path,
339 cache_size: u32,
340 ) -> Result<
341 (Option<EncodableKnownPeers>, Arc<Mutex<KnownPeersSlots>>),
342 KnownPeersManagerPersistenceError,
343 > {
344 let mut file = OpenOptions::new()
345 .read(true)
346 .write(true)
347 .create(true)
348 .truncate(false)
349 .open(path)?;
350
351 let known_addresses_size = Self::known_addresses_size(cache_size);
352 let file_size = Self::file_size(cache_size);
353 let mut maybe_newest_known_addresses = None::<EncodableKnownPeers>;
355
356 {
357 let mut file_contents = Vec::with_capacity(file_size);
358 file.read_to_end(&mut file_contents)?;
359 if !file_contents.is_empty() {
360 for known_addresses_bytes in file_contents.chunks_exact(file_contents.len() / 2) {
361 let known_addresses =
362 match EncodableKnownPeers::decode(&mut &*known_addresses_bytes) {
363 Ok(known_addresses) => known_addresses,
364 Err(error) => {
365 debug!(%error, "Failed to decode encodable known peers");
366 continue;
367 }
368 };
369
370 let (encoded_bytes, remaining_bytes) =
371 known_addresses_bytes.split_at(known_addresses.encoded_size());
372 if remaining_bytes.len() < Blake3Hash::SIZE {
373 debug!(
374 remaining_bytes = %remaining_bytes.len(),
375 "Not enough bytes to decode checksum, file was likely corrupted"
376 );
377 continue;
378 }
379
380 let actual_checksum = blake3_hash(encoded_bytes);
382 let expected_checksum = &remaining_bytes[..Blake3Hash::SIZE];
383 if *actual_checksum != *expected_checksum {
384 debug!(
385 encoded_bytes_len = %encoded_bytes.len(),
386 actual_checksum = %hex::encode(actual_checksum),
387 expected_checksum = %hex::encode(expected_checksum),
388 "Hash doesn't match, possible disk corruption or file was just \
389 created, ignoring"
390 );
391 continue;
392 }
393
394 match &mut maybe_newest_known_addresses {
395 Some(newest_known_addresses) => {
396 if newest_known_addresses.timestamp < known_addresses.timestamp {
397 *newest_known_addresses = known_addresses;
398 }
399 }
400 None => {
401 maybe_newest_known_addresses.replace(known_addresses);
402 }
403 }
404 }
405 }
406 }
407
408 let file_resized = if file.seek(SeekFrom::End(0))? != file_size as u64 {
410 file.allocate(file_size as u64)
413 .map_err(KnownPeersManagerPersistenceError::CantPreallocateKnownPeersFile)?;
414 file.set_len(file_size as u64)?;
416 true
417 } else {
418 false
419 };
420
421 let mut a_mmap = unsafe {
422 MmapOptions::new()
423 .len(known_addresses_size)
424 .map_mut(&file)?
425 };
426 let mut b_mmap = unsafe {
427 MmapOptions::new()
428 .offset(known_addresses_size as u64)
429 .len(known_addresses_size)
430 .map_mut(&file)?
431 };
432
433 if file_resized {
434 if let Some(newest_known_addresses) = &maybe_newest_known_addresses {
436 let bytes = newest_known_addresses.encode();
437 a_mmap[..bytes.len()].copy_from_slice(&bytes);
438 a_mmap.flush()?;
439 b_mmap[..bytes.len()].copy_from_slice(&bytes);
440 b_mmap.flush()?;
441 }
442 }
443
444 let known_peers_slots = Arc::new(Mutex::new(KnownPeersSlots {
445 a: a_mmap,
446 b: b_mmap,
447 }));
448
449 Ok((maybe_newest_known_addresses, known_peers_slots))
450 }
451
452 pub fn new(config: KnownPeersManagerConfig) -> Result<Self, KnownPeersManagerPersistenceError> {
454 let (maybe_newest_known_addresses, known_peers_slots) = if let Some(path) = &config.path {
455 Self::init_file(path, config.cache_size)
456 .map(|(known_addresses, slots)| (known_addresses, Some(slots)))?
457 } else {
458 (None, None)
459 };
460
461 let known_peers = maybe_newest_known_addresses
462 .filter(|newest_known_addresses| {
463 let time_since_unix_epoch = SystemTime::now()
464 .duration_since(SystemTime::UNIX_EPOCH)
465 .expect("Never before Unix epoch; qed");
466 let known_peers_age = time_since_unix_epoch
467 .saturating_sub(Duration::from_secs(newest_known_addresses.timestamp));
468
469 known_peers_age <= config.stale_known_peers_timeout
470 })
471 .map(EncodableKnownPeers::into_cache)
472 .unwrap_or_else(|| LruMap::new(ByLength::new(config.cache_size)));
473
474 Ok(Self {
475 cache_need_saving: false,
476 known_peers,
477 networking_parameters_save_delay: Self::default_delay(),
478 known_peers_slots,
479 address_removed: Default::default(),
480 config,
481 })
482 }
483
484 pub fn file_size(cache_size: u32) -> usize {
486 Self::known_addresses_size(cache_size) * 2
488 }
489
490 pub fn boxed(self) -> Box<dyn KnownPeersRegistry> {
492 Box::new(self)
493 }
494
495 fn default_delay() -> Pin<Box<Fuse<Sleep>>> {
497 Box::pin(sleep(Duration::from_secs(DATA_FLUSH_DURATION_SECS)).fuse())
498 }
499
500 fn single_peer_encoded_address_size() -> usize {
501 let multiaddr = Multiaddr::from_str(
502 "/ip4/127.0.0.1/tcp/1234/p2p/12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp",
503 )
504 .expect("Valid multiaddr; qed");
505 multiaddr.to_vec().encoded_size() * 3
508 }
509
510 fn single_peer_encoded_size() -> usize {
513 PeerId::random().to_bytes().encoded_size()
516 + Compact::compact_len(&(ADDRESSES_CACHE_SIZE))
517 + (Self::single_peer_encoded_address_size() + Some(0u64).encoded_size())
518 * ADDRESSES_CACHE_SIZE as usize
519 }
520
521 fn known_addresses_size(cache_size: u32) -> usize {
526 mem::size_of::<u64>()
529 + Compact::compact_len(&(cache_size))
530 + Self::single_peer_encoded_size() * cache_size as usize
531 + Blake3Hash::SIZE
532 }
533
534 fn persistent_enabled(&self) -> bool {
535 self.config.path.is_some()
536 }
537
538 #[cfg(test)]
539 pub(crate) fn contains_address(&self, peer_id: &PeerId, address: &Multiaddr) -> bool {
540 self.known_peers
541 .peek(peer_id)
542 .map(|addresses| addresses.peek(address).is_some())
543 .unwrap_or_default()
544 }
545}
546
547#[async_trait]
548impl KnownPeersRegistry for KnownPeersManager {
549 async fn add_known_peer(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>) {
550 if self.config.ignore_peer_list.contains(&peer_id) {
551 debug!(
552 %peer_id,
553 addr_num=addresses.len(),
554 "Adding new peer addresses canceled (ignore list): {:?}",
555 addresses
556 );
557
558 return;
559 }
560
561 debug!(
562 %peer_id,
563 addr_num=addresses.len(),
564 "Add new peer addresses to the networking parameters registry: {:?}",
565 addresses
566 );
567
568 addresses
569 .iter()
570 .filter(|addr| {
571 !addr
573 .into_iter()
574 .any(|protocol| matches!(protocol, Protocol::Memory(..)))
575 })
576 .cloned()
577 .map(remove_p2p_suffix)
578 .for_each(|addr| {
579 self.known_peers
581 .get_or_insert(peer_id, || LruMap::new(ByLength::new(ADDRESSES_CACHE_SIZE)));
582
583 if let Some(addresses) = self.known_peers.get(&peer_id) {
584 let previous_entry = addresses.peek(&addr).cloned().flatten();
585 addresses.insert(addr, None);
586 if let Some(previous_entry) = previous_entry {
587 trace!(%peer_id, "Address cache entry replaced: {:?}", previous_entry);
588 }
589 }
590 });
591
592 self.cache_need_saving = true;
593 }
594
595 async fn remove_known_peer_addresses(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>) {
596 trace!(%peer_id, "Remove peer addresses from the networking parameters registry: {:?}", addresses);
597
598 let removed_addresses = remove_known_peer_addresses_internal(
599 &mut self.known_peers,
600 peer_id,
601 addresses,
602 self.config.failed_address_cache_removal_interval,
603 self.config.failed_address_kademlia_removal_interval,
604 );
605
606 for event in removed_addresses {
607 self.address_removed.call_simple(&event);
608 }
609
610 self.cache_need_saving = true;
611 }
612
613 fn remove_all_known_peer_addresses(&mut self, peer_id: PeerId) {
614 trace!(%peer_id, "Remove all peer addresses from the networking parameters registry");
615
616 self.known_peers.remove(&peer_id);
617
618 self.cache_need_saving = true;
619 }
620
621 async fn all_known_peers(&mut self) -> Vec<(PeerId, Vec<Multiaddr>)> {
622 if !self.config.enable_known_peers_source {
623 return Vec::new();
624 }
625
626 self.known_peers
627 .iter()
628 .map(|(&peer_id, addresses)| {
629 (
630 peer_id,
631 addresses
632 .iter()
633 .map(|(addr, _failure_time)| addr.clone())
634 .collect(),
635 )
636 })
637 .collect()
638 }
639
640 fn count_known_peers(&mut self) -> (usize, usize) {
641 if !self.config.enable_known_peers_source {
642 return (0, 0);
643 }
644
645 (
646 self.known_peers.len(),
647 self.known_peers
648 .iter()
649 .map(|(_peer, addresses)| addresses.len())
650 .sum(),
651 )
652 }
653
654 async fn run(&mut self) {
655 if !self.persistent_enabled() {
656 pending().await
657 }
658
659 loop {
660 (&mut self.networking_parameters_save_delay).await;
661
662 if let Some(known_peers_slots) = &self.known_peers_slots
663 && self.cache_need_saving
664 {
665 let known_peers =
666 EncodableKnownPeers::from_cache(&self.known_peers, self.config.cache_size);
667 let known_peers_slots = Arc::clone(known_peers_slots);
668 let write_known_peers_fut = AsyncJoinOnDrop::new(
669 tokio::task::spawn_blocking(move || {
670 known_peers_slots
671 .lock()
672 .write_to_inactive_slot(&known_peers);
673 }),
674 false,
676 );
677
678 if let Err(error) = write_known_peers_fut.await {
679 error!(%error, "Failed to write known peers");
680 }
681
682 self.cache_need_saving = false;
683 }
684 self.networking_parameters_save_delay = KnownPeersManager::default_delay();
685 }
686 }
687
688 fn on_unreachable_address(
689 &mut self,
690 handler: HandlerFn<PeerAddressRemovedEvent>,
691 ) -> Option<HandlerId> {
692 let handler_id = self.address_removed.add(handler);
693
694 Some(handler_id)
695 }
696}
697
698pub(crate) fn remove_p2p_suffix(mut address: Multiaddr) -> Multiaddr {
700 let last_protocol = address.pop();
701
702 if let Some(Protocol::P2p(_)) = &last_protocol {
703 return address;
704 }
705
706 if let Some(protocol) = last_protocol {
707 address.push(protocol)
708 }
709
710 address
711}
712
713pub(crate) fn append_p2p_suffix(peer_id: PeerId, mut address: Multiaddr) -> Multiaddr {
715 let last_protocol = address.pop();
716
717 if let Some(protocol) = last_protocol
718 && !matches!(protocol, Protocol::P2p(..))
719 {
720 address.push(protocol)
721 }
722 address.push(Protocol::P2p(peer_id));
723
724 address
725}
726
727pub(super) fn remove_known_peer_addresses_internal(
729 known_peers: &mut LruMap<PeerId, LruMap<Multiaddr, FailureTime>>,
730 peer_id: PeerId,
731 addresses: Vec<Multiaddr>,
732 expired_address_duration_persistent_storage: Duration,
733 expired_address_duration_kademlia: Duration,
734) -> Vec<PeerAddressRemovedEvent> {
735 let mut address_removed_events = Vec::new();
736 let now = SystemTime::now();
737
738 addresses
739 .into_iter()
740 .map(remove_p2p_suffix)
741 .for_each(|addr| {
742 if let Some(addresses) = known_peers.peek_mut(&peer_id) {
744 let last_address = addresses.peek(&addr).is_some() && addresses.len() == 1;
745 if let Some(first_failed_time) = addresses.peek_mut(&addr) {
748 if let Some(time) = first_failed_time {
750 if *time + expired_address_duration_kademlia < now {
752 let address_removed = PeerAddressRemovedEvent {
753 peer_id,
754 address: addr.clone(),
755 };
756
757 address_removed_events.push(address_removed);
758
759 trace!(%peer_id, "Address was marked for removal from Kademlia: {:?}", addr);
760 }
761
762 if *time + expired_address_duration_persistent_storage < now {
764 addresses.remove(&addr);
766
767 if last_address {
769 known_peers.remove(&peer_id);
770
771 trace!(%peer_id, "Peer removed from the cache");
772 }
773
774 trace!(%peer_id, "Address removed from the persistent cache: {:?}", addr);
775 } else {
776 trace!(
777 %peer_id, "Saving failed connection attempt to a peer: {:?}",
778 addr
779 );
780 }
781 } else {
782 first_failed_time.replace(now);
784
785 trace!(%peer_id, "Address marked for removal from the cache: {:?}", addr);
786 }
787 }
788 }
789 });
790
791 address_removed_events
792}