blob: 155a34d9cd402300d02b96fbcc15e38e567dec10 [file] [log] [blame]
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
/// Manages a collection of worker threads that run boxed closures.
///
/// `TaskPool::new` starts `MIN_THREADS` workers up front, and `spawn` starts an
/// additional worker whenever it finds no idle worker to hand the task to.
/// A worker that goes idle while more than `MIN_THREADS` workers are running
/// waits with a five-second timeout and can exit if no task arrives in that
/// time.
pub struct TaskPool {
// state shared between this handle and every worker thread it starts
sharing: Arc<Sharing>,
}
/// State shared (through an `Arc`) between a `TaskPool` handle and its worker threads.
struct Sharing {
// queue of tasks waiting for a worker thread: `spawn` pushes to the back,
// workers pop from the front
todo: Mutex<VecDeque<Box<dyn FnMut() + Send>>>,
// condvar used together with the `todo` mutex; notified whenever a task is
// added to `todo`, and once more when the pool is dropped
condvar: Condvar,
// number of worker threads currently running (idle or busy); dropping the
// `TaskPool` overwrites it with a very large value, which makes the
// `<= MIN_THREADS` check in the worker loop fail from then on
active_tasks: AtomicUsize,
// number of worker threads currently waiting on `condvar` for a task
waiting_tasks: AtomicUsize,
}
/// Minimum number of active threads.
///
/// `TaskPool::new` starts this many workers, and an idle worker only waits
/// with a timeout (and may therefore exit) while more than this many workers
/// are running.
// A `const` rather than a `static`: the value is a plain integer that is only
// ever read by value, so no fixed memory location is needed.
const MIN_THREADS: usize = 4;
/// Guard that keeps a shared counter incremented for as long as it is alive.
///
/// Creating a `Registration` adds one to the counter; dropping it (including
/// during unwinding) subtracts one again. The guard must be bound to a
/// variable: an unbound `Registration::new(..)` is dropped immediately and
/// the increment is undone on the spot.
#[must_use = "the counter is decremented again as soon as the guard is dropped"]
struct Registration<'a> {
    // counter this guard is registered with
    nb: &'a AtomicUsize,
}

impl<'a> Registration<'a> {
    /// Increments `nb` by one and returns the guard that will undo the
    /// increment when it goes out of scope.
    fn new(nb: &'a AtomicUsize) -> Registration<'a> {
        nb.fetch_add(1, Ordering::Release);
        Registration { nb }
    }
}

impl<'a> Drop for Registration<'a> {
    /// Decrements the counter that `new` incremented.
    fn drop(&mut self) {
        self.nb.fetch_sub(1, Ordering::Release);
    }
}
impl TaskPool {
pub fn new() -> TaskPool {
let pool = TaskPool {
sharing: Arc::new(Sharing {
todo: Mutex::new(VecDeque::new()),
condvar: Condvar::new(),
active_tasks: AtomicUsize::new(0),
waiting_tasks: AtomicUsize::new(0),
}),
};
for _ in 0..MIN_THREADS {
pool.add_thread(None)
}
pool
}
/// Executes a function in a thread.
/// If no thread is available, spawns a new one.
pub fn spawn(&self, code: Box<dyn FnMut() + Send>) {
let mut queue = self.sharing.todo.lock().unwrap();
if self.sharing.waiting_tasks.load(Ordering::Acquire) == 0 {
self.add_thread(Some(code));
} else {
queue.push_back(code);
self.sharing.condvar.notify_one();
}
}
fn add_thread(&self, initial_fn: Option<Box<dyn FnMut() + Send>>) {
let sharing = self.sharing.clone();
thread::spawn(move || {
let sharing = sharing;
let _active_guard = Registration::new(&sharing.active_tasks);
if let Some(mut f) = initial_fn {
f();
}
loop {
let mut task: Box<dyn FnMut() + Send> = {
let mut todo = sharing.todo.lock().unwrap();
let task;
loop {
if let Some(poped_task) = todo.pop_front() {
task = poped_task;
break;
}
let _waiting_guard = Registration::new(&sharing.waiting_tasks);
let received =
if sharing.active_tasks.load(Ordering::Acquire) <= MIN_THREADS {
todo = sharing.condvar.wait(todo).unwrap();
true
} else {
let (new_lock, waitres) = sharing
.condvar
.wait_timeout(todo, Duration::from_millis(5000))
.unwrap();
todo = new_lock;
!waitres.timed_out()
};
if !received && todo.is_empty() {
return;
}
}
task
};
task();
}
});
}
}
impl Drop for TaskPool {
    /// Tells the worker threads to wind down; does not wait for them.
    ///
    /// `active_tasks` is overwritten with a very large value so that every
    /// worker's `<= MIN_THREADS` check fails from now on, and all waiting
    /// workers are woken so they re-evaluate that check. Idle workers then
    /// use the timed wait and exit once it expires with nothing to do.
    fn drop(&mut self) {
        // Publish the sentinel while holding the queue lock. A worker reads
        // `active_tasks` and enters `Condvar::wait` under this same lock, so
        // it either sees the sentinel or is already waiting when `notify_all`
        // runs. Without the lock, a worker caught between the two steps would
        // miss the wakeup and block forever.
        //
        // The `LockResult` is deliberately not unwrapped: a poisoned lock is
        // still a held lock, and panicking inside `drop` must be avoided.
        let queue_lock = self.sharing.todo.lock();
        self.sharing
            .active_tasks
            .store(999_999_999, Ordering::Release);
        drop(queue_lock);

        self.sharing.condvar.notify_all();
    }
}