Skip to content

Commit

Permalink
Update Netty 4.1.59 -> 4.1.60
Browse files Browse the repository at this point in the history
Modifications:
- Add workaround to ProtocolCompatibilityTest for
grpc/grpc-java#7953.
- Remove ServiceTalk content-length validation which is now implemented
  by Netty.
  • Loading branch information
Scottmitch committed Mar 10, 2021
1 parent f2434a3 commit 2820d26
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 133 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Expand Up @@ -17,7 +17,7 @@
group=io.servicetalk
version=0.38.0-SNAPSHOT

nettyVersion=4.1.59.Final
nettyVersion=4.1.60.Final
tcnativeVersion=2.0.36.Final
jsr305Version=3.0.2

Expand Down
Expand Up @@ -43,8 +43,11 @@
import io.servicetalk.grpc.netty.CompatProto.Compat.ServiceFactory;
import io.servicetalk.grpc.netty.CompatProto.RequestContainer.CompatRequest;
import io.servicetalk.grpc.netty.CompatProto.ResponseContainer.CompatResponse;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpServiceContext;
import io.servicetalk.http.api.StreamingHttpClientFilter;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpRequester;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpResponseFactory;
import io.servicetalk.http.api.StreamingHttpServiceFilter;
Expand Down Expand Up @@ -100,6 +103,9 @@
import static io.servicetalk.encoding.api.ContentCodings.identity;
import static io.servicetalk.grpc.api.GrpcExecutionStrategies.defaultStrategy;
import static io.servicetalk.grpc.api.GrpcExecutionStrategies.noOffloadsStrategy;
import static io.servicetalk.http.api.HttpHeaderNames.CONTENT_LENGTH;
import static io.servicetalk.http.api.HttpHeaderNames.TRANSFER_ENCODING;
import static io.servicetalk.http.api.HttpHeaderValues.CHUNKED;
import static io.servicetalk.test.resources.DefaultTestCerts.loadServerKey;
import static io.servicetalk.test.resources.DefaultTestCerts.loadServerPem;
import static io.servicetalk.transport.api.SecurityConfigurator.SslProvider.OPENSSL;
Expand Down Expand Up @@ -832,6 +838,20 @@ private static CompatClient serviceTalkClient(final SocketAddress serverAddress,
builder.secure().disableHostnameVerification().provider(OPENSSL)
.trustManager(DefaultTestCerts::loadServerCAPem).commit();
}
// TODO(scott): remove after https://github.com/grpc/grpc-java/issues/7953 is resolved.
builder.appendHttpClientFilter(client -> new StreamingHttpClientFilter(client) {
@Override
protected Single<StreamingHttpResponse> request(final StreamingHttpRequester delegate,
final HttpExecutionStrategy strategy,
final StreamingHttpRequest request) {
return Single.defer(() -> {
// Force chunked transfer encoding as a workaround for grpc-java bug.
request.headers().remove(CONTENT_LENGTH);
request.headers().set(TRANSFER_ENCODING, CHUNKED);
return delegate.request(strategy, request).subscribeShareContext();
});
}
});
List<ContentCodec> codings = serviceTalkCodingsFor(compression);
return builder.build(new Compat.ClientFactory().supportedMessageCodings(codings));
}
Expand Down
Expand Up @@ -32,37 +32,21 @@
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2ResetFrame;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
import static io.servicetalk.buffer.netty.BufferUtils.toByteBuf;
import static io.servicetalk.http.netty.H2ToStH1Utils.h1HeadersToH2Headers;
import static io.servicetalk.http.netty.Http2Exception.newStreamResetException;
import static io.servicetalk.http.netty.HttpObjectEncoder.encodeAndRetain;
import static io.servicetalk.transport.netty.internal.ChannelCloseUtils.channelError;
import static java.lang.Boolean.getBoolean;
import static java.lang.Math.addExact;

abstract class AbstractH2DuplexHandler extends ChannelDuplexHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractH2DuplexHandler.class);
/**
* Temporary opt-out of
* <a href="https://tools.ietf.org/html/rfc7540#section-8.1.2.6">Malformed Requests and Responses</a> checks.
* This is only meant to ease interoperability until violating implementations are fixed.
* <b>Will be removed in future release!</b>
*/
private static final boolean ALLOW_INVALID_CONTENT_LENGTH =
getBoolean("io.servicetalk.http2.allowInvalidContentLength");
final BufferAllocator allocator;
final HttpHeadersFactory headersFactory;
final CloseHandler closeHandler;
private final StreamObserver observer;
private long contentLength = Long.MIN_VALUE;
private long seenContentLength;

AbstractH2DuplexHandler(BufferAllocator allocator, HttpHeadersFactory headersFactory, CloseHandler closeHandler,
StreamObserver observer) {
Expand Down Expand Up @@ -101,7 +85,6 @@ final void readDataFrame(ChannelHandlerContext ctx, Object msg) {
Http2DataFrame dataFrame = (Http2DataFrame) msg;
final int readableBytes = dataFrame.content().readableBytes();
if (readableBytes > 0) {
updateSeenContentLength(readableBytes);
// Copy to unpooled memory before passing to the user
Buffer data = allocator.newBuffer(readableBytes);
ByteBuf nettyData = toByteBuf(data);
Expand All @@ -112,7 +95,6 @@ final void readDataFrame(ChannelHandlerContext ctx, Object msg) {
toRelease = release(dataFrame);
}
if (dataFrame.isEndStream()) {
validateContentLengthMatch();
ctx.fireChannelRead(headersFactory.newEmptyTrailers());
closeHandler.protocolPayloadEndInbound(ctx);
}
Expand Down Expand Up @@ -143,39 +125,4 @@ public void channelInactive(final ChannelHandlerContext ctx) {
}
ctx.fireChannelInactive();
}

final long contentLength(final Http2Headers headers) {
if (contentLength == Long.MIN_VALUE) {
contentLength = HeaderUtils.contentLength(headers.valueIterator(CONTENT_LENGTH), headers::getAll);
}
return contentLength;
}

final void validateContentLengthMatch() {
if (contentLength >= 0 && seenContentLength != contentLength) {
handleUnexpectedContentLength();
}
}

private void updateSeenContentLength(final int readableBytes) {
assert readableBytes >= 0;
if (contentLength < 0) {
return;
}
seenContentLength = addExact(seenContentLength, readableBytes);
if (seenContentLength > contentLength) {
handleUnexpectedContentLength();
}
}

final void handleUnexpectedContentLength() {
final String msg = "Expected content-length " + contentLength + " not equal to the actual length " +
seenContentLength +
". Malformed request/response according to https://tools.ietf.org/html/rfc7540#section-8.1.2.6.";
if (ALLOW_INVALID_CONTENT_LENGTH) {
LOGGER.info(msg);
} else {
throw new IllegalArgumentException(msg);
}
}
}
Expand Up @@ -43,12 +43,12 @@
import static io.servicetalk.http.api.HttpHeaderValues.ZERO;
import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_2_0;
import static io.servicetalk.http.api.HttpRequestMethod.CONNECT;
import static io.servicetalk.http.api.HttpRequestMethod.HEAD;
import static io.servicetalk.http.api.HttpResponseMetaDataFactory.newResponseMetaData;
import static io.servicetalk.http.api.HttpResponseStatus.StatusClass.INFORMATIONAL_1XX;
import static io.servicetalk.http.netty.H2ToStH1Utils.h1HeadersToH2Headers;
import static io.servicetalk.http.netty.H2ToStH1Utils.h2HeadersSanitizeForH1;
import static io.servicetalk.http.netty.HeaderUtils.canAddResponseTransferEncodingProtocol;
import static io.servicetalk.http.netty.HeaderUtils.contentLength;
import static io.servicetalk.http.netty.HeaderUtils.shouldAddZeroContentLength;

final class H2ToStH1ClientDuplexHandler extends AbstractH2DuplexHandler {
Expand Down Expand Up @@ -136,14 +136,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (httpStatus != null) {
fireFullResponse(ctx, h2Headers, httpStatus);
} else {
if (!HEAD.equals(method)) {
// https://tools.ietf.org/html/rfc7230#section-3.3
// Responses to the HEAD request method (Section 4.3.2 of [RFC7231]) never include a message
// body because the associated response header fields (e.g., Transfer-Encoding, Content-Length,
// etc.), if present, indicate only what their values would have been if the request method had
// been GET (Section 4.3.1 of [RFC7231]).
validateContentLengthMatch();
}
ctx.fireChannelRead(h2HeadersToH1HeadersClient(h2Headers, null, false));
}
closeHandler.protocolPayloadEndInbound(ctx);
Expand Down Expand Up @@ -175,7 +167,8 @@ private NettyH2HeadersToHttpHeaders h2HeadersToH1HeadersClient(Http2Headers h2He
h2HeadersSanitizeForH1(h2Headers);
if (httpStatus != null) {
final int statusCode = httpStatus.code();
final long contentLength = contentLength(h2Headers);
final long contentLength = contentLength(h2Headers.valueIterator(HttpHeaderNames.CONTENT_LENGTH),
h2Headers::getAll);
if (contentLength < 0) {
if (fullResponse) {
if (shouldAddZeroContentLength(httpStatus.code(), method)) {
Expand All @@ -188,8 +181,6 @@ private NettyH2HeadersToHttpHeaders h2HeadersToH1HeadersClient(Http2Headers h2He
throw new IllegalArgumentException("content-length (" + contentLength +
") header is not expected for status code " + statusCode + " in response to " + method.name() +
" request");
} else if (fullResponse && contentLength > 0 && !HEAD.equals(method)) {
handleUnexpectedContentLength();
}
}
return new NettyH2HeadersToHttpHeaders(h2Headers, headersFactory.validateCookies());
Expand Down
Expand Up @@ -26,6 +26,7 @@

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
import io.netty.handler.codec.http2.Http2DataFrame;
import io.netty.handler.codec.http2.Http2Headers;
Expand All @@ -47,6 +48,7 @@
import static io.servicetalk.http.netty.H2ToStH1Utils.h1HeadersToH2Headers;
import static io.servicetalk.http.netty.H2ToStH1Utils.h2HeadersSanitizeForH1;
import static io.servicetalk.http.netty.HeaderUtils.clientMaySendPayloadBodyFor;
import static io.servicetalk.http.netty.HeaderUtils.contentLength;

final class H2ToStH1ServerDuplexHandler extends AbstractH2DuplexHandler {
private boolean readHeaders;
Expand Down Expand Up @@ -100,7 +102,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (httpMethod != null) {
fireFullRequest(ctx, h2Headers, httpMethod, path);
} else {
validateContentLengthMatch();
ctx.fireChannelRead(h2TrailersToH1TrailersServer(h2Headers));
}
closeHandler.protocolPayloadEndInbound(ctx);
Expand Down Expand Up @@ -135,16 +136,15 @@ private NettyH2HeadersToHttpHeaders h2HeadersToH1HeadersServer(Http2Headers h2He
h2Headers.remove(Http2Headers.PseudoHeaderName.SCHEME.value());
h2HeadersSanitizeForH1(h2Headers);
if (httpMethod != null) {
final long contentLength = contentLength(h2Headers);
final long contentLength = contentLength(h2Headers.valueIterator(HttpHeaderNames.CONTENT_LENGTH),
h2Headers::getAll);
if (clientMaySendPayloadBodyFor(httpMethod)) {
if (contentLength < 0) {
if (fullRequest) {
h2Headers.set(CONTENT_LENGTH, ZERO);
} else {
h2Headers.add(TRANSFER_ENCODING, CHUNKED);
}
} else if (fullRequest && contentLength > 0) {
handleUnexpectedContentLength();
}
} else if (contentLength >= 0) {
throw new IllegalArgumentException("content-length (" + contentLength +
Expand Down
Expand Up @@ -56,7 +56,6 @@
import static io.servicetalk.transport.netty.internal.CloseHandler.forNonPipelined;
import static java.lang.String.valueOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -303,18 +302,6 @@ private void withContentLength(boolean addTrailers) {
assertThat(channel.inboundMessages(), is(empty()));
}

@Test
public void singleHeadersFrameWithContentLength() {
variant.writeOutbound(channel);

Http2Headers headers = variant.setHeaders(new DefaultHttp2Headers());
headers.setInt(CONTENT_LENGTH, 1);

IllegalArgumentException e = assertThrows(IllegalArgumentException.class,
() -> channel.writeInbound(new DefaultHttp2HeadersFrame(headers, true)));
assertThat(e.getMessage(), containsString("not equal to the actual length"));
}

@Test
public void singleHeadersFrameWithZeroContentLength() {
variant.writeOutbound(channel);
Expand All @@ -330,54 +317,4 @@ public void singleHeadersFrameWithZeroContentLength() {
assertThat(trailers.isEmpty(), is(true));
assertThat(channel.inboundMessages(), is(empty()));
}

@Test
public void lessThanActual() {
invalidContentLength(3, "hello", false);
}

@Test
public void lessThanActualWithTrailers() {
invalidContentLength(3, "hello", true);
}

@Test
public void notEqualToActualLength() {
invalidContentLength(10, "hello", false);
}

@Test
public void notEqualToActualLengthWithTrailers() {
invalidContentLength(10, "hello", true);
}

private void invalidContentLength(int contentLength, String content, boolean addTrailers) {
variant.writeOutbound(channel);

Http2Headers headers = variant.setHeaders(new DefaultHttp2Headers());
headers.setInt(CONTENT_LENGTH, contentLength);
channel.writeInbound(new DefaultHttp2HeadersFrame(headers));

HttpMetaData metaData = channel.readInbound();
assertThat(metaData.headers().get(CONTENT_LENGTH), contentEqualTo(valueOf(contentLength)));

final IllegalArgumentException e;
if (addTrailers) {
if (contentLength < content.length()) {
e = assertThrows(IllegalArgumentException.class, () -> channel.writeInbound(new DefaultHttp2DataFrame(
writeAscii(UnpooledByteBufAllocator.DEFAULT, content))));
} else {
channel.writeInbound(new DefaultHttp2DataFrame(writeAscii(UnpooledByteBufAllocator.DEFAULT, content)));
Buffer buffer = channel.readInbound();
assertThat(buffer, is(equalTo(DEFAULT_ALLOCATOR.fromAscii(content))));

e = assertThrows(IllegalArgumentException.class, () -> channel.writeInbound(
new DefaultHttp2HeadersFrame(new DefaultHttp2Headers().set("trailer", "value"), true)));
}
} else {
e = assertThrows(IllegalArgumentException.class, () -> channel.writeInbound(new DefaultHttp2DataFrame(
writeAscii(UnpooledByteBufAllocator.DEFAULT, content), true)));
}
assertThat(e.getMessage(), containsString("not equal to the actual length"));
}
}

0 comments on commit 2820d26

Please sign in to comment.