Skip to content

Commit

Permalink
io: add tokio_util::io::{CopyToBytes, SinkWriter} (#5070)
Browse files Browse the repository at this point in the history
Co-authored-by: Alice Ryhl <aliceryhl@google.com>
Co-authored-by: Simon Farnsworth <simon@farnz.org.uk>
  • Loading branch information
3 people committed Oct 30, 2022
1 parent 29c6de0 commit 620880f
Show file tree
Hide file tree
Showing 4 changed files with 268 additions and 0 deletions.
68 changes: 68 additions & 0 deletions tokio-util/src/io/copy_to_bytes.rs
@@ -0,0 +1,68 @@
use bytes::Bytes;
use futures_sink::Sink;
use pin_project_lite::pin_project;
use std::pin::Pin;
use std::task::{Context, Poll};

pin_project! {
/// A helper that wraps a [`Sink`]`<`[`Bytes`]`>` and converts it into a
/// [`Sink`]`<&'a [u8]>` by copying each byte slice into an owned [`Bytes`].
///
/// See the documentation for [`SinkWriter`] for an example.
///
/// [`Bytes`]: bytes::Bytes
/// [`SinkWriter`]: crate::io::SinkWriter
/// [`Sink`]: futures_sink::Sink
#[derive(Debug)]
pub struct CopyToBytes<S> {
#[pin]
inner: S,
}
}

impl<S> CopyToBytes<S> {
/// Creates a new [`CopyToBytes`].
pub fn new(inner: S) -> Self {
Self { inner }
}

/// Gets a reference to the underlying sink.
pub fn get_ref(&self) -> &S {
&self.inner
}

/// Gets a mutable reference to the underlying sink.
pub fn get_mut(&mut self) -> &mut S {
&mut self.inner
}

/// Consumes this [`CopyToBytes`], returning the underlying sink.
pub fn into_inner(self) -> S {
self.inner
}
}

impl<'a, S> Sink<&'a [u8]> for CopyToBytes<S>
where
S: Sink<Bytes>,
{
type Error = S::Error;

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_ready(cx)
}

fn start_send(self: Pin<&mut Self>, item: &'a [u8]) -> Result<(), Self::Error> {
self.project()
.inner
.start_send(Bytes::copy_from_slice(item))
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_flush(cx)
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_close(cx)
}
}
4 changes: 4 additions & 0 deletions tokio-util/src/io/mod.rs
Expand Up @@ -10,18 +10,22 @@
//! [`Body`]: https://docs.rs/hyper/0.13/hyper/struct.Body.html
//! [`AsyncRead`]: tokio::io::AsyncRead

mod copy_to_bytes;
mod inspect;
mod read_buf;
mod reader_stream;
mod sink_writer;
mod stream_reader;

cfg_io_util! {
mod sync_bridge;
pub use self::sync_bridge::SyncIoBridge;
}

pub use self::copy_to_bytes::CopyToBytes;
pub use self::inspect::{InspectReader, InspectWriter};
pub use self::read_buf::read_buf;
pub use self::reader_stream::ReaderStream;
pub use self::sink_writer::SinkWriter;
pub use self::stream_reader::StreamReader;
pub use crate::util::{poll_read_buf, poll_write_buf};
124 changes: 124 additions & 0 deletions tokio-util/src/io/sink_writer.rs
@@ -0,0 +1,124 @@
use futures_sink::Sink;

use pin_project_lite::pin_project;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::AsyncWrite;

pin_project! {
/// Convert a [`Sink`] of byte chunks into an [`AsyncWrite`].
///
/// Whenever you write to this [`SinkWriter`], the supplied bytes are
/// forwarded to the inner [`Sink`]. When `shutdown` is called on this
/// [`SinkWriter`], the inner sink is closed.
///
/// This adapter takes a `Sink<&[u8]>` and provides an [`AsyncWrite`] impl
/// for it. Because of the lifetime, this trait is relatively rarely
/// implemented. The main ways to get a `Sink<&[u8]>` that you can use with
/// this type are:
///
/// * With the codec module by implementing the [`Encoder`]`<&[u8]>` trait.
/// * By wrapping a `Sink<Bytes>` in a [`CopyToBytes`].
/// * Manually implementing `Sink<&[u8]>` directly.
///
/// The opposite conversion of implementing `Sink<_>` for an [`AsyncWrite`]
/// is done using the [`codec`] module.
///
/// # Example
///
/// ```
/// use bytes::Bytes;
/// use futures_util::SinkExt;
/// use std::io::{Error, ErrorKind};
/// use tokio::io::AsyncWriteExt;
/// use tokio_util::io::{SinkWriter, CopyToBytes};
/// use tokio_util::sync::PollSender;
///
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> Result<(), Error> {
/// // We use an mpsc channel as an example of a `Sink<Bytes>`.
/// let (tx, mut rx) = tokio::sync::mpsc::channel::<Bytes>(1);
/// let sink = PollSender::new(tx).sink_map_err(|_| Error::from(ErrorKind::BrokenPipe));
///
/// // Wrap it in `CopyToBytes` to get a `Sink<&[u8]>`.
/// let mut writer = SinkWriter::new(CopyToBytes::new(sink));
///
/// // Write data to our interface...
/// let data: [u8; 4] = [1, 2, 3, 4];
/// let _ = writer.write(&data).await?;
///
/// // ... and receive it.
/// assert_eq!(data.as_slice(), &*rx.recv().await.unwrap());
/// # Ok(())
/// # }
/// ```
///
/// [`AsyncWrite`]: tokio::io::AsyncWrite
/// [`CopyToBytes`]: crate::io::CopyToBytes
/// [`Encoder`]: crate::codec::Encoder
/// [`Sink`]: futures_sink::Sink
/// [`codec`]: tokio_util::codec
#[derive(Debug)]
pub struct SinkWriter<S> {
#[pin]
inner: S,
}
}

impl<S> SinkWriter<S> {
/// Creates a new [`SinkWriter`].
pub fn new(sink: S) -> Self {
Self { inner: sink }
}

/// Gets a reference to the underlying sink.
pub fn get_ref(&self) -> &S {
&self.inner
}

/// Gets a mutable reference to the underlying sink.
pub fn get_mut(&mut self) -> &mut S {
&mut self.inner
}

/// Consumes this [`SinkWriter`], returning the underlying sink.
pub fn into_inner(self) -> S {
self.inner
}
}
impl<S, E> AsyncWrite for SinkWriter<S>
where
for<'a> S: Sink<&'a [u8], Error = E>,
E: Into<io::Error>,
{
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
let mut this = self.project();
match this.inner.as_mut().poll_ready(cx) {
Poll::Ready(Ok(())) => {
if let Err(e) = this.inner.as_mut().start_send(buf) {
Poll::Ready(Err(e.into()))
} else {
Poll::Ready(Ok(buf.len()))
}
}
Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
Poll::Pending => {
cx.waker().wake_by_ref();
Poll::Pending
}
}
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
self.project().inner.poll_flush(cx).map_err(Into::into)
}

fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
self.project().inner.poll_close(cx).map_err(Into::into)
}
}
72 changes: 72 additions & 0 deletions tokio-util/tests/io_sink_writer.rs
@@ -0,0 +1,72 @@
#![warn(rust_2018_idioms)]

use bytes::Bytes;
use futures_util::SinkExt;
use std::io::{self, Error, ErrorKind};
use tokio::io::AsyncWriteExt;
use tokio_util::codec::{Encoder, FramedWrite};
use tokio_util::io::{CopyToBytes, SinkWriter};
use tokio_util::sync::PollSender;

#[tokio::test]
async fn test_copied_sink_writer() -> Result<(), Error> {
// Construct a channel pair to send data across and wrap a pollable sink.
// Note that the sink must mimic a writable object, e.g. have `std::io::Error`
// as its error type.
// As `PollSender` requires an owned copy of the buffer, we wrap it additionally
// with a `CopyToBytes` helper.
let (tx, mut rx) = tokio::sync::mpsc::channel::<Bytes>(1);
let mut writer = SinkWriter::new(CopyToBytes::new(
PollSender::new(tx).sink_map_err(|_| io::Error::from(ErrorKind::BrokenPipe)),
));

// Write data to our interface...
let data: [u8; 4] = [1, 2, 3, 4];
let _ = writer.write(&data).await;

// ... and receive it.
assert_eq!(data.to_vec(), rx.recv().await.unwrap().to_vec());

Ok(())
}

/// A trivial encoder.
struct SliceEncoder;

impl SliceEncoder {
fn new() -> Self {
Self {}
}
}

impl<'a> Encoder<&'a [u8]> for SliceEncoder {
type Error = Error;

fn encode(&mut self, item: &'a [u8], dst: &mut bytes::BytesMut) -> Result<(), Self::Error> {
// This is where we'd write packet headers, lengths, etc. in a real encoder.
// For simplicity and demonstration purposes, we just pack a copy of
// the slice at the end of a buffer.
dst.extend_from_slice(item);
Ok(())
}
}

#[tokio::test]
async fn test_direct_sink_writer() -> Result<(), Error> {
// We define a framed writer which accepts byte slices
// and 'reverse' this construction immediately.
let framed_byte_lc = FramedWrite::new(Vec::new(), SliceEncoder::new());
let mut writer = SinkWriter::new(framed_byte_lc);

// Write multiple slices to the sink...
let _ = writer.write(&[1, 2, 3]).await;
let _ = writer.write(&[4, 5, 6]).await;

// ... and compare it with the buffer.
assert_eq!(
writer.into_inner().write_buffer().to_vec().as_slice(),
&[1, 2, 3, 4, 5, 6]
);

Ok(())
}

0 comments on commit 620880f

Please sign in to comment.