// blob: f9992cfd23dda6be4222cea17208005879daff46 [file] [log] [blame]
#[cfg(feature = "parallel")]
mod stepped {
use crate::parallel::num_threads;
/// An iterator adaptor to allow running computations using [`in_parallel()`][crate::parallel::in_parallel()] in a step-wise manner, see the [module docs][crate::parallel]
/// for details.
///
/// Threads are spawned when the value is created with `new()` and are joined when it is dropped.
pub struct Stepwise<Reduce: super::Reduce> {
/// Receives the values that the worker threads produce by calling `consume`.
///
/// This field is first to ensure it is dropped first: once the receiver is gone, sending results fails,
/// which causes the threads that are dropped next to stop their loops.
receive_result: std::sync::mpsc::Receiver<Reduce::Input>,
/// The join handles of all spawned threads, i.e. the workers and the thread feeding them with input.
///
/// `join()` is called on each of these handles when dropping, which waits until every thread has tried
/// to send through a closed channel. When that happens, they break out of their loops.
threads: Vec<std::thread::JoinHandle<()>>,
/// The reducer, which is called only in the thread using the iterator.
///
/// It is wrapped in an `Option` so that `finalize()` can take it out of `self`.
reducer: Option<Reduce>,
}
impl<Reduce: super::Reduce> Drop for Stepwise<Reduce> {
    /// Stop all threads, wait for them to finish and propagate the panic of a thread that panicked.
    ///
    /// If several threads panicked, only the payload of the last one joined is propagated.
    ///
    /// # Panics
    ///
    /// Resumes the panic of a spawned thread, unless the current thread is already unwinding.
    /// In that case the payload is discarded, as a second panic during unwinding would abort the process.
    fn drop(&mut self) {
        // Swap in a receiver whose sender is already gone and drop the real one. From now on
        // worker threads fail to send their next result and break out of their loops.
        let (_, sink) = std::sync::mpsc::channel();
        drop(std::mem::replace(&mut self.receive_result, sink));

        // Always join every thread, even after one of them reported a panic, so that none of
        // them outlives this value.
        let mut last_err = None;
        for handle in std::mem::take(&mut self.threads) {
            if let Err(err) = handle.join() {
                last_err = Some(err);
            }
        }

        if let Some(thread_err) = last_err {
            // Panicking in a destructor that runs as part of unwinding aborts the process,
            // hence the thread's panic is only re-raised when no other panic is in flight.
            if !std::thread::panicking() {
                std::panic::resume_unwind(thread_err);
            }
        }
    }
}
impl<Reduce: super::Reduce> Stepwise<Reduce> {
    /// Instantiate a new iterator and immediately start working in threads.
    ///
    /// * `input` is moved into a dedicated thread which sends its items to the workers through a bounded channel.
    /// * `thread_limit` is passed to `num_threads()` to determine the number of worker threads.
    /// * `new_thread_state` is called once in each worker thread with the index of that thread.
    /// * `consume` is called in a worker thread for each input item along with the state of that thread.
    /// * `reducer` is fed with the values returned by `consume` each time the iterator is advanced.
    ///
    /// For a description of parameters, see [`in_parallel()`][crate::parallel::in_parallel()].
    pub fn new<InputIter, ThreadStateFn, ConsumeFn, I, O, S>(
        input: InputIter,
        thread_limit: Option<usize>,
        new_thread_state: ThreadStateFn,
        consume: ConsumeFn,
        reducer: Reduce,
    ) -> Self
    where
        InputIter: Iterator<Item = I> + Send + 'static,
        ThreadStateFn: Fn(usize) -> S + Send + Clone + 'static,
        ConsumeFn: Fn(I, &mut S) -> O + Send + Clone + 'static,
        Reduce: super::Reduce<Input = O> + 'static,
        I: Send + 'static,
        O: Send + 'static,
    {
        let worker_count = num_threads(thread_limit);
        let (input_tx, input_rx) = crossbeam_channel::bounded::<I>(worker_count);
        let (result_tx, result_rx) = std::sync::mpsc::sync_channel::<O>(worker_count);

        // One slot per worker, plus one for the thread feeding the input.
        let mut threads = Vec::with_capacity(worker_count + 1);
        threads.extend((0..worker_count).map(|thread_id| {
            let result_tx = result_tx.clone();
            let input_rx = input_rx.clone();
            let new_thread_state = new_thread_state.clone();
            let consume = consume.clone();
            std::thread::spawn(move || {
                let mut state = new_thread_state(thread_id);
                for item in input_rx {
                    // Sending fails once the receiving side is gone, which is the signal to stop.
                    if result_tx.send(consume(item, &mut state)).is_err() {
                        break;
                    }
                }
            })
        }));
        threads.push(std::thread::spawn(move || {
            for item in input {
                if input_tx.send(item).is_err() {
                    break;
                }
            }
        }));

        // Only the threads may keep the channels alive, otherwise the end of the computation
        // would never be observed as a disconnect.
        drop(result_tx);
        drop(input_rx);

        Stepwise {
            receive_result: result_rx,
            threads,
            reducer: Some(reducer),
        }
    }

    /// Consume the iterator by finishing its iteration and calling [`Reduce::finalize()`][crate::parallel::Reduce::finalize()].
    ///
    /// The first error returned while feeding the reducer stops the iteration and is returned as is.
    pub fn finalize(mut self) -> Result<Reduce::Output, Reduce::Error> {
        self.by_ref().try_for_each(|step| step.map(drop))?;
        let reducer = self
            .reducer
            .take()
            .expect("this is the last call before consumption");
        reducer.finalize()
    }
}
impl<Reduce: super::Reduce> Iterator for Stepwise<Reduce> {
    type Item = Result<Reduce::FeedProduce, Reduce::Error>;

    /// Wait for the next value produced by a thread and return the result of feeding it to the reducer.
    ///
    /// Returns `None` once receiving fails, or if the reducer is not present anymore.
    fn next(&mut self) -> Option<<Self as Iterator>::Item> {
        let input = self.receive_result.recv().ok()?;
        let reducer = self.reducer.as_mut()?;
        Some(reducer.feed(input))
    }
}
impl<R: super::Reduce> super::Finalize for Stepwise<R> {
    type Reduce = R;

    /// Forward to the inherent `finalize()` method of this type.
    fn finalize(self) -> Result<R::Output, R::Error> {
        Stepwise::finalize(self)
    }
}
}
#[cfg(not(feature = "parallel"))]
mod stepped {
/// An iterator adaptor to allow running computations using [`in_parallel()`][crate::parallel::in_parallel()] in a step-wise manner, see the [module docs][crate::parallel]
/// for details.
///
/// This is the variant compiled when the `parallel` feature is disabled: all work happens in the calling
/// thread each time the iterator is advanced.
pub struct Stepwise<InputIter, ConsumeFn, ThreadState, Reduce> {
/// The source of input items, advanced by one item per call to `next()`.
input: InputIter,
/// Called with each input item and a mutable reference to `thread_state` to produce the value to feed to `reducer`.
consume: ConsumeFn,
/// The one and only state, created in `new()` by calling `new_thread_state(0)`.
thread_state: ThreadState,
/// The reducer receiving each value returned by `consume`.
reducer: Reduce,
}
impl<InputIter, ConsumeFn, Reduce, I, O, S> Stepwise<InputIter, ConsumeFn, S, Reduce>
where
    InputIter: Iterator<Item = I>,
    ConsumeFn: Fn(I, &mut S) -> O,
    Reduce: super::Reduce<Input = O>,
{
    /// Instantiate a new iterator.
    ///
    /// The thread limit is ignored, and `new_thread_state` is called exactly once with `0` to create the
    /// state passed to `consume`.
    ///
    /// For a description of parameters, see [`in_parallel()`][crate::parallel::in_parallel()].
    pub fn new<ThreadStateFn>(
        input: InputIter,
        _thread_limit: Option<usize>,
        new_thread_state: ThreadStateFn,
        consume: ConsumeFn,
        reducer: Reduce,
    ) -> Self
    where
        ThreadStateFn: Fn(usize) -> S,
    {
        let thread_state = new_thread_state(0);
        Stepwise {
            input,
            consume,
            thread_state,
            reducer,
        }
    }

    /// Consume the iterator by finishing its iteration and calling [`Reduce::finalize()`][crate::parallel::Reduce::finalize()].
    ///
    /// The first error returned while feeding the reducer stops the iteration and is returned as is.
    pub fn finalize(mut self) -> Result<Reduce::Output, Reduce::Error> {
        self.by_ref().try_for_each(|step| step.map(drop))?;
        self.reducer.finalize()
    }
}
impl<InputIter, ConsumeFn, ThreadState, Reduce, I, O> Iterator for Stepwise<InputIter, ConsumeFn, ThreadState, Reduce>
where
    InputIter: Iterator<Item = I>,
    ConsumeFn: Fn(I, &mut ThreadState) -> O,
    Reduce: super::Reduce<Input = O>,
{
    type Item = Result<Reduce::FeedProduce, Reduce::Error>;

    /// Take the next input item, run `consume` on it and return the result of feeding its output to the reducer.
    ///
    /// Returns `None` once the input is exhausted.
    fn next(&mut self) -> Option<<Self as Iterator>::Item> {
        let item = self.input.next()?;
        let output = (self.consume)(item, &mut self.thread_state);
        Some(self.reducer.feed(output))
    }
}
impl<InputIter, ConsumeFn, R, I, O, S> super::Finalize for Stepwise<InputIter, ConsumeFn, S, R>
where
    InputIter: Iterator<Item = I>,
    ConsumeFn: Fn(I, &mut S) -> O,
    R: super::Reduce<Input = O>,
{
    type Reduce = R;

    /// Forward to the inherent `finalize()` method of this type.
    fn finalize(self) -> Result<R::Output, R::Error> {
        Stepwise::finalize(self)
    }
}
}
use std::marker::PhantomData;
pub use stepped::Stepwise;
/// A trait for aggregating items commonly produced in threads into a single result, without itself
/// needing to be thread safe.
pub trait Reduce {
    /// The type fed to the reducer in the [`feed()`][Reduce::feed()] method.
    ///
    /// It's produced by a function that may run on multiple threads.
    type Input;
    /// The type produced in `Ok(…)` by [`feed()`][Reduce::feed()].
    ///
    /// Most reducers by nature use `()` here as the value is in the aggregation.
    /// However, some may use it to collect statistics only and return their `Input`
    /// in some form as a result here for [`Stepwise`] to be useful.
    type FeedProduce;
    /// The type produced once by the [`finalize()`][Reduce::finalize()] method.
    ///
    /// For traditional reducers, this is the value produced by the entire operation.
    /// For those made for step-wise iteration this may be aggregated statistics.
    type Output;
    /// The error type to use for all methods of this trait.
    type Error;
    /// Called each time a new `item` was produced in order to aggregate it into the final result.
    ///
    /// If an `Error` is returned, the entire operation will be stopped.
    fn feed(&mut self, item: Self::Input) -> Result<Self::FeedProduce, Self::Error>;
    /// Called once after all items were passed to `feed()`, producing the final `Output` of the operation or an `Error`.
    fn finalize(self) -> Result<Self::Output, Self::Error>;
}

/// An identity reducer for those who want to use [`Stepwise`] or [`in_parallel()`][crate::parallel::in_parallel()]
/// without any reduction of the values created in threads.
///
/// Each `Result` fed to it is handed back unchanged, and finalizing it always succeeds with `()`.
pub struct IdentityWithResult<Input, Error> {
    /// Ties the `Input` type parameter to this type without storing a value.
    _input: PhantomData<Input>,
    /// Ties the `Error` type parameter to this type without storing a value.
    _error: PhantomData<Error>,
}

impl<Input, Error> Default for IdentityWithResult<Input, Error> {
    /// Create a new instance, which works without `Input` or `Error` implementing `Default` themselves.
    fn default() -> Self {
        Self {
            _input: PhantomData,
            _error: PhantomData,
        }
    }
}

impl<Input, Error> Reduce for IdentityWithResult<Input, Error> {
    type Input = Result<Input, Self::Error>;
    type FeedProduce = Input;
    type Output = ();
    type Error = Error;

    /// Return `item` as is, so both `Ok` values and errors reach the caller untouched.
    fn feed(&mut self, item: Self::Input) -> Result<Self::FeedProduce, Self::Error> {
        item
    }

    /// Nothing was aggregated, hence there is nothing to produce and nothing that could fail.
    fn finalize(self) -> Result<Self::Output, Self::Error> {
        Ok(())
    }
}
/// A trait reflecting the `finalize()` method of [`Reduce`] implementations, for use on types that
/// drive a reducer rather than being one.
pub trait Finalize {
/// An implementation of [`Reduce`], whose `Output` and `Error` types make up the result of `finalize()`.
type Reduce: self::Reduce;
/// Similar to the [`Reduce::finalize()`] method: consume `self` to produce the `Output` of the
/// associated reducer, or its `Error`.
fn finalize(
self,
) -> Result<<<Self as Finalize>::Reduce as self::Reduce>::Output, <<Self as Finalize>::Reduce as self::Reduce>::Error>;
}