Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

netty: allow to use bandwidth delay product #6979

Merged
merged 10 commits into from May 1, 2020
Expand Up @@ -1907,7 +1907,7 @@ protected int operationTimeoutMillis() {
* Some tests run on memory constrained environments. Rather than OOM, just give up. 64 is
* chosen as a maximum amount of memory a large test would need.
*/
private static void assumeEnoughMemory() {
protected static void assumeEnoughMemory() {
Runtime r = Runtime.getRuntime();
long usedMem = r.totalMemory() - r.freeMemory();
long actuallyFreeMemory = r.maxMemory() - usedMem;
Expand Down
Expand Up @@ -18,23 +18,15 @@

import io.grpc.ManagedChannel;
import io.grpc.internal.AbstractServerImplBuilder;
import io.grpc.netty.InternalHandlerSettings;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder;
import org.junit.BeforeClass;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class AutoWindowSizingOnTest extends AbstractInteropTest {

@BeforeClass
public static void turnOnAutoWindow() {
InternalHandlerSettings.enable(true);
InternalHandlerSettings.autoWindowOn(true);
}

@Override
protected AbstractServerImplBuilder<?> getServerBuilder() {
return NettyServerBuilder.forPort(0)
Expand All @@ -45,7 +37,8 @@ protected AbstractServerImplBuilder<?> getServerBuilder() {
protected ManagedChannel createChannel() {
NettyChannelBuilder builder = NettyChannelBuilder.forAddress(getListenAddress())
.negotiationType(NegotiationType.PLAINTEXT)
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE);
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
.initialFlowControlWindow(NettyChannelBuilder.DEFAULT_FLOW_CONTROL_WINDOW);
// Disable the default census stats interceptor, use testing interceptor instead.
io.grpc.internal.TestingAccessor.setStatsEnabled(builder, false);
return builder.intercept(createCensusStatsClientInterceptor()).build();
Expand Down
Expand Up @@ -24,14 +24,21 @@
import io.grpc.ServerBuilder;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.netty.InternalHandlerSettings;
import io.grpc.netty.GrpcHttp2ConnectionHandler;
import io.grpc.netty.InternalNettyChannelBuilder;
import io.grpc.netty.InternalNettyChannelBuilder.ProtocolNegotiatorFactory;
import io.grpc.netty.InternalProtocolNegotiator.ProtocolNegotiator;
import io.grpc.netty.InternalProtocolNegotiators;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.integration.Messages.ResponseParameters;
import io.grpc.testing.integration.Messages.StreamingOutputCallRequest;
import io.grpc.testing.integration.Messages.StreamingOutputCallResponse;
import io.netty.channel.ChannelHandler;
import io.netty.handler.codec.http2.Http2Stream;
import io.netty.util.AsciiString;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
Expand All @@ -40,10 +47,11 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand All @@ -63,9 +71,11 @@ public class NettyFlowControlTest {
private static final int REGULAR_WINDOW = 64 * 1024;
private static final int MAX_WINDOW = 8 * 1024 * 1024;

private static ManagedChannel channel;
private static Server server;
private static TrafficControlProxy proxy;
private final CapturingProtocolNegotiationFactory capturingPnFactory
= new CapturingProtocolNegotiationFactory();
private ManagedChannel channel;
private Server server;
private TrafficControlProxy proxy;

private int proxyPort;
private int serverPort;
Expand All @@ -74,11 +84,6 @@ public class NettyFlowControlTest {
new ThreadPoolExecutor(1, 10, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new DefaultThreadFactory("flowcontrol-test-pool", true));

@BeforeClass
public static void setUp() {
InternalHandlerSettings.enable(true);
InternalHandlerSettings.autoWindowOn(true);
}

@AfterClass
public static void shutDownTests() {
Expand All @@ -93,16 +98,21 @@ public void initTest() {

@After
public void endTest() throws IOException {
proxy.shutDown();
server.shutdown();
if (proxy != null) {
proxy.shutDown();
}
server.shutdownNow();
if (channel != null) {
channel.shutdownNow();
}
}

@Test
public void largeBdp() throws InterruptedException, IOException {
proxy = new TrafficControlProxy(serverPort, HIGH_BAND, MED_LAT, TimeUnit.MILLISECONDS);
proxy.start();
proxyPort = proxy.getPort();
resetConnection(REGULAR_WINDOW);
createAndStartChannel(REGULAR_WINDOW);
doTest(HIGH_BAND, MED_LAT);
}

Expand All @@ -111,16 +121,17 @@ public void smallBdp() throws InterruptedException, IOException {
proxy = new TrafficControlProxy(serverPort, LOW_BAND, MED_LAT, TimeUnit.MILLISECONDS);
proxy.start();
proxyPort = proxy.getPort();
resetConnection(REGULAR_WINDOW);
createAndStartChannel(REGULAR_WINDOW);
doTest(LOW_BAND, MED_LAT);
}

@Test
@Ignore("enable once 2 pings between data is no longer necessary")
public void verySmallWindowMakesProgress() throws InterruptedException, IOException {
proxy = new TrafficControlProxy(serverPort, HIGH_BAND, MED_LAT, TimeUnit.MILLISECONDS);
proxy.start();
proxyPort = proxy.getPort();
resetConnection(TINY_WINDOW);
createAndStartChannel(TINY_WINDOW);
doTest(HIGH_BAND, MED_LAT);
}

Expand All @@ -142,9 +153,10 @@ private void doTest(int bandwidth, int latency) throws InterruptedException {
.addResponseParameters(ResponseParameters.newBuilder().setSize(streamSize / 2));
StreamingOutputCallRequest request = builder.build();

TestStreamObserver observer = new TestStreamObserver(expectedWindow);
TestStreamObserver observer =
new TestStreamObserver(capturingPnFactory.grpcHandlerRef, expectedWindow);
stub.streamingOutputCall(request, observer);
int lastWindow = observer.waitFor();
int lastWindow = observer.waitFor(5, TimeUnit.SECONDS);

// deal with cases that either don't cause a window update or hit max window
expectedWindow = Math.min(MAX_WINDOW, Math.max(expectedWindow, REGULAR_WINDOW));
Expand All @@ -160,24 +172,21 @@ private void doTest(int bandwidth, int latency) throws InterruptedException {
/**
* Resets client/server and their flow control windows.
*/
private void resetConnection(int clientFlowControlWindow)
throws InterruptedException {
if (channel != null) {
if (!channel.isShutdown()) {
channel.shutdown();
channel.awaitTermination(100, TimeUnit.MILLISECONDS);
}
}
channel = NettyChannelBuilder.forAddress(new InetSocketAddress("localhost", proxyPort))
.flowControlWindow(clientFlowControlWindow)
.negotiationType(NegotiationType.PLAINTEXT)
.build();
private void createAndStartChannel(int clientFlowControlWindow) {
NettyChannelBuilder channelBuilder =
NettyChannelBuilder
.forAddress(new InetSocketAddress("localhost", proxyPort))
.initialFlowControlWindow(clientFlowControlWindow)
.negotiationType(NegotiationType.PLAINTEXT);
InternalNettyChannelBuilder.setProtocolNegotiatorFactory(channelBuilder, capturingPnFactory);
channel = channelBuilder.build();
}

private void startServer(int serverFlowControlWindow) {
ServerBuilder<?> builder =
NettyServerBuilder.forAddress(new InetSocketAddress("localhost", 0))
.flowControlWindow(serverFlowControlWindow);
NettyServerBuilder
.forAddress(new InetSocketAddress("localhost", 0))
.initialFlowControlWindow(serverFlowControlWindow);
builder.addService(ServerInterceptors.intercept(
new TestServiceImpl(Executors.newScheduledThreadPool(2)),
ImmutableList.<ServerInterceptor>of()));
Expand All @@ -193,20 +202,25 @@ private void startServer(int serverFlowControlWindow) {
*/
private static class TestStreamObserver implements StreamObserver<StreamingOutputCallResponse> {

long startRequestNanos;
final AtomicReference<GrpcHttp2ConnectionHandler> grpcHandlerRef;
final long startRequestNanos;
long endRequestNanos;
private final CountDownLatch latch = new CountDownLatch(1);
long expectedWindow;
final CountDownLatch latch = new CountDownLatch(1);
final long expectedWindow;
int lastWindow;

public TestStreamObserver(long window) {
public TestStreamObserver(
AtomicReference<GrpcHttp2ConnectionHandler> grpcHandlerRef, long window) {
this.grpcHandlerRef = grpcHandlerRef;
startRequestNanos = System.nanoTime();
expectedWindow = window;
}

@Override
public void onNext(StreamingOutputCallResponse value) {
lastWindow = InternalHandlerSettings.getLatestClientWindow();
GrpcHttp2ConnectionHandler grpcHandler = grpcHandlerRef.get();
Http2Stream connectionStream = grpcHandler.connection().connectionStream();
lastWindow = grpcHandler.decoder().flowController().initialWindowSize(connectionStream);
if (lastWindow >= expectedWindow) {
onCompleted();
}
Expand All @@ -227,9 +241,40 @@ public long getElapsedTime() {
return endRequestNanos - startRequestNanos;
}

public int waitFor() throws InterruptedException {
latch.await();
public int waitFor(long duration, TimeUnit unit) throws InterruptedException {
latch.await(duration, unit);
return lastWindow;
}
}

private static class CapturingProtocolNegotiationFactory implements ProtocolNegotiatorFactory {

AtomicReference<GrpcHttp2ConnectionHandler> grpcHandlerRef = new AtomicReference<>();

@Override
public ProtocolNegotiator buildProtocolNegotiator() {
return new CapturingProtocolNegotiator();
}

private class CapturingProtocolNegotiator implements ProtocolNegotiator {

final ProtocolNegotiator delegate = InternalProtocolNegotiators.plaintext();

@Override
public AsciiString scheme() {
return delegate.scheme();
}

@Override
public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) {
CapturingProtocolNegotiationFactory.this.grpcHandlerRef.set(grpcHandler);
return delegate.newHandler(grpcHandler);
}

@Override
public void close() {
delegate.close();
}
}
}
}