use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering};
use crate::parallel::{num_threads, Reduce};
/// A scope to start threads within.
pub type Scope<'scope, 'env> = std::thread::Scope<'scope, 'env>;
/// Runs `left` and `right` in parallel, returning their output when both are done.
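///
/// A minimal usage sketch (assuming this module is consumed as `gix_features::parallel`):
///
/// ```
/// let (a, b) = gix_features::parallel::join(|| 2 + 2, || "done");
/// assert_eq!(a, 4);
/// assert_eq!(b, "done");
/// ```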
pub fn join<O1: Send, O2: Send>(left: impl FnOnce() -> O1 + Send, right: impl FnOnce() -> O2 + Send) -> (O1, O2) {
std::thread::scope(|s| {
let left = std::thread::Builder::new()
.name("gitoxide.join.left".into())
.spawn_scoped(s, left)
.expect("valid name");
let right = std::thread::Builder::new()
.name("gitoxide.join.right".into())
.spawn_scoped(s, right)
.expect("valid name");
(left.join().unwrap(), right.join().unwrap())
})
}
/// Runs `f` with a scope to be used for spawning threads that will not outlive the function call.
/// That way it's possible to handle threads without needing the 'static lifetime for data they interact with.
///
/// Note that the threads should not rely on actual parallelism, as threading might be turned off entirely; hence they should
/// not be connected to each other via channels, as that would deadlock in single-threaded mode.
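///
/// A sketch of borrowing local data from a scoped thread and returning a value
/// (assuming this module is consumed as `gix_features::parallel`):
///
/// ```
/// let v = vec![1, 2, 3];
/// let sum = gix_features::parallel::threads(|scope| {
///     scope.spawn(|| v.iter().sum::<i32>()).join().expect("no panic")
/// });
/// assert_eq!(sum, 6);
/// ```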
pub fn threads<'env, F, R>(f: F) -> R
where
F: for<'scope> FnOnce(&'scope std::thread::Scope<'scope, 'env>) -> R,
{
std::thread::scope(f)
}
/// Create a builder for threads, which allows configuring them prior to spawning, possibly into a scope.
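///
/// For instance, one might set a name and stack size before spawning into a scope
/// (a sketch only; the name and size are arbitrary):
///
/// ```
/// std::thread::scope(|s| {
///     let answer = gix_features::parallel::build_thread()
///         .name("worker".into())
///         .stack_size(128 * 1024)
///         .spawn_scoped(s, || 42)
///         .expect("valid name")
///         .join()
///         .expect("no panic");
///     assert_eq!(answer, 42);
/// });
/// ```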
pub fn build_thread() -> std::thread::Builder {
std::thread::Builder::new()
}
/// Read items from `input` and `consume` them in multiple threads,
/// whose output is collected by a `reducer`. Its task is to
/// aggregate these outputs into the final result returned by this function, with the benefit of not having to be thread-safe.
///
/// * If `thread_limit` is `Some`, the given number of threads will be used. If `None`, all logical cores will be used.
/// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread, to be passed to `consume`.
/// * `consume(Item, &mut State) -> Output` produces an output given an input obtained from `input`, along with mutable state
///   initially created by `new_thread_state(…)`.
/// * For `reducer`, see the [`Reduce`] trait
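///
/// A sketch of summing squares across threads. The `Sum` reducer below is illustrative only,
/// assuming the crate is consumed as `gix_features` and that [`Reduce`] is this crate's trait
/// with `Input`, `FeedProduce`, `Output` and `Error` associated types:
///
/// ```
/// use gix_features::parallel::{in_parallel, Reduce};
///
/// struct Sum(usize);
///
/// impl Reduce for Sum {
///     type Input = usize;
///     type FeedProduce = ();
///     type Output = usize;
///     type Error = std::convert::Infallible;
///
///     fn feed(&mut self, n: usize) -> Result<(), Self::Error> {
///         self.0 += n;
///         Ok(())
///     }
///     fn finalize(self) -> Result<usize, Self::Error> {
///         Ok(self.0)
///     }
/// }
///
/// let total = in_parallel(
///     1..=100usize,
///     None,               // use all logical cores
///     |_thread_id| (),    // no thread-local state needed here
///     |n, _state| n * n,  // square each input item
///     Sum(0),
/// )
/// .expect("infallible");
/// assert_eq!(total, 338_350);
/// ```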
pub fn in_parallel<I, S, O, R>(
input: impl Iterator<Item = I> + Send,
thread_limit: Option<usize>,
new_thread_state: impl FnOnce(usize) -> S + Send + Clone,
consume: impl FnMut(I, &mut S) -> O + Send + Clone,
mut reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where
R: Reduce<Input = O>,
I: Send,
O: Send,
{
let num_threads = num_threads(thread_limit);
std::thread::scope(move |s| {
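        // Only `receive_result` escapes the block below; all original senders are dropped there,
        // so iterating `receive_result` terminates once the feeder and all workers are done.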
let receive_result = {
let (send_input, receive_input) = crossbeam_channel::bounded::<I>(num_threads);
let (send_result, receive_result) = crossbeam_channel::bounded::<O>(num_threads);
for thread_id in 0..num_threads {
std::thread::Builder::new()
.name(format!("gitoxide.in_parallel.produce.{thread_id}"))
.spawn_scoped(s, {
let send_result = send_result.clone();
let receive_input = receive_input.clone();
let new_thread_state = new_thread_state.clone();
let mut consume = consume.clone();
move || {
let mut state = new_thread_state(thread_id);
for item in receive_input {
if send_result.send(consume(item, &mut state)).is_err() {
break;
}
}
}
})
.expect("valid name");
}
std::thread::Builder::new()
.name("gitoxide.in_parallel.feed".into())
.spawn_scoped(s, move || {
for item in input {
if send_input.send(item).is_err() {
break;
}
}
})
.expect("valid name");
receive_result
};
for item in receive_result {
drop(reducer.feed(item)?);
}
reducer.finalize()
})
}
/// Read items from `input` and `consume` them in multiple threads,
/// whose output is collected by a `reducer`. Its task is to
/// aggregate these outputs into the final result returned by this function, with the benefit of not having to be thread-safe.
/// `finalize` is called once per thread to finish the computation, provided there was no error sending results earlier.
///
/// * If `thread_limit` is `Some`, the given number of threads will be used. If `None`, all logical cores will be used.
/// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread, to be passed to `consume`.
/// * `consume(Item, &mut State) -> Output` produces an output given an input obtained from `input`, along with mutable state
///   initially created by `new_thread_state(…)`.
/// * `finalize(State) -> Output` is called to potentially process remaining work that was placed in `State`.
/// * For `reducer`, see the [`Reduce`] trait
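///
/// A sketch of counting items per thread and flushing each thread's total via `finalize`,
/// reusing the illustrative `Sum` reducer sketched for [`in_parallel()`]:
///
/// ```
/// # use gix_features::parallel::{in_parallel_with_finalize, Reduce};
/// # struct Sum(usize);
/// # impl Reduce for Sum {
/// #     type Input = usize;
/// #     type FeedProduce = ();
/// #     type Output = usize;
/// #     type Error = std::convert::Infallible;
/// #     fn feed(&mut self, n: usize) -> Result<(), Self::Error> { self.0 += n; Ok(()) }
/// #     fn finalize(self) -> Result<usize, Self::Error> { Ok(self.0) }
/// # }
/// let seen = in_parallel_with_finalize(
///     0..1000usize,
///     None,
///     |_thread_id| 0usize,                // thread-local counter
///     |_item, count| { *count += 1; 0 },  // contribute nothing per item
///     |count| count,                      // finalize: emit this thread's total
///     Sum(0),
/// )
/// .expect("infallible");
/// assert_eq!(seen, 1000);
/// ```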
pub fn in_parallel_with_finalize<I, S, O, R>(
input: impl Iterator<Item = I> + Send,
thread_limit: Option<usize>,
new_thread_state: impl FnOnce(usize) -> S + Send + Clone,
consume: impl FnMut(I, &mut S) -> O + Send + Clone,
finalize: impl FnOnce(S) -> O + Send + Clone,
mut reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where
R: Reduce<Input = O>,
I: Send,
O: Send,
{
let num_threads = num_threads(thread_limit);
std::thread::scope(move |s| {
let receive_result = {
let (send_input, receive_input) = crossbeam_channel::bounded::<I>(num_threads);
let (send_result, receive_result) = crossbeam_channel::bounded::<O>(num_threads);
for thread_id in 0..num_threads {
std::thread::Builder::new()
.name(format!("gitoxide.in_parallel.produce.{thread_id}"))
.spawn_scoped(s, {
let send_result = send_result.clone();
let receive_input = receive_input.clone();
let new_thread_state = new_thread_state.clone();
let mut consume = consume.clone();
let finalize = finalize.clone();
move || {
let mut state = new_thread_state(thread_id);
let mut can_send = true;
for item in receive_input {
if send_result.send(consume(item, &mut state)).is_err() {
can_send = false;
break;
}
}
if can_send {
send_result.send(finalize(state)).ok();
}
}
})
.expect("valid name");
}
std::thread::Builder::new()
.name("gitoxide.in_parallel.feed".into())
.spawn_scoped(s, move || {
for item in input {
if send_input.send(item).is_err() {
break;
}
}
})
.expect("valid name");
receive_result
};
for item in receive_result {
drop(reducer.feed(item)?);
}
reducer.finalize()
})
}
/// An experiment in fine-grained per-item parallelization with built-in aggregation via thread state.
/// This is only good for operations where near-random access isn't detrimental, so it's usually not great
/// for file IO, as it won't make good use of sorted inputs.
/// Note that `periodic` is not guaranteed to be called if other threads start first and finish too quickly.
/// `consume(&mut item, &mut state, &threads_available, &should_interrupt)` is called to perform the actual computation.
/// Note that `threads_available` should be decremented to start a thread that can steal your own work (as stored in `item`),
/// which allows `consume` implementations to do their own work-stealing in case the work is distributed unevenly.
/// Work-stealing should only start after at least one item has been processed, to give all threads naturally operating
/// on the slice some time to start. Starting threads while slice-workers are still starting up would lead to an
/// over-allocation of threads, which is why the number of threads left may turn negative. Once threads are started
/// and stopped, be sure to adjust the thread-count accordingly.
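///
/// A sketch of incrementing each slice item in place while counting items per thread
/// (assuming the crate is consumed as `gix_features`; the closures are illustrative only):
///
/// ```
/// let mut data = vec![0u32; 100];
/// let counts = gix_features::parallel::in_parallel_with_slice(
///     &mut data,
///     None,
///     |_thread_id| 0usize,  // thread-local state: number of items seen
///     |item, seen, _threads_left, _should_interrupt| {
///         *item += 1;
///         *seen += 1;
///         Ok::<_, std::convert::Infallible>(())
///     },
///     || Some(std::time::Duration::from_millis(50)),  // keep the interrupt-watcher sleeping
///     |seen| seen,  // turn the thread state into this thread's return value
/// )
/// .expect("infallible");
/// assert!(data.iter().all(|item| *item == 1));
/// assert_eq!(counts.iter().sum::<usize>(), 100);
/// ```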
// TODO: better docs
pub fn in_parallel_with_slice<I, S, R, E>(
input: &mut [I],
thread_limit: Option<usize>,
new_thread_state: impl FnOnce(usize) -> S + Send + Clone,
consume: impl FnMut(&mut I, &mut S, &AtomicIsize, &AtomicBool) -> Result<(), E> + Send + Clone,
mut periodic: impl FnMut() -> Option<std::time::Duration> + Send,
state_to_rval: impl FnOnce(S) -> R + Send + Clone,
) -> Result<Vec<R>, E>
where
I: Send,
E: Send,
R: Send,
{
let num_threads = num_threads(thread_limit);
let mut results = Vec::with_capacity(num_threads);
let stop_everything = &AtomicBool::default();
let index = &AtomicUsize::default();
let threads_left = &AtomicIsize::new(num_threads as isize);
std::thread::scope({
move |s| {
std::thread::Builder::new()
.name("gitoxide.in_parallel_with_slice.watch-interrupts".into())
.spawn_scoped(s, {
move || loop {
if stop_everything.load(Ordering::Relaxed) {
break;
}
match periodic() {
Some(duration) => std::thread::sleep(duration),
None => {
stop_everything.store(true, Ordering::Relaxed);
break;
}
}
}
})
.expect("valid name");
let input_len = input.len();
struct Input<I>(*mut [I])
where
I: Send;
            // SAFETY: `I` is `Send`, and the atomic index below hands each item to exactly one
            // thread, so sending the `*mut [I]` across threads is sound.
#[allow(unsafe_code)]
unsafe impl<I> Send for Input<I> where I: Send {}
let threads: Vec<_> = (0..num_threads)
.map(|thread_id| {
std::thread::Builder::new()
.name(format!("gitoxide.in_parallel_with_slice.produce.{thread_id}"))
.spawn_scoped(s, {
let new_thread_state = new_thread_state.clone();
let state_to_rval = state_to_rval.clone();
let mut consume = consume.clone();
let input = Input(input as *mut [I]);
move || {
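                                // Reference `input` as a whole so the `move` closure captures the
                                // `Send` wrapper itself, not just its raw-pointer field.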
let _ = &input;
threads_left.fetch_sub(1, Ordering::SeqCst);
let mut state = new_thread_state(thread_id);
let res = (|| {
while let Ok(input_index) =
index.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
(x < input_len).then_some(x + 1)
})
{
if stop_everything.load(Ordering::Relaxed) {
break;
}
// SAFETY: our atomic counter for `input_index` is only ever incremented, yielding
// each item exactly once.
let item = {
#[allow(unsafe_code)]
unsafe {
&mut (&mut *input.0)[input_index]
}
};
if let Err(err) = consume(item, &mut state, threads_left, stop_everything) {
stop_everything.store(true, Ordering::Relaxed);
return Err(err);
}
}
Ok(state_to_rval(state))
})();
threads_left.fetch_add(1, Ordering::SeqCst);
res
}
})
.expect("valid name")
})
.collect();
for thread in threads {
match thread.join() {
Ok(res) => {
results.push(res?);
}
Err(err) => {
// a panic happened, stop the world gracefully (even though we panic later)
stop_everything.store(true, Ordering::Relaxed);
std::panic::resume_unwind(err);
}
}
}
stop_everything.store(true, Ordering::Relaxed);
Ok(results)
}
})
}