| //! An asynchronously awaitable `CancellationToken`. |
//! The token can be used to signal a cancellation request to one or more tasks.
| pub(crate) mod guard; |
| mod tree_node; |
| |
| use crate::loom::sync::Arc; |
| use core::future::Future; |
| use core::pin::Pin; |
| use core::task::{Context, Poll}; |
| |
| use guard::DropGuard; |
| use pin_project_lite::pin_project; |
| |
| /// A token which can be used to signal a cancellation request to one or more |
| /// tasks. |
| /// |
| /// Tasks can call [`CancellationToken::cancelled()`] in order to |
| /// obtain a Future which will be resolved when cancellation is requested. |
| /// |
| /// Cancellation can be requested through the [`CancellationToken::cancel`] method. |
| /// |
| /// # Examples |
| /// |
| /// ```no_run |
| /// use tokio::select; |
| /// use tokio_util::sync::CancellationToken; |
| /// |
| /// #[tokio::main] |
| /// async fn main() { |
| /// let token = CancellationToken::new(); |
| /// let cloned_token = token.clone(); |
| /// |
| /// let join_handle = tokio::spawn(async move { |
| /// // Wait for either cancellation or a very long time |
| /// select! { |
| /// _ = cloned_token.cancelled() => { |
| /// // The token was cancelled |
| /// 5 |
| /// } |
| /// _ = tokio::time::sleep(std::time::Duration::from_secs(9999)) => { |
| /// 99 |
| /// } |
| /// } |
| /// }); |
| /// |
| /// tokio::spawn(async move { |
| /// tokio::time::sleep(std::time::Duration::from_millis(10)).await; |
| /// token.cancel(); |
| /// }); |
| /// |
| /// assert_eq!(5, join_handle.await.unwrap()); |
| /// } |
| /// ``` |
| pub struct CancellationToken { |
| inner: Arc<tree_node::TreeNode>, |
| } |
| |
| pin_project! { |
| /// A Future that is resolved once the corresponding [`CancellationToken`] |
| /// is cancelled. |
| #[must_use = "futures do nothing unless polled"] |
| pub struct WaitForCancellationFuture<'a> { |
| cancellation_token: &'a CancellationToken, |
| #[pin] |
| future: tokio::sync::futures::Notified<'a>, |
| } |
| } |
| |
| // ===== impl CancellationToken ===== |
| |
| impl core::fmt::Debug for CancellationToken { |
| fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { |
| f.debug_struct("CancellationToken") |
| .field("is_cancelled", &self.is_cancelled()) |
| .finish() |
| } |
| } |
| |
| impl Clone for CancellationToken { |
| fn clone(&self) -> Self { |
| tree_node::increase_handle_refcount(&self.inner); |
| CancellationToken { |
| inner: self.inner.clone(), |
| } |
| } |
| } |
| |
| impl Drop for CancellationToken { |
| fn drop(&mut self) { |
| tree_node::decrease_handle_refcount(&self.inner); |
| } |
| } |
| |
| impl Default for CancellationToken { |
| fn default() -> CancellationToken { |
| CancellationToken::new() |
| } |
| } |
| |
| impl CancellationToken { |
| /// Creates a new CancellationToken in the non-cancelled state. |
| pub fn new() -> CancellationToken { |
| CancellationToken { |
| inner: Arc::new(tree_node::TreeNode::new()), |
| } |
| } |
| |
| /// Creates a `CancellationToken` which will get cancelled whenever the |
| /// current token gets cancelled. |
| /// |
| /// If the current token is already cancelled, the child token will get |
| /// returned in cancelled state. |
| /// |
| /// # Examples |
| /// |
| /// ```no_run |
| /// use tokio::select; |
| /// use tokio_util::sync::CancellationToken; |
| /// |
| /// #[tokio::main] |
| /// async fn main() { |
| /// let token = CancellationToken::new(); |
| /// let child_token = token.child_token(); |
| /// |
| /// let join_handle = tokio::spawn(async move { |
| /// // Wait for either cancellation or a very long time |
| /// select! { |
| /// _ = child_token.cancelled() => { |
| /// // The token was cancelled |
| /// 5 |
| /// } |
| /// _ = tokio::time::sleep(std::time::Duration::from_secs(9999)) => { |
| /// 99 |
| /// } |
| /// } |
| /// }); |
| /// |
| /// tokio::spawn(async move { |
| /// tokio::time::sleep(std::time::Duration::from_millis(10)).await; |
| /// token.cancel(); |
| /// }); |
| /// |
| /// assert_eq!(5, join_handle.await.unwrap()); |
| /// } |
| /// ``` |
| pub fn child_token(&self) -> CancellationToken { |
| CancellationToken { |
| inner: tree_node::child_node(&self.inner), |
| } |
| } |
| |
| /// Cancel the [`CancellationToken`] and all child tokens which had been |
| /// derived from it. |
| /// |
| /// This will wake up all tasks which are waiting for cancellation. |
| /// |
| /// Be aware that cancellation is not an atomic operation. It is possible |
| /// for another thread running in parallel with a call to `cancel` to first |
| /// receive `true` from `is_cancelled` on one child node, and then receive |
| /// `false` from `is_cancelled` on another child node. However, once the |
| /// call to `cancel` returns, all child nodes have been fully cancelled. |
| pub fn cancel(&self) { |
| tree_node::cancel(&self.inner); |
| } |
| |
| /// Returns `true` if the `CancellationToken` is cancelled. |
| pub fn is_cancelled(&self) -> bool { |
| tree_node::is_cancelled(&self.inner) |
| } |
| |
| /// Returns a `Future` that gets fulfilled when cancellation is requested. |
| /// |
| /// The future will complete immediately if the token is already cancelled |
| /// when this method is called. |
| /// |
| /// # Cancel safety |
| /// |
| /// This method is cancel safe. |
| pub fn cancelled(&self) -> WaitForCancellationFuture<'_> { |
| WaitForCancellationFuture { |
| cancellation_token: self, |
| future: self.inner.notified(), |
| } |
| } |
| |
| /// Creates a `DropGuard` for this token. |
| /// |
| /// Returned guard will cancel this token (and all its children) on drop |
| /// unless disarmed. |
| pub fn drop_guard(self) -> DropGuard { |
| DropGuard { inner: Some(self) } |
| } |
| } |
| |
| // ===== impl WaitForCancellationFuture ===== |
| |
| impl<'a> core::fmt::Debug for WaitForCancellationFuture<'a> { |
| fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { |
| f.debug_struct("WaitForCancellationFuture").finish() |
| } |
| } |
| |
| impl<'a> Future for WaitForCancellationFuture<'a> { |
| type Output = (); |
| |
| fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { |
| let mut this = self.project(); |
| loop { |
| if this.cancellation_token.is_cancelled() { |
| return Poll::Ready(()); |
| } |
| |
| // No wakeups can be lost here because there is always a call to |
| // `is_cancelled` between the creation of the future and the call to |
| // `poll`, and the code that sets the cancelled flag does so before |
| // waking the `Notified`. |
| if this.future.as_mut().poll(cx).is_pending() { |
| return Poll::Pending; |
| } |
| |
| this.future.set(this.cancellation_token.inner.notified()); |
| } |
| } |
| } |