Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

okhttp: fix incorrect connection-level flow control handling at beginning of connection #6742

Merged
merged 6 commits into from Feb 27, 2020
29 changes: 21 additions & 8 deletions okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
Expand Up @@ -18,6 +18,7 @@

import static com.google.common.base.Preconditions.checkState;
import static io.grpc.internal.GrpcUtil.TIMER_SERVICE;
import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_SIZE;
import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_UPDATE_RATIO;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -488,8 +489,7 @@ public Runnable start(Listener listener) {
synchronized (lock) {
frameWriter = new ExceptionHandlingFrameWriter(OkHttpClientTransport.this, testFrameWriter,
testFrameLogger);
outboundFlow =
new OutboundFlowController(OkHttpClientTransport.this, frameWriter, initialWindowSize);
outboundFlow = new OutboundFlowController(OkHttpClientTransport.this, frameWriter);
}
serializingExecutor.execute(new Runnable() {
@Override
Expand All @@ -515,7 +515,7 @@ public void run() {

synchronized (lock) {
frameWriter = new ExceptionHandlingFrameWriter(this, rawFrameWriter);
outboundFlow = new OutboundFlowController(this, frameWriter, initialWindowSize);
outboundFlow = new OutboundFlowController(this, frameWriter);
}
final CountDownLatch latch = new CountDownLatch(1);
// Connecting in the serializingExecutor, so that some stream operations like synStream
Expand Down Expand Up @@ -605,11 +605,7 @@ sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort()
});
// Schedule to send connection preface & settings before any other write.
try {
synchronized (lock) {
frameWriter.connectionPreface();
Settings settings = new Settings();
frameWriter.settings(settings);
}
sendConnectionPrefaceAndSettings();
} finally {
latch.countDown();
}
Expand All @@ -629,6 +625,23 @@ public void run() {
return null;
}

/**
* Should only be called once when the transport is first established.
*/
@VisibleForTesting
void sendConnectionPrefaceAndSettings() {
synchronized (lock) {
frameWriter.connectionPreface();
Settings settings = new Settings();
OkHttpSettingsUtil.set(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE, initialWindowSize);
frameWriter.settings(settings);
if (initialWindowSize > DEFAULT_WINDOW_SIZE) {
frameWriter.windowUpdate(
Utils.CONNECTION_STREAM_ID, initialWindowSize - DEFAULT_WINDOW_SIZE);
}
}
}

private Socket createHttpProxySocket(InetSocketAddress address, InetSocketAddress proxyAddress,
String proxyUsername, String proxyPassword) throws StatusException {
try {
Expand Down
Expand Up @@ -17,6 +17,7 @@
package io.grpc.okhttp;

import static io.grpc.okhttp.Utils.CONNECTION_STREAM_ID;
import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_SIZE;
import static java.lang.Math.ceil;
import static java.lang.Math.max;
import static java.lang.Math.min;
Expand All @@ -38,11 +39,11 @@ class OutboundFlowController {
private final OutboundFlowState connectionState;

OutboundFlowController(
OkHttpClientTransport transport, FrameWriter frameWriter, int initialWindowSize) {
OkHttpClientTransport transport, FrameWriter frameWriter) {
this.transport = Preconditions.checkNotNull(transport, "transport");
this.frameWriter = Preconditions.checkNotNull(frameWriter, "frameWriter");
this.initialWindowSize = initialWindowSize;
connectionState = new OutboundFlowState(CONNECTION_STREAM_ID, initialWindowSize);
this.initialWindowSize = DEFAULT_WINDOW_SIZE;
connectionState = new OutboundFlowState(CONNECTION_STREAM_ID, DEFAULT_WINDOW_SIZE);
}

/**
Expand Down
1 change: 1 addition & 0 deletions okhttp/src/main/java/io/grpc/okhttp/Utils.java
Expand Up @@ -42,6 +42,7 @@ class Utils {
* is sent to expand the window.
*/
static final float DEFAULT_WINDOW_UPDATE_RATIO = 0.5f;
static final int DEFAULT_WINDOW_SIZE = 65535;
static final int CONNECTION_STREAM_ID = 0;

public static Metadata convertHeaders(List<Header> http2Headers) {
Expand Down
136 changes: 121 additions & 15 deletions okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java
Expand Up @@ -404,6 +404,36 @@ public void maxMessageSizeShouldBeEnforced() throws Exception {
shutdownAndVerify();
}

@Test
public void includeInitialWindowSizeInFirstSettings() throws Exception {
int initialWindowSize = 65535;
startTransport(
DEFAULT_START_STREAM_ID, null, true, DEFAULT_MAX_MESSAGE_SIZE, initialWindowSize, null);
clientTransport.sendConnectionPrefaceAndSettings();

ArgumentCaptor<Settings> settings = ArgumentCaptor.forClass(Settings.class);
verify(frameWriter, timeout(TIME_OUT_MS)).settings(settings.capture());
assertEquals(65535, settings.getValue().get(7));
}

/**
* A "large" window size is anything over 65535 (the starting size for any connection-level
* flow control value).
*/
@Test
public void includeInitialWindowSizeInFirstSettings_largeWindowSize() throws Exception {
int initialWindowSize = 75535; // 65535 + 10000
startTransport(
DEFAULT_START_STREAM_ID, null, true, DEFAULT_MAX_MESSAGE_SIZE, initialWindowSize, null);
clientTransport.sendConnectionPrefaceAndSettings();

ArgumentCaptor<Settings> settings = ArgumentCaptor.forClass(Settings.class);
verify(frameWriter, timeout(TIME_OUT_MS)).settings(settings.capture());
assertEquals(75535, settings.getValue().get(7));

verify(frameWriter, timeout(TIME_OUT_MS)).windowUpdate(0, 10000);
}

/**
* When nextFrame throws IOException, the transport should be aborted.
*/
Expand Down Expand Up @@ -836,39 +866,39 @@ public void windowUpdateWithInboundFlowControl() throws Exception {
shutdownAndVerify();
}

/**
* Outbound flow control where the initial flow control window stays at the default size of 65535.
*/
@Test
public void outboundFlowControl() throws Exception {
outboundFlowControl(INITIAL_WINDOW_SIZE);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these three tests should still be around. They are trying to test that the outbound flow controller observes the flow control given to it. Since passing windowSize to startTransport() no longer works, these need to get something like frameHandler().settings(false, new Settings()); (but with the INITIAL_WINDOW_SIZE set to windowSize) to change the window size available.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. I initially thought those cases were already covered by outboundFlowControlWithInitialWindowSizeChange(), but realized it wasn't testing where the initial window size is changed before the stream is started.

}

private void outboundFlowControl(int windowSize) throws Exception {
startTransport(
DEFAULT_START_STREAM_ID, null, true, DEFAULT_MAX_MESSAGE_SIZE, windowSize, null);
initTransport();
MockStreamListener listener = new MockStreamListener();
OkHttpClientStream stream =
clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
stream.start(listener);

// Outbound window always starts at 65535 until changed by Settings.INITIAL_WINDOW_SIZE
int initialOutboundWindowSize = 65535;
int messageLength = initialOutboundWindowSize / 2 + 1;

// The first message should be sent out.
int messageLength = windowSize / 2 + 1;
InputStream input = new ByteArrayInputStream(new byte[messageLength]);
stream.writeMessage(input);
stream.flush();
verify(frameWriter, timeout(TIME_OUT_MS)).data(
eq(false), eq(3), any(Buffer.class), eq(messageLength + HEADER_LENGTH));


// The second message should be partially sent out.
input = new ByteArrayInputStream(new byte[messageLength]);
stream.writeMessage(input);
stream.flush();
int partiallySentSize =
windowSize - messageLength - HEADER_LENGTH;
int partiallySentSize = initialOutboundWindowSize - messageLength - HEADER_LENGTH;
verify(frameWriter, timeout(TIME_OUT_MS))
.data(eq(false), eq(3), any(Buffer.class), eq(partiallySentSize));

// Get more credit, the rest data should be sent out.
frameHandler().windowUpdate(3, windowSize);
frameHandler().windowUpdate(0, windowSize);
// Get more credit so the rest of the data should be sent out.
frameHandler().windowUpdate(3, initialOutboundWindowSize);
frameHandler().windowUpdate(0, initialOutboundWindowSize);
verify(frameWriter, timeout(TIME_OUT_MS)).data(
eq(false), eq(3), any(Buffer.class),
eq(messageLength + HEADER_LENGTH - partiallySentSize));
Expand All @@ -878,14 +908,90 @@ private void outboundFlowControl(int windowSize) throws Exception {
shutdownAndVerify();
}

/**
* Outbound flow control where the initial window size is reduced before a stream is started.
*/
@Test
public void outboundFlowControl_smallWindowSize() throws Exception {
outboundFlowControl(100);
initTransport();

int initialOutboundWindowSize = 100;
setInitialWindowSize(initialOutboundWindowSize);

MockStreamListener listener = new MockStreamListener();
OkHttpClientStream stream =
clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
stream.start(listener);

int messageLength = 75;
// The first message should be sent out.
InputStream input = new ByteArrayInputStream(new byte[messageLength]);
stream.writeMessage(input);
stream.flush();
verify(frameWriter, timeout(TIME_OUT_MS)).data(
eq(false), eq(3), any(Buffer.class), eq(messageLength + HEADER_LENGTH));

// The second message should be partially sent out.
input = new ByteArrayInputStream(new byte[messageLength]);
stream.writeMessage(input);
stream.flush();
int partiallySentSize = initialOutboundWindowSize - messageLength - HEADER_LENGTH;
verify(frameWriter, timeout(TIME_OUT_MS))
.data(eq(false), eq(3), any(Buffer.class), eq(partiallySentSize));

// Get more credit so the rest of the data should be sent out.
frameHandler().windowUpdate(3, initialOutboundWindowSize);
verify(frameWriter, timeout(TIME_OUT_MS)).data(
eq(false), eq(3), any(Buffer.class),
eq(messageLength + HEADER_LENGTH - partiallySentSize));

stream.cancel(Status.CANCELLED);
listener.waitUntilStreamClosed();
shutdownAndVerify();
}

/**
* Outbound flow control where the initial window size is increased before a stream is started.
*/
@Test
public void outboundFlowControl_bigWindowSize() throws Exception {
outboundFlowControl(INITIAL_WINDOW_SIZE * 2);
initTransport();

int initialOutboundWindowSize = 131070; // 65535 * 2
setInitialWindowSize(initialOutboundWindowSize);
frameHandler().windowUpdate(0, 65535);

MockStreamListener listener = new MockStreamListener();
OkHttpClientStream stream =
clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
stream.start(listener);

int messageLength = 100000;
// The first message should be sent out.
InputStream input = new ByteArrayInputStream(new byte[messageLength]);
stream.writeMessage(input);
stream.flush();
verify(frameWriter, timeout(TIME_OUT_MS)).data(
eq(false), eq(3), any(Buffer.class), eq(messageLength + HEADER_LENGTH));

// The second message should be partially sent out.
input = new ByteArrayInputStream(new byte[messageLength]);
stream.writeMessage(input);
stream.flush();
int partiallySentSize = initialOutboundWindowSize - messageLength - HEADER_LENGTH;
verify(frameWriter, timeout(TIME_OUT_MS))
.data(eq(false), eq(3), any(Buffer.class), eq(partiallySentSize));

// Get more credit so the rest of the data should be sent out.
frameHandler().windowUpdate(0, initialOutboundWindowSize);
frameHandler().windowUpdate(3, initialOutboundWindowSize);
verify(frameWriter, timeout(TIME_OUT_MS)).data(
eq(false), eq(3), any(Buffer.class),
eq(messageLength + HEADER_LENGTH - partiallySentSize));

stream.cancel(Status.CANCELLED);
listener.waitUntilStreamClosed();
shutdownAndVerify();
}

@Test
Expand Down