Skip to content

Commit

Permalink
netty: allow to use bandwidth delay product (grpc#6979)
Browse files Browse the repository at this point in the history
  • Loading branch information
creamsoup authored and dfawley committed Jan 15, 2021
1 parent 2cc4e14 commit d90b254
Show file tree
Hide file tree
Showing 19 changed files with 365 additions and 200 deletions.
Expand Up @@ -1907,7 +1907,7 @@ protected int operationTimeoutMillis() {
* Some tests run on memory constrained environments. Rather than OOM, just give up. 64 is
* chosen as a maximum amount of memory a large test would need.
*/
private static void assumeEnoughMemory() {
protected static void assumeEnoughMemory() {
Runtime r = Runtime.getRuntime();
long usedMem = r.totalMemory() - r.freeMemory();
long actuallyFreeMemory = r.maxMemory() - usedMem;
Expand Down
Expand Up @@ -18,23 +18,15 @@

import io.grpc.ManagedChannel;
import io.grpc.internal.AbstractServerImplBuilder;
import io.grpc.netty.InternalHandlerSettings;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder;
import org.junit.BeforeClass;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class AutoWindowSizingOnTest extends AbstractInteropTest {

@BeforeClass
public static void turnOnAutoWindow() {
InternalHandlerSettings.enable(true);
InternalHandlerSettings.autoWindowOn(true);
}

@Override
protected AbstractServerImplBuilder<?> getServerBuilder() {
return NettyServerBuilder.forPort(0)
Expand All @@ -45,7 +37,8 @@ protected AbstractServerImplBuilder<?> getServerBuilder() {
protected ManagedChannel createChannel() {
NettyChannelBuilder builder = NettyChannelBuilder.forAddress(getListenAddress())
.negotiationType(NegotiationType.PLAINTEXT)
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE);
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
.initialFlowControlWindow(NettyChannelBuilder.DEFAULT_FLOW_CONTROL_WINDOW);
// Disable the default census stats interceptor, use testing interceptor instead.
io.grpc.internal.TestingAccessor.setStatsEnabled(builder, false);
return builder.intercept(createCensusStatsClientInterceptor()).build();
Expand Down
Expand Up @@ -24,14 +24,21 @@
import io.grpc.ServerBuilder;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.netty.InternalHandlerSettings;
import io.grpc.netty.GrpcHttp2ConnectionHandler;
import io.grpc.netty.InternalNettyChannelBuilder;
import io.grpc.netty.InternalNettyChannelBuilder.ProtocolNegotiatorFactory;
import io.grpc.netty.InternalProtocolNegotiator.ProtocolNegotiator;
import io.grpc.netty.InternalProtocolNegotiators;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.integration.Messages.ResponseParameters;
import io.grpc.testing.integration.Messages.StreamingOutputCallRequest;
import io.grpc.testing.integration.Messages.StreamingOutputCallResponse;
import io.netty.channel.ChannelHandler;
import io.netty.handler.codec.http2.Http2Stream;
import io.netty.util.AsciiString;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
Expand All @@ -40,10 +47,11 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand All @@ -63,9 +71,11 @@ public class NettyFlowControlTest {
private static final int REGULAR_WINDOW = 64 * 1024;
private static final int MAX_WINDOW = 8 * 1024 * 1024;

private static ManagedChannel channel;
private static Server server;
private static TrafficControlProxy proxy;
private final CapturingProtocolNegotiationFactory capturingPnFactory
= new CapturingProtocolNegotiationFactory();
private ManagedChannel channel;
private Server server;
private TrafficControlProxy proxy;

private int proxyPort;
private int serverPort;
Expand All @@ -74,11 +84,6 @@ public class NettyFlowControlTest {
new ThreadPoolExecutor(1, 10, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new DefaultThreadFactory("flowcontrol-test-pool", true));

@BeforeClass
public static void setUp() {
InternalHandlerSettings.enable(true);
InternalHandlerSettings.autoWindowOn(true);
}

@AfterClass
public static void shutDownTests() {
Expand All @@ -93,16 +98,21 @@ public void initTest() {

@After
public void endTest() throws IOException {
proxy.shutDown();
server.shutdown();
if (proxy != null) {
proxy.shutDown();
}
server.shutdownNow();
if (channel != null) {
channel.shutdownNow();
}
}

@Test
public void largeBdp() throws InterruptedException, IOException {
proxy = new TrafficControlProxy(serverPort, HIGH_BAND, MED_LAT, TimeUnit.MILLISECONDS);
proxy.start();
proxyPort = proxy.getPort();
resetConnection(REGULAR_WINDOW);
createAndStartChannel(REGULAR_WINDOW);
doTest(HIGH_BAND, MED_LAT);
}

Expand All @@ -111,16 +121,17 @@ public void smallBdp() throws InterruptedException, IOException {
proxy = new TrafficControlProxy(serverPort, LOW_BAND, MED_LAT, TimeUnit.MILLISECONDS);
proxy.start();
proxyPort = proxy.getPort();
resetConnection(REGULAR_WINDOW);
createAndStartChannel(REGULAR_WINDOW);
doTest(LOW_BAND, MED_LAT);
}

@Test
@Ignore("enable once 2 pings between data is no longer necessary")
public void verySmallWindowMakesProgress() throws InterruptedException, IOException {
proxy = new TrafficControlProxy(serverPort, HIGH_BAND, MED_LAT, TimeUnit.MILLISECONDS);
proxy.start();
proxyPort = proxy.getPort();
resetConnection(TINY_WINDOW);
createAndStartChannel(TINY_WINDOW);
doTest(HIGH_BAND, MED_LAT);
}

Expand All @@ -142,9 +153,10 @@ private void doTest(int bandwidth, int latency) throws InterruptedException {
.addResponseParameters(ResponseParameters.newBuilder().setSize(streamSize / 2));
StreamingOutputCallRequest request = builder.build();

TestStreamObserver observer = new TestStreamObserver(expectedWindow);
TestStreamObserver observer =
new TestStreamObserver(capturingPnFactory.grpcHandlerRef, expectedWindow);
stub.streamingOutputCall(request, observer);
int lastWindow = observer.waitFor();
int lastWindow = observer.waitFor(5, TimeUnit.SECONDS);

// deal with cases that either don't cause a window update or hit max window
expectedWindow = Math.min(MAX_WINDOW, Math.max(expectedWindow, REGULAR_WINDOW));
Expand All @@ -160,24 +172,21 @@ private void doTest(int bandwidth, int latency) throws InterruptedException {
/**
* Resets client/server and their flow control windows.
*/
private void resetConnection(int clientFlowControlWindow)
throws InterruptedException {
if (channel != null) {
if (!channel.isShutdown()) {
channel.shutdown();
channel.awaitTermination(100, TimeUnit.MILLISECONDS);
}
}
channel = NettyChannelBuilder.forAddress(new InetSocketAddress("localhost", proxyPort))
.flowControlWindow(clientFlowControlWindow)
.negotiationType(NegotiationType.PLAINTEXT)
.build();
private void createAndStartChannel(int clientFlowControlWindow) {
NettyChannelBuilder channelBuilder =
NettyChannelBuilder
.forAddress(new InetSocketAddress("localhost", proxyPort))
.initialFlowControlWindow(clientFlowControlWindow)
.negotiationType(NegotiationType.PLAINTEXT);
InternalNettyChannelBuilder.setProtocolNegotiatorFactory(channelBuilder, capturingPnFactory);
channel = channelBuilder.build();
}

private void startServer(int serverFlowControlWindow) {
ServerBuilder<?> builder =
NettyServerBuilder.forAddress(new InetSocketAddress("localhost", 0))
.flowControlWindow(serverFlowControlWindow);
NettyServerBuilder
.forAddress(new InetSocketAddress("localhost", 0))
.initialFlowControlWindow(serverFlowControlWindow);
builder.addService(ServerInterceptors.intercept(
new TestServiceImpl(Executors.newScheduledThreadPool(2)),
ImmutableList.<ServerInterceptor>of()));
Expand All @@ -193,20 +202,25 @@ private void startServer(int serverFlowControlWindow) {
*/
private static class TestStreamObserver implements StreamObserver<StreamingOutputCallResponse> {

long startRequestNanos;
final AtomicReference<GrpcHttp2ConnectionHandler> grpcHandlerRef;
final long startRequestNanos;
long endRequestNanos;
private final CountDownLatch latch = new CountDownLatch(1);
long expectedWindow;
final CountDownLatch latch = new CountDownLatch(1);
final long expectedWindow;
int lastWindow;

public TestStreamObserver(long window) {
public TestStreamObserver(
AtomicReference<GrpcHttp2ConnectionHandler> grpcHandlerRef, long window) {
this.grpcHandlerRef = grpcHandlerRef;
startRequestNanos = System.nanoTime();
expectedWindow = window;
}

@Override
public void onNext(StreamingOutputCallResponse value) {
lastWindow = InternalHandlerSettings.getLatestClientWindow();
GrpcHttp2ConnectionHandler grpcHandler = grpcHandlerRef.get();
Http2Stream connectionStream = grpcHandler.connection().connectionStream();
lastWindow = grpcHandler.decoder().flowController().initialWindowSize(connectionStream);
if (lastWindow >= expectedWindow) {
onCompleted();
}
Expand All @@ -227,9 +241,40 @@ public long getElapsedTime() {
return endRequestNanos - startRequestNanos;
}

public int waitFor() throws InterruptedException {
latch.await();
public int waitFor(long duration, TimeUnit unit) throws InterruptedException {
latch.await(duration, unit);
return lastWindow;
}
}

private static class CapturingProtocolNegotiationFactory implements ProtocolNegotiatorFactory {

AtomicReference<GrpcHttp2ConnectionHandler> grpcHandlerRef = new AtomicReference<>();

@Override
public ProtocolNegotiator buildProtocolNegotiator() {
return new CapturingProtocolNegotiator();
}

private class CapturingProtocolNegotiator implements ProtocolNegotiator {

final ProtocolNegotiator delegate = InternalProtocolNegotiators.plaintext();

@Override
public AsciiString scheme() {
return delegate.scheme();
}

@Override
public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) {
CapturingProtocolNegotiationFactory.this.grpcHandlerRef.set(grpcHandler);
return delegate.newHandler(grpcHandler);
}

@Override
public void close() {
delegate.close();
}
}
}
}

0 comments on commit d90b254

Please sign in to comment.