Skip to content

Commit

Permalink
Merge branch 'master' into pin-project-lite
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e committed Dec 8, 2020
2 parents 7d6ce00 + cb52c5e commit 7213cdc
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 47 deletions.
7 changes: 4 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -192,22 +192,23 @@ jobs:
- run: cargo bench --manifest-path futures-util/Cargo.toml --features=bilock,unstable

features:
name: cargo hack check --each-feature
name: cargo hack check --feature-powerset
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Install Rust
run: rustup update nightly && rustup default nightly
- run: cargo install cargo-hack
# Check each specified feature works properly
# * `--each-feature` - run for each feature which includes --no-default-features and default features of package
# * `--feature-powerset` - run for the feature powerset of the package
# * `--depth 2` - limit the max number of simultaneous feature flags of `--feature-powerset`
# * `--no-dev-deps` - build without dev-dependencies to avoid https://github.com/rust-lang/cargo/issues/4866
# * `--exclude futures-test` - futures-test cannot be compiled with no-default features
# * `--features unstable` - some features cannot be compiled without this feature
# * `--ignore-unknown-features` - some crates doesn't have 'unstable' feature
- run: |
cargo hack check \
--each-feature --no-dev-deps \
--feature-powerset --depth 2 --no-dev-deps \
--workspace --exclude futures-test \
--features unstable --ignore-unknown-features
Expand Down
10 changes: 5 additions & 5 deletions futures-util/src/future/future/remote_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,11 @@ impl<Fut: Future> Future for Remote<Fut> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let this = self.project();

if let Poll::Ready(_) = this.tx.as_mut().unwrap().poll_canceled(cx) {
if !this.keep_running.load(Ordering::SeqCst) {
// Cancelled, bail out
return Poll::Ready(())
}
if this.tx.as_mut().unwrap().poll_canceled(cx).is_ready()
&& !this.keep_running.load(Ordering::SeqCst)
{
// Cancelled, bail out
return Poll::Ready(());
}

let output = ready!(this.future.poll(cx));
Expand Down
88 changes: 52 additions & 36 deletions futures-util/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
//! This module is only available when the `sink` feature of this
//! library is activated, and it is activated by default.

use crate::future::Either;
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::stream::{Stream, TryStream};
use futures_core::task::{Context, Poll};
use crate::future::Either;

#[cfg(feature = "compat")]
use crate::compat::CompatSink;
Expand Down Expand Up @@ -41,6 +41,9 @@ pub use self::send::Send;
mod send_all;
pub use self::send_all::SendAll;

mod unfold;
pub use self::unfold::{unfold, Unfold};

mod with;
pub use self::with::With;

Expand Down Expand Up @@ -69,10 +72,11 @@ pub trait SinkExt<Item>: Sink<Item> {
/// Note that this function consumes the given sink, returning a wrapped
/// version, much like `Iterator::map`.
fn with<U, Fut, F, E>(self, f: F) -> With<Self, Item, U, Fut, F>
where F: FnMut(U) -> Fut,
Fut: Future<Output = Result<Item, E>>,
E: From<Self::Error>,
Self: Sized
where
F: FnMut(U) -> Fut,
Fut: Future<Output = Result<Item, E>>,
E: From<Self::Error>,
Self: Sized,
{
With::new(self, f)
}
Expand Down Expand Up @@ -110,9 +114,10 @@ pub trait SinkExt<Item>: Sink<Item> {
/// # });
/// ```
fn with_flat_map<U, St, F>(self, f: F) -> WithFlatMap<Self, Item, U, St, F>
where F: FnMut(U) -> St,
St: Stream<Item = Result<Item, Self::Error>>,
Self: Sized
where
F: FnMut(U) -> St,
St: Stream<Item = Result<Item, Self::Error>>,
Self: Sized,
{
WithFlatMap::new(self, f)
}
Expand All @@ -133,8 +138,9 @@ pub trait SinkExt<Item>: Sink<Item> {

/// Transforms the error returned by the sink.
fn sink_map_err<E, F>(self, f: F) -> SinkMapErr<Self, F>
where F: FnOnce(Self::Error) -> E,
Self: Sized,
where
F: FnOnce(Self::Error) -> E,
Self: Sized,
{
SinkMapErr::new(self, f)
}
Expand All @@ -143,13 +149,13 @@ pub trait SinkExt<Item>: Sink<Item> {
///
/// If wanting to map errors of a `Sink + Stream`, use `.sink_err_into().err_into()`.
fn sink_err_into<E>(self) -> err_into::SinkErrInto<Self, Item, E>
where Self: Sized,
Self::Error: Into<E>,
where
Self: Sized,
Self::Error: Into<E>,
{
SinkErrInto::new(self)
}


/// Adds a fixed-size buffer to the current sink.
///
/// The resulting sink will buffer up to `capacity` items when the
Expand All @@ -164,14 +170,16 @@ pub trait SinkExt<Item>: Sink<Item> {
/// library is activated, and it is activated by default.
#[cfg(feature = "alloc")]
fn buffer(self, capacity: usize) -> Buffer<Self, Item>
where Self: Sized,
where
Self: Sized,
{
Buffer::new(self, capacity)
}

/// Close the sink.
fn close(&mut self) -> Close<'_, Self, Item>
where Self: Unpin,
where
Self: Unpin,
{
Close::new(self)
}
Expand All @@ -181,9 +189,10 @@ pub trait SinkExt<Item>: Sink<Item> {
/// This adapter clones each incoming item and forwards it to both this as well as
/// the other sink at the same time.
fn fanout<Si>(self, other: Si) -> Fanout<Self, Si>
where Self: Sized,
Item: Clone,
Si: Sink<Item, Error=Self::Error>
where
Self: Sized,
Item: Clone,
Si: Sink<Item, Error = Self::Error>,
{
Fanout::new(self, other)
}
Expand All @@ -193,7 +202,8 @@ pub trait SinkExt<Item>: Sink<Item> {
/// This adapter is intended to be used when you want to stop sending to the sink
/// until all current requests are processed.
fn flush(&mut self) -> Flush<'_, Self, Item>
where Self: Unpin,
where
Self: Unpin,
{
Flush::new(self)
}
Expand All @@ -205,7 +215,8 @@ pub trait SinkExt<Item>: Sink<Item> {
/// to batch together items to send via `send_all`, rather than flushing
/// between each item.**
fn send(&mut self, item: Item) -> Send<'_, Self, Item>
where Self: Unpin,
where
Self: Unpin,
{
Send::new(self, item)
}
Expand All @@ -221,12 +232,10 @@ pub trait SinkExt<Item>: Sink<Item> {
/// Doing `sink.send_all(stream)` is roughly equivalent to
/// `stream.forward(sink)`. The returned future will exhaust all items from
/// `stream` and send them to `self`.
fn send_all<'a, St>(
&'a mut self,
stream: &'a mut St
) -> SendAll<'a, Self, St>
where St: TryStream<Ok = Item, Error = Self::Error> + Stream + Unpin + ?Sized,
Self: Unpin,
fn send_all<'a, St>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, St>
where
St: TryStream<Ok = Item, Error = Self::Error> + Stream + Unpin + ?Sized,
Self: Unpin,
{
SendAll::new(self, stream)
}
Expand All @@ -237,8 +246,9 @@ pub trait SinkExt<Item>: Sink<Item> {
/// This can be used in combination with the `right_sink` method to write `if`
/// statements that evaluate to different streams in different branches.
fn left_sink<Si2>(self) -> Either<Self, Si2>
where Si2: Sink<Item, Error = Self::Error>,
Self: Sized
where
Si2: Sink<Item, Error = Self::Error>,
Self: Sized,
{
Either::Left(self)
}
Expand All @@ -249,8 +259,9 @@ pub trait SinkExt<Item>: Sink<Item> {
/// This can be used in combination with the `left_sink` method to write `if`
/// statements that evaluate to different streams in different branches.
fn right_sink<Si1>(self) -> Either<Si1, Self>
where Si1: Sink<Item, Error = Self::Error>,
Self: Sized
where
Si1: Sink<Item, Error = Self::Error>,
Self: Sized,
{
Either::Right(self)
}
Expand All @@ -260,39 +271,44 @@ pub trait SinkExt<Item>: Sink<Item> {
#[cfg(feature = "compat")]
#[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
fn compat(self) -> CompatSink<Self, Item>
where Self: Sized + Unpin,
where
Self: Sized + Unpin,
{
CompatSink::new(self)
}

/// A convenience method for calling [`Sink::poll_ready`] on [`Unpin`]
/// sink types.
fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
where Self: Unpin
where
Self: Unpin,
{
Pin::new(self).poll_ready(cx)
}

/// A convenience method for calling [`Sink::start_send`] on [`Unpin`]
/// sink types.
fn start_send_unpin(&mut self, item: Item) -> Result<(), Self::Error>
where Self: Unpin
where
Self: Unpin,
{
Pin::new(self).start_send(item)
}

/// A convenience method for calling [`Sink::poll_flush`] on [`Unpin`]
/// sink types.
fn poll_flush_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
where Self: Unpin
where
Self: Unpin,
{
Pin::new(self).poll_flush(cx)
}

/// A convenience method for calling [`Sink::poll_close`] on [`Unpin`]
/// sink types.
fn poll_close_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
where Self: Unpin
where
Self: Unpin,
{
Pin::new(self).poll_close(cx)
}
Expand Down
83 changes: 83 additions & 0 deletions futures-util/src/sink/unfold.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use core::{future::Future, pin::Pin};
use futures_core::task::{Context, Poll};
use futures_sink::Sink;
use pin_project::pin_project;

/// Sink for the [`unfold`] function.
#[pin_project]
#[derive(Debug)]
#[must_use = "sinks do nothing unless polled"]
pub struct Unfold<T, F, R> {
state: Option<T>,
function: F,
#[pin]
future: Option<R>,
}

/// Create a sink from a function which processes one item at a time.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::sink::{self, SinkExt};
///
/// let unfold = sink::unfold(0, |mut sum, i: i32| {
/// async move {
/// sum += i;
/// eprintln!("{}", i);
/// Ok::<_, futures::never::Never>(sum)
/// }
/// });
/// futures::pin_mut!(unfold);
/// unfold.send(5).await?;
/// # Ok::<(), futures::never::Never>(()) }).unwrap();
/// ```
pub fn unfold<T, F, R>(init: T, function: F) -> Unfold<T, F, R> {
Unfold {
state: Some(init),
function,
future: None,
}
}

impl<T, F, R, Item, E> Sink<Item> for Unfold<T, F, R>
where
F: FnMut(T, Item) -> R,
R: Future<Output = Result<T, E>>,
{
type Error = E;

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.poll_flush(cx)
}

fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
let mut this = self.project();
debug_assert!(this.future.is_none());
let future = (this.function)(this.state.take().unwrap(), item);
this.future.set(Some(future));
Ok(())
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let mut this = self.project();
Poll::Ready(if let Some(future) = this.future.as_mut().as_pin_mut() {
let result = match ready!(future.poll(cx)) {
Ok(state) => {
*this.state = Some(state);
Ok(())
}
Err(err) => Err(err),
};
this.future.set(None);
result
} else {
Ok(())
})
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.poll_flush(cx)
}
}
9 changes: 6 additions & 3 deletions futures/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,8 @@ pub mod future {
NeverError,

TryFutureExt,
AndThen, ErrInto, FlattenSink, IntoFuture, MapErr, MapOk, OrElse,
InspectOk, InspectErr, TryFlattenStream, UnwrapOrElse,
AndThen, ErrInto, FlattenSink, IntoFuture, MapErr, MapOk, MapOkOrElse, MapInto,
OrElse, OkInto, InspectOk, InspectErr, TryFlatten, TryFlattenStream, UnwrapOrElse,
};

#[cfg(feature = "alloc")]
Expand Down Expand Up @@ -348,6 +348,9 @@ pub mod io {
Repeat, ReuniteError, Seek, sink, Sink, Take, Window, Write, WriteAll, WriteHalf,
WriteVectored,
};

#[cfg(feature = "write-all-vectored")]
pub use futures_util::io::WriteAllVectored;
}

#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
Expand Down Expand Up @@ -417,7 +420,7 @@ pub mod sink {

pub use futures_util::sink::{
Close, Flush, Send, SendAll, SinkErrInto, SinkMapErr, With,
SinkExt, Fanout, Drain, drain,
SinkExt, Fanout, Drain, drain, Unfold, unfold,
WithFlatMap,
};

Expand Down

0 comments on commit 7213cdc

Please sign in to comment.