Skip to content

Commit

Permalink
io: add lines example for StreamReader (#5145)
Browse files Browse the repository at this point in the history
  • Loading branch information
Darksonn committed Oct 31, 2022
1 parent c2210df commit a9d5eb2
Showing 1 changed file with 177 additions and 103 deletions.
280 changes: 177 additions & 103 deletions tokio-util/src/io/stream_reader.rs
@@ -1,113 +1,162 @@
use bytes::Buf;
use futures_core::stream::Stream;
use pin_project_lite::pin_project;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncBufRead, AsyncRead, ReadBuf};

pin_project! {
/// Convert a [`Stream`] of byte chunks into an [`AsyncRead`].
///
/// This type performs the inverse operation of [`ReaderStream`].
///
/// # Example
///
/// ```
/// use bytes::Bytes;
/// use tokio::io::{AsyncReadExt, Result};
/// use tokio_util::io::StreamReader;
/// # #[tokio::main]
/// # async fn main() -> std::io::Result<()> {
///
/// // Create a stream from an iterator.
/// let stream = tokio_stream::iter(vec![
/// Result::Ok(Bytes::from_static(&[0, 1, 2, 3])),
/// Result::Ok(Bytes::from_static(&[4, 5, 6, 7])),
/// Result::Ok(Bytes::from_static(&[8, 9, 10, 11])),
/// ]);
///
/// // Convert it to an AsyncRead.
/// let mut read = StreamReader::new(stream);
///
/// // Read five bytes from the stream.
/// let mut buf = [0; 5];
/// read.read_exact(&mut buf).await?;
/// assert_eq!(buf, [0, 1, 2, 3, 4]);
///
/// // Read the rest of the current chunk.
/// assert_eq!(read.read(&mut buf).await?, 3);
/// assert_eq!(&buf[..3], [5, 6, 7]);
///
/// // Read the next chunk.
/// assert_eq!(read.read(&mut buf).await?, 4);
/// assert_eq!(&buf[..4], [8, 9, 10, 11]);
///
/// // We have now reached the end.
/// assert_eq!(read.read(&mut buf).await?, 0);
///
/// # Ok(())
/// # }
/// ```
///
/// If the stream produces errors which are not [std::io::Error],
/// the errors can be converted using [`StreamExt`] to map each
/// element.
///
/// ```
/// use bytes::Bytes;
/// use tokio::io::AsyncReadExt;
/// use tokio_util::io::StreamReader;
/// use tokio_stream::StreamExt;
/// # #[tokio::main]
/// # async fn main() -> std::io::Result<()> {
///
/// // Create a stream from an iterator, including an error.
/// let stream = tokio_stream::iter(vec![
/// Result::Ok(Bytes::from_static(&[0, 1, 2, 3])),
/// Result::Ok(Bytes::from_static(&[4, 5, 6, 7])),
/// Result::Err("Something bad happened!")
/// ]);
///
/// // Use StreamExt to map the stream and error to a std::io::Error
/// let stream = stream.map(|result| result.map_err(|err| {
/// std::io::Error::new(std::io::ErrorKind::Other, err)
/// }));
///
/// // Convert it to an AsyncRead.
/// let mut read = StreamReader::new(stream);
///
/// // Read five bytes from the stream.
/// let mut buf = [0; 5];
/// read.read_exact(&mut buf).await?;
/// assert_eq!(buf, [0, 1, 2, 3, 4]);
///
/// // Read the rest of the current chunk.
/// assert_eq!(read.read(&mut buf).await?, 3);
/// assert_eq!(&buf[..3], [5, 6, 7]);
///
/// // Reading the next chunk will produce an error
/// let error = read.read(&mut buf).await.unwrap_err();
/// assert_eq!(error.kind(), std::io::ErrorKind::Other);
/// assert_eq!(error.into_inner().unwrap().to_string(), "Something bad happened!");
///
/// // We have now reached the end.
/// assert_eq!(read.read(&mut buf).await?, 0);
///
/// # Ok(())
/// # }
/// ```
///
/// [`AsyncRead`]: tokio::io::AsyncRead
/// [`Stream`]: futures_core::Stream
/// [`ReaderStream`]: crate::io::ReaderStream
/// [`StreamExt`]: tokio_stream::StreamExt
#[derive(Debug)]
pub struct StreamReader<S, B> {
#[pin]
inner: S,
chunk: Option<B>,
}
/// Convert a [`Stream`] of byte chunks into an [`AsyncRead`].
///
/// This type performs the inverse operation of [`ReaderStream`].
///
/// This type also implements the [`AsyncBufRead`] trait, so you can use it
/// to read a `Stream` of byte chunks line-by-line. See the examples below.
///
/// # Example
///
/// ```
/// use bytes::Bytes;
/// use tokio::io::{AsyncReadExt, Result};
/// use tokio_util::io::StreamReader;
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> std::io::Result<()> {
///
/// // Create a stream from an iterator.
/// let stream = tokio_stream::iter(vec![
/// Result::Ok(Bytes::from_static(&[0, 1, 2, 3])),
/// Result::Ok(Bytes::from_static(&[4, 5, 6, 7])),
/// Result::Ok(Bytes::from_static(&[8, 9, 10, 11])),
/// ]);
///
/// // Convert it to an AsyncRead.
/// let mut read = StreamReader::new(stream);
///
/// // Read five bytes from the stream.
/// let mut buf = [0; 5];
/// read.read_exact(&mut buf).await?;
/// assert_eq!(buf, [0, 1, 2, 3, 4]);
///
/// // Read the rest of the current chunk.
/// assert_eq!(read.read(&mut buf).await?, 3);
/// assert_eq!(&buf[..3], [5, 6, 7]);
///
/// // Read the next chunk.
/// assert_eq!(read.read(&mut buf).await?, 4);
/// assert_eq!(&buf[..4], [8, 9, 10, 11]);
///
/// // We have now reached the end.
/// assert_eq!(read.read(&mut buf).await?, 0);
///
/// # Ok(())
/// # }
/// ```
///
/// If the stream produces errors which are not [`std::io::Error`],
/// the errors can be converted using [`StreamExt`] to map each
/// element.
///
/// ```
/// use bytes::Bytes;
/// use tokio::io::AsyncReadExt;
/// use tokio_util::io::StreamReader;
/// use tokio_stream::StreamExt;
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> std::io::Result<()> {
///
/// // Create a stream from an iterator, including an error.
/// let stream = tokio_stream::iter(vec![
/// Result::Ok(Bytes::from_static(&[0, 1, 2, 3])),
/// Result::Ok(Bytes::from_static(&[4, 5, 6, 7])),
/// Result::Err("Something bad happened!")
/// ]);
///
/// // Use StreamExt to map the stream and error to a std::io::Error
/// let stream = stream.map(|result| result.map_err(|err| {
/// std::io::Error::new(std::io::ErrorKind::Other, err)
/// }));
///
/// // Convert it to an AsyncRead.
/// let mut read = StreamReader::new(stream);
///
/// // Read five bytes from the stream.
/// let mut buf = [0; 5];
/// read.read_exact(&mut buf).await?;
/// assert_eq!(buf, [0, 1, 2, 3, 4]);
///
/// // Read the rest of the current chunk.
/// assert_eq!(read.read(&mut buf).await?, 3);
/// assert_eq!(&buf[..3], [5, 6, 7]);
///
/// // Reading the next chunk will produce an error
/// let error = read.read(&mut buf).await.unwrap_err();
/// assert_eq!(error.kind(), std::io::ErrorKind::Other);
/// assert_eq!(error.into_inner().unwrap().to_string(), "Something bad happened!");
///
/// // We have now reached the end.
/// assert_eq!(read.read(&mut buf).await?, 0);
///
/// # Ok(())
/// # }
/// ```
///
/// Using the [`AsyncBufRead`] impl, you can read a `Stream` of byte chunks
/// line-by-line. Note that you will usually also need to convert the error
/// type when doing this. See the second example for an explanation of how
/// to do this.
///
/// ```
/// use tokio::io::{Result, AsyncBufReadExt};
/// use tokio_util::io::StreamReader;
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> std::io::Result<()> {
///
/// // Create a stream of byte chunks.
/// let stream = tokio_stream::iter(vec![
/// Result::Ok(b"The first line.\n".as_slice()),
/// Result::Ok(b"The second line.".as_slice()),
/// Result::Ok(b"\nThe third".as_slice()),
/// Result::Ok(b" line.\nThe fourth line.\nThe fifth line.\n".as_slice()),
/// ]);
///
/// // Convert it to an AsyncRead.
/// let mut read = StreamReader::new(stream);
///
/// // Loop through the lines from the `StreamReader`.
/// let mut line = String::new();
/// let mut lines = Vec::new();
/// loop {
/// line.clear();
/// let len = read.read_line(&mut line).await?;
/// if len == 0 { break; }
/// lines.push(line.clone());
/// }
///
/// // Verify that we got the lines we expected.
/// assert_eq!(
/// lines,
/// vec![
/// "The first line.\n",
/// "The second line.\n",
/// "The third line.\n",
/// "The fourth line.\n",
/// "The fifth line.\n",
/// ]
/// );
/// # Ok(())
/// # }
/// ```
///
/// [`AsyncRead`]: tokio::io::AsyncRead
/// [`AsyncBufRead`]: tokio::io::AsyncBufRead
/// [`Stream`]: futures_core::Stream
/// [`ReaderStream`]: crate::io::ReaderStream
/// [`StreamExt`]: https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html
#[derive(Debug)]
pub struct StreamReader<S, B> {
// This field is pinned.
inner: S,
// This field is not pinned.
chunk: Option<B>,
}

impl<S, B, E> StreamReader<S, B>
Expand Down Expand Up @@ -250,3 +299,28 @@ where
}
}
}

// The code below is a manual expansion of the code that pin-project-lite would
// generate. This is done because pin-project-lite fails by hitting the recusion
// limit on this struct. (Every line of documentation is handled recursively by
// the macro.)

impl<S: Unpin, B> Unpin for StreamReader<S, B> {}

struct StreamReaderProject<'a, S, B> {
inner: Pin<&'a mut S>,
chunk: &'a mut Option<B>,
}

impl<S, B> StreamReader<S, B> {
#[inline]
fn project(self: Pin<&mut Self>) -> StreamReaderProject<'_, S, B> {
// SAFETY: We define that only `inner` should be pinned when `Self` is
// and have an appropriate `impl Unpin` for this.
let me = unsafe { Pin::into_inner_unchecked(self) };
StreamReaderProject {
inner: unsafe { Pin::new_unchecked(&mut me.inner) },
chunk: &mut me.chunk,
}
}
}

0 comments on commit a9d5eb2

Please sign in to comment.