Skip to content

Commit

Permalink
h2: disable auto read for stream channels
Browse files Browse the repository at this point in the history
Motivation:
HTTP/2 stream/child channel use cases don't disable autoRead
explicitly. This means we may read and queue more data than
desirable.

Modifications:
- Disable auto read for h2 stream/child channels.
  • Loading branch information
Scottmitch committed Jan 11, 2024
1 parent 8a8e3ab commit 641d3f8
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import static io.servicetalk.utils.internal.ThrowableUtils.throwException;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -108,6 +109,8 @@ class ErrorHandlingTest {
private BlockingTesterClient blockingClient;
@Nullable
private Publisher<TestRequest> requestPublisher;
@Nullable
private GrpcExecutionStrategy clientExecutionStrategy;

private enum TestMode {
HttpClientFilterThrows,
Expand Down Expand Up @@ -158,6 +161,7 @@ boolean isSafeNoOffload() {
private void setUp(TestMode testMode, GrpcExecutionStrategy serverStrategy,
GrpcExecutionStrategy clientStrategy) throws Exception {
this.testMode = testMode;
this.clientExecutionStrategy = clientStrategy;
cannedResponse = TestResponse.newBuilder().setMessage("foo").build();
ServiceFactory serviceFactory;
StreamingHttpServiceFilterFactory serviceFilterFactory = IDENTITY_FILTER;
Expand Down Expand Up @@ -643,7 +647,15 @@ private void verifyException(final Throwable cause) {
assertNotNull(cause);
assertThat(assertThrows(GrpcStatusException.class, () -> {
throw cause;
}).status().code(), equalTo(expectedStatus()));
}).status().code(), either(equalTo(expectedStatus())).or(equalTo(expectedStatusSecondary())));
}

private GrpcStatusCode expectedStatusSecondary() {
// The server writes trailers with expected status then a RST stream frame. The client may not be done with its
// write operation and the RST may be result in CANCELLED being propagated instead.
return testMode == TestMode.ServiceSecondOperatorThrowsGrpcException &&
clientExecutionStrategy != null && clientExecutionStrategy.hasOffloads() ?
GrpcStatusCode.CANCELLED : expectedStatus();
}

private GrpcStatusCode expectedStatus() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ public final class DefaultNettyConnection<Read, Write> extends NettyChannelListe
private final ChannelConfig parentChannelConfig;
private volatile DataObserver dataObserver;
private final boolean isClient;
private final boolean readOnChannelComplete;
private final Predicate<Object> shouldWait;
private final UnaryOperator<Throwable> enrichProtocolError;
private final TerminalSignalConsumer cleanupStateConsumer = new TerminalSignalConsumer() {
Expand Down Expand Up @@ -170,7 +171,7 @@ private void cleanupOnWriteTerminated() {
private DefaultNettyConnection(
Channel channel, @Nullable ConnectionContext parent, ExecutionContext<?> executionContext,
CloseHandler closeHandler, FlushStrategy flushStrategy,
long idleTimeoutMs, Protocol protocol,
long idleTimeoutMs, Protocol protocol, boolean readOnChannelComplete,
@Nullable SslConfig sslConfig, @Nullable SSLSession sslSession,
@Nullable ChannelConfig parentChannelConfig, DataObserver dataObserver, boolean isClient,
Predicate<Object> shouldWait, UnaryOperator<Throwable> enrichProtocolError) {
Expand Down Expand Up @@ -200,6 +201,7 @@ private DefaultNettyConnection(
this.protocol = requireNonNull(protocol);
this.dataObserver = dataObserver;
this.isClient = isClient;
this.readOnChannelComplete = readOnChannelComplete;
this.shouldWait = requireNonNull(shouldWait);
this.enrichProtocolError = requireNonNull(enrichProtocolError);
}
Expand Down Expand Up @@ -343,8 +345,14 @@ private static <Read, Write> DefaultNettyConnection<Read, Write> initChildChanne
Predicate<Object> shouldWait, UnaryOperator<Throwable> enrichProtocolError) {
assert parent == null || parent.executionContext() == executionContext;
assert channel.eventLoop() == toEventLoopAwareNettyIoExecutor(executionContext.ioExecutor()).eventLoopGroup();

// For h2 the parent channel must use auto read because control frames and flow controlled frames are on the
// same socket, and we must read in timely manner to avoid deadlock. Child channel should not use auto
// read as read is explicitly called by NettyChannelPublisher according to the Subscription.request(n) demand.
channel.config().setAutoRead(false);

DefaultNettyConnection<Read, Write> connection = new DefaultNettyConnection<>(channel, parent, executionContext,
closeHandler, flushStrategy, idleTimeoutMs, protocol, sslConfig, sslSession, parentChannelConfig,
closeHandler, flushStrategy, idleTimeoutMs, protocol, true, sslConfig, sslSession, parentChannelConfig,
streamObserver.streamEstablished(), isClient, shouldWait, enrichProtocolError);
channel.pipeline().addLast(new NettyToStChannelHandler<>(connection, null,
null, false, NoopConnectionObserver.INSTANCE));
Expand Down Expand Up @@ -505,7 +513,7 @@ protected void handleSubscribe(
@Nullable
final SSLSession sslSession = extractSslSession(sslConfig, pipeline);
DefaultNettyConnection<Read, Write> connection = new DefaultNettyConnection<>(channel, null,
executionContext, closeHandler, flushStrategy, idleTimeoutMs, protocol, sslConfig,
executionContext, closeHandler, flushStrategy, idleTimeoutMs, protocol, false, sslConfig,
sslSession, null, NoopDataObserver.INSTANCE, isClient, shouldWait, identity());
channel.attr(CHANNEL_CLOSEABLE_KEY).set(connection);
delayedCancellable = new DelayedCancellable();
Expand Down Expand Up @@ -921,6 +929,12 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
LOGGER.debug("{} Received a user event: {}", ctx.channel(), evt);
if (evt == CloseHandler.InboundDataEndEvent.INSTANCE) {
connection.nettyChannelPublisher.channelOnComplete();
if (connection.readOnChannelComplete) {
// h2 child channel may require read() to notify of channel closure. This constraint is lifted
// with netty enhancement [1] but keep this for some time to give folks time to update Netty.
// [1] https://github.com/netty/netty/pull/13779
ctx.read();
}
} else if (evt == CloseHandler.OutboundDataEndEvent.INSTANCE) {
connection.channelOutboundListener.channelOutboundClosed();
} else if (evt == AbortWritesEvent.INSTANCE) {
Expand Down

0 comments on commit 641d3f8

Please sign in to comment.