pub mod ss58;
#[cfg(test)]
mod tests;
use crate::thread_pool_manager::{PlottingThreadPoolManager, PlottingThreadPoolPair};
use futures::channel::oneshot;
use futures::channel::oneshot::Canceled;
use futures::future::Either;
use rayon::{
current_thread_index, ThreadBuilder, ThreadPool, ThreadPoolBuildError, ThreadPoolBuilder,
};
use std::future::Future;
use std::num::NonZeroUsize;
use std::ops::Deref;
use std::pin::{pin, Pin};
use std::process::exit;
use std::task::{Context, Poll};
use std::{fmt, io, iter, thread};
use thread_priority::{set_current_thread_priority, ThreadPriority};
use tokio::runtime::Handle;
use tokio::task;
use tracing::{debug, warn};
/// Upper bound on the number of threads suggested by `recommended_number_of_farming_threads`.
const MAX_DEFAULT_FARMING_THREADS: usize = 32;
/// Wrapper around a Tokio task's join handle that waits for the task when dropped.
///
/// It can be awaited like the join handle itself. If `abort_on_drop` is set, the task is aborted
/// before waiting for it on drop.
#[derive(Debug)]
pub struct AsyncJoinOnDrop<T> {
    // `Some` from construction until it is taken in the `Drop` impl
    handle: Option<task::JoinHandle<T>>,
    // Whether the task is aborted before it is waited for on drop
    abort_on_drop: bool,
}
impl<T> Drop for AsyncJoinOnDrop<T> {
    /// Optionally aborts the task, then blocks the current thread until the task has finished.
    ///
    /// NOTE(review): uses `task::block_in_place` and `Handle::current()`, so this presumably has to
    /// run on a multi-threaded Tokio runtime worker — confirm for all drop sites.
    #[inline]
    fn drop(&mut self) {
        let Some(handle) = self.handle.take() else {
            // Nothing left to wait for
            return;
        };

        if self.abort_on_drop {
            handle.abort();
        }

        if handle.is_finished() {
            return;
        }

        // Wait synchronously; the join result itself is intentionally discarded
        task::block_in_place(move || {
            let _ = Handle::current().block_on(handle);
        });
    }
}
impl<T> AsyncJoinOnDrop<T> {
    /// Wraps `handle`; when `abort_on_drop` is `true` the task is aborted before being waited for
    /// on drop.
    #[inline]
    pub fn new(handle: task::JoinHandle<T>, abort_on_drop: bool) -> Self {
        let handle = Some(handle);
        Self {
            abort_on_drop,
            handle,
        }
    }
}
impl<T> Future for AsyncJoinOnDrop<T> {
    type Output = Result<T, task::JoinError>;

    /// Polls the wrapped join handle, resolving to the task's result or join error.
    #[inline]
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let join_handle = self
            .handle
            .as_mut()
            .expect("Only dropped in Drop impl; qed");
        Pin::new(join_handle).poll(cx)
    }
}
/// Wrapper around a thread's join handle that joins the thread when dropped (and panics if the
/// thread panicked).
///
/// The inner `Option` is `Some` until the handle is taken in the `Drop` impl.
pub(crate) struct JoinOnDrop(Option<thread::JoinHandle<()>>);
impl Drop for JoinOnDrop {
    /// Blocks until the wrapped thread exits; panics if that thread panicked.
    #[inline]
    fn drop(&mut self) {
        let join_handle = self.0.take().expect("Always called exactly once; qed");
        join_handle
            .join()
            .expect("Panic if background thread panicked");
    }
}
impl JoinOnDrop {
    /// Wraps `handle` so the thread gets joined once the wrapper is dropped.
    #[inline]
    pub(crate) fn new(handle: thread::JoinHandle<()>) -> Self {
        let join_handle = Some(handle);
        Self(join_handle)
    }
}
impl Deref for JoinOnDrop {
type Target = thread::JoinHandle<()>;
#[inline]
fn deref(&self) -> &Self::Target {
self.0.as_ref().expect("Only dropped in Drop impl; qed")
}
}
/// Runs the future produced by `create_future` on a new OS thread named `thread_name`.
///
/// The thread enters the Tokio runtime that was current when this function was called and drives
/// the future with `Handle::block_on`. The returned future resolves to the future's output, or to
/// `Canceled` if the thread exited without sending a result.
///
/// Dropping the returned future signals the thread to stop driving the future and then joins the
/// thread.
///
/// # Errors
///
/// Returns an error if the thread could not be spawned.
pub fn run_future_in_dedicated_thread<CreateFut, Fut, T>(
    create_future: CreateFut,
    thread_name: String,
) -> io::Result<impl Future<Output = Result<T, Canceled>> + Send>
where
    CreateFut: (FnOnce() -> Fut) + Send + 'static,
    Fut: Future<Output = T> + 'static,
    T: Send + 'static,
{
    let (drop_tx, drop_rx) = oneshot::channel::<()>();
    let (result_tx, result_rx) = oneshot::channel();
    let handle = Handle::current();
    let join_handle = thread::Builder::new().name(thread_name).spawn(move || {
        let _tokio_handle_guard = handle.enter();
        let future = pin!(create_future());
        let result = match handle.block_on(futures::future::select(future, drop_rx)) {
            Either::Left((result, _)) => result,
            Either::Right(_) => {
                // Outer future was dropped, nothing left to do
                return;
            }
        };
        if let Err(_error) = result_tx.send(result) {
            debug!(
                thread_name = ?thread::current().name(),
                "Future finished, but receiver was already dropped",
            );
        }
    })?;

    // `drop_tx` must be dropped (signalling the thread to stop) BEFORE the thread is joined,
    // otherwise the join waits forever. Variables captured by an `async move` block are dropped
    // in an unspecified order when the future is dropped early, so they are bundled into a tuple,
    // whose fields are always dropped in order.
    let exit_guard = (drop_tx, JoinOnDrop::new(join_handle));

    Ok(async move {
        let result = result_rx.await;
        drop(exit_guard);
        result
    })
}
/// Set of CPU core indices that threads can be pinned to.
#[derive(Clone)]
pub struct CpuCoreSet {
    /// Indices of CPU cores in this set
    cores: Vec<usize>,
    /// Hardware topology used for topology-aware truncation and thread pinning; `None` when it
    /// could not be obtained (only with `numa` feature)
    #[cfg(feature = "numa")]
    topology: Option<std::sync::Arc<hwlocality::Topology>>,
}
impl fmt::Debug for CpuCoreSet {
    /// Formats CPU cores compactly.
    ///
    /// Without the `numa` feature, a contiguous list is shown as a `first-last` range, and anything
    /// else as a comma-separated list. An empty set is shown as an empty list instead of panicking.
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut s = f.debug_struct("CpuCoreSet");
        #[cfg(not(feature = "numa"))]
        {
            let is_contiguous = self.cores.windows(2).all(|pair| pair[0] + 1 == pair[1]);
            match (self.cores.first(), self.cores.last()) {
                // Empty sets fall through to the list form, which renders as an empty string
                (Some(first), Some(last)) if is_contiguous => {
                    s.field("cores", &format!("{first}-{last}"));
                }
                _ => {
                    s.field(
                        "cores",
                        &self
                            .cores
                            .iter()
                            .map(usize::to_string)
                            .collect::<Vec<_>>()
                            .join(","),
                    );
                }
            }
        }
        #[cfg(feature = "numa")]
        {
            use hwlocality::cpu::cpuset::CpuSet;
            use hwlocality::ffi::PositiveInt;
            s.field(
                "cores",
                &CpuSet::from_iter(
                    self.cores.iter().map(|&core| {
                        PositiveInt::try_from(core).expect("Valid CPU core index; qed")
                    }),
                ),
            );
        }
        s.finish_non_exhaustive()
    }
}
impl CpuCoreSet {
    /// Returns the CPU core indices in this set.
    pub fn cpu_cores(&self) -> &[usize] {
        &self.cores
    }

    /// Reduces the set to roughly `num_cores` CPU cores, keeping at least one.
    ///
    /// With the `numa` feature and a known topology, cores are grouped by (L2 cache size, number of
    /// this set's cores sharing that L2 cache). From each group, a share proportional to
    /// `num_cores / len` is kept, with at least one core per group. The result may therefore differ
    /// slightly from `num_cores`. Otherwise the first `num_cores` cores are kept.
    ///
    /// An empty set stays empty.
    pub fn truncate(&mut self, num_cores: usize) {
        // `num_cores.clamp(1, len)` would panic for an empty set (`min > max`)
        let num_cores = num_cores.max(1).min(self.cores.len());
        #[cfg(feature = "numa")]
        if let Some(topology) = &self.topology {
            use hwlocality::object::attributes::ObjectAttributes;
            use hwlocality::object::types::ObjectType;
            let mut grouped_by_l2_cache_size_and_core_count =
                std::collections::HashMap::<(usize, usize), Vec<usize>>::new();
            topology
                .objects_with_type(ObjectType::L2Cache)
                .for_each(|object| {
                    let l2_cache_size =
                        if let Some(ObjectAttributes::Cache(cache)) = object.attributes() {
                            cache
                                .size()
                                .map(|size| size.get() as usize)
                                .unwrap_or_default()
                        } else {
                            0
                        };
                    if let Some(cpuset) = object.complete_cpuset() {
                        let cpuset = cpuset
                            .into_iter()
                            .map(usize::from)
                            .filter(|core| self.cores.contains(core))
                            .collect::<Vec<_>>();
                        let cpuset_len = cpuset.len();
                        if !cpuset.is_empty() {
                            grouped_by_l2_cache_size_and_core_count
                                .entry((l2_cache_size, cpuset_len))
                                .or_default()
                                .extend(cpuset);
                        }
                    }
                });
            // Only use topology-aware selection if every core of this set was found in it
            if grouped_by_l2_cache_size_and_core_count
                .values()
                .flatten()
                .count()
                == self.cores.len()
            {
                self.cores = grouped_by_l2_cache_size_and_core_count
                    .into_values()
                    .flat_map(|cores| {
                        // Proportional share of this group, but at least one core
                        let limit = cores.len() * num_cores / self.cores.len();
                        cores.into_iter().take(limit.max(1))
                    })
                    .collect();
                self.cores.sort();
                return;
            }
        }
        self.cores.truncate(num_cores);
    }

    /// Pins the current thread to the CPU cores of this set.
    ///
    /// Only has an effect with the `numa` feature and a known topology; failures are logged, not
    /// returned.
    pub fn pin_current_thread(&self) {
        #[cfg(feature = "numa")]
        if let Some(topology) = &self.topology {
            use hwlocality::cpu::binding::CpuBindingFlags;
            use hwlocality::cpu::cpuset::CpuSet;
            use hwlocality::current_thread_id;
            use hwlocality::ffi::PositiveInt;
            let cpu_cores = CpuSet::from_iter(
                self.cores
                    .iter()
                    .map(|&core| PositiveInt::try_from(core).expect("Valid CPU core index; qed")),
            );
            if let Err(error) =
                topology.bind_thread_cpu(current_thread_id(), &cpu_cores, CpuBindingFlags::empty())
            {
                warn!(%error, ?cpu_cores, "Failed to pin thread to CPU cores")
            }
        }
    }
}
/// Suggests how many farming threads to use, capped at `MAX_DEFAULT_FARMING_THREADS`.
///
/// With the `numa` feature, this is the CPU count of the first NUMA node that has CPUs. It falls
/// back to the total CPU count if no such node exists or the topology cannot be read. Without the
/// feature, it is always the total CPU count.
pub fn recommended_number_of_farming_threads() -> usize {
    #[cfg(feature = "numa")]
    {
        match hwlocality::Topology::new().map(std::sync::Arc::new) {
            Ok(topology) => {
                let first_numa_node_cpus = topology
                    .objects_at_depth(hwlocality::object::depth::Depth::NUMANode)
                    .filter_map(|node| node.cpuset())
                    .map(|cpuset| cpuset.iter_set().count())
                    .find(|&count| count > 0);
                return first_numa_node_cpus
                    .unwrap_or_else(num_cpus::get)
                    .min(MAX_DEFAULT_FARMING_THREADS);
            }
            Err(error) => {
                warn!(%error, "Failed to get NUMA topology");
            }
        }
    }

    let available_cpus = num_cpus::get();
    available_cpus.min(MAX_DEFAULT_FARMING_THREADS)
}
/// Returns all CPU cores of this machine, grouped into sets.
///
/// With the `numa` feature, there is one set per L3 cache that has CPU cores. Otherwise, or if the
/// topology yields no such sets, a single set `0..num_cpus::get()` is returned.
pub fn all_cpu_cores() -> Vec<CpuCoreSet> {
    #[cfg(feature = "numa")]
    {
        match hwlocality::Topology::new().map(std::sync::Arc::new) {
            Ok(topology) => {
                let mut l3_cache_core_sets = Vec::new();
                for l3_cache in
                    topology.objects_with_type(hwlocality::object::types::ObjectType::L3Cache)
                {
                    let Some(cpuset) = l3_cache.cpuset() else {
                        continue;
                    };
                    let cores = cpuset.iter_set().map(usize::from).collect::<Vec<_>>();
                    if cores.is_empty() {
                        continue;
                    }
                    l3_cache_core_sets.push(CpuCoreSet {
                        cores,
                        topology: Some(std::sync::Arc::clone(&topology)),
                    });
                }
                if !l3_cache_core_sets.is_empty() {
                    return l3_cache_core_sets;
                }
            }
            Err(error) => {
                warn!(%error, "Failed to get L3 cache topology");
            }
        }
    }

    let cores = (0..num_cpus::get()).collect();
    vec![CpuCoreSet {
        cores,
        #[cfg(feature = "numa")]
        topology: None,
    }]
}
pub fn parse_cpu_cores_sets(
s: &str,
) -> Result<Vec<CpuCoreSet>, Box<dyn std::error::Error + Send + Sync>> {
#[cfg(feature = "numa")]
let topology = hwlocality::Topology::new().map(std::sync::Arc::new).ok();
s.split(' ')
.map(|s| {
let mut cores = Vec::new();
for s in s.split(',') {
let mut parts = s.split('-');
let range_start = parts
.next()
.ok_or(
"Bad string format, must be comma separated list of CPU cores or ranges",
)?
.parse()?;
if let Some(range_end) = parts.next() {
let range_end = range_end.parse()?;
cores.extend(range_start..=range_end);
} else {
cores.push(range_start);
}
}
Ok(CpuCoreSet {
cores,
#[cfg(feature = "numa")]
topology: topology.clone(),
})
})
.collect()
}
/// Computes CPU core sets for thread pools from all CPU cores of this machine.
///
/// `thread_pool_size` and `thread_pools` are applied by `thread_pool_core_indices_internal`.
pub fn thread_pool_core_indices(
    thread_pool_size: Option<NonZeroUsize>,
    thread_pools: Option<NonZeroUsize>,
) -> Vec<CpuCoreSet> {
    let machine_cpu_cores = all_cpu_cores();
    thread_pool_core_indices_internal(machine_cpu_cores, thread_pool_size, thread_pools)
}
/// Splits the CPU cores of `all_cpu_cores` into CPU core sets for thread pools.
///
/// * Neither `thread_pool_size` nor `thread_pools` set: `all_cpu_cores` is returned unchanged.
/// * `thread_pool_size` set: `thread_pools` pools are created, or one per input set if
///   `thread_pools` is unset. Each pool gets exactly `thread_pool_size` cores, taken by cycling
///   through all CPU cores in order, so cores are reused when more are requested than exist.
/// * Only `thread_pools` set: all CPU cores are split into contiguous chunks of
///   `ceil(total / thread_pools)` cores. This may yield fewer than `thread_pools` sets.
///
/// If the input contains no CPU cores at all, it is returned unchanged. This avoids looping
/// forever or panicking on a zero chunk size.
///
/// # Panics
///
/// With the `numa` feature enabled, panics if `all_cpu_cores` is empty.
fn thread_pool_core_indices_internal(
    all_cpu_cores: Vec<CpuCoreSet>,
    thread_pool_size: Option<NonZeroUsize>,
    thread_pools: Option<NonZeroUsize>,
) -> Vec<CpuCoreSet> {
    #[cfg(feature = "numa")]
    let topology = &all_cpu_cores
        .first()
        .expect("Not empty according to function description; qed")
        .topology;

    // With only thread pool size given, create as many thread pools as there are CPU core sets
    let thread_pools = thread_pools
        .map(|thread_pools| thread_pools.get())
        .or_else(|| thread_pool_size.map(|_| all_cpu_cores.len()));
    let Some(thread_pools) = thread_pools else {
        return all_cpu_cores;
    };

    let flat_cpu_cores = all_cpu_cores
        .iter()
        .flat_map(|cpu_core_set| cpu_core_set.cores.iter())
        .copied()
        .collect::<Vec<_>>();
    if flat_cpu_cores.is_empty() {
        // Nothing to distribute: cycling over no cores below would never terminate and
        // `chunks(0)` would panic
        return all_cpu_cores;
    }

    if let Some(thread_pool_size) = thread_pool_size {
        // Loop over all CPU cores as many times as necessary and assign contiguous runs of them to
        // thread pools. Values are real CPU core IDs (not necessarily contiguous or smaller than
        // the total number of cores), so they are used as-is rather than reduced modulo a count.
        let mut cpu_cores_iterator = iter::repeat(flat_cpu_cores.iter().copied()).flatten();
        (0..thread_pools)
            .map(|_| CpuCoreSet {
                cores: cpu_cores_iterator
                    .by_ref()
                    .take(thread_pool_size.get())
                    .collect(),
                #[cfg(feature = "numa")]
                topology: topology.clone(),
            })
            .collect()
    } else {
        flat_cpu_cores
            .chunks(flat_cpu_cores.len().div_ceil(thread_pools))
            .map(|cpu_cores| CpuCoreSet {
                cores: cpu_cores.to_vec(),
                #[cfg(feature = "numa")]
                topology: topology.clone(),
            })
            .collect()
    }
}
fn create_plotting_thread_pool_manager_thread_pool_pair(
thread_prefix: &'static str,
thread_pool_index: usize,
cpu_core_set: CpuCoreSet,
thread_priority: Option<ThreadPriority>,
) -> Result<ThreadPool, ThreadPoolBuildError> {
let thread_name =
move |thread_index| format!("{thread_prefix}-{thread_pool_index}.{thread_index}");
let panic_handler = move |panic_info| {
if let Some(index) = current_thread_index() {
eprintln!("panic on thread {}: {:?}", thread_name(index), panic_info);
} else {
eprintln!(
"rayon panic handler called on non-rayon thread: {:?}",
panic_info
);
}
exit(1);
};
ThreadPoolBuilder::new()
.thread_name(thread_name)
.num_threads(cpu_core_set.cpu_cores().len())
.panic_handler(panic_handler)
.spawn_handler({
let handle = Handle::current();
rayon_custom_spawn_handler(move |thread| {
let cpu_core_set = cpu_core_set.clone();
let handle = handle.clone();
move || {
cpu_core_set.pin_current_thread();
if let Some(thread_priority) = thread_priority {
if let Err(error) = set_current_thread_priority(thread_priority) {
warn!(%error, "Failed to set thread priority");
}
}
drop(cpu_core_set);
let _guard = handle.enter();
task::block_in_place(|| thread.run())
}
})
})
.build()
}
/// Creates a `PlottingThreadPoolManager` with one plotting/replotting thread pool pair per item of
/// `cpu_core_sets`.
///
/// The first CPU core set of each item is used for the `plotting` pool, the second for the
/// `replotting` pool.
///
/// # Errors
///
/// Returns an error if building any of the thread pools fails.
///
/// # Panics
///
/// Panics if `cpu_core_sets` is empty.
pub fn create_plotting_thread_pool_manager<I>(
    mut cpu_core_sets: I,
    thread_priority: Option<ThreadPriority>,
) -> Result<PlottingThreadPoolManager, ThreadPoolBuildError>
where
    I: ExactSizeIterator<Item = (CpuCoreSet, CpuCoreSet)>,
{
    let thread_pool_pairs = NonZeroUsize::new(cpu_core_sets.len())
        .expect("Thread pool is guaranteed to be non-empty; qed");

    PlottingThreadPoolManager::new(
        |thread_pool_index| {
            let (plotting_cores, replotting_cores) = cpu_core_sets
                .next()
                .expect("Number of thread pools is the same as cpu core sets; qed");
            let plotting = create_plotting_thread_pool_manager_thread_pool_pair(
                "plotting",
                thread_pool_index,
                plotting_cores,
                thread_priority,
            )?;
            let replotting = create_plotting_thread_pool_manager_thread_pool_pair(
                "replotting",
                thread_pool_index,
                replotting_cores,
                thread_priority,
            )?;
            Ok(PlottingThreadPoolPair {
                plotting,
                replotting,
            })
        },
        thread_pool_pairs,
    )
}
/// Creates a rayon spawn handler that starts each worker on a new OS thread.
///
/// Each thread is spawned with `std::thread::Builder`, using the name and stack size rayon
/// requested (when set). It runs the closure produced by `spawn_handler_builder` for that worker.
/// Spawned threads are detached.
pub fn rayon_custom_spawn_handler<SpawnHandlerBuilder, SpawnHandler, SpawnHandlerResult>(
    mut spawn_handler_builder: SpawnHandlerBuilder,
) -> impl FnMut(ThreadBuilder) -> io::Result<()>
where
    SpawnHandlerBuilder: (FnMut(ThreadBuilder) -> SpawnHandler) + Clone,
    SpawnHandler: (FnOnce() -> SpawnHandlerResult) + Send + 'static,
    SpawnHandlerResult: Send + 'static,
{
    move |thread: ThreadBuilder| {
        let builder = thread::Builder::new();
        let builder = match thread.name() {
            Some(name) => builder.name(name.to_owned()),
            None => builder,
        };
        let builder = match thread.stack_size() {
            Some(stack_size) => builder.stack_size(stack_size),
            None => builder,
        };
        // The join handle is dropped right away, detaching the thread
        builder
            .spawn(spawn_handler_builder(thread))
            .map(|_join_handle| ())
    }
}
/// Rayon spawn handler whose threads enter the Tokio runtime captured via `Handle::current()` at
/// call time.
///
/// Each rayon worker loop runs inside `task::block_in_place`.
pub fn tokio_rayon_spawn_handler() -> impl FnMut(ThreadBuilder) -> io::Result<()> {
    let runtime_handle = Handle::current();
    rayon_custom_spawn_handler(move |thread| {
        let runtime_handle = runtime_handle.clone();
        move || {
            let _runtime_guard = runtime_handle.enter();
            task::block_in_place(|| thread.run())
        }
    })
}