Skip to content

Commit

Permalink
WiP
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Zak <richard@profian.com>
  • Loading branch information
rjzak committed May 24, 2022
1 parent 4ec6ba8 commit 2d90ef5
Show file tree
Hide file tree
Showing 10 changed files with 99 additions and 18 deletions.
8 changes: 7 additions & 1 deletion .cargo/config
@@ -1,2 +1,8 @@
# [build]
# rustflags = ["--cfg", "tokio_unstable"]
# rustflags = ["--cfg", "tokio_unstable"]

[build]
target = "wasm32-wasi"

[target.wasm32-wasi]
runner = ["enarx", "run"]
1 change: 1 addition & 0 deletions tokio-util/src/lib.rs
Expand Up @@ -29,6 +29,7 @@ cfg_codec! {
}

cfg_net! {
#[cfg(not(target_arch = "wasm32"))]
pub mod udp;
pub mod net;
}
Expand Down
9 changes: 6 additions & 3 deletions tokio/Cargo.toml
Expand Up @@ -51,7 +51,6 @@ net = [
"mio/os-poll",
"mio/os-ext",
"mio/net",
"socket2",
"winapi/namedpipeapi",
]
process = [
Expand Down Expand Up @@ -97,11 +96,15 @@ pin-project-lite = "0.2.0"
bytes = { version = "1.0.0", optional = true }
once_cell = { version = "1.5.2", optional = true }
memchr = { version = "2.2", optional = true }
mio = { version = "0.8.1", optional = true }
socket2 = { version = "0.4.4", optional = true, features = [ "all" ] }
#mio = { version = "0.8.1", optional = true }
mio = { path = "../../mio", optional = true }
#socket2 = { version = "0.4.4", optional = true, features = [ "all" ] }
num_cpus = { version = "1.8.0", optional = true }
parking_lot = { version = "0.12.0", optional = true }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
socket2 = { version = "0.4.4", features = [ "all" ] }

# Currently unstable. The API exposed by these features may be broken at any time.
# Requires `--cfg tokio_unstable` to enable.
[target.'cfg(tokio_unstable)'.dependencies]
Expand Down
13 changes: 12 additions & 1 deletion tokio/src/io/driver/mod.rs
Expand Up @@ -71,12 +71,16 @@ pub(super) struct Inner {
resources: Mutex<Option<Slab<ScheduledIo>>>,

/// Registers I/O resources.
#[cfg(not(target_arch = "wasm32"))]
registry: mio::Registry,
#[cfg(target_arch = "wasm32")]
registry: Arc<mio::Registry>,

/// Allocates `ScheduledIo` handles when creating new resources.
pub(super) io_dispatch: slab::Allocator<ScheduledIo>,

/// Used to wake up the reactor from a call to `turn`.
#[cfg(not(target_os = "wasi"))]
waker: mio::Waker,

metrics: IoDriverMetrics,
Expand Down Expand Up @@ -120,8 +124,13 @@ impl Driver {
/// creation.
pub(crate) fn new() -> io::Result<Driver> {
let poll = mio::Poll::new()?;
#[cfg(not(target_os = "wasi"))]
let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;

#[cfg(not(target_os = "wasi"))]
let registry = poll.registry().try_clone()?;
#[cfg(target_os = "wasi")]
let registry = poll.registry();

let slab = Slab::new();
let allocator = slab.allocator();
Expand All @@ -133,8 +142,9 @@ impl Driver {
resources: Some(slab),
inner: Arc::new(Inner {
resources: Mutex::new(None),
registry,
registry: registry,
io_dispatch: allocator,
#[cfg(not(target_os = "wasi"))]
waker,
metrics: IoDriverMetrics::default(),
}),
Expand Down Expand Up @@ -318,6 +328,7 @@ impl Handle {
/// blocked in `turn`, then the next call to `turn` will not block and
/// return immediately.
fn wakeup(&self) {
#[cfg(not(target_os = "wasi"))]
if let Some(inner) = self.inner() {
inner.waker.wake().expect("failed to wake I/O driver");
}
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/lib.rs
Expand Up @@ -409,7 +409,9 @@ mod future;
pub mod io;
pub mod net;

#[cfg(not(target_arch = "wasm32-wasi"))]
mod loom;
#[cfg(not(target_arch = "wasm32-wasi"))]
mod park;

cfg_process! {
Expand Down
24 changes: 20 additions & 4 deletions tokio/src/macros/cfg.rs
Expand Up @@ -61,6 +61,7 @@ macro_rules! cfg_fs {
($($item:item)*) => {
$(
#[cfg(feature = "fs")]
#[cfg(not(target_arch = "wasm32"))]
#[cfg_attr(docsrs, doc(cfg(feature = "fs")))]
$item
)*
Expand All @@ -86,6 +87,7 @@ macro_rules! cfg_io_driver {
feature = "process",
all(unix, feature = "signal"),
))))]
//#[cfg(not(target_arch = "wasm32"))]
$item
)*
}
Expand All @@ -99,6 +101,7 @@ macro_rules! cfg_io_driver_impl {
feature = "process",
all(unix, feature = "signal"),
))]
//#[cfg(not(target_arch = "wasm32"))]
$item
)*
}
Expand All @@ -112,6 +115,7 @@ macro_rules! cfg_not_io_driver {
feature = "process",
all(unix, feature = "signal"),
)))]
//#[cfg(not(target_arch = "wasm32"))]
$item
)*
}
Expand Down Expand Up @@ -154,13 +158,21 @@ macro_rules! cfg_not_io_util {

macro_rules! cfg_loom {
($($item:item)*) => {
$( #[cfg(loom)] $item )*
$(
#[cfg(loom)]
#[cfg(not(target_os = "wasi"))]
$item
)*
}
}

macro_rules! cfg_not_loom {
($($item:item)*) => {
$( #[cfg(not(loom))] $item )*
$(
#[cfg(not(loom))]
#[cfg(target_os = "wasi")]
$item
)*
}
}

Expand Down Expand Up @@ -247,6 +259,7 @@ macro_rules! cfg_process {
#[cfg(feature = "process")]
#[cfg_attr(docsrs, doc(cfg(feature = "process")))]
#[cfg(not(loom))]
#[cfg(not(target_arch = "wasm32"))]
$item
)*
}
Expand Down Expand Up @@ -275,6 +288,7 @@ macro_rules! cfg_signal {
#[cfg(feature = "signal")]
#[cfg_attr(docsrs, doc(cfg(feature = "signal")))]
#[cfg(not(loom))]
#[cfg(not(target_arch = "wasm32"))]
$item
)*
}
Expand Down Expand Up @@ -451,7 +465,8 @@ macro_rules! cfg_has_atomic_u64 {
target_arch = "arm",
target_arch = "mips",
target_arch = "powerpc",
target_arch = "riscv32"
target_arch = "riscv32",
target_arch = "wasm32"
)))]
$item
)*
Expand All @@ -465,7 +480,8 @@ macro_rules! cfg_not_has_atomic_u64 {
target_arch = "arm",
target_arch = "mips",
target_arch = "powerpc",
target_arch = "riscv32"
target_arch = "riscv32",
target_arch = "wasm32"
))]
$item
)*
Expand Down
19 changes: 11 additions & 8 deletions tokio/src/net/mod.rs
Expand Up @@ -33,20 +33,23 @@ cfg_net! {

pub mod tcp;
pub use tcp::listener::TcpListener;
pub use tcp::socket::TcpSocket;
pub use tcp::stream::TcpStream;

mod udp;
pub use udp::UdpSocket;
}

cfg_net_unix! {
pub mod unix;
pub use unix::datagram::socket::UnixDatagram;
pub use unix::listener::UnixListener;
pub use unix::stream::UnixStream;
pub mod unix;
pub use unix::datagram::socket::UnixDatagram;
pub use unix::listener::UnixListener;
pub use unix::stream::UnixStream;
pub use tcp::socket::TcpSocket;
mod udp;
pub use udp::UdpSocket;
}

cfg_net_windows! {
pub mod windows;
pub mod windows;
pub use tcp::socket::TcpSocket;
mod udp;
pub use udp::UdpSocket;
}
26 changes: 25 additions & 1 deletion tokio/src/net/tcp/listener.rs
Expand Up @@ -2,10 +2,12 @@ use crate::io::{Interest, PollEvented};
use crate::net::tcp::TcpStream;
use crate::net::{to_socket_addrs, ToSocketAddrs};

use std::convert::TryFrom;
use std::convert::{Infallible, TryFrom};
use std::fmt;
use std::io;
use std::io::Error;
use std::net::{self, SocketAddr};
//use std::ops::FromResidual;
use std::task::{Context, Poll};

cfg_net! {
Expand Down Expand Up @@ -94,6 +96,7 @@ impl TcpListener {
/// Ok(())
/// }
/// ```
#[cfg(not(target_os = "wasi"))]
pub async fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<TcpListener> {
let addrs = to_socket_addrs(addr).await?;

Expand All @@ -114,6 +117,7 @@ impl TcpListener {
}))
}

#[cfg(not(target_os = "wasi"))]
fn bind_addr(addr: SocketAddr) -> io::Result<TcpListener> {
let listener = mio::net::TcpListener::bind(addr)?;
TcpListener::new(listener)
Expand Down Expand Up @@ -249,6 +253,7 @@ impl TcpListener {
/// [`tokio::net::TcpListener`]: TcpListener
/// [`std::net::TcpListener`]: std::net::TcpListener
/// [`set_nonblocking`]: fn@std::net::TcpListener::set_nonblocking
#[cfg(not(target_os = "wasi"))]
pub fn into_std(self) -> io::Result<std::net::TcpListener> {
#[cfg(unix)]
{
Expand Down Expand Up @@ -297,6 +302,7 @@ impl TcpListener {
/// Ok(())
/// }
/// ```
#[cfg(not(target_os = "wasi"))]
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.io.local_addr()
}
Expand Down Expand Up @@ -372,6 +378,12 @@ impl fmt::Debug for TcpListener {
}
}

/*impl FromResidual<Result<Infallible, std::io::Error>> for TcpListener {
fn from_residual(residual: Result<Infallible, Error>) -> Self {
todo!()
}
}*/

#[cfg(unix)]
mod sys {
use super::TcpListener;
Expand All @@ -384,6 +396,18 @@ mod sys {
}
}

#[cfg(target_os = "wasi")]
mod sys {
use super::TcpListener;
use std::os::wasi::prelude::*;

impl AsRawFd for TcpListener {
fn as_raw_fd(&self) -> RawFd {
self.io.as_raw_fd()
}
}
}

#[cfg(windows)]
mod sys {
use super::TcpListener;
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/net/tcp/socket.rs
@@ -1,3 +1,5 @@
#![cfg(not(target_os = "wasi"))]

use crate::net::{TcpListener, TcpStream};

use std::fmt;
Expand Down
13 changes: 13 additions & 0 deletions tokio/src/net/tcp/stream.rs
Expand Up @@ -108,6 +108,7 @@ impl TcpStream {
///
/// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
#[cfg(not(target_os = "wasi"))]
pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
let addrs = to_socket_addrs(addr).await?;

Expand All @@ -129,6 +130,7 @@ impl TcpStream {
}

/// Establishes a connection to the specified `addr`.
#[cfg(not(target_os = "wasi"))]
async fn connect_addr(addr: SocketAddr) -> io::Result<TcpStream> {
let sys = mio::net::TcpStream::connect(addr)?;
TcpStream::connect_mio(sys).await
Expand Down Expand Up @@ -226,6 +228,7 @@ impl TcpStream {
/// [`tokio::net::TcpStream`]: TcpStream
/// [`std::net::TcpStream`]: std::net::TcpStream
/// [`set_nonblocking`]: fn@std::net::TcpStream::set_nonblocking
#[cfg(not(target_os = "wasi"))]
pub fn into_std(self) -> io::Result<std::net::TcpStream> {
#[cfg(unix)]
{
Expand Down Expand Up @@ -1091,9 +1094,14 @@ impl TcpStream {
/// # Ok(())
/// # }
/// ```
#[cfg(not(target_os = "wasi"))]
pub fn linger(&self) -> io::Result<Option<Duration>> {
socket2::SockRef::from(self).linger()
}
#[cfg(target_os = "wasi")]
pub fn linger(&self) -> io::Result<Option<Duration>> {
Err(io::ErrorKind::Unsupported.into())
}

/// Sets the linger duration of this socket by setting the SO_LINGER option.
///
Expand All @@ -1116,9 +1124,14 @@ impl TcpStream {
/// # Ok(())
/// # }
/// ```
#[cfg(not(target_os = "wasi"))]
pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
socket2::SockRef::from(self).set_linger(dur)
}
#[cfg(target_os = "wasi")]
pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
Ok(())
}

/// Gets the value of the `IP_TTL` option for this socket.
///
Expand Down

0 comments on commit 2d90ef5

Please sign in to comment.