Skip to content

Commit

Permalink
okhttp: fix incorrect connection-level flow control handling at begin…
Browse files Browse the repository at this point in the history
…ning of connection (v1.28.x backport)

Specifically, this addresses bugs that occur when the `OkHttpChannelBuilder.flowControlWindow(int)` setting is increased from its default value.

Two changes:
1. On starting a connection, ensure the value of `OkHttpChannelBuilder.flowControlWindow(int)` is sent via Settings.INITIAL_WINDOW_SIZE. Also send a WINDOW_UPDATE after Settings to update the connection-level window.
2. Always initialize the `OutboundFlowController` with an initialWindowSize of 65335 bytes per the [http2 spec](https://http2.github.io/http2-spec/#InitialWindowSize) instead of using the inbound window size.

Fixes #6685
Backport of #6742
  • Loading branch information
voidzcy committed Feb 27, 2020
1 parent a6c93dc commit 849dc2e
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 26 deletions.
29 changes: 21 additions & 8 deletions okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
Expand Up @@ -18,6 +18,7 @@

import static com.google.common.base.Preconditions.checkState;
import static io.grpc.internal.GrpcUtil.TIMER_SERVICE;
import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_SIZE;
import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_UPDATE_RATIO;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -488,8 +489,7 @@ public Runnable start(Listener listener) {
synchronized (lock) {
frameWriter = new ExceptionHandlingFrameWriter(OkHttpClientTransport.this, testFrameWriter,
testFrameLogger);
outboundFlow =
new OutboundFlowController(OkHttpClientTransport.this, frameWriter, initialWindowSize);
outboundFlow = new OutboundFlowController(OkHttpClientTransport.this, frameWriter);
}
serializingExecutor.execute(new Runnable() {
@Override
Expand All @@ -515,7 +515,7 @@ public void run() {

synchronized (lock) {
frameWriter = new ExceptionHandlingFrameWriter(this, rawFrameWriter);
outboundFlow = new OutboundFlowController(this, frameWriter, initialWindowSize);
outboundFlow = new OutboundFlowController(this, frameWriter);
}
final CountDownLatch latch = new CountDownLatch(1);
// Connecting in the serializingExecutor, so that some stream operations like synStream
Expand Down Expand Up @@ -605,11 +605,7 @@ sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort()
});
// Schedule to send connection preface & settings before any other write.
try {
synchronized (lock) {
frameWriter.connectionPreface();
Settings settings = new Settings();
frameWriter.settings(settings);
}
sendConnectionPrefaceAndSettings();
} finally {
latch.countDown();
}
Expand All @@ -629,6 +625,23 @@ public void run() {
return null;
}

/**
* Should only be called once when the transport is first established.
*/
@VisibleForTesting
void sendConnectionPrefaceAndSettings() {
synchronized (lock) {
frameWriter.connectionPreface();
Settings settings = new Settings();
OkHttpSettingsUtil.set(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE, initialWindowSize);
frameWriter.settings(settings);
if (initialWindowSize > DEFAULT_WINDOW_SIZE) {
frameWriter.windowUpdate(
Utils.CONNECTION_STREAM_ID, initialWindowSize - DEFAULT_WINDOW_SIZE);
}
}
}

private Socket createHttpProxySocket(InetSocketAddress address, InetSocketAddress proxyAddress,
String proxyUsername, String proxyPassword) throws StatusException {
try {
Expand Down
Expand Up @@ -17,6 +17,7 @@
package io.grpc.okhttp;

import static io.grpc.okhttp.Utils.CONNECTION_STREAM_ID;
import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_SIZE;
import static java.lang.Math.ceil;
import static java.lang.Math.max;
import static java.lang.Math.min;
Expand All @@ -38,11 +39,11 @@ class OutboundFlowController {
private final OutboundFlowState connectionState;

OutboundFlowController(
OkHttpClientTransport transport, FrameWriter frameWriter, int initialWindowSize) {
OkHttpClientTransport transport, FrameWriter frameWriter) {
this.transport = Preconditions.checkNotNull(transport, "transport");
this.frameWriter = Preconditions.checkNotNull(frameWriter, "frameWriter");
this.initialWindowSize = initialWindowSize;
connectionState = new OutboundFlowState(CONNECTION_STREAM_ID, initialWindowSize);
this.initialWindowSize = DEFAULT_WINDOW_SIZE;
connectionState = new OutboundFlowState(CONNECTION_STREAM_ID, DEFAULT_WINDOW_SIZE);
}

/**
Expand Down
1 change: 1 addition & 0 deletions okhttp/src/main/java/io/grpc/okhttp/Utils.java
Expand Up @@ -42,6 +42,7 @@ class Utils {
* is sent to expand the window.
*/
static final float DEFAULT_WINDOW_UPDATE_RATIO = 0.5f;
static final int DEFAULT_WINDOW_SIZE = 65535;
static final int CONNECTION_STREAM_ID = 0;

public static Metadata convertHeaders(List<Header> http2Headers) {
Expand Down
136 changes: 121 additions & 15 deletions okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java
Expand Up @@ -404,6 +404,36 @@ public void maxMessageSizeShouldBeEnforced() throws Exception {
shutdownAndVerify();
}

@Test
public void includeInitialWindowSizeInFirstSettings() throws Exception {
int initialWindowSize = 65535;
startTransport(
DEFAULT_START_STREAM_ID, null, true, DEFAULT_MAX_MESSAGE_SIZE, initialWindowSize, null);
clientTransport.sendConnectionPrefaceAndSettings();

ArgumentCaptor<Settings> settings = ArgumentCaptor.forClass(Settings.class);
verify(frameWriter, timeout(TIME_OUT_MS)).settings(settings.capture());
assertEquals(65535, settings.getValue().get(7));
}

/**
* A "large" window size is anything over 65535 (the starting size for any connection-level
* flow control value).
*/
@Test
public void includeInitialWindowSizeInFirstSettings_largeWindowSize() throws Exception {
int initialWindowSize = 75535; // 65535 + 10000
startTransport(
DEFAULT_START_STREAM_ID, null, true, DEFAULT_MAX_MESSAGE_SIZE, initialWindowSize, null);
clientTransport.sendConnectionPrefaceAndSettings();

ArgumentCaptor<Settings> settings = ArgumentCaptor.forClass(Settings.class);
verify(frameWriter, timeout(TIME_OUT_MS)).settings(settings.capture());
assertEquals(75535, settings.getValue().get(7));

verify(frameWriter, timeout(TIME_OUT_MS)).windowUpdate(0, 10000);
}

/**
* When nextFrame throws IOException, the transport should be aborted.
*/
Expand Down Expand Up @@ -836,39 +866,39 @@ public void windowUpdateWithInboundFlowControl() throws Exception {
shutdownAndVerify();
}

/**
* Outbound flow control where the initial flow control window stays at the default size of 65535.
*/
@Test
public void outboundFlowControl() throws Exception {
outboundFlowControl(INITIAL_WINDOW_SIZE);
}

private void outboundFlowControl(int windowSize) throws Exception {
startTransport(
DEFAULT_START_STREAM_ID, null, true, DEFAULT_MAX_MESSAGE_SIZE, windowSize, null);
initTransport();
MockStreamListener listener = new MockStreamListener();
OkHttpClientStream stream =
clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
stream.start(listener);

// Outbound window always starts at 65535 until changed by Settings.INITIAL_WINDOW_SIZE
int initialOutboundWindowSize = 65535;
int messageLength = initialOutboundWindowSize / 2 + 1;

// The first message should be sent out.
int messageLength = windowSize / 2 + 1;
InputStream input = new ByteArrayInputStream(new byte[messageLength]);
stream.writeMessage(input);
stream.flush();
verify(frameWriter, timeout(TIME_OUT_MS)).data(
eq(false), eq(3), any(Buffer.class), eq(messageLength + HEADER_LENGTH));


// The second message should be partially sent out.
input = new ByteArrayInputStream(new byte[messageLength]);
stream.writeMessage(input);
stream.flush();
int partiallySentSize =
windowSize - messageLength - HEADER_LENGTH;
int partiallySentSize = initialOutboundWindowSize - messageLength - HEADER_LENGTH;
verify(frameWriter, timeout(TIME_OUT_MS))
.data(eq(false), eq(3), any(Buffer.class), eq(partiallySentSize));

// Get more credit, the rest data should be sent out.
frameHandler().windowUpdate(3, windowSize);
frameHandler().windowUpdate(0, windowSize);
// Get more credit so the rest of the data should be sent out.
frameHandler().windowUpdate(3, initialOutboundWindowSize);
frameHandler().windowUpdate(0, initialOutboundWindowSize);
verify(frameWriter, timeout(TIME_OUT_MS)).data(
eq(false), eq(3), any(Buffer.class),
eq(messageLength + HEADER_LENGTH - partiallySentSize));
Expand All @@ -878,14 +908,90 @@ private void outboundFlowControl(int windowSize) throws Exception {
shutdownAndVerify();
}

/**
* Outbound flow control where the initial window size is reduced before a stream is started.
*/
@Test
public void outboundFlowControl_smallWindowSize() throws Exception {
outboundFlowControl(100);
initTransport();

int initialOutboundWindowSize = 100;
setInitialWindowSize(initialOutboundWindowSize);

MockStreamListener listener = new MockStreamListener();
OkHttpClientStream stream =
clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
stream.start(listener);

int messageLength = 75;
// The first message should be sent out.
InputStream input = new ByteArrayInputStream(new byte[messageLength]);
stream.writeMessage(input);
stream.flush();
verify(frameWriter, timeout(TIME_OUT_MS)).data(
eq(false), eq(3), any(Buffer.class), eq(messageLength + HEADER_LENGTH));

// The second message should be partially sent out.
input = new ByteArrayInputStream(new byte[messageLength]);
stream.writeMessage(input);
stream.flush();
int partiallySentSize = initialOutboundWindowSize - messageLength - HEADER_LENGTH;
verify(frameWriter, timeout(TIME_OUT_MS))
.data(eq(false), eq(3), any(Buffer.class), eq(partiallySentSize));

// Get more credit so the rest of the data should be sent out.
frameHandler().windowUpdate(3, initialOutboundWindowSize);
verify(frameWriter, timeout(TIME_OUT_MS)).data(
eq(false), eq(3), any(Buffer.class),
eq(messageLength + HEADER_LENGTH - partiallySentSize));

stream.cancel(Status.CANCELLED);
listener.waitUntilStreamClosed();
shutdownAndVerify();
}

/**
* Outbound flow control where the initial window size is increased before a stream is started.
*/
@Test
public void outboundFlowControl_bigWindowSize() throws Exception {
outboundFlowControl(INITIAL_WINDOW_SIZE * 2);
initTransport();

int initialOutboundWindowSize = 131070; // 65535 * 2
setInitialWindowSize(initialOutboundWindowSize);
frameHandler().windowUpdate(0, 65535);

MockStreamListener listener = new MockStreamListener();
OkHttpClientStream stream =
clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
stream.start(listener);

int messageLength = 100000;
// The first message should be sent out.
InputStream input = new ByteArrayInputStream(new byte[messageLength]);
stream.writeMessage(input);
stream.flush();
verify(frameWriter, timeout(TIME_OUT_MS)).data(
eq(false), eq(3), any(Buffer.class), eq(messageLength + HEADER_LENGTH));

// The second message should be partially sent out.
input = new ByteArrayInputStream(new byte[messageLength]);
stream.writeMessage(input);
stream.flush();
int partiallySentSize = initialOutboundWindowSize - messageLength - HEADER_LENGTH;
verify(frameWriter, timeout(TIME_OUT_MS))
.data(eq(false), eq(3), any(Buffer.class), eq(partiallySentSize));

// Get more credit so the rest of the data should be sent out.
frameHandler().windowUpdate(0, initialOutboundWindowSize);
frameHandler().windowUpdate(3, initialOutboundWindowSize);
verify(frameWriter, timeout(TIME_OUT_MS)).data(
eq(false), eq(3), any(Buffer.class),
eq(messageLength + HEADER_LENGTH - partiallySentSize));

stream.cancel(Status.CANCELLED);
listener.waitUntilStreamClosed();
shutdownAndVerify();
}

@Test
Expand Down

0 comments on commit 849dc2e

Please sign in to comment.