| //! Contains support for user-managed thread pools, represented by the |
| //! the [`ThreadPool`] type (see that struct for details). |
| //! |
| //! [`ThreadPool`]: struct.ThreadPool.html |
| |
| use crate::broadcast::{self, BroadcastContext}; |
| use crate::join; |
| use crate::registry::{Registry, ThreadSpawn, WorkerThread}; |
| use crate::scope::{do_in_place_scope, do_in_place_scope_fifo}; |
| use crate::spawn; |
| use crate::{scope, Scope}; |
| use crate::{scope_fifo, ScopeFifo}; |
| use crate::{ThreadPoolBuildError, ThreadPoolBuilder}; |
| use std::error::Error; |
| use std::fmt; |
| use std::sync::Arc; |
| |
| mod test; |
| |
| /// Represents a user created [thread-pool]. |
| /// |
| /// Use a [`ThreadPoolBuilder`] to specify the number and/or names of threads |
| /// in the pool. After calling [`ThreadPoolBuilder::build()`], you can then |
| /// execute functions explicitly within this [`ThreadPool`] using |
| /// [`ThreadPool::install()`]. By contrast, top level rayon functions |
| /// (like `join()`) will execute implicitly within the current thread-pool. |
| /// |
| /// |
| /// ## Creating a ThreadPool |
| /// |
| /// ```rust |
| /// # use rayon_core as rayon; |
| /// let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap(); |
| /// ``` |
| /// |
| /// [`install()`][`ThreadPool::install()`] executes a closure in one of the `ThreadPool`'s |
| /// threads. In addition, any other rayon operations called inside of `install()` will also |
| /// execute in the context of the `ThreadPool`. |
| /// |
| /// When the `ThreadPool` is dropped, that's a signal for the threads it manages to terminate, |
| /// they will complete executing any remaining work that you have spawned, and automatically |
| /// terminate. |
| /// |
| /// |
| /// [thread-pool]: https://en.wikipedia.org/wiki/Thread_pool |
| /// [`ThreadPool`]: struct.ThreadPool.html |
| /// [`ThreadPool::new()`]: struct.ThreadPool.html#method.new |
| /// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html |
| /// [`ThreadPoolBuilder::build()`]: struct.ThreadPoolBuilder.html#method.build |
| /// [`ThreadPool::install()`]: struct.ThreadPool.html#method.install |
| pub struct ThreadPool { |
| registry: Arc<Registry>, |
| } |
| |
| impl ThreadPool { |
| #[deprecated(note = "Use `ThreadPoolBuilder::build`")] |
| #[allow(deprecated)] |
| /// Deprecated in favor of `ThreadPoolBuilder::build`. |
| pub fn new(configuration: crate::Configuration) -> Result<ThreadPool, Box<dyn Error>> { |
| Self::build(configuration.into_builder()).map_err(Box::from) |
| } |
| |
| pub(super) fn build<S>( |
| builder: ThreadPoolBuilder<S>, |
| ) -> Result<ThreadPool, ThreadPoolBuildError> |
| where |
| S: ThreadSpawn, |
| { |
| let registry = Registry::new(builder)?; |
| Ok(ThreadPool { registry }) |
| } |
| |
| /// Executes `op` within the threadpool. Any attempts to use |
| /// `join`, `scope`, or parallel iterators will then operate |
| /// within that threadpool. |
| /// |
| /// # Warning: thread-local data |
| /// |
| /// Because `op` is executing within the Rayon thread-pool, |
| /// thread-local data from the current thread will not be |
| /// accessible. |
| /// |
| /// # Panics |
| /// |
| /// If `op` should panic, that panic will be propagated. |
| /// |
| /// ## Using `install()` |
| /// |
| /// ```rust |
| /// # use rayon_core as rayon; |
| /// fn main() { |
| /// let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap(); |
| /// let n = pool.install(|| fib(20)); |
| /// println!("{}", n); |
| /// } |
| /// |
| /// fn fib(n: usize) -> usize { |
| /// if n == 0 || n == 1 { |
| /// return n; |
| /// } |
| /// let (a, b) = rayon::join(|| fib(n - 1), || fib(n - 2)); // runs inside of `pool` |
| /// return a + b; |
| /// } |
| /// ``` |
| pub fn install<OP, R>(&self, op: OP) -> R |
| where |
| OP: FnOnce() -> R + Send, |
| R: Send, |
| { |
| self.registry.in_worker(|_, _| op()) |
| } |
| |
| /// Executes `op` within every thread in the threadpool. Any attempts to use |
| /// `join`, `scope`, or parallel iterators will then operate within that |
| /// threadpool. |
| /// |
| /// Broadcasts are executed on each thread after they have exhausted their |
| /// local work queue, before they attempt work-stealing from other threads. |
| /// The goal of that strategy is to run everywhere in a timely manner |
| /// *without* being too disruptive to current work. There may be alternative |
| /// broadcast styles added in the future for more or less aggressive |
| /// injection, if the need arises. |
| /// |
| /// # Warning: thread-local data |
| /// |
| /// Because `op` is executing within the Rayon thread-pool, |
| /// thread-local data from the current thread will not be |
| /// accessible. |
| /// |
| /// # Panics |
| /// |
| /// If `op` should panic on one or more threads, exactly one panic |
| /// will be propagated, only after all threads have completed |
| /// (or panicked) their own `op`. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// # use rayon_core as rayon; |
| /// use std::sync::atomic::{AtomicUsize, Ordering}; |
| /// |
| /// fn main() { |
| /// let pool = rayon::ThreadPoolBuilder::new().num_threads(5).build().unwrap(); |
| /// |
| /// // The argument gives context, including the index of each thread. |
| /// let v: Vec<usize> = pool.broadcast(|ctx| ctx.index() * ctx.index()); |
| /// assert_eq!(v, &[0, 1, 4, 9, 16]); |
| /// |
| /// // The closure can reference the local stack |
| /// let count = AtomicUsize::new(0); |
| /// pool.broadcast(|_| count.fetch_add(1, Ordering::Relaxed)); |
| /// assert_eq!(count.into_inner(), 5); |
| /// } |
| /// ``` |
| pub fn broadcast<OP, R>(&self, op: OP) -> Vec<R> |
| where |
| OP: Fn(BroadcastContext<'_>) -> R + Sync, |
| R: Send, |
| { |
| // We assert that `self.registry` has not terminated. |
| unsafe { broadcast::broadcast_in(op, &self.registry) } |
| } |
| |
| /// Returns the (current) number of threads in the thread pool. |
| /// |
| /// # Future compatibility note |
| /// |
| /// Note that unless this thread-pool was created with a |
| /// [`ThreadPoolBuilder`] that specifies the number of threads, |
| /// then this number may vary over time in future versions (see [the |
| /// `num_threads()` method for details][snt]). |
| /// |
| /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads |
| /// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html |
| #[inline] |
| pub fn current_num_threads(&self) -> usize { |
| self.registry.num_threads() |
| } |
| |
| /// If called from a Rayon worker thread in this thread-pool, |
| /// returns the index of that thread; if not called from a Rayon |
| /// thread, or called from a Rayon thread that belongs to a |
| /// different thread-pool, returns `None`. |
| /// |
| /// The index for a given thread will not change over the thread's |
| /// lifetime. However, multiple threads may share the same index if |
| /// they are in distinct thread-pools. |
| /// |
| /// # Future compatibility note |
| /// |
| /// Currently, every thread-pool (including the global |
| /// thread-pool) has a fixed number of threads, but this may |
| /// change in future Rayon versions (see [the `num_threads()` method |
| /// for details][snt]). In that case, the index for a |
| /// thread would not change during its lifetime, but thread |
| /// indices may wind up being reused if threads are terminated and |
| /// restarted. |
| /// |
| /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads |
| #[inline] |
| pub fn current_thread_index(&self) -> Option<usize> { |
| let curr = self.registry.current_thread()?; |
| Some(curr.index()) |
| } |
| |
| /// Returns true if the current worker thread currently has "local |
| /// tasks" pending. This can be useful as part of a heuristic for |
| /// deciding whether to spawn a new task or execute code on the |
| /// current thread, particularly in breadth-first |
| /// schedulers. However, keep in mind that this is an inherently |
| /// racy check, as other worker threads may be actively "stealing" |
| /// tasks from our local deque. |
| /// |
| /// **Background:** Rayon's uses a [work-stealing] scheduler. The |
| /// key idea is that each thread has its own [deque] of |
| /// tasks. Whenever a new task is spawned -- whether through |
| /// `join()`, `Scope::spawn()`, or some other means -- that new |
| /// task is pushed onto the thread's *local* deque. Worker threads |
| /// have a preference for executing their own tasks; if however |
| /// they run out of tasks, they will go try to "steal" tasks from |
| /// other threads. This function therefore has an inherent race |
| /// with other active worker threads, which may be removing items |
| /// from the local deque. |
| /// |
| /// [work-stealing]: https://en.wikipedia.org/wiki/Work_stealing |
| /// [deque]: https://en.wikipedia.org/wiki/Double-ended_queue |
| #[inline] |
| pub fn current_thread_has_pending_tasks(&self) -> Option<bool> { |
| let curr = self.registry.current_thread()?; |
| Some(!curr.local_deque_is_empty()) |
| } |
| |
| /// Execute `oper_a` and `oper_b` in the thread-pool and return |
| /// the results. Equivalent to `self.install(|| join(oper_a, |
| /// oper_b))`. |
| pub fn join<A, B, RA, RB>(&self, oper_a: A, oper_b: B) -> (RA, RB) |
| where |
| A: FnOnce() -> RA + Send, |
| B: FnOnce() -> RB + Send, |
| RA: Send, |
| RB: Send, |
| { |
| self.install(|| join(oper_a, oper_b)) |
| } |
| |
| /// Creates a scope that executes within this thread-pool. |
| /// Equivalent to `self.install(|| scope(...))`. |
| /// |
| /// See also: [the `scope()` function][scope]. |
| /// |
| /// [scope]: fn.scope.html |
| pub fn scope<'scope, OP, R>(&self, op: OP) -> R |
| where |
| OP: FnOnce(&Scope<'scope>) -> R + Send, |
| R: Send, |
| { |
| self.install(|| scope(op)) |
| } |
| |
| /// Creates a scope that executes within this thread-pool. |
| /// Spawns from the same thread are prioritized in relative FIFO order. |
| /// Equivalent to `self.install(|| scope_fifo(...))`. |
| /// |
| /// See also: [the `scope_fifo()` function][scope_fifo]. |
| /// |
| /// [scope_fifo]: fn.scope_fifo.html |
| pub fn scope_fifo<'scope, OP, R>(&self, op: OP) -> R |
| where |
| OP: FnOnce(&ScopeFifo<'scope>) -> R + Send, |
| R: Send, |
| { |
| self.install(|| scope_fifo(op)) |
| } |
| |
| /// Creates a scope that spawns work into this thread-pool. |
| /// |
| /// See also: [the `in_place_scope()` function][in_place_scope]. |
| /// |
| /// [in_place_scope]: fn.in_place_scope.html |
| pub fn in_place_scope<'scope, OP, R>(&self, op: OP) -> R |
| where |
| OP: FnOnce(&Scope<'scope>) -> R, |
| { |
| do_in_place_scope(Some(&self.registry), op) |
| } |
| |
| /// Creates a scope that spawns work into this thread-pool in FIFO order. |
| /// |
| /// See also: [the `in_place_scope_fifo()` function][in_place_scope_fifo]. |
| /// |
| /// [in_place_scope_fifo]: fn.in_place_scope_fifo.html |
| pub fn in_place_scope_fifo<'scope, OP, R>(&self, op: OP) -> R |
| where |
| OP: FnOnce(&ScopeFifo<'scope>) -> R, |
| { |
| do_in_place_scope_fifo(Some(&self.registry), op) |
| } |
| |
| /// Spawns an asynchronous task in this thread-pool. This task will |
| /// run in the implicit, global scope, which means that it may outlast |
| /// the current stack frame -- therefore, it cannot capture any references |
| /// onto the stack (you will likely need a `move` closure). |
| /// |
| /// See also: [the `spawn()` function defined on scopes][spawn]. |
| /// |
| /// [spawn]: struct.Scope.html#method.spawn |
| pub fn spawn<OP>(&self, op: OP) |
| where |
| OP: FnOnce() + Send + 'static, |
| { |
| // We assert that `self.registry` has not terminated. |
| unsafe { spawn::spawn_in(op, &self.registry) } |
| } |
| |
| /// Spawns an asynchronous task in this thread-pool. This task will |
| /// run in the implicit, global scope, which means that it may outlast |
| /// the current stack frame -- therefore, it cannot capture any references |
| /// onto the stack (you will likely need a `move` closure). |
| /// |
| /// See also: [the `spawn_fifo()` function defined on scopes][spawn_fifo]. |
| /// |
| /// [spawn_fifo]: struct.ScopeFifo.html#method.spawn_fifo |
| pub fn spawn_fifo<OP>(&self, op: OP) |
| where |
| OP: FnOnce() + Send + 'static, |
| { |
| // We assert that `self.registry` has not terminated. |
| unsafe { spawn::spawn_fifo_in(op, &self.registry) } |
| } |
| |
| /// Spawns an asynchronous task on every thread in this thread-pool. This task |
| /// will run in the implicit, global scope, which means that it may outlast the |
| /// current stack frame -- therefore, it cannot capture any references onto the |
| /// stack (you will likely need a `move` closure). |
| pub fn spawn_broadcast<OP>(&self, op: OP) |
| where |
| OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static, |
| { |
| // We assert that `self.registry` has not terminated. |
| unsafe { broadcast::spawn_broadcast_in(op, &self.registry) } |
| } |
| |
| /// Cooperatively yields execution to Rayon. |
| /// |
| /// This is similar to the general [`yield_now()`], but only if the current |
| /// thread is part of *this* thread pool. |
| /// |
| /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if |
| /// nothing was available, or `None` if the current thread is not part this pool. |
| pub fn yield_now(&self) -> Option<Yield> { |
| let curr = self.registry.current_thread()?; |
| Some(curr.yield_now()) |
| } |
| |
| /// Cooperatively yields execution to local Rayon work. |
| /// |
| /// This is similar to the general [`yield_local()`], but only if the current |
| /// thread is part of *this* thread pool. |
| /// |
| /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if |
| /// nothing was available, or `None` if the current thread is not part this pool. |
| pub fn yield_local(&self) -> Option<Yield> { |
| let curr = self.registry.current_thread()?; |
| Some(curr.yield_local()) |
| } |
| } |
| |
| impl Drop for ThreadPool { |
| fn drop(&mut self) { |
| self.registry.terminate(); |
| } |
| } |
| |
| impl fmt::Debug for ThreadPool { |
| fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
| fmt.debug_struct("ThreadPool") |
| .field("num_threads", &self.current_num_threads()) |
| .field("id", &self.registry.id()) |
| .finish() |
| } |
| } |
| |
| /// If called from a Rayon worker thread, returns the index of that |
| /// thread within its current pool; if not called from a Rayon thread, |
| /// returns `None`. |
| /// |
| /// The index for a given thread will not change over the thread's |
| /// lifetime. However, multiple threads may share the same index if |
| /// they are in distinct thread-pools. |
| /// |
| /// See also: [the `ThreadPool::current_thread_index()` method]. |
| /// |
| /// [m]: struct.ThreadPool.html#method.current_thread_index |
| /// |
| /// # Future compatibility note |
| /// |
| /// Currently, every thread-pool (including the global |
| /// thread-pool) has a fixed number of threads, but this may |
| /// change in future Rayon versions (see [the `num_threads()` method |
| /// for details][snt]). In that case, the index for a |
| /// thread would not change during its lifetime, but thread |
| /// indices may wind up being reused if threads are terminated and |
| /// restarted. |
| /// |
| /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads |
| #[inline] |
| pub fn current_thread_index() -> Option<usize> { |
| unsafe { |
| let curr = WorkerThread::current().as_ref()?; |
| Some(curr.index()) |
| } |
| } |
| |
| /// If called from a Rayon worker thread, indicates whether that |
| /// thread's local deque still has pending tasks. Otherwise, returns |
| /// `None`. For more information, see [the |
| /// `ThreadPool::current_thread_has_pending_tasks()` method][m]. |
| /// |
| /// [m]: struct.ThreadPool.html#method.current_thread_has_pending_tasks |
| #[inline] |
| pub fn current_thread_has_pending_tasks() -> Option<bool> { |
| unsafe { |
| let curr = WorkerThread::current().as_ref()?; |
| Some(!curr.local_deque_is_empty()) |
| } |
| } |
| |
| /// Cooperatively yields execution to Rayon. |
| /// |
| /// If the current thread is part of a rayon thread pool, this looks for a |
| /// single unit of pending work in the pool, then executes it. Completion of |
| /// that work might include nested work or further work stealing. |
| /// |
| /// This is similar to [`std::thread::yield_now()`], but does not literally make |
| /// that call. If you are implementing a polling loop, you may want to also |
| /// yield to the OS scheduler yourself if no Rayon work was found. |
| /// |
| /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if |
| /// nothing was available, or `None` if this thread is not part of any pool at all. |
| pub fn yield_now() -> Option<Yield> { |
| unsafe { |
| let thread = WorkerThread::current().as_ref()?; |
| Some(thread.yield_now()) |
| } |
| } |
| |
| /// Cooperatively yields execution to local Rayon work. |
| /// |
| /// If the current thread is part of a rayon thread pool, this looks for a |
| /// single unit of pending work in this thread's queue, then executes it. |
| /// Completion of that work might include nested work or further work stealing. |
| /// |
| /// This is similar to [`yield_now()`], but does not steal from other threads. |
| /// |
| /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if |
| /// nothing was available, or `None` if this thread is not part of any pool at all. |
| pub fn yield_local() -> Option<Yield> { |
| unsafe { |
| let thread = WorkerThread::current().as_ref()?; |
| Some(thread.yield_local()) |
| } |
| } |
| |
| /// Result of [`yield_now()`] or [`yield_local()`]. |
| #[derive(Clone, Copy, Debug, PartialEq, Eq)] |
| pub enum Yield { |
| /// Work was found and executed. |
| Executed, |
| /// No available work was found. |
| Idle, |
| } |