forked from tokio-rs/tokio
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
macros: join! start by polling a different future each time poll_fn i…
…s polled Fixes: tokio-rs#4612
- Loading branch information
1 parent
c43832a
commit 2628b37
Showing
6 changed files
with
419 additions
and
14 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
use proc_macro::TokenStream; | ||
use proc_macro::{Punct, TokenTree}; | ||
use quote::quote; | ||
use syn::parse::{Parse, ParseStream}; | ||
use syn::{Expr, Token}; | ||
|
||
pub(crate) fn count(input: TokenStream) -> TokenStream { | ||
let count: usize = input.into_iter().count(); | ||
|
||
TokenStream::from(quote!(#count)) | ||
} | ||
|
||
struct Join { | ||
fut_exprs: Vec<Expr>, | ||
} | ||
|
||
impl Parse for Join { | ||
fn parse(input: ParseStream<'_>) -> syn::Result<Self> { | ||
let mut exprs = Vec::new(); | ||
|
||
while !input.is_empty() { | ||
exprs.push(input.parse::<Expr>()?); | ||
|
||
if !input.is_empty() { | ||
input.parse::<Token![,]>()?; | ||
} | ||
} | ||
|
||
Ok(Join { fut_exprs: exprs }) | ||
} | ||
} | ||
|
||
pub(crate) fn test(input: TokenStream) -> TokenStream { | ||
let parsed = syn::parse_macro_input!(input as Join); | ||
|
||
let futures_count = parsed.fut_exprs.len(); | ||
|
||
// let futures = parsed.fut_exprs.into_iter().map(|fut_expr| { | ||
// quote! { | ||
// maybe_done(#fut_expr) | ||
// } | ||
// }); | ||
|
||
let futures = parsed.fut_exprs.into_iter().map(|fut_epxr| fut_epxr); | ||
|
||
let match_statement_branches = (0..futures_count).map(|i| { | ||
quote! { | ||
#i => { | ||
let fut = &mut futures.#i; | ||
|
||
// Safety: future is stored on the stack above | ||
// and never moved. | ||
let mut fut = unsafe { Pin::new_unchecked(fut) }; | ||
|
||
// Try polling | ||
if fut.poll(cx).is_pending() { | ||
is_pending = true; | ||
} | ||
|
||
continue; | ||
} | ||
} | ||
}); | ||
|
||
let ready_output = (0..futures_count).map(|i| { | ||
quote! { | ||
let fut = &mut futures.#i; | ||
|
||
// Safety: future is stored on the stack above | ||
// and never moved. | ||
let mut fut = unsafe { Pin::new_unchecked(fut) }; | ||
|
||
fut.take_output().expect("expected completed future") | ||
} | ||
}); | ||
|
||
TokenStream::from(quote! {{ | ||
use tokio::macros::support::{maybe_done, poll_fn, Future, Pin}; | ||
use tokio::macros::support::Poll::{Ready, Pending}; | ||
|
||
// Safety: nothing must be moved out of `futures`. This is to satisfy | ||
// the requirement of `Pin::new_unchecked` called below. | ||
// let mut futures = #futures; | ||
let mut futures = ( #( maybe_done(#futures) ),* ); | ||
|
||
// How many futures were passed to join!. | ||
const FUTURE_COUNT: u32 = #futures_count; | ||
|
||
// When poll_fn is polled, start polling the future at this index. | ||
let mut start_index = 0; | ||
|
||
poll_fn(move |cx| { | ||
let mut is_pending = false; | ||
|
||
for i in 0..FUTURE_COUNT { | ||
let turn; | ||
|
||
#[allow(clippy::modulo_one)] | ||
{ | ||
turn = (start_index + i) % FUTURE_COUNT | ||
}; | ||
|
||
match turn { | ||
#( #match_statement_branches )* | ||
_ => unreachable!("reaching this means there probably is an off by one bug") | ||
} | ||
} | ||
|
||
if is_pending { | ||
// Start by polling the next future first the next time poll_fn is polled | ||
#[allow(clippy::modulo_one)] | ||
{ | ||
start_index = (start_index + 1) % FUTURE_COUNT; | ||
} | ||
|
||
Pending | ||
} else { | ||
// Ready( #( #ready_output )* ) | ||
} | ||
}).await | ||
}}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.