Skip to content

Commit

Permalink
h2: disable auto read for stream channels
Browse files Browse the repository at this point in the history
Motivation:
HTTP/2 stream/child channel use cases don't disable autoRead
explicitly. This means we may read and queue more data than
desirable.

Modifications:
- Disable auto read for h2 stream/child channels.
  • Loading branch information
Scottmitch committed Jan 16, 2024
1 parent b6b98cc commit e7e0068
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import static io.servicetalk.utils.internal.ThrowableUtils.throwException;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -108,6 +109,8 @@ class ErrorHandlingTest {
private BlockingTesterClient blockingClient;
@Nullable
private Publisher<TestRequest> requestPublisher;
@Nullable
private GrpcExecutionStrategy clientExecutionStrategy;

private enum TestMode {
HttpClientFilterThrows,
Expand Down Expand Up @@ -158,6 +161,7 @@ boolean isSafeNoOffload() {
private void setUp(TestMode testMode, GrpcExecutionStrategy serverStrategy,
GrpcExecutionStrategy clientStrategy) throws Exception {
this.testMode = testMode;
this.clientExecutionStrategy = clientStrategy;
cannedResponse = TestResponse.newBuilder().setMessage("foo").build();
ServiceFactory serviceFactory;
StreamingHttpServiceFilterFactory serviceFilterFactory = IDENTITY_FILTER;
Expand Down Expand Up @@ -643,7 +647,15 @@ private void verifyException(final Throwable cause) {
assertNotNull(cause);
assertThat(assertThrows(GrpcStatusException.class, () -> {
throw cause;
}).status().code(), equalTo(expectedStatus()));
}).status().code(), either(equalTo(expectedStatus())).or(equalTo(expectedStatusSecondary())));
}

private GrpcStatusCode expectedStatusSecondary() {
// The server writes trailers with expected status then a RST stream frame. The client may not be done with its
// write operation and the RST may be result in CANCELLED being propagated instead.
return testMode == TestMode.ServiceSecondOperatorThrowsGrpcException &&
clientExecutionStrategy != null && clientExecutionStrategy.hasOffloads() ?
GrpcStatusCode.CANCELLED : expectedStatus();
}

private GrpcStatusCode expectedStatus() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@
import static io.servicetalk.concurrent.api.Single.succeeded;
import static io.servicetalk.concurrent.api.SourceAdapters.fromSource;
import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;
import static io.servicetalk.data.jackson.JacksonSerializerFactory.JACKSON;
import static io.servicetalk.http.api.HeaderUtils.isTransferEncodingChunked;
import static io.servicetalk.http.api.HttpHeaderNames.CONNECTION;
import static io.servicetalk.http.api.HttpHeaderNames.CONTENT_LENGTH;
Expand All @@ -157,6 +158,7 @@
import static io.servicetalk.http.api.HttpResponseStatus.EXPECTATION_FAILED;
import static io.servicetalk.http.api.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static io.servicetalk.http.api.HttpResponseStatus.OK;
import static io.servicetalk.http.api.HttpSerializers.jsonStreamingSerializer;
import static io.servicetalk.http.api.HttpSerializers.textSerializerUtf8;
import static io.servicetalk.http.netty.AbstractStreamingHttpConnection.MAX_CONCURRENCY_NO_OFFLOADING;
import static io.servicetalk.http.netty.AsyncContextHttpFilterVerifier.K1;
Expand Down Expand Up @@ -1306,6 +1308,24 @@ public Single<StreamingHttpResponse> request(final StreamingHttpRequest request)
}
}

@ParameterizedTest(name = "{displayName} [{index}] client={0}, h2PriorKnowledge={1}")
@MethodSource("clientExecutors")
void backpressureNoSOOEForLargePayloads(HttpTestExecutionStrategy strategy, boolean h2PriorKnowledge)
throws Exception {
setUp(strategy, h2PriorKnowledge);
InetSocketAddress serverAddress = bindHttpEchoServer();
StreamingHttpClient client = forSingleAddress(HostAndPort.of(serverAddress))
.protocols(h2PriorKnowledge ? h2Default() : h1Default())
.executionStrategy(clientExecutionStrategy).buildStreaming();

StreamingHttpRequest request = client.post("/").payloadBody(Publisher.range(0, 10_000),
jsonStreamingSerializer(JACKSON.streamingSerializerDeserializer(Integer.class)));
StreamingHttpResponse response = client.request(request).toFuture().get();

response.messageBody().ignoreElements().toFuture().get();
client.close();
}

@ParameterizedTest(name = "{displayName} [{index}] client={0}, h2PriorKnowledge={1}")
@MethodSource("clientExecutors")
void serverGracefulClose(HttpTestExecutionStrategy strategy, boolean h2PriorKnowledge) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,12 @@ private static <Read, Write> DefaultNettyConnection<Read, Write> initChildChanne
Predicate<Object> shouldWait, UnaryOperator<Throwable> enrichProtocolError) {
assert parent == null || parent.executionContext() == executionContext;
assert channel.eventLoop() == toEventLoopAwareNettyIoExecutor(executionContext.ioExecutor()).eventLoopGroup();

// For h2 the parent channel must use auto read because control frames and flow controlled frames are on the
// same socket, and we must read in timely manner to avoid deadlock. Child channel should not use auto
// read as read is explicitly called by NettyChannelPublisher according to the Subscription.request(n) demand.
channel.config().setAutoRead(false);

DefaultNettyConnection<Read, Write> connection = new DefaultNettyConnection<>(channel, parent, executionContext,
closeHandler, flushStrategy, idleTimeoutMs, protocol, sslConfig, sslSession, parentChannelConfig,
streamObserver.streamEstablished(), isClient, shouldWait, enrichProtocolError);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ final class NettyChannelPublisher<T> extends SubscribablePublisher<T> {
private boolean requested;
@Nullable
private SubscriptionImpl subscription;
/**
* The size of the queue is bound by {@link SubscriptionImpl#request(long)} demand. Using reactive operators to
* transform data and letting ServiceTalk subscribe will take care of backpressure automatically.
*/
@Nullable
private Queue<Object> pending;
@Nullable
Expand Down

0 comments on commit e7e0068

Please sign in to comment.