Skip to content

Commit

Permalink
Add biased variants for newly fair select functions
Browse files Browse the repository at this point in the history
  • Loading branch information
Xaeroxe committed Aug 14, 2023
1 parent baeca1b commit e0f40c4
Show file tree
Hide file tree
Showing 7 changed files with 215 additions and 14 deletions.
6 changes: 3 additions & 3 deletions futures-util/src/future/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ mod join_all;
pub use self::join_all::{join_all, JoinAll};

mod select;
pub use self::select::{select, Select};
pub use self::select::{select, select_biased, Select};

#[cfg(feature = "alloc")]
mod select_all;
Expand All @@ -99,12 +99,12 @@ mod try_join_all;
pub use self::try_join_all::{try_join_all, TryJoinAll};

mod try_select;
pub use self::try_select::{try_select, TrySelect};
pub use self::try_select::{try_select, try_select_biased, TrySelect};

#[cfg(feature = "alloc")]
mod select_ok;
#[cfg(feature = "alloc")]
pub use self::select_ok::{select_ok, SelectOk};
pub use self::select_ok::{select_ok, select_ok_biased, SelectOk};

mod either;
pub use self::either::Either;
Expand Down
89 changes: 87 additions & 2 deletions futures-util/src/future/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use futures_core::task::{Context, Poll};
#[derive(Debug)]
pub struct Select<A, B> {
inner: Option<(A, B)>,
_biased: bool,
}

impl<A: Unpin, B: Unpin> Unpin for Select<A, B> {}
Expand All @@ -23,7 +24,8 @@ impl<A: Unpin, B: Unpin> Unpin for Select<A, B> {}
/// wrapped version of them.
///
/// If both futures are ready when this is polled, the winner will be pseudo-randomly
/// selected.
/// selected, unless the std feature is not enabled. If std is enabled, the first
/// argument will always win.
///
/// Also note that if both this and the second future have the same
/// output type you can use the `Either::factor_first` method to
Expand Down Expand Up @@ -91,6 +93,88 @@ where
{
assert_future::<Either<(A::Output, B), (B::Output, A)>, _>(Select {
inner: Some((future1, future2)),
_biased: false,
})
}

/// Waits for either one of two differently-typed futures to complete, giving preferential treatment to the first one.
///
/// This function will return a new future which awaits for either one of both
/// futures to complete. The returned future will finish with both the value
/// resolved and a future representing the completion of the other work.
///
/// Note that this function consumes the receiving futures and returns a
/// wrapped version of them.
///
/// If both futures are ready when this is polled, the winner will always be the first argument.
///
/// Also note that if both this and the second future have the same
/// output type you can use the `Either::factor_first` method to
/// conveniently extract out the value at the end.
///
/// # Examples
///
/// A simple example
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::{
/// pin_mut,
/// future::Either,
/// future::self,
/// };
///
/// // These two futures have different types even though their outputs have the same type.
/// let future1 = async {
/// future::pending::<()>().await; // will never finish
/// 1
/// };
/// let future2 = async {
/// future::ready(2).await
/// };
///
/// // 'select_biased' requires Future + Unpin bounds
/// pin_mut!(future1);
/// pin_mut!(future2);
///
/// let value = match future::select_biased(future1, future2).await {
/// Either::Left((value1, _)) => value1, // `value1` is resolved from `future1`
/// // `_` represents `future2`
/// Either::Right((value2, _)) => value2, // `value2` is resolved from `future2`
/// // `_` represents `future1`
/// };
///
/// assert!(value == 2);
/// # });
/// ```
///
/// A more complex example
///
/// ```
/// use futures::future::{self, Either, Future, FutureExt};
///
/// // A poor-man's join implemented on top of select
///
/// fn join<A, B>(a: A, b: B) -> impl Future<Output=(A::Output, B::Output)>
/// where A: Future + Unpin,
/// B: Future + Unpin,
/// {
/// future::select_biased(a, b).then(|either| {
/// match either {
/// Either::Left((x, b)) => b.map(move |y| (x, y)).left_future(),
/// Either::Right((y, a)) => a.map(move |x| (x, y)).right_future(),
/// }
/// })
/// }
/// ```
pub fn select_biased<A, B>(future1: A, future2: B) -> Select<A, B>
where
A: Future + Unpin,
B: Future + Unpin,
{
assert_future::<Either<(A::Output, B), (B::Output, A)>, _>(Select {
inner: Some((future1, future2)),
_biased: true,
})
}

Expand All @@ -111,6 +195,7 @@ where
Some(value) => value,
}
}
let _biased = self._biased;

let (a, b) = self.inner.as_mut().expect("cannot poll Select twice");

Expand All @@ -123,7 +208,7 @@ where
}

#[cfg(feature = "std")]
if crate::gen_index(2) == 0 {
if _biased || crate::gen_index(2) == 0 {
poll_wrap!(a, unwrap_option(self.inner.take()).1, Either::Left);
poll_wrap!(b, unwrap_option(self.inner.take()).0, Either::Right);
} else {
Expand Down
40 changes: 36 additions & 4 deletions futures-util/src/future/select_ok.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use futures_core::task::{Context, Poll};
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct SelectOk<Fut> {
inner: Vec<Fut>,
_biased: bool,
}

impl<Fut: Unpin> Unpin for SelectOk<Fut> {}
Expand All @@ -35,7 +36,7 @@ impl<Fut: Unpin> Unpin for SelectOk<Fut> {}
/// Some futures that would have been polled and had errors get dropped, may now instead
/// remain in the collection without being polled.
///
/// If you were relying on this biased behavior, consider switching to the [`select_biased!`](crate::select_biased) macro.
/// If you were relying on this biased behavior, consider switching to the [`select_ok_biased`] function.
///
/// # Panics
///
Expand All @@ -45,7 +46,36 @@ where
I: IntoIterator,
I::Item: TryFuture + Unpin,
{
let ret = SelectOk { inner: iter.into_iter().collect() };
let ret = SelectOk { inner: iter.into_iter().collect(), _biased: false };
assert!(!ret.inner.is_empty(), "iterator provided to select_ok was empty");
assert_future::<
Result<(<I::Item as TryFuture>::Ok, Vec<I::Item>), <I::Item as TryFuture>::Error>,
_,
>(ret)
}

/// Creates a new future which will select the first successful future over a list of futures.
///
/// The returned future will wait for any future within `iter` to be ready and Ok. Unlike
/// `select_all`, this will only return the first successful completion, or the last
/// failure. This is useful in contexts where any success is desired and failures
/// are ignored, unless all the futures fail.
///
/// This function is only available when the `std` or `alloc` feature of this
/// library is activated, and it is activated by default.
///
/// If multiple futures are ready at the same time this function is biased towards
/// entries that are earlier in the list.
///
/// # Panics
///
/// This function will panic if the iterator specified contains no items.
pub fn select_ok_biased<I>(iter: I) -> SelectOk<I::Item>
where
I: IntoIterator,
I::Item: TryFuture + Unpin,
{
let ret = SelectOk { inner: iter.into_iter().collect(), _biased: true };
assert!(!ret.inner.is_empty(), "iterator provided to select_ok was empty");
assert_future::<
Result<(<I::Item as TryFuture>::Ok, Vec<I::Item>), <I::Item as TryFuture>::Error>,
Expand All @@ -57,10 +87,12 @@ impl<Fut: TryFuture + Unpin> Future for SelectOk<Fut> {
type Output = Result<(Fut::Ok, Vec<Fut>), Fut::Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self { inner } = &mut *self;
let Self { inner, _biased } = &mut *self;
#[cfg(feature = "std")]
{
crate::shuffle(inner);
if !*_biased {
crate::shuffle(inner);
}
}
// loop until we've either exhausted all errors, a success was hit, or nothing is ready
loop {
Expand Down
55 changes: 53 additions & 2 deletions futures-util/src/future/try_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use futures_core::task::{Context, Poll};
#[derive(Debug)]
pub struct TrySelect<A, B> {
inner: Option<(A, B)>,
_biased: bool,
}

impl<A: Unpin, B: Unpin> Unpin for TrySelect<A, B> {}
Expand All @@ -25,7 +26,8 @@ type EitherErr<A, B> = Either<(<A as TryFuture>::Error, B), (<B as TryFuture>::E
/// wrapped version of them.
///
/// If both futures are ready when this is polled, the winner will be pseudo-randomly
/// selected.
/// selected, unless the `std` feature is disabled. If the std feature is disabled,
/// the first argument will always win.
///
/// Also note that if both this and the second future have the same
/// success/error type you can use the `Either::factor_first` method to
Expand Down Expand Up @@ -60,6 +62,55 @@ where
{
super::assert_future::<Result<EitherOk<A, B>, EitherErr<A, B>>, _>(TrySelect {
inner: Some((future1, future2)),
_biased: false,
})
}

/// Waits for either one of two differently-typed futures to complete, giving preferential treatment to the first one.
///
/// This function will return a new future which awaits for either one of both
/// futures to complete. The returned future will finish with both the value
/// resolved and a future representing the completion of the other work.
///
/// Note that this function consumes the receiving futures and returns a
/// wrapped version of them.
///
/// If both futures are ready when this is polled, the winner will always be the first one.
///
/// Also note that if both this and the second future have the same
/// success/error type you can use the `Either::factor_first` method to
/// conveniently extract out the value at the end.
///
/// # Examples
///
/// ```
/// use futures::future::{self, Either, Future, FutureExt, TryFuture, TryFutureExt};
///
/// // A poor-man's try_join implemented on top of select
///
/// fn try_join<A, B, E>(a: A, b: B) -> impl TryFuture<Ok=(A::Ok, B::Ok), Error=E>
/// where A: TryFuture<Error = E> + Unpin + 'static,
/// B: TryFuture<Error = E> + Unpin + 'static,
/// E: 'static,
/// {
/// future::try_select(a, b).then(|res| -> Box<dyn Future<Output = Result<_, _>> + Unpin> {
/// match res {
/// Ok(Either::Left((x, b))) => Box::new(b.map_ok(move |y| (x, y))),
/// Ok(Either::Right((y, a))) => Box::new(a.map_ok(move |x| (x, y))),
/// Err(Either::Left((e, _))) => Box::new(future::err(e)),
/// Err(Either::Right((e, _))) => Box::new(future::err(e)),
/// }
/// })
/// }
/// ```
pub fn try_select_biased<A, B>(future1: A, future2: B) -> TrySelect<A, B>
where
A: TryFuture + Unpin,
B: TryFuture + Unpin,
{
super::assert_future::<Result<EitherOk<A, B>, EitherErr<A, B>>, _>(TrySelect {
inner: Some((future1, future2)),
_biased: true,
})
}

Expand Down Expand Up @@ -91,7 +142,7 @@ where

#[cfg(feature = "std")]
{
if crate::gen_index(2) == 0 {
if self._biased || crate::gen_index(2) == 0 {
poll_wrap!(a, b, Either::Left, Either::Right)
} else {
poll_wrap!(b, a, Either::Right, Either::Left)
Expand Down
12 changes: 11 additions & 1 deletion futures/tests/future_select.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::future::ready;

use futures::future::select;
use futures::future::{select, select_biased};
use futures_executor::block_on;

#[test]
Expand All @@ -14,3 +14,13 @@ fn is_fair() {
assert_eq!(results.iter().filter(|i| **i == 0).take(THRESHOLD).count(), THRESHOLD);
assert_eq!(results.iter().filter(|i| **i == 1).take(THRESHOLD).count(), THRESHOLD)
}

#[test]
fn is_biased() {
let mut results = Vec::with_capacity(100);
for _ in 0..100 {
let (i, _) = block_on(select_biased(ready(0), ready(1))).factor_first();
results.push(i);
}
assert!(results.iter().all(|i| *i == 0));
}
14 changes: 13 additions & 1 deletion futures/tests/future_select_ok.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::fmt::Debug;
use std::time::Duration;

use futures::executor::block_on;
use futures::future::{err, ok, select_ok, Future};
use futures::future::{err, ok, select_ok, select_ok_biased, Future};
use futures_channel::oneshot;
use std::thread;

Expand Down Expand Up @@ -65,3 +65,15 @@ fn is_fair() {
assert_eq!(results.iter().filter(|i| **i == 3).take(THRESHOLD).count(), THRESHOLD);
assert_eq!(results.iter().filter(|i| **i == 4).take(THRESHOLD).count(), THRESHOLD);
}

#[test]
fn is_biased() {
let mut results = Vec::with_capacity(100);
for _ in 0..100 {
let v = vec![err(1), err(2), ok(3), ok(4)];

let (i, _v) = block_on(select_ok_biased(v)).ok().unwrap();
results.push(i);
}
assert!(results.iter().all(|i| *i == 3));
}
13 changes: 12 additions & 1 deletion futures/tests/future_try_select.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use futures::future::{ok, try_select};
use futures::future::{ok, try_select, try_select_biased};
use futures_executor::block_on;

#[test]
Expand All @@ -12,3 +12,14 @@ fn is_fair() {
assert_eq!(results.iter().filter(|i| **i == 0).take(THRESHOLD).count(), THRESHOLD);
assert_eq!(results.iter().filter(|i| **i == 1).take(THRESHOLD).count(), THRESHOLD)
}

#[test]
fn is_biased() {
let mut results = Vec::with_capacity(100);
for _ in 0..100 {
let (i, _) =
block_on(try_select_biased(ok::<_, ()>(0), ok::<_, ()>(1))).unwrap().factor_first();
results.push(i);
}
assert!(results.iter().all(|i| *i == 0));
}

0 comments on commit e0f40c4

Please sign in to comment.