Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add wait for remote settings being set before sending requests #641

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,11 @@ where
pub fn is_extended_connect_protocol_enabled(&self) -> bool {
self.inner.is_extended_connect_protocol_enabled()
}

/// Returns negotiated max send streams
pub fn max_send_streams(&self) -> usize {
self.inner.max_send_streams()
}
}

impl<B> fmt::Debug for SendRequest<B>
Expand Down
26 changes: 26 additions & 0 deletions src/proto/streams/counts.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::*;

use std::task::{Context, Waker};
use std::usize;

#[derive(Debug)]
Expand All @@ -25,6 +26,11 @@ pub(super) struct Counts {

/// Current number of pending locally reset streams
num_reset_streams: usize,

/// If remote settings were applied
remote_settings_applied: bool,

remote_settings_applied_task: Option<Waker>,
}

impl Counts {
Expand All @@ -38,6 +44,8 @@ impl Counts {
num_recv_streams: 0,
max_reset_streams: config.local_reset_max,
num_reset_streams: 0,
remote_settings_applied: false,
remote_settings_applied_task: None,
}
}

Expand Down Expand Up @@ -108,6 +116,8 @@ impl Counts {
if let Some(val) = settings.max_concurrent_streams() {
self.max_send_streams = val as usize;
}
self.remote_settings_applied = true;
self.notify_remote_settings_applied()
}

/// Run a block of code that could potentially transition a stream's state.
Expand Down Expand Up @@ -173,6 +183,16 @@ impl Counts {
self.max_send_streams
}

/// Returns if remote settings were applied
pub(crate) fn remote_settings_applied(&self) -> bool {
self.remote_settings_applied
}

/// Sets waker task for remote settings being set
pub(crate) fn wait_remote_settings_applied(&mut self, cx: &Context) {
self.remote_settings_applied_task = Some(cx.waker().clone());
}

/// Returns the maximum number of streams that can be initiated by the
/// remote peer.
pub(crate) fn max_recv_streams(&self) -> usize {
Expand All @@ -197,6 +217,12 @@ impl Counts {
assert!(self.num_reset_streams > 0);
self.num_reset_streams -= 1;
}

fn notify_remote_settings_applied(&mut self) {
if let Some(task) = self.remote_settings_applied_task.take() {
task.wake();
}
}
}

impl Drop for Counts {
Expand Down
4 changes: 4 additions & 0 deletions src/proto/streams/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,10 @@ where

me.actions.ensure_no_conn_error()?;
me.actions.send.ensure_next_stream_id()?;
if !me.counts.remote_settings_applied() {
me.counts.wait_remote_settings_applied(cx);
return Poll::Pending;
}

if let Some(pending) = pending {
let mut stream = me.store.resolve(pending.key);
Expand Down