| #[cfg(feature = "parallel")] |
| mod stepped { |
| use crate::parallel::num_threads; |
| |
| /// An iterator adaptor to allow running computations using [`in_parallel()`][crate::parallel::in_parallel()] in a step-wise manner, see the [module docs][crate::parallel] |
| /// for details. |
| pub struct Stepwise<Reduce: super::Reduce> { |
| /// This field is first to assure it's dropped first and cause threads that are dropped next to stop their loops |
| /// as sending results fails when the receiver is dropped. |
| receive_result: std::sync::mpsc::Receiver<Reduce::Input>, |
| /// `join()` will be called on these guards to assure every thread tries to send through a closed channel. When |
| /// that happens, they break out of their loops. |
| threads: Vec<std::thread::JoinHandle<()>>, |
| /// The reducer is called only in the thread using the iterator, dropping it has no side effects. |
| reducer: Option<Reduce>, |
| } |
| |
| impl<Reduce: super::Reduce> Drop for Stepwise<Reduce> { |
| fn drop(&mut self) { |
| let (_, sink) = std::sync::mpsc::channel(); |
| drop(std::mem::replace(&mut self.receive_result, sink)); |
| |
| let mut last_err = None; |
| for handle in std::mem::take(&mut self.threads) { |
| if let Err(err) = handle.join() { |
| last_err = Some(err); |
| }; |
| } |
| if let Some(thread_err) = last_err { |
| std::panic::resume_unwind(thread_err); |
| } |
| } |
| } |
| |
| impl<Reduce: super::Reduce> Stepwise<Reduce> { |
| /// Instantiate a new iterator and start working in threads. |
| /// For a description of parameters, see [`in_parallel()`][crate::parallel::in_parallel()]. |
| pub fn new<InputIter, ThreadStateFn, ConsumeFn, I, O, S>( |
| input: InputIter, |
| thread_limit: Option<usize>, |
| new_thread_state: ThreadStateFn, |
| consume: ConsumeFn, |
| reducer: Reduce, |
| ) -> Self |
| where |
| InputIter: Iterator<Item = I> + Send + 'static, |
| ThreadStateFn: Fn(usize) -> S + Send + Clone + 'static, |
| ConsumeFn: Fn(I, &mut S) -> O + Send + Clone + 'static, |
| Reduce: super::Reduce<Input = O> + 'static, |
| I: Send + 'static, |
| O: Send + 'static, |
| { |
| let num_threads = num_threads(thread_limit); |
| let mut threads = Vec::with_capacity(num_threads + 1); |
| let receive_result = { |
| let (send_input, receive_input) = crossbeam_channel::bounded::<I>(num_threads); |
| let (send_result, receive_result) = std::sync::mpsc::sync_channel::<O>(num_threads); |
| for thread_id in 0..num_threads { |
| let handle = std::thread::spawn({ |
| let send_result = send_result.clone(); |
| let receive_input = receive_input.clone(); |
| let new_thread_state = new_thread_state.clone(); |
| let consume = consume.clone(); |
| move || { |
| let mut state = new_thread_state(thread_id); |
| for item in receive_input { |
| if send_result.send(consume(item, &mut state)).is_err() { |
| break; |
| } |
| } |
| } |
| }); |
| threads.push(handle); |
| } |
| threads.push(std::thread::spawn(move || { |
| for item in input { |
| if send_input.send(item).is_err() { |
| break; |
| } |
| } |
| })); |
| receive_result |
| }; |
| Stepwise { |
| threads, |
| receive_result, |
| reducer: Some(reducer), |
| } |
| } |
| |
| /// Consume the iterator by finishing its iteration and calling [`Reduce::finalize()`][crate::parallel::Reduce::finalize()]. |
| pub fn finalize(mut self) -> Result<Reduce::Output, Reduce::Error> { |
| for value in self.by_ref() { |
| drop(value?); |
| } |
| self.reducer |
| .take() |
| .expect("this is the last call before consumption") |
| .finalize() |
| } |
| } |
| |
| impl<Reduce: super::Reduce> Iterator for Stepwise<Reduce> { |
| type Item = Result<Reduce::FeedProduce, Reduce::Error>; |
| |
| fn next(&mut self) -> Option<<Self as Iterator>::Item> { |
| self.receive_result |
| .recv() |
| .ok() |
| .and_then(|input| self.reducer.as_mut().map(|r| r.feed(input))) |
| } |
| } |
| |
| impl<R: super::Reduce> super::Finalize for Stepwise<R> { |
| type Reduce = R; |
| |
| fn finalize( |
| self, |
| ) -> Result< |
| <<Self as super::Finalize>::Reduce as super::Reduce>::Output, |
| <<Self as super::Finalize>::Reduce as super::Reduce>::Error, |
| > { |
| Stepwise::finalize(self) |
| } |
| } |
| } |
| |
| #[cfg(not(feature = "parallel"))] |
| mod stepped { |
| /// An iterator adaptor to allow running computations using [`in_parallel()`][crate::parallel::in_parallel()] in a step-wise manner, see the [module docs][crate::parallel] |
| /// for details. |
| pub struct Stepwise<InputIter, ConsumeFn, ThreadState, Reduce> { |
| input: InputIter, |
| consume: ConsumeFn, |
| thread_state: ThreadState, |
| reducer: Reduce, |
| } |
| |
| impl<InputIter, ConsumeFn, Reduce, I, O, S> Stepwise<InputIter, ConsumeFn, S, Reduce> |
| where |
| InputIter: Iterator<Item = I>, |
| ConsumeFn: Fn(I, &mut S) -> O, |
| Reduce: super::Reduce<Input = O>, |
| { |
| /// Instantiate a new iterator. |
| /// For a description of parameters, see [`in_parallel()`][crate::parallel::in_parallel()]. |
| pub fn new<ThreadStateFn>( |
| input: InputIter, |
| _thread_limit: Option<usize>, |
| new_thread_state: ThreadStateFn, |
| consume: ConsumeFn, |
| reducer: Reduce, |
| ) -> Self |
| where |
| ThreadStateFn: Fn(usize) -> S, |
| { |
| Stepwise { |
| input, |
| consume, |
| thread_state: new_thread_state(0), |
| reducer, |
| } |
| } |
| |
| /// Consume the iterator by finishing its iteration and calling [`Reduce::finalize()`][crate::parallel::Reduce::finalize()]. |
| pub fn finalize(mut self) -> Result<Reduce::Output, Reduce::Error> { |
| for value in self.by_ref() { |
| drop(value?); |
| } |
| self.reducer.finalize() |
| } |
| } |
| |
| impl<InputIter, ConsumeFn, ThreadState, Reduce, I, O> Iterator for Stepwise<InputIter, ConsumeFn, ThreadState, Reduce> |
| where |
| InputIter: Iterator<Item = I>, |
| ConsumeFn: Fn(I, &mut ThreadState) -> O, |
| Reduce: super::Reduce<Input = O>, |
| { |
| type Item = Result<Reduce::FeedProduce, Reduce::Error>; |
| |
| fn next(&mut self) -> Option<<Self as Iterator>::Item> { |
| self.input |
| .next() |
| .map(|input| self.reducer.feed((self.consume)(input, &mut self.thread_state))) |
| } |
| } |
| |
| impl<InputIter, ConsumeFn, R, I, O, S> super::Finalize for Stepwise<InputIter, ConsumeFn, S, R> |
| where |
| InputIter: Iterator<Item = I>, |
| ConsumeFn: Fn(I, &mut S) -> O, |
| R: super::Reduce<Input = O>, |
| { |
| type Reduce = R; |
| |
| fn finalize( |
| self, |
| ) -> Result< |
| <<Self as super::Finalize>::Reduce as super::Reduce>::Output, |
| <<Self as super::Finalize>::Reduce as super::Reduce>::Error, |
| > { |
| Stepwise::finalize(self) |
| } |
| } |
| } |
| |
| use std::marker::PhantomData; |
| |
| pub use stepped::Stepwise; |
| |
| /// An trait for aggregating items commonly produced in threads into a single result, without itself |
| /// needing to be thread safe. |
| pub trait Reduce { |
| /// The type fed to the reducer in the [`feed()`][Reduce::feed()] method. |
| /// |
| /// It's produced by a function that may run on multiple threads. |
| type Input; |
| /// The type produced in Ok(…) by [`feed()`][Reduce::feed()]. |
| /// Most reducers by nature use `()` here as the value is in the aggregation. |
| /// However, some may use it to collect statistics only and return their Input |
| /// in some form as a result here for [`Stepwise`] to be useful. |
| type FeedProduce; |
| /// The type produced once by the [`finalize()`][Reduce::finalize()] method. |
| /// |
| /// For traditional reducers, this is the value produced by the entire operation. |
| /// For those made for step-wise iteration this may be aggregated statistics. |
| type Output; |
| /// The error type to use for all methods of this trait. |
| type Error; |
| /// Called each time a new `item` was produced in order to aggregate it into the final result. |
| /// |
| /// If an `Error` is returned, the entire operation will be stopped. |
| fn feed(&mut self, item: Self::Input) -> Result<Self::FeedProduce, Self::Error>; |
| /// Called once once all items where passed to `feed()`, producing the final `Output` of the operation or an `Error`. |
| fn finalize(self) -> Result<Self::Output, Self::Error>; |
| } |
| |
| /// An identity reducer for those who want to use [`Stepwise`] or [`in_parallel()`][crate::parallel::in_parallel()] |
| /// without the use of non-threaded reduction of products created in threads. |
| pub struct IdentityWithResult<Input, Error> { |
| _input: PhantomData<Input>, |
| _error: PhantomData<Error>, |
| } |
| |
| impl<Input, Error> Default for IdentityWithResult<Input, Error> { |
| fn default() -> Self { |
| IdentityWithResult { |
| _input: Default::default(), |
| _error: Default::default(), |
| } |
| } |
| } |
| |
| impl<Input, Error> Reduce for IdentityWithResult<Input, Error> { |
| type Input = Result<Input, Self::Error>; |
| type FeedProduce = Input; |
| type Output = (); |
| type Error = Error; |
| |
| fn feed(&mut self, item: Self::Input) -> Result<Self::FeedProduce, Self::Error> { |
| item |
| } |
| |
| fn finalize(self) -> Result<Self::Output, Self::Error> { |
| Ok(()) |
| } |
| } |
| |
| /// A trait reflecting the `finalize()` method of [`Reduce`] implementations |
| pub trait Finalize { |
| /// An implementation of [`Reduce`] |
| type Reduce: self::Reduce; |
| |
| /// Similar to the [`Reduce::finalize()`] method |
| fn finalize( |
| self, |
| ) -> Result<<<Self as Finalize>::Reduce as self::Reduce>::Output, <<Self as Finalize>::Reduce as self::Reduce>::Error>; |
| } |