Skip to content

Latest commit

 

History

History
207 lines (163 loc) · 7.17 KB

6-2-select!.md

File metadata and controls

207 lines (163 loc) · 7.17 KB

futures::select宏同时运行多个futures,允许用户在任何future完成时做出响应。

use futures::{
    future::FutureExt, // for `.fuse()`
    pin_mut,
    select,
};

async fn task_one() { /* ... */ }
async fn task_two() { /* ... */ }

async fn race_tasks() {
    let t1 = task_one().fuse();
    let t2 = task_two().fuse();

    pin_mut!(t1, t2);

    select! {
        () = t1 => println!("task one completed first"),
        () = t2 => println!("task two completed first"),
    }
}

这个函数将会并发地允许t1t2。当t1或者t2完成了,相应的处理器(handler)将会调用println!,函数将会在剩余标记的任务还未完成的情况下结束。

select的基础语法是<pattern> = <expression> => <code>,,重复你想要selectfuture

default => ... 和 complete => ...

select也支持defaultcomplete分支。

没有future完成时,default将会被执行。selectdefault分支将会立马返回,因为没有其他futures准备好了。

complete分支可用于所有被selectfutures都已经完成。这通常处理循环一个select!

use futures::{future, select};

async fn count() {
    let mut a_fut = future::ready(4);
    let mut b_fut = future::ready(6);
    let mut total = 0;

    loop {
        select! {
            a = a_fut => total += a,
            b = b_fut => total += b,
            complete => break,
            default => unreachable!(), // never runs (futures are ready, then complete)
        };
    }
    assert_eq!(total, 10);
}

UpinFusedFuture交互

在上面第一个例子中你可能注意到我们在两个async fn返回的futures上调用了.fuse(),以及通过pin_mut固定了它们。这些都是必要的,因为在select中使用的future必须实现UnpinFusedFuture特征。

Unpin是必要的,因为select中使用的future不是获取的值,而是可变引用。因为不获取future的所有权,没完成的futures可以在调用select时再次使用。

类似地,FusedFuture也是必要的,因为select不能轮询一个已完成的future。实现了FusedFuturefuture会追踪它们是否完成。这让select在循环中使用变为可能,只轮询未完成的future。看看上面的例子,a_fut或者b_fut将在第二次循环的时候完成。因为通过future::ready返回的future实现了fusedFuture,它将会告诉select不要再次轮询了。

记住streams也有相应的FusedStream特征。实现了这个特征或者通过.fuse()包装的Streams将会通过它们的.next()或者.try_next()组合器生成FusedFuturefuture

use futures::{
    stream::{Stream, StreamExt, FusedStream},
    select,
};

async fn add_two_streams(
    mut s1: impl Stream<Item = u8> + FusedStream + Unpin,
    mut s2: impl Stream<Item = u8> + FusedStream + Unpin,
) -> u8 {
    let mut total = 0;

    loop {
        let item = select! {
            x = s1.next() => x,
            x = s2.next() => x,
            complete => break,
        };
        if let Some(next_num) = item {
            total += next_num;
        }
    }

    total
}

通过FuseFuturesUnordered在循环中select实现并发任务

一个有些难以发现,但是很方便的函数是Fuse::terminated(),它可以构造一个空的且终止的future,并且随后可以被一个需要运行的future填充。

当有一个任务需要在select的循环中运行,且此任务在循环的内部创建时非常方便。

记住.select_next_some()函数的作用。它可以与select配合,只运行从流中返回的值为Some(_)的分支,

忽略None

use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, Stream, StreamExt},
    pin_mut,
    select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) { /* ... */ }

async fn run_loop(
    mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
    starting_num: u8,
) {
    let run_on_new_num_fut = run_on_new_num(starting_num).fuse();
    let get_new_num_fut = Fuse::terminated();
    pin_mut!(run_on_new_num_fut, get_new_num_fut);
    loop {
        select! {
            () = interval_timer.select_next_some() => {
                // The timer has elapsed. Start a new `get_new_num_fut`
                // if one was not already running.
                if get_new_num_fut.is_terminated() {
                    get_new_num_fut.set(get_new_num().fuse());
                }
            },
            new_num = get_new_num_fut => {
                // A new number has arrived-- start a new `run_on_new_num_fut`,
                // dropping the old one.
                run_on_new_num_fut.set(run_on_new_num(new_num).fuse());
            },
            // Run the `run_on_new_num_fut`
            () = run_on_new_num_fut => {},
            // panic if everything completed, since the `interval_timer` should
            // keep yielding values indefinitely.
            complete => panic!("`interval_timer` completed unexpectedly"),
        }
    }
}

当有许多相同future的副本需要同时运行时,使用FuturesUnordered类型。下面的例子与上面的类似,但是会运行每个run_on_new_num_fut的副本直到完成,而不是在创建新的时中止它们。它也会打印run_on_new_num_fut的返回值。

use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, FuturesUnordered, Stream, StreamExt},
    pin_mut,
    select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) -> u8 { /* ... */ 5 }

// Runs `run_on_new_num` with the latest number
// retrieved from `get_new_num`.
//
// `get_new_num` is re-run every time a timer elapses,
// immediately cancelling the currently running
// `run_on_new_num` and replacing it with the newly
// returned value.
async fn run_loop(
    mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
    starting_num: u8,
) {
    let mut run_on_new_num_futs = FuturesUnordered::new();
    run_on_new_num_futs.push(run_on_new_num(starting_num));
    let get_new_num_fut = Fuse::terminated();
    pin_mut!(get_new_num_fut);
    loop {
        select! {
            () = interval_timer.select_next_some() => {
                // The timer has elapsed. Start a new `get_new_num_fut`
                // if one was not already running.
                if get_new_num_fut.is_terminated() {
                    get_new_num_fut.set(get_new_num().fuse());
                }
            },
            new_num = get_new_num_fut => {
                // A new number has arrived-- start a new `run_on_new_num_fut`.
                run_on_new_num_futs.push(run_on_new_num(new_num));
            },
            // Run the `run_on_new_num_futs` and check if any have completed
            res = run_on_new_num_futs.select_next_some() => {
                println!("run_on_new_num_fut returned {:?}", res);
            },
            // panic if everything completed, since the `interval_timer` should
            // keep yielding values indefinitely.
            complete => panic!("`interval_timer` completed unexpectedly"),
        }
    }
}