subspace_farmer/
thread_pool_manager.rs1use event_listener::Event;
4use parking_lot::Mutex;
5use rayon::{ThreadPool, ThreadPoolBuildError};
6use std::num::NonZeroUsize;
7use std::ops::Deref;
8use std::sync::Arc;
9
10#[derive(Debug)]
12pub struct PlottingThreadPoolPair {
13 pub plotting: ThreadPool,
15 pub replotting: ThreadPool,
17}
18
19#[derive(Debug)]
20struct Inner {
21 thread_pool_pairs: Vec<PlottingThreadPoolPair>,
22}
23
24#[derive(Debug)]
27#[must_use]
28pub struct PlottingThreadPoolsGuard {
29 inner: Arc<(Mutex<Inner>, Event)>,
30 thread_pool_pair: Option<PlottingThreadPoolPair>,
31}
32
33impl Deref for PlottingThreadPoolsGuard {
34 type Target = PlottingThreadPoolPair;
35
36 #[inline]
37 fn deref(&self) -> &Self::Target {
38 self.thread_pool_pair
39 .as_ref()
40 .expect("Value exists until `Drop`; qed")
41 }
42}
43
44impl Drop for PlottingThreadPoolsGuard {
45 #[inline]
46 fn drop(&mut self) {
47 let (mutex, event) = &*self.inner;
48 mutex.lock().thread_pool_pairs.push(
49 self.thread_pool_pair
50 .take()
51 .expect("Happens only once in `Drop`; qed"),
52 );
53 event.notify_additional(1);
54 }
55}
56
57#[derive(Debug, Clone)]
70pub struct PlottingThreadPoolManager {
71 inner: Arc<(Mutex<Inner>, Event)>,
72 thread_pool_pairs: NonZeroUsize,
73}
74
75impl PlottingThreadPoolManager {
76 pub fn new<C>(
81 create_thread_pools: C,
82 thread_pool_pairs: NonZeroUsize,
83 ) -> Result<Self, ThreadPoolBuildError>
84 where
85 C: FnMut(usize) -> Result<PlottingThreadPoolPair, ThreadPoolBuildError>,
86 {
87 let inner = Inner {
88 thread_pool_pairs: (0..thread_pool_pairs.get())
89 .map(create_thread_pools)
90 .collect::<Result<Vec<_>, _>>()?,
91 };
92
93 Ok(Self {
94 inner: Arc::new((Mutex::new(inner), Event::new())),
95 thread_pool_pairs,
96 })
97 }
98
99 pub fn thread_pool_pairs(&self) -> NonZeroUsize {
101 self.thread_pool_pairs
102 }
103
104 pub async fn get_thread_pools(&self) -> PlottingThreadPoolsGuard {
106 let (mutex, event) = &*self.inner;
107
108 let thread_pool_pair = loop {
109 let listener = event.listen();
110
111 if let Some(thread_pool_pair) = mutex.lock().thread_pool_pairs.pop() {
112 break thread_pool_pair;
113 }
114
115 listener.await;
116 };
117
118 PlottingThreadPoolsGuard {
119 inner: Arc::clone(&self.inner),
120 thread_pool_pair: Some(thread_pool_pair),
121 }
122 }
123}