blob: 510235046e19550fce44b86028defb31ff779f39 [file] [log] [blame]
use super::mutex::Mutex;
use crate::os::xous::ffi::{blocking_scalar, scalar};
use crate::os::xous::services::{ticktimer_server, TicktimerScalar};
use crate::time::Duration;
use core::sync::atomic::{AtomicUsize, Ordering};
// The implementation is inspired by Andrew D. Birrell's paper
// "Implementing Condition Variables with Semaphores"
const NOTIFY_TRIES: usize = 3;
pub struct Condvar {
counter: AtomicUsize,
timed_out: AtomicUsize,
}
unsafe impl Send for Condvar {}
unsafe impl Sync for Condvar {}
impl Condvar {
#[inline]
#[rustc_const_stable(feature = "const_locks", since = "1.63.0")]
pub const fn new() -> Condvar {
Condvar { counter: AtomicUsize::new(0), timed_out: AtomicUsize::new(0) }
}
fn notify_some(&self, to_notify: usize) {
// Assumption: The Mutex protecting this condvar is locked throughout the
// entirety of this call, preventing calls to `wait` and `wait_timeout`.
// Logic check: Ensure that there aren't any missing waiters. Remove any that
// timed-out, ensuring the counter doesn't underflow.
assert!(self.timed_out.load(Ordering::Relaxed) <= self.counter.load(Ordering::Relaxed));
self.counter.fetch_sub(self.timed_out.swap(0, Ordering::Relaxed), Ordering::Relaxed);
// Figure out how many threads to notify. Note that it is impossible for `counter`
// to increase during this operation because Mutex is locked. However, it is
// possible for `counter` to decrease due to a condvar timing out, in which
// case the corresponding `timed_out` will increase accordingly.
let Ok(waiter_count) =
self.counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |counter| {
if counter == 0 {
return None;
} else {
Some(counter - counter.min(to_notify))
}
})
else {
// No threads are waiting on this condvar
return;
};
let mut remaining_to_wake = waiter_count.min(to_notify);
if remaining_to_wake == 0 {
return;
}
for _wake_tries in 0..NOTIFY_TRIES {
let result = blocking_scalar(
ticktimer_server(),
TicktimerScalar::NotifyCondition(self.index(), remaining_to_wake).into(),
)
.expect("failure to send NotifyCondition command");
// Remove the list of waiters that were notified
remaining_to_wake -= result[0];
// Also remove the number of waiters that timed out. Clamp it to 0 in order to
// ensure we don't wait forever in case the waiter woke up between the time
// we counted the remaining waiters and now.
remaining_to_wake =
remaining_to_wake.saturating_sub(self.timed_out.swap(0, Ordering::Relaxed));
if remaining_to_wake == 0 {
return;
}
crate::thread::yield_now();
}
}
pub fn notify_one(&self) {
self.notify_some(1)
}
pub fn notify_all(&self) {
self.notify_some(self.counter.load(Ordering::Relaxed))
}
fn index(&self) -> usize {
core::ptr::from_ref(self).addr()
}
/// Unlock the given Mutex and wait for the notification. Wait at most
/// `ms` milliseconds, or pass `0` to wait forever.
///
/// Returns `true` if the condition was received, `false` if it timed out
fn wait_ms(&self, mutex: &Mutex, ms: usize) -> bool {
self.counter.fetch_add(1, Ordering::Relaxed);
unsafe { mutex.unlock() };
// Threading concern: There is a chance that the `notify` thread wakes up here before
// we have a chance to wait for the condition. This is fine because we've recorded
// the fact that we're waiting by incrementing the counter.
let result = blocking_scalar(
ticktimer_server(),
TicktimerScalar::WaitForCondition(self.index(), ms).into(),
);
let awoken = result.expect("Ticktimer: failure to send WaitForCondition command")[0] == 0;
// If we awoke due to a timeout, increment the `timed_out` counter so that the
// main loop of `notify` knows there's a timeout.
//
// This is done with the Mutex still unlocked, because the Mutex might still
// be locked by the `notify` process above.
if !awoken {
self.timed_out.fetch_add(1, Ordering::Relaxed);
}
unsafe { mutex.lock() };
awoken
}
pub unsafe fn wait(&self, mutex: &Mutex) {
// Wait for 0 ms, which is a special case to "wait forever"
self.wait_ms(mutex, 0);
}
pub unsafe fn wait_timeout(&self, mutex: &Mutex, dur: Duration) -> bool {
let mut millis = dur.as_millis() as usize;
// Ensure we don't wait for 0 ms, which would cause us to wait forever
if millis == 0 {
millis = 1;
}
self.wait_ms(mutex, millis)
}
}
impl Drop for Condvar {
fn drop(&mut self) {
let remaining_count = self.counter.load(Ordering::Relaxed);
let timed_out = self.timed_out.load(Ordering::Relaxed);
assert!(
remaining_count - timed_out == 0,
"counter was {} and timed_out was {} not 0",
remaining_count,
timed_out
);
scalar(ticktimer_server(), TicktimerScalar::FreeCondition(self.index()).into()).ok();
}
}