diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index b238b9237d6..8bfd50a2c2f 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -18,6 +18,7 @@ import static com.google.common.base.Preconditions.checkState; import static io.grpc.internal.GrpcUtil.TIMER_SERVICE; +import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_SIZE; import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_UPDATE_RATIO; import com.google.common.annotations.VisibleForTesting; @@ -488,8 +489,7 @@ public Runnable start(Listener listener) { synchronized (lock) { frameWriter = new ExceptionHandlingFrameWriter(OkHttpClientTransport.this, testFrameWriter, testFrameLogger); - outboundFlow = - new OutboundFlowController(OkHttpClientTransport.this, frameWriter, initialWindowSize); + outboundFlow = new OutboundFlowController(OkHttpClientTransport.this, frameWriter); } serializingExecutor.execute(new Runnable() { @Override @@ -515,7 +515,7 @@ public void run() { synchronized (lock) { frameWriter = new ExceptionHandlingFrameWriter(this, rawFrameWriter); - outboundFlow = new OutboundFlowController(this, frameWriter, initialWindowSize); + outboundFlow = new OutboundFlowController(this, frameWriter); } final CountDownLatch latch = new CountDownLatch(1); // Connecting in the serializingExecutor, so that some stream operations like synStream @@ -605,11 +605,7 @@ sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort() }); // Schedule to send connection preface & settings before any other write. try { - synchronized (lock) { - frameWriter.connectionPreface(); - Settings settings = new Settings(); - frameWriter.settings(settings); - } + sendConnectionPrefaceAndSettings(); } finally { latch.countDown(); } @@ -629,6 +625,23 @@ public void run() { return null; } + /** + * Should only be called once when the transport is first established. + */ + @VisibleForTesting + void sendConnectionPrefaceAndSettings() { + synchronized (lock) { + frameWriter.connectionPreface(); + Settings settings = new Settings(); + OkHttpSettingsUtil.set(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE, initialWindowSize); + frameWriter.settings(settings); + if (initialWindowSize > DEFAULT_WINDOW_SIZE) { + frameWriter.windowUpdate( + Utils.CONNECTION_STREAM_ID, initialWindowSize - DEFAULT_WINDOW_SIZE); + } + } + } + private Socket createHttpProxySocket(InetSocketAddress address, InetSocketAddress proxyAddress, String proxyUsername, String proxyPassword) throws StatusException { try { diff --git a/okhttp/src/main/java/io/grpc/okhttp/OutboundFlowController.java b/okhttp/src/main/java/io/grpc/okhttp/OutboundFlowController.java index 441bb21151c..c935363213d 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OutboundFlowController.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OutboundFlowController.java @@ -17,6 +17,7 @@ package io.grpc.okhttp; import static io.grpc.okhttp.Utils.CONNECTION_STREAM_ID; +import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_SIZE; import static java.lang.Math.ceil; import static java.lang.Math.max; import static java.lang.Math.min; @@ -38,11 +39,11 @@ class OutboundFlowController { private final OutboundFlowState connectionState; OutboundFlowController( - OkHttpClientTransport transport, FrameWriter frameWriter, int initialWindowSize) { + OkHttpClientTransport transport, FrameWriter frameWriter) { this.transport = Preconditions.checkNotNull(transport, "transport"); this.frameWriter = Preconditions.checkNotNull(frameWriter, "frameWriter"); - this.initialWindowSize = initialWindowSize; - connectionState = new OutboundFlowState(CONNECTION_STREAM_ID, initialWindowSize); + this.initialWindowSize = DEFAULT_WINDOW_SIZE; + connectionState = new OutboundFlowState(CONNECTION_STREAM_ID, DEFAULT_WINDOW_SIZE); } /** diff --git a/okhttp/src/main/java/io/grpc/okhttp/Utils.java b/okhttp/src/main/java/io/grpc/okhttp/Utils.java index 7ae35f9a379..2dc5f1e1ec9 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/Utils.java +++ b/okhttp/src/main/java/io/grpc/okhttp/Utils.java @@ -42,6 +42,7 @@ class Utils { * is sent to expand the window. */ static final float DEFAULT_WINDOW_UPDATE_RATIO = 0.5f; + static final int DEFAULT_WINDOW_SIZE = 65535; static final int CONNECTION_STREAM_ID = 0; public static Metadata convertHeaders(List
http2Headers) { diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java index 769d0097f90..016fea83ea9 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java @@ -404,6 +404,36 @@ public void maxMessageSizeShouldBeEnforced() throws Exception { shutdownAndVerify(); } + @Test + public void includeInitialWindowSizeInFirstSettings() throws Exception { + int initialWindowSize = 65535; + startTransport( + DEFAULT_START_STREAM_ID, null, true, DEFAULT_MAX_MESSAGE_SIZE, initialWindowSize, null); + clientTransport.sendConnectionPrefaceAndSettings(); + + ArgumentCaptor settings = ArgumentCaptor.forClass(Settings.class); + verify(frameWriter, timeout(TIME_OUT_MS)).settings(settings.capture()); + assertEquals(65535, settings.getValue().get(7)); + } + + /** + * A "large" window size is anything over 65535 (the starting size for any connection-level + * flow control value). + */ + @Test + public void includeInitialWindowSizeInFirstSettings_largeWindowSize() throws Exception { + int initialWindowSize = 75535; // 65535 + 10000 + startTransport( + DEFAULT_START_STREAM_ID, null, true, DEFAULT_MAX_MESSAGE_SIZE, initialWindowSize, null); + clientTransport.sendConnectionPrefaceAndSettings(); + + ArgumentCaptor settings = ArgumentCaptor.forClass(Settings.class); + verify(frameWriter, timeout(TIME_OUT_MS)).settings(settings.capture()); + assertEquals(75535, settings.getValue().get(7)); + + verify(frameWriter, timeout(TIME_OUT_MS)).windowUpdate(0, 10000); + } + /** * When nextFrame throws IOException, the transport should be aborted. */ @@ -836,39 +866,39 @@ public void windowUpdateWithInboundFlowControl() throws Exception { shutdownAndVerify(); } + /** + * Outbound flow control where the initial flow control window stays at the default size of 65535. + */ @Test public void outboundFlowControl() throws Exception { - outboundFlowControl(INITIAL_WINDOW_SIZE); - } - - private void outboundFlowControl(int windowSize) throws Exception { - startTransport( - DEFAULT_START_STREAM_ID, null, true, DEFAULT_MAX_MESSAGE_SIZE, windowSize, null); + initTransport(); MockStreamListener listener = new MockStreamListener(); OkHttpClientStream stream = clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); stream.start(listener); + + // Outbound window always starts at 65535 until changed by Settings.INITIAL_WINDOW_SIZE + int initialOutboundWindowSize = 65535; + int messageLength = initialOutboundWindowSize / 2 + 1; + // The first message should be sent out. - int messageLength = windowSize / 2 + 1; InputStream input = new ByteArrayInputStream(new byte[messageLength]); stream.writeMessage(input); stream.flush(); verify(frameWriter, timeout(TIME_OUT_MS)).data( eq(false), eq(3), any(Buffer.class), eq(messageLength + HEADER_LENGTH)); - // The second message should be partially sent out. input = new ByteArrayInputStream(new byte[messageLength]); stream.writeMessage(input); stream.flush(); - int partiallySentSize = - windowSize - messageLength - HEADER_LENGTH; + int partiallySentSize = initialOutboundWindowSize - messageLength - HEADER_LENGTH; verify(frameWriter, timeout(TIME_OUT_MS)) .data(eq(false), eq(3), any(Buffer.class), eq(partiallySentSize)); - // Get more credit, the rest data should be sent out. - frameHandler().windowUpdate(3, windowSize); - frameHandler().windowUpdate(0, windowSize); + // Get more credit so the rest of the data should be sent out. + frameHandler().windowUpdate(3, initialOutboundWindowSize); + frameHandler().windowUpdate(0, initialOutboundWindowSize); verify(frameWriter, timeout(TIME_OUT_MS)).data( eq(false), eq(3), any(Buffer.class), eq(messageLength + HEADER_LENGTH - partiallySentSize)); @@ -878,14 +908,90 @@ private void outboundFlowControl(int windowSize) throws Exception { shutdownAndVerify(); } + /** + * Outbound flow control where the initial window size is reduced before a stream is started. + */ @Test public void outboundFlowControl_smallWindowSize() throws Exception { - outboundFlowControl(100); + initTransport(); + + int initialOutboundWindowSize = 100; + setInitialWindowSize(initialOutboundWindowSize); + + MockStreamListener listener = new MockStreamListener(); + OkHttpClientStream stream = + clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); + stream.start(listener); + + int messageLength = 75; + // The first message should be sent out. + InputStream input = new ByteArrayInputStream(new byte[messageLength]); + stream.writeMessage(input); + stream.flush(); + verify(frameWriter, timeout(TIME_OUT_MS)).data( + eq(false), eq(3), any(Buffer.class), eq(messageLength + HEADER_LENGTH)); + + // The second message should be partially sent out. + input = new ByteArrayInputStream(new byte[messageLength]); + stream.writeMessage(input); + stream.flush(); + int partiallySentSize = initialOutboundWindowSize - messageLength - HEADER_LENGTH; + verify(frameWriter, timeout(TIME_OUT_MS)) + .data(eq(false), eq(3), any(Buffer.class), eq(partiallySentSize)); + + // Get more credit so the rest of the data should be sent out. + frameHandler().windowUpdate(3, initialOutboundWindowSize); + verify(frameWriter, timeout(TIME_OUT_MS)).data( + eq(false), eq(3), any(Buffer.class), + eq(messageLength + HEADER_LENGTH - partiallySentSize)); + + stream.cancel(Status.CANCELLED); + listener.waitUntilStreamClosed(); + shutdownAndVerify(); } + /** + * Outbound flow control where the initial window size is increased before a stream is started. + */ @Test public void outboundFlowControl_bigWindowSize() throws Exception { - outboundFlowControl(INITIAL_WINDOW_SIZE * 2); + initTransport(); + + int initialOutboundWindowSize = 131070; // 65535 * 2 + setInitialWindowSize(initialOutboundWindowSize); + frameHandler().windowUpdate(0, 65535); + + MockStreamListener listener = new MockStreamListener(); + OkHttpClientStream stream = + clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); + stream.start(listener); + + int messageLength = 100000; + // The first message should be sent out. + InputStream input = new ByteArrayInputStream(new byte[messageLength]); + stream.writeMessage(input); + stream.flush(); + verify(frameWriter, timeout(TIME_OUT_MS)).data( + eq(false), eq(3), any(Buffer.class), eq(messageLength + HEADER_LENGTH)); + + // The second message should be partially sent out. + input = new ByteArrayInputStream(new byte[messageLength]); + stream.writeMessage(input); + stream.flush(); + int partiallySentSize = initialOutboundWindowSize - messageLength - HEADER_LENGTH; + verify(frameWriter, timeout(TIME_OUT_MS)) + .data(eq(false), eq(3), any(Buffer.class), eq(partiallySentSize)); + + // Get more credit so the rest of the data should be sent out. + frameHandler().windowUpdate(0, initialOutboundWindowSize); + frameHandler().windowUpdate(3, initialOutboundWindowSize); + verify(frameWriter, timeout(TIME_OUT_MS)).data( + eq(false), eq(3), any(Buffer.class), + eq(messageLength + HEADER_LENGTH - partiallySentSize)); + + stream.cancel(Status.CANCELLED); + listener.waitUntilStreamClosed(); + shutdownAndVerify(); } @Test