Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace mio polling with filedescriptor's poll() #735

Merged
merged 4 commits into from Jan 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion Cargo.toml
Expand Up @@ -29,6 +29,7 @@ all-features = true
default = ["bracketed-paste"]
bracketed-paste = []
event-stream = ["futures-core"]
use-dev-tty = ["filedescriptor"]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dont we want to make a freature flag: mio and refer to the mio dependency, and make this enabled by default?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which would you like to be enabled by default?

I think default features are a bit of a pain to deal with since you can't disable them individually, so if we keep mio as the default by making it a default feature it would be troublesome for users to opt out, as well as possibly breaking. this seemed cleaner since it makes the new behavior opt in and the feature can be deprecated easily later. Also it's intended to include #407.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. So then we temp can not opt out from mio but can opt in for the new feature.


#
# Shared dependencies
Expand Down Expand Up @@ -56,8 +57,9 @@ crossterm_winapi = "0.9"
#
[target.'cfg(unix)'.dependencies]
libc = "0.2"
mio = { version = "0.8", features = ["os-poll"] }
signal-hook = { version = "0.3.13" }
filedescriptor = { version = "0.8", optional = true }
mio = { version = "0.8", features = ["os-poll"] }
signal-hook-mio = { version = "0.2.3", features = ["support-v0_8"] }

#
Expand Down
246 changes: 8 additions & 238 deletions src/event/source/unix.rs
@@ -1,241 +1,11 @@
use std::{collections::VecDeque, io, time::Duration};
#[cfg(feature = "use-dev-tty")]
pub(crate) mod tty;

use mio::{unix::SourceFd, Events, Interest, Poll, Token};
use signal_hook_mio::v0_8::Signals;
#[cfg(not(feature = "use-dev-tty"))]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then here we can use the mio feature flag

pub(crate) mod mio;

use crate::Result;
#[cfg(feature = "use-dev-tty")]
pub(crate) use self::tty::UnixInternalEventSource;

#[cfg(feature = "event-stream")]
use super::super::sys::Waker;
use super::super::{
source::EventSource,
sys::unix::{
file_descriptor::{tty_fd, FileDesc},
parse::parse_event,
},
timeout::PollTimeout,
Event, InternalEvent,
};

// Tokens to identify file descriptor
const TTY_TOKEN: Token = Token(0);
const SIGNAL_TOKEN: Token = Token(1);
#[cfg(feature = "event-stream")]
const WAKE_TOKEN: Token = Token(2);

// I (@zrzka) wasn't able to read more than 1_022 bytes when testing
// reading on macOS/Linux -> we don't need bigger buffer and 1k of bytes
// is enough.
const TTY_BUFFER_SIZE: usize = 1_024;

pub(crate) struct UnixInternalEventSource {
poll: Poll,
events: Events,
parser: Parser,
tty_buffer: [u8; TTY_BUFFER_SIZE],
tty_fd: FileDesc,
signals: Signals,
#[cfg(feature = "event-stream")]
waker: Waker,
}

impl UnixInternalEventSource {
pub fn new() -> Result<Self> {
UnixInternalEventSource::from_file_descriptor(tty_fd()?)
}

pub(crate) fn from_file_descriptor(input_fd: FileDesc) -> Result<Self> {
let poll = Poll::new()?;
let registry = poll.registry();

let tty_raw_fd = input_fd.raw_fd();
let mut tty_ev = SourceFd(&tty_raw_fd);
registry.register(&mut tty_ev, TTY_TOKEN, Interest::READABLE)?;

let mut signals = Signals::new(&[signal_hook::consts::SIGWINCH])?;
registry.register(&mut signals, SIGNAL_TOKEN, Interest::READABLE)?;

#[cfg(feature = "event-stream")]
let waker = Waker::new(registry, WAKE_TOKEN)?;

Ok(UnixInternalEventSource {
poll,
events: Events::with_capacity(3),
parser: Parser::default(),
tty_buffer: [0u8; TTY_BUFFER_SIZE],
tty_fd: input_fd,
signals,
#[cfg(feature = "event-stream")]
waker,
})
}
}

impl EventSource for UnixInternalEventSource {
fn try_read(&mut self, timeout: Option<Duration>) -> Result<Option<InternalEvent>> {
if let Some(event) = self.parser.next() {
return Ok(Some(event));
}

let timeout = PollTimeout::new(timeout);

loop {
if let Err(e) = self.poll.poll(&mut self.events, timeout.leftover()) {
// Mio will throw an interrupted error in case of cursor position retrieval. We need to retry until it succeeds.
// Previous versions of Mio (< 0.7) would automatically retry the poll call if it was interrupted (if EINTR was returned).
// https://docs.rs/mio/0.7.0/mio/struct.Poll.html#notes
if e.kind() == io::ErrorKind::Interrupted {
continue;
} else {
return Err(e);
}
};

if self.events.is_empty() {
// No readiness events = timeout
return Ok(None);
}

for token in self.events.iter().map(|x| x.token()) {
match token {
TTY_TOKEN => {
loop {
match self.tty_fd.read(&mut self.tty_buffer, TTY_BUFFER_SIZE) {
Ok(read_count) => {
if read_count > 0 {
self.parser.advance(
&self.tty_buffer[..read_count],
read_count == TTY_BUFFER_SIZE,
);
}
}
Err(e) => {
// No more data to read at the moment. We will receive another event
if e.kind() == io::ErrorKind::WouldBlock {
break;
}
// once more data is available to read.
else if e.kind() == io::ErrorKind::Interrupted {
continue;
}
}
};

if let Some(event) = self.parser.next() {
return Ok(Some(event));
}
}
}
SIGNAL_TOKEN => {
for signal in self.signals.pending() {
match signal {
signal_hook::consts::SIGWINCH => {
// TODO Should we remove tput?
//
// This can take a really long time, because terminal::size can
// launch new process (tput) and then it parses its output. It's
// not a really long time from the absolute time point of view, but
// it's a really long time from the mio, async-std/tokio executor, ...
// point of view.
let new_size = crate::terminal::size()?;
return Ok(Some(InternalEvent::Event(Event::Resize(
new_size.0, new_size.1,
))));
}
_ => unreachable!("Synchronize signal registration & handling"),
};
}
}
#[cfg(feature = "event-stream")]
WAKE_TOKEN => {
return Err(std::io::Error::new(
std::io::ErrorKind::Interrupted,
"Poll operation was woken up by `Waker::wake`",
));
}
_ => unreachable!("Synchronize Evented handle registration & token handling"),
}
}

// Processing above can take some time, check if timeout expired
if timeout.elapsed() {
return Ok(None);
}
}
}

#[cfg(feature = "event-stream")]
fn waker(&self) -> Waker {
self.waker.clone()
}
}

//
// Following `Parser` structure exists for two reasons:
//
// * mimic anes Parser interface
// * move the advancing, parsing, ... stuff out of the `try_read` method
//
#[derive(Debug)]
struct Parser {
buffer: Vec<u8>,
internal_events: VecDeque<InternalEvent>,
}

impl Default for Parser {
fn default() -> Self {
Parser {
// This buffer is used for -> 1 <- ANSI escape sequence. Are we
// aware of any ANSI escape sequence that is bigger? Can we make
// it smaller?
//
// Probably not worth spending more time on this as "there's a plan"
// to use the anes crate parser.
buffer: Vec::with_capacity(256),
// TTY_BUFFER_SIZE is 1_024 bytes. How many ANSI escape sequences can
// fit? What is an average sequence length? Let's guess here
// and say that the average ANSI escape sequence length is 8 bytes. Thus
// the buffer size should be 1024/8=128 to avoid additional allocations
// when processing large amounts of data.
//
// There's no need to make it bigger, because when you look at the `try_read`
// method implementation, all events are consumed before the next TTY_BUFFER
// is processed -> events pushed.
internal_events: VecDeque::with_capacity(128),
}
}
}

impl Parser {
fn advance(&mut self, buffer: &[u8], more: bool) {
for (idx, byte) in buffer.iter().enumerate() {
let more = idx + 1 < buffer.len() || more;

self.buffer.push(*byte);

match parse_event(&self.buffer, more) {
Ok(Some(ie)) => {
self.internal_events.push_back(ie);
self.buffer.clear();
}
Ok(None) => {
// Event can't be parsed, because we don't have enough bytes for
// the current sequence. Keep the buffer and process next bytes.
}
Err(_) => {
// Event can't be parsed (not enough parameters, parameter is not a number, ...).
// Clear the buffer and continue with another sequence.
self.buffer.clear();
}
}
}
}
}

impl Iterator for Parser {
type Item = InternalEvent;

fn next(&mut self) -> Option<Self::Item> {
self.internal_events.pop_front()
}
}
#[cfg(not(feature = "use-dev-tty"))]
pub(crate) use self::mio::UnixInternalEventSource;