1use crate::utils::{AsyncJoinOnDrop, Handler, HandlerFn};
2use async_trait::async_trait;
3use event_listener_primitives::HandlerId;
4use fs2::FileExt;
5use futures::future::{pending, Fuse};
6use futures::FutureExt;
7use libp2p::multiaddr::Protocol;
8use libp2p::{Multiaddr, PeerId};
9use memmap2::{MmapMut, MmapOptions};
10use parity_scale_codec::{Compact, CompactLen, Decode, Encode};
11use parking_lot::Mutex;
12use schnellru::{ByLength, LruMap};
13use std::collections::HashSet;
14use std::fs::OpenOptions;
15use std::io::{Read, Seek, SeekFrom};
16use std::path::Path;
17use std::pin::Pin;
18use std::str::FromStr;
19use std::sync::Arc;
20use std::time::{Duration, SystemTime};
21use std::{io, mem};
22use subspace_core_primitives::hashes::{blake3_hash, Blake3Hash};
23use thiserror::Error;
24use tokio::time::{sleep, Sleep};
25use tracing::{debug, error, trace, warn};
26
/// Time of the first failed connection attempt to an address; `None` while no failure has been
/// recorded (adding an address resets it to `None`).
type FailureTime = Option<SystemTime>;

/// Default for [`KnownPeersManagerConfig::cache_size`]: maximum number of peers in the cache.
const KNOWN_PEERS_CACHE_SIZE: u32 = 100;
/// Maximum number of addresses remembered per peer.
const ADDRESSES_CACHE_SIZE: u32 = 30;
/// Delay, in seconds, between persistence attempts of the known peers cache.
const DATA_FLUSH_DURATION_SECS: u64 = 5;
/// Default for [`KnownPeersManagerConfig::failed_address_cache_removal_interval`]: how long after
/// its first recorded failure an address is dropped from the (persisted) cache.
const REMOVE_KNOWN_PEERS_GRACE_PERIOD: Duration = Duration::from_secs(24 * 3600);
/// Default for [`KnownPeersManagerConfig::failed_address_kademlia_removal_interval`]: how long
/// after its first recorded failure an address starts being reported to
/// `on_unreachable_address` handlers.
const REMOVE_KNOWN_PEERS_GRACE_PERIOD_FOR_KADEMLIA: Duration = Duration::from_secs(3600);
/// Default for [`KnownPeersManagerConfig::stale_known_peers_timeout`]: persisted known peers older
/// than this are discarded on startup.
const STALE_KNOWN_PEERS_TIMEOUT: Duration = Duration::from_secs(24 * 3600);
42
/// Event emitted for a peer address whose first recorded failure is older than
/// [`KnownPeersManagerConfig::failed_address_kademlia_removal_interval`].
#[derive(Debug, Clone)]
pub struct PeerAddressRemovedEvent {
    /// Peer the address belongs to.
    pub peer_id: PeerId,
    /// The failing address, with any trailing `/p2p` component already stripped.
    pub address: Multiaddr,
}
51
/// SCALE-encodable form of one known address, as stored in the known peers file.
///
/// Field order defines the on-disk encoding and must not change.
#[derive(Debug, Encode, Decode)]
struct EncodableKnownPeerAddress {
    /// Raw multiaddress bytes (`Multiaddr::to_vec`).
    multiaddr: Vec<u8>,
    /// First failure time in whole seconds since the Unix epoch, `None` if none recorded.
    failure_time: Option<u64>,
}
58
/// SCALE-encodable snapshot of the whole known peers cache, stored in one slot of the file.
///
/// Field order defines the on-disk encoding and must not change.
#[derive(Debug, Encode, Decode)]
struct EncodableKnownPeers {
    /// Cache capacity used when the snapshot was written.
    cache_size: u32,
    /// Creation time in seconds since the Unix epoch; on load the newer valid slot wins.
    timestamp: u64,
    /// Peer ID bytes paired with that peer's addresses.
    known_peers: Vec<(Vec<u8>, Vec<EncodableKnownPeerAddress>)>,
}
66
67impl EncodableKnownPeers {
68 fn into_cache(mut self) -> LruMap<PeerId, LruMap<Multiaddr, FailureTime>> {
69 let mut peers_cache = LruMap::new(ByLength::new(self.cache_size));
70
71 self.known_peers
73 .sort_by_cached_key(|(_peer_id, addresses)| {
74 addresses.iter().fold(0u64, |acc, address| {
75 acc.max(address.failure_time.unwrap_or(u64::MAX))
76 })
77 });
78
79 'peers: for (peer_id, addresses) in self.known_peers.into_iter().rev() {
81 let mut peer_cache =
82 LruMap::<Multiaddr, FailureTime>::new(ByLength::new(ADDRESSES_CACHE_SIZE));
83
84 let peer_id = match PeerId::from_bytes(&peer_id) {
85 Ok(peer_id) => peer_id,
86 Err(error) => {
87 debug!(%error, "Failed to decode known peer ID, skipping peer entry");
88 continue;
89 }
90 };
91 for address in addresses {
92 let multiaddr = match Multiaddr::try_from(address.multiaddr) {
93 Ok(multiaddr) => multiaddr,
94 Err(error) => {
95 debug!(
96 %error,
97 "Failed to decode known peer multiaddress, skipping peer entry"
98 );
99 continue 'peers;
100 }
101 };
102
103 peer_cache.insert(
104 multiaddr,
105 address.failure_time.map(|failure_time| {
106 SystemTime::UNIX_EPOCH + Duration::from_secs(failure_time)
107 }),
108 );
109 }
110
111 peers_cache.insert(peer_id, peer_cache);
112 }
113
114 peers_cache
115 }
116
117 fn from_cache(cache: &LruMap<PeerId, LruMap<Multiaddr, FailureTime>>, cache_size: u32) -> Self {
118 let single_peer_encoded_address_size =
119 KnownPeersManager::single_peer_encoded_address_size();
120 Self {
121 cache_size,
122 timestamp: SystemTime::now()
123 .duration_since(SystemTime::UNIX_EPOCH)
124 .expect("Never before Unix epoch; qed")
125 .as_secs(),
126 known_peers: cache
127 .iter()
128 .map(|(peer_id, addresses)| {
129 (
130 peer_id.to_bytes(),
131 addresses
132 .iter()
133 .filter_map(|(multiaddr, failure_time)| {
134 let multiaddr_bytes = multiaddr.to_vec();
135
136 if multiaddr_bytes.encoded_size() > single_peer_encoded_address_size
137 {
138 debug!(
140 encoded_multiaddress_size = %multiaddr_bytes.encoded_size(),
141 limit = %single_peer_encoded_address_size,
142 ?multiaddr,
143 "Unexpectedly large multiaddress"
144 );
145 return None;
146 }
147
148 Some(EncodableKnownPeerAddress {
149 multiaddr: multiaddr_bytes,
150 failure_time: failure_time.map(|failure_time| {
151 failure_time
152 .duration_since(SystemTime::UNIX_EPOCH)
153 .expect("Never before Unix epoch; qed")
154 .as_secs()
155 }),
156 })
157 })
158 .collect(),
159 )
160 })
161 .collect(),
162 }
163 }
164}
165
/// The two equally sized, memory-mapped halves of the known peers file.
///
/// Writes alternate between them (see `write_to_inactive_slot`), so the previously written
/// snapshot is left untouched while a new one is written.
struct KnownPeersSlots {
    /// Slot overwritten by the next write.
    a: MmapMut,
    /// Slot holding the most recently written snapshot (once a write has happened).
    b: MmapMut,
}
171
172impl KnownPeersSlots {
173 fn write_to_inactive_slot(&mut self, encodable_known_peers: &EncodableKnownPeers) {
174 let known_peers_bytes = encodable_known_peers.encode();
175 let (encoded_bytes, remaining_bytes) = self.a.split_at_mut(known_peers_bytes.len());
176 encoded_bytes.copy_from_slice(&known_peers_bytes);
177 remaining_bytes[..Blake3Hash::SIZE]
179 .copy_from_slice(blake3_hash(&known_peers_bytes).as_ref());
180 if let Err(error) = self.a.flush() {
181 warn!(%error, "Failed to flush known peers to disk");
182 }
183
184 mem::swap(&mut self.a, &mut self.b);
186 }
187}
188
/// Registry of known peers and their addresses.
#[async_trait]
pub trait KnownPeersRegistry: Send + Sync {
    /// Registers `addresses` for `peer_id`.
    async fn add_known_peer(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>);

    /// Reports `addresses` of `peer_id` as failed; implementations may eventually remove them.
    async fn remove_known_peer_addresses(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>);

    /// Forgets every address of `peer_id`.
    fn remove_all_known_peer_addresses(&mut self, peer_id: PeerId);

    /// Returns the known peers together with their addresses.
    async fn all_known_peers(&mut self) -> Vec<(PeerId, Vec<Multiaddr>)>;

    /// Drives the registry's background work; the implementations in this file never return.
    async fn run(&mut self);

    /// Subscribes `handler` to [`PeerAddressRemovedEvent`]s. May return `None` when the
    /// implementation does not support subscriptions (the stub implementation does this).
    fn on_unreachable_address(
        &mut self,
        handler: HandlerFn<PeerAddressRemovedEvent>,
    ) -> Option<HandlerId>;
}
216
/// No-op [`KnownPeersRegistry`]: remembers nothing and never reports events.
#[derive(Clone, Default)]
pub(crate) struct StubNetworkingParametersManager;

impl StubNetworkingParametersManager {
    /// Boxes the stub as a `dyn KnownPeersRegistry` trait object.
    pub fn boxed(self) -> Box<dyn KnownPeersRegistry> {
        Box::new(self)
    }
}
227
228#[async_trait]
229impl KnownPeersRegistry for StubNetworkingParametersManager {
230 async fn add_known_peer(&mut self, _: PeerId, _: Vec<Multiaddr>) {}
231
232 async fn remove_known_peer_addresses(&mut self, _peer_id: PeerId, _addresses: Vec<Multiaddr>) {}
233
234 fn remove_all_known_peer_addresses(&mut self, _peer_id: PeerId) {}
235
236 async fn all_known_peers(&mut self) -> Vec<(PeerId, Vec<Multiaddr>)> {
237 Vec::new()
238 }
239
240 async fn run(&mut self) {
241 futures::future::pending().await
243 }
244
245 fn on_unreachable_address(
246 &mut self,
247 _handler: HandlerFn<PeerAddressRemovedEvent>,
248 ) -> Option<HandlerId> {
249 None
250 }
251}
252
/// Configuration of [`KnownPeersManager`].
#[derive(Debug, Clone)]
pub struct KnownPeersManagerConfig {
    /// When `false`, `all_known_peers` always returns an empty list.
    pub enable_known_peers_source: bool,
    /// Maximum number of peers in the cache; also determines the size of the known peers file.
    pub cache_size: u32,
    /// Peers whose addresses are never added.
    pub ignore_peer_list: HashSet<PeerId>,
    /// Path of the known peers file; `None` disables persistence.
    pub path: Option<Box<Path>>,
    /// Time after an address's first recorded failure after which it is removed from the cache.
    pub failed_address_cache_removal_interval: Duration,
    /// Time after an address's first recorded failure after which a removal event is emitted.
    pub failed_address_kademlia_removal_interval: Duration,
    /// Persisted known peers older than this are ignored on startup.
    pub stale_known_peers_timeout: Duration,
}
271
272impl Default for KnownPeersManagerConfig {
273 fn default() -> Self {
274 Self {
275 enable_known_peers_source: true,
276 cache_size: KNOWN_PEERS_CACHE_SIZE,
277 ignore_peer_list: Default::default(),
278 path: None,
279 failed_address_cache_removal_interval: REMOVE_KNOWN_PEERS_GRACE_PERIOD,
280 failed_address_kademlia_removal_interval: REMOVE_KNOWN_PEERS_GRACE_PERIOD_FOR_KADEMLIA,
281 stale_known_peers_timeout: STALE_KNOWN_PEERS_TIMEOUT,
282 }
283 }
284}
285
/// Errors that can occur while setting up the known peers file.
#[derive(Debug, Error)]
pub enum KnownPeersManagerPersistenceError {
    /// Opening, reading, resizing, mapping or flushing the file failed.
    #[error("I/O error: {0}")]
    Io(#[from] io::Error),
    /// Preallocating the file to its full size failed.
    #[error("Can't preallocate known peers file, probably not enough space on disk: {0}")]
    CantPreallocateKnownPeersFile(io::Error),
}
296
/// Known peers registry backed by an in-memory LRU cache and, optionally, a memory-mapped file.
pub struct KnownPeersManager {
    /// Set when the cache changed since it was last persisted.
    cache_need_saving: bool,
    /// Peer ID -> (address -> first failure time).
    known_peers: LruMap<PeerId, LruMap<Multiaddr, FailureTime>>,
    /// Delay until the next persistence attempt in `run`.
    networking_parameters_save_delay: Pin<Box<Fuse<Sleep>>>,
    /// File slots; `None` when persistence is disabled.
    known_peers_slots: Option<Arc<Mutex<KnownPeersSlots>>>,
    /// Handlers notified about addresses past the Kademlia removal interval.
    address_removed: Handler<PeerAddressRemovedEvent>,
    /// Manager configuration.
    config: KnownPeersManagerConfig,
}
312
313impl Drop for KnownPeersManager {
314 fn drop(&mut self) {
315 if self.cache_need_saving {
316 if let Some(known_peers_slots) = &self.known_peers_slots {
317 known_peers_slots
318 .lock()
319 .write_to_inactive_slot(&EncodableKnownPeers::from_cache(
320 &self.known_peers,
321 self.config.cache_size,
322 ));
323 }
324 }
325 }
326}
327
328impl KnownPeersManager {
329 fn init_file(
330 path: &Path,
331 cache_size: u32,
332 ) -> Result<
333 (Option<EncodableKnownPeers>, Arc<Mutex<KnownPeersSlots>>),
334 KnownPeersManagerPersistenceError,
335 > {
336 let mut file = OpenOptions::new()
337 .read(true)
338 .write(true)
339 .create(true)
340 .truncate(false)
341 .open(path)?;
342
343 let known_addresses_size = Self::known_addresses_size(cache_size);
344 let file_size = Self::file_size(cache_size);
345 let mut maybe_newest_known_addresses = None::<EncodableKnownPeers>;
347
348 {
349 let mut file_contents = Vec::with_capacity(file_size);
350 file.read_to_end(&mut file_contents)?;
351 if !file_contents.is_empty() {
352 for known_addresses_bytes in file_contents.chunks_exact(file_contents.len() / 2) {
353 let known_addresses =
354 match EncodableKnownPeers::decode(&mut &*known_addresses_bytes) {
355 Ok(known_addresses) => known_addresses,
356 Err(error) => {
357 debug!(%error, "Failed to decode encodable known peers");
358 continue;
359 }
360 };
361
362 let (encoded_bytes, remaining_bytes) =
363 known_addresses_bytes.split_at(known_addresses.encoded_size());
364 if remaining_bytes.len() < Blake3Hash::SIZE {
365 debug!(
366 remaining_bytes = %remaining_bytes.len(),
367 "Not enough bytes to decode checksum, file was likely corrupted"
368 );
369 continue;
370 }
371
372 let actual_checksum = blake3_hash(encoded_bytes);
374 let expected_checksum = &remaining_bytes[..Blake3Hash::SIZE];
375 if *actual_checksum != *expected_checksum {
376 debug!(
377 encoded_bytes_len = %encoded_bytes.len(),
378 actual_checksum = %hex::encode(actual_checksum),
379 expected_checksum = %hex::encode(expected_checksum),
380 "Hash doesn't match, possible disk corruption or file was just \
381 created, ignoring"
382 );
383 continue;
384 }
385
386 match &mut maybe_newest_known_addresses {
387 Some(newest_known_addresses) => {
388 if newest_known_addresses.timestamp < known_addresses.timestamp {
389 *newest_known_addresses = known_addresses;
390 }
391 }
392 None => {
393 maybe_newest_known_addresses.replace(known_addresses);
394 }
395 }
396 }
397 }
398 }
399
400 let file_resized = if file.seek(SeekFrom::End(0))? != file_size as u64 {
402 file.allocate(file_size as u64)
405 .map_err(KnownPeersManagerPersistenceError::CantPreallocateKnownPeersFile)?;
406 file.set_len(file_size as u64)?;
408 true
409 } else {
410 false
411 };
412
413 let mut a_mmap = unsafe {
414 MmapOptions::new()
415 .len(known_addresses_size)
416 .map_mut(&file)?
417 };
418 let mut b_mmap = unsafe {
419 MmapOptions::new()
420 .offset(known_addresses_size as u64)
421 .len(known_addresses_size)
422 .map_mut(&file)?
423 };
424
425 if file_resized {
426 if let Some(newest_known_addresses) = &maybe_newest_known_addresses {
428 let bytes = newest_known_addresses.encode();
429 a_mmap[..bytes.len()].copy_from_slice(&bytes);
430 a_mmap.flush()?;
431 b_mmap[..bytes.len()].copy_from_slice(&bytes);
432 b_mmap.flush()?;
433 }
434 }
435
436 let known_peers_slots = Arc::new(Mutex::new(KnownPeersSlots {
437 a: a_mmap,
438 b: b_mmap,
439 }));
440
441 Ok((maybe_newest_known_addresses, known_peers_slots))
442 }
443
444 pub fn new(config: KnownPeersManagerConfig) -> Result<Self, KnownPeersManagerPersistenceError> {
446 let (maybe_newest_known_addresses, known_peers_slots) = if let Some(path) = &config.path {
447 Self::init_file(path, config.cache_size)
448 .map(|(known_addresses, slots)| (known_addresses, Some(slots)))?
449 } else {
450 (None, None)
451 };
452
453 let known_peers = maybe_newest_known_addresses
454 .filter(|newest_known_addresses| {
455 let time_since_unix_epoch = SystemTime::now()
456 .duration_since(SystemTime::UNIX_EPOCH)
457 .expect("Never before Unix epoch; qed");
458 let known_peers_age = time_since_unix_epoch
459 .saturating_sub(Duration::from_secs(newest_known_addresses.timestamp));
460
461 known_peers_age <= config.stale_known_peers_timeout
462 })
463 .map(EncodableKnownPeers::into_cache)
464 .unwrap_or_else(|| LruMap::new(ByLength::new(config.cache_size)));
465
466 Ok(Self {
467 cache_need_saving: false,
468 known_peers,
469 networking_parameters_save_delay: Self::default_delay(),
470 known_peers_slots,
471 address_removed: Default::default(),
472 config,
473 })
474 }
475
476 pub fn file_size(cache_size: u32) -> usize {
478 Self::known_addresses_size(cache_size) * 2
480 }
481
482 pub fn boxed(self) -> Box<dyn KnownPeersRegistry> {
484 Box::new(self)
485 }
486
487 fn default_delay() -> Pin<Box<Fuse<Sleep>>> {
489 Box::pin(sleep(Duration::from_secs(DATA_FLUSH_DURATION_SECS)).fuse())
490 }
491
492 fn single_peer_encoded_address_size() -> usize {
493 let multiaddr = Multiaddr::from_str(
494 "/ip4/127.0.0.1/tcp/1234/p2p/12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp",
495 )
496 .expect("Valid multiaddr; qed");
497 multiaddr.to_vec().encoded_size() * 3
500 }
501
502 fn single_peer_encoded_size() -> usize {
505 PeerId::random().to_bytes().encoded_size()
508 + Compact::compact_len(&(ADDRESSES_CACHE_SIZE))
509 + (Self::single_peer_encoded_address_size() + Some(0u64).encoded_size())
510 * ADDRESSES_CACHE_SIZE as usize
511 }
512
513 fn known_addresses_size(cache_size: u32) -> usize {
518 mem::size_of::<u64>()
521 + Compact::compact_len(&(cache_size))
522 + Self::single_peer_encoded_size() * cache_size as usize
523 + Blake3Hash::SIZE
524 }
525
526 fn persistent_enabled(&self) -> bool {
527 self.config.path.is_some()
528 }
529
530 #[cfg(test)]
531 pub(crate) fn contains_address(&self, peer_id: &PeerId, address: &Multiaddr) -> bool {
532 self.known_peers
533 .peek(peer_id)
534 .map(|addresses| addresses.peek(address).is_some())
535 .unwrap_or_default()
536 }
537}
538
539#[async_trait]
540impl KnownPeersRegistry for KnownPeersManager {
541 async fn add_known_peer(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>) {
542 if self.config.ignore_peer_list.contains(&peer_id) {
543 debug!(
544 %peer_id,
545 addr_num=addresses.len(),
546 "Adding new peer addresses canceled (ignore list): {:?}",
547 addresses
548 );
549
550 return;
551 }
552
553 debug!(
554 %peer_id,
555 addr_num=addresses.len(),
556 "Add new peer addresses to the networking parameters registry: {:?}",
557 addresses
558 );
559
560 addresses
561 .iter()
562 .filter(|addr| {
563 !addr
565 .into_iter()
566 .any(|protocol| matches!(protocol, Protocol::Memory(..)))
567 })
568 .cloned()
569 .map(remove_p2p_suffix)
570 .for_each(|addr| {
571 self.known_peers
573 .get_or_insert(peer_id, || LruMap::new(ByLength::new(ADDRESSES_CACHE_SIZE)));
574
575 if let Some(addresses) = self.known_peers.get(&peer_id) {
576 let previous_entry = addresses.peek(&addr).cloned().flatten();
577 addresses.insert(addr, None);
578 if let Some(previous_entry) = previous_entry {
579 trace!(%peer_id, "Address cache entry replaced: {:?}", previous_entry);
580 }
581 }
582 });
583
584 self.cache_need_saving = true;
585 }
586
587 async fn remove_known_peer_addresses(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>) {
588 trace!(%peer_id, "Remove peer addresses from the networking parameters registry: {:?}", addresses);
589
590 let removed_addresses = remove_known_peer_addresses_internal(
591 &mut self.known_peers,
592 peer_id,
593 addresses,
594 self.config.failed_address_cache_removal_interval,
595 self.config.failed_address_kademlia_removal_interval,
596 );
597
598 for event in removed_addresses {
599 self.address_removed.call_simple(&event);
600 }
601
602 self.cache_need_saving = true;
603 }
604
605 fn remove_all_known_peer_addresses(&mut self, peer_id: PeerId) {
606 trace!(%peer_id, "Remove all peer addresses from the networking parameters registry");
607
608 self.known_peers.remove(&peer_id);
609
610 self.cache_need_saving = true;
611 }
612
613 async fn all_known_peers(&mut self) -> Vec<(PeerId, Vec<Multiaddr>)> {
614 if !self.config.enable_known_peers_source {
615 return Vec::new();
616 }
617
618 self.known_peers
619 .iter()
620 .map(|(&peer_id, addresses)| {
621 (
622 peer_id,
623 addresses
624 .iter()
625 .map(|(addr, _failure_time)| addr.clone())
626 .collect(),
627 )
628 })
629 .collect()
630 }
631
632 async fn run(&mut self) {
633 if !self.persistent_enabled() {
634 pending().await
635 }
636
637 loop {
638 (&mut self.networking_parameters_save_delay).await;
639
640 if let Some(known_peers_slots) = &self.known_peers_slots {
641 if self.cache_need_saving {
642 let known_peers =
643 EncodableKnownPeers::from_cache(&self.known_peers, self.config.cache_size);
644 let known_peers_slots = Arc::clone(known_peers_slots);
645 let write_known_peers_fut =
646 AsyncJoinOnDrop::new(tokio::task::spawn_blocking(move || {
647 known_peers_slots
648 .lock()
649 .write_to_inactive_slot(&known_peers);
650 }));
651
652 if let Err(error) = write_known_peers_fut.await {
653 error!(%error, "Failed to write known peers");
654 }
655
656 self.cache_need_saving = false;
657 }
658 }
659 self.networking_parameters_save_delay = KnownPeersManager::default_delay();
660 }
661 }
662
663 fn on_unreachable_address(
664 &mut self,
665 handler: HandlerFn<PeerAddressRemovedEvent>,
666 ) -> Option<HandlerId> {
667 let handler_id = self.address_removed.add(handler);
668
669 Some(handler_id)
670 }
671}
672
673pub(crate) fn remove_p2p_suffix(mut address: Multiaddr) -> Multiaddr {
675 let last_protocol = address.pop();
676
677 if let Some(Protocol::P2p(_)) = &last_protocol {
678 return address;
679 }
680
681 if let Some(protocol) = last_protocol {
682 address.push(protocol)
683 }
684
685 address
686}
687
688pub(crate) fn append_p2p_suffix(peer_id: PeerId, mut address: Multiaddr) -> Multiaddr {
690 let last_protocol = address.pop();
691
692 if let Some(protocol) = last_protocol {
693 if !matches!(protocol, Protocol::P2p(..)) {
694 address.push(protocol)
695 }
696 }
697 address.push(Protocol::P2p(peer_id));
698
699 address
700}
701
702pub(super) fn remove_known_peer_addresses_internal(
704 known_peers: &mut LruMap<PeerId, LruMap<Multiaddr, FailureTime>>,
705 peer_id: PeerId,
706 addresses: Vec<Multiaddr>,
707 expired_address_duration_persistent_storage: Duration,
708 expired_address_duration_kademlia: Duration,
709) -> Vec<PeerAddressRemovedEvent> {
710 let mut address_removed_events = Vec::new();
711 let now = SystemTime::now();
712
713 addresses
714 .into_iter()
715 .map(remove_p2p_suffix)
716 .for_each(|addr| {
717 if let Some(addresses) = known_peers.peek_mut(&peer_id) {
719 let last_address = addresses.peek(&addr).is_some() && addresses.len() == 1;
720 if let Some(first_failed_time) = addresses.peek_mut(&addr) {
723 if let Some(time) = first_failed_time {
725 if *time + expired_address_duration_kademlia < now {
727 let address_removed = PeerAddressRemovedEvent {
728 peer_id,
729 address: addr.clone(),
730 };
731
732 address_removed_events.push(address_removed);
733
734 trace!(%peer_id, "Address was marked for removal from Kademlia: {:?}", addr);
735 }
736
737 if *time + expired_address_duration_persistent_storage < now {
739 addresses.remove(&addr);
741
742 if last_address {
744 known_peers.remove(&peer_id);
745
746 trace!(%peer_id, "Peer removed from the cache");
747 }
748
749 trace!(%peer_id, "Address removed from the persistent cache: {:?}", addr);
750 } else {
751 trace!(
752 %peer_id, "Saving failed connection attempt to a peer: {:?}",
753 addr
754 );
755 }
756 } else {
757 first_failed_time.replace(now);
759
760 trace!(%peer_id, "Address marked for removal from the cache: {:?}", addr);
761 }
762 }
763 }
764 });
765
766 address_removed_events
767}