Skip to content

Commit

Permalink
netty: GOAWAY should trigger transportShutdown
Browse files Browse the repository at this point in the history
Long-lived streams or lengthy RPCs can keep the transport open for
minutes after a GOAWAY is received. Previously, during this time any new
RPCs would fail with a message like:

> Cannot create stream 5 since this endpoint has received a GOAWAY frame
> with last stream id 3

All usages of goAwayStatus were replaced with lifecycleManager. Note,
however, that previously goAwayStatus() would never return null because it
would generate a Status if the current field was null.
getShutdownStatus() does not have this feature, so some code was
rearranged to guarantee the Status is non-null before retrieving it.

The listener handling was simplified by 1) avoiding the need for
thread-safety and 2) moving state keeping into a small class for easy
comprehensibility and simplified usage in tests.

NettyClientTransport.shutdown() no longer calls transportShutdown()
because that call was misleading (the shutdown message can be delayed for
quite some time) and because it was the only usage of lifecycleManager
not on the event loop.

Fixes #1359
  • Loading branch information
ejona86 committed Mar 9, 2016
1 parent bd66bfe commit 465d44a
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 129 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright 2016, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package io.grpc.netty;

import io.grpc.Status;
import io.grpc.internal.ManagedClientTransport;

/**
 * Tracks the lifecycle of a client transport (ready, shutdown, terminated) and forwards each
 * transition to the wrapped {@link ManagedClientTransport.Listener} at most once.
 *
 * <p>This class performs no synchronization of its own.
 */
final class ClientTransportLifecycleManager {
  private final ManagedClientTransport.Listener listener;
  private boolean transportReady;
  private boolean transportShutdown;
  private boolean transportTerminated;
  /** null iff !transportShutdown. */
  private Status shutdownStatus;
  /** null iff !transportShutdown. */
  private Throwable shutdownThrowable;

  /**
   * Creates a manager that reports lifecycle transitions to {@code listener}.
   *
   * @param listener receiver of the ready/shutdown/terminated callbacks
   */
  public ClientTransportLifecycleManager(ManagedClientTransport.Listener listener) {
    this.listener = listener;
  }

  /**
   * Reports the transport as ready. Has no effect if readiness was already reported or if the
   * transport has already been shut down.
   */
  public void notifyReady() {
    if (!transportReady && !transportShutdown) {
      transportReady = true;
      listener.transportReady();
    }
  }

  /**
   * Reports the transport as shut down. Only the first call has any effect; its status is the one
   * retained and later returned by {@link #getShutdownStatus()}.
   *
   * @param s the reason for the shutdown
   */
  public void notifyShutdown(Status s) {
    if (!transportShutdown) {
      transportShutdown = true;
      shutdownStatus = s;
      // Build the exception once so every later caller observes the same Throwable instance.
      shutdownThrowable = s.asException();
      listener.transportShutdown(s);
    }
  }

  /**
   * Reports the transport as terminated. Only the first call has any effect. If shutdown has not
   * been reported yet, it is reported first using {@code s}.
   *
   * @param s the shutdown reason to use when shutdown was not previously reported
   */
  public void notifyTerminated(Status s) {
    if (!transportTerminated) {
      transportTerminated = true;
      // Shutdown is always reported before termination; a no-op when already shut down.
      notifyShutdown(s);
      listener.transportTerminated();
    }
  }

  /** Returns the status recorded at shutdown, or {@code null} if not yet shut down. */
  public Status getShutdownStatus() {
    return shutdownStatus;
  }

  /** Returns the exception derived from the shutdown status, or {@code null} if not shut down. */
  public Throwable getShutdownThrowable() {
    return shutdownThrowable;
  }
}
19 changes: 17 additions & 2 deletions netty/src/main/java/io/grpc/netty/GracefulCloseCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,20 @@

package io.grpc.netty;

/** A command to trigger close. It is buffered differently than normal close. */
class GracefulCloseCommand {}
import io.grpc.Status;

/**
 * Command asking the handler to close gracefully. Unlike a normal close it is buffered
 * differently, and it carries the {@link Status} describing why the close was requested.
 */
class GracefulCloseCommand {
  /** Reason for the closure, exactly as supplied at construction time. */
  private final Status status;

  /**
   * Creates a graceful-close command.
   *
   * @param status the reason for the closure
   */
  public GracefulCloseCommand(Status status) {
    this.status = status;
  }

  /** Returns the reason for the closure that was passed to the constructor. */
  public Status getStatus() {
    return status;
  }
}
124 changes: 52 additions & 72 deletions netty/src/main/java/io/grpc/netty/NettyClientHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import io.grpc.internal.ClientTransport.PingCallback;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.Http2Ping;
import io.grpc.internal.ManagedClientTransport;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
Expand Down Expand Up @@ -86,8 +85,6 @@
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.annotation.Nullable;

/**
* Client-side Netty handler for GRPC processing. All event handlers are executed entirely within
* the context of the Netty Channel thread.
Expand All @@ -108,14 +105,13 @@ class NettyClientHandler extends AbstractNettyHandler {
Status.UNAVAILABLE.withDescription("Stream IDs have been exhausted");

private final Http2Connection.PropertyKey streamKey;
private final ClientTransportLifecycleManager lifecycleManager;
private final Ticker ticker;
private final Random random = new Random();
private WriteQueue clientWriteQueue;
private Http2Ping ping;
private Status goAwayStatus;
private Throwable goAwayStatusThrowable;

static NettyClientHandler newHandler(ManagedClientTransport.Listener listener,
static NettyClientHandler newHandler(ClientTransportLifecycleManager lifecycleManager,
int flowControlWindow, int maxHeaderListSize,
Ticker ticker) {
Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
Expand All @@ -124,19 +120,20 @@ static NettyClientHandler newHandler(ManagedClientTransport.Listener listener,
Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder);
Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter();
Http2Connection connection = new DefaultHttp2Connection(false);
return newHandler(connection, frameReader, frameWriter, listener, flowControlWindow, ticker);
return newHandler(
connection, frameReader, frameWriter, lifecycleManager, flowControlWindow, ticker);
}

@VisibleForTesting
static NettyClientHandler newHandler(Http2Connection connection,
Http2FrameReader frameReader,
Http2FrameWriter frameWriter,
final ManagedClientTransport.Listener listener,
ClientTransportLifecycleManager lifecycleManager,
int flowControlWindow,
Ticker ticker) {
Preconditions.checkNotNull(connection, "connection");
Preconditions.checkNotNull(frameReader, "frameReader");
Preconditions.checkNotNull(listener, "listener");
Preconditions.checkNotNull(lifecycleManager, "lifecycleManager");
Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive");
Preconditions.checkNotNull(ticker, "ticker");

Expand All @@ -145,18 +142,7 @@ static NettyClientHandler newHandler(Http2Connection connection,
frameWriter = new Http2OutboundFrameLogger(frameWriter, frameLogger);

StreamBufferingEncoder encoder = new StreamBufferingEncoder(
new DefaultHttp2ConnectionEncoder(connection, frameWriter)) {
private boolean firstSettings = true;

@Override
public ChannelFuture writeSettingsAck(ChannelHandlerContext ctx, ChannelPromise promise) {
if (firstSettings) {
firstSettings = false;
listener.transportReady();
}
return super.writeSettingsAck(ctx, promise);
}
};
new DefaultHttp2ConnectionEncoder(connection, frameWriter));

// Create the local flow controller configured to auto-refill the connection window.
connection.local().flowController(new DefaultHttp2LocalFlowController(connection,
Expand All @@ -170,13 +156,15 @@ public ChannelFuture writeSettingsAck(ChannelHandlerContext ctx, ChannelPromise
settings.initialWindowSize(flowControlWindow);
settings.maxConcurrentStreams(0);

return new NettyClientHandler(decoder, encoder, settings, ticker);
return new NettyClientHandler(decoder, encoder, settings, lifecycleManager, ticker);
}

private NettyClientHandler(Http2ConnectionDecoder decoder,
StreamBufferingEncoder encoder, Http2Settings settings,
ClientTransportLifecycleManager lifecycleManager,
Ticker ticker) {
super(decoder, encoder, settings);
this.lifecycleManager = lifecycleManager;
this.ticker = ticker;

// Set the frame listener on the decoder.
Expand All @@ -187,22 +175,11 @@ private NettyClientHandler(Http2ConnectionDecoder decoder,
connection.addListener(new Http2ConnectionAdapter() {
@Override
public void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) {
goAwayStatus(statusFromGoAway(errorCode, ByteBufUtil.getBytes(debugData)));
goingAway();
goingAway(statusFromGoAway(errorCode, ByteBufUtil.getBytes(debugData)));
}
});
}

/**
* Return the reason the handler failed. Only intended to be used by {@link NettyClientTransport}.
* Most other classes should retrieve the transport's shutdown status, since it may be more
* complete.
*/
@Nullable
public Status errorStatus() {
return goAwayStatus;
}

/**
* Handler for commands sent from the stream.
*/
Expand All @@ -220,10 +197,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
} else if (msg instanceof SendPingCommand) {
sendPingFrame(ctx, (SendPingCommand) msg, promise);
} else if (msg instanceof GracefulCloseCommand) {
// Explicitly flush to create any buffered streams before sending GOAWAY.
// TODO(ejona): determine if the need to flush is a bug in Netty
flush(ctx);
close(ctx, promise);
gracefulClose(ctx, (GracefulCloseCommand) msg, promise);
} else if (msg == NOOP_MESSAGE) {
ctx.write(Unpooled.EMPTY_BUFFER, promise);
} else {
Expand Down Expand Up @@ -278,7 +252,8 @@ private void onRstStreamRead(int streamId, long errorCode) throws Http2Exception
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
logger.fine("Network channel being closed by the application.");
goAwayStatus(Status.UNAVAILABLE.withDescription("Channel requested transport shutdown"));
lifecycleManager.notifyShutdown(
Status.UNAVAILABLE.withDescription("Transport closed for unknown reason"));
super.close(ctx, promise);
}

Expand All @@ -289,15 +264,17 @@ public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exce
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
try {
logger.fine("Network channel is closed");
goAwayStatus(goAwayStatus().augmentDescription("Network channel closed"));
cancelPing();
lifecycleManager.notifyShutdown(
Status.UNAVAILABLE.withDescription("Network closed for unknown reason"));
cancelPing(lifecycleManager.getShutdownThrowable());
// Report status to the application layer for any open streams
connection().forEachActiveStream(new Http2StreamVisitor() {
@Override
public boolean visit(Http2Stream stream) throws Http2Exception {
NettyClientStream clientStream = clientStream(stream);
if (clientStream != null) {
clientStream.transportReportStatus(goAwayStatus, false, new Metadata());
clientStream.transportReportStatus(
lifecycleManager.getShutdownStatus(), false, new Metadata());
}
return true;
}
Expand All @@ -312,7 +289,8 @@ public boolean visit(Http2Stream stream) throws Http2Exception {
protected void onConnectionError(ChannelHandlerContext ctx, Throwable cause,
Http2Exception http2Ex) {
logger.log(Level.FINE, "Caught a connection error", cause);
goAwayStatus(Utils.statusFromThrowable(cause));
lifecycleManager.notifyShutdown(Utils.statusFromThrowable(cause));
// Parent class will shut down the Channel
super.onConnectionError(ctx, cause, http2Ex);
}

Expand Down Expand Up @@ -342,9 +320,9 @@ protected boolean isGracefulShutdownComplete() {
*/
private void createStream(CreateStreamCommand command, final ChannelPromise promise)
throws Exception {
if (goAwayStatus != null) {
if (lifecycleManager.getShutdownThrowable() != null) {
// The connection is going away, just terminate the stream now.
promise.setFailure(goAwayStatusThrowable);
promise.setFailure(lifecycleManager.getShutdownThrowable());
return;
}

Expand Down Expand Up @@ -398,8 +376,8 @@ public void operationComplete(ChannelFuture future) throws Exception {
if (cause instanceof StreamBufferingEncoder.Http2GoAwayException) {
StreamBufferingEncoder.Http2GoAwayException e =
(StreamBufferingEncoder.Http2GoAwayException) cause;
goAwayStatus(statusFromGoAway(e.errorCode(), e.debugData()));
promise.setFailure(goAwayStatusThrowable);
lifecycleManager.notifyShutdown(statusFromGoAway(e.errorCode(), e.debugData()));
promise.setFailure(lifecycleManager.getShutdownThrowable());
} else {
promise.setFailure(cause);
}
Expand Down Expand Up @@ -434,9 +412,9 @@ private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd,
*/
private void sendPingFrame(ChannelHandlerContext ctx, SendPingCommand msg,
ChannelPromise promise) {
// Don't check goAwayStatus since we want to allow pings after shutdown but before termination.
// After termination, messages will no longer arrive because the pipeline clears all handlers on
// channel close.
// Don't check lifecycleManager.getShutdownStatus() since we want to allow pings after shutdown
// but before termination. After termination, messages will no longer arrive because the
// pipeline clears all handlers on channel close.

PingCallback callback = msg.callback();
Executor executor = msg.executor();
Expand Down Expand Up @@ -477,12 +455,22 @@ public void operationComplete(ChannelFuture future) throws Exception {
});
}

/**
 * Handles a {@link GracefulCloseCommand}: reports shutdown to the lifecycle manager using the
 * status carried by the command, flushes, and then closes the channel.
 *
 * @param ctx the channel handler context to flush and close
 * @param msg the command carrying the shutdown status
 * @param promise the promise passed on to {@code close}
 */
private void gracefulClose(ChannelHandlerContext ctx, GracefulCloseCommand msg,
ChannelPromise promise) throws Exception {
// Notify first so the command's status is the one recorded; notifyShutdown ignores later
// calls, such as the generic one made from close().
lifecycleManager.notifyShutdown(msg.getStatus());
// Explicitly flush to create any buffered streams before sending GOAWAY.
// TODO(ejona): determine if the need to flush is a bug in Netty
flush(ctx);
close(ctx, promise);
}

/**
* Handler for a GOAWAY being either sent or received. Fails any streams created after the
* last known stream.
*/
private void goingAway() {
final Status goAwayStatus = goAwayStatus();
private void goingAway(Status status) {
lifecycleManager.notifyShutdown(status);
final Status goAwayStatus = lifecycleManager.getShutdownStatus();
final int lastKnownStream = connection().local().lastStreamKnownByPeer();
try {
connection().forEachActiveStream(new Http2StreamVisitor() {
Expand All @@ -503,27 +491,9 @@ public boolean visit(Http2Stream stream) throws Http2Exception {
}
}

/**
* Returns the appropriate status used to represent the cause for GOAWAY.
*/
private Status goAwayStatus() {
if (goAwayStatus != null) {
return goAwayStatus;
}
return Status.UNAVAILABLE.withDescription("Connection going away, but for unknown reason");
}

private void goAwayStatus(Status status) {
// Don't overwrite if we already have a goAwayStatus.
if (goAwayStatus == null) {
goAwayStatus = status;
goAwayStatusThrowable = status.asException();
}
}

private void cancelPing() {
private void cancelPing(Throwable t) {
if (ping != null) {
ping.failed(goAwayStatus().asException());
ping.failed(t);
ping = null;
}
}
Expand Down Expand Up @@ -565,6 +535,16 @@ private Http2Stream requireHttp2Stream(int streamId) {
}

private class FrameListener extends Http2FrameAdapter {
private boolean firstSettings = true;

/**
 * Reports the transport as ready when the first SETTINGS frame is read. Subsequent SETTINGS
 * frames are ignored by this method.
 */
@Override
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
  if (!firstSettings) {
    return;
  }
  firstSettings = false;
  lifecycleManager.notifyReady();
}

@Override
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream) throws Http2Exception {
Expand Down
1 change: 1 addition & 0 deletions netty/src/main/java/io/grpc/netty/NettyClientStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public void operationComplete(ChannelFuture future) throws Exception {

/**
* Intended to be overridden by NettyClientTransport, which has more information about failures.
* May only be called from event loop.
*/
protected abstract Status statusFromFailedFuture(ChannelFuture f);

Expand Down

0 comments on commit 465d44a

Please sign in to comment.