subspace_networking/utils/
rate_limiter.rs

1use std::num::NonZeroUsize;
2use std::sync::Arc;
3use tokio::sync::{OwnedSemaphorePermit, Semaphore};
4use tracing::debug;
5
/// Lower bound for the number of permits in the connection-limit semaphore; the computed
/// size is never allowed to fall below this value.
const MINIMUM_CONNECTIONS_SEMAPHORE_SIZE: usize = 3;

/// Empirically chosen factor that accounts for connection timeouts and retries (total retries
/// and backoff time). It is multiplied with `libp2p::kad::ALPHA_VALUE` to form the divisor of
/// the semaphore-size formula.
const CONNECTION_TIMEOUT_PARAMETER: usize = 2;
11
12#[derive(Debug)]
13pub(crate) struct RateLimiter {
14    connections_semaphore: Arc<Semaphore>,
15}
16
17impl RateLimiter {
18    pub(crate) fn new(out_connections: u32, pending_out_connections: u32) -> Self {
19        let permits = Self::calculate_connection_semaphore_size(
20            out_connections as usize,
21            pending_out_connections as usize,
22        );
23
24        debug!(%out_connections, %pending_out_connections, %permits, "Rate limiter was instantiated.");
25
26        Self {
27            connections_semaphore: Arc::new(Semaphore::new(permits.get())),
28        }
29    }
30
31    /// Calculates an empiric formula for the semaphore size based on the connection parameters and
32    /// existing constants.
33    fn calculate_connection_semaphore_size(
34        out_connections: usize,
35        pending_out_connections: usize,
36    ) -> NonZeroUsize {
37        let connections = out_connections.min(pending_out_connections);
38
39        // Number of "in-flight" parallel requests for each query
40        let kademlia_parallelism_level = libp2p::kad::ALPHA_VALUE.get();
41
42        let permits_number =
43            (connections / (kademlia_parallelism_level * CONNECTION_TIMEOUT_PARAMETER)).max(1);
44
45        let minimum_semaphore_size =
46            NonZeroUsize::new(MINIMUM_CONNECTIONS_SEMAPHORE_SIZE).expect("Manual setting");
47
48        NonZeroUsize::new(permits_number)
49            .expect("The value is at least 1")
50            .max(minimum_semaphore_size)
51    }
52
53    pub(crate) async fn acquire_permit(&self) -> OwnedSemaphorePermit {
54        self.connections_semaphore
55            .clone()
56            .acquire_owned()
57            .await
58            .expect("We never close semaphore.")
59    }
60}