From ea6a6d02ee02f52efad50267986867a1ba67c3c5 Mon Sep 17 00:00:00 2001 From: Chris Schechter Date: Mon, 24 Feb 2020 18:04:17 -0800 Subject: [PATCH 1/6] okhttp: fix incorrect initial size of outbound flow control window --- .../src/main/java/io/grpc/okhttp/OkHttpClientTransport.java | 6 ++++++ okhttp/src/main/java/io/grpc/okhttp/Utils.java | 1 + 2 files changed, 7 insertions(+) diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index b238b9237d6..9f4e2156c86 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -18,6 +18,7 @@ import static com.google.common.base.Preconditions.checkState; import static io.grpc.internal.GrpcUtil.TIMER_SERVICE; +import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_SIZE; import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_UPDATE_RATIO; import com.google.common.annotations.VisibleForTesting; @@ -608,7 +609,12 @@ sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort() synchronized (lock) { frameWriter.connectionPreface(); Settings settings = new Settings(); + OkHttpSettingsUtil.set(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE, initialWindowSize); frameWriter.settings(settings); + if (initialWindowSize > DEFAULT_WINDOW_SIZE) { + frameWriter.windowUpdate( + Utils.CONNECTION_STREAM_ID, initialWindowSize - DEFAULT_WINDOW_SIZE); + } } } finally { latch.countDown(); diff --git a/okhttp/src/main/java/io/grpc/okhttp/Utils.java b/okhttp/src/main/java/io/grpc/okhttp/Utils.java index 7ae35f9a379..2dc5f1e1ec9 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/Utils.java +++ b/okhttp/src/main/java/io/grpc/okhttp/Utils.java @@ -42,6 +42,7 @@ class Utils { * is sent to expand the window. */ static final float DEFAULT_WINDOW_UPDATE_RATIO = 0.5f; + static final int DEFAULT_WINDOW_SIZE = 65535; static final int CONNECTION_STREAM_ID = 0; public static Metadata convertHeaders(List
http2Headers) { From 22eb66e47ec54a0dc134b64ffc66fff66ebc171d Mon Sep 17 00:00:00 2001 From: Chris Schechter Date: Mon, 24 Feb 2020 18:04:22 -0800 Subject: [PATCH 2/6] okhttp: fix incorrect initial size of outbound flow control window --- .../src/main/java/io/grpc/okhttp/OkHttpClientTransport.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index 9f4e2156c86..3ce5346e04f 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -489,8 +489,8 @@ public Runnable start(Listener listener) { synchronized (lock) { frameWriter = new ExceptionHandlingFrameWriter(OkHttpClientTransport.this, testFrameWriter, testFrameLogger); - outboundFlow = - new OutboundFlowController(OkHttpClientTransport.this, frameWriter, initialWindowSize); + outboundFlow = new OutboundFlowController( + OkHttpClientTransport.this, frameWriter, DEFAULT_WINDOW_SIZE); } serializingExecutor.execute(new Runnable() { @Override @@ -516,7 +516,7 @@ public void run() { synchronized (lock) { frameWriter = new ExceptionHandlingFrameWriter(this, rawFrameWriter); - outboundFlow = new OutboundFlowController(this, frameWriter, initialWindowSize); + outboundFlow = new OutboundFlowController(this, frameWriter, DEFAULT_WINDOW_SIZE); } final CountDownLatch latch = new CountDownLatch(1); // Connecting in the serializingExecutor, so that some stream operations like synStream From aac0867923badf9bdd92d68e339f13d11441baa9 Mon Sep 17 00:00:00 2001 From: Chris Schechter Date: Mon, 24 Feb 2020 18:04:30 -0800 Subject: [PATCH 3/6] okhttp: simplify OutboundFlowController constructor --- .../main/java/io/grpc/okhttp/OkHttpClientTransport.java | 5 ++--- .../main/java/io/grpc/okhttp/OutboundFlowController.java | 7 ++++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index 3ce5346e04f..bde9b010543 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -489,8 +489,7 @@ public Runnable start(Listener listener) { synchronized (lock) { frameWriter = new ExceptionHandlingFrameWriter(OkHttpClientTransport.this, testFrameWriter, testFrameLogger); - outboundFlow = new OutboundFlowController( - OkHttpClientTransport.this, frameWriter, DEFAULT_WINDOW_SIZE); + outboundFlow = new OutboundFlowController(OkHttpClientTransport.this, frameWriter); } serializingExecutor.execute(new Runnable() { @Override @@ -516,7 +515,7 @@ public void run() { synchronized (lock) { frameWriter = new ExceptionHandlingFrameWriter(this, rawFrameWriter); - outboundFlow = new OutboundFlowController(this, frameWriter, DEFAULT_WINDOW_SIZE); + outboundFlow = new OutboundFlowController(this, frameWriter); } final CountDownLatch latch = new CountDownLatch(1); // Connecting in the serializingExecutor, so that some stream operations like synStream diff --git a/okhttp/src/main/java/io/grpc/okhttp/OutboundFlowController.java b/okhttp/src/main/java/io/grpc/okhttp/OutboundFlowController.java index 441bb21151c..c935363213d 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OutboundFlowController.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OutboundFlowController.java @@ -17,6 +17,7 @@ package io.grpc.okhttp; import static io.grpc.okhttp.Utils.CONNECTION_STREAM_ID; +import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_SIZE; import static java.lang.Math.ceil; import static java.lang.Math.max; import static java.lang.Math.min; @@ -38,11 +39,11 @@ class OutboundFlowController { private final OutboundFlowState connectionState; OutboundFlowController( - OkHttpClientTransport transport, FrameWriter frameWriter, int initialWindowSize) { + OkHttpClientTransport transport, FrameWriter frameWriter) { this.transport = Preconditions.checkNotNull(transport, "transport"); this.frameWriter = Preconditions.checkNotNull(frameWriter, "frameWriter"); - this.initialWindowSize = initialWindowSize; - connectionState = new OutboundFlowState(CONNECTION_STREAM_ID, initialWindowSize); + this.initialWindowSize = DEFAULT_WINDOW_SIZE; + connectionState = new OutboundFlowState(CONNECTION_STREAM_ID, DEFAULT_WINDOW_SIZE); } /** From 14ada64ab0cfb69dca0117a5a9122e2035766c88 Mon Sep 17 00:00:00 2001 From: Chris Schechter Date: Mon, 24 Feb 2020 18:04:42 -0800 Subject: [PATCH 4/6] okhttp: unit tests for outbound flow control window changes --- .../okhttp/OkHttpClientTransportTest.java | 31 ++++++------------- 1 file changed, 9 insertions(+), 22 deletions(-) diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java index 769d0097f90..3b4b9bcfdf7 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java @@ -838,37 +838,34 @@ public void windowUpdateWithInboundFlowControl() throws Exception { @Test public void outboundFlowControl() throws Exception { - outboundFlowControl(INITIAL_WINDOW_SIZE); - } - - private void outboundFlowControl(int windowSize) throws Exception { - startTransport( - DEFAULT_START_STREAM_ID, null, true, DEFAULT_MAX_MESSAGE_SIZE, windowSize, null); + initTransport(); MockStreamListener listener = new MockStreamListener(); OkHttpClientStream stream = clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); stream.start(listener); + + // Outbound window always starts at 65535 until changed by Settings.INITIAL_WINDOW_SIZE + int initialOutboundWindowSize = 65535; + int messageLength = initialOutboundWindowSize / 2 + 1; + // The first message should be sent out. - int messageLength = windowSize / 2 + 1; InputStream input = new ByteArrayInputStream(new byte[messageLength]); stream.writeMessage(input); stream.flush(); verify(frameWriter, timeout(TIME_OUT_MS)).data( eq(false), eq(3), any(Buffer.class), eq(messageLength + HEADER_LENGTH)); - // The second message should be partially sent out. input = new ByteArrayInputStream(new byte[messageLength]); stream.writeMessage(input); stream.flush(); - int partiallySentSize = - windowSize - messageLength - HEADER_LENGTH; + int partiallySentSize = initialOutboundWindowSize - messageLength - HEADER_LENGTH; verify(frameWriter, timeout(TIME_OUT_MS)) .data(eq(false), eq(3), any(Buffer.class), eq(partiallySentSize)); // Get more credit, the rest data should be sent out. - frameHandler().windowUpdate(3, windowSize); - frameHandler().windowUpdate(0, windowSize); + frameHandler().windowUpdate(3, initialOutboundWindowSize); + frameHandler().windowUpdate(0, initialOutboundWindowSize); verify(frameWriter, timeout(TIME_OUT_MS)).data( eq(false), eq(3), any(Buffer.class), eq(messageLength + HEADER_LENGTH - partiallySentSize)); @@ -878,16 +875,6 @@ private void outboundFlowControl(int windowSize) throws Exception { shutdownAndVerify(); } - @Test - public void outboundFlowControl_smallWindowSize() throws Exception { - outboundFlowControl(100); - } - - @Test - public void outboundFlowControl_bigWindowSize() throws Exception { - outboundFlowControl(INITIAL_WINDOW_SIZE * 2); - } - @Test public void outboundFlowControlWithInitialWindowSizeChange() throws Exception { initTransport(); From 8da35397b0ece339398edc2dec5043c3b09216eb Mon Sep 17 00:00:00 2001 From: Chris Schechter Date: Mon, 24 Feb 2020 18:04:50 -0800 Subject: [PATCH 5/6] okhttp: unit testing sending initialWindowSize in settings --- .../io/grpc/okhttp/OkHttpClientTransport.java | 28 ++++++++++------- .../okhttp/OkHttpClientTransportTest.java | 30 +++++++++++++++++++ 2 files changed, 48 insertions(+), 10 deletions(-) diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index bde9b010543..8bfd50a2c2f 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -605,16 +605,7 @@ sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort() }); // Schedule to send connection preface & settings before any other write. try { - synchronized (lock) { - frameWriter.connectionPreface(); - Settings settings = new Settings(); - OkHttpSettingsUtil.set(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE, initialWindowSize); - frameWriter.settings(settings); - if (initialWindowSize > DEFAULT_WINDOW_SIZE) { - frameWriter.windowUpdate( - Utils.CONNECTION_STREAM_ID, initialWindowSize - DEFAULT_WINDOW_SIZE); - } - } + sendConnectionPrefaceAndSettings(); } finally { latch.countDown(); } @@ -634,6 +625,23 @@ public void run() { return null; } + /** + * Should only be called once when the transport is first established. + */ + @VisibleForTesting + void sendConnectionPrefaceAndSettings() { + synchronized (lock) { + frameWriter.connectionPreface(); + Settings settings = new Settings(); + OkHttpSettingsUtil.set(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE, initialWindowSize); + frameWriter.settings(settings); + if (initialWindowSize > DEFAULT_WINDOW_SIZE) { + frameWriter.windowUpdate( + Utils.CONNECTION_STREAM_ID, initialWindowSize - DEFAULT_WINDOW_SIZE); + } + } + } + private Socket createHttpProxySocket(InetSocketAddress address, InetSocketAddress proxyAddress, String proxyUsername, String proxyPassword) throws StatusException { try { diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java index 3b4b9bcfdf7..091105efdd2 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java @@ -404,6 +404,36 @@ public void maxMessageSizeShouldBeEnforced() throws Exception { shutdownAndVerify(); } + @Test + public void includeInitialWindowSizeInFirstSettings() throws Exception { + int initialWindowSize = 65535; + startTransport( + DEFAULT_START_STREAM_ID, null, true, DEFAULT_MAX_MESSAGE_SIZE, initialWindowSize, null); + clientTransport.sendConnectionPrefaceAndSettings(); + + ArgumentCaptor settings = ArgumentCaptor.forClass(Settings.class); + verify(frameWriter, timeout(TIME_OUT_MS)).settings(settings.capture()); + assertEquals(65535, settings.getValue().get(7)); + } + + /** + * A "large" window size is anything over 65535 (the starting size for any connection-level + * flow control value). + */ + @Test + public void includeInitialWindowSizeInFirstSettings_largeWindowSize() throws Exception { + int initialWindowSize = 75535; // 65535 + 10000 + startTransport( + DEFAULT_START_STREAM_ID, null, true, DEFAULT_MAX_MESSAGE_SIZE, initialWindowSize, null); + clientTransport.sendConnectionPrefaceAndSettings(); + + ArgumentCaptor settings = ArgumentCaptor.forClass(Settings.class); + verify(frameWriter, timeout(TIME_OUT_MS)).settings(settings.capture()); + assertEquals(75535, settings.getValue().get(7)); + + verify(frameWriter, timeout(TIME_OUT_MS)).windowUpdate(0, 10000); + } + /** * When nextFrame throws IOException, the transport should be aborted. */ From 671dd3319335668adc439a99a590a21c29aa7ba3 Mon Sep 17 00:00:00 2001 From: Chris Schechter Date: Wed, 26 Feb 2020 15:09:33 -0800 Subject: [PATCH 6/6] okhttp: restore test cases for outboundFlowControl big/small window size --- .../okhttp/OkHttpClientTransportTest.java | 91 ++++++++++++++++++- 1 file changed, 90 insertions(+), 1 deletion(-) diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java index 091105efdd2..016fea83ea9 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java @@ -866,6 +866,9 @@ public void windowUpdateWithInboundFlowControl() throws Exception { shutdownAndVerify(); } + /** + * Outbound flow control where the initial flow control window stays at the default size of 65535. + */ @Test public void outboundFlowControl() throws Exception { initTransport(); @@ -893,7 +896,7 @@ public void outboundFlowControl() throws Exception { verify(frameWriter, timeout(TIME_OUT_MS)) .data(eq(false), eq(3), any(Buffer.class), eq(partiallySentSize)); - // Get more credit, the rest data should be sent out. + // Get more credit so the rest of the data should be sent out. frameHandler().windowUpdate(3, initialOutboundWindowSize); frameHandler().windowUpdate(0, initialOutboundWindowSize); verify(frameWriter, timeout(TIME_OUT_MS)).data( @@ -905,6 +908,92 @@ public void outboundFlowControl() throws Exception { shutdownAndVerify(); } + /** + * Outbound flow control where the initial window size is reduced before a stream is started. + */ + @Test + public void outboundFlowControl_smallWindowSize() throws Exception { + initTransport(); + + int initialOutboundWindowSize = 100; + setInitialWindowSize(initialOutboundWindowSize); + + MockStreamListener listener = new MockStreamListener(); + OkHttpClientStream stream = + clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); + stream.start(listener); + + int messageLength = 75; + // The first message should be sent out. + InputStream input = new ByteArrayInputStream(new byte[messageLength]); + stream.writeMessage(input); + stream.flush(); + verify(frameWriter, timeout(TIME_OUT_MS)).data( + eq(false), eq(3), any(Buffer.class), eq(messageLength + HEADER_LENGTH)); + + // The second message should be partially sent out. + input = new ByteArrayInputStream(new byte[messageLength]); + stream.writeMessage(input); + stream.flush(); + int partiallySentSize = initialOutboundWindowSize - messageLength - HEADER_LENGTH; + verify(frameWriter, timeout(TIME_OUT_MS)) + .data(eq(false), eq(3), any(Buffer.class), eq(partiallySentSize)); + + // Get more credit so the rest of the data should be sent out. + frameHandler().windowUpdate(3, initialOutboundWindowSize); + verify(frameWriter, timeout(TIME_OUT_MS)).data( + eq(false), eq(3), any(Buffer.class), + eq(messageLength + HEADER_LENGTH - partiallySentSize)); + + stream.cancel(Status.CANCELLED); + listener.waitUntilStreamClosed(); + shutdownAndVerify(); + } + + /** + * Outbound flow control where the initial window size is increased before a stream is started. + */ + @Test + public void outboundFlowControl_bigWindowSize() throws Exception { + initTransport(); + + int initialOutboundWindowSize = 131070; // 65535 * 2 + setInitialWindowSize(initialOutboundWindowSize); + frameHandler().windowUpdate(0, 65535); + + MockStreamListener listener = new MockStreamListener(); + OkHttpClientStream stream = + clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); + stream.start(listener); + + int messageLength = 100000; + // The first message should be sent out. + InputStream input = new ByteArrayInputStream(new byte[messageLength]); + stream.writeMessage(input); + stream.flush(); + verify(frameWriter, timeout(TIME_OUT_MS)).data( + eq(false), eq(3), any(Buffer.class), eq(messageLength + HEADER_LENGTH)); + + // The second message should be partially sent out. + input = new ByteArrayInputStream(new byte[messageLength]); + stream.writeMessage(input); + stream.flush(); + int partiallySentSize = initialOutboundWindowSize - messageLength - HEADER_LENGTH; + verify(frameWriter, timeout(TIME_OUT_MS)) + .data(eq(false), eq(3), any(Buffer.class), eq(partiallySentSize)); + + // Get more credit so the rest of the data should be sent out. + frameHandler().windowUpdate(0, initialOutboundWindowSize); + frameHandler().windowUpdate(3, initialOutboundWindowSize); + verify(frameWriter, timeout(TIME_OUT_MS)).data( + eq(false), eq(3), any(Buffer.class), + eq(messageLength + HEADER_LENGTH - partiallySentSize)); + + stream.cancel(Status.CANCELLED); + listener.waitUntilStreamClosed(); + shutdownAndVerify(); + } + @Test public void outboundFlowControlWithInitialWindowSizeChange() throws Exception { initTransport();