Skip to content

Commit

Permalink
Internal refactor of the WebSocket connection implementation
Browse files Browse the repository at this point in the history
Move WebSocket related code from `Http1xConnectionBase` to its own `ConnectionBase` subclass in order to simplify `Http1xConnectionBase` and ease its maintenance. The WebSocket connection has now its own Netty handler instead of relying on the HTTP/1.x handler. The HTTP/1.x handler now does not need to take in account WebSocket frames and WebSocket state.

Simplify server WebSocket code by moving the code related to WebSocket accept/reject API to a WebSocket implementation that proxies an accepted `WebSocket` using the same approach than `ClientWebSocket`
  • Loading branch information
vietj committed Apr 29, 2024
1 parent d946575 commit 5991c28
Show file tree
Hide file tree
Showing 20 changed files with 964 additions and 398 deletions.
5 changes: 4 additions & 1 deletion src/main/java/io/vertx/core/http/ServerWebSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,10 @@ default ServerWebSocket resume() {
*
* @throws IllegalStateException when the WebSocket handshake is already set
*/
void reject();
default void reject() {
// SC_BAD_GATEWAY
reject(502);
}

/**
* Like {@link #reject()} but with a {@code status}.
Expand Down
52 changes: 28 additions & 24 deletions src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
*
* @author <a href="http://tfox.org">Tim Fox</a>
*/
public class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> implements HttpClientConnectionInternal {
public class Http1xClientConnection extends Http1xConnection implements HttpClientConnectionInternal {

private static final Logger log = LoggerFactory.getLogger(Http1xClientConnection.class);

Expand Down Expand Up @@ -757,14 +757,11 @@ public void handleMessage(Object msg) {
handleChunk((ByteBuf) msg);
} else if (msg instanceof WebSocketFrame) {
WebSocketFrame frame = (WebSocketFrame) msg;
if (webSocket == null) {
if (pendingFrames == null) {
pendingFrames = new ArrayDeque<>();
}
pendingFrames.add(frame);
} else {
handleWsFrame(frame);
if (pendingFrames == null) {
pendingFrames = new ArrayDeque<>();
}
// Todo: use the new feature to park frames within the handler later
pendingFrames.add(frame);
} else {
invalidMessageHandler.handle(msg);
}
Expand Down Expand Up @@ -1005,18 +1002,30 @@ synchronized void toWebSocket(
vertx.cancelTimer(timer);
}
if (future.isSuccess()) {
WebSocketImpl ws = new WebSocketImpl(
context,
Http1xClientConnection.this,
version != V00,
options.getClosingTimeout(),
options.getMaxFrameSize(),
options.getMaxMessageSize(),
registerWriteHandlers);

VertxHandler<WebSocketConnection> handler = VertxHandler.create(ctx -> {
WebSocketConnection conn = new WebSocketConnection(context, ctx, client.metrics());
WebSocketImpl webSocket = new WebSocketImpl(
context,
conn,
version != V00,
options.getClosingTimeout(),
options.getMaxFrameSize(),
options.getMaxMessageSize(),
registerWriteHandlers);
conn.webSocket(webSocket);
conn.metric(Http1xClientConnection.this.metric());
return conn;
});

ChannelPipeline pipeline = chctx.pipeline();
pipeline.replace(VertxHandler.class, "handler", handler);

WebSocketImpl ws = (WebSocketImpl) handler.getConnection().webSocket();
ws.headers(new HeadersAdaptor(future.getNow()));
ws.subProtocol(handshaker.actualSubprotocol());
ws.registerHandler(vertx.eventBus());
webSocket = ws;

HttpClientMetrics metrics = client.metrics();
if (metrics != null) {
ws.setMetric(metrics.connected(ws));
Expand All @@ -1027,7 +1036,7 @@ synchronized void toWebSocket(
pendingFrames = null;
WebSocketFrame frame;
while ((frame = toResubmit.poll()) != null) {
handleWsFrame(frame);
handler.getConnection().handleWsFrame(frame);
}
}
promise.complete(ws);
Expand Down Expand Up @@ -1170,19 +1179,14 @@ protected void handleClosed() {
evictionHandler.handle(null);
}
}
WebSocketImpl ws;
VertxTracer tracer = context.tracer();
List<Stream> allocatedStreams;
List<Stream> sentStreams;
synchronized (this) {
ws = webSocket;
sentStreams = new ArrayList<>(responses);
allocatedStreams = new ArrayList<>(requests);
allocatedStreams.removeAll(responses);
}
if (ws != null) {
ws.handleConnectionClosed();
}
for (Stream stream : allocatedStreams) {
stream.context.execute(HttpUtils.CONNECTION_CLOSED_EXCEPTION, stream::handleClosed);
}
Expand All @@ -1200,7 +1204,7 @@ protected void handleClosed() {

protected void handleIdle(IdleStateEvent event) {
synchronized (this) {
if (webSocket == null && responses.isEmpty() && requests.isEmpty()) {
if (responses.isEmpty() && requests.isEmpty()) {
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,6 @@
import io.netty.handler.codec.http.FullHttpMessage;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.stream.ChunkedFile;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.Future;
Expand All @@ -33,89 +26,29 @@
import io.vertx.core.http.GoAway;
import io.vertx.core.http.Http2Settings;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.http.WebSocketFrameType;
import io.vertx.core.http.impl.ws.WebSocketFrameImpl;
import io.vertx.core.http.impl.ws.WebSocketFrameInternal;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.net.impl.ConnectionBase;
import io.vertx.core.net.impl.ShutdownEvent;

import static io.vertx.core.net.impl.VertxHandler.safeBuffer;

/**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
abstract class Http1xConnectionBase<S extends WebSocketImplBase<S>> extends ConnectionBase implements io.vertx.core.http.HttpConnection {
abstract class Http1xConnection extends ConnectionBase implements io.vertx.core.http.HttpConnection {

protected S webSocket;
protected Handler<Void> shutdownHandler;

Http1xConnectionBase(ContextInternal context, ChannelHandlerContext chctx) {
Http1xConnection(ContextInternal context, ChannelHandlerContext chctx) {
super(context, chctx);
}

void handleWsFrame(WebSocketFrame msg) {
WebSocketFrameInternal frame = decodeFrame(msg);
WebSocketImplBase<?> w;
synchronized (this) {
w = webSocket;
}
if (w != null) {
w.context.execute(frame, w::handleFrame);
}
}

private WebSocketFrameInternal decodeFrame(io.netty.handler.codec.http.websocketx.WebSocketFrame msg) {
ByteBuf payload = safeBuffer(msg.content());
boolean isFinal = msg.isFinalFragment();
WebSocketFrameType frameType;
if (msg instanceof BinaryWebSocketFrame) {
frameType = WebSocketFrameType.BINARY;
} else if (msg instanceof CloseWebSocketFrame) {
frameType = WebSocketFrameType.CLOSE;
} else if (msg instanceof PingWebSocketFrame) {
frameType = WebSocketFrameType.PING;
} else if (msg instanceof PongWebSocketFrame) {
frameType = WebSocketFrameType.PONG;
} else if (msg instanceof TextWebSocketFrame) {
frameType = WebSocketFrameType.TEXT;
} else if (msg instanceof ContinuationWebSocketFrame) {
frameType = WebSocketFrameType.CONTINUATION;
} else {
throw new IllegalStateException("Unsupported WebSocket msg " + msg);
}
return new WebSocketFrameImpl(frameType, payload, isFinal);
}

@Override
public Future<Void> close() {
S sock;
synchronized (this) {
sock = webSocket;
}
if (sock == null) {
return super.close();
} else {
sock.close();
return closeFuture();
}
}

@Override
protected void handleWriteQueueDrained() {
if (webSocket != null) {
webSocket.context.execute(webSocket::handleWriteQueueDrained);
}
}

@Override
public Http1xConnectionBase closeHandler(Handler<Void> handler) {
return (Http1xConnectionBase) super.closeHandler(handler);
public Http1xConnection closeHandler(Handler<Void> handler) {
return (Http1xConnection) super.closeHandler(handler);
}

@Override
public Http1xConnectionBase exceptionHandler(Handler<Throwable> handler) {
return (Http1xConnectionBase) super.exceptionHandler(handler);
public Http1xConnection exceptionHandler(Handler<Throwable> handler) {
return (Http1xConnection) super.exceptionHandler(handler);
}

@Override
Expand Down Expand Up @@ -202,8 +135,6 @@ protected long sizeof(Object obj) {
return ((LastHttpContent) obj).content().readableBytes();
} else if (obj instanceof HttpContent) {
return ((HttpContent) obj).content().readableBytes();
} else if (obj instanceof WebSocketFrame) {
return ((WebSocketFrame) obj).content().readableBytes();
} else if (obj instanceof FileRegion) {
return ((FileRegion) obj).count();
} else if (obj instanceof ChunkedFile) {
Expand Down
32 changes: 5 additions & 27 deletions src/main/java/io/vertx/core/http/impl/Http1xServerConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.WebSocketDecoderConfig;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketHandshakeException;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
Expand Down Expand Up @@ -74,7 +73,7 @@
*
* @author <a href="http://tfox.org">Tim Fox</a>
*/
public class Http1xServerConnection extends Http1xConnectionBase<ServerWebSocketImpl> implements HttpServerConnection {
public class Http1xServerConnection extends Http1xConnection implements HttpServerConnection {

private final String serverOrigin;
private final Supplier<ContextInternal> streamContextSupplier;
Expand Down Expand Up @@ -166,7 +165,7 @@ public HttpServerMetrics metrics() {

public void handleMessage(Object msg) {
assert msg != null;
if (requestInProgress == null && wantClose && webSocket == null) {
if (requestInProgress == null && wantClose) {
// Discard message
return;
}
Expand Down Expand Up @@ -204,8 +203,6 @@ private void handleOther(Object msg) {
// concrete type check first
if (msg instanceof DefaultHttpContent || msg instanceof HttpContent) {
onContent(msg);
} else if (msg instanceof WebSocketFrame) {
handleWsFrame((WebSocketFrame) msg);
}
}

Expand Down Expand Up @@ -273,7 +270,7 @@ void responseComplete() {
responseInProgress = null;
DecoderResult result = request.decoderResult();
if (result.isSuccess()) {
if (requestInProgress == request || webSocket != null) {
if (requestInProgress == request) {
// Deferred
} else {
if (wantClose && shutdownTimerID == -1L) {
Expand Down Expand Up @@ -357,8 +354,6 @@ void createWebSocket(Http1xServerRequest request, PromiseInternal<ServerWebSocke
context.execute(() -> {
if (request != responseInProgress) {
promise.fail("Invalid request");
} else if (webSocket != null) {
promise.complete(webSocket);
} else if (!(request.nettyRequest() instanceof FullHttpRequest)) {
promise.fail(new IllegalStateException());
} else {
Expand All @@ -369,25 +364,12 @@ void createWebSocket(Http1xServerRequest request, PromiseInternal<ServerWebSocke
promise.fail(e);
return;
}
webSocket = new ServerWebSocketImpl(
promise.context(),
this,
handshaker.version() != WebSocketVersion.V00,
options.getWebSocketClosingTimeout(),
request,
handshaker,
options.getMaxWebSocketFrameSize(),
options.getMaxWebSocketMessageSize(),
options.isRegisterWebSocketWriteHandlers());
if (METRICS_ENABLED && metrics != null) {
webSocket.setMetric(metrics.connected(metric(), request.metric(), webSocket));
}
promise.complete(webSocket);
promise.complete(new ServerWebSocketHandshaker(request, handshaker, options));
}
});
}

private WebSocketServerHandshaker createHandshaker(Http1xServerRequest request) throws WebSocketHandshakeException {
public WebSocketServerHandshaker createHandshaker(Http1xServerRequest request) throws WebSocketHandshakeException {
// As a fun part, Firefox 6.0.2 supports Websockets protocol '7'. But,
// it doesn't send a normal 'Connection: Upgrade' header. Instead it
// sends: 'Connection: keep-alive, Upgrade'. Brilliant.
Expand Down Expand Up @@ -528,7 +510,6 @@ void write103EarlyHints(HttpHeaders headers, PromiseInternal<Void> promise) {
protected void handleClosed() {
Http1xServerRequest responseInProgress = this.responseInProgress;
Http1xServerRequest requestInProgress = this.requestInProgress;
ServerWebSocketImpl ws = this.webSocket;
if (requestInProgress != null) {
requestInProgress.context.execute(v -> {
requestInProgress.handleException(HttpUtils.CONNECTION_CLOSED_EXCEPTION);
Expand All @@ -539,9 +520,6 @@ protected void handleClosed() {
responseInProgress.handleException(HttpUtils.CONNECTION_CLOSED_EXCEPTION);
});
}
if (ws != null) {
ws.context.execute(v -> ws.handleConnectionClosed());
}
super.handleClosed();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ public void handle(HttpServerRequest req) {
// handle((Http1xServerRequest) req, wsHandler);
((Http1xServerRequest)req).webSocket().onComplete(ar -> {
if (ar.succeeded()) {
ServerWebSocketImpl ws = (ServerWebSocketImpl) ar.result();
ServerWebSocketHandshaker ws = (ServerWebSocketHandshaker) ar.result();
wsHandler.handle(ws);
ws.tryHandshake(101);
ws.tryAccept();
} else {
// ????
}
Expand Down
7 changes: 5 additions & 2 deletions src/main/java/io/vertx/core/http/impl/HttpServerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -246,10 +246,13 @@ public Future<Void> shutdown(long timeout, TimeUnit unit) {
closeTimeoutUnit = unit;
closeSequence = null;
}
ContextInternal ctx = vertx.getOrCreateContext();
if (seq == null) {
return vertx.getOrCreateContext().succeededFuture();
return ctx.succeededFuture();
} else {
return seq.close();
Promise<Void> p = ctx.promise();
seq.close().onComplete(p);
return p.future();
}
}

Expand Down

0 comments on commit 5991c28

Please sign in to comment.