Skip to content

Commit

Permalink
futures-util: Migrate from pin-project to pin-project-lite
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e committed Dec 9, 2020
1 parent 1439cfe commit da71932
Show file tree
Hide file tree
Showing 81 changed files with 1,092 additions and 977 deletions.
2 changes: 1 addition & 1 deletion futures-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ memchr = { version = "2.2", optional = true }
futures_01 = { version = "0.1.25", optional = true, package = "futures" }
tokio-io = { version = "0.1.9", optional = true }
pin-utils = "0.1.0"
pin-project = "1.0.1"
pin-project-lite = "0.2"

[dev-dependencies]
futures = { path = "../futures", version = "0.3.8", features = ["async-await", "thread-pool"] }
Expand Down
21 changes: 11 additions & 10 deletions futures-util/src/future/abortable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@ use core::fmt;
use core::pin::Pin;
use core::sync::atomic::{AtomicBool, Ordering};
use alloc::sync::Arc;
use pin_project::pin_project;

/// A future which can be remotely short-circuited using an `AbortHandle`.
#[pin_project]
#[derive(Debug, Clone)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Abortable<Fut> {
#[pin]
future: Fut,
inner: Arc<AbortInner>,
use pin_project_lite::pin_project;

pin_project! {
/// A future which can be remotely short-circuited using an `AbortHandle`.
#[derive(Debug, Clone)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Abortable<Fut> {
#[pin]
future: Fut,
inner: Arc<AbortInner>,
}
}

impl<Fut> Abortable<Fut> where Fut: Future {
Expand Down
77 changes: 43 additions & 34 deletions futures-util/src/future/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,26 @@ use futures_core::future::{FusedFuture, Future};
use futures_core::stream::{FusedStream, Stream};
#[cfg(feature = "sink")]
use futures_sink::Sink;
use pin_project::pin_project;

/// Combines two different futures, streams, or sinks having the same associated types into a single
/// type.
#[pin_project(project = EitherProj)]
#[derive(Debug, Clone)]
pub enum Either<A, B> {
/// First branch of the type
Left(#[pin] A),
Left(A),
/// Second branch of the type
Right(#[pin] B),
Right(B),
}

impl<A, B> Either<A, B> {
fn project(self: Pin<&mut Self>) -> Either<Pin<&mut A>, Pin<&mut B>> {
unsafe {
match self.get_unchecked_mut() {
Either::Left(a) => Either::Left(Pin::new_unchecked(a)),
Either::Right(b) => Either::Right(Pin::new_unchecked(b)),
}
}
}
}

impl<A, B, T> Either<(T, A), (T, B)> {
Expand Down Expand Up @@ -60,8 +69,8 @@ where

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.project() {
EitherProj::Left(x) => x.poll(cx),
EitherProj::Right(x) => x.poll(cx),
Either::Left(x) => x.poll(cx),
Either::Right(x) => x.poll(cx),
}
}
}
Expand All @@ -88,8 +97,8 @@ where

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.project() {
EitherProj::Left(x) => x.poll_next(cx),
EitherProj::Right(x) => x.poll_next(cx),
Either::Left(x) => x.poll_next(cx),
Either::Right(x) => x.poll_next(cx),
}
}
}
Expand Down Expand Up @@ -117,29 +126,29 @@ where

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.project() {
EitherProj::Left(x) => x.poll_ready(cx),
EitherProj::Right(x) => x.poll_ready(cx),
Either::Left(x) => x.poll_ready(cx),
Either::Right(x) => x.poll_ready(cx),
}
}

fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
match self.project() {
EitherProj::Left(x) => x.start_send(item),
EitherProj::Right(x) => x.start_send(item),
Either::Left(x) => x.start_send(item),
Either::Right(x) => x.start_send(item),
}
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.project() {
EitherProj::Left(x) => x.poll_flush(cx),
EitherProj::Right(x) => x.poll_flush(cx),
Either::Left(x) => x.poll_flush(cx),
Either::Right(x) => x.poll_flush(cx),
}
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.project() {
EitherProj::Left(x) => x.poll_close(cx),
EitherProj::Right(x) => x.poll_close(cx),
Either::Left(x) => x.poll_close(cx),
Either::Right(x) => x.poll_close(cx),
}
}
}
Expand Down Expand Up @@ -176,8 +185,8 @@ mod if_std {
buf: &mut [u8],
) -> Poll<Result<usize>> {
match self.project() {
EitherProj::Left(x) => x.poll_read(cx, buf),
EitherProj::Right(x) => x.poll_read(cx, buf),
Either::Left(x) => x.poll_read(cx, buf),
Either::Right(x) => x.poll_read(cx, buf),
}
}

Expand All @@ -187,8 +196,8 @@ mod if_std {
bufs: &mut [IoSliceMut<'_>],
) -> Poll<Result<usize>> {
match self.project() {
EitherProj::Left(x) => x.poll_read_vectored(cx, bufs),
EitherProj::Right(x) => x.poll_read_vectored(cx, bufs),
Either::Left(x) => x.poll_read_vectored(cx, bufs),
Either::Right(x) => x.poll_read_vectored(cx, bufs),
}
}
}
Expand All @@ -204,8 +213,8 @@ mod if_std {
buf: &[u8],
) -> Poll<Result<usize>> {
match self.project() {
EitherProj::Left(x) => x.poll_write(cx, buf),
EitherProj::Right(x) => x.poll_write(cx, buf),
Either::Left(x) => x.poll_write(cx, buf),
Either::Right(x) => x.poll_write(cx, buf),
}
}

Expand All @@ -215,22 +224,22 @@ mod if_std {
bufs: &[IoSlice<'_>],
) -> Poll<Result<usize>> {
match self.project() {
EitherProj::Left(x) => x.poll_write_vectored(cx, bufs),
EitherProj::Right(x) => x.poll_write_vectored(cx, bufs),
Either::Left(x) => x.poll_write_vectored(cx, bufs),
Either::Right(x) => x.poll_write_vectored(cx, bufs),
}
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
match self.project() {
EitherProj::Left(x) => x.poll_flush(cx),
EitherProj::Right(x) => x.poll_flush(cx),
Either::Left(x) => x.poll_flush(cx),
Either::Right(x) => x.poll_flush(cx),
}
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
match self.project() {
EitherProj::Left(x) => x.poll_close(cx),
EitherProj::Right(x) => x.poll_close(cx),
Either::Left(x) => x.poll_close(cx),
Either::Right(x) => x.poll_close(cx),
}
}
}
Expand All @@ -246,8 +255,8 @@ mod if_std {
pos: SeekFrom,
) -> Poll<Result<u64>> {
match self.project() {
EitherProj::Left(x) => x.poll_seek(cx, pos),
EitherProj::Right(x) => x.poll_seek(cx, pos),
Either::Left(x) => x.poll_seek(cx, pos),
Either::Right(x) => x.poll_seek(cx, pos),
}
}
}
Expand All @@ -259,15 +268,15 @@ mod if_std {
{
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
match self.project() {
EitherProj::Left(x) => x.poll_fill_buf(cx),
EitherProj::Right(x) => x.poll_fill_buf(cx),
Either::Left(x) => x.poll_fill_buf(cx),
Either::Right(x) => x.poll_fill_buf(cx),
}
}

fn consume(self: Pin<&mut Self>, amt: usize) {
match self.project() {
EitherProj::Left(x) => x.consume(amt),
EitherProj::Right(x) => x.consume(amt),
Either::Left(x) => x.consume(amt),
Either::Right(x) => x.consume(amt),
}
}
}
Expand Down
20 changes: 12 additions & 8 deletions futures-util/src/future/future/catch_unwind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,21 @@ use std::panic::{catch_unwind, UnwindSafe, AssertUnwindSafe};

use futures_core::future::Future;
use futures_core::task::{Context, Poll};
use pin_project::pin_project;
use pin_project_lite::pin_project;

/// Future for the [`catch_unwind`](super::FutureExt::catch_unwind) method.
#[pin_project]
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct CatchUnwind<Fut>(#[pin] Fut);
pin_project! {
/// Future for the [`catch_unwind`](super::FutureExt::catch_unwind) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct CatchUnwind<Fut> {
#[pin]
future: Fut,
}
}

impl<Fut> CatchUnwind<Fut> where Fut: Future + UnwindSafe {
pub(super) fn new(future: Fut) -> Self {
Self(future)
Self { future }
}
}

Expand All @@ -24,7 +28,7 @@ impl<Fut> Future for CatchUnwind<Fut>
type Output = Result<Fut::Output, Box<dyn Any + Send>>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let f = self.project().0;
let f = self.project().future;
catch_unwind(AssertUnwindSafe(|| f.poll(cx)))?.map(Ok)
}
}
46 changes: 24 additions & 22 deletions futures-util/src/future/future/flatten.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,21 @@ use futures_core::stream::{FusedStream, Stream};
#[cfg(feature = "sink")]
use futures_sink::Sink;
use futures_core::task::{Context, Poll};
use pin_project::pin_project;
use pin_project_lite::pin_project;

#[pin_project(project = FlattenProj)]
#[derive(Debug)]
pub enum Flatten<Fut1, Fut2> {
First(#[pin] Fut1),
Second(#[pin] Fut2),
Empty,
pin_project! {
#[project = FlattenProj]
#[derive(Debug)]
pub enum Flatten<Fut1, Fut2> {
First { #[pin] f: Fut1 },
Second { #[pin] f: Fut2 },
Empty,
}
}

impl<Fut1, Fut2> Flatten<Fut1, Fut2> {
pub(crate) fn new(future: Fut1) -> Self {
Self::First(future)
Self::First { f: future }
}
}

Expand All @@ -42,11 +44,11 @@ impl<Fut> Future for Flatten<Fut, Fut::Output>
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(loop {
match self.as_mut().project() {
FlattenProj::First(f) => {
FlattenProj::First { f } => {
let f = ready!(f.poll(cx));
self.set(Self::Second(f));
self.set(Self::Second { f });
},
FlattenProj::Second(f) => {
FlattenProj::Second { f } => {
let output = ready!(f.poll(cx));
self.set(Self::Empty);
break output;
Expand Down Expand Up @@ -78,11 +80,11 @@ impl<Fut> Stream for Flatten<Fut, Fut::Output>
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(loop {
match self.as_mut().project() {
FlattenProj::First(f) => {
FlattenProj::First { f } => {
let f = ready!(f.poll(cx));
self.set(Self::Second(f));
self.set(Self::Second { f });
},
FlattenProj::Second(f) => {
FlattenProj::Second { f } => {
let output = ready!(f.poll_next(cx));
if output.is_none() {
self.set(Self::Empty);
Expand Down Expand Up @@ -110,11 +112,11 @@ where
) -> Poll<Result<(), Self::Error>> {
Poll::Ready(loop {
match self.as_mut().project() {
FlattenProj::First(f) => {
FlattenProj::First { f } => {
let f = ready!(f.poll(cx));
self.set(Self::Second(f));
self.set(Self::Second { f });
},
FlattenProj::Second(f) => {
FlattenProj::Second { f } => {
break ready!(f.poll_ready(cx));
},
FlattenProj::Empty => panic!("poll_ready called after eof"),
Expand All @@ -124,16 +126,16 @@ where

fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
match self.project() {
FlattenProj::First(_) => panic!("poll_ready not called first"),
FlattenProj::Second(f) => f.start_send(item),
FlattenProj::First { .. } => panic!("poll_ready not called first"),
FlattenProj::Second { f } => f.start_send(item),
FlattenProj::Empty => panic!("start_send called after eof"),
}
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.project() {
FlattenProj::First(_) => Poll::Ready(Ok(())),
FlattenProj::Second(f) => f.poll_flush(cx),
FlattenProj::First { .. } => Poll::Ready(Ok(())),
FlattenProj::Second { f } => f.poll_flush(cx),
FlattenProj::Empty => panic!("poll_flush called after eof"),
}
}
Expand All @@ -143,7 +145,7 @@ where
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
let res = match self.as_mut().project() {
FlattenProj::Second(f) => f.poll_close(cx),
FlattenProj::Second { f } => f.poll_close(cx),
_ => Poll::Ready(Ok(())),
};
if res.is_ready() {
Expand Down

0 comments on commit da71932

Please sign in to comment.