| //! A simple crate for executing work on a thread pool, and getting back a |
| //! future. |
| //! |
| //! This crate provides a simple thread pool abstraction for running work |
| //! externally from the current thread that's running. An instance of `Future` |
| //! is handed back to represent that the work may be done later, and further |
| //! computations can be chained along with it as well. |
| //! |
| //! ```rust |
| //! extern crate futures; |
| //! extern crate futures_cpupool; |
| //! |
| //! use futures::Future; |
| //! use futures_cpupool::CpuPool; |
| //! |
| //! # fn long_running_future(a: u32) -> Box<futures::future::Future<Item = u32, Error = ()> + Send> { |
| //! # Box::new(futures::future::result(Ok(a))) |
| //! # } |
| //! # fn main() { |
| //! |
| //! // Create a worker thread pool with four threads |
| //! let pool = CpuPool::new(4); |
| //! |
| //! // Execute some work on the thread pool, optionally closing over data. |
| //! let a = pool.spawn(long_running_future(2)); |
| //! let b = pool.spawn(long_running_future(100)); |
| //! |
| //! // Express some further computation once the work is completed on the thread |
| //! // pool. |
| //! let c = a.join(b).map(|(a, b)| a + b).wait().unwrap(); |
| //! |
| //! // Print out the result |
| //! println!("{:?}", c); |
| //! # } |
| //! ``` |
| |
| #![deny(missing_docs)] |
| #![deny(missing_debug_implementations)] |
| |
| extern crate futures; |
| extern crate num_cpus; |
| |
| use std::panic::{self, AssertUnwindSafe}; |
| use std::sync::{Arc, Mutex}; |
| use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; |
| use std::sync::mpsc; |
| use std::thread; |
| use std::fmt; |
| |
| use futures::{IntoFuture, Future, Poll, Async}; |
| use futures::future::{lazy, Executor, ExecuteError}; |
| use futures::sync::oneshot::{channel, Sender, Receiver}; |
| use futures::executor::{self, Run, Executor as OldExecutor}; |
| |
| /// A thread pool intended to run CPU intensive work. |
| /// |
| /// This thread pool will hand out futures representing the completed work |
| /// that happens on the thread pool itself, and the futures can then be later |
| /// composed with other work as part of an overall computation. |
| /// |
| /// The worker threads associated with a thread pool are kept alive so long as |
| /// there is an open handle to the `CpuPool` or there is work running on them. Once |
| /// all work has been drained and all references have gone away the worker |
| /// threads will be shut down. |
| /// |
| /// Currently `CpuPool` implements `Clone` which just clones a new reference to |
| /// the underlying thread pool. |
| /// |
| /// **Note:** if you use CpuPool inside a library it's better accept a |
| /// `Builder` object for thread configuration rather than configuring just |
| /// pool size. This not only future proof for other settings but also allows |
| /// user to attach monitoring tools to lifecycle hooks. |
| pub struct CpuPool { |
| inner: Arc<Inner>, |
| } |
| |
| /// Thread pool configuration object |
| /// |
| /// Builder starts with a number of workers equal to the number |
| /// of CPUs on the host. But you can change it until you call `create()`. |
| pub struct Builder { |
| pool_size: usize, |
| stack_size: usize, |
| name_prefix: Option<String>, |
| after_start: Option<Arc<Fn() + Send + Sync>>, |
| before_stop: Option<Arc<Fn() + Send + Sync>>, |
| } |
| |
| struct MySender<F, T> { |
| fut: F, |
| tx: Option<Sender<T>>, |
| keep_running_flag: Arc<AtomicBool>, |
| } |
| |
| trait AssertSendSync: Send + Sync {} |
| impl AssertSendSync for CpuPool {} |
| |
| struct Inner { |
| tx: Mutex<mpsc::Sender<Message>>, |
| rx: Mutex<mpsc::Receiver<Message>>, |
| cnt: AtomicUsize, |
| size: usize, |
| } |
| |
| impl fmt::Debug for CpuPool { |
| fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
| f.debug_struct("CpuPool") |
| .field("size", &self.inner.size) |
| .finish() |
| } |
| } |
| |
| impl fmt::Debug for Builder { |
| fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
| f.debug_struct("Builder") |
| .field("pool_size", &self.pool_size) |
| .field("name_prefix", &self.name_prefix) |
| .finish() |
| } |
| } |
| |
| /// The type of future returned from the `CpuPool::spawn` function, which |
| /// proxies the futures running on the thread pool. |
| /// |
| /// This future will resolve in the same way as the underlying future, and it |
| /// will propagate panics. |
| #[must_use] |
| #[derive(Debug)] |
| pub struct CpuFuture<T, E> { |
| inner: Receiver<thread::Result<Result<T, E>>>, |
| keep_running_flag: Arc<AtomicBool>, |
| } |
| |
| enum Message { |
| Run(Run), |
| Close, |
| } |
| |
| impl CpuPool { |
| /// Creates a new thread pool with `size` worker threads associated with it. |
| /// |
| /// The returned handle can use `execute` to run work on this thread pool, |
| /// and clones can be made of it to get multiple references to the same |
| /// thread pool. |
| /// |
| /// This is a shortcut for: |
| /// |
| /// ```rust |
| /// # use futures_cpupool::{Builder, CpuPool}; |
| /// # |
| /// # fn new(size: usize) -> CpuPool { |
| /// Builder::new().pool_size(size).create() |
| /// # } |
| /// ``` |
| /// |
| /// # Panics |
| /// |
| /// Panics if `size == 0`. |
| pub fn new(size: usize) -> CpuPool { |
| Builder::new().pool_size(size).create() |
| } |
| |
| /// Creates a new thread pool with a number of workers equal to the number |
| /// of CPUs on the host. |
| /// |
| /// This is a shortcut for: |
| /// |
| /// ```rust |
| /// # use futures_cpupool::{Builder, CpuPool}; |
| /// # |
| /// # fn new_num_cpus() -> CpuPool { |
| /// Builder::new().create() |
| /// # } |
| /// ``` |
| pub fn new_num_cpus() -> CpuPool { |
| Builder::new().create() |
| } |
| |
| /// Spawns a future to run on this thread pool, returning a future |
| /// representing the produced value. |
| /// |
| /// This function will execute the future `f` on the associated thread |
| /// pool, and return a future representing the finished computation. The |
| /// returned future serves as a proxy to the computation that `F` is |
| /// running. |
| /// |
| /// To simply run an arbitrary closure on a thread pool and extract the |
| /// result, you can use the `future::lazy` combinator to defer work to |
| /// executing on the thread pool itself. |
| /// |
| /// Note that if the future `f` panics it will be caught by default and the |
| /// returned future will propagate the panic. That is, panics will not tear |
| /// down the thread pool and will be propagated to the returned future's |
| /// `poll` method if queried. |
| /// |
| /// If the returned future is dropped then this `CpuPool` will attempt to |
| /// cancel the computation, if possible. That is, if the computation is in |
| /// the middle of working, it will be interrupted when possible. |
| pub fn spawn<F>(&self, f: F) -> CpuFuture<F::Item, F::Error> |
| where F: Future + Send + 'static, |
| F::Item: Send + 'static, |
| F::Error: Send + 'static, |
| { |
| let (tx, rx) = channel(); |
| let keep_running_flag = Arc::new(AtomicBool::new(false)); |
| // AssertUnwindSafe is used here because `Send + 'static` is basically |
| // an alias for an implementation of the `UnwindSafe` trait but we can't |
| // express that in the standard library right now. |
| let sender = MySender { |
| fut: AssertUnwindSafe(f).catch_unwind(), |
| tx: Some(tx), |
| keep_running_flag: keep_running_flag.clone(), |
| }; |
| executor::spawn(sender).execute(self.inner.clone()); |
| CpuFuture { inner: rx , keep_running_flag: keep_running_flag.clone() } |
| } |
| |
| /// Spawns a closure on this thread pool. |
| /// |
| /// This function is a convenience wrapper around the `spawn` function above |
| /// for running a closure wrapped in `future::lazy`. It will spawn the |
| /// function `f` provided onto the thread pool, and continue to run the |
| /// future returned by `f` on the thread pool as well. |
| /// |
| /// The returned future will be a handle to the result produced by the |
| /// future that `f` returns. |
| pub fn spawn_fn<F, R>(&self, f: F) -> CpuFuture<R::Item, R::Error> |
| where F: FnOnce() -> R + Send + 'static, |
| R: IntoFuture + 'static, |
| R::Future: Send + 'static, |
| R::Item: Send + 'static, |
| R::Error: Send + 'static, |
| { |
| self.spawn(lazy(f)) |
| } |
| } |
| |
| impl<F> Executor<F> for CpuPool |
| where F: Future<Item = (), Error = ()> + Send + 'static, |
| { |
| fn execute(&self, future: F) -> Result<(), ExecuteError<F>> { |
| executor::spawn(future).execute(self.inner.clone()); |
| Ok(()) |
| } |
| } |
| |
| impl Inner { |
| fn send(&self, msg: Message) { |
| self.tx.lock().unwrap().send(msg).unwrap(); |
| } |
| |
| fn work(&self, after_start: Option<Arc<Fn() + Send + Sync>>, before_stop: Option<Arc<Fn() + Send + Sync>>) { |
| after_start.map(|fun| fun()); |
| loop { |
| let msg = self.rx.lock().unwrap().recv().unwrap(); |
| match msg { |
| Message::Run(r) => r.run(), |
| Message::Close => break, |
| } |
| } |
| before_stop.map(|fun| fun()); |
| } |
| } |
| |
| impl Clone for CpuPool { |
| fn clone(&self) -> CpuPool { |
| self.inner.cnt.fetch_add(1, Ordering::Relaxed); |
| CpuPool { inner: self.inner.clone() } |
| } |
| } |
| |
| impl Drop for CpuPool { |
| fn drop(&mut self) { |
| if self.inner.cnt.fetch_sub(1, Ordering::Relaxed) == 1 { |
| for _ in 0..self.inner.size { |
| self.inner.send(Message::Close); |
| } |
| } |
| } |
| } |
| |
| impl OldExecutor for Inner { |
| fn execute(&self, run: Run) { |
| self.send(Message::Run(run)) |
| } |
| } |
| |
| impl<T, E> CpuFuture<T, E> { |
| /// Drop this future without canceling the underlying future. |
| /// |
| /// When `CpuFuture` is dropped, `CpuPool` will try to abort the underlying |
| /// future. This function can be used when user wants to drop but keep |
| /// executing the underlying future. |
| pub fn forget(self) { |
| self.keep_running_flag.store(true, Ordering::SeqCst); |
| } |
| } |
| |
| impl<T: Send + 'static, E: Send + 'static> Future for CpuFuture<T, E> { |
| type Item = T; |
| type Error = E; |
| |
| fn poll(&mut self) -> Poll<T, E> { |
| match self.inner.poll().expect("cannot poll CpuFuture twice") { |
| Async::Ready(Ok(Ok(e))) => Ok(e.into()), |
| Async::Ready(Ok(Err(e))) => Err(e), |
| Async::Ready(Err(e)) => panic::resume_unwind(e), |
| Async::NotReady => Ok(Async::NotReady), |
| } |
| } |
| } |
| |
| impl<F: Future> Future for MySender<F, Result<F::Item, F::Error>> { |
| type Item = (); |
| type Error = (); |
| |
| fn poll(&mut self) -> Poll<(), ()> { |
| if let Ok(Async::Ready(_)) = self.tx.as_mut().unwrap().poll_cancel() { |
| if !self.keep_running_flag.load(Ordering::SeqCst) { |
| // Cancelled, bail out |
| return Ok(().into()) |
| } |
| } |
| |
| let res = match self.fut.poll() { |
| Ok(Async::Ready(e)) => Ok(e), |
| Ok(Async::NotReady) => return Ok(Async::NotReady), |
| Err(e) => Err(e), |
| }; |
| |
| // if the receiving end has gone away then that's ok, we just ignore the |
| // send error here. |
| drop(self.tx.take().unwrap().send(res)); |
| Ok(Async::Ready(())) |
| } |
| } |
| |
| impl Builder { |
| /// Create a builder a number of workers equal to the number |
| /// of CPUs on the host. |
| pub fn new() -> Builder { |
| Builder { |
| pool_size: num_cpus::get(), |
| stack_size: 0, |
| name_prefix: None, |
| after_start: None, |
| before_stop: None, |
| } |
| } |
| |
| /// Set size of a future CpuPool |
| /// |
| /// The size of a thread pool is the number of worker threads spawned |
| pub fn pool_size(&mut self, size: usize) -> &mut Self { |
| self.pool_size = size; |
| self |
| } |
| |
| /// Set stack size of threads in the pool. |
| pub fn stack_size(&mut self, stack_size: usize) -> &mut Self { |
| self.stack_size = stack_size; |
| self |
| } |
| |
| /// Set thread name prefix of a future CpuPool |
| /// |
| /// Thread name prefix is used for generating thread names. For example, if prefix is |
| /// `my-pool-`, then threads in the pool will get names like `my-pool-1` etc. |
| pub fn name_prefix<S: Into<String>>(&mut self, name_prefix: S) -> &mut Self { |
| self.name_prefix = Some(name_prefix.into()); |
| self |
| } |
| |
| /// Execute function `f` right after each thread is started but before |
| /// running any jobs on it. |
| /// |
| /// This is initially intended for bookkeeping and monitoring uses. |
| /// The `f` will be deconstructed after the `builder` is deconstructed |
| /// and all threads in the pool has executed it. |
| pub fn after_start<F>(&mut self, f: F) -> &mut Self |
| where F: Fn() + Send + Sync + 'static |
| { |
| self.after_start = Some(Arc::new(f)); |
| self |
| } |
| |
| /// Execute function `f` before each worker thread stops. |
| /// |
| /// This is initially intended for bookkeeping and monitoring uses. |
| /// The `f` will be deconstructed after the `builder` is deconstructed |
| /// and all threads in the pool has executed it. |
| pub fn before_stop<F>(&mut self, f: F) -> &mut Self |
| where F: Fn() + Send + Sync + 'static |
| { |
| self.before_stop = Some(Arc::new(f)); |
| self |
| } |
| |
| /// Create CpuPool with configured parameters |
| /// |
| /// # Panics |
| /// |
| /// Panics if `pool_size == 0`. |
| pub fn create(&mut self) -> CpuPool { |
| let (tx, rx) = mpsc::channel(); |
| let pool = CpuPool { |
| inner: Arc::new(Inner { |
| tx: Mutex::new(tx), |
| rx: Mutex::new(rx), |
| cnt: AtomicUsize::new(1), |
| size: self.pool_size, |
| }), |
| }; |
| assert!(self.pool_size > 0); |
| |
| for counter in 0..self.pool_size { |
| let inner = pool.inner.clone(); |
| let after_start = self.after_start.clone(); |
| let before_stop = self.before_stop.clone(); |
| let mut thread_builder = thread::Builder::new(); |
| if let Some(ref name_prefix) = self.name_prefix { |
| thread_builder = thread_builder.name(format!("{}{}", name_prefix, counter)); |
| } |
| if self.stack_size > 0 { |
| thread_builder = thread_builder.stack_size(self.stack_size); |
| } |
| thread_builder.spawn(move || inner.work(after_start, before_stop)).unwrap(); |
| } |
| return pool |
| } |
| } |
| |
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::mpsc;

    /// Each worker runs `after_start` once, and the hook (together with the
    /// sender it captured) is released once the builder is gone and every
    /// worker has run it.
    #[test]
    fn test_drop_after_start() {
        let (started_tx, started_rx) = mpsc::sync_channel(2);

        // The builder lives only inside this block. It must be dropped before
        // the receiver is drained, otherwise its reference to the hook would
        // keep the sender alive and the iteration below would never end.
        let _cpu_pool = {
            let mut builder = Builder::new();
            builder.pool_size(2);
            builder.after_start(move || started_tx.send(1).unwrap());
            builder.create()
        };

        // Once the builder is dropped and both workers have run the hook, the
        // sender is dropped as well, so the receiver can be used as a finite
        // iterator.
        let started = started_rx.into_iter().count();
        assert_eq!(started, 2);
    }
}