subspace_farmer/thread_pool_manager.rs

//! Thread pool managing utilities for plotting purposes

use event_listener::Event;
use parking_lot::Mutex;
use rayon::{ThreadPool, ThreadPoolBuildError};
use std::num::NonZeroUsize;
use std::ops::Deref;
use std::sync::Arc;

/// A pair of thread pools used for plotting purposes
#[derive(Debug)]
pub struct PlottingThreadPoolPair {
    /// Plotting thread pool
    pub plotting: ThreadPool,
    /// Replotting thread pool
    pub replotting: ThreadPool,
}

#[derive(Debug)]
struct Inner {
    thread_pool_pairs: Vec<PlottingThreadPoolPair>,
}
/// Wrapper around [`PlottingThreadPoolPair`] that returns the thread pool pair back to the
/// corresponding [`PlottingThreadPoolManager`] on `Drop`.
#[derive(Debug)]
#[must_use]
pub struct PlottingThreadPoolsGuard {
    inner: Arc<(Mutex<Inner>, Event)>,
    thread_pool_pair: Option<PlottingThreadPoolPair>,
}

impl Deref for PlottingThreadPoolsGuard {
    type Target = PlottingThreadPoolPair;

    #[inline]
    fn deref(&self) -> &Self::Target {
        self.thread_pool_pair
            .as_ref()
            .expect("Value exists until `Drop`; qed")
    }
}

impl Drop for PlottingThreadPoolsGuard {
    #[inline]
    fn drop(&mut self) {
        let (mutex, event) = &*self.inner;
        // Return the thread pool pair to the shared queue and wake up one waiter
        mutex.lock().thread_pool_pairs.push(
            self.thread_pool_pair
                .take()
                .expect("Happens only once in `Drop`; qed"),
        );
        event.notify_additional(1);
    }
}

/// Plotting thread pool manager.
///
/// This abstraction wraps a set of thread pool pairs and allows using each of them one at a time.
///
/// Each pair contains one thread pool for plotting and one for replotting. They are paired because
/// in most cases both will share the same set of CPU cores, and it would be inefficient to use them
/// concurrently.
///
/// For example, on a machine with 64 logical cores and 4 NUMA nodes it would be recommended to
/// create 4 thread pool pairs with 16 threads in each plotting thread pool and 8 threads in each
/// replotting thread pool, so that work done within a thread pool is tied to the CPU cores
/// dedicated to that thread pool.
#[derive(Debug, Clone)]
pub struct PlottingThreadPoolManager {
    inner: Arc<(Mutex<Inner>, Event)>,
    thread_pool_pairs: NonZeroUsize,
}

impl PlottingThreadPoolManager {
    /// Create a new thread pool manager by instantiating `thread_pool_pairs` thread pool pairs
    /// using `create_thread_pools`.
    ///
    /// `create_thread_pools` takes one argument, `thread_pool_index`.
    pub fn new<C>(
        create_thread_pools: C,
        thread_pool_pairs: NonZeroUsize,
    ) -> Result<Self, ThreadPoolBuildError>
    where
        C: FnMut(usize) -> Result<PlottingThreadPoolPair, ThreadPoolBuildError>,
    {
        let inner = Inner {
            thread_pool_pairs: (0..thread_pool_pairs.get())
                .map(create_thread_pools)
                .collect::<Result<Vec<_>, _>>()?,
        };

        Ok(Self {
            inner: Arc::new((Mutex::new(inner), Event::new())),
            thread_pool_pairs,
        })
    }

    /// How many thread pool pairs are being managed here
    pub fn thread_pool_pairs(&self) -> NonZeroUsize {
        self.thread_pool_pairs
    }

    /// Get one of the inner thread pool pairs, waiting until one becomes available if necessary.
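    ///
    /// A hedged sketch of how the returned guard might be used from async plotting code;
    /// `plot_sector` is a hypothetical placeholder for the actual CPU-intensive work:
    ///
    /// ```ignore
    /// // Waits until one of the pairs is returned by a previous user
    /// let thread_pools = thread_pool_manager.get_thread_pools().await;
    /// thread_pools.plotting.install(|| {
    ///     // Hypothetical plotting work running on the dedicated plotting thread pool
    ///     plot_sector();
    /// });
    /// // Dropping the guard returns the pair to the manager and wakes up one waiter
    /// drop(thread_pools);
    /// ```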
    pub async fn get_thread_pools(&self) -> PlottingThreadPoolsGuard {
        let (mutex, event) = &*self.inner;

        let thread_pool_pair = loop {
            // Create the listener before checking the queue so that a pair returned between the
            // check and `listener.await` is not missed
            let listener = event.listen();

            if let Some(thread_pool_pair) = mutex.lock().thread_pool_pairs.pop() {
                break thread_pool_pair;
            }

            listener.await;
        };

        PlottingThreadPoolsGuard {
            inner: Arc::clone(&self.inner),
            thread_pool_pair: Some(thread_pool_pair),
        }
    }
}