Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extension trait for Sink #229

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions Cargo.toml
Expand Up @@ -46,6 +46,9 @@ futures-01 = ["futures-01-crate"]
# The standard library's implementation of futures
futures = ["futures-core-crate", "pin-project"]

# Add extension trait for the SinkExt trait
sink = ["futures-crate"]

# No public user should make use of this feature
# https://github.com/rust-lang/cargo/issues/1596
"internal-dev-dependencies" = ["futures-crate"]
Expand All @@ -63,3 +66,6 @@ futures-01-crate = { package = "futures", version = "0.1", optional = true, defa
futures-crate = { package = "futures", version = "0.3.0", optional = true, default-features = false }
futures-core-crate = { package = "futures-core", version = "0.3.0", optional = true, default-features = false }
pin-project = { version = "0.4", optional = true, default-features = false }

[dev-dependencies]
futures-01-crate = { package = "futures", version = "0.1" }
41 changes: 38 additions & 3 deletions compatibility-tests/futures-0.1/src/lib.rs
@@ -1,12 +1,19 @@
#![cfg(test)]

mod api {
use futures::{future, stream, Future, Stream};
use futures::{future, stream, Future, Stream, Sink};
use snafu::Snafu;

#[derive(Debug, Snafu)]
pub enum Error {
InvalidUrl { url: String },
InfallibleFailed,
}

impl From<()> for Error {
fn from(_: ()) -> Self {
Error::InfallibleFailed
}
}

pub fn fetch_page(url: &str) -> impl Future<Item = String, Error = Error> {
Expand All @@ -16,18 +23,28 @@ mod api {
pub fn keep_fetching_page<'u>(url: &'u str) -> impl Stream<Item = String, Error = Error> + 'u {
stream::repeat::<_, ()>(()).then(move |_| fetch_page(url))
}

pub fn upload_str(url: &str, _: &str) -> Result<String, Error> {
InvalidUrl { url }.fail()
}

pub fn upload<'u>(url: &'u str) -> impl Sink<SinkItem = String, SinkError = Error> + 'u {
Vec::new().with(move |s: String| { upload_str(url, &s) })
}
}

use futures::{future, Future, Stream};
use futures::{future, stream, Future, Stream, Sink};
use snafu::{
futures01::{future::FutureExt as _, stream::StreamExt as _},
futures01::{future::FutureExt as _, stream::StreamExt as _, sink::SinkExt as _},
Snafu,
};

#[derive(Debug, Snafu)]
enum Error {
UnableToLoadAppleStock { source: api::Error },
UnableToLoadGoogleStock { source: api::Error, name: String },
UnableToUploadApple { source: api::Error },
UnableToUploadGoogle { source: api::Error, name: String },
}

// Can be used as a `Future` combinator
Expand Down Expand Up @@ -59,6 +76,21 @@ fn load_stock_data_series() -> impl Future<Item = String, Error = Error> {
})
}

// Can be used as a `SinkExt` combinator
fn upload_strings() -> impl Future<Item=(), Error=Error> {
let apple = api::upload("apple").context(UnableToUploadApple);
let google = api::upload("google").with_context(|| UnableToUploadGoogle {
name: String::from("sink"),
});

let together = apple.fanout(google);

stream::repeat("str".to_owned())
.take(10)
.forward(together)
.map(|_| ())
}

#[test]
fn implements_error() {
fn check<T: std::error::Error>() {}
Expand All @@ -69,4 +101,7 @@ fn implements_error() {

let c = load_stock_data_series().wait();
c.unwrap_err();

let d = upload_strings().wait();
d.unwrap_err();
}
2 changes: 1 addition & 1 deletion compatibility-tests/futures/Cargo.toml
Expand Up @@ -5,5 +5,5 @@ authors = ["Jake Goulding <jake.goulding@gmail.com>"]
edition = "2018"

[dependencies]
snafu = { path = "../..", features = ["futures"] }
snafu = { path = "../..", features = ["futures", "sink"] }
futures = "0.3.0"
48 changes: 43 additions & 5 deletions compatibility-tests/futures/src/lib.rs
@@ -1,36 +1,55 @@
#![cfg(test)]

mod api {
use futures::{stream, StreamExt, TryStream};
use futures::{sink, stream, Sink, SinkExt, StreamExt, TryStream};
use snafu::Snafu;
use std::convert::Infallible;

#[derive(Debug, Snafu)]
#[derive(Debug, Clone, Snafu)]
pub enum Error {
InvalidUrl { url: String },
}

impl From<Infallible> for Error {
fn from(e: Infallible) -> Self {
match e {}
}
}

pub async fn fetch_page(url: &str) -> Result<String, Error> {
InvalidUrl { url }.fail()
}

pub fn keep_fetching_page<'u>(url: &'u str) -> impl TryStream<Ok = String, Error = Error> + 'u {
stream::repeat(()).then(move |_| fetch_page(url))
}

pub async fn upload_str(url: &str, _: &str) -> Result<String, Error> {
InvalidUrl { url }.fail()
}

pub fn upload<'u>(url: &'u str) -> impl Sink<String, Error = Error> + 'u {
sink::drain().with(move |s: String| async move { upload_str(url, &s).await })
}
}

use futures::future::ok;
use futures::{
future,
stream::{StreamExt as _, TryStreamExt as _},
stream::{self, StreamExt as _, TryStreamExt as _},
SinkExt,
};
use snafu::{
futures::{TryFutureExt as _, TryStreamExt as _},
futures::{SnafuSinkExt as _, TryFutureExt as _, TryStreamExt as _},
ResultExt, Snafu,
};

#[derive(Debug, Snafu)]
#[derive(Debug, Clone, Snafu)]
enum Error {
UnableToLoadAppleStock { source: api::Error },
UnableToLoadGoogleStock { source: api::Error, name: String },
UnableToUploadApple { source: api::Error },
UnableToUploadGoogle { source: api::Error, name: String },
}

// Normal `Result` code works with `await`
Expand Down Expand Up @@ -97,6 +116,22 @@ async fn load_stock_data_series() -> Result<String, Error> {
.await
}

// Can be used as a `SinkExt` combinator
async fn upload_strings() -> Result<(), Error> {
let apple = api::upload("apple").context(UnableToUploadApple);
let google = api::upload("google").with_context(|| UnableToUploadGoogle {
name: String::from("sink"),
});

let together = apple.fanout(google);

stream::repeat(Ok("str".to_owned()))
.take(10)
.forward(together)
.await?;
Ok(())
}

#[test]
fn implements_error() {
fn check<T: std::error::Error>() {}
Expand All @@ -115,4 +150,7 @@ fn implements_error() {

let d = block_on(load_stock_data_series());
d.unwrap_err();

let d = block_on(upload_strings());
d.unwrap_err();
}
5 changes: 5 additions & 0 deletions src/futures/mod.rs
Expand Up @@ -7,9 +7,14 @@
//! [`TryStream`]: futures_core_crate::TryStream
//! [feature flag]: crate::guide::feature_flags

#[cfg(feature = "sink")]
pub mod sink;
pub mod try_future;
pub mod try_stream;

#[cfg(feature = "sink")]
#[doc(inline)]
pub use self::sink::SnafuSinkExt;
#[doc(inline)]
pub use self::try_future::TryFutureExt;
#[doc(inline)]
Expand Down