| //! # Notes |
| //! |
| //! The current implementation is somewhat limited. The `Waker` is not |
| //! implemented, as at the time of writing there is no way to support to wake-up |
| //! a thread from calling `poll_oneoff`. |
| //! |
| //! Furthermore the (re/de)register functions also don't work while concurrently |
| //! polling as both registering and polling requires a lock on the |
| //! `subscriptions`. |
| //! |
| //! Finally `Selector::try_clone`, required by `Registry::try_clone`, doesn't |
| //! work. However this could be implemented by use of an `Arc`. |
| //! |
| //! In summary, this only (barely) works using a single thread. |
| |
| use std::cmp::min; |
| use std::io; |
| #[cfg(all(feature = "net", debug_assertions))] |
| use std::sync::atomic::{AtomicUsize, Ordering}; |
| use std::sync::{Arc, Mutex}; |
| use std::time::Duration; |
| |
| #[cfg(feature = "net")] |
| use crate::{Interest, Token}; |
| |
| cfg_net! { |
| pub(crate) mod tcp { |
| use std::io; |
| use std::net::{self, SocketAddr}; |
| |
| pub(crate) fn accept(listener: &net::TcpListener) -> io::Result<(net::TcpStream, SocketAddr)> { |
| let (stream, addr) = listener.accept()?; |
| stream.set_nonblocking(true)?; |
| Ok((stream, addr)) |
| } |
| } |
| } |
| |
| /// Unique id for use as `SelectorId`. |
| #[cfg(all(debug_assertions, feature = "net"))] |
| static NEXT_ID: AtomicUsize = AtomicUsize::new(1); |
| |
| pub(crate) struct Selector { |
| #[cfg(all(debug_assertions, feature = "net"))] |
| id: usize, |
| /// Subscriptions (reads events) we're interested in. |
| subscriptions: Arc<Mutex<Vec<wasi::Subscription>>>, |
| } |
| |
| impl Selector { |
| pub(crate) fn new() -> io::Result<Selector> { |
| Ok(Selector { |
| #[cfg(all(debug_assertions, feature = "net"))] |
| id: NEXT_ID.fetch_add(1, Ordering::Relaxed), |
| subscriptions: Arc::new(Mutex::new(Vec::new())), |
| }) |
| } |
| |
| #[cfg(all(debug_assertions, feature = "net"))] |
| pub(crate) fn id(&self) -> usize { |
| self.id |
| } |
| |
| pub(crate) fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> { |
| events.clear(); |
| |
| let mut subscriptions = self.subscriptions.lock().unwrap(); |
| |
| // If we want to a use a timeout in the `wasi_poll_oneoff()` function |
| // we need another subscription to the list. |
| if let Some(timeout) = timeout { |
| subscriptions.push(timeout_subscription(timeout)); |
| } |
| |
| // `poll_oneoff` needs the same number of events as subscriptions. |
| let length = subscriptions.len(); |
| events.reserve(length); |
| |
| debug_assert!(events.capacity() >= length); |
| #[cfg(debug_assertions)] |
| if length == 0 { |
| warn!( |
| "calling mio::Poll::poll with empty subscriptions, this likely not what you want" |
| ); |
| } |
| |
| let res = unsafe { wasi::poll_oneoff(subscriptions.as_ptr(), events.as_mut_ptr(), length) }; |
| |
| // Remove the timeout subscription we possibly added above. |
| if timeout.is_some() { |
| let timeout_sub = subscriptions.pop(); |
| debug_assert_eq!( |
| timeout_sub.unwrap().u.tag, |
| wasi::EVENTTYPE_CLOCK.raw(), |
| "failed to remove timeout subscription" |
| ); |
| } |
| |
| drop(subscriptions); // Unlock. |
| |
| match res { |
| Ok(n_events) => { |
| // Safety: `poll_oneoff` initialises the `events` for us. |
| unsafe { events.set_len(n_events) }; |
| |
| // Remove the timeout event. |
| if timeout.is_some() { |
| if let Some(index) = events.iter().position(is_timeout_event) { |
| events.swap_remove(index); |
| } |
| } |
| |
| check_errors(&events) |
| } |
| Err(err) => Err(io_err(err)), |
| } |
| } |
| |
| pub(crate) fn try_clone(&self) -> io::Result<Selector> { |
| Ok(Selector { |
| #[cfg(all(debug_assertions, feature = "net"))] |
| id: self.id, |
| subscriptions: self.subscriptions.clone(), |
| }) |
| } |
| |
| #[cfg(feature = "net")] |
| pub(crate) fn register( |
| &self, |
| fd: wasi::Fd, |
| token: Token, |
| interests: Interest, |
| ) -> io::Result<()> { |
| let mut subscriptions = self.subscriptions.lock().unwrap(); |
| |
| if interests.is_writable() { |
| let subscription = wasi::Subscription { |
| userdata: token.0 as wasi::Userdata, |
| u: wasi::SubscriptionU { |
| tag: wasi::EVENTTYPE_FD_WRITE.raw(), |
| u: wasi::SubscriptionUU { |
| fd_write: wasi::SubscriptionFdReadwrite { |
| file_descriptor: fd, |
| }, |
| }, |
| }, |
| }; |
| subscriptions.push(subscription); |
| } |
| |
| if interests.is_readable() { |
| let subscription = wasi::Subscription { |
| userdata: token.0 as wasi::Userdata, |
| u: wasi::SubscriptionU { |
| tag: wasi::EVENTTYPE_FD_READ.raw(), |
| u: wasi::SubscriptionUU { |
| fd_read: wasi::SubscriptionFdReadwrite { |
| file_descriptor: fd, |
| }, |
| }, |
| }, |
| }; |
| subscriptions.push(subscription); |
| } |
| |
| Ok(()) |
| } |
| |
| #[cfg(feature = "net")] |
| pub(crate) fn reregister( |
| &self, |
| fd: wasi::Fd, |
| token: Token, |
| interests: Interest, |
| ) -> io::Result<()> { |
| self.deregister(fd) |
| .and_then(|()| self.register(fd, token, interests)) |
| } |
| |
| #[cfg(feature = "net")] |
| pub(crate) fn deregister(&self, fd: wasi::Fd) -> io::Result<()> { |
| let mut subscriptions = self.subscriptions.lock().unwrap(); |
| |
| let predicate = |subscription: &wasi::Subscription| { |
| // Safety: `subscription.u.tag` defines the type of the union in |
| // `subscription.u.u`. |
| match subscription.u.tag { |
| t if t == wasi::EVENTTYPE_FD_WRITE.raw() => unsafe { |
| subscription.u.u.fd_write.file_descriptor == fd |
| }, |
| t if t == wasi::EVENTTYPE_FD_READ.raw() => unsafe { |
| subscription.u.u.fd_read.file_descriptor == fd |
| }, |
| _ => false, |
| } |
| }; |
| |
| let mut ret = Err(io::ErrorKind::NotFound.into()); |
| |
| while let Some(index) = subscriptions.iter().position(predicate) { |
| subscriptions.swap_remove(index); |
| ret = Ok(()) |
| } |
| |
| ret |
| } |
| } |
| |
| /// Token used to a add a timeout subscription, also used in removing it again. |
| const TIMEOUT_TOKEN: wasi::Userdata = wasi::Userdata::max_value(); |
| |
| /// Returns a `wasi::Subscription` for `timeout`. |
| fn timeout_subscription(timeout: Duration) -> wasi::Subscription { |
| wasi::Subscription { |
| userdata: TIMEOUT_TOKEN, |
| u: wasi::SubscriptionU { |
| tag: wasi::EVENTTYPE_CLOCK.raw(), |
| u: wasi::SubscriptionUU { |
| clock: wasi::SubscriptionClock { |
| id: wasi::CLOCKID_MONOTONIC, |
| // Timestamp is in nanoseconds. |
| timeout: min(wasi::Timestamp::MAX as u128, timeout.as_nanos()) |
| as wasi::Timestamp, |
| // Give the implementation another millisecond to coalesce |
| // events. |
| precision: Duration::from_millis(1).as_nanos() as wasi::Timestamp, |
| // Zero means the `timeout` is considered relative to the |
| // current time. |
| flags: 0, |
| }, |
| }, |
| }, |
| } |
| } |
| |
| fn is_timeout_event(event: &wasi::Event) -> bool { |
| event.type_ == wasi::EVENTTYPE_CLOCK && event.userdata == TIMEOUT_TOKEN |
| } |
| |
| /// Check all events for possible errors, it returns the first error found. |
| fn check_errors(events: &[Event]) -> io::Result<()> { |
| for event in events { |
| if event.error != wasi::ERRNO_SUCCESS { |
| return Err(io_err(event.error)); |
| } |
| } |
| Ok(()) |
| } |
| |
| /// Convert `wasi::Errno` into an `io::Error`. |
| fn io_err(errno: wasi::Errno) -> io::Error { |
| // TODO: check if this is valid. |
| io::Error::from_raw_os_error(errno.raw() as i32) |
| } |
| |
| pub(crate) type Events = Vec<Event>; |
| |
| pub(crate) type Event = wasi::Event; |
| |
| pub(crate) mod event { |
| use std::fmt; |
| |
| use crate::sys::Event; |
| use crate::Token; |
| |
| pub(crate) fn token(event: &Event) -> Token { |
| Token(event.userdata as usize) |
| } |
| |
| pub(crate) fn is_readable(event: &Event) -> bool { |
| event.type_ == wasi::EVENTTYPE_FD_READ |
| } |
| |
| pub(crate) fn is_writable(event: &Event) -> bool { |
| event.type_ == wasi::EVENTTYPE_FD_WRITE |
| } |
| |
| pub(crate) fn is_error(_: &Event) -> bool { |
| // Not supported? It could be that `wasi::Event.error` could be used for |
| // this, but the docs say `error that occurred while processing the |
| // subscription request`, so it's checked in `Select::select` already. |
| false |
| } |
| |
| pub(crate) fn is_read_closed(event: &Event) -> bool { |
| event.type_ == wasi::EVENTTYPE_FD_READ |
| // Safety: checked the type of the union above. |
| && (event.fd_readwrite.flags & wasi::EVENTRWFLAGS_FD_READWRITE_HANGUP) != 0 |
| } |
| |
| pub(crate) fn is_write_closed(event: &Event) -> bool { |
| event.type_ == wasi::EVENTTYPE_FD_WRITE |
| // Safety: checked the type of the union above. |
| && (event.fd_readwrite.flags & wasi::EVENTRWFLAGS_FD_READWRITE_HANGUP) != 0 |
| } |
| |
| pub(crate) fn is_priority(_: &Event) -> bool { |
| // Not supported. |
| false |
| } |
| |
| pub(crate) fn is_aio(_: &Event) -> bool { |
| // Not supported. |
| false |
| } |
| |
| pub(crate) fn is_lio(_: &Event) -> bool { |
| // Not supported. |
| false |
| } |
| |
| pub(crate) fn debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result { |
| debug_detail!( |
| TypeDetails(wasi::Eventtype), |
| PartialEq::eq, |
| wasi::EVENTTYPE_CLOCK, |
| wasi::EVENTTYPE_FD_READ, |
| wasi::EVENTTYPE_FD_WRITE, |
| ); |
| |
| #[allow(clippy::trivially_copy_pass_by_ref)] |
| fn check_flag(got: &wasi::Eventrwflags, want: &wasi::Eventrwflags) -> bool { |
| (got & want) != 0 |
| } |
| debug_detail!( |
| EventrwflagsDetails(wasi::Eventrwflags), |
| check_flag, |
| wasi::EVENTRWFLAGS_FD_READWRITE_HANGUP, |
| ); |
| |
| struct EventFdReadwriteDetails(wasi::EventFdReadwrite); |
| |
| impl fmt::Debug for EventFdReadwriteDetails { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| f.debug_struct("EventFdReadwrite") |
| .field("nbytes", &self.0.nbytes) |
| .field("flags", &self.0.flags) |
| .finish() |
| } |
| } |
| |
| f.debug_struct("Event") |
| .field("userdata", &event.userdata) |
| .field("error", &event.error) |
| .field("type", &TypeDetails(event.type_)) |
| .field("fd_readwrite", &EventFdReadwriteDetails(event.fd_readwrite)) |
| .finish() |
| } |
| } |
| |
| cfg_os_poll! { |
| cfg_io_source! { |
| pub(crate) struct IoSourceState; |
| |
| impl IoSourceState { |
| pub(crate) fn new() -> IoSourceState { |
| IoSourceState |
| } |
| |
| pub(crate) fn do_io<T, F, R>(&self, f: F, io: &T) -> io::Result<R> |
| where |
| F: FnOnce(&T) -> io::Result<R>, |
| { |
| // We don't hold state, so we can just call the function and |
| // return. |
| f(io) |
| } |
| } |
| } |
| } |