Skip to content

Commit

Permalink
Ensure that the HTTP server supports WebSocket upgrades for HTTP/1.0 …
Browse files Browse the repository at this point in the history
…non persistent connections.
  • Loading branch information
vietj committed Oct 10, 2023
1 parent 5224240 commit 68348ab
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 37 deletions.
63 changes: 31 additions & 32 deletions src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ private abstract static class Stream {

private Object trace;
private Object metric;
private HttpRequestHead request;
private HttpResponseHead response;
private boolean responseEnded;
private long bytesRead;
Expand Down Expand Up @@ -422,7 +423,6 @@ private static class StreamImpl extends Stream implements HttpClientStream {
private final InboundBuffer<Object> queue;
private boolean reset;
private boolean closed;
private HttpRequestHead request;
private Handler<HttpResponseHead> headHandler;
private Handler<Buffer> chunkHandler;
private Handler<MultiMap> endHandler;
Expand Down Expand Up @@ -554,7 +554,7 @@ public void writeHead(HttpRequestHead request, boolean chunked, ByteBuf buf, boo
private void writeHead(HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, boolean connect, Handler<AsyncResult<Void>> handler) {
EventLoop eventLoop = conn.context.nettyEventLoop();
if (eventLoop.inEventLoop()) {
this.request = request;
((Stream)this).request = request;
conn.beginRequest(this, request, chunked, buf, end, connect, handler);
} else {
eventLoop.execute(() -> writeHead(request, chunked, buf, end, connect, handler));
Expand Down Expand Up @@ -840,40 +840,14 @@ private void handleResponseBegin(Stream stream, HttpResponseHead response) {
} else {
HttpRequestHead request;
synchronized (this) {
request = ((StreamImpl)stream).request;
request = stream.request;
stream.response = response;

if (metrics != null) {
metrics.responseBegin(stream.metric, response);
}

//
if (response.statusCode != 100 && request.method != HttpMethod.CONNECT) {
// See https://tools.ietf.org/html/rfc7230#section-6.3
String responseConnectionHeader = response.headers.get(HttpHeaderNames.CONNECTION);
String requestConnectionHeader = request.headers != null ? request.headers.get(HttpHeaderNames.CONNECTION) : null;
// We don't need to protect against concurrent changes on forceClose as it only goes from false -> true
if (HttpHeaderValues.CLOSE.contentEqualsIgnoreCase(responseConnectionHeader) || HttpHeaderValues.CLOSE.contentEqualsIgnoreCase(requestConnectionHeader)) {
// In all cases, if we have a close connection option then we SHOULD NOT treat the connection as persistent
this.close = true;
} else if (response.version == HttpVersion.HTTP_1_0 && !HttpHeaderValues.KEEP_ALIVE.contentEqualsIgnoreCase(responseConnectionHeader)) {
// In the HTTP/1.0 case both request/response need a keep-alive connection header the connection to be persistent
// currently Vertx forces the Connection header if keepalive is enabled for 1.0
this.close = true;
}
String keepAliveHeader = response.headers.get(HttpHeaderNames.KEEP_ALIVE);
if (keepAliveHeader != null) {
int timeout = HttpUtils.parseKeepAliveHeaderTimeout(keepAliveHeader);
if (timeout != -1) {
this.keepAliveTimeout = timeout;
}
}
}
}

//
stream.handleHead(response);

if (isConnect) {
if ((request.method == HttpMethod.CONNECT &&
response.statusCode == 200) || (
Expand Down Expand Up @@ -922,19 +896,44 @@ private void handleResponseChunk(Stream stream, ByteBuf chunk) {

private void handleResponseEnd(Stream stream, LastHttpContent trailer) {
boolean check;
HttpResponseHead response;
synchronized (this) {
if (stream.response == null) {
response = stream.response;
if (response == null) {
// 100-continue
return;
}
responses.pop();
close |= !options.isKeepAlive();
HttpRequestHead request = stream.request;
if ((request.method != HttpMethod.CONNECT && response.statusCode != 101)) {
// See https://tools.ietf.org/html/rfc7230#section-6.3
String responseConnectionHeader = response.headers.get(HttpHeaderNames.CONNECTION);
String requestConnectionHeader = request.headers != null ? request.headers.get(HttpHeaderNames.CONNECTION) : null;
// We don't need to protect against concurrent changes on forceClose as it only goes from false -> true
boolean close = !options.isKeepAlive();
if (HttpHeaderValues.CLOSE.contentEqualsIgnoreCase(responseConnectionHeader) || HttpHeaderValues.CLOSE.contentEqualsIgnoreCase(requestConnectionHeader)) {
// In all cases, if we have a close connection option then we SHOULD NOT treat the connection as persistent
close = true;
} else if (response.version == HttpVersion.HTTP_1_0 && !HttpHeaderValues.KEEP_ALIVE.contentEqualsIgnoreCase(responseConnectionHeader)) {
// In the HTTP/1.0 case both request/response need a keep-alive connection header the connection to be persistent
// currently Vertx forces the Connection header if keepalive is enabled for 1.0
close = true;
}
this.close = close;
String keepAliveHeader = response.headers.get(HttpHeaderNames.KEEP_ALIVE);
if (keepAliveHeader != null) {
int timeout = HttpUtils.parseKeepAliveHeaderTimeout(keepAliveHeader);
if (timeout != -1) {
this.keepAliveTimeout = timeout;
}
}
}
stream.responseEnded = true;
check = requests.peek() != stream;
}
VertxTracer tracer = context.tracer();
if (tracer != null) {
tracer.receiveResponse(stream.context, stream.response, stream.trace, null, HttpUtils.CLIENT_RESPONSE_TAG_EXTRACTOR);
tracer.receiveResponse(stream.context, response, stream.trace, null, HttpUtils.CLIENT_RESPONSE_TAG_EXTRACTOR);
}
if (metrics != null) {
metrics.responseEnd(stream.metric, stream.bytesRead);
Expand Down
20 changes: 15 additions & 5 deletions src/test/java/io/vertx/core/http/WebSocketTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.vertx.test.tls.Cert;
import io.vertx.test.tls.Trust;
import org.junit.Test;
import org.junit.Ignore;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
Expand Down Expand Up @@ -2478,7 +2479,7 @@ public void testServerWebSocketPingPong() {
}));
await();
}

@Test
public void testWebSocketPausePing() throws InterruptedException {
server = vertx.createHttpServer(new HttpServerOptions().setIdleTimeout(1).setPort(DEFAULT_HTTP_PORT).setHost(HttpTestBase.DEFAULT_HTTP_HOST));
Expand Down Expand Up @@ -2951,17 +2952,26 @@ private void testCloseCustomPayloadFromClient(Consumer<WebSocket> closeOp) {
}

@Test
public void testServerWebSocketHandshakeWithNonPersistentConnection() {
public void testServerWebSocketHandshakeWithNonPersistentHTTP1_0Connection() {
testServerWebSocketHandshakeWithNonPersistentConnection(HttpVersion.HTTP_1_0);
}

@Ignore
@Test
public void testServerWebSocketHandshakeWithNonPersistentHTTP1_1Connection() {
// Cannot pass until we merge connection header as it implies a "Connection: upgrade, close" header
testServerWebSocketHandshakeWithNonPersistentConnection(HttpVersion.HTTP_1_1);
}

private void testServerWebSocketHandshakeWithNonPersistentConnection(HttpVersion version) {
server = vertx.createHttpServer();
server.webSocketHandler(ws -> {
ws.frameHandler(frame -> {
ws.close();
});
});
server.listen(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, onSuccess(v1 -> {
handshake(vertx.createHttpClient(), req -> {
MultiMap headers = req.headers();
headers.add("Connection", "close");
handshake(vertx.createHttpClient(new HttpClientOptions().setProtocolVersion(version).setKeepAlive(false)), req -> {
req.send(onSuccess(resp -> {
assertEquals(101, resp.statusCode());
resp.endHandler(v -> {
Expand Down

0 comments on commit 68348ab

Please sign in to comment.