Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: yoshuawuyts/futures-concurrency
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: v7.5.0
Choose a base ref
...
head repository: yoshuawuyts/futures-concurrency
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: v7.6.0
Choose a head ref

Commits on Mar 20, 2024

  1. Copy the full SHA
    2945c72 View commit details
  2. Copy the full SHA
    b7c785c View commit details
  3. Copy the full SHA
    8ae1cec View commit details
  4. Copy the full SHA
    f8d5a64 View commit details
  5. Copy the full SHA
    d2850ab View commit details
  6. Copy the full SHA
    1f42419 View commit details
  7. Copy the full SHA
    78e51a6 View commit details
  8. Copy the full SHA
    79903a6 View commit details
  9. Copy the full SHA
    4fc3a9a View commit details
  10. Update mod.rs

    yoshuawuyts committed Mar 20, 2024
    Copy the full SHA
    b15a413 View commit details
  11. notes on ordering

    yoshuawuyts committed Mar 20, 2024
    Copy the full SHA
    0531290 View commit details
  12. Copy the full SHA
    ded646f View commit details
  13. add tests

    yoshuawuyts committed Mar 20, 2024
    Copy the full SHA
    fa3d0f3 View commit details
  14. Copy the full SHA
    c47ba6d View commit details
  15. implement concurrent map

    yoshuawuyts committed Mar 20, 2024
    Copy the full SHA
    addef7c View commit details
  16. don't clone atomics

    yoshuawuyts committed Mar 20, 2024
    Copy the full SHA
    6599e1c View commit details
  17. Revert "don't clone atomics"

    This reverts commit 31ebdcc.
    yoshuawuyts committed Mar 20, 2024
    Copy the full SHA
    d398d8a View commit details
  18. Revert "implement concurrent map"

    This reverts commit dde5bef.
    yoshuawuyts committed Mar 20, 2024
    Copy the full SHA
    1a7dda4 View commit details
  19. don't clone atomics

    yoshuawuyts committed Mar 20, 2024
    Copy the full SHA
    07f82fc View commit details
  20. it works!

    yoshuawuyts committed Mar 20, 2024
    Copy the full SHA
    7c380e2 View commit details
  21. Copy the full SHA
    a54f367 View commit details
  22. implement most of foreach

    yoshuawuyts committed Mar 20, 2024
    Copy the full SHA
    1fba7cc View commit details
  23. Copy the full SHA
    d4457ef View commit details
  24. update PassThrough

    yoshuawuyts committed Mar 20, 2024
    Copy the full SHA
    faf5f26 View commit details
  25. Copy the full SHA
    c1535f3 View commit details
  26. update Drain

    yoshuawuyts committed Mar 20, 2024
    Copy the full SHA
    cc5b97c View commit details
  27. update for_each

    yoshuawuyts committed Mar 20, 2024
    Copy the full SHA
    4db355a View commit details
  28. concurrent drain

    yoshuawuyts committed Mar 20, 2024
    Copy the full SHA
    c958e5a View commit details
  29. for_each works!

    yoshuawuyts committed Mar 20, 2024
    Copy the full SHA
    6fab7af View commit details
  30. tweak examples

    yoshuawuyts committed Mar 20, 2024
    Copy the full SHA
    fbea630 View commit details
  31. Copy the full SHA
    294942a View commit details
  32. Copy the full SHA
    3deb039 View commit details
  33. boilerplate try_for_each

    yoshuawuyts committed Mar 20, 2024
    Copy the full SHA
    ac0ac0d View commit details
  34. correctly short-circuit

    yoshuawuyts committed Mar 20, 2024
    Copy the full SHA
    4e2d283 View commit details
  35. Copy the full SHA
    b85e222 View commit details
  36. Copy the full SHA
    e45af14 View commit details
  37. Copy the full SHA
    ab7276b View commit details
  38. Copy the full SHA
    f22665d View commit details
  39. Copy the full SHA
    433d67d View commit details
  40. Copy the full SHA
    d48d38a View commit details
  41. implement collect

    yoshuawuyts committed Mar 20, 2024
    Copy the full SHA
    52168e5 View commit details
  42. fix alloc builds

    yoshuawuyts committed Mar 20, 2024
    Copy the full SHA
    5e4b495 View commit details
  43. add private Try traits

    yoshuawuyts committed Mar 20, 2024
    Copy the full SHA
    b8835d9 View commit details
  44. Copy the full SHA
    a5a4022 View commit details
  45. fix core builds

    yoshuawuyts committed Mar 20, 2024
    Copy the full SHA
    6fca1e7 View commit details
  46. Copy the full SHA
    6918f71 View commit details
  47. remove drain

    yoshuawuyts committed Mar 20, 2024
    Copy the full SHA
    4667657 View commit details
  48. allow unused utils

    yoshuawuyts committed Mar 20, 2024
    Copy the full SHA
    ce89592 View commit details
  49. Delete drain.rs

    yoshuawuyts committed Mar 20, 2024
    Copy the full SHA
    cf43c56 View commit details
  50. rename finish to flush

    yoshuawuyts committed Mar 20, 2024
    Copy the full SHA
    49b91e9 View commit details
Showing with 2,390 additions and 133 deletions.
  1. +10 −7 Cargo.toml
  2. +62 −0 README.md
  3. +1 −1 examples/happy_eyeballs.rs
  4. +2 −0 src/collections/mod.rs
  5. +67 −0 src/collections/vec.rs
  6. +154 −0 src/concurrent_stream/enumerate.rs
  7. +206 −0 src/concurrent_stream/for_each.rs
  8. +88 −0 src/concurrent_stream/from_concurrent_stream.rs
  9. +100 −0 src/concurrent_stream/from_stream.rs
  10. +21 −0 src/concurrent_stream/into_concurrent_stream.rs
  11. +75 −0 src/concurrent_stream/limit.rs
  12. +182 −0 src/concurrent_stream/map.rs
  13. +238 −0 src/concurrent_stream/mod.rs
  14. +111 −0 src/concurrent_stream/take.rs
  15. +273 −0 src/concurrent_stream/try_for_each.rs
  16. +83 −8 src/future/future_group.rs
  17. +39 −0 src/future/futures_ext.rs
  18. +1 −0 src/future/join/array.rs
  19. +2 −1 src/future/join/tuple.rs
  20. +3 −2 src/future/join/vec.rs
  21. +2 −0 src/future/mod.rs
  22. +2 −0 src/future/race/vec.rs
  23. +2 −2 src/future/race_ok/tuple/mod.rs
  24. +2 −0 src/future/race_ok/vec/error.rs
  25. +3 −3 src/future/race_ok/vec/mod.rs
  26. +1 −0 src/future/try_join/array.rs
  27. +3 −2 src/future/try_join/tuple.rs
  28. +3 −0 src/future/try_join/vec.rs
  29. +61 −0 src/future/wait_until.rs
  30. +149 −57 src/lib.rs
  31. +2 −0 src/stream/chain/vec.rs
  32. +1 −0 src/stream/merge/array.rs
  33. +1 −0 src/stream/merge/tuple.rs
  34. +3 −0 src/stream/merge/vec.rs
  35. +2 −0 src/stream/mod.rs
  36. +54 −1 src/stream/stream_ext.rs
  37. +56 −13 src/stream/stream_group.rs
  38. +63 −0 src/stream/wait_until.rs
  39. +15 −14 src/stream/zip/array.rs
  40. +1 −0 src/stream/zip/tuple.rs
  41. +16 −14 src/stream/zip/vec.rs
  42. +2 −0 src/utils/futures/vec.rs
  43. +9 −0 src/utils/mod.rs
  44. +3 −2 src/utils/output/vec.rs
  45. +2 −1 src/utils/pin.rs
  46. +179 −0 src/utils/private.rs
  47. +28 −0 src/utils/stream.rs
  48. +2 −2 src/utils/wakers/array/no_std.rs
  49. +2 −2 src/utils/wakers/vec/no_std.rs
  50. +3 −1 src/utils/wakers/vec/waker_vec.rs
17 changes: 10 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "futures-concurrency"
version = "7.5.0"
version = "7.6.0"
license = "MIT OR Apache-2.0"
repository = "https://github.com/yoshuawuyts/futures-concurrency"
documentation = "https://docs.rs/futures-concurrency"
@@ -9,9 +9,7 @@ readme = "README.md"
edition = "2021"
keywords = []
categories = []
authors = [
"Yoshua Wuyts <yoshuawuyts@gmail.com>"
]
authors = ["Yoshua Wuyts <yoshuawuyts@gmail.com>"]

[profile.bench]
debug = true
@@ -33,17 +31,22 @@ std = ["alloc"]
alloc = ["bitvec/alloc", "dep:slab", "dep:smallvec"]

[dependencies]
bitvec = { version = "1.0.1", default-features = false }
bitvec = { version = "1.0.1", default-features = false, features = ["alloc"] }
futures-core = { version = "0.3", default-features = false }
futures-lite = "1.12.0"
pin-project = "1.0.8"
slab = { version = "0.4.8", optional = true }
smallvec = { version = "1.11.0", optional = true }

[dev-dependencies]
async-io = "2.3.2"
async-std = { version = "1.12.0", features = ["attributes"] }
criterion = { version = "0.3", features = ["async", "async_futures", "html_reports"] }
criterion = { version = "0.3", features = [
"async",
"async_futures",
"html_reports",
] }
futures = "0.3.25"
futures-lite = "1.12.0"
futures-time = "3.0.0"
lending-stream = "1.0.0"
rand = "0.8.5"
62 changes: 62 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -41,6 +41,68 @@
</h3>
</div>

Performant, portable, structured concurrency operations for async Rust. It
works with any runtime, does not erase lifetimes, always handles
cancellation, and always returns output to the caller.

`futures-concurrency` provides concurrency operations for both groups of futures
and streams. Both for bounded and unbounded sets of futures and streams. In both
cases performance should be on par with, if not exceed conventional executor
implementations.

## Examples

**Await multiple futures of different types**
```rust
use futures_concurrency::prelude::*;
use std::future;

let a = future::ready(1u8);
let b = future::ready("hello");
let c = future::ready(3u16);
assert_eq!((a, b, c).join().await, (1, "hello", 3));
```

**Concurrently process items in a stream**

```rust
use futures_concurrency::prelude::*;

let v: Vec<_> = vec!["chashu", "nori"]
.into_co_stream()
.map(|msg| async move { format!("hello {msg}") })
.collect()
.await;

assert_eq!(v, &["hello chashu", "hello nori"]);
```

**Access stack data outside the futures' scope**

_Adapted from [`std::thread::scope`](https://doc.rust-lang.org/std/thread/fn.scope.html)._

```rust
use futures_concurrency::prelude::*;

let mut container = vec![1, 2, 3];
let mut num = 0;

let a = async {
println!("hello from the first future");
dbg!(&container);
};

let b = async {
println!("hello from the second future");
num += container[0] + container[2];
};

println!("hello from the main future");
let _ = (a, b).join().await;
container.push(4);
assert_eq!(num, container.len());
```

## Installation
```sh
$ cargo add futures-concurrency
2 changes: 1 addition & 1 deletion examples/happy_eyeballs.rs
Original file line number Diff line number Diff line change
@@ -44,5 +44,5 @@ async fn open_tcp_socket(
}

// Start connecting. If an attempt succeeds, cancel all others attempts.
Ok(futures.race_ok().await?)
futures.race_ok().await
}
2 changes: 2 additions & 0 deletions src/collections/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#[cfg(feature = "alloc")]
pub mod vec;
67 changes: 67 additions & 0 deletions src/collections/vec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//! Parallel iterator types for [vectors][std::vec] (`Vec<T>`)
//!
//! You will rarely need to interact with this module directly unless you need
//! to name one of the iterator types.
//!
//! [std::vec]: https://doc.rust-lang.org/std/vec/index.html
use crate::concurrent_stream::{self, FromStream};
use crate::prelude::*;
use crate::utils::{from_iter, FromIter};
#[cfg(all(feature = "alloc", not(feature = "std")))]
use alloc::vec::Vec;
use core::future::Ready;

pub use crate::future::join::vec::Join;
pub use crate::future::race::vec::Race;
pub use crate::future::race_ok::vec::{AggregateError, RaceOk};
pub use crate::future::try_join::vec::TryJoin;
pub use crate::stream::chain::vec::Chain;
pub use crate::stream::merge::vec::Merge;
pub use crate::stream::zip::vec::Zip;

/// Concurrent async iterator that moves out of a vector.
#[derive(Debug)]
pub struct IntoConcurrentStream<T>(FromStream<FromIter<alloc::vec::IntoIter<T>>>);

impl<T> ConcurrentStream for IntoConcurrentStream<T> {
type Item = T;

type Future = Ready<T>;

async fn drive<C>(self, consumer: C) -> C::Output
where
C: concurrent_stream::Consumer<Self::Item, Self::Future>,
{
self.0.drive(consumer).await
}

fn concurrency_limit(&self) -> Option<core::num::NonZeroUsize> {
self.0.concurrency_limit()
}
}

impl<T> concurrent_stream::IntoConcurrentStream for Vec<T> {
type Item = T;

type IntoConcurrentStream = IntoConcurrentStream<T>;

fn into_co_stream(self) -> Self::IntoConcurrentStream {
let stream = from_iter(self);
let co_stream = stream.co();
IntoConcurrentStream(co_stream)
}
}

#[cfg(test)]
mod test {
use crate::prelude::*;

#[test]
fn collect() {
futures_lite::future::block_on(async {
let v: Vec<_> = vec![1, 2, 3, 4, 5].into_co_stream().collect().await;
assert_eq!(v, &[1, 2, 3, 4, 5]);
});
}
}
154 changes: 154 additions & 0 deletions src/concurrent_stream/enumerate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
use pin_project::pin_project;

use super::{ConcurrentStream, Consumer};
use core::future::Future;
use core::num::NonZeroUsize;
use core::pin::Pin;
use core::task::{ready, Context, Poll};

/// A concurrent iterator that yields the current count and the element during iteration.
///
/// This `struct` is created by the [`enumerate`] method on [`ConcurrentStream`]. See its
/// documentation for more.
///
/// [`enumerate`]: ConcurrentStream::enumerate
/// [`ConcurrentStream`]: trait.ConcurrentStream.html
#[derive(Debug)]
pub struct Enumerate<CS: ConcurrentStream> {
inner: CS,
}

impl<CS: ConcurrentStream> Enumerate<CS> {
pub(crate) fn new(inner: CS) -> Self {
Self { inner }
}
}

impl<CS: ConcurrentStream> ConcurrentStream for Enumerate<CS> {
type Item = (usize, CS::Item);
type Future = EnumerateFuture<CS::Future, CS::Item>;

async fn drive<C>(self, consumer: C) -> C::Output
where
C: Consumer<Self::Item, Self::Future>,
{
self.inner
.drive(EnumerateConsumer {
inner: consumer,
count: 0,
})
.await
}

fn concurrency_limit(&self) -> Option<NonZeroUsize> {
self.inner.concurrency_limit()
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}

#[pin_project]
struct EnumerateConsumer<C> {
#[pin]
inner: C,
count: usize,
}
impl<C, Item, Fut> Consumer<Item, Fut> for EnumerateConsumer<C>
where
Fut: Future<Output = Item>,
C: Consumer<(usize, Item), EnumerateFuture<Fut, Item>>,
{
type Output = C::Output;

async fn send(self: Pin<&mut Self>, future: Fut) -> super::ConsumerState {
let this = self.project();
let count = *this.count;
*this.count += 1;
this.inner.send(EnumerateFuture::new(future, count)).await
}

async fn progress(self: Pin<&mut Self>) -> super::ConsumerState {
let this = self.project();
this.inner.progress().await
}

async fn flush(self: Pin<&mut Self>) -> Self::Output {
let this = self.project();
this.inner.flush().await
}
}

/// Takes a future and maps it to another future via a closure
#[derive(Debug)]
#[pin_project::pin_project]
pub struct EnumerateFuture<FutT, T>
where
FutT: Future<Output = T>,
{
done: bool,
#[pin]
fut_t: FutT,
count: usize,
}

impl<FutT, T> EnumerateFuture<FutT, T>
where
FutT: Future<Output = T>,
{
fn new(fut_t: FutT, count: usize) -> Self {
Self {
done: false,
fut_t,
count,
}
}
}

impl<FutT, T> Future for EnumerateFuture<FutT, T>
where
FutT: Future<Output = T>,
{
type Output = (usize, T);

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
if *this.done {
panic!("future has already been polled to completion once");
}

let item = ready!(this.fut_t.poll(cx));
*this.done = true;
Poll::Ready((*this.count, item))
}
}

#[cfg(test)]
mod test {
// use crate::concurrent_stream::{ConcurrentStream, IntoConcurrentStream};
use crate::prelude::*;
use futures_lite::stream;
use futures_lite::StreamExt;
use std::num::NonZeroUsize;

#[test]
fn enumerate() {
futures_lite::future::block_on(async {
let mut n = 0;
stream::iter(std::iter::from_fn(|| {
let v = n;
n += 1;
Some(v)
}))
.take(5)
.co()
.limit(NonZeroUsize::new(1))
.enumerate()
.for_each(|(index, n)| async move {
assert_eq!(index, n);
})
.await;
});
}
}
Loading