Skip to content

Commit

Permalink
Handle keep-alive correctly (#333)
Browse files Browse the repository at this point in the history
* Proper handling of keep-alive.

* Fix windows build again.

* Adhere to the HTTP/1.1 spec more (don't return the header as it's implied)
  • Loading branch information
tomusdrw committed Oct 30, 2018
1 parent a39139f commit 2ed142d
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 1 deletion.
19 changes: 18 additions & 1 deletion http/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub struct ServerHandler<M: Metadata = (), S: Middleware<M> = middleware::Noop>
rest_api: RestApi,
health_api: Option<(String, String)>,
max_request_body_size: usize,
keep_alive: bool,
}

impl<M: Metadata, S: Middleware<M>> ServerHandler<M, S> {
Expand All @@ -39,6 +40,7 @@ impl<M: Metadata, S: Middleware<M>> ServerHandler<M, S> {
rest_api: RestApi,
health_api: Option<(String, String)>,
max_request_body_size: usize,
keep_alive: bool,
) -> Self {
ServerHandler {
jsonrpc_handler,
Expand All @@ -50,6 +52,7 @@ impl<M: Metadata, S: Middleware<M>> ServerHandler<M, S> {
rest_api,
health_api,
max_request_body_size,
keep_alive,
}
}
}
Expand Down Expand Up @@ -89,6 +92,7 @@ impl<M: Metadata, S: Middleware<M>> Service for ServerHandler<M, S> {
cors_domains: self.cors_domains.clone(),
cors_headers: self.cors_allowed_headers.clone(),
continue_on_invalid_cors: should_continue_on_invalid_cors,
keep_alive: self.keep_alive,
},
is_options: false,
cors_max_age: self.cors_max_age,
Expand All @@ -97,6 +101,8 @@ impl<M: Metadata, S: Middleware<M>> Service for ServerHandler<M, S> {
rest_api: self.rest_api,
health_api: self.health_api.clone(),
max_request_body_size: self.max_request_body_size,
// initial value, overwritten when reading client headers
keep_alive: true,
})
}
}
Expand Down Expand Up @@ -159,6 +165,7 @@ enum RpcHandlerState<M, F, G> where
cors_domains: CorsDomains,
cors_headers: cors::AccessControlAllowHeaders,
continue_on_invalid_cors: bool,
keep_alive: bool,
},
ReadingBody {
body: hyper::Body,
Expand Down Expand Up @@ -210,6 +217,7 @@ pub struct RpcHandler<M: Metadata, S: Middleware<M>> {
rest_api: RestApi,
health_api: Option<(String, String)>,
max_request_body_size: usize,
keep_alive: bool,
}

impl<M: Metadata, S: Middleware<M>> Future for RpcHandler<M, S> {
Expand All @@ -218,10 +226,13 @@ impl<M: Metadata, S: Middleware<M>> Future for RpcHandler<M, S> {

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let new_state = match mem::replace(&mut self.state, RpcHandlerState::Done) {
RpcHandlerState::ReadingHeaders { request, cors_domains, cors_headers, continue_on_invalid_cors, } => {
RpcHandlerState::ReadingHeaders {
request, cors_domains, cors_headers, continue_on_invalid_cors, keep_alive,
} => {
// Read cors header
self.cors_allow_origin = utils::cors_allow_origin(&request, &cors_domains);
self.cors_allow_headers = utils::cors_allow_headers(&request, &cors_headers);
self.keep_alive = utils::keep_alive(&request, keep_alive);
self.is_options = *request.method() == Method::OPTIONS;
// Read other headers
RpcPollState::Ready(self.read_headers(request, continue_on_invalid_cors))
Expand Down Expand Up @@ -288,6 +299,7 @@ impl<M: Metadata, S: Middleware<M>> Future for RpcHandler<M, S> {
self.cors_max_age,
cors_allow_origin.into(),
cors_allow_headers.into(),
self.keep_alive,
);
Ok(Async::Ready(response))
},
Expand Down Expand Up @@ -502,6 +514,7 @@ impl<M: Metadata, S: Middleware<M>> RpcHandler<M, S> {
cors_max_age: Option<u32>,
cors_allow_origin: Option<HeaderValue>,
cors_allow_headers: Option<Vec<HeaderValue>>,
keep_alive: bool,
) {
let as_header = |m: Method| m.as_str().parse().expect("`Method` will always parse; qed");
let concat = |headers: &[HeaderValue]| {
Expand Down Expand Up @@ -540,6 +553,10 @@ impl<M: Metadata, S: Middleware<M>> RpcHandler<M, S> {
}
}
}

if !keep_alive {
headers.append(header::CONNECTION, HeaderValue::from_static("close"));
}
}

/// Returns true if the `content_type` header indicates a valid JSON
Expand Down
1 change: 1 addition & 0 deletions http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,7 @@ fn serve<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>>(
rest_api,
health_api.clone(),
max_request_body_size,
keep_alive,
);
tokio::spawn(http.serve_connection(socket, service)
.map_err(|e| error!("Error serving connection: {:?}", e)));
Expand Down
82 changes: 82 additions & 0 deletions http/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1138,6 +1138,88 @@ fn should_return_error_in_case_of_unsecure_rest_and_no_method() {
assert_eq!(&response.body, "Supplied content type is not allowed. Content-Type: application/json is required\n");
}

#[test]
fn should_return_connection_header() {
// given
let server = serve(id);
let addr = server.address().clone();

// when
let req = r#"[{"jsonrpc":"2.0","id":1,"method":"hello"}]"#;
let response = request(server,
&format!("\
POST / HTTP/1.1\r\n\
Host: localhost:{}\r\n\
Connection: close\r\n\
Content-Type: application/json\r\n\
Content-Length: {}\r\n\
\r\n\
{}\r\n\
", addr.port(), req.as_bytes().len(), req)
);

// then
assert!(response.headers.contains("connection: close"),
"Headers missing in {}", response.headers);
assert_eq!(response.status, "HTTP/1.1 200 OK".to_owned());
assert_eq!(response.body, world_batch());
}

#[test]
fn should_close_connection_without_keep_alive() {
// given
let server = serve(|builder| builder.keep_alive(false));
let addr = server.address().clone();

// when
let req = r#"[{"jsonrpc":"2.0","id":1,"method":"hello"}]"#;
let response = request(server,
&format!("\
POST / HTTP/1.1\r\n\
Host: localhost:{}\r\n\
Content-Type: application/json\r\n\
Content-Length: {}\r\n\
\r\n\
{}\r\n\
", addr.port(), req.as_bytes().len(), req)
);

// then
assert!(response.headers.contains("connection: close"),
"Header missing in {}", response.headers);
assert_eq!(response.status, "HTTP/1.1 200 OK".to_owned());
assert_eq!(response.body, world_batch());
}

#[test]
fn should_respond_with_close_even_if_client_wants_to_keep_alive() {
// given
let server = serve(|builder| builder.keep_alive(false));
let addr = server.address().clone();

// when
let req = r#"[{"jsonrpc":"2.0","id":1,"method":"hello"}]"#;
let response = request(server,
&format!("\
POST / HTTP/1.1\r\n\
Host: localhost:{}\r\n\
Connection: keep-alive\r\n\
Content-Type: application/json\r\n\
Content-Length: {}\r\n\
\r\n\
{}\r\n\
", addr.port(), req.as_bytes().len(), req)
);

// then
assert!(response.headers.contains("connection: close"),
"Headers missing in {}", response.headers);
assert_eq!(response.status, "HTTP/1.1 200 OK".to_owned());
assert_eq!(response.body, world_batch());
}



fn invalid_host() -> String {
"Provided Host header is not whitelisted.\n".into()
}
Expand Down
19 changes: 19 additions & 0 deletions http/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,22 @@ pub fn cors_allow_headers(
.unwrap_or_else(|_| header::HeaderValue::from_static("unknown"))
)
}

/// Returns an optional value of `Connection` header that should be included in the response.
/// The second parameter defines if server is configured with keep-alive option.
/// Return value of `true` indicates that no `Connection` header should be returned,
/// `false` indicates `Connection: close`.
pub fn keep_alive(
request: &hyper::Request<hyper::Body>,
keep_alive: bool,
) -> bool {
read_header(request, "connection")
.map(|val| match (keep_alive, val) {
// indicate that connection should be closed
(false, _) | (_, "close") => false,
// don't include any headers otherwise
_ => true,
})
// if the client header is not present, close connection if we don't keep_alive
.unwrap_or(keep_alive)
}

0 comments on commit 2ed142d

Please sign in to comment.