Skip to content

Commit

Permalink
add use-dev-tty feature flag
Browse files Browse the repository at this point in the history
  • Loading branch information
yyogo committed Jan 9, 2023
1 parent a1373d7 commit be80323
Show file tree
Hide file tree
Showing 7 changed files with 599 additions and 297 deletions.
5 changes: 4 additions & 1 deletion Cargo.toml
Expand Up @@ -29,6 +29,7 @@ all-features = true
default = ["bracketed-paste"]
bracketed-paste = []
event-stream = ["futures-core"]
use-dev-tty = ["filedescriptor"]

#
# Shared dependencies
Expand Down Expand Up @@ -57,7 +58,9 @@ crossterm_winapi = "0.9"
[target.'cfg(unix)'.dependencies]
libc = "0.2"
signal-hook = { version = "0.3.13" }
filedescriptor = "0.8"
filedescriptor = { version = "0.8", optional = true }
mio = { version = "0.8", features = ["os-poll"] }
signal-hook-mio = { version = "0.2.3", features = ["support-v0_8"] }

#
# Dev dependencies (examples, ...)
Expand Down
269 changes: 8 additions & 261 deletions src/event/source/unix.rs
@@ -1,264 +1,11 @@
use std::os::unix::prelude::AsRawFd;
use std::{collections::VecDeque, io, os::unix::net::UnixStream, time::Duration};
#[cfg(feature = "use-dev-tty")]
pub(crate) mod tty;

use signal_hook::low_level::pipe;
#[cfg(not(feature = "use-dev-tty"))]
pub(crate) mod mio;

use crate::event::timeout::PollTimeout;
use crate::event::Event;
use crate::Result;
#[cfg(feature = "use-dev-tty")]
pub(crate) use self::tty::UnixInternalEventSource;

use filedescriptor::{poll, pollfd, POLLIN};

#[cfg(feature = "event-stream")]
use super::super::sys::Waker;

use super::{
super::{
sys::unix::{
file_descriptor::{tty_fd, FileDesc},
parse::parse_event,
},
InternalEvent,
},
EventSource,
};

/// Holds a prototypical Waker and a receiver we can wait on when doing select().
#[cfg(feature = "event-stream")]
struct WakePipe {
receiver: UnixStream,
waker: Waker,
}

#[cfg(feature = "event-stream")]
impl WakePipe {
fn new() -> Result<Self> {
let (receiver, sender) = nonblocking_unix_pair()?;
Ok(WakePipe {
receiver,
waker: Waker::new(sender),
})
}
}

// I (@zrzka) wasn't able to read more than 1_022 bytes when testing
// reading on macOS/Linux -> we don't need bigger buffer and 1k of bytes
// is enough.
const TTY_BUFFER_SIZE: usize = 1_024;

pub(crate) struct UnixInternalEventSource {
parser: Parser,
tty_buffer: [u8; TTY_BUFFER_SIZE],
tty: FileDesc,
winch_signal_receiver: UnixStream,
#[cfg(feature = "event-stream")]
wake_pipe: WakePipe,
}

fn nonblocking_unix_pair() -> Result<(UnixStream, UnixStream)> {
let (receiver, sender) = UnixStream::pair()?;
receiver.set_nonblocking(true)?;
sender.set_nonblocking(true)?;
Ok((receiver, sender))
}

impl UnixInternalEventSource {
pub fn new() -> Result<Self> {
UnixInternalEventSource::from_file_descriptor(tty_fd()?)
}

pub(crate) fn from_file_descriptor(input_fd: FileDesc) -> Result<Self> {
Ok(UnixInternalEventSource {
parser: Parser::default(),
tty_buffer: [0u8; TTY_BUFFER_SIZE],
tty: input_fd,
winch_signal_receiver: {
let (receiver, sender) = nonblocking_unix_pair()?;
// Unregistering is unnecessary because EventSource is a singleton
pipe::register(libc::SIGWINCH, sender)?;
receiver
},
#[cfg(feature = "event-stream")]
wake_pipe: WakePipe::new()?,
})
}
}

/// read_complete reads from a non-blocking file descriptor
/// until the buffer is full or it would block.
///
/// Similar to `std::io::Read::read_to_end`, except this function
/// only fills the given buffer and does not read beyond that.
fn read_complete(fd: &FileDesc, buf: &mut [u8]) -> Result<usize> {
loop {
match fd.read(buf, buf.len()) {
Ok(x) => return Ok(x),
Err(e) => match e.kind() {
io::ErrorKind::WouldBlock => return Ok(0),
io::ErrorKind::Interrupted => continue,
_ => return Err(e),
},
}
}
}

impl EventSource for UnixInternalEventSource {
fn try_read(&mut self, timeout: Option<Duration>) -> Result<Option<InternalEvent>> {
let timeout = PollTimeout::new(timeout);

fn make_pollfd<F: AsRawFd>(fd: &F) -> pollfd {
pollfd {
fd: fd.as_raw_fd(),
events: POLLIN,
revents: 0,
}
}

#[cfg(not(feature = "event-stream"))]
let mut fds = [
make_pollfd(&self.tty),
make_pollfd(&self.winch_signal_receiver),
];

#[cfg(feature = "event-stream")]
let mut fds = [
make_pollfd(&self.tty),
make_pollfd(&self.winch_signal_receiver),
make_pollfd(&self.wake_pipe.receiver),
];

while timeout.leftover().map_or(true, |t| !t.is_zero()) {
// check if there are buffered events from the last read
if let Some(event) = self.parser.next() {
return Ok(Some(event));
}
match poll(&mut fds, timeout.leftover()) {
Err(filedescriptor::Error::Io(e)) => return Err(e),
res => res.expect("polling tty"),
};
if fds[0].revents & POLLIN != 0 {
loop {
let read_count = read_complete(&self.tty, &mut self.tty_buffer)?;
if read_count > 0 {
self.parser.advance(
&self.tty_buffer[..read_count],
read_count == TTY_BUFFER_SIZE,
);
}

if let Some(event) = self.parser.next() {
return Ok(Some(event));
}

if read_count == 0 {
break;
}
}
}
if fds[1].revents & POLLIN != 0 {
let fd = FileDesc::new(self.winch_signal_receiver.as_raw_fd(), false);
// drain the pipe
while read_complete(&fd, &mut [0; 1024])? != 0 {}
// TODO Should we remove tput?
//
// This can take a really long time, because terminal::size can
// launch new process (tput) and then it parses its output. It's
// not a really long time from the absolute time point of view, but
// it's a really long time from the mio, async-std/tokio executor, ...
// point of view.
let new_size = crate::terminal::size()?;
return Ok(Some(InternalEvent::Event(Event::Resize(
new_size.0, new_size.1,
))));
}

#[cfg(feature = "event-stream")]
if fds[2].revents & POLLIN != 0 {
let fd = FileDesc::new(self.wake_pipe.receiver.as_raw_fd(), false);
// drain the pipe
while read_complete(&fd, &mut [0; 1024])? != 0 {}

return Err(std::io::Error::new(
std::io::ErrorKind::Interrupted,
"Poll operation was woken up by `Waker::wake`",
));
}
}
Ok(None)
}

#[cfg(feature = "event-stream")]
fn waker(&self) -> Waker {
self.wake_pipe.waker.clone()
}
}

//
// Following `Parser` structure exists for two reasons:
//
// * mimic anes Parser interface
// * move the advancing, parsing, ... stuff out of the `try_read` method
//
#[derive(Debug)]
struct Parser {
buffer: Vec<u8>,
internal_events: VecDeque<InternalEvent>,
}

impl Default for Parser {
fn default() -> Self {
Parser {
// This buffer is used for -> 1 <- ANSI escape sequence. Are we
// aware of any ANSI escape sequence that is bigger? Can we make
// it smaller?
//
// Probably not worth spending more time on this as "there's a plan"
// to use the anes crate parser.
buffer: Vec::with_capacity(256),
// TTY_BUFFER_SIZE is 1_024 bytes. How many ANSI escape sequences can
// fit? What is an average sequence length? Let's guess here
// and say that the average ANSI escape sequence length is 8 bytes. Thus
// the buffer size should be 1024/8=128 to avoid additional allocations
// when processing large amounts of data.
//
// There's no need to make it bigger, because when you look at the `try_read`
// method implementation, all events are consumed before the next TTY_BUFFER
// is processed -> events pushed.
internal_events: VecDeque::with_capacity(128),
}
}
}

impl Parser {
fn advance(&mut self, buffer: &[u8], more: bool) {
for (idx, byte) in buffer.iter().enumerate() {
let more = idx + 1 < buffer.len() || more;

self.buffer.push(*byte);

match parse_event(&self.buffer, more) {
Ok(Some(ie)) => {
self.internal_events.push_back(ie);
self.buffer.clear();
}
Ok(None) => {
// Event can't be parsed, because we don't have enough bytes for
// the current sequence. Keep the buffer and process next bytes.
}
Err(_) => {
// Event can't be parsed (not enough parameters, parameter is not a number, ...).
// Clear the buffer and continue with another sequence.
self.buffer.clear();
}
}
}
}
}

impl Iterator for Parser {
type Item = InternalEvent;

fn next(&mut self) -> Option<Self::Item> {
self.internal_events.pop_front()
}
}
#[cfg(not(feature = "use-dev-tty"))]
pub(crate) use self::mio::UnixInternalEventSource;

0 comments on commit be80323

Please sign in to comment.