From 83a3b25e807e9a330d7174356e25278afdd5bfbe Mon Sep 17 00:00:00 2001 From: Jihun Cho Date: Fri, 1 May 2020 15:39:22 -0700 Subject: [PATCH] netty: allow to use bandwidth delay product (#6979) --- .../integration/AbstractInteropTest.java | 2 +- .../integration/AutoWindowSizingOnTest.java | 11 +- .../integration/NettyFlowControlTest.java | 121 +++++++++++----- .../io/grpc/netty/AbstractNettyHandler.java | 58 ++++++-- .../grpc/netty/InternalHandlerSettings.java | 44 ------ .../java/io/grpc/netty/ListeningEncoder.java | 136 ++++++++++++++++++ .../io/grpc/netty/NettyChannelBuilder.java | 27 +++- .../io/grpc/netty/NettyClientHandler.java | 20 ++- .../io/grpc/netty/NettyClientTransport.java | 9 +- .../io/grpc/netty/NettyHandlerSettings.java | 72 ---------- .../main/java/io/grpc/netty/NettyServer.java | 6 +- .../io/grpc/netty/NettyServerBuilder.java | 24 +++- .../io/grpc/netty/NettyServerHandler.java | 16 ++- .../io/grpc/netty/NettyServerTransport.java | 5 +- .../io/grpc/netty/NettyClientHandlerTest.java | 1 + .../grpc/netty/NettyClientTransportTest.java | 7 +- .../io/grpc/netty/NettyHandlerTestBase.java | 1 + .../io/grpc/netty/NettyServerHandlerTest.java | 1 + .../java/io/grpc/netty/NettyServerTest.java | 4 + 19 files changed, 365 insertions(+), 200 deletions(-) delete mode 100644 netty/src/main/java/io/grpc/netty/InternalHandlerSettings.java create mode 100644 netty/src/main/java/io/grpc/netty/ListeningEncoder.java delete mode 100644 netty/src/main/java/io/grpc/netty/NettyHandlerSettings.java diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java index dc8819eaf92..60a72b5f8d3 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java @@ -1907,7 +1907,7 @@ protected int operationTimeoutMillis() { * Some tests run on memory constrained environments. Rather than OOM, just give up. 64 is * chosen as a maximum amount of memory a large test would need. */ - private static void assumeEnoughMemory() { + protected static void assumeEnoughMemory() { Runtime r = Runtime.getRuntime(); long usedMem = r.totalMemory() - r.freeMemory(); long actuallyFreeMemory = r.maxMemory() - usedMem; diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/AutoWindowSizingOnTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/AutoWindowSizingOnTest.java index 462ad480a4a..dc19d286881 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/AutoWindowSizingOnTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/AutoWindowSizingOnTest.java @@ -18,23 +18,15 @@ import io.grpc.ManagedChannel; import io.grpc.internal.AbstractServerImplBuilder; -import io.grpc.netty.InternalHandlerSettings; import io.grpc.netty.NegotiationType; import io.grpc.netty.NettyChannelBuilder; import io.grpc.netty.NettyServerBuilder; -import org.junit.BeforeClass; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class AutoWindowSizingOnTest extends AbstractInteropTest { - @BeforeClass - public static void turnOnAutoWindow() { - InternalHandlerSettings.enable(true); - InternalHandlerSettings.autoWindowOn(true); - } - @Override protected AbstractServerImplBuilder getServerBuilder() { return NettyServerBuilder.forPort(0) @@ -45,7 +37,8 @@ protected AbstractServerImplBuilder getServerBuilder() { protected ManagedChannel createChannel() { NettyChannelBuilder builder = NettyChannelBuilder.forAddress(getListenAddress()) .negotiationType(NegotiationType.PLAINTEXT) - .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE); + .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE) + .initialFlowControlWindow(NettyChannelBuilder.DEFAULT_FLOW_CONTROL_WINDOW); // Disable the default census stats interceptor, use testing interceptor instead. io.grpc.internal.TestingAccessor.setStatsEnabled(builder, false); return builder.intercept(createCensusStatsClientInterceptor()).build(); diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/NettyFlowControlTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/NettyFlowControlTest.java index e976696ff59..4d7c8bdbca7 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/NettyFlowControlTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/NettyFlowControlTest.java @@ -24,7 +24,11 @@ import io.grpc.ServerBuilder; import io.grpc.ServerInterceptor; import io.grpc.ServerInterceptors; -import io.grpc.netty.InternalHandlerSettings; +import io.grpc.netty.GrpcHttp2ConnectionHandler; +import io.grpc.netty.InternalNettyChannelBuilder; +import io.grpc.netty.InternalNettyChannelBuilder.ProtocolNegotiatorFactory; +import io.grpc.netty.InternalProtocolNegotiator.ProtocolNegotiator; +import io.grpc.netty.InternalProtocolNegotiators; import io.grpc.netty.NegotiationType; import io.grpc.netty.NettyChannelBuilder; import io.grpc.netty.NettyServerBuilder; @@ -32,6 +36,9 @@ import io.grpc.testing.integration.Messages.ResponseParameters; import io.grpc.testing.integration.Messages.StreamingOutputCallRequest; import io.grpc.testing.integration.Messages.StreamingOutputCallResponse; +import io.netty.channel.ChannelHandler; +import io.netty.handler.codec.http2.Http2Stream; +import io.netty.util.AsciiString; import io.netty.util.concurrent.DefaultThreadFactory; import java.io.IOException; import java.net.InetSocketAddress; @@ -40,10 +47,11 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; -import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -63,9 +71,11 @@ public class NettyFlowControlTest { private static final int REGULAR_WINDOW = 64 * 1024; private static final int MAX_WINDOW = 8 * 1024 * 1024; - private static ManagedChannel channel; - private static Server server; - private static TrafficControlProxy proxy; + private final CapturingProtocolNegotiationFactory capturingPnFactory + = new CapturingProtocolNegotiationFactory(); + private ManagedChannel channel; + private Server server; + private TrafficControlProxy proxy; private int proxyPort; private int serverPort; @@ -74,11 +84,6 @@ public class NettyFlowControlTest { new ThreadPoolExecutor(1, 10, 1, TimeUnit.SECONDS, new LinkedBlockingQueue(), new DefaultThreadFactory("flowcontrol-test-pool", true)); - @BeforeClass - public static void setUp() { - InternalHandlerSettings.enable(true); - InternalHandlerSettings.autoWindowOn(true); - } @AfterClass public static void shutDownTests() { @@ -93,8 +98,13 @@ public void initTest() { @After public void endTest() throws IOException { - proxy.shutDown(); - server.shutdown(); + if (proxy != null) { + proxy.shutDown(); + } + server.shutdownNow(); + if (channel != null) { + channel.shutdownNow(); + } } @Test @@ -102,7 +112,7 @@ public void largeBdp() throws InterruptedException, IOException { proxy = new TrafficControlProxy(serverPort, HIGH_BAND, MED_LAT, TimeUnit.MILLISECONDS); proxy.start(); proxyPort = proxy.getPort(); - resetConnection(REGULAR_WINDOW); + createAndStartChannel(REGULAR_WINDOW); doTest(HIGH_BAND, MED_LAT); } @@ -111,16 +121,17 @@ public void smallBdp() throws InterruptedException, IOException { proxy = new TrafficControlProxy(serverPort, LOW_BAND, MED_LAT, TimeUnit.MILLISECONDS); proxy.start(); proxyPort = proxy.getPort(); - resetConnection(REGULAR_WINDOW); + createAndStartChannel(REGULAR_WINDOW); doTest(LOW_BAND, MED_LAT); } @Test + @Ignore("enable once 2 pings between data is no longer necessary") public void verySmallWindowMakesProgress() throws InterruptedException, IOException { proxy = new TrafficControlProxy(serverPort, HIGH_BAND, MED_LAT, TimeUnit.MILLISECONDS); proxy.start(); proxyPort = proxy.getPort(); - resetConnection(TINY_WINDOW); + createAndStartChannel(TINY_WINDOW); doTest(HIGH_BAND, MED_LAT); } @@ -142,9 +153,10 @@ private void doTest(int bandwidth, int latency) throws InterruptedException { .addResponseParameters(ResponseParameters.newBuilder().setSize(streamSize / 2)); StreamingOutputCallRequest request = builder.build(); - TestStreamObserver observer = new TestStreamObserver(expectedWindow); + TestStreamObserver observer = + new TestStreamObserver(capturingPnFactory.grpcHandlerRef, expectedWindow); stub.streamingOutputCall(request, observer); - int lastWindow = observer.waitFor(); + int lastWindow = observer.waitFor(5, TimeUnit.SECONDS); // deal with cases that either don't cause a window update or hit max window expectedWindow = Math.min(MAX_WINDOW, Math.max(expectedWindow, REGULAR_WINDOW)); @@ -160,24 +172,21 @@ private void doTest(int bandwidth, int latency) throws InterruptedException { /** * Resets client/server and their flow control windows. */ - private void resetConnection(int clientFlowControlWindow) - throws InterruptedException { - if (channel != null) { - if (!channel.isShutdown()) { - channel.shutdown(); - channel.awaitTermination(100, TimeUnit.MILLISECONDS); - } - } - channel = NettyChannelBuilder.forAddress(new InetSocketAddress("localhost", proxyPort)) - .flowControlWindow(clientFlowControlWindow) - .negotiationType(NegotiationType.PLAINTEXT) - .build(); + private void createAndStartChannel(int clientFlowControlWindow) { + NettyChannelBuilder channelBuilder = + NettyChannelBuilder + .forAddress(new InetSocketAddress("localhost", proxyPort)) + .initialFlowControlWindow(clientFlowControlWindow) + .negotiationType(NegotiationType.PLAINTEXT); + InternalNettyChannelBuilder.setProtocolNegotiatorFactory(channelBuilder, capturingPnFactory); + channel = channelBuilder.build(); } private void startServer(int serverFlowControlWindow) { ServerBuilder builder = - NettyServerBuilder.forAddress(new InetSocketAddress("localhost", 0)) - .flowControlWindow(serverFlowControlWindow); + NettyServerBuilder + .forAddress(new InetSocketAddress("localhost", 0)) + .initialFlowControlWindow(serverFlowControlWindow); builder.addService(ServerInterceptors.intercept( new TestServiceImpl(Executors.newScheduledThreadPool(2)), ImmutableList.of())); @@ -193,20 +202,25 @@ private void startServer(int serverFlowControlWindow) { */ private static class TestStreamObserver implements StreamObserver { - long startRequestNanos; + final AtomicReference grpcHandlerRef; + final long startRequestNanos; long endRequestNanos; - private final CountDownLatch latch = new CountDownLatch(1); - long expectedWindow; + final CountDownLatch latch = new CountDownLatch(1); + final long expectedWindow; int lastWindow; - public TestStreamObserver(long window) { + public TestStreamObserver( + AtomicReference grpcHandlerRef, long window) { + this.grpcHandlerRef = grpcHandlerRef; startRequestNanos = System.nanoTime(); expectedWindow = window; } @Override public void onNext(StreamingOutputCallResponse value) { - lastWindow = InternalHandlerSettings.getLatestClientWindow(); + GrpcHttp2ConnectionHandler grpcHandler = grpcHandlerRef.get(); + Http2Stream connectionStream = grpcHandler.connection().connectionStream(); + lastWindow = grpcHandler.decoder().flowController().initialWindowSize(connectionStream); if (lastWindow >= expectedWindow) { onCompleted(); } @@ -227,9 +241,40 @@ public long getElapsedTime() { return endRequestNanos - startRequestNanos; } - public int waitFor() throws InterruptedException { - latch.await(); + public int waitFor(long duration, TimeUnit unit) throws InterruptedException { + latch.await(duration, unit); return lastWindow; } } + + private static class CapturingProtocolNegotiationFactory implements ProtocolNegotiatorFactory { + + AtomicReference grpcHandlerRef = new AtomicReference<>(); + + @Override + public ProtocolNegotiator buildProtocolNegotiator() { + return new CapturingProtocolNegotiator(); + } + + private class CapturingProtocolNegotiator implements ProtocolNegotiator { + + final ProtocolNegotiator delegate = InternalProtocolNegotiators.plaintext(); + + @Override + public AsciiString scheme() { + return delegate.scheme(); + } + + @Override + public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) { + CapturingProtocolNegotiationFactory.this.grpcHandlerRef.set(grpcHandler); + return delegate.newHandler(grpcHandler); + } + + @Override + public void close() { + delegate.close(); + } + } + } } diff --git a/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java b/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java index 59267c24301..6fcd03fdc42 100644 --- a/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java +++ b/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java @@ -16,9 +16,12 @@ package io.grpc.netty; +import static com.google.common.base.Preconditions.checkArgument; import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception; import com.google.common.annotations.VisibleForTesting; +import io.grpc.netty.ListeningEncoder.Http2OutboundFrameListener; +import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http2.Http2ConnectionDecoder; @@ -35,10 +38,15 @@ */ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler { private static final long GRACEFUL_SHUTDOWN_NO_TIMEOUT = -1; - private boolean autoTuneFlowControlOn = false; - private int initialConnectionWindow; + private static final int MAX_ALLOWED_PING = 2; + + private final int initialConnectionWindow; + private final PingCountingListener pingCountingListener = new PingCountingListener(); + private final FlowControlPinger flowControlPing = new FlowControlPinger(MAX_ALLOWED_PING); + + private boolean autoTuneFlowControlOn; private ChannelHandlerContext ctx; - private final FlowControlPinger flowControlPing = new FlowControlPinger(); + private boolean initialWindowSent = false; private static final long BDP_MEASUREMENT_PING = 1234; @@ -46,7 +54,8 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler { ChannelPromise channelUnused, Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder, - Http2Settings initialSettings) { + Http2Settings initialSettings, + boolean autoFlowControl) { super(channelUnused, decoder, encoder, initialSettings); // During a graceful shutdown, wait until all streams are closed. @@ -55,6 +64,10 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler { // Extract the connection window from the settings if it was set. this.initialConnectionWindow = initialSettings.initialWindowSize() == null ? -1 : initialSettings.initialWindowSize(); + this.autoTuneFlowControlOn = autoFlowControl; + if (encoder instanceof ListeningEncoder) { + ((ListeningEncoder) encoder).setListener(pingCountingListener); + } } @Override @@ -92,12 +105,12 @@ protected final ChannelHandlerContext ctx() { * Sends initial connection window to the remote endpoint if necessary. */ private void sendInitialConnectionWindow() throws Http2Exception { - if (ctx.channel().isActive() && initialConnectionWindow > 0) { + if (!initialWindowSent && ctx.channel().isActive()) { Http2Stream connectionStream = connection().connectionStream(); int currentSize = connection().local().flowController().windowSize(connectionStream); int delta = initialConnectionWindow - currentSize; decoder().flowController().incrementWindowSize(connectionStream, delta); - initialConnectionWindow = -1; + initialWindowSent = true; ctx.flush(); } } @@ -118,6 +131,7 @@ void setAutoTuneFlowControl(boolean isOn) { final class FlowControlPinger { private static final int MAX_WINDOW_SIZE = 8 * 1024 * 1024; + private final int maxAllowedPing; private int pingCount; private int pingReturn; private boolean pinging; @@ -125,6 +139,11 @@ final class FlowControlPinger { private float lastBandwidth; // bytes per second private long lastPingTime; + public FlowControlPinger(int maxAllowedPing) { + checkArgument(maxAllowedPing > 0, "maxAllowedPing must be positive"); + this.maxAllowedPing = maxAllowedPing; + } + public long payload() { return BDP_MEASUREMENT_PING; } @@ -137,7 +156,7 @@ public void onDataRead(int dataLength, int paddingLength) { if (!autoTuneFlowControlOn) { return; } - if (!isPinging()) { + if (!isPinging() && pingCountingListener.pingCount < maxAllowedPing) { setPinging(true); sendPing(ctx()); } @@ -168,7 +187,6 @@ public void updateWindow() throws Http2Exception { settings.initialWindowSize(targetWindow); frameWriter().writeSettings(ctx(), settings, ctx().newPromise()); } - } private boolean isPinging() { @@ -216,4 +234,28 @@ void setDataSizeAndSincePing(int dataSize) { lastPingTime = System.nanoTime() - TimeUnit.SECONDS.toNanos(1); } } + + private static class PingCountingListener extends Http2OutboundFrameListener { + int pingCount = 0; + + @Override + public void onWindowUpdate(int streamId, int windowSizeIncrement) { + pingCount = 0; + super.onWindowUpdate(streamId, windowSizeIncrement); + } + + @Override + public void onPing(boolean ack, long data) { + if (!ack) { + pingCount++; + } + super.onPing(ack, data); + } + + @Override + public void onData(int streamId, ByteBuf data, int padding, boolean endStream) { + pingCount = 0; + super.onData(streamId, data, padding, endStream); + } + } } diff --git a/netty/src/main/java/io/grpc/netty/InternalHandlerSettings.java b/netty/src/main/java/io/grpc/netty/InternalHandlerSettings.java deleted file mode 100644 index ed6a552f8b6..00000000000 --- a/netty/src/main/java/io/grpc/netty/InternalHandlerSettings.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright 2016 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.netty; - -import com.google.common.annotations.VisibleForTesting; -import io.grpc.Internal; - -/** - * Controlled accessor to {@link NettyHandlerSettings}. - */ -@VisibleForTesting // Visible for tests in other packages. -@Internal -public final class InternalHandlerSettings { - - public static void enable(boolean enable) { - NettyHandlerSettings.enable(enable); - } - - public static synchronized void autoWindowOn(boolean autoFlowControl) { - NettyHandlerSettings.autoWindowOn(autoFlowControl); - } - - public static synchronized int getLatestClientWindow() { - return NettyHandlerSettings.getLatestClientWindow(); - } - - public static synchronized int getLatestServerWindow() { - return NettyHandlerSettings.getLatestServerWindow(); - } -} diff --git a/netty/src/main/java/io/grpc/netty/ListeningEncoder.java b/netty/src/main/java/io/grpc/netty/ListeningEncoder.java new file mode 100644 index 00000000000..03270d399ae --- /dev/null +++ b/netty/src/main/java/io/grpc/netty/ListeningEncoder.java @@ -0,0 +1,136 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.netty; + +import static com.google.common.base.Preconditions.checkNotNull; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder; +import io.netty.handler.codec.http2.Http2Connection; +import io.netty.handler.codec.http2.Http2ConnectionEncoder; +import io.netty.handler.codec.http2.Http2FrameWriter; +import io.netty.handler.codec.http2.StreamBufferingEncoder; + +/** A ListeningEncoder notifies {@link Http2OutboundFrameListener} on http2 outbound frame event. */ +interface ListeningEncoder { + + void setListener(Http2OutboundFrameListener listener); + + /** + * Partial implementation of (Listening subset of event) event listener for outbound http2 + * frames. + */ + class Http2OutboundFrameListener { + + /** Notifies on outbound WINDOW_UPDATE frame. */ + public void onWindowUpdate(int streamId, int windowSizeIncrement) {} + + /** Notifies on outbound PING frame. */ + public void onPing(boolean ack, long data) {} + + /** Notifies on outbound DATA frame. */ + public void onData(int streamId, ByteBuf data, int padding, boolean endStream) {} + } + + /** A {@link StreamBufferingEncoder} notifies http2 outbound frame event. */ + final class ListeningStreamBufferingEncoder + extends StreamBufferingEncoder implements ListeningEncoder { + + private Http2OutboundFrameListener listener = new Http2OutboundFrameListener(); + + public ListeningStreamBufferingEncoder(Http2ConnectionEncoder encoder) { + super(encoder); + } + + @Override + public void setListener(Http2OutboundFrameListener listener) { + this.listener = checkNotNull(listener, "listener"); + } + + @Override + public ChannelFuture writePing( + ChannelHandlerContext ctx, boolean ack, long data, ChannelPromise promise) { + listener.onPing(ack, data); + return super.writePing(ctx, ack, data, promise); + } + + @Override + public ChannelFuture writeWindowUpdate( + ChannelHandlerContext ctx, int streamId, int windowSizeIncrement, ChannelPromise promise) { + listener.onWindowUpdate(streamId, windowSizeIncrement); + return super.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise); + } + + @Override + public ChannelFuture writeData( + ChannelHandlerContext ctx, + int streamId, + ByteBuf data, + int padding, + boolean eos, + ChannelPromise promise) { + listener.onData(streamId, data, padding, eos); + return super.writeData(ctx, streamId, data, padding, eos, promise); + } + } + + /** A {@link DefaultHttp2ConnectionEncoder} notifies http2 outbound frame event. */ + final class ListeningDefaultHttp2ConnectionEncoder + extends DefaultHttp2ConnectionEncoder implements ListeningEncoder { + + private Http2OutboundFrameListener listener = new Http2OutboundFrameListener(); + + public ListeningDefaultHttp2ConnectionEncoder( + Http2Connection connection, Http2FrameWriter frameWriter) { + super(connection, frameWriter); + } + + @Override + public void setListener(Http2OutboundFrameListener listener) { + this.listener = checkNotNull(listener, "listener"); + } + + @Override + public ChannelFuture writePing( + ChannelHandlerContext ctx, boolean ack, long data, ChannelPromise promise) { + listener.onPing(ack, data); + return super.writePing(ctx, ack, data, promise); + } + + @Override + public ChannelFuture writeWindowUpdate( + ChannelHandlerContext ctx, int streamId, int windowSizeIncrement, ChannelPromise promise) { + listener.onWindowUpdate(streamId, windowSizeIncrement); + return super.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise); + } + + @Override + public ChannelFuture writeData( + ChannelHandlerContext ctx, + int streamId, + ByteBuf data, + int padding, + boolean eos, + ChannelPromise promise) { + listener.onData(streamId, data, padding, eos); + return super.writeData(ctx, streamId, data, padding, eos, promise); + } + } +} diff --git a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java index 85f06119497..31c5d64221d 100644 --- a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java @@ -84,6 +84,7 @@ public final class NettyChannelBuilder private ChannelFactory channelFactory = DEFAULT_CHANNEL_FACTORY; private ObjectPool eventLoopGroupPool = DEFAULT_EVENT_LOOP_GROUP_POOL; private SslContext sslContext; + private boolean autoFlowControl; private int flowControlWindow = DEFAULT_FLOW_CONTROL_WINDOW; private int maxHeaderListSize = GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE; private long keepAliveTimeNanos = KEEPALIVE_TIME_NANOS_DISABLED; @@ -247,12 +248,26 @@ public NettyChannelBuilder sslContext(SslContext sslContext) { } /** - * Sets the flow control window in bytes. If not called, the default value - * is {@link #DEFAULT_FLOW_CONTROL_WINDOW}). + * Sets the initial flow control window in bytes. Setting initial flow control window enables auto + * flow control tuning using bandwidth-delay product algorithm. To disable auto flow control + * tuning, use {@link #flowControlWindow(int)}. + */ + public NettyChannelBuilder initialFlowControlWindow(int initialFlowControlWindow) { + checkArgument(initialFlowControlWindow > 0, "initialFlowControlWindow must be positive"); + this.flowControlWindow = initialFlowControlWindow; + this.autoFlowControl = true; + return this; + } + + /** + * Sets the flow control window in bytes. Setting flowControlWindow disables auto flow control + * tuning; use {@link #initialFlowControlWindow(int)} to enable auto flow control tuning. If not + * called, the default value is {@link #DEFAULT_FLOW_CONTROL_WINDOW}). */ public NettyChannelBuilder flowControlWindow(int flowControlWindow) { checkArgument(flowControlWindow > 0, "flowControlWindow must be positive"); this.flowControlWindow = flowControlWindow; + this.autoFlowControl = false; return this; } @@ -405,7 +420,7 @@ protected ClientTransportFactory buildTransportFactory() { return new NettyTransportFactory( negotiator, channelFactory, channelOptions, - eventLoopGroupPool, flowControlWindow, maxInboundMessageSize(), + eventLoopGroupPool, autoFlowControl, flowControlWindow, maxInboundMessageSize(), maxHeaderListSize, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls, transportTracerFactory, localSocketPicker, useGetForSafeMethods); } @@ -521,6 +536,7 @@ private static final class NettyTransportFactory implements ClientTransportFacto private final Map, ?> channelOptions; private final ObjectPool groupPool; private final EventLoopGroup group; + private final boolean autoFlowControl; private final int flowControlWindow; private final int maxMessageSize; private final int maxHeaderListSize; @@ -536,7 +552,7 @@ private static final class NettyTransportFactory implements ClientTransportFacto NettyTransportFactory(ProtocolNegotiator protocolNegotiator, ChannelFactory channelFactory, Map, ?> channelOptions, ObjectPool groupPool, - int flowControlWindow, int maxMessageSize, int maxHeaderListSize, + boolean autoFlowControl, int flowControlWindow, int maxMessageSize, int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls, TransportTracer.Factory transportTracerFactory, LocalSocketPicker localSocketPicker, boolean useGetForSafeMethods) { @@ -545,6 +561,7 @@ private static final class NettyTransportFactory implements ClientTransportFacto this.channelOptions = new HashMap, Object>(channelOptions); this.groupPool = groupPool; this.group = groupPool.getObject(); + this.autoFlowControl = autoFlowControl; this.flowControlWindow = flowControlWindow; this.maxMessageSize = maxMessageSize; this.maxHeaderListSize = maxHeaderListSize; @@ -584,7 +601,7 @@ public void run() { // TODO(carl-mastrangelo): Pass channelLogger in. NettyClientTransport transport = new NettyClientTransport( serverAddress, channelFactory, channelOptions, group, - localNegotiator, flowControlWindow, + localNegotiator, autoFlowControl, flowControlWindow, maxMessageSize, maxHeaderListSize, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos, keepAliveWithoutCalls, options.getAuthority(), options.getUserAgent(), tooManyPingsRunnable, transportTracerFactory.create(), options.getEagAttributes(), diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index 677358f4fef..f02b885c2b0 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -38,6 +38,7 @@ import io.grpc.internal.KeepAliveManager; import io.grpc.internal.TransportTracer; import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ClientHeadersDecoder; +import io.grpc.netty.ListeningEncoder.ListeningStreamBufferingEncoder; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; @@ -57,6 +58,7 @@ import io.netty.handler.codec.http2.Http2Connection; import io.netty.handler.codec.http2.Http2ConnectionAdapter; import io.netty.handler.codec.http2.Http2ConnectionDecoder; +import io.netty.handler.codec.http2.Http2ConnectionEncoder; import io.netty.handler.codec.http2.Http2Error; import io.netty.handler.codec.http2.Http2Exception; import io.netty.handler.codec.http2.Http2FlowController; @@ -131,6 +133,7 @@ protected void handleNotInUse() { static NettyClientHandler newHandler( ClientTransportLifecycleManager lifecycleManager, @Nullable KeepAliveManager keepAliveManager, + boolean autoFlowControl, int flowControlWindow, int maxHeaderListSize, Supplier stopwatchFactory, @@ -155,6 +158,7 @@ static NettyClientHandler newHandler( frameWriter, lifecycleManager, keepAliveManager, + autoFlowControl, flowControlWindow, maxHeaderListSize, stopwatchFactory, @@ -171,6 +175,7 @@ static NettyClientHandler newHandler( Http2FrameWriter frameWriter, ClientTransportLifecycleManager lifecycleManager, KeepAliveManager keepAliveManager, + boolean autoFlowControl, int flowControlWindow, int maxHeaderListSize, Supplier stopwatchFactory, @@ -192,8 +197,9 @@ static NettyClientHandler newHandler( frameReader = new Http2InboundFrameLogger(frameReader, frameLogger); frameWriter = new Http2OutboundFrameLogger(frameWriter, frameLogger); - StreamBufferingEncoder encoder = new StreamBufferingEncoder( - new DefaultHttp2ConnectionEncoder(connection, frameWriter)); + StreamBufferingEncoder encoder = + new ListeningStreamBufferingEncoder( + new DefaultHttp2ConnectionEncoder(connection, frameWriter)); // Create the local flow controller configured to auto-refill the connection window. connection.local().flowController( @@ -230,12 +236,13 @@ public TransportTracer.FlowControlWindows read() { tooManyPingsRunnable, transportTracer, eagAttributes, - authority); + authority, + autoFlowControl); } private NettyClientHandler( Http2ConnectionDecoder decoder, - StreamBufferingEncoder encoder, + Http2ConnectionEncoder encoder, Http2Settings settings, ClientTransportLifecycleManager lifecycleManager, KeepAliveManager keepAliveManager, @@ -243,8 +250,9 @@ private NettyClientHandler( final Runnable tooManyPingsRunnable, TransportTracer transportTracer, Attributes eagAttributes, - String authority) { - super(/* channelUnused= */ null, decoder, encoder, settings); + String authority, + boolean autoFlowControl) { + super(/* channelUnused= */ null, decoder, encoder, settings, autoFlowControl); this.lifecycleManager = lifecycleManager; this.keepAliveManager = keepAliveManager; this.stopwatchFactory = stopwatchFactory; diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index 81831ac918c..bd00da98384 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -79,6 +79,7 @@ class NettyClientTransport implements ConnectionClientTransport { private final String authorityString; private final AsciiString authority; private final AsciiString userAgent; + private final boolean autoFlowControl; private final int flowControlWindow; private final int maxMessageSize; private final int maxHeaderListSize; @@ -106,8 +107,9 @@ class NettyClientTransport implements ConnectionClientTransport { NettyClientTransport( SocketAddress address, ChannelFactory channelFactory, Map, ?> channelOptions, EventLoopGroup group, - ProtocolNegotiator negotiator, int flowControlWindow, int maxMessageSize, - int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos, + ProtocolNegotiator negotiator, boolean autoFlowControl, int flowControlWindow, + int maxMessageSize, int maxHeaderListSize, + long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls, String authority, @Nullable String userAgent, Runnable tooManyPingsRunnable, TransportTracer transportTracer, Attributes eagAttributes, LocalSocketPicker localSocketPicker, ChannelLogger channelLogger, @@ -118,6 +120,7 @@ class NettyClientTransport implements ConnectionClientTransport { this.group = Preconditions.checkNotNull(group, "group"); this.channelFactory = channelFactory; this.channelOptions = Preconditions.checkNotNull(channelOptions, "channelOptions"); + this.autoFlowControl = autoFlowControl; this.flowControlWindow = flowControlWindow; this.maxMessageSize = maxMessageSize; this.maxHeaderListSize = maxHeaderListSize; @@ -214,6 +217,7 @@ public Runnable start(Listener transportListener) { handler = NettyClientHandler.newHandler( lifecycleManager, keepAliveManager, + autoFlowControl, flowControlWindow, maxHeaderListSize, GrpcUtil.STOPWATCH_SUPPLIER, @@ -221,7 +225,6 @@ public Runnable start(Listener transportListener) { transportTracer, eagAttributes, authorityString); - NettyHandlerSettings.setAutoWindow(handler); ChannelHandler negotiationHandler = negotiator.newHandler(handler); diff --git a/netty/src/main/java/io/grpc/netty/NettyHandlerSettings.java b/netty/src/main/java/io/grpc/netty/NettyHandlerSettings.java deleted file mode 100644 index a977bb7049f..00000000000 --- a/netty/src/main/java/io/grpc/netty/NettyHandlerSettings.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright 2016 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.netty; - -import com.google.common.base.Preconditions; - -/** - * Allows autoFlowControl to be turned on and off from interop testing and flow control windows to - * be accessed. - */ -final class NettyHandlerSettings { - - private static volatile boolean enabled; - - private static boolean autoFlowControlOn; - // These will be the most recently created handlers created using NettyClientTransport and - // NettyServerTransport - private static AbstractNettyHandler clientHandler; - private static AbstractNettyHandler serverHandler; - - static void setAutoWindow(AbstractNettyHandler handler) { - if (!enabled) { - return; - } - synchronized (NettyHandlerSettings.class) { - handler.setAutoTuneFlowControl(autoFlowControlOn); - if (handler instanceof NettyClientHandler) { - clientHandler = handler; - } else if (handler instanceof NettyServerHandler) { - serverHandler = handler; - } else { - throw new RuntimeException("Expecting NettyClientHandler or NettyServerHandler"); - } - } - } - - public static void enable(boolean enable) { - enabled = enable; - } - - public static synchronized void autoWindowOn(boolean autoFlowControl) { - autoFlowControlOn = autoFlowControl; - } - - public static synchronized int getLatestClientWindow() { - return getLatestWindow(clientHandler); - } - - public static synchronized int getLatestServerWindow() { - return getLatestWindow(serverHandler); - } - - private static synchronized int getLatestWindow(AbstractNettyHandler handler) { - Preconditions.checkNotNull(handler); - return handler.decoder().flowController() - .initialWindowSize(handler.connection().connectionStream()); - } -} diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java index da80a79e481..0cd4e749af5 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServer.java +++ b/netty/src/main/java/io/grpc/netty/NettyServer.java @@ -78,6 +78,7 @@ class NettyServer implements InternalServer, InternalWithLogId { private EventLoopGroup workerGroup; private ServerListener listener; private Channel channel; + private final boolean autoFlowControl; private final int flowControlWindow; private final int maxMessageSize; private final int maxHeaderListSize; @@ -106,7 +107,8 @@ class NettyServer implements InternalServer, InternalWithLogId { ProtocolNegotiator protocolNegotiator, List streamTracerFactories, TransportTracer.Factory transportTracerFactory, - int maxStreamsPerConnection, int flowControlWindow, int maxMessageSize, int maxHeaderListSize, + int maxStreamsPerConnection, boolean autoFlowControl, int flowControlWindow, + int maxMessageSize, int maxHeaderListSize, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, long maxConnectionIdleInNanos, long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos, @@ -127,6 +129,7 @@ class NettyServer implements InternalServer, InternalWithLogId { this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories"); this.transportTracerFactory = transportTracerFactory; this.maxStreamsPerConnection = maxStreamsPerConnection; + this.autoFlowControl = autoFlowControl; this.flowControlWindow = flowControlWindow; this.maxMessageSize = maxMessageSize; this.maxHeaderListSize = maxHeaderListSize; @@ -205,6 +208,7 @@ public void initChannel(Channel ch) { streamTracerFactories, transportTracerFactory.create(), maxStreamsPerConnection, + autoFlowControl, flowControlWindow, maxMessageSize, maxHeaderListSize, diff --git a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java index bb10b2189de..73d3e964364 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java @@ -94,6 +94,7 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder 0, "initialFlowControlWindow must be positive"); + this.flowControlWindow = initialFlowControlWindow; + this.autoFlowControl = true; + return this; + } + + /** + * Sets the flow control window in bytes. Setting flowControlWindow disables auto flow control + * tuning; use {@link #initialFlowControlWindow(int)} to enable auto flow control tuning. If not + * called, the default value is {@link #DEFAULT_FLOW_CONTROL_WINDOW}). */ public NettyServerBuilder flowControlWindow(int flowControlWindow) { checkArgument(flowControlWindow > 0, "flowControlWindow must be positive: %s", flowControlWindow); this.flowControlWindow = flowControlWindow; + this.autoFlowControl = false; return this; } @@ -564,8 +579,9 @@ protected List buildTransportServers( listenAddress, channelFactory, channelOptions, childChannelOptions, bossEventLoopGroupPool, workerEventLoopGroupPool, forceHeapBuffer, negotiator, streamTracerFactories, getTransportTracerFactory(), maxConcurrentCallsPerConnection, - flowControlWindow, maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, - keepAliveTimeoutInNanos, maxConnectionIdleInNanos, maxConnectionAgeInNanos, + autoFlowControl, flowControlWindow, maxMessageSize, maxHeaderListSize, + keepAliveTimeInNanos, keepAliveTimeoutInNanos, + maxConnectionIdleInNanos, maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos, permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, getChannelz()); transportServers.add(transportServer); diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java index d0354e5e2a6..b40ee7bd453 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java @@ -45,6 +45,7 @@ import io.grpc.internal.StatsTraceContext; import io.grpc.internal.TransportTracer; import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder; +import io.grpc.netty.ListeningEncoder.ListeningDefaultHttp2ConnectionEncoder; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.channel.ChannelFuture; @@ -54,7 +55,6 @@ import io.netty.handler.codec.http2.DecoratingHttp2FrameWriter; import io.netty.handler.codec.http2.DefaultHttp2Connection; import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder; -import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder; import io.netty.handler.codec.http2.DefaultHttp2FrameReader; import io.netty.handler.codec.http2.DefaultHttp2FrameWriter; import io.netty.handler.codec.http2.DefaultHttp2Headers; @@ -139,6 +139,7 @@ static NettyServerHandler newHandler( List streamTracerFactories, TransportTracer transportTracer, int maxStreams, + boolean autoFlowControl, int flowControlWindow, int maxHeaderListSize, int maxMessageSize, @@ -165,6 +166,7 @@ static NettyServerHandler newHandler( streamTracerFactories, transportTracer, maxStreams, + autoFlowControl, flowControlWindow, maxHeaderListSize, maxMessageSize, @@ -186,6 +188,7 @@ static NettyServerHandler newHandler( List streamTracerFactories, TransportTracer transportTracer, int maxStreams, + boolean autoFlowControl, int flowControlWindow, int maxHeaderListSize, int maxMessageSize, @@ -217,7 +220,8 @@ static NettyServerHandler newHandler( connection.local().flowController( new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true)); frameWriter = new WriteMonitoringFrameWriter(frameWriter, keepAliveEnforcer); - Http2ConnectionEncoder encoder = new DefaultHttp2ConnectionEncoder(connection, frameWriter); + Http2ConnectionEncoder encoder = + new ListeningDefaultHttp2ConnectionEncoder(connection, frameWriter); encoder = new Http2ControlFrameLimitEncoder(encoder, 10000); Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, frameReader); @@ -238,7 +242,8 @@ static NettyServerHandler newHandler( keepAliveTimeInNanos, keepAliveTimeoutInNanos, maxConnectionIdleInNanos, maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos, - keepAliveEnforcer); + keepAliveEnforcer, + autoFlowControl); } private NettyServerHandler( @@ -256,8 +261,9 @@ private NettyServerHandler( long maxConnectionIdleInNanos, long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos, - final KeepAliveEnforcer keepAliveEnforcer) { - super(channelUnused, decoder, encoder, settings); + final KeepAliveEnforcer keepAliveEnforcer, + boolean autoFlowControl) { + super(channelUnused, decoder, encoder, settings, autoFlowControl); final MaxConnectionIdleManager maxConnectionIdleManager; if (maxConnectionIdleInNanos == MAX_CONNECTION_IDLE_NANOS_DISABLED) { diff --git a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java index 82ed972552f..3638dd5b0d6 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java @@ -64,6 +64,7 @@ class NettyServerTransport implements ServerTransport { private NettyServerHandler grpcHandler; private ServerTransportListener listener; private boolean terminated; + private final boolean autoFlowControl; private final int flowControlWindow; private final int maxMessageSize; private final int maxHeaderListSize; @@ -84,6 +85,7 @@ class NettyServerTransport implements ServerTransport { List streamTracerFactories, TransportTracer transportTracer, int maxStreams, + boolean autoFlowControl, int flowControlWindow, int maxMessageSize, int maxHeaderListSize, @@ -101,6 +103,7 @@ class NettyServerTransport implements ServerTransport { Preconditions.checkNotNull(streamTracerFactories, "streamTracerFactories"); this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer"); this.maxStreams = maxStreams; + this.autoFlowControl = autoFlowControl; this.flowControlWindow = flowControlWindow; this.maxMessageSize = maxMessageSize; this.maxHeaderListSize = maxHeaderListSize; @@ -121,7 +124,6 @@ public void start(ServerTransportListener listener) { // Create the Netty handler for the pipeline. grpcHandler = createHandler(listener, channelUnused); - NettyHandlerSettings.setAutoWindow(grpcHandler); // Notify when the channel closes. final class TerminationNotifier implements ChannelFutureListener { @@ -258,6 +260,7 @@ private NettyServerHandler createHandler( streamTracerFactories, transportTracer, maxStreams, + autoFlowControl, flowControlWindow, maxHeaderListSize, maxMessageSize, diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java index c5f81325bd5..f7e5f2c94e8 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java @@ -790,6 +790,7 @@ public Stopwatch get() { frameWriter(), lifecycleManager, mockKeepAliveManager, + false, flowControlWindow, maxHeaderListSize, stopwatchSupplier, diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java index 48cd0603672..8762f0b659b 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java @@ -191,7 +191,7 @@ public void setSoLingerChannelOption() throws IOException { channelOptions.put(ChannelOption.SO_LINGER, soLinger); NettyClientTransport transport = new NettyClientTransport( address, new ReflectiveChannelFactory<>(NioSocketChannel.class), channelOptions, group, - newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, + newNegotiator(), false, DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority, null /* user agent */, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY, new SocketPicker(), new FakeChannelLogger(), false); @@ -437,7 +437,7 @@ public void failingToConstructChannelShouldFailGracefully() throws Exception { NettyClientTransport transport = new NettyClientTransport( address, new ReflectiveChannelFactory<>(CantConstructChannel.class), new HashMap, Object>(), group, - newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, + newNegotiator(), false, DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1, false, authority, null, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY, new SocketPicker(), new FakeChannelLogger(), false); @@ -752,7 +752,7 @@ private NettyClientTransport newTransport(ProtocolNegotiator negotiator, int max } NettyClientTransport transport = new NettyClientTransport( address, channelFactory, new HashMap, Object>(), group, - negotiator, DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize, + negotiator, false, DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize, keepAliveTimeNano, keepAliveTimeoutNano, false, authority, userAgent, tooManyPingsRunnable, new TransportTracer(), eagAttributes, new SocketPicker(), new FakeChannelLogger(), false); @@ -774,6 +774,7 @@ private void startServer(int maxStreamsPerConnection, int maxHeaderListSize) thr Collections.emptyList(), TransportTracer.getDefaultFactory(), maxStreamsPerConnection, + false, DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, maxHeaderListSize, DEFAULT_SERVER_KEEPALIVE_TIME_NANOS, DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS, MAX_CONNECTION_IDLE_NANOS_DISABLED, diff --git a/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java b/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java index fab3c767923..aab000b9279 100644 --- a/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java +++ b/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java @@ -101,6 +101,7 @@ protected void manualSetUp() throws Exception {} protected final TransportTracer transportTracer = new TransportTracer(); protected int flowControlWindow = DEFAULT_WINDOW_SIZE; + protected boolean autoFlowControl = false; private final FakeClock fakeClock = new FakeClock(); diff --git a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java index 453d8222f57..cf28cf79315 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java @@ -1116,6 +1116,7 @@ protected NettyServerHandler newHandler() { Arrays.asList(streamTracerFactory), transportTracer, maxConcurrentStreams, + autoFlowControl, flowControlWindow, maxHeaderListSize, DEFAULT_MAX_MESSAGE_SIZE, diff --git a/netty/src/test/java/io/grpc/netty/NettyServerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerTest.java index 0e19b0875e4..3413503b8e2 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerTest.java @@ -100,6 +100,7 @@ class TestProtocolNegotiator implements ProtocolNegotiator { Collections.emptyList(), TransportTracer.getDefaultFactory(), 1, // ignore + false, // ignore 1, // ignore 1, // ignore 1, // ignore @@ -146,6 +147,7 @@ public void getPort_notStarted() { Collections.emptyList(), TransportTracer.getDefaultFactory(), 1, // ignore + false, // ignore 1, // ignore 1, // ignore 1, // ignore @@ -186,6 +188,7 @@ public void childChannelOptions() throws Exception { Collections.emptyList(), TransportTracer.getDefaultFactory(), 1, // ignore + false, // ignore 1, // ignore 1, // ignore 1, // ignore @@ -238,6 +241,7 @@ public void channelzListenSocket() throws Exception { Collections.emptyList(), TransportTracer.getDefaultFactory(), 1, // ignore + false, // ignore 1, // ignore 1, // ignore 1, // ignore