Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

netty: allow to use bandwidth delay product #6979

Merged
merged 10 commits into from May 1, 2020
Expand Up @@ -1907,7 +1907,7 @@ protected int operationTimeoutMillis() {
* Some tests run on memory constrained environments. Rather than OOM, just give up. 64 is
* chosen as a maximum amount of memory a large test would need.
*/
private static void assumeEnoughMemory() {
protected static void assumeEnoughMemory() {
Runtime r = Runtime.getRuntime();
long usedMem = r.totalMemory() - r.freeMemory();
long actuallyFreeMemory = r.maxMemory() - usedMem;
Expand Down
Expand Up @@ -18,23 +18,15 @@

import io.grpc.ManagedChannel;
import io.grpc.internal.AbstractServerImplBuilder;
import io.grpc.netty.InternalHandlerSettings;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder;
import org.junit.BeforeClass;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class AutoWindowSizingOnTest extends AbstractInteropTest {

@BeforeClass
public static void turnOnAutoWindow() {
InternalHandlerSettings.enable(true);
InternalHandlerSettings.autoWindowOn(true);
}

@Override
protected AbstractServerImplBuilder<?> getServerBuilder() {
return NettyServerBuilder.forPort(0)
Expand All @@ -45,7 +37,8 @@ protected AbstractServerImplBuilder<?> getServerBuilder() {
protected ManagedChannel createChannel() {
NettyChannelBuilder builder = NettyChannelBuilder.forAddress(getListenAddress())
.negotiationType(NegotiationType.PLAINTEXT)
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE);
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
.initialFlowControlWindow(NettyChannelBuilder.DEFAULT_FLOW_CONTROL_WINDOW);
// Disable the default census stats interceptor, use testing interceptor instead.
io.grpc.internal.TestingAccessor.setStatsEnabled(builder, false);
return builder.intercept(createCensusStatsClientInterceptor()).build();
Expand Down
Expand Up @@ -43,7 +43,7 @@
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand Down Expand Up @@ -74,11 +74,6 @@ public class NettyFlowControlTest {
new ThreadPoolExecutor(1, 10, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new DefaultThreadFactory("flowcontrol-test-pool", true));

@BeforeClass
public static void setUp() {
InternalHandlerSettings.enable(true);
InternalHandlerSettings.autoWindowOn(true);
}

@AfterClass
public static void shutDownTests() {
Expand Down Expand Up @@ -116,6 +111,7 @@ public void smallBdp() throws InterruptedException, IOException {
}

@Test
@Ignore("enable once 2 pings between data is no longer necessary")
public void verySmallWindowMakesProgress() throws InterruptedException, IOException {
proxy = new TrafficControlProxy(serverPort, HIGH_BAND, MED_LAT, TimeUnit.MILLISECONDS);
proxy.start();
Expand Down Expand Up @@ -169,15 +165,16 @@ private void resetConnection(int clientFlowControlWindow)
}
}
channel = NettyChannelBuilder.forAddress(new InetSocketAddress("localhost", proxyPort))
.flowControlWindow(clientFlowControlWindow)
.initialFlowControlWindow(clientFlowControlWindow)
.negotiationType(NegotiationType.PLAINTEXT)
.build();
}

private void startServer(int serverFlowControlWindow) {
ServerBuilder<?> builder =
NettyServerBuilder.forAddress(new InetSocketAddress("localhost", 0))
.flowControlWindow(serverFlowControlWindow);
NettyServerBuilder
.forAddress(new InetSocketAddress("localhost", 0))
.initialFlowControlWindow(serverFlowControlWindow);
builder.addService(ServerInterceptors.intercept(
new TestServiceImpl(Executors.newScheduledThreadPool(2)),
ImmutableList.<ServerInterceptor>of()));
Expand Down Expand Up @@ -228,7 +225,7 @@ public long getElapsedTime() {
}

public int waitFor() throws InterruptedException {
latch.await();
latch.await(5, TimeUnit.SECONDS);
return lastWindow;
}
}
Expand Down
62 changes: 54 additions & 8 deletions netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java
Expand Up @@ -16,9 +16,12 @@

package io.grpc.netty;

import static com.google.common.base.Preconditions.checkArgument;
import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.netty.ListeningEncoder.Http2OutboundFrameListener;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
Expand All @@ -35,18 +38,24 @@
*/
abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler {
private static final long GRACEFUL_SHUTDOWN_NO_TIMEOUT = -1;
private boolean autoTuneFlowControlOn = false;
private int initialConnectionWindow;
private static final int MAX_ALLOWED_PING = 2;

private final int initialConnectionWindow;
private final PingCountingListener pingCountingListener = new PingCountingListener();
private final FlowControlPinger flowControlPing = new FlowControlPinger(MAX_ALLOWED_PING);

private boolean autoTuneFlowControlOn;
private ChannelHandlerContext ctx;
private final FlowControlPinger flowControlPing = new FlowControlPinger();
private boolean initialWindowSent = false;

private static final long BDP_MEASUREMENT_PING = 1234;

AbstractNettyHandler(
ChannelPromise channelUnused,
Http2ConnectionDecoder decoder,
Http2ConnectionEncoder encoder,
Http2Settings initialSettings) {
Http2Settings initialSettings,
boolean autoFlowControl) {
super(channelUnused, decoder, encoder, initialSettings);

// During a graceful shutdown, wait until all streams are closed.
Expand All @@ -55,6 +64,10 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler {
// Extract the connection window from the settings if it was set.
this.initialConnectionWindow = initialSettings.initialWindowSize() == null ? -1 :
initialSettings.initialWindowSize();
this.autoTuneFlowControlOn = autoFlowControl;
if (encoder instanceof ListeningEncoder) {
((ListeningEncoder) encoder).setListener(pingCountingListener);
}
}

@Override
Expand Down Expand Up @@ -92,16 +105,20 @@ protected final ChannelHandlerContext ctx() {
* Sends initial connection window to the remote endpoint if necessary.
*/
private void sendInitialConnectionWindow() throws Http2Exception {
if (ctx.channel().isActive() && initialConnectionWindow > 0) {
if (!initialWindowSent && ctx.channel().isActive()) {
Http2Stream connectionStream = connection().connectionStream();
int currentSize = connection().local().flowController().windowSize(connectionStream);
int delta = initialConnectionWindow - currentSize;
decoder().flowController().incrementWindowSize(connectionStream, delta);
initialConnectionWindow = -1;
initialWindowSent = true;
ctx.flush();
}
}

boolean isAutoTuneFlowControlOn() {
return autoTuneFlowControlOn;
}

@VisibleForTesting
FlowControlPinger flowControlPing() {
return flowControlPing;
Expand All @@ -118,13 +135,19 @@ void setAutoTuneFlowControl(boolean isOn) {
final class FlowControlPinger {

private static final int MAX_WINDOW_SIZE = 8 * 1024 * 1024;
private final int maxAllowedPing;
private int pingCount;
private int pingReturn;
private boolean pinging;
private int dataSizeSincePing;
private float lastBandwidth; // bytes per second
private long lastPingTime;

public FlowControlPinger(int maxAllowedPing) {
checkArgument(maxAllowedPing > 0, "maxAllowedPing must be positive");
this.maxAllowedPing = maxAllowedPing;
}

public long payload() {
return BDP_MEASUREMENT_PING;
}
Expand All @@ -137,7 +160,7 @@ public void onDataRead(int dataLength, int paddingLength) {
if (!autoTuneFlowControlOn) {
return;
}
if (!isPinging()) {
if (!isPinging() && pingCountingListener.pingCount < maxAllowedPing) {
setPinging(true);
sendPing(ctx());
}
Expand Down Expand Up @@ -168,7 +191,6 @@ public void updateWindow() throws Http2Exception {
settings.initialWindowSize(targetWindow);
frameWriter().writeSettings(ctx(), settings, ctx().newPromise());
}

}

private boolean isPinging() {
Expand Down Expand Up @@ -216,4 +238,28 @@ void setDataSizeAndSincePing(int dataSize) {
lastPingTime = System.nanoTime() - TimeUnit.SECONDS.toNanos(1);
}
}

private static class PingCountingListener extends Http2OutboundFrameListener {
int pingCount = 0;

@Override
public void onWindowUpdate(int streamId, int windowSizeIncrement) {
pingCount = 0;
super.onWindowUpdate(streamId, windowSizeIncrement);
}

@Override
public void onPing(boolean ack, long data) {
if (!ack) {
pingCount++;
}
super.onPing(ack, data);
}

@Override
public void onData(int streamId, ByteBuf data, int padding, boolean endStream) {
pingCount = 0;
super.onData(streamId, data, padding, endStream);
}
}
}
Expand Up @@ -26,14 +26,6 @@
@Internal
public final class InternalHandlerSettings {

public static void enable(boolean enable) {
NettyHandlerSettings.enable(enable);
}

public static synchronized void autoWindowOn(boolean autoFlowControl) {
NettyHandlerSettings.autoWindowOn(autoFlowControl);
}

public static synchronized int getLatestClientWindow() {
return NettyHandlerSettings.getLatestClientWindow();
}
Expand Down
136 changes: 136 additions & 0 deletions netty/src/main/java/io/grpc/netty/ListeningEncoder.java
@@ -0,0 +1,136 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.netty;

import static com.google.common.base.Preconditions.checkNotNull;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2FrameWriter;
import io.netty.handler.codec.http2.StreamBufferingEncoder;

/** A ListeningEncoder notifies {@link Http2OutboundFrameListener} on http2 outbound frame event. */
interface ListeningEncoder {

void setListener(Http2OutboundFrameListener listener);

/**
* Partial implementation of (Listening subset of event) event listener for outbound http2
* frames.
*/
class Http2OutboundFrameListener {

/** Notifies on outbound WINDOW_UPDATE frame. */
public void onWindowUpdate(int streamId, int windowSizeIncrement) {}

/** Notifies on outbound PING frame. */
public void onPing(boolean ack, long data) {}

/** Notifies on outbound DATA frame. */
public void onData(int streamId, ByteBuf data, int padding, boolean endStream) {}
}

/** A {@link StreamBufferingEncoder} notifies http2 outbound frame event. */
final class ListeningStreamBufferingEncoder
extends StreamBufferingEncoder implements ListeningEncoder {

private Http2OutboundFrameListener listener = new Http2OutboundFrameListener();

public ListeningStreamBufferingEncoder(Http2ConnectionEncoder encoder) {
super(encoder);
}

@Override
public void setListener(Http2OutboundFrameListener listener) {
this.listener = checkNotNull(listener, "listener");
}

@Override
public ChannelFuture writePing(
ChannelHandlerContext ctx, boolean ack, long data, ChannelPromise promise) {
listener.onPing(ack, data);
return super.writePing(ctx, ack, data, promise);
}

@Override
public ChannelFuture writeWindowUpdate(
ChannelHandlerContext ctx, int streamId, int windowSizeIncrement, ChannelPromise promise) {
listener.onWindowUpdate(streamId, windowSizeIncrement);
return super.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise);
}

@Override
public ChannelFuture writeData(
ChannelHandlerContext ctx,
int streamId,
ByteBuf data,
int padding,
boolean eos,
ChannelPromise promise) {
listener.onData(streamId, data, padding, eos);
return super.writeData(ctx, streamId, data, padding, eos, promise);
}
}

/** A {@link DefaultHttp2ConnectionEncoder} notifies http2 outbound frame event. */
final class ListeningDefaultHttp2ConnectionEncoder
extends DefaultHttp2ConnectionEncoder implements ListeningEncoder {

private Http2OutboundFrameListener listener = new Http2OutboundFrameListener();

public ListeningDefaultHttp2ConnectionEncoder(
Http2Connection connection, Http2FrameWriter frameWriter) {
super(connection, frameWriter);
}

@Override
public void setListener(Http2OutboundFrameListener listener) {
this.listener = checkNotNull(listener, "listener");
}

@Override
public ChannelFuture writePing(
ChannelHandlerContext ctx, boolean ack, long data, ChannelPromise promise) {
listener.onPing(ack, data);
return super.writePing(ctx, ack, data, promise);
}

@Override
public ChannelFuture writeWindowUpdate(
ChannelHandlerContext ctx, int streamId, int windowSizeIncrement, ChannelPromise promise) {
listener.onWindowUpdate(streamId, windowSizeIncrement);
return super.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise);
}

@Override
public ChannelFuture writeData(
ChannelHandlerContext ctx,
int streamId,
ByteBuf data,
int padding,
boolean eos,
ChannelPromise promise) {
listener.onData(streamId, data, padding, eos);
return super.writeData(ctx, streamId, data, padding, eos, promise);
}
}
}