subspace_networking/utils/
rate_limiter.rs1use std::num::NonZeroUsize;
2use std::sync::Arc;
3use tokio::sync::{OwnedSemaphorePermit, Semaphore};
4use tracing::debug;
5
6const MINIMUM_CONNECTIONS_SEMAPHORE_SIZE: usize = 3;
8
9const CONNECTION_TIMEOUT_PARAMETER: usize = 2;
11
12#[derive(Debug)]
13pub(crate) struct RateLimiter {
14 connections_semaphore: Arc<Semaphore>,
15}
16
17impl RateLimiter {
18 pub(crate) fn new(out_connections: u32, pending_out_connections: u32) -> Self {
19 let permits = Self::calculate_connection_semaphore_size(
20 out_connections as usize,
21 pending_out_connections as usize,
22 );
23
24 debug!(%out_connections, %pending_out_connections, %permits, "Rate limiter was instantiated.");
25
26 Self {
27 connections_semaphore: Arc::new(Semaphore::new(permits.get())),
28 }
29 }
30
31 fn calculate_connection_semaphore_size(
34 out_connections: usize,
35 pending_out_connections: usize,
36 ) -> NonZeroUsize {
37 let connections = out_connections.min(pending_out_connections);
38
39 let kademlia_parallelism_level = libp2p::kad::ALPHA_VALUE.get();
41
42 let permits_number =
43 (connections / (kademlia_parallelism_level * CONNECTION_TIMEOUT_PARAMETER)).max(1);
44
45 let minimum_semaphore_size =
46 NonZeroUsize::new(MINIMUM_CONNECTIONS_SEMAPHORE_SIZE).expect("Manual setting");
47
48 NonZeroUsize::new(permits_number)
49 .expect("The value is at least 1")
50 .max(minimum_semaphore_size)
51 }
52
53 pub(crate) async fn acquire_permit(&self) -> OwnedSemaphorePermit {
54 self.connections_semaphore
55 .clone()
56 .acquire_owned()
57 .await
58 .expect("We never close semaphore.")
59 }
60}