Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/tokio-rs/tokio into sem-t…
Browse files Browse the repository at this point in the history
…oken-bucket
  • Loading branch information
maminrayej committed Sep 8, 2023
2 parents 8edc422 + a6be73e commit 54657a3
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 5 deletions.
5 changes: 3 additions & 2 deletions README.md
Expand Up @@ -218,16 +218,17 @@ releases are:

* `1.20.x` - LTS release until September 2023. (MSRV 1.49)
* `1.25.x` - LTS release until March 2024. (MSRV 1.49)
* `1.32.x` - LTS release until September 2024 (MSRV 1.63)

Each LTS release will continue to receive backported fixes for at least a year.
If you wish to use a fixed minor release in your project, we recommend that you
use an LTS release.

To use a fixed minor version, you can specify the version with a tilde. For
example, to specify that you wish to use the newest `1.18.x` patch release, you
example, to specify that you wish to use the newest `1.25.x` patch release, you
can use the following dependency specification:
```text
tokio = { version = "~1.18", features = [...] }
tokio = { version = "~1.25", features = [...] }
```

### Previous LTS releases
Expand Down
2 changes: 2 additions & 0 deletions tokio-util/src/codec/lines_codec.rs
Expand Up @@ -6,6 +6,8 @@ use std::{cmp, fmt, io, str, usize};

/// A simple [`Decoder`] and [`Encoder`] implementation that splits up data into lines.
///
/// This uses the `\n` character as the line ending on all platforms.
///
/// [`Decoder`]: crate::codec::Decoder
/// [`Encoder`]: crate::codec::Encoder
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
Expand Down
5 changes: 3 additions & 2 deletions tokio/README.md
Expand Up @@ -218,16 +218,17 @@ releases are:

* `1.20.x` - LTS release until September 2023. (MSRV 1.49)
* `1.25.x` - LTS release until March 2024. (MSRV 1.49)
* `1.32.x` - LTS release until September 2024 (MSRV 1.63)

Each LTS release will continue to receive backported fixes for at least a year.
If you wish to use a fixed minor release in your project, we recommend that you
use an LTS release.

To use a fixed minor version, you can specify the version with a tilde. For
example, to specify that you wish to use the newest `1.18.x` patch release, you
example, to specify that you wish to use the newest `1.25.x` patch release, you
can use the following dependency specification:
```text
tokio = { version = "~1.18", features = [...] }
tokio = { version = "~1.25", features = [...] }
```

### Previous LTS releases
Expand Down
75 changes: 75 additions & 0 deletions tokio/src/io/util/mem.rs
Expand Up @@ -124,6 +124,18 @@ impl AsyncWrite for DuplexStream {
Pin::new(&mut *self.write.lock()).poll_write(cx, buf)
}

fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
bufs: &[std::io::IoSlice<'_>],
) -> Poll<Result<usize, std::io::Error>> {
Pin::new(&mut *self.write.lock()).poll_write_vectored(cx, bufs)
}

fn is_write_vectored(&self) -> bool {
true
}

#[allow(unused_mut)]
fn poll_flush(
mut self: Pin<&mut Self>,
Expand Down Expand Up @@ -224,6 +236,37 @@ impl Pipe {
}
Poll::Ready(Ok(len))
}

fn poll_write_vectored_internal(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
bufs: &[std::io::IoSlice<'_>],
) -> Poll<Result<usize, std::io::Error>> {
if self.is_closed {
return Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into()));
}
let avail = self.max_buf_size - self.buffer.len();
if avail == 0 {
self.write_waker = Some(cx.waker().clone());
return Poll::Pending;
}

let mut rem = avail;
for buf in bufs {
if rem == 0 {
break;
}

let len = buf.len().min(rem);
self.buffer.extend_from_slice(&buf[..len]);
rem -= len;
}

if let Some(waker) = self.read_waker.take() {
waker.wake();
}
Poll::Ready(Ok(avail - rem))
}
}

impl AsyncRead for Pipe {
Expand Down Expand Up @@ -285,6 +328,38 @@ impl AsyncWrite for Pipe {
}
}

cfg_coop! {
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
bufs: &[std::io::IoSlice<'_>],
) -> Poll<Result<usize, std::io::Error>> {
ready!(crate::trace::trace_leaf(cx));
let coop = ready!(crate::runtime::coop::poll_proceed(cx));

let ret = self.poll_write_vectored_internal(cx, bufs);
if ret.is_ready() {
coop.made_progress();
}
ret
}
}

cfg_not_coop! {
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
bufs: &[std::io::IoSlice<'_>],
) -> Poll<Result<usize, std::io::Error>> {
ready!(crate::trace::trace_leaf(cx));
self.poll_write_vectored_internal(cx, bufs)
}
}

fn is_write_vectored(&self) -> bool {
true
}

fn poll_flush(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<std::io::Result<()>> {
Poll::Ready(Ok(()))
}
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/process/mod.rs
Expand Up @@ -1166,6 +1166,10 @@ impl Child {
/// If the caller wishes to explicitly control when the child's stdin
/// handle is closed, they may `.take()` it before calling `.wait()`:
///
/// # Cancel safety
///
/// This function is cancel safe.
///
/// ```
/// # #[cfg(not(unix))]fn main(){}
/// # #[cfg(unix)]
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/metrics/runtime.rs
Expand Up @@ -474,7 +474,7 @@ impl RuntimeMetrics {
///
/// This metric only applies to the **multi-threaded** scheduler.
///
/// The worker steal count starts at zero when the runtime is created and
/// The worker overflow count starts at zero when the runtime is created and
/// increases by one each time the worker attempts to schedule a task
/// locally, but its local queue is full. When this happens, half of the
/// local queue is moved to the injection queue.
Expand Down
1 change: 1 addition & 0 deletions tokio/src/task/local.rs
Expand Up @@ -575,6 +575,7 @@ impl LocalSet {
run_until.await
}

#[track_caller]
pub(in crate::task) fn spawn_named<F>(
&self,
future: F,
Expand Down
47 changes: 47 additions & 0 deletions tokio/tests/duplex_stream.rs
@@ -0,0 +1,47 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]

use std::io::IoSlice;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

const HELLO: &[u8] = b"hello world...";

#[tokio::test]
async fn write_vectored() {
let (mut client, mut server) = tokio::io::duplex(64);

let ret = client
.write_vectored(&[IoSlice::new(HELLO), IoSlice::new(HELLO)])
.await
.unwrap();
assert_eq!(ret, HELLO.len() * 2);

client.flush().await.unwrap();
drop(client);

let mut buf = Vec::with_capacity(HELLO.len() * 2);
let bytes_read = server.read_to_end(&mut buf).await.unwrap();

assert_eq!(bytes_read, HELLO.len() * 2);
assert_eq!(buf, [HELLO, HELLO].concat());
}

#[tokio::test]
async fn write_vectored_and_shutdown() {
let (mut client, mut server) = tokio::io::duplex(64);

let ret = client
.write_vectored(&[IoSlice::new(HELLO), IoSlice::new(HELLO)])
.await
.unwrap();
assert_eq!(ret, HELLO.len() * 2);

client.shutdown().await.unwrap();
drop(client);

let mut buf = Vec::with_capacity(HELLO.len() * 2);
let bytes_read = server.read_to_end(&mut buf).await.unwrap();

assert_eq!(bytes_read, HELLO.len() * 2);
assert_eq!(buf, [HELLO, HELLO].concat());
}

0 comments on commit 54657a3

Please sign in to comment.