Skip to content

Commit

Permalink
remove forced read, bring in netty update
Browse files Browse the repository at this point in the history
  • Loading branch information
Scottmitch committed Jan 12, 2024
1 parent 641d3f8 commit 4e4f1d5
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 12 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ issueManagementUrl=https://github.com/apple/servicetalk/issues
ciManagementUrl=https://github.com/apple/servicetalk/actions

# dependency versions
nettyVersion=4.1.104.Final
nettyVersion=4.1.105.Final-SNAPSHOT
nettyIoUringVersion=0.0.24.Final

jsr305Version=3.0.2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@
import static io.servicetalk.concurrent.api.Single.succeeded;
import static io.servicetalk.concurrent.api.SourceAdapters.fromSource;
import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;
import static io.servicetalk.data.jackson.JacksonSerializerFactory.JACKSON;
import static io.servicetalk.http.api.HeaderUtils.isTransferEncodingChunked;
import static io.servicetalk.http.api.HttpHeaderNames.CONNECTION;
import static io.servicetalk.http.api.HttpHeaderNames.CONTENT_LENGTH;
Expand All @@ -157,6 +158,7 @@
import static io.servicetalk.http.api.HttpResponseStatus.EXPECTATION_FAILED;
import static io.servicetalk.http.api.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static io.servicetalk.http.api.HttpResponseStatus.OK;
import static io.servicetalk.http.api.HttpSerializers.jsonStreamingSerializer;
import static io.servicetalk.http.api.HttpSerializers.textSerializerUtf8;
import static io.servicetalk.http.netty.AbstractStreamingHttpConnection.MAX_CONCURRENCY_NO_OFFLOADING;
import static io.servicetalk.http.netty.AsyncContextHttpFilterVerifier.K1;
Expand Down Expand Up @@ -1306,6 +1308,23 @@ public Single<StreamingHttpResponse> request(final StreamingHttpRequest request)
}
}

@ParameterizedTest(name = "{displayName} [{index}] client={0}, h2PriorKnowledge={1}")
@MethodSource("clientExecutors")
void backpressureNoSOOEForLargePayloads(HttpTestExecutionStrategy strategy, boolean h2PriorKnowledge) throws Exception {
setUp(strategy, h2PriorKnowledge);
InetSocketAddress serverAddress = bindHttpEchoServer();
StreamingHttpClient client = forSingleAddress(HostAndPort.of(serverAddress))
.protocols(h2PriorKnowledge ? h2Default() : h1Default())
.executionStrategy(clientExecutionStrategy).buildStreaming();

StreamingHttpRequest request = client.post("/").payloadBody(Publisher.range(0, 10000),
jsonStreamingSerializer(JACKSON.streamingSerializerDeserializer(Integer.class)));
StreamingHttpResponse response = client.request(request).toFuture().get();

response.messageBody().ignoreElements().toFuture().get();
client.close();
}

@ParameterizedTest(name = "{displayName} [{index}] client={0}, h2PriorKnowledge={1}")
@MethodSource("clientExecutors")
void serverGracefulClose(HttpTestExecutionStrategy strategy, boolean h2PriorKnowledge) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ public final class DefaultNettyConnection<Read, Write> extends NettyChannelListe
private final ChannelConfig parentChannelConfig;
private volatile DataObserver dataObserver;
private final boolean isClient;
private final boolean readOnChannelComplete;
private final Predicate<Object> shouldWait;
private final UnaryOperator<Throwable> enrichProtocolError;
private final TerminalSignalConsumer cleanupStateConsumer = new TerminalSignalConsumer() {
Expand Down Expand Up @@ -171,7 +170,7 @@ private void cleanupOnWriteTerminated() {
private DefaultNettyConnection(
Channel channel, @Nullable ConnectionContext parent, ExecutionContext<?> executionContext,
CloseHandler closeHandler, FlushStrategy flushStrategy,
long idleTimeoutMs, Protocol protocol, boolean readOnChannelComplete,
long idleTimeoutMs, Protocol protocol,
@Nullable SslConfig sslConfig, @Nullable SSLSession sslSession,
@Nullable ChannelConfig parentChannelConfig, DataObserver dataObserver, boolean isClient,
Predicate<Object> shouldWait, UnaryOperator<Throwable> enrichProtocolError) {
Expand Down Expand Up @@ -201,7 +200,6 @@ private DefaultNettyConnection(
this.protocol = requireNonNull(protocol);
this.dataObserver = dataObserver;
this.isClient = isClient;
this.readOnChannelComplete = readOnChannelComplete;
this.shouldWait = requireNonNull(shouldWait);
this.enrichProtocolError = requireNonNull(enrichProtocolError);
}
Expand Down Expand Up @@ -352,7 +350,7 @@ private static <Read, Write> DefaultNettyConnection<Read, Write> initChildChanne
channel.config().setAutoRead(false);

DefaultNettyConnection<Read, Write> connection = new DefaultNettyConnection<>(channel, parent, executionContext,
closeHandler, flushStrategy, idleTimeoutMs, protocol, true, sslConfig, sslSession, parentChannelConfig,
closeHandler, flushStrategy, idleTimeoutMs, protocol, sslConfig, sslSession, parentChannelConfig,
streamObserver.streamEstablished(), isClient, shouldWait, enrichProtocolError);
channel.pipeline().addLast(new NettyToStChannelHandler<>(connection, null,
null, false, NoopConnectionObserver.INSTANCE));
Expand Down Expand Up @@ -513,7 +511,7 @@ protected void handleSubscribe(
@Nullable
final SSLSession sslSession = extractSslSession(sslConfig, pipeline);
DefaultNettyConnection<Read, Write> connection = new DefaultNettyConnection<>(channel, null,
executionContext, closeHandler, flushStrategy, idleTimeoutMs, protocol, false, sslConfig,
executionContext, closeHandler, flushStrategy, idleTimeoutMs, protocol, sslConfig,
sslSession, null, NoopDataObserver.INSTANCE, isClient, shouldWait, identity());
channel.attr(CHANNEL_CLOSEABLE_KEY).set(connection);
delayedCancellable = new DelayedCancellable();
Expand Down Expand Up @@ -929,12 +927,6 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
LOGGER.debug("{} Received a user event: {}", ctx.channel(), evt);
if (evt == CloseHandler.InboundDataEndEvent.INSTANCE) {
connection.nettyChannelPublisher.channelOnComplete();
if (connection.readOnChannelComplete) {
// h2 child channel may require read() to notify of channel closure. This constraint is lifted
// with netty enhancement [1] but keep this for some time to give folks time to update Netty.
// [1] https://github.com/netty/netty/pull/13779
ctx.read();
}
} else if (evt == CloseHandler.OutboundDataEndEvent.INSTANCE) {
connection.channelOutboundListener.channelOutboundClosed();
} else if (evt == AbortWritesEvent.INSTANCE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ final class NettyChannelPublisher<T> extends SubscribablePublisher<T> {
private boolean requested;
@Nullable
private SubscriptionImpl subscription;
/**
* The size of the queue is bound by {@link SubscriptionImpl#request(long)} demand. Using reactive operators to
* transform data and letting ServiceTalk subscribe will take care of backpressure automatically.
*/
@Nullable
private Queue<Object> pending;
@Nullable
Expand Down

0 comments on commit 4e4f1d5

Please sign in to comment.