use crate::utils::{AsyncJoinOnDrop, Handler, HandlerFn};
use async_trait::async_trait;
use event_listener_primitives::HandlerId;
use fs2::FileExt;
use futures::future::{pending, Fuse};
use futures::FutureExt;
use libp2p::multiaddr::Protocol;
use libp2p::{Multiaddr, PeerId};
use memmap2::{MmapMut, MmapOptions};
use parity_scale_codec::{Compact, CompactLen, Decode, Encode};
use parking_lot::Mutex;
use schnellru::{ByLength, LruMap};
use std::collections::HashSet;
use std::fs::OpenOptions;
use std::io::{Read, Seek, SeekFrom};
use std::path::Path;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use std::{io, mem};
use subspace_core_primitives::hashes::{blake3_hash, Blake3Hash};
use thiserror::Error;
use tokio::time::{sleep, Sleep};
use tracing::{debug, error, trace, warn};
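
/// Time of the first registered connection failure for an address, if any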
type FailureTime = Option<SystemTime>;
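
/// Default maximum number of peers kept in the known peers cache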
const KNOWN_PEERS_CACHE_SIZE: u32 = 100;
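/// Maximum number of addresses kept per peer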
const ADDRESSES_CACHE_SIZE: u32 = 30;
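/// Interval between periodic flushes of the known peers cache to disk, in seconds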
const DATA_FLUSH_DURATION_SECS: u64 = 5;
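/// Default grace period before a failed peer address is removed from the persistent cache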
const REMOVE_KNOWN_PEERS_GRACE_PERIOD: Duration = Duration::from_secs(24 * 3600);
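/// Default grace period before a failed peer address is marked for removal from Kademlia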
const REMOVE_KNOWN_PEERS_GRACE_PERIOD_FOR_KADEMLIA: Duration = Duration::from_secs(3600);
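/// Default age after which a persisted known peers snapshot is considered stale and ignored on startup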
const STALE_KNOWN_PEERS_TIMEOUT: Duration = Duration::from_secs(24 * 3600);
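
/// Event emitted when a known peer address is removed from the registry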
#[derive(Debug, Clone)]
pub struct PeerAddressRemovedEvent {
pub peer_id: PeerId,
pub address: Multiaddr,
}
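
/// SCALE-encodable known peer address with an optional failure time (seconds since the Unix epoch)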
#[derive(Debug, Encode, Decode)]
struct EncodableKnownPeerAddress {
multiaddr: Vec<u8>,
failure_time: Option<u64>,
}
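
/// SCALE-encodable snapshot of the known peers cache that is persisted to disk together with a timestamp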
#[derive(Debug, Encode, Decode)]
struct EncodableKnownPeers {
cache_size: u32,
timestamp: u64,
known_peers: Vec<(Vec<u8>, Vec<EncodableKnownPeerAddress>)>,
}
impl EncodableKnownPeers {
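    /// Rebuilds the in-memory LRU cache from a persisted snapshot, skipping entries that fail to decode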
fn into_cache(mut self) -> LruMap<PeerId, LruMap<Multiaddr, FailureTime>> {
let mut peers_cache = LruMap::new(ByLength::new(self.cache_size));
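        // Sort ascending by the most recent failure time across each peer's addresses (peers with
        // a never-failed address sort last), then iterate in reverse below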
self.known_peers
.sort_by_cached_key(|(_peer_id, addresses)| {
addresses.iter().fold(0u64, |acc, address| {
acc.max(address.failure_time.unwrap_or(u64::MAX))
})
});
'peers: for (peer_id, addresses) in self.known_peers.into_iter().rev() {
let mut peer_cache =
LruMap::<Multiaddr, FailureTime>::new(ByLength::new(ADDRESSES_CACHE_SIZE));
let peer_id = match PeerId::from_bytes(&peer_id) {
Ok(peer_id) => peer_id,
Err(error) => {
debug!(%error, "Failed to decode known peer ID, skipping peer entry");
continue;
}
};
for address in addresses {
let multiaddr = match Multiaddr::try_from(address.multiaddr) {
Ok(multiaddr) => multiaddr,
Err(error) => {
debug!(
%error,
"Failed to decode known peer multiaddress, skipping peer entry"
);
continue 'peers;
}
};
peer_cache.insert(
multiaddr,
address.failure_time.map(|failure_time| {
SystemTime::UNIX_EPOCH + Duration::from_secs(failure_time)
}),
);
}
peers_cache.insert(peer_id, peer_cache);
}
peers_cache
}
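
    /// Builds a persistable snapshot from the in-memory cache, skipping addresses whose encoding
    /// exceeds the per-address size bound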
fn from_cache(cache: &LruMap<PeerId, LruMap<Multiaddr, FailureTime>>, cache_size: u32) -> Self {
let single_peer_encoded_address_size =
KnownPeersManager::single_peer_encoded_address_size();
Self {
cache_size,
timestamp: SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Never before Unix epoch; qed")
.as_secs(),
known_peers: cache
.iter()
.map(|(peer_id, addresses)| {
(
peer_id.to_bytes(),
addresses
.iter()
.filter_map(|(multiaddr, failure_time)| {
let multiaddr_bytes = multiaddr.to_vec();
if multiaddr_bytes.encoded_size() > single_peer_encoded_address_size
{
debug!(
encoded_multiaddress_size = %multiaddr_bytes.encoded_size(),
limit = %single_peer_encoded_address_size,
?multiaddr,
"Unexpectedly large multiaddress"
);
return None;
}
Some(EncodableKnownPeerAddress {
multiaddr: multiaddr_bytes,
failure_time: failure_time.map(|failure_time| {
failure_time
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Never before Unix epoch; qed")
.as_secs()
}),
})
})
.collect(),
)
})
.collect(),
}
}
}
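
/// Pair of memory-mapped slots used for double-buffered writes: `a` is written next (inactive),
/// while `b` holds the most recently written snapshot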
struct KnownPeersSlots {
a: MmapMut,
b: MmapMut,
}
impl KnownPeersSlots {
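    /// Writes the encoded snapshot followed by its BLAKE3 checksum into the inactive slot, flushes
    /// it to disk and swaps the slots so the other one is overwritten next time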
fn write_to_inactive_slot(&mut self, encodable_known_peers: &EncodableKnownPeers) {
let known_peers_bytes = encodable_known_peers.encode();
let (encoded_bytes, remaining_bytes) = self.a.split_at_mut(known_peers_bytes.len());
encoded_bytes.copy_from_slice(&known_peers_bytes);
remaining_bytes[..Blake3Hash::SIZE]
.copy_from_slice(blake3_hash(&known_peers_bytes).as_ref());
if let Err(error) = self.a.flush() {
warn!(%error, "Failed to flush known peers to disk");
}
mem::swap(&mut self.a, &mut self.b);
}
}
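
/// Registry of known peers and their addresses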
#[async_trait]
pub trait KnownPeersRegistry: Send + Sync {
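    /// Registers the given addresses for the peer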
async fn add_known_peer(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>);
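
    /// Removes the given peer addresses, possibly keeping them for a grace period after the first
    /// failure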
async fn remove_known_peer_addresses(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>);
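
    /// Removes all addresses of the given peer immediately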
fn remove_all_known_peer_addresses(&mut self, peer_id: PeerId);
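
    /// Returns all known peers together with their addresses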
async fn all_known_peers(&mut self) -> Vec<(PeerId, Vec<Multiaddr>)>;
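
    /// Runs background tasks of the registry (e.g. periodic persistence); never returns and must
    /// be polled continuously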
async fn run(&mut self);
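
    /// Registers a handler called when a peer address is removed; returns `None` if the
    /// implementation does not emit such events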
fn on_unreachable_address(
&mut self,
handler: HandlerFn<PeerAddressRemovedEvent>,
) -> Option<HandlerId>;
}
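
/// No-op [`KnownPeersRegistry`] implementation that stores nothing and never emits events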
#[derive(Clone, Default)]
pub(crate) struct StubNetworkingParametersManager;
impl StubNetworkingParametersManager {
pub fn boxed(self) -> Box<dyn KnownPeersRegistry> {
Box::new(self)
}
}
#[async_trait]
impl KnownPeersRegistry for StubNetworkingParametersManager {
async fn add_known_peer(&mut self, _: PeerId, _: Vec<Multiaddr>) {}
async fn remove_known_peer_addresses(&mut self, _peer_id: PeerId, _addresses: Vec<Multiaddr>) {}
fn remove_all_known_peer_addresses(&mut self, _peer_id: PeerId) {}
async fn all_known_peers(&mut self) -> Vec<(PeerId, Vec<Multiaddr>)> {
Vec::new()
}
async fn run(&mut self) {
        pending().await
}
fn on_unreachable_address(
&mut self,
_handler: HandlerFn<PeerAddressRemovedEvent>,
) -> Option<HandlerId> {
None
}
}
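
/// Configuration for [`KnownPeersManager`]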
#[derive(Debug, Clone)]
pub struct KnownPeersManagerConfig {
    /// Whether the cache is used as a source of known peers; if `false`, `all_known_peers`
    /// returns an empty list
    pub enable_known_peers_source: bool,
    /// Maximum number of peers kept in the cache
    pub cache_size: u32,
    /// Peers from this list are ignored and their addresses are never added to the cache
    pub ignore_peer_list: HashSet<PeerId>,
    /// Path of the file used to persist known peers; `None` disables persistence
    pub path: Option<Box<Path>>,
    /// How long a failed address stays in the persistent cache before it is removed
    pub failed_address_cache_removal_interval: Duration,
    /// How long a failed address stays known to Kademlia before it is marked for removal
    pub failed_address_kademlia_removal_interval: Duration,
    /// Persisted known peers older than this are considered stale and ignored on startup
    pub stale_known_peers_timeout: Duration,
}
impl Default for KnownPeersManagerConfig {
fn default() -> Self {
Self {
enable_known_peers_source: true,
cache_size: KNOWN_PEERS_CACHE_SIZE,
ignore_peer_list: Default::default(),
path: None,
failed_address_cache_removal_interval: REMOVE_KNOWN_PEERS_GRACE_PERIOD,
failed_address_kademlia_removal_interval: REMOVE_KNOWN_PEERS_GRACE_PERIOD_FOR_KADEMLIA,
stale_known_peers_timeout: STALE_KNOWN_PEERS_TIMEOUT,
}
}
}
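
/// Errors that may occur while loading or persisting known peers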
#[derive(Debug, Error)]
pub enum KnownPeersManagerPersistenceError {
#[error("I/O error: {0}")]
Io(#[from] io::Error),
#[error("Can't preallocate known peers file, probably not enough space on disk: {0}")]
CantPreallocateKnownPeersFile(io::Error),
}
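
/// [`KnownPeersRegistry`] implementation backed by an in-memory LRU cache with optional
/// persistence to a memory-mapped file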
pub struct KnownPeersManager {
    /// Whether the cache has changed since the last successful save
    cache_need_saving: bool,
    /// LRU cache of known peers and their addresses with optional failure times
    known_peers: LruMap<PeerId, LruMap<Multiaddr, FailureTime>>,
    /// Delay until the next periodic save of the cache
    networking_parameters_save_delay: Pin<Box<Fuse<Sleep>>>,
    /// Memory-mapped slots used for double-buffered persistence; `None` when persistence is disabled
    known_peers_slots: Option<Arc<Mutex<KnownPeersSlots>>>,
    /// Handlers called when a peer address is removed
    address_removed: Handler<PeerAddressRemovedEvent>,
    /// Manager configuration
    config: KnownPeersManagerConfig,
}
impl Drop for KnownPeersManager {
fn drop(&mut self) {
if self.cache_need_saving {
if let Some(known_peers_slots) = &self.known_peers_slots {
known_peers_slots
.lock()
.write_to_inactive_slot(&EncodableKnownPeers::from_cache(
&self.known_peers,
self.config.cache_size,
));
}
}
}
}
impl KnownPeersManager {
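    /// Opens (or creates) the persistence file, decodes the newest valid snapshot from its two
    /// slots (if any) and memory-maps both slots for future writes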
fn init_file(
path: &Path,
cache_size: u32,
) -> Result<
(Option<EncodableKnownPeers>, Arc<Mutex<KnownPeersSlots>>),
KnownPeersManagerPersistenceError,
> {
let mut file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(path)?;
let known_addresses_size = Self::known_addresses_size(cache_size);
let file_size = Self::file_size(cache_size);
let mut maybe_newest_known_addresses = None::<EncodableKnownPeers>;
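        // Read the existing file contents and pick the newest valid snapshot out of the two slots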
{
let mut file_contents = Vec::with_capacity(file_size);
file.read_to_end(&mut file_contents)?;
            // `chunks_exact` panics on a zero chunk size, so require at least two bytes here
            if file_contents.len() > 1 {
for known_addresses_bytes in file_contents.chunks_exact(file_contents.len() / 2) {
let known_addresses =
match EncodableKnownPeers::decode(&mut &*known_addresses_bytes) {
Ok(known_addresses) => known_addresses,
Err(error) => {
debug!(%error, "Failed to decode encodable known peers");
continue;
}
};
let (encoded_bytes, remaining_bytes) =
known_addresses_bytes.split_at(known_addresses.encoded_size());
if remaining_bytes.len() < Blake3Hash::SIZE {
debug!(
remaining_bytes = %remaining_bytes.len(),
"Not enough bytes to decode checksum, file was likely corrupted"
);
continue;
}
let actual_checksum = blake3_hash(encoded_bytes);
let expected_checksum = &remaining_bytes[..Blake3Hash::SIZE];
if *actual_checksum != *expected_checksum {
debug!(
encoded_bytes_len = %encoded_bytes.len(),
actual_checksum = %hex::encode(actual_checksum),
expected_checksum = %hex::encode(expected_checksum),
"Hash doesn't match, possible disk corruption or file was just \
created, ignoring"
);
continue;
}
match &mut maybe_newest_known_addresses {
Some(newest_known_addresses) => {
if newest_known_addresses.timestamp < known_addresses.timestamp {
*newest_known_addresses = known_addresses;
}
}
None => {
maybe_newest_known_addresses.replace(known_addresses);
}
}
}
}
}
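        // Resize the file to the expected size if it doesn't match (e.g. first run or changed
        // cache size)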
let file_resized = if file.seek(SeekFrom::End(0))? != file_size as u64 {
file.allocate(file_size as u64)
.map_err(KnownPeersManagerPersistenceError::CantPreallocateKnownPeersFile)?;
file.set_len(file_size as u64)?;
true
} else {
false
};
let mut a_mmap = unsafe {
MmapOptions::new()
.len(known_addresses_size)
.map_mut(&file)?
};
let mut b_mmap = unsafe {
MmapOptions::new()
.offset(known_addresses_size as u64)
.len(known_addresses_size)
.map_mut(&file)?
};
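        // After a resize, seed both slots with the newest snapshot so that either slot decodes
        // successfully on the next startup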
if file_resized {
if let Some(newest_known_addresses) = &maybe_newest_known_addresses {
let bytes = newest_known_addresses.encode();
a_mmap[..bytes.len()].copy_from_slice(&bytes);
a_mmap.flush()?;
b_mmap[..bytes.len()].copy_from_slice(&bytes);
b_mmap.flush()?;
}
}
let known_peers_slots = Arc::new(Mutex::new(KnownPeersSlots {
a: a_mmap,
b: b_mmap,
}));
Ok((maybe_newest_known_addresses, known_peers_slots))
}
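
    /// Creates a new manager, loading persisted known peers from `config.path` (when set) and
    /// discarding snapshots older than the configured staleness timeout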
pub fn new(config: KnownPeersManagerConfig) -> Result<Self, KnownPeersManagerPersistenceError> {
let (maybe_newest_known_addresses, known_peers_slots) = if let Some(path) = &config.path {
Self::init_file(path, config.cache_size)
.map(|(known_addresses, slots)| (known_addresses, Some(slots)))?
} else {
(None, None)
};
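        // Discard the persisted snapshot if it is older than the staleness timeout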
let known_peers = maybe_newest_known_addresses
.filter(|newest_known_addresses| {
let time_since_unix_epoch = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Never before Unix epoch; qed");
let known_peers_age = time_since_unix_epoch
.saturating_sub(Duration::from_secs(newest_known_addresses.timestamp));
known_peers_age <= config.stale_known_peers_timeout
})
.map(EncodableKnownPeers::into_cache)
.unwrap_or_else(|| LruMap::new(ByLength::new(config.cache_size)));
Ok(Self {
cache_need_saving: false,
known_peers,
networking_parameters_save_delay: Self::default_delay(),
known_peers_slots,
address_removed: Default::default(),
config,
})
}
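
    /// Size of the backing file on disk: two snapshot slots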
pub fn file_size(cache_size: u32) -> usize {
Self::known_addresses_size(cache_size) * 2
}
pub fn boxed(self) -> Box<dyn KnownPeersRegistry> {
Box::new(self)
}
fn default_delay() -> Pin<Box<Fuse<Sleep>>> {
Box::pin(sleep(Duration::from_secs(DATA_FLUSH_DURATION_SECS)).fuse())
}
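
    /// Upper bound for the encoded size of a single peer address, derived from a reference TCP
    /// multiaddress with a `/p2p/` suffix and multiplied by 3 to leave room for longer addresses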
fn single_peer_encoded_address_size() -> usize {
let multiaddr = Multiaddr::from_str(
"/ip4/127.0.0.1/tcp/1234/p2p/12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp",
)
.expect("Valid multiaddr; qed");
multiaddr.to_vec().encoded_size() * 3
}
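
    /// Upper bound for the encoded size of a single peer entry: peer ID, compact length prefix and
    /// up to `ADDRESSES_CACHE_SIZE` addresses with failure times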
fn single_peer_encoded_size() -> usize {
PeerId::random().to_bytes().encoded_size()
            + Compact::compact_len(&ADDRESSES_CACHE_SIZE)
+ (Self::single_peer_encoded_address_size() + Some(0u64).encoded_size())
* ADDRESSES_CACHE_SIZE as usize
}
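
    /// Upper bound for the size of a single encoded snapshot slot, including the trailing BLAKE3
    /// checksum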
fn known_addresses_size(cache_size: u32) -> usize {
mem::size_of::<u64>()
            + Compact::compact_len(&cache_size)
+ Self::single_peer_encoded_size() * cache_size as usize
+ Blake3Hash::SIZE
}
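
    /// Returns `true` if persistence to disk is enabled (a path was configured)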
fn persistent_enabled(&self) -> bool {
self.config.path.is_some()
}
#[cfg(test)]
pub(crate) fn contains_address(&self, peer_id: &PeerId, address: &Multiaddr) -> bool {
self.known_peers
.peek(peer_id)
.map(|addresses| addresses.peek(address).is_some())
.unwrap_or_default()
}
}
#[async_trait]
impl KnownPeersRegistry for KnownPeersManager {
async fn add_known_peer(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>) {
if self.config.ignore_peer_list.contains(&peer_id) {
            debug!(
                %peer_id,
                addr_num = addresses.len(),
                "Ignoring new peer addresses, peer is in the ignore list: {:?}",
                addresses
            );
return;
}
debug!(
%peer_id,
addr_num=addresses.len(),
"Add new peer addresses to the networking parameters registry: {:?}",
addresses
);
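        // Skip in-memory transport addresses and strip any trailing `/p2p/...` suffix before
        // caching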
addresses
.iter()
.filter(|addr| {
!addr
.into_iter()
.any(|protocol| matches!(protocol, Protocol::Memory(..)))
})
.cloned()
.map(remove_p2p_suffix)
.for_each(|addr| {
self.known_peers
.get_or_insert(peer_id, || LruMap::new(ByLength::new(ADDRESSES_CACHE_SIZE)));
if let Some(addresses) = self.known_peers.get(&peer_id) {
let previous_entry = addresses.peek(&addr).cloned().flatten();
addresses.insert(addr, None);
if let Some(previous_entry) = previous_entry {
trace!(%peer_id, "Address cache entry replaced: {:?}", previous_entry);
}
}
});
self.cache_need_saving = true;
}
async fn remove_known_peer_addresses(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>) {
trace!(%peer_id, "Remove peer addresses from the networking parameters registry: {:?}", addresses);
let removed_addresses = remove_known_peer_addresses_internal(
&mut self.known_peers,
peer_id,
addresses,
self.config.failed_address_cache_removal_interval,
self.config.failed_address_kademlia_removal_interval,
);
for event in removed_addresses {
self.address_removed.call_simple(&event);
}
self.cache_need_saving = true;
}
fn remove_all_known_peer_addresses(&mut self, peer_id: PeerId) {
trace!(%peer_id, "Remove all peer addresses from the networking parameters registry");
self.known_peers.remove(&peer_id);
self.cache_need_saving = true;
}
async fn all_known_peers(&mut self) -> Vec<(PeerId, Vec<Multiaddr>)> {
if !self.config.enable_known_peers_source {
return Vec::new();
}
self.known_peers
.iter()
.map(|(&peer_id, addresses)| {
(
peer_id,
addresses
.iter()
.map(|(addr, _failure_time)| addr.clone())
.collect(),
)
})
.collect()
}
async fn run(&mut self) {
if !self.persistent_enabled() {
pending().await
}
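        // Periodically write the cache snapshot to the inactive slot from a blocking task whenever
        // it has changed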
loop {
(&mut self.networking_parameters_save_delay).await;
if let Some(known_peers_slots) = &self.known_peers_slots {
if self.cache_need_saving {
let known_peers =
EncodableKnownPeers::from_cache(&self.known_peers, self.config.cache_size);
let known_peers_slots = Arc::clone(known_peers_slots);
let write_known_peers_fut =
AsyncJoinOnDrop::new(tokio::task::spawn_blocking(move || {
known_peers_slots
.lock()
.write_to_inactive_slot(&known_peers);
}));
if let Err(error) = write_known_peers_fut.await {
error!(%error, "Failed to write known peers");
}
self.cache_need_saving = false;
}
}
self.networking_parameters_save_delay = KnownPeersManager::default_delay();
}
}
fn on_unreachable_address(
&mut self,
handler: HandlerFn<PeerAddressRemovedEvent>,
) -> Option<HandlerId> {
let handler_id = self.address_removed.add(handler);
Some(handler_id)
}
}
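
/// Removes a trailing `/p2p/<peer_id>` protocol component from the address, if present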
pub(crate) fn remove_p2p_suffix(mut address: Multiaddr) -> Multiaddr {
let last_protocol = address.pop();
if let Some(Protocol::P2p(_)) = &last_protocol {
return address;
}
if let Some(protocol) = last_protocol {
address.push(protocol)
}
address
}
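
/// Appends a `/p2p/<peer_id>` protocol component to the address, replacing an existing one if
/// present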
pub(crate) fn append_p2p_suffix(peer_id: PeerId, mut address: Multiaddr) -> Multiaddr {
let last_protocol = address.pop();
if let Some(protocol) = last_protocol {
if !matches!(protocol, Protocol::P2p(..)) {
address.push(protocol)
}
}
address.push(Protocol::P2p(peer_id));
address
}
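
/// Removes or marks for removal the given peer addresses.
///
/// On the first failure an address is only marked with the current time. Once a marked address is
/// older than `expired_address_duration_kademlia` it produces a [`PeerAddressRemovedEvent`], and
/// once it is older than `expired_address_duration_persistent_storage` it is removed from the
/// cache entirely (removing the peer as well when it was its last address).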
pub(super) fn remove_known_peer_addresses_internal(
known_peers: &mut LruMap<PeerId, LruMap<Multiaddr, FailureTime>>,
peer_id: PeerId,
addresses: Vec<Multiaddr>,
expired_address_duration_persistent_storage: Duration,
expired_address_duration_kademlia: Duration,
) -> Vec<PeerAddressRemovedEvent> {
let mut address_removed_events = Vec::new();
let now = SystemTime::now();
addresses
.into_iter()
.map(remove_p2p_suffix)
.for_each(|addr| {
if let Some(addresses) = known_peers.peek_mut(&peer_id) {
let last_address = addresses.peek(&addr).is_some() && addresses.len() == 1;
if let Some(first_failed_time) = addresses.peek_mut(&addr) {
if let Some(time) = first_failed_time {
if *time + expired_address_duration_kademlia < now {
let address_removed = PeerAddressRemovedEvent {
peer_id,
address: addr.clone(),
};
address_removed_events.push(address_removed);
trace!(%peer_id, "Address was marked for removal from Kademlia: {:?}", addr);
}
if *time + expired_address_duration_persistent_storage < now {
addresses.remove(&addr);
if last_address {
known_peers.remove(&peer_id);
trace!(%peer_id, "Peer removed from the cache");
}
trace!(%peer_id, "Address removed from the persistent cache: {:?}", addr);
} else {
trace!(
%peer_id, "Saving failed connection attempt to a peer: {:?}",
addr
);
}
} else {
first_failed_time.replace(now);
trace!(%peer_id, "Address marked for removal from the cache: {:?}", addr);
}
}
}
});
address_removed_events
}