Skip to content

Commit 84881c9

Browse files
authoredMar 8, 2023
feat(server): backport the split server conn modules from 1.0 (#3102)
Closes #3079
1 parent 0368a41 commit 84881c9

File tree

4 files changed

+853
-2
lines changed

4 files changed

+853
-2
lines changed
 

‎src/server/conn.rs

+12-2
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@ use crate::error::{Kind, Parse};
5858
#[cfg(feature = "http1")]
5959
use crate::upgrade::Upgraded;
6060

61+
#[cfg(all(feature = "backports", feature = "http1"))]
62+
pub mod http1;
63+
#[cfg(all(feature = "backports", feature = "http2"))]
64+
pub mod http2;
65+
6166
cfg_feature! {
6267
#![any(feature = "http1", feature = "http2")]
6368

@@ -327,7 +332,7 @@ impl<E> Http<E> {
327332
self
328333
}
329334

330-
/// Set a timeout for reading client request headers. If a client does not
335+
/// Set a timeout for reading client request headers. If a client does not
331336
/// transmit the entire header within this time, the connection is closed.
332337
///
333338
/// Default is None.
@@ -809,7 +814,12 @@ where
809814
let mut conn = Some(self);
810815
futures_util::future::poll_fn(move |cx| {
811816
ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?;
812-
Poll::Ready(conn.take().unwrap().try_into_parts().ok_or_else(crate::Error::new_without_shutdown_not_h1))
817+
Poll::Ready(
818+
conn.take()
819+
.unwrap()
820+
.try_into_parts()
821+
.ok_or_else(crate::Error::new_without_shutdown_not_h1),
822+
)
813823
})
814824
}
815825

‎src/server/conn/http1.rs

+446
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,446 @@
1+
//! HTTP/1 Server Connections
2+
3+
use std::error::Error as StdError;
4+
use std::fmt;
5+
use std::time::Duration;
6+
7+
use bytes::Bytes;
8+
use tokio::io::{AsyncRead, AsyncWrite};
9+
10+
use crate::body::{Body as IncomingBody, HttpBody as Body};
11+
use crate::common::{task, Future, Pin, Poll, Unpin};
12+
use crate::proto;
13+
use crate::service::HttpService;
14+
15+
type Http1Dispatcher<T, B, S> = proto::h1::Dispatcher<
16+
proto::h1::dispatch::Server<S, IncomingBody>,
17+
B,
18+
T,
19+
proto::ServerTransaction,
20+
>;
21+
22+
pin_project_lite::pin_project! {
23+
/// A future binding an http1 connection with a Service.
24+
///
25+
/// Polling this future will drive HTTP forward.
26+
#[must_use = "futures do nothing unless polled"]
27+
pub struct Connection<T, S>
28+
where
29+
S: HttpService<IncomingBody>,
30+
{
31+
conn: Http1Dispatcher<T, S::ResBody, S>,
32+
}
33+
}
34+
35+
/// A configuration builder for HTTP/1 server connections.
36+
#[derive(Clone, Debug)]
37+
pub struct Builder {
38+
h1_half_close: bool,
39+
h1_keep_alive: bool,
40+
h1_title_case_headers: bool,
41+
h1_preserve_header_case: bool,
42+
h1_header_read_timeout: Option<Duration>,
43+
h1_writev: Option<bool>,
44+
max_buf_size: Option<usize>,
45+
pipeline_flush: bool,
46+
}
47+
48+
/// Deconstructed parts of a `Connection`.
49+
///
50+
/// This allows taking apart a `Connection` at a later time, in order to
51+
/// reclaim the IO object, and additional related pieces.
52+
#[derive(Debug)]
53+
pub struct Parts<T, S> {
54+
/// The original IO object used in the handshake.
55+
pub io: T,
56+
/// A buffer of bytes that have been read but not processed as HTTP.
57+
///
58+
/// If the client sent additional bytes after its last request, and
59+
/// this connection "ended" with an upgrade, the read buffer will contain
60+
/// those bytes.
61+
///
62+
/// You will want to check for any existing bytes if you plan to continue
63+
/// communicating on the IO object.
64+
pub read_buf: Bytes,
65+
/// The `Service` used to serve this connection.
66+
pub service: S,
67+
_inner: (),
68+
}
69+
70+
// ===== impl Connection =====
71+
72+
impl<I, S> fmt::Debug for Connection<I, S>
73+
where
74+
S: HttpService<IncomingBody>,
75+
{
76+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
77+
f.debug_struct("Connection").finish()
78+
}
79+
}
80+
81+
impl<I, B, S> Connection<I, S>
82+
where
83+
S: HttpService<IncomingBody, ResBody = B>,
84+
S::Error: Into<Box<dyn StdError + Send + Sync>>,
85+
I: AsyncRead + AsyncWrite + Unpin,
86+
B: Body + 'static,
87+
B::Error: Into<Box<dyn StdError + Send + Sync>>,
88+
{
89+
/// Start a graceful shutdown process for this connection.
90+
///
91+
/// This `Connection` should continue to be polled until shutdown
92+
/// can finish.
93+
///
94+
/// # Note
95+
///
96+
/// This should only be called while the `Connection` future is still
97+
/// pending. If called after `Connection::poll` has resolved, this does
98+
/// nothing.
99+
pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
100+
self.conn.disable_keep_alive();
101+
}
102+
103+
/// Return the inner IO object, and additional information.
104+
///
105+
/// If the IO object has been "rewound" the io will not contain those bytes rewound.
106+
/// This should only be called after `poll_without_shutdown` signals
107+
/// that the connection is "done". Otherwise, it may not have finished
108+
/// flushing all necessary HTTP bytes.
109+
///
110+
/// # Panics
111+
/// This method will panic if this connection is using an h2 protocol.
112+
pub fn into_parts(self) -> Parts<I, S> {
113+
let (io, read_buf, dispatch) = self.conn.into_inner();
114+
Parts {
115+
io,
116+
read_buf,
117+
service: dispatch.into_service(),
118+
_inner: (),
119+
}
120+
}
121+
122+
/// Poll the connection for completion, but without calling `shutdown`
123+
/// on the underlying IO.
124+
///
125+
/// This is useful to allow running a connection while doing an HTTP
126+
/// upgrade. Once the upgrade is completed, the connection would be "done",
127+
/// but it is not desired to actually shutdown the IO object. Instead you
128+
/// would take it back using `into_parts`.
129+
pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>
130+
where
131+
S: Unpin,
132+
S::Future: Unpin,
133+
B: Unpin,
134+
{
135+
self.conn.poll_without_shutdown(cx)
136+
}
137+
138+
/// Prevent shutdown of the underlying IO object at the end of service the request,
139+
/// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`.
140+
///
141+
/// # Error
142+
///
143+
/// This errors if the underlying connection protocol is not HTTP/1.
144+
pub fn without_shutdown(self) -> impl Future<Output = crate::Result<Parts<I, S>>>
145+
where
146+
S: Unpin,
147+
S::Future: Unpin,
148+
B: Unpin,
149+
{
150+
let mut zelf = Some(self);
151+
futures_util::future::poll_fn(move |cx| {
152+
ready!(zelf.as_mut().unwrap().conn.poll_without_shutdown(cx))?;
153+
Poll::Ready(Ok(zelf.take().unwrap().into_parts()))
154+
})
155+
}
156+
157+
/// Enable this connection to support higher-level HTTP upgrades.
158+
///
159+
/// See [the `upgrade` module](crate::upgrade) for more.
160+
pub fn with_upgrades(self) -> upgrades::UpgradeableConnection<I, S>
161+
where
162+
I: Send,
163+
{
164+
upgrades::UpgradeableConnection { inner: Some(self) }
165+
}
166+
}
167+
168+
impl<I, B, S> Future for Connection<I, S>
169+
where
170+
S: HttpService<IncomingBody, ResBody = B>,
171+
S::Error: Into<Box<dyn StdError + Send + Sync>>,
172+
I: AsyncRead + AsyncWrite + Unpin + 'static,
173+
B: Body + 'static,
174+
B::Error: Into<Box<dyn StdError + Send + Sync>>,
175+
{
176+
type Output = crate::Result<()>;
177+
178+
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
179+
match ready!(Pin::new(&mut self.conn).poll(cx)) {
180+
Ok(done) => {
181+
match done {
182+
proto::Dispatched::Shutdown => {}
183+
proto::Dispatched::Upgrade(pending) => {
184+
// With no `Send` bound on `I`, we can't try to do
185+
// upgrades here. In case a user was trying to use
186+
// `Body::on_upgrade` with this API, send a special
187+
// error letting them know about that.
188+
pending.manual();
189+
}
190+
};
191+
return Poll::Ready(Ok(()));
192+
}
193+
Err(e) => Poll::Ready(Err(e)),
194+
}
195+
}
196+
}
197+
198+
// ===== impl Builder =====
199+
200+
impl Builder {
201+
/// Create a new connection builder.
202+
pub fn new() -> Self {
203+
Self {
204+
h1_half_close: false,
205+
h1_keep_alive: true,
206+
h1_title_case_headers: false,
207+
h1_preserve_header_case: false,
208+
h1_header_read_timeout: None,
209+
h1_writev: None,
210+
max_buf_size: None,
211+
pipeline_flush: false,
212+
}
213+
}
214+
/// Set whether HTTP/1 connections should support half-closures.
215+
///
216+
/// Clients can chose to shutdown their write-side while waiting
217+
/// for the server to respond. Setting this to `true` will
218+
/// prevent closing the connection immediately if `read`
219+
/// detects an EOF in the middle of a request.
220+
///
221+
/// Default is `false`.
222+
pub fn half_close(&mut self, val: bool) -> &mut Self {
223+
self.h1_half_close = val;
224+
self
225+
}
226+
227+
/// Enables or disables HTTP/1 keep-alive.
228+
///
229+
/// Default is true.
230+
pub fn keep_alive(&mut self, val: bool) -> &mut Self {
231+
self.h1_keep_alive = val;
232+
self
233+
}
234+
235+
/// Set whether HTTP/1 connections will write header names as title case at
236+
/// the socket level.
237+
///
238+
/// Default is false.
239+
pub fn title_case_headers(&mut self, enabled: bool) -> &mut Self {
240+
self.h1_title_case_headers = enabled;
241+
self
242+
}
243+
244+
/// Set whether to support preserving original header cases.
245+
///
246+
/// Currently, this will record the original cases received, and store them
247+
/// in a private extension on the `Request`. It will also look for and use
248+
/// such an extension in any provided `Response`.
249+
///
250+
/// Since the relevant extension is still private, there is no way to
251+
/// interact with the original cases. The only effect this can have now is
252+
/// to forward the cases in a proxy-like fashion.
253+
///
254+
/// Default is false.
255+
pub fn preserve_header_case(&mut self, enabled: bool) -> &mut Self {
256+
self.h1_preserve_header_case = enabled;
257+
self
258+
}
259+
260+
/// Set a timeout for reading client request headers. If a client does not
261+
/// transmit the entire header within this time, the connection is closed.
262+
///
263+
/// Default is None.
264+
pub fn header_read_timeout(&mut self, read_timeout: Duration) -> &mut Self {
265+
self.h1_header_read_timeout = Some(read_timeout);
266+
self
267+
}
268+
269+
/// Set whether HTTP/1 connections should try to use vectored writes,
270+
/// or always flatten into a single buffer.
271+
///
272+
/// Note that setting this to false may mean more copies of body data,
273+
/// but may also improve performance when an IO transport doesn't
274+
/// support vectored writes well, such as most TLS implementations.
275+
///
276+
/// Setting this to true will force hyper to use queued strategy
277+
/// which may eliminate unnecessary cloning on some TLS backends
278+
///
279+
/// Default is `auto`. In this mode hyper will try to guess which
280+
/// mode to use
281+
pub fn writev(&mut self, val: bool) -> &mut Self {
282+
self.h1_writev = Some(val);
283+
self
284+
}
285+
286+
/// Set the maximum buffer size for the connection.
287+
///
288+
/// Default is ~400kb.
289+
///
290+
/// # Panics
291+
///
292+
/// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
293+
pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
294+
assert!(
295+
max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE,
296+
"the max_buf_size cannot be smaller than the minimum that h1 specifies."
297+
);
298+
self.max_buf_size = Some(max);
299+
self
300+
}
301+
302+
/// Aggregates flushes to better support pipelined responses.
303+
///
304+
/// Experimental, may have bugs.
305+
///
306+
/// Default is false.
307+
pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self {
308+
self.pipeline_flush = enabled;
309+
self
310+
}
311+
312+
// /// Set the timer used in background tasks.
313+
// pub fn timer<M>(&mut self, timer: M) -> &mut Self
314+
// where
315+
// M: Timer + Send + Sync + 'static,
316+
// {
317+
// self.timer = Time::Timer(Arc::new(timer));
318+
// self
319+
// }
320+
321+
/// Bind a connection together with a [`Service`](crate::service::Service).
322+
///
323+
/// This returns a Future that must be polled in order for HTTP to be
324+
/// driven on the connection.
325+
///
326+
/// # Example
327+
///
328+
/// ```
329+
/// # use hyper::{Body as Incoming, Request, Response};
330+
/// # use hyper::service::Service;
331+
/// # use hyper::server::conn::http1::Builder;
332+
/// # use tokio::io::{AsyncRead, AsyncWrite};
333+
/// # async fn run<I, S>(some_io: I, some_service: S)
334+
/// # where
335+
/// # I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
336+
/// # S: Service<hyper::Request<Incoming>, Response=hyper::Response<Incoming>> + Send + 'static,
337+
/// # S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
338+
/// # S::Future: Send,
339+
/// # {
340+
/// let http = Builder::new();
341+
/// let conn = http.serve_connection(some_io, some_service);
342+
///
343+
/// if let Err(e) = conn.await {
344+
/// eprintln!("server connection error: {}", e);
345+
/// }
346+
/// # }
347+
/// # fn main() {}
348+
/// ```
349+
pub fn serve_connection<I, S>(&self, io: I, service: S) -> Connection<I, S>
350+
where
351+
S: HttpService<IncomingBody>,
352+
S::Error: Into<Box<dyn StdError + Send + Sync>>,
353+
S::ResBody: 'static,
354+
<S::ResBody as Body>::Error: Into<Box<dyn StdError + Send + Sync>>,
355+
I: AsyncRead + AsyncWrite + Unpin,
356+
{
357+
let mut conn = proto::Conn::new(io);
358+
if !self.h1_keep_alive {
359+
conn.disable_keep_alive();
360+
}
361+
if self.h1_half_close {
362+
conn.set_allow_half_close();
363+
}
364+
if self.h1_title_case_headers {
365+
conn.set_title_case_headers();
366+
}
367+
if self.h1_preserve_header_case {
368+
conn.set_preserve_header_case();
369+
}
370+
if let Some(header_read_timeout) = self.h1_header_read_timeout {
371+
conn.set_http1_header_read_timeout(header_read_timeout);
372+
}
373+
if let Some(writev) = self.h1_writev {
374+
if writev {
375+
conn.set_write_strategy_queue();
376+
} else {
377+
conn.set_write_strategy_flatten();
378+
}
379+
}
380+
conn.set_flush_pipeline(self.pipeline_flush);
381+
if let Some(max) = self.max_buf_size {
382+
conn.set_max_buf_size(max);
383+
}
384+
let sd = proto::h1::dispatch::Server::new(service);
385+
let proto = proto::h1::Dispatcher::new(sd, conn);
386+
Connection { conn: proto }
387+
}
388+
}
389+
390+
mod upgrades {
391+
use crate::upgrade::Upgraded;
392+
393+
use super::*;
394+
395+
// A future binding a connection with a Service with Upgrade support.
396+
//
397+
// This type is unnameable outside the crate.
398+
#[must_use = "futures do nothing unless polled"]
399+
#[allow(missing_debug_implementations)]
400+
pub struct UpgradeableConnection<T, S>
401+
where
402+
S: HttpService<IncomingBody>,
403+
{
404+
pub(super) inner: Option<Connection<T, S>>,
405+
}
406+
407+
impl<I, B, S> UpgradeableConnection<I, S>
408+
where
409+
S: HttpService<IncomingBody, ResBody = B>,
410+
S::Error: Into<Box<dyn StdError + Send + Sync>>,
411+
I: AsyncRead + AsyncWrite + Unpin,
412+
B: Body + 'static,
413+
B::Error: Into<Box<dyn StdError + Send + Sync>>,
414+
{
415+
/// Start a graceful shutdown process for this connection.
416+
///
417+
/// This `Connection` should continue to be polled until shutdown
418+
/// can finish.
419+
pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
420+
Pin::new(self.inner.as_mut().unwrap()).graceful_shutdown()
421+
}
422+
}
423+
424+
impl<I, B, S> Future for UpgradeableConnection<I, S>
425+
where
426+
S: HttpService<IncomingBody, ResBody = B>,
427+
S::Error: Into<Box<dyn StdError + Send + Sync>>,
428+
I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
429+
B: Body + 'static,
430+
B::Error: Into<Box<dyn StdError + Send + Sync>>,
431+
{
432+
type Output = crate::Result<()>;
433+
434+
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
435+
match ready!(Pin::new(&mut self.inner.as_mut().unwrap().conn).poll(cx)) {
436+
Ok(proto::Dispatched::Shutdown) => Poll::Ready(Ok(())),
437+
Ok(proto::Dispatched::Upgrade(pending)) => {
438+
let (io, buf, _) = self.inner.take().unwrap().conn.into_inner();
439+
pending.fulfill(Upgraded::new(io, buf));
440+
Poll::Ready(Ok(()))
441+
}
442+
Err(e) => Poll::Ready(Err(e)),
443+
}
444+
}
445+
}
446+
}

‎src/server/conn/http2.rs

+257
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,257 @@
1+
//! HTTP/2 Server Connections
2+
3+
use std::error::Error as StdError;
4+
use std::fmt;
5+
use std::time::Duration;
6+
7+
use pin_project_lite::pin_project;
8+
use tokio::io::{AsyncRead, AsyncWrite};
9+
10+
use crate::body::{Body as IncomingBody, HttpBody as Body};
11+
use crate::common::exec::ConnStreamExec;
12+
use crate::common::{task, Future, Pin, Poll, Unpin};
13+
use crate::proto;
14+
use crate::service::HttpService;
15+
16+
pin_project! {
17+
/// A future binding an HTTP/2 connection with a Service.
18+
///
19+
/// Polling this future will drive HTTP forward.
20+
#[must_use = "futures do nothing unless polled"]
21+
pub struct Connection<T, S, E>
22+
where
23+
S: HttpService<IncomingBody>,
24+
{
25+
conn: proto::h2::Server<T, S, S::ResBody, E>,
26+
}
27+
}
28+
29+
/// A configuration builder for HTTP/2 server connections.
30+
#[derive(Clone, Debug)]
31+
pub struct Builder<E> {
32+
exec: E,
33+
h2_builder: proto::h2::server::Config,
34+
}
35+
36+
// ===== impl Connection =====
37+
38+
impl<I, S, E> fmt::Debug for Connection<I, S, E>
39+
where
40+
S: HttpService<IncomingBody>,
41+
{
42+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43+
f.debug_struct("Connection").finish()
44+
}
45+
}
46+
47+
impl<I, B, S, E> Connection<I, S, E>
48+
where
49+
S: HttpService<IncomingBody, ResBody = B>,
50+
S::Error: Into<Box<dyn StdError + Send + Sync>>,
51+
I: AsyncRead + AsyncWrite + Unpin,
52+
B: Body + 'static,
53+
B::Error: Into<Box<dyn StdError + Send + Sync>>,
54+
E: ConnStreamExec<S::Future, B>,
55+
{
56+
/// Start a graceful shutdown process for this connection.
57+
///
58+
/// This `Connection` should continue to be polled until shutdown
59+
/// can finish.
60+
///
61+
/// # Note
62+
///
63+
/// This should only be called while the `Connection` future is still
64+
/// pending. If called after `Connection::poll` has resolved, this does
65+
/// nothing.
66+
pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
67+
self.conn.graceful_shutdown();
68+
}
69+
}
70+
71+
impl<I, B, S, E> Future for Connection<I, S, E>
72+
where
73+
S: HttpService<IncomingBody, ResBody = B>,
74+
S::Error: Into<Box<dyn StdError + Send + Sync>>,
75+
I: AsyncRead + AsyncWrite + Unpin + 'static,
76+
B: Body + 'static,
77+
B::Error: Into<Box<dyn StdError + Send + Sync>>,
78+
E: ConnStreamExec<S::Future, B>,
79+
{
80+
type Output = crate::Result<()>;
81+
82+
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
83+
match ready!(Pin::new(&mut self.conn).poll(cx)) {
84+
Ok(_done) => {
85+
//TODO: the proto::h2::Server no longer needs to return
86+
//the Dispatched enum
87+
Poll::Ready(Ok(()))
88+
}
89+
Err(e) => Poll::Ready(Err(e)),
90+
}
91+
}
92+
}
93+
94+
// ===== impl Builder =====
95+
96+
impl<E> Builder<E> {
97+
/// Create a new connection builder.
98+
///
99+
/// This starts with the default options, and an executor.
100+
pub fn new(exec: E) -> Self {
101+
Self {
102+
exec: exec,
103+
h2_builder: Default::default(),
104+
}
105+
}
106+
107+
/// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
108+
/// stream-level flow control.
109+
///
110+
/// Passing `None` will do nothing.
111+
///
112+
/// If not set, hyper will use a default.
113+
///
114+
/// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
115+
pub fn initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
116+
if let Some(sz) = sz.into() {
117+
self.h2_builder.adaptive_window = false;
118+
self.h2_builder.initial_stream_window_size = sz;
119+
}
120+
self
121+
}
122+
123+
/// Sets the max connection-level flow control for HTTP2.
124+
///
125+
/// Passing `None` will do nothing.
126+
///
127+
/// If not set, hyper will use a default.
128+
pub fn initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
129+
if let Some(sz) = sz.into() {
130+
self.h2_builder.adaptive_window = false;
131+
self.h2_builder.initial_conn_window_size = sz;
132+
}
133+
self
134+
}
135+
136+
/// Sets whether to use an adaptive flow control.
137+
///
138+
/// Enabling this will override the limits set in
139+
/// `initial_stream_window_size` and
140+
/// `initial_connection_window_size`.
141+
pub fn adaptive_window(&mut self, enabled: bool) -> &mut Self {
142+
use proto::h2::SPEC_WINDOW_SIZE;
143+
144+
self.h2_builder.adaptive_window = enabled;
145+
if enabled {
146+
self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE;
147+
self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE;
148+
}
149+
self
150+
}
151+
152+
/// Sets the maximum frame size to use for HTTP2.
153+
///
154+
/// Passing `None` will do nothing.
155+
///
156+
/// If not set, hyper will use a default.
157+
pub fn max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
158+
if let Some(sz) = sz.into() {
159+
self.h2_builder.max_frame_size = sz;
160+
}
161+
self
162+
}
163+
164+
/// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2
165+
/// connections.
166+
///
167+
/// Default is no limit (`std::u32::MAX`). Passing `None` will do nothing.
168+
///
169+
/// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS
170+
pub fn max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self {
171+
self.h2_builder.max_concurrent_streams = max.into();
172+
self
173+
}
174+
175+
/// Sets an interval for HTTP2 Ping frames should be sent to keep a
176+
/// connection alive.
177+
///
178+
/// Pass `None` to disable HTTP2 keep-alive.
179+
///
180+
/// Default is currently disabled.
181+
///
182+
/// # Cargo Feature
183+
///
184+
pub fn keep_alive_interval(&mut self, interval: impl Into<Option<Duration>>) -> &mut Self {
185+
self.h2_builder.keep_alive_interval = interval.into();
186+
self
187+
}
188+
189+
/// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
190+
///
191+
/// If the ping is not acknowledged within the timeout, the connection will
192+
/// be closed. Does nothing if `keep_alive_interval` is disabled.
193+
///
194+
/// Default is 20 seconds.
195+
///
196+
/// # Cargo Feature
197+
///
198+
pub fn keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
199+
self.h2_builder.keep_alive_timeout = timeout;
200+
self
201+
}
202+
203+
/// Set the maximum write buffer size for each HTTP/2 stream.
204+
///
205+
/// Default is currently ~400KB, but may change.
206+
///
207+
/// # Panics
208+
///
209+
/// The value must be no larger than `u32::MAX`.
210+
pub fn max_send_buf_size(&mut self, max: usize) -> &mut Self {
211+
assert!(max <= std::u32::MAX as usize);
212+
self.h2_builder.max_send_buffer_size = max;
213+
self
214+
}
215+
216+
/// Enables the [extended CONNECT protocol].
217+
///
218+
/// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
219+
pub fn enable_connect_protocol(&mut self) -> &mut Self {
220+
self.h2_builder.enable_connect_protocol = true;
221+
self
222+
}
223+
224+
/// Sets the max size of received header frames.
225+
///
226+
/// Default is currently ~16MB, but may change.
227+
pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
228+
self.h2_builder.max_header_list_size = max;
229+
self
230+
}
231+
232+
// /// Set the timer used in background tasks.
233+
// pub fn timer<M>(&mut self, timer: M) -> &mut Self
234+
// where
235+
// M: Timer + Send + Sync + 'static,
236+
// {
237+
// self.timer = Time::Timer(Arc::new(timer));
238+
// self
239+
// }
240+
241+
/// Bind a connection together with a [`Service`](crate::service::Service).
242+
///
243+
/// This returns a Future that must be polled in order for HTTP to be
244+
/// driven on the connection.
245+
pub fn serve_connection<S, I, Bd>(&self, io: I, service: S) -> Connection<I, S, E>
246+
where
247+
S: HttpService<IncomingBody, ResBody = Bd>,
248+
S::Error: Into<Box<dyn StdError + Send + Sync>>,
249+
Bd: Body + 'static,
250+
Bd::Error: Into<Box<dyn StdError + Send + Sync>>,
251+
I: AsyncRead + AsyncWrite + Unpin,
252+
E: ConnStreamExec<S::Future, Bd>,
253+
{
254+
let proto = proto::h2::Server::new(io, service, &self.h2_builder, self.exec.clone());
255+
Connection { conn: proto }
256+
}
257+
}

‎tests/server.rs

+138
Original file line numberDiff line numberDiff line change
@@ -2641,6 +2641,144 @@ async fn http2_keep_alive_count_server_pings() {
26412641
.expect("timed out waiting for pings");
26422642
}
26432643

2644+
// Tests for backported 1.0 APIs
2645+
mod backports {
2646+
use super::*;
2647+
use hyper::server::conn::{http1, http2};
2648+
2649+
#[tokio::test]
2650+
async fn http_connect() {
2651+
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
2652+
let addr = listener.local_addr().unwrap();
2653+
2654+
let (tx, rx) = oneshot::channel();
2655+
2656+
thread::spawn(move || {
2657+
let mut tcp = connect(&addr);
2658+
tcp.write_all(
2659+
b"\
2660+
CONNECT localhost:80 HTTP/1.1\r\n\
2661+
\r\n\
2662+
eagerly optimistic\
2663+
",
2664+
)
2665+
.expect("write 1");
2666+
let mut buf = [0; 256];
2667+
tcp.read(&mut buf).expect("read 1");
2668+
2669+
let expected = "HTTP/1.1 200 OK\r\n";
2670+
assert_eq!(s(&buf[..expected.len()]), expected);
2671+
let _ = tx.send(());
2672+
2673+
let n = tcp.read(&mut buf).expect("read 2");
2674+
assert_eq!(s(&buf[..n]), "foo=bar");
2675+
tcp.write_all(b"bar=foo").expect("write 2");
2676+
});
2677+
2678+
let (socket, _) = listener.accept().await.unwrap();
2679+
let conn = http1::Builder::new().serve_connection(
2680+
socket,
2681+
service_fn(|_| {
2682+
// In 1.0 we would use `http_body_util::Empty::<Bytes>::new()` to construct
2683+
// an empty body
2684+
let res = Response::builder().status(200).body(Body::empty()).unwrap();
2685+
future::ready(Ok::<_, hyper::Error>(res))
2686+
}),
2687+
);
2688+
2689+
let parts = conn.without_shutdown().await.unwrap();
2690+
assert_eq!(parts.read_buf, "eagerly optimistic");
2691+
2692+
// wait so that we don't write until other side saw 101 response
2693+
rx.await.unwrap();
2694+
2695+
let mut io = parts.io;
2696+
io.write_all(b"foo=bar").await.unwrap();
2697+
let mut vec = vec![];
2698+
io.read_to_end(&mut vec).await.unwrap();
2699+
assert_eq!(vec, b"bar=foo");
2700+
}
2701+
2702+
#[tokio::test]
2703+
async fn h2_connect() {
2704+
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
2705+
let addr = listener.local_addr().unwrap();
2706+
2707+
let conn = connect_async(addr).await;
2708+
2709+
let (h2, connection) = h2::client::handshake(conn).await.unwrap();
2710+
tokio::spawn(async move {
2711+
connection.await.unwrap();
2712+
});
2713+
let mut h2 = h2.ready().await.unwrap();
2714+
2715+
async fn connect_and_recv_bread(
2716+
h2: &mut SendRequest<Bytes>,
2717+
) -> (RecvStream, SendStream<Bytes>) {
2718+
let request = Request::connect("localhost").body(()).unwrap();
2719+
let (response, send_stream) = h2.send_request(request, false).unwrap();
2720+
let response = response.await.unwrap();
2721+
assert_eq!(response.status(), StatusCode::OK);
2722+
2723+
let mut body = response.into_body();
2724+
let bytes = body.data().await.unwrap().unwrap();
2725+
assert_eq!(&bytes[..], b"Bread?");
2726+
let _ = body.flow_control().release_capacity(bytes.len());
2727+
2728+
(body, send_stream)
2729+
}
2730+
2731+
tokio::spawn(async move {
2732+
let (mut recv_stream, mut send_stream) = connect_and_recv_bread(&mut h2).await;
2733+
2734+
send_stream.send_data("Baguette!".into(), true).unwrap();
2735+
2736+
assert!(recv_stream.data().await.unwrap().unwrap().is_empty());
2737+
});
2738+
2739+
// In 1.0 the `Body` struct is renamed to `IncomingBody`
2740+
let svc = service_fn(move |req: Request<Body>| {
2741+
let on_upgrade = hyper::upgrade::on(req);
2742+
2743+
tokio::spawn(async move {
2744+
let mut upgraded = on_upgrade.await.expect("on_upgrade");
2745+
upgraded.write_all(b"Bread?").await.unwrap();
2746+
2747+
let mut vec = vec![];
2748+
upgraded.read_to_end(&mut vec).await.unwrap();
2749+
assert_eq!(s(&vec), "Baguette!");
2750+
2751+
upgraded.shutdown().await.unwrap();
2752+
});
2753+
2754+
future::ok::<_, hyper::Error>(
2755+
// In 1.0 we would use `http_body_util::Empty::<Bytes>::new()` to construct
2756+
// an empty body
2757+
Response::builder().status(200).body(Body::empty()).unwrap(),
2758+
)
2759+
});
2760+
2761+
let (socket, _) = listener.accept().await.unwrap();
2762+
http2::Builder::new(TokioExecutor)
2763+
.serve_connection(socket, svc)
2764+
.await
2765+
.unwrap();
2766+
}
2767+
2768+
#[derive(Clone)]
2769+
/// An Executor that uses the tokio runtime.
2770+
pub struct TokioExecutor;
2771+
2772+
impl<F> hyper::rt::Executor<F> for TokioExecutor
2773+
where
2774+
F: std::future::Future + Send + 'static,
2775+
F::Output: Send + 'static,
2776+
{
2777+
fn execute(&self, fut: F) {
2778+
tokio::task::spawn(fut);
2779+
}
2780+
}
2781+
}
26442782
// -------------------------------------------------
26452783
// the Server that is used to run all the tests with
26462784
// -------------------------------------------------

0 commit comments

Comments
 (0)
Please sign in to comment.