Skip to content

Commit 253cc74

Browse files
seanmonstarkxt
andauthoredMar 6, 2023
feat(client): add 1.0 compatible client conn API (#3155)
This patch backports client/conn/http1 and http2 modules from 1.0 to ease transition. It allows code still using 0.14.x to switch to the per-version Connection types available in 1.0. Closes #3053 Co-authored-by: KOVACS Tamas <ktamas@fastmail.fm>
1 parent c849339 commit 253cc74

File tree

7 files changed

+1110
-3
lines changed

7 files changed

+1110
-3
lines changed
 

‎.github/workflows/CI.yml

+3-3
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,11 @@ jobs:
6262

6363
include:
6464
- rust: stable
65-
features: "--features full"
65+
features: "--features full,backports"
6666
- rust: beta
67-
features: "--features full"
67+
features: "--features full,backports"
6868
- rust: nightly
69-
features: "--features full,nightly"
69+
features: "--features full,nightly,backports"
7070
benches: true
7171

7272
runs-on: ${{ matrix.os }}

‎Cargo.toml

+3
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,9 @@ tcp = [
109109
# C-API support (currently unstable (no semver))
110110
ffi = ["libc"]
111111

112+
# enable 1.0 backports
113+
backports = []
114+
112115
# internal features used in CI
113116
nightly = []
114117
__internal_happy_eyeballs_tests = []

‎src/client/conn.rs

+5
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,11 @@
5454
//! # }
5555
//! ```
5656
57+
#[cfg(all(feature = "backports", feature = "http1"))]
58+
pub mod http1;
59+
#[cfg(all(feature = "backports", feature = "http2"))]
60+
pub mod http2;
61+
5762
use std::error::Error as StdError;
5863
use std::fmt;
5964
#[cfg(not(all(feature = "http1", feature = "http2")))]

‎src/client/conn/http1.rs

+539
Large diffs are not rendered by default.

‎src/client/conn/http2.rs

+427
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,427 @@
1+
//! HTTP/2 client connections
2+
3+
use std::error::Error as StdError;
4+
use std::fmt;
5+
use std::marker::PhantomData;
6+
use std::sync::Arc;
7+
use std::time::Duration;
8+
9+
use http::{Request, Response};
10+
use tokio::io::{AsyncRead, AsyncWrite};
11+
12+
use super::super::dispatch;
13+
use crate::body::{HttpBody as Body, Body as IncomingBody};
14+
use crate::common::{
15+
exec::{BoxSendFuture, Exec},
16+
task, Future, Pin, Poll,
17+
};
18+
use crate::proto;
19+
use crate::rt::Executor;
20+
21+
/// The sender side of an established connection.
22+
pub struct SendRequest<B> {
23+
dispatch: dispatch::UnboundedSender<Request<B>, Response<IncomingBody>>,
24+
}
25+
26+
impl<B> Clone for SendRequest<B> {
27+
fn clone(&self) -> SendRequest<B> {
28+
SendRequest { dispatch: self.dispatch.clone() }
29+
}
30+
}
31+
32+
/// A future that processes all HTTP state for the IO object.
33+
///
34+
/// In most cases, this should just be spawned into an executor, so that it
35+
/// can process incoming and outgoing messages, notice hangups, and the like.
36+
#[must_use = "futures do nothing unless polled"]
37+
pub struct Connection<T, B>
38+
where
39+
T: AsyncRead + AsyncWrite + Send + 'static,
40+
B: Body + 'static,
41+
{
42+
inner: (PhantomData<T>, proto::h2::ClientTask<B>),
43+
}
44+
45+
/// A builder to configure an HTTP connection.
46+
///
47+
/// After setting options, the builder is used to create a handshake future.
48+
#[derive(Clone, Debug)]
49+
pub struct Builder {
50+
pub(super) exec: Exec,
51+
h2_builder: proto::h2::client::Config,
52+
}
53+
54+
/// Returns a handshake future over some IO.
55+
///
56+
/// This is a shortcut for `Builder::new().handshake(io)`.
57+
/// See [`client::conn`](crate::client::conn) for more.
58+
pub async fn handshake<E, T, B>(
59+
exec: E,
60+
io: T,
61+
) -> crate::Result<(SendRequest<B>, Connection<T, B>)>
62+
where
63+
E: Executor<BoxSendFuture> + Send + Sync + 'static,
64+
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
65+
B: Body + 'static,
66+
B::Data: Send,
67+
B::Error: Into<Box<dyn StdError + Send + Sync>>,
68+
{
69+
Builder::new(exec).handshake(io).await
70+
}
71+
72+
// ===== impl SendRequest
73+
74+
impl<B> SendRequest<B> {
75+
/// Polls to determine whether this sender can be used yet for a request.
76+
///
77+
/// If the associated connection is closed, this returns an Error.
78+
pub fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
79+
if self.is_closed() {
80+
Poll::Ready(Err(crate::Error::new_closed()))
81+
} else {
82+
Poll::Ready(Ok(()))
83+
}
84+
}
85+
86+
/// Waits until the dispatcher is ready
87+
///
88+
/// If the associated connection is closed, this returns an Error.
89+
pub async fn ready(&mut self) -> crate::Result<()> {
90+
futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
91+
}
92+
93+
/*
94+
pub(super) async fn when_ready(self) -> crate::Result<Self> {
95+
let mut me = Some(self);
96+
future::poll_fn(move |cx| {
97+
ready!(me.as_mut().unwrap().poll_ready(cx))?;
98+
Poll::Ready(Ok(me.take().unwrap()))
99+
})
100+
.await
101+
}
102+
103+
pub(super) fn is_ready(&self) -> bool {
104+
self.dispatch.is_ready()
105+
}
106+
*/
107+
108+
pub(super) fn is_closed(&self) -> bool {
109+
self.dispatch.is_closed()
110+
}
111+
}
112+
113+
impl<B> SendRequest<B>
114+
where
115+
B: Body + 'static,
116+
{
117+
/// Sends a `Request` on the associated connection.
118+
///
119+
/// Returns a future that if successful, yields the `Response`.
120+
///
121+
/// # Note
122+
///
123+
/// There are some key differences in what automatic things the `Client`
124+
/// does for you that will not be done here:
125+
///
126+
/// - `Client` requires absolute-form `Uri`s, since the scheme and
127+
/// authority are needed to connect. They aren't required here.
128+
/// - Since the `Client` requires absolute-form `Uri`s, it can add
129+
/// the `Host` header based on it. You must add a `Host` header yourself
130+
/// before calling this method.
131+
/// - Since absolute-form `Uri`s are not required, if received, they will
132+
/// be serialized as-is.
133+
pub fn send_request(
134+
&mut self,
135+
req: Request<B>,
136+
) -> impl Future<Output = crate::Result<Response<IncomingBody>>> {
137+
let sent = self.dispatch.send(req);
138+
139+
async move {
140+
match sent {
141+
Ok(rx) => match rx.await {
142+
Ok(Ok(resp)) => Ok(resp),
143+
Ok(Err(err)) => Err(err),
144+
// this is definite bug if it happens, but it shouldn't happen!
145+
Err(_canceled) => panic!("dispatch dropped without returning error"),
146+
},
147+
Err(_req) => {
148+
tracing::debug!("connection was not ready");
149+
150+
Err(crate::Error::new_canceled().with("connection was not ready"))
151+
}
152+
}
153+
}
154+
}
155+
156+
/*
157+
pub(super) fn send_request_retryable(
158+
&mut self,
159+
req: Request<B>,
160+
) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>> + Unpin
161+
where
162+
B: Send,
163+
{
164+
match self.dispatch.try_send(req) {
165+
Ok(rx) => {
166+
Either::Left(rx.then(move |res| {
167+
match res {
168+
Ok(Ok(res)) => future::ok(res),
169+
Ok(Err(err)) => future::err(err),
170+
// this is definite bug if it happens, but it shouldn't happen!
171+
Err(_) => panic!("dispatch dropped without returning error"),
172+
}
173+
}))
174+
}
175+
Err(req) => {
176+
tracing::debug!("connection was not ready");
177+
let err = crate::Error::new_canceled().with("connection was not ready");
178+
Either::Right(future::err((err, Some(req))))
179+
}
180+
}
181+
}
182+
*/
183+
}
184+
185+
impl<B> fmt::Debug for SendRequest<B> {
186+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
187+
f.debug_struct("SendRequest").finish()
188+
}
189+
}
190+
191+
// ===== impl Connection
192+
193+
impl<T, B> Connection<T, B>
194+
where
195+
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
196+
B: Body + Unpin + Send + 'static,
197+
B::Data: Send,
198+
B::Error: Into<Box<dyn StdError + Send + Sync>>,
199+
{
200+
/// Returns whether the [extended CONNECT protocol][1] is enabled or not.
201+
///
202+
/// This setting is configured by the server peer by sending the
203+
/// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame.
204+
/// This method returns the currently acknowledged value received from the
205+
/// remote.
206+
///
207+
/// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
208+
/// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3
209+
pub fn is_extended_connect_protocol_enabled(&self) -> bool {
210+
self.inner.1.is_extended_connect_protocol_enabled()
211+
}
212+
}
213+
214+
impl<T, B> fmt::Debug for Connection<T, B>
215+
where
216+
T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static,
217+
B: Body + 'static,
218+
{
219+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
220+
f.debug_struct("Connection").finish()
221+
}
222+
}
223+
224+
impl<T, B> Future for Connection<T, B>
225+
where
226+
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
227+
B: Body + Send + 'static,
228+
B::Data: Send,
229+
B::Error: Into<Box<dyn StdError + Send + Sync>>,
230+
{
231+
type Output = crate::Result<()>;
232+
233+
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
234+
match ready!(Pin::new(&mut self.inner.1).poll(cx))? {
235+
proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
236+
#[cfg(feature = "http1")]
237+
proto::Dispatched::Upgrade(_pending) => unreachable!("http2 cannot upgrade"),
238+
}
239+
}
240+
}
241+
242+
// ===== impl Builder
243+
244+
impl Builder {
245+
/// Creates a new connection builder.
246+
#[inline]
247+
pub fn new<E>(exec: E) -> Builder
248+
where
249+
E: Executor<BoxSendFuture> + Send + Sync + 'static,
250+
{
251+
use std::sync::Arc;
252+
Builder {
253+
exec: Exec::Executor(Arc::new(exec)),
254+
h2_builder: Default::default(),
255+
}
256+
}
257+
258+
/// Provide an executor to execute background HTTP2 tasks.
259+
pub fn executor<E>(&mut self, exec: E) -> &mut Builder
260+
where
261+
E: Executor<BoxSendFuture> + Send + Sync + 'static,
262+
{
263+
self.exec = Exec::Executor(Arc::new(exec));
264+
self
265+
}
266+
267+
/// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
268+
/// stream-level flow control.
269+
///
270+
/// Passing `None` will do nothing.
271+
///
272+
/// If not set, hyper will use a default.
273+
///
274+
/// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
275+
pub fn initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
276+
if let Some(sz) = sz.into() {
277+
self.h2_builder.adaptive_window = false;
278+
self.h2_builder.initial_stream_window_size = sz;
279+
}
280+
self
281+
}
282+
283+
/// Sets the max connection-level flow control for HTTP2
284+
///
285+
/// Passing `None` will do nothing.
286+
///
287+
/// If not set, hyper will use a default.
288+
pub fn initial_connection_window_size(
289+
&mut self,
290+
sz: impl Into<Option<u32>>,
291+
) -> &mut Self {
292+
if let Some(sz) = sz.into() {
293+
self.h2_builder.adaptive_window = false;
294+
self.h2_builder.initial_conn_window_size = sz;
295+
}
296+
self
297+
}
298+
299+
/// Sets whether to use an adaptive flow control.
300+
///
301+
/// Enabling this will override the limits set in
302+
/// `initial_stream_window_size` and
303+
/// `initial_connection_window_size`.
304+
pub fn adaptive_window(&mut self, enabled: bool) -> &mut Self {
305+
use proto::h2::SPEC_WINDOW_SIZE;
306+
307+
self.h2_builder.adaptive_window = enabled;
308+
if enabled {
309+
self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE;
310+
self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE;
311+
}
312+
self
313+
}
314+
315+
/// Sets the maximum frame size to use for HTTP2.
316+
///
317+
/// Passing `None` will do nothing.
318+
///
319+
/// If not set, hyper will use a default.
320+
pub fn max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
321+
if let Some(sz) = sz.into() {
322+
self.h2_builder.max_frame_size = sz;
323+
}
324+
self
325+
}
326+
327+
/// Sets an interval for HTTP2 Ping frames should be sent to keep a
328+
/// connection alive.
329+
///
330+
/// Pass `None` to disable HTTP2 keep-alive.
331+
///
332+
/// Default is currently disabled.
333+
#[cfg(feature = "runtime")]
334+
pub fn keep_alive_interval(
335+
&mut self,
336+
interval: impl Into<Option<Duration>>,
337+
) -> &mut Self {
338+
self.h2_builder.keep_alive_interval = interval.into();
339+
self
340+
}
341+
342+
/// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
343+
///
344+
/// If the ping is not acknowledged within the timeout, the connection will
345+
/// be closed. Does nothing if `keep_alive_interval` is disabled.
346+
///
347+
/// Default is 20 seconds.
348+
#[cfg(feature = "runtime")]
349+
pub fn keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
350+
self.h2_builder.keep_alive_timeout = timeout;
351+
self
352+
}
353+
354+
/// Sets whether HTTP2 keep-alive should apply while the connection is idle.
355+
///
356+
/// If disabled, keep-alive pings are only sent while there are open
357+
/// request/responses streams. If enabled, pings are also sent when no
358+
/// streams are active. Does nothing if `keep_alive_interval` is
359+
/// disabled.
360+
///
361+
/// Default is `false`.
362+
#[cfg(feature = "runtime")]
363+
pub fn keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self {
364+
self.h2_builder.keep_alive_while_idle = enabled;
365+
self
366+
}
367+
368+
/// Sets the maximum number of HTTP2 concurrent locally reset streams.
369+
///
370+
/// See the documentation of [`h2::client::Builder::max_concurrent_reset_streams`] for more
371+
/// details.
372+
///
373+
/// The default value is determined by the `h2` crate.
374+
///
375+
/// [`h2::client::Builder::max_concurrent_reset_streams`]: https://docs.rs/h2/client/struct.Builder.html#method.max_concurrent_reset_streams
376+
pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
377+
self.h2_builder.max_concurrent_reset_streams = Some(max);
378+
self
379+
}
380+
381+
/// Set the maximum write buffer size for each HTTP/2 stream.
382+
///
383+
/// Default is currently 1MB, but may change.
384+
///
385+
/// # Panics
386+
///
387+
/// The value must be no larger than `u32::MAX`.
388+
pub fn max_send_buf_size(&mut self, max: usize) -> &mut Self {
389+
assert!(max <= std::u32::MAX as usize);
390+
self.h2_builder.max_send_buffer_size = max;
391+
self
392+
}
393+
394+
/// Constructs a connection with the configured options and IO.
395+
/// See [`client::conn`](crate::client::conn) for more.
396+
///
397+
/// Note, if [`Connection`] is not `await`-ed, [`SendRequest`] will
398+
/// do nothing.
399+
pub fn handshake<T, B>(
400+
&self,
401+
io: T,
402+
) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
403+
where
404+
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
405+
B: Body + 'static,
406+
B::Data: Send,
407+
B::Error: Into<Box<dyn StdError + Send + Sync>>,
408+
{
409+
let opts = self.clone();
410+
411+
async move {
412+
tracing::trace!("client handshake HTTP/1");
413+
414+
let (tx, rx) = dispatch::channel();
415+
let h2 = proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec)
416+
.await?;
417+
Ok((
418+
SendRequest {
419+
dispatch: tx.unbound(),
420+
},
421+
Connection {
422+
inner: (PhantomData, h2),
423+
},
424+
))
425+
}
426+
}
427+
}

‎src/client/dispatch.rs

+9
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,15 @@ impl<T, U> UnboundedSender<T, U> {
128128
.map(move |_| rx)
129129
.map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
130130
}
131+
132+
#[cfg(all(feature = "backports", feature = "http2"))]
133+
pub(crate) fn send(&mut self, val: T) -> Result<Promise<U>, T> {
134+
let (tx, rx) = oneshot::channel();
135+
self.inner
136+
.send(Envelope(Some((val, Callback::NoRetry(Some(tx))))))
137+
.map(move |_| rx)
138+
.map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
139+
}
131140
}
132141

133142
#[cfg(feature = "http2")]

‎tests/client.rs

+124
Original file line numberDiff line numberDiff line change
@@ -2246,6 +2246,130 @@ mod conn {
22462246
future::join(server, client).await;
22472247
}
22482248

2249+
#[cfg(feature = "backports")]
2250+
mod backports {
2251+
use super::*;
2252+
#[tokio::test]
2253+
async fn get() {
2254+
let _ = ::pretty_env_logger::try_init();
2255+
let listener = TkTcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
2256+
.await
2257+
.unwrap();
2258+
let addr = listener.local_addr().unwrap();
2259+
2260+
let server = async move {
2261+
let mut sock = listener.accept().await.unwrap().0;
2262+
let mut buf = [0; 4096];
2263+
let n = sock.read(&mut buf).await.expect("read 1");
2264+
2265+
// Notably:
2266+
// - Just a path, since just a path was set
2267+
// - No host, since no host was set
2268+
let expected = "GET /a HTTP/1.1\r\n\r\n";
2269+
assert_eq!(s(&buf[..n]), expected);
2270+
2271+
sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
2272+
.await
2273+
.unwrap();
2274+
};
2275+
2276+
let client = async move {
2277+
let tcp = tcp_connect(&addr).await.expect("connect");
2278+
let (mut client, conn) = conn::http1::handshake(tcp).await.expect("handshake");
2279+
2280+
tokio::task::spawn(async move {
2281+
conn.await.expect("http conn");
2282+
});
2283+
2284+
let req: Request<Body> = Request::builder()
2285+
.uri("/a")
2286+
.body(Default::default())
2287+
.unwrap();
2288+
let mut res = client.send_request(req).await.expect("send_request");
2289+
assert_eq!(res.status(), hyper::StatusCode::OK);
2290+
assert!(res.body_mut().next().await.is_none());
2291+
};
2292+
2293+
future::join(server, client).await;
2294+
}
2295+
2296+
#[tokio::test]
2297+
async fn http2_detect_conn_eof() {
2298+
use futures_util::future;
2299+
use hyper::service::{make_service_fn, service_fn};
2300+
use hyper::{Response, Server};
2301+
2302+
let _ = pretty_env_logger::try_init();
2303+
2304+
let server = Server::bind(&([127, 0, 0, 1], 0).into())
2305+
.http2_only(true)
2306+
.serve(make_service_fn(|_| async move {
2307+
Ok::<_, hyper::Error>(service_fn(|_req| {
2308+
future::ok::<_, hyper::Error>(Response::new(Body::empty()))
2309+
}))
2310+
}));
2311+
let addr = server.local_addr();
2312+
let (shdn_tx, shdn_rx) = oneshot::channel();
2313+
tokio::task::spawn(async move {
2314+
server
2315+
.with_graceful_shutdown(async move {
2316+
let _ = shdn_rx.await;
2317+
})
2318+
.await
2319+
.expect("server")
2320+
});
2321+
2322+
struct TokioExec;
2323+
impl<F> hyper::rt::Executor<F> for TokioExec
2324+
where
2325+
F: std::future::Future + Send + 'static,
2326+
F::Output: Send + 'static,
2327+
{
2328+
fn execute(&self, fut: F) {
2329+
tokio::spawn(fut);
2330+
}
2331+
}
2332+
2333+
let io = tcp_connect(&addr).await.expect("tcp connect");
2334+
let (mut client, conn) = conn::http2::Builder::new(TokioExec)
2335+
.handshake::<_, Body>(io)
2336+
.await
2337+
.expect("http handshake");
2338+
2339+
tokio::task::spawn(async move {
2340+
conn.await.expect("client conn");
2341+
});
2342+
2343+
// Sanity check that client is ready
2344+
future::poll_fn(|ctx| client.poll_ready(ctx))
2345+
.await
2346+
.expect("client poll ready sanity");
2347+
2348+
let req = Request::builder()
2349+
.uri(format!("http://{}/", addr))
2350+
.body(Body::empty())
2351+
.expect("request builder");
2352+
2353+
client.send_request(req).await.expect("req1 send");
2354+
2355+
// Sanity check that client is STILL ready
2356+
future::poll_fn(|ctx| client.poll_ready(ctx))
2357+
.await
2358+
.expect("client poll ready after");
2359+
2360+
// Trigger the server shutdown...
2361+
let _ = shdn_tx.send(());
2362+
2363+
// Allow time for graceful shutdown roundtrips...
2364+
tokio::time::sleep(Duration::from_millis(100)).await;
2365+
2366+
// After graceful shutdown roundtrips, the client should be closed...
2367+
future::poll_fn(|ctx| client.poll_ready(ctx))
2368+
.await
2369+
.expect_err("client should be closed");
2370+
}
2371+
}
2372+
22492373
#[tokio::test]
22502374
async fn get_obsolete_line_folding() {
22512375
let _ = ::pretty_env_logger::try_init();

0 commit comments

Comments
 (0)
Please sign in to comment.