Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

netty: allow to use bandwidth delay product #6979

Merged
merged 10 commits into from May 1, 2020
Expand Up @@ -24,14 +24,21 @@
import io.grpc.ServerBuilder;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.netty.InternalHandlerSettings;
import io.grpc.netty.GrpcHttp2ConnectionHandler;
import io.grpc.netty.InternalNettyChannelBuilder;
import io.grpc.netty.InternalNettyChannelBuilder.ProtocolNegotiatorFactory;
import io.grpc.netty.InternalProtocolNegotiator.ProtocolNegotiator;
import io.grpc.netty.InternalProtocolNegotiators;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.integration.Messages.ResponseParameters;
import io.grpc.testing.integration.Messages.StreamingOutputCallRequest;
import io.grpc.testing.integration.Messages.StreamingOutputCallResponse;
import io.netty.channel.ChannelHandler;
import io.netty.handler.codec.http2.Http2Stream;
import io.netty.util.AsciiString;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
Expand All @@ -40,6 +47,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
Expand All @@ -63,9 +71,11 @@ public class NettyFlowControlTest {
private static final int REGULAR_WINDOW = 64 * 1024;
private static final int MAX_WINDOW = 8 * 1024 * 1024;

private static ManagedChannel channel;
private static Server server;
private static TrafficControlProxy proxy;
private final CapturingProtocolNegotiationFactory capturingPnFactory
= new CapturingProtocolNegotiationFactory();
private ManagedChannel channel;
private Server server;
private TrafficControlProxy proxy;

private int proxyPort;
private int serverPort;
Expand All @@ -88,16 +98,21 @@ public void initTest() {

@After
public void endTest() throws IOException {
proxy.shutDown();
server.shutdown();
if (proxy != null) {
proxy.shutDown();
}
server.shutdownNow();
if (channel != null) {
channel.shutdownNow();
}
}

@Test
public void largeBdp() throws InterruptedException, IOException {
proxy = new TrafficControlProxy(serverPort, HIGH_BAND, MED_LAT, TimeUnit.MILLISECONDS);
proxy.start();
proxyPort = proxy.getPort();
resetConnection(REGULAR_WINDOW);
createAndStartChannel(REGULAR_WINDOW);
doTest(HIGH_BAND, MED_LAT);
}

Expand All @@ -106,7 +121,7 @@ public void smallBdp() throws InterruptedException, IOException {
proxy = new TrafficControlProxy(serverPort, LOW_BAND, MED_LAT, TimeUnit.MILLISECONDS);
proxy.start();
proxyPort = proxy.getPort();
resetConnection(REGULAR_WINDOW);
createAndStartChannel(REGULAR_WINDOW);
doTest(LOW_BAND, MED_LAT);
}

Expand All @@ -116,7 +131,7 @@ public void verySmallWindowMakesProgress() throws InterruptedException, IOExcept
proxy = new TrafficControlProxy(serverPort, HIGH_BAND, MED_LAT, TimeUnit.MILLISECONDS);
proxy.start();
proxyPort = proxy.getPort();
resetConnection(TINY_WINDOW);
createAndStartChannel(TINY_WINDOW);
doTest(HIGH_BAND, MED_LAT);
}

Expand All @@ -138,9 +153,10 @@ private void doTest(int bandwidth, int latency) throws InterruptedException {
.addResponseParameters(ResponseParameters.newBuilder().setSize(streamSize / 2));
StreamingOutputCallRequest request = builder.build();

TestStreamObserver observer = new TestStreamObserver(expectedWindow);
TestStreamObserver observer =
new TestStreamObserver(capturingPnFactory.grpcHandlerRef, expectedWindow);
stub.streamingOutputCall(request, observer);
int lastWindow = observer.waitFor();
int lastWindow = observer.waitFor(5, TimeUnit.SECONDS);

// deal with cases that either don't cause a window update or hit max window
expectedWindow = Math.min(MAX_WINDOW, Math.max(expectedWindow, REGULAR_WINDOW));
Expand All @@ -156,18 +172,14 @@ private void doTest(int bandwidth, int latency) throws InterruptedException {
/**
* Resets client/server and their flow control windows.
*/
private void resetConnection(int clientFlowControlWindow)
throws InterruptedException {
if (channel != null) {
if (!channel.isShutdown()) {
channel.shutdown();
channel.awaitTermination(100, TimeUnit.MILLISECONDS);
}
}
channel = NettyChannelBuilder.forAddress(new InetSocketAddress("localhost", proxyPort))
.initialFlowControlWindow(clientFlowControlWindow)
.negotiationType(NegotiationType.PLAINTEXT)
.build();
private void createAndStartChannel(int clientFlowControlWindow) {
NettyChannelBuilder channelBuilder =
NettyChannelBuilder
.forAddress(new InetSocketAddress("localhost", proxyPort))
.initialFlowControlWindow(clientFlowControlWindow)
.negotiationType(NegotiationType.PLAINTEXT);
InternalNettyChannelBuilder.setProtocolNegotiatorFactory(channelBuilder, capturingPnFactory);
channel = channelBuilder.build();
}

private void startServer(int serverFlowControlWindow) {
Expand All @@ -190,20 +202,25 @@ private void startServer(int serverFlowControlWindow) {
*/
private static class TestStreamObserver implements StreamObserver<StreamingOutputCallResponse> {

long startRequestNanos;
final AtomicReference<GrpcHttp2ConnectionHandler> grpcHandlerRef;
final long startRequestNanos;
long endRequestNanos;
private final CountDownLatch latch = new CountDownLatch(1);
long expectedWindow;
final CountDownLatch latch = new CountDownLatch(1);
final long expectedWindow;
int lastWindow;

public TestStreamObserver(long window) {
public TestStreamObserver(
AtomicReference<GrpcHttp2ConnectionHandler> grpcHandlerRef, long window) {
this.grpcHandlerRef = grpcHandlerRef;
startRequestNanos = System.nanoTime();
expectedWindow = window;
}

@Override
public void onNext(StreamingOutputCallResponse value) {
lastWindow = InternalHandlerSettings.getLatestClientWindow();
GrpcHttp2ConnectionHandler grpcHandler = grpcHandlerRef.get();
Http2Stream connectionStream = grpcHandler.connection().connectionStream();
lastWindow = grpcHandler.decoder().flowController().initialWindowSize(connectionStream);
if (lastWindow >= expectedWindow) {
onCompleted();
}
Expand All @@ -224,9 +241,40 @@ public long getElapsedTime() {
return endRequestNanos - startRequestNanos;
}

public int waitFor() throws InterruptedException {
latch.await(5, TimeUnit.SECONDS);
public int waitFor(long duration, TimeUnit unit) throws InterruptedException {
latch.await(duration, unit);
return lastWindow;
}
}

private static class CapturingProtocolNegotiationFactory implements ProtocolNegotiatorFactory {

AtomicReference<GrpcHttp2ConnectionHandler> grpcHandlerRef = new AtomicReference<>();

@Override
public ProtocolNegotiator buildProtocolNegotiator() {
return new CapturingProtocolNegotiator();
}

private class CapturingProtocolNegotiator implements ProtocolNegotiator {

final ProtocolNegotiator delegate = InternalProtocolNegotiators.plaintext();

@Override
public AsciiString scheme() {
return delegate.scheme();
}

@Override
public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) {
CapturingProtocolNegotiationFactory.this.grpcHandlerRef.set(grpcHandler);
return delegate.newHandler(grpcHandler);
}

@Override
public void close() {
delegate.close();;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/;;/;/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops =p

}
}
}
}
36 changes: 0 additions & 36 deletions netty/src/main/java/io/grpc/netty/InternalHandlerSettings.java

This file was deleted.

2 changes: 0 additions & 2 deletions netty/src/main/java/io/grpc/netty/NettyClientTransport.java
Expand Up @@ -225,7 +225,6 @@ public Runnable start(Listener transportListener) {
transportTracer,
eagAttributes,
authorityString);
NettyHandlerSettings.setAutoWindow(handler);

ChannelHandler negotiationHandler = negotiator.newHandler(handler);

Expand Down Expand Up @@ -310,7 +309,6 @@ public void operationComplete(ChannelFuture future) throws Exception {
if (keepAliveManager != null) {
keepAliveManager.onTransportStarted();
}
channel.closeFuture().addListener(NettyHandlerSettings.cleanUpTask());

return null;
}
Expand Down
79 changes: 0 additions & 79 deletions netty/src/main/java/io/grpc/netty/NettyHandlerSettings.java

This file was deleted.

6 changes: 1 addition & 5 deletions netty/src/main/java/io/grpc/netty/NettyServerTransport.java
Expand Up @@ -124,7 +124,6 @@ public void start(ServerTransportListener listener) {

// Create the Netty handler for the pipeline.
grpcHandler = createHandler(listener, channelUnused);
NettyHandlerSettings.setAutoWindow(grpcHandler);

// Notify when the channel closes.
final class TerminationNotifier implements ChannelFutureListener {
Expand All @@ -144,10 +143,7 @@ public void operationComplete(ChannelFuture future) throws Exception {

ChannelFutureListener terminationNotifier = new TerminationNotifier();
channelUnused.addListener(terminationNotifier);
channel
.closeFuture()
.addListener(terminationNotifier)
.addListener(NettyHandlerSettings.cleanUpTask());
channel.closeFuture().addListener(terminationNotifier);

channel.pipeline().addLast(bufferingHandler);
}
Expand Down