diff --git a/Cargo.toml b/Cargo.toml index 56558baef..cc3b12d9b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,9 +56,8 @@ crossterm_winapi = "0.9" # [target.'cfg(unix)'.dependencies] libc = "0.2" -mio = { version = "0.8", features = ["os-poll"] } signal-hook = { version = "0.3.13" } -signal-hook-mio = { version = "0.2.3", features = ["support-v0_8"] } +filedescriptor = "0.8" # # Dev dependencies (examples, ...) diff --git a/src/event/source/unix.rs b/src/event/source/unix.rs index 8a44d5a5d..5f5a26cc2 100644 --- a/src/event/source/unix.rs +++ b/src/event/source/unix.rs @@ -1,27 +1,45 @@ -use std::{collections::VecDeque, io, time::Duration}; +use std::os::unix::prelude::AsRawFd; +use std::{collections::VecDeque, io, os::unix::net::UnixStream, time::Duration}; -use mio::{unix::SourceFd, Events, Interest, Poll, Token}; -use signal_hook_mio::v0_8::Signals; +use signal_hook::low_level::pipe; +use crate::event::timeout::PollTimeout; +use crate::event::Event; use crate::Result; +use filedescriptor::{poll, pollfd, POLLIN}; + #[cfg(feature = "event-stream")] use super::super::sys::Waker; -use super::super::{ - source::EventSource, - sys::unix::{ - file_descriptor::{tty_fd, FileDesc}, - parse::parse_event, + +use super::{ + super::{ + sys::unix::{ + file_descriptor::{tty_fd, FileDesc}, + parse::parse_event, + }, + InternalEvent, }, - timeout::PollTimeout, - Event, InternalEvent, + EventSource, }; -// Tokens to identify file descriptor -const TTY_TOKEN: Token = Token(0); -const SIGNAL_TOKEN: Token = Token(1); +/// Holds a prototypical Waker and a receiver we can wait on when doing select(). #[cfg(feature = "event-stream")] -const WAKE_TOKEN: Token = Token(2); +struct WakePipe { + receiver: UnixStream, + waker: Waker, +} + +#[cfg(feature = "event-stream")] +impl WakePipe { + fn new() -> Result { + let (receiver, sender) = nonblocking_unix_pair()?; + Ok(WakePipe { + receiver, + waker: Waker::new(sender), + }) + } +} // I (@zrzka) wasn't able to read more than 1_022 bytes when testing // reading on macOS/Linux -> we don't need bigger buffer and 1k of bytes @@ -29,14 +47,19 @@ const WAKE_TOKEN: Token = Token(2); const TTY_BUFFER_SIZE: usize = 1_024; pub(crate) struct UnixInternalEventSource { - poll: Poll, - events: Events, parser: Parser, tty_buffer: [u8; TTY_BUFFER_SIZE], - tty_fd: FileDesc, - signals: Signals, + tty: FileDesc, + winch_signal_receiver: UnixStream, #[cfg(feature = "event-stream")] - waker: Waker, + wake_pipe: WakePipe, +} + +fn nonblocking_unix_pair() -> Result<(UnixStream, UnixStream)> { + let (receiver, sender) = UnixStream::pair()?; + receiver.set_nonblocking(true)?; + sender.set_nonblocking(true)?; + Ok((receiver, sender)) } impl UnixInternalEventSource { @@ -45,128 +68,128 @@ impl UnixInternalEventSource { } pub(crate) fn from_file_descriptor(input_fd: FileDesc) -> Result { - let poll = Poll::new()?; - let registry = poll.registry(); - - let tty_raw_fd = input_fd.raw_fd(); - let mut tty_ev = SourceFd(&tty_raw_fd); - registry.register(&mut tty_ev, TTY_TOKEN, Interest::READABLE)?; - - let mut signals = Signals::new(&[signal_hook::consts::SIGWINCH])?; - registry.register(&mut signals, SIGNAL_TOKEN, Interest::READABLE)?; - - #[cfg(feature = "event-stream")] - let waker = Waker::new(registry, WAKE_TOKEN)?; - Ok(UnixInternalEventSource { - poll, - events: Events::with_capacity(3), parser: Parser::default(), tty_buffer: [0u8; TTY_BUFFER_SIZE], - tty_fd: input_fd, - signals, + tty: input_fd, + winch_signal_receiver: { + let (receiver, sender) = nonblocking_unix_pair()?; + // Unregistering is unnecessary because EventSource is a singleton + pipe::register(libc::SIGWINCH, sender)?; + receiver + }, #[cfg(feature = "event-stream")] - waker, + wake_pipe: WakePipe::new()?, }) } } +/// read_complete reads from a non-blocking file descriptor +/// until the buffer is full or it would block. +/// +/// Similar to `std::io::Read::read_to_end`, except this function +/// only fills the given buffer and does not read beyond that. +fn read_complete(fd: &FileDesc, buf: &mut [u8]) -> Result { + loop { + match fd.read(buf, buf.len()) { + Ok(x) => return Ok(x), + Err(e) => match e.kind() { + io::ErrorKind::WouldBlock => return Ok(0), + io::ErrorKind::Interrupted => continue, + _ => return Err(e), + }, + } + } +} + impl EventSource for UnixInternalEventSource { fn try_read(&mut self, timeout: Option) -> Result> { - if let Some(event) = self.parser.next() { - return Ok(Some(event)); + let timeout = PollTimeout::new(timeout); + + fn make_pollfd(fd: &F) -> pollfd { + pollfd { + fd: fd.as_raw_fd(), + events: POLLIN, + revents: 0, + } } - let timeout = PollTimeout::new(timeout); + #[cfg(not(feature = "event-stream"))] + let mut fds = [ + make_pollfd(&self.tty), + make_pollfd(&self.winch_signal_receiver), + ]; - loop { - if let Err(e) = self.poll.poll(&mut self.events, timeout.leftover()) { - // Mio will throw an interrupted error in case of cursor position retrieval. We need to retry until it succeeds. - // Previous versions of Mio (< 0.7) would automatically retry the poll call if it was interrupted (if EINTR was returned). - // https://docs.rs/mio/0.7.0/mio/struct.Poll.html#notes - if e.kind() == io::ErrorKind::Interrupted { - continue; - } else { - return Err(e); - } - }; + #[cfg(feature = "event-stream")] + let mut fds = [ + make_pollfd(&self.tty), + make_pollfd(&self.winch_signal_receiver), + make_pollfd(&self.wake_pipe.receiver), + ]; - if self.events.is_empty() { - // No readiness events = timeout - return Ok(None); + while timeout.leftover().map_or(true, |t| !t.is_zero()) { + // check if there are buffered events from the last read + if let Some(event) = self.parser.next() { + return Ok(Some(event)); } - - for token in self.events.iter().map(|x| x.token()) { - match token { - TTY_TOKEN => { - loop { - match self.tty_fd.read(&mut self.tty_buffer, TTY_BUFFER_SIZE) { - Ok(read_count) => { - if read_count > 0 { - self.parser.advance( - &self.tty_buffer[..read_count], - read_count == TTY_BUFFER_SIZE, - ); - } - } - Err(e) => { - // No more data to read at the moment. We will receive another event - if e.kind() == io::ErrorKind::WouldBlock { - break; - } - // once more data is available to read. - else if e.kind() == io::ErrorKind::Interrupted { - continue; - } - } - }; - - if let Some(event) = self.parser.next() { - return Ok(Some(event)); - } - } + match poll(&mut fds, timeout.leftover()) { + Err(filedescriptor::Error::Io(e)) => return Err(e), + res => res.expect("polling tty"), + }; + if fds[0].events & POLLIN != 0 { + loop { + let read_count = read_complete(&self.tty, &mut self.tty_buffer)?; + if read_count > 0 { + self.parser.advance( + &self.tty_buffer[..read_count], + read_count == TTY_BUFFER_SIZE, + ); } - SIGNAL_TOKEN => { - for signal in self.signals.pending() { - match signal { - signal_hook::consts::SIGWINCH => { - // TODO Should we remove tput? - // - // This can take a really long time, because terminal::size can - // launch new process (tput) and then it parses its output. It's - // not a really long time from the absolute time point of view, but - // it's a really long time from the mio, async-std/tokio executor, ... - // point of view. - let new_size = crate::terminal::size()?; - return Ok(Some(InternalEvent::Event(Event::Resize( - new_size.0, new_size.1, - )))); - } - _ => unreachable!("Synchronize signal registration & handling"), - }; - } + + if let Some(event) = self.parser.next() { + return Ok(Some(event)); } - #[cfg(feature = "event-stream")] - WAKE_TOKEN => { - return Err(std::io::Error::new( - std::io::ErrorKind::Interrupted, - "Poll operation was woken up by `Waker::wake`", - )); + + if read_count == 0 { + break; } - _ => unreachable!("Synchronize Evented handle registration & token handling"), } } + if fds[1].events & POLLIN != 0 { + let fd = FileDesc::new(self.winch_signal_receiver.as_raw_fd(), false); + // drain the pipe + while read_complete(&fd, &mut [0; 1024])? != 0 {} + // TODO Should we remove tput? + // + // This can take a really long time, because terminal::size can + // launch new process (tput) and then it parses its output. It's + // not a really long time from the absolute time point of view, but + // it's a really long time from the mio, async-std/tokio executor, ... + // point of view. + let new_size = crate::terminal::size()?; + return Ok(Some(InternalEvent::Event(Event::Resize( + new_size.0, new_size.1, + )))); + } + + #[cfg(feature = "event-stream")] + if fds[2].events & POLLIN != 0 { + let fd = FileDesc::new(self.wake_pipe.receiver.as_raw_fd(), false); + // drain the pipe + while read_complete(&fd, &mut [0; 1024])? != 0 {} - // Processing above can take some time, check if timeout expired - if timeout.elapsed() { - return Ok(None); + return Err(std::io::Error::new( + std::io::ErrorKind::Interrupted, + "Poll operation was woken up by `Waker::wake`", + )); } } + Ok(None) } #[cfg(feature = "event-stream")] fn waker(&self) -> Waker { - self.waker.clone() + self.wake_pipe.waker.clone() } } diff --git a/src/event/sys/unix/file_descriptor.rs b/src/event/sys/unix/file_descriptor.rs index 8b481766b..025845e86 100644 --- a/src/event/sys/unix/file_descriptor.rs +++ b/src/event/sys/unix/file_descriptor.rs @@ -1,6 +1,9 @@ use std::{ fs, io, - os::unix::io::{IntoRawFd, RawFd}, + os::unix::{ + io::{IntoRawFd, RawFd}, + prelude::AsRawFd, + }, }; use libc::size_t; @@ -63,6 +66,12 @@ impl Drop for FileDesc { } } +impl AsRawFd for FileDesc { + fn as_raw_fd(&self) -> RawFd { + self.raw_fd() + } +} + /// Creates a file descriptor pointing to the standard input or `/dev/tty`. pub fn tty_fd() -> Result { let (fd, close_on_drop) = if unsafe { libc::isatty(libc::STDIN_FILENO) == 1 } { diff --git a/src/event/sys/unix/waker.rs b/src/event/sys/unix/waker.rs index 4509e9ba3..47868f129 100644 --- a/src/event/sys/unix/waker.rs +++ b/src/event/sys/unix/waker.rs @@ -1,29 +1,31 @@ -use std::sync::{Arc, Mutex}; - -use mio::{Registry, Token}; +use std::{ + io::Write, + os::unix::net::UnixStream, + sync::{Arc, Mutex}, +}; use crate::Result; -/// Allows to wake up the `mio::Poll::poll()` method. -/// This type wraps `mio::Waker`, for more information see its documentation. +/// Allows to wake up the EventSource::try_read() method. #[derive(Clone, Debug)] pub(crate) struct Waker { - inner: Arc>, + inner: Arc>, } impl Waker { /// Create a new `Waker`. - pub(crate) fn new(registry: &Registry, waker_token: Token) -> Result { - Ok(Self { - inner: Arc::new(Mutex::new(mio::Waker::new(registry, waker_token)?)), - }) + pub(crate) fn new(writer: UnixStream) -> Self { + Self { + inner: Arc::new(Mutex::new(writer)), + } } /// Wake up the [`Poll`] associated with this `Waker`. /// /// Readiness is set to `Ready::readable()`. pub(crate) fn wake(&self) -> Result<()> { - self.inner.lock().unwrap().wake() + self.inner.lock().unwrap().write(&[0])?; + Ok(()) } /// Resets the state so the same waker can be reused.