Skip to content

Commit

Permalink
process: use blocking threadpool for child stdio I/O
Browse files Browse the repository at this point in the history
  • Loading branch information
ipetkov committed Jul 11, 2022
1 parent 5288e1e commit 9f03276
Show file tree
Hide file tree
Showing 10 changed files with 178 additions and 60 deletions.
2 changes: 1 addition & 1 deletion tokio/Cargo.toml
Expand Up @@ -69,11 +69,11 @@ process = [
"mio/net",
"signal-hook-registry",
"winapi/handleapi",
"winapi/minwindef",
"winapi/processthreadsapi",
"winapi/threadpoollegacyapiset",
"winapi/winbase",
"winapi/winnt",
"winapi/minwindef",
]
# Includes basic task execution capabilities
rt = ["once_cell"]
Expand Down
3 changes: 2 additions & 1 deletion tokio/src/io/blocking.rs
Expand Up @@ -34,8 +34,9 @@ enum State<T> {
Busy(sys::Blocking<(io::Result<usize>, Buf, T)>),
}

cfg_io_std! {
cfg_io_blocking! {
impl<T> Blocking<T> {
#[cfg_attr(feature = "fs", allow(dead_code))]
pub(crate) fn new(inner: T) -> Blocking<T> {
Blocking {
inner: Some(inner),
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/io/poll_evented.rs
Expand Up @@ -136,7 +136,7 @@ impl<E: Source> PollEvented<E> {
}

feature! {
#![any(feature = "net", feature = "process")]
#![any(feature = "net", all(unix, feature = "process"))]

use crate::io::ReadBuf;
use std::task::{Context, Poll};
Expand Down
7 changes: 6 additions & 1 deletion tokio/src/lib.rs
Expand Up @@ -431,7 +431,12 @@ cfg_process! {
pub mod process;
}

#[cfg(any(feature = "net", feature = "fs", feature = "io-std"))]
#[cfg(any(
feature = "fs",
feature = "io-std",
feature = "net",
all(windows, feature = "process"),
))]
mod blocking;

cfg_rt! {
Expand Down
14 changes: 9 additions & 5 deletions tokio/src/macros/cfg.rs
Expand Up @@ -70,7 +70,11 @@ macro_rules! cfg_fs {

macro_rules! cfg_io_blocking {
($($item:item)*) => {
$( #[cfg(any(feature = "io-std", feature = "fs"))] $item )*
$( #[cfg(any(
feature = "io-std",
feature = "fs",
all(windows, feature = "process"),
))] $item )*
}
}

Expand All @@ -79,12 +83,12 @@ macro_rules! cfg_io_driver {
$(
#[cfg(any(
feature = "net",
feature = "process",
all(unix, feature = "process"),
all(unix, feature = "signal"),
))]
#[cfg_attr(docsrs, doc(cfg(any(
feature = "net",
feature = "process",
all(unix, feature = "process"),
all(unix, feature = "signal"),
))))]
$item
Expand All @@ -97,7 +101,7 @@ macro_rules! cfg_io_driver_impl {
$(
#[cfg(any(
feature = "net",
feature = "process",
all(unix, feature = "process"),
all(unix, feature = "signal"),
))]
$item
Expand All @@ -110,7 +114,7 @@ macro_rules! cfg_not_io_driver {
$(
#[cfg(not(any(
feature = "net",
feature = "process",
all(unix, feature = "process"),
all(unix, feature = "signal"),
)))]
$item
Expand Down
68 changes: 38 additions & 30 deletions tokio/src/process/mod.rs
Expand Up @@ -245,6 +245,7 @@ mod kill;
use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
use crate::process::kill::Kill;

use pin_project_lite::pin_project;
use std::convert::TryInto;
use std::ffi::OsStr;
use std::future::Future;
Expand Down Expand Up @@ -1211,31 +1212,40 @@ impl Child {
}
}

/// The standard input stream for spawned children.
///
/// This type implements the `AsyncWrite` trait to pass data to the stdin handle of
/// handle of a child process asynchronously.
#[derive(Debug)]
pub struct ChildStdin {
inner: imp::ChildStdio,
pin_project! {
/// The standard input stream for spawned children.
///
/// This type implements the `AsyncWrite` trait to pass data to the stdin handle of
/// handle of a child process asynchronously.
#[derive(Debug)]
pub struct ChildStdin {
#[pin]
inner: imp::ChildStdio,
}
}

/// The standard output stream for spawned children.
///
/// This type implements the `AsyncRead` trait to read data from the stdout
/// handle of a child process asynchronously.
#[derive(Debug)]
pub struct ChildStdout {
inner: imp::ChildStdio,
pin_project! {
/// The standard output stream for spawned children.
///
/// This type implements the `AsyncRead` trait to read data from the stdout
/// handle of a child process asynchronously.
#[derive(Debug)]
pub struct ChildStdout {
#[pin]
inner: imp::ChildStdio,
}
}

/// The standard error stream for spawned children.
///
/// This type implements the `AsyncRead` trait to read data from the stderr
/// handle of a child process asynchronously.
#[derive(Debug)]
pub struct ChildStderr {
inner: imp::ChildStdio,
pin_project! {
/// The standard error stream for spawned children.
///
/// This type implements the `AsyncRead` trait to read data from the stderr
/// handle of a child process asynchronously.
#[derive(Debug)]
pub struct ChildStderr {
#[pin]
inner: imp::ChildStdio,
}
}

impl ChildStdin {
Expand Down Expand Up @@ -1289,15 +1299,15 @@ impl AsyncWrite for ChildStdin {
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.inner.poll_write(cx, buf)
self.project().inner.poll_write(cx, buf)
}

fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project().inner.poll_flush(cx)
}

fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project().inner.poll_shutdown(cx)
}
}

Expand All @@ -1307,8 +1317,7 @@ impl AsyncRead for ChildStdout {
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
// Safety: pipes support reading into uninitialized memory
unsafe { self.inner.poll_read(cx, buf) }
self.project().inner.poll_read(cx, buf)
}
}

Expand All @@ -1318,8 +1327,7 @@ impl AsyncRead for ChildStderr {
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
// Safety: pipes support reading into uninitialized memory
unsafe { self.inner.poll_read(cx, buf) }
self.project().inner.poll_read(cx, buf)
}
}

Expand Down
55 changes: 49 additions & 6 deletions tokio/src/process/unix/mod.rs
Expand Up @@ -29,7 +29,7 @@ use orphan::{OrphanQueue, OrphanQueueImpl, Wait};
mod reap;
use reap::Reaper;

use crate::io::PollEvented;
use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf};
use crate::process::kill::Kill;
use crate::process::SpawnedChild;
use crate::signal::unix::driver::Handle as SignalHandle;
Expand Down Expand Up @@ -177,8 +177,8 @@ impl AsRawFd for Pipe {
}
}

pub(crate) fn convert_to_stdio(io: PollEvented<Pipe>) -> io::Result<Stdio> {
let mut fd = io.into_inner()?.fd;
pub(crate) fn convert_to_stdio(io: ChildStdio) -> io::Result<Stdio> {
let mut fd = io.inner.into_inner()?.fd;

// Ensure that the fd to be inherited is set to *blocking* mode, as this
// is the default that virtually all programs expect to have. Those
Expand Down Expand Up @@ -213,7 +213,50 @@ impl Source for Pipe {
}
}

pub(crate) type ChildStdio = PollEvented<Pipe>;
pub(crate) struct ChildStdio {
inner: PollEvented<Pipe>,
}

impl fmt::Debug for ChildStdio {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
self.inner.fmt(fmt)
}
}

impl AsRawFd for ChildStdio {
fn as_raw_fd(&self) -> RawFd {
self.inner.as_raw_fd()
}
}

impl AsyncWrite for ChildStdio {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.inner.poll_write(cx, buf)
}

fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}

fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
}

impl AsyncRead for ChildStdio {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
// Safety: pipes support reading into uninitialized memory
unsafe { self.inner.poll_read(cx, buf) }
}
}

fn set_nonblocking<T: AsRawFd>(fd: &mut T, nonblocking: bool) -> io::Result<()> {
unsafe {
Expand All @@ -238,13 +281,13 @@ fn set_nonblocking<T: AsRawFd>(fd: &mut T, nonblocking: bool) -> io::Result<()>
Ok(())
}

pub(super) fn stdio<T>(io: T) -> io::Result<PollEvented<Pipe>>
pub(super) fn stdio<T>(io: T) -> io::Result<ChildStdio>
where
T: IntoRawFd,
{
// Set the fd to nonblocking before we pass it to the event loop
let mut pipe = Pipe::from(io);
set_nonblocking(&mut pipe, true)?;

PollEvented::new(pipe)
PollEvented::new(pipe).map(|inner| ChildStdio { inner })
}

0 comments on commit 9f03276

Please sign in to comment.