Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add API to allow custom Stream ID creation in Http2FrameCodec #13007

Open
wants to merge 5 commits into
base: 4.1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,7 @@ public boolean isPushPromiseSent() {
/**
* Simple endpoint implementation.
*/
private final class DefaultEndpoint<F extends Http2FlowController> implements Endpoint<F> {
final class DefaultEndpoint<F extends Http2FlowController> implements Endpoint<F> {
private final boolean server;
/**
* The minimum stream ID allowed when creating the next stream. This only applies at the time the stream is
Expand Down Expand Up @@ -721,6 +721,38 @@ public int incrementAndGetNextStreamId() {
return nextReservationStreamId >= 0 ? nextReservationStreamId += 2 : nextReservationStreamId;
}

/**
* Set a custom stream ID which will be created on next stream initialization.
*
* @param streamId Custom stream ID
* @throws IllegalArgumentException If custom Stream ID is not valid
*/
void setReservationStreamId(int streamId) {
if (!(streamId > nextReservationStreamId && nextReservationStreamId >= 0)) {
throw new IllegalArgumentException("Stream ID must be greater then previous stream ID " +
"and must be greater than 0");
}

final int nextRequestedStreamId = nextReservationStreamId + 2;
if (server) {
// Server Stream ID must be even. If stream ID is valid then we will update 'nextReservationStreamId'.
// Else, if Stream ID not even then we will throw an exception.
if (nextRequestedStreamId % 2 == 0) {
nextReservationStreamId = streamId;
} else {
throw new IllegalArgumentException("Server Stream ID must be even");
}
} else {
// Client Stream ID must be odd. If stream ID is valid then we will update 'nextReservationStreamId'.
// Else, if Stream ID not odd then we will throw an exception.
if (nextRequestedStreamId % 2 != 0) {
throw new IllegalArgumentException("Client Stream ID must be odd");
} else {
nextReservationStreamId = streamId;
}
}
}

private void incrementExpectedStreamId(int streamId) {
if (streamId > nextReservationStreamId && nextReservationStreamId >= 0) {
nextReservationStreamId = streamId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,20 @@ public final Http2FrameStream newStream() {
return codec.newStream();
}

/**
* Creates a new {@link Http2FrameStream} object with specified id.
*
* <p>This method is <em>thread-safe</em>.
*/
public Http2FrameStream newStream(int streamId) {
Http2FrameCodec codec = frameCodec;
if (codec == null) {
throw new IllegalStateException(StringUtil.simpleClassName(Http2FrameCodec.class) + " not found." +
" Has the handler been added to a pipeline?");
}
return codec.newStream(streamId);
}

/**
* Allows to iterate over all currently active streams.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
import static io.netty.util.internal.ObjectUtil.checkPositive;
import static io.netty.util.internal.logging.InternalLogLevel.DEBUG;

/**
Expand Down Expand Up @@ -182,6 +183,12 @@ DefaultHttp2FrameStream newStream() {
return new DefaultHttp2FrameStream();
}

DefaultHttp2FrameStream newStream(int streamId) {
DefaultHttp2FrameStream defaultHttp2FrameStream = new DefaultHttp2FrameStream();
defaultHttp2FrameStream.id = checkPositive(streamId, "Stream ID");
return defaultHttp2FrameStream;
}

/**
* Iterates over all active HTTP/2 streams.
*
Expand Down Expand Up @@ -372,7 +379,7 @@ final boolean consumeBytes(int streamId, int bytes) throws Http2Exception {
Http2Stream stream = connection().stream(streamId);
// Upgraded requests are ineligible for stream control. We add the null check
// in case the stream has been deregistered.
if (stream != null && streamId == Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) {
if (stream != null && streamId == HTTP_UPGRADE_STREAM_ID) {
Boolean upgraded = stream.getProperty(upgradeKey);
if (Boolean.TRUE.equals(upgraded)) {
return false;
Expand All @@ -389,7 +396,7 @@ private void writeGoAwayFrame(ChannelHandlerContext ctx, Http2GoAwayFrame frame,
}

int lastStreamCreated = connection().remote().lastStreamCreated();
long lastStreamId = lastStreamCreated + ((long) frame.extraStreamIds()) * 2;
long lastStreamId = lastStreamCreated + (long) frame.extraStreamIds() * 2;
// Check if the computation overflowed.
if (lastStreamId > Integer.MAX_VALUE) {
lastStreamId = Integer.MAX_VALUE;
Expand Down Expand Up @@ -456,7 +463,18 @@ public void operationComplete(ChannelFuture channelFuture) {
private boolean initializeNewStream(ChannelHandlerContext ctx, DefaultHttp2FrameStream http2FrameStream,
ChannelPromise promise) {
final Http2Connection connection = connection();
final int streamId = connection.local().incrementAndGetNextStreamId();
final int streamId;

// If Http2FrameStream ID is defined then we will try to use it.
// Else, we will allocate it from Endpoint.
if (http2FrameStream.id > 0 && connection.local() instanceof DefaultHttp2Connection.DefaultEndpoint) {
((DefaultHttp2Connection.DefaultEndpoint<?>) connection.local())
.setReservationStreamId(http2FrameStream.id);
streamId = http2FrameStream.id;
} else {
streamId = connection.local().incrementAndGetNextStreamId();
}

if (streamId < 0) {
promise.setFailure(new Http2NoMoreStreamIdsException());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,10 @@ final Http2StreamChannel newOutboundStream() {
return new Http2MultiplexCodecStreamChannel(newStream(), null);
}

final Http2StreamChannel newOutboundStream(int streamId) {
return new Http2MultiplexCodecStreamChannel(newStream(streamId), null);
}

@Override
final void onHttp2FrameStreamException(ChannelHandlerContext ctx, Http2FrameStreamException cause) {
Http2FrameStream stream = cause.stream();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,10 @@ Http2StreamChannel newOutboundStream() {
return new Http2MultiplexHandlerStreamChannel((DefaultHttp2FrameStream) newStream(), null);
}

Http2StreamChannel newOutboundStream(int streamId) {
return new Http2MultiplexHandlerStreamChannel((DefaultHttp2FrameStream) newStream(streamId), null);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, final Throwable cause) throws Exception {
if (cause instanceof Http2FrameStreamException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,30 @@ public Http2StreamChannelBootstrap handler(ChannelHandler handler) {
* @return the {@link Future} that will be notified once the channel was opened successfully or it failed.
*/
public Future<Http2StreamChannel> open() {
return open(channel.eventLoop().<Http2StreamChannel>newPromise());
return open(-1);
}

/**
* Open a new {@link Http2StreamChannel} to use.
* @return the {@link Future} that will be notified once the channel was opened successfully or it failed.
*/
public Future<Http2StreamChannel> open(int streamId) {
return open(channel.eventLoop().<Http2StreamChannel>newPromise(), streamId);
}

/**
* Open a new {@link Http2StreamChannel} to use and notifies the given {@link Promise}.
* @return the {@link Future} that will be notified once the channel was opened successfully or it failed.
*/
public Future<Http2StreamChannel> open(final Promise<Http2StreamChannel> promise) {
return open(promise, -1);
}

/**
* Open a new {@link Http2StreamChannel} to use and notifies the given {@link Promise}.
* @return the {@link Future} that will be notified once the channel was opened successfully or it failed.
*/
public Future<Http2StreamChannel> open(final Promise<Http2StreamChannel> promise, final int streamId) {
try {
ChannelHandlerContext ctx = findCtx();
EventExecutor executor = ctx.executor();
Expand All @@ -122,7 +138,7 @@ public Future<Http2StreamChannel> open(final Promise<Http2StreamChannel> promise
@Override
public void run() {
if (channel.isActive()) {
open0(finalCtx, promise);
open0(finalCtx, promise, streamId);
} else {
promise.setFailure(new ClosedChannelException());
}
Expand Down Expand Up @@ -165,16 +181,32 @@ private ChannelHandlerContext findCtx() throws ClosedChannelException {
*/
@Deprecated
public void open0(ChannelHandlerContext ctx, final Promise<Http2StreamChannel> promise) {
open0(ctx, promise, -1);
}

/**
* @deprecated should not be used directly. Use {@link #open()} or {@link #open(Promise)}
*/
@Deprecated
public void open0(ChannelHandlerContext ctx, final Promise<Http2StreamChannel> promise, int streamId) {
assert ctx.executor().inEventLoop();
if (!promise.setUncancellable()) {
return;
}
final Http2StreamChannel streamChannel;
try {
if (ctx.handler() instanceof Http2MultiplexCodec) {
streamChannel = ((Http2MultiplexCodec) ctx.handler()).newOutboundStream();
if (streamId == -1) {
streamChannel = ((Http2MultiplexCodec) ctx.handler()).newOutboundStream();
} else {
streamChannel = ((Http2MultiplexCodec) ctx.handler()).newOutboundStream(streamId);
}
} else {
streamChannel = ((Http2MultiplexHandler) ctx.handler()).newOutboundStream();
if (streamId == -1) {
streamChannel = ((Http2MultiplexHandler) ctx.handler()).newOutboundStream();
} else {
streamChannel = ((Http2MultiplexHandler) ctx.handler()).newOutboundStream(streamId);
}
}
} catch (Exception e) {
promise.setFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,35 @@ public void operationComplete(ChannelFuture future) throws Exception {
assertTrue(listenerExecuted.isSuccess());
}

@Test
@Timeout(value = 5000, unit = TimeUnit.MILLISECONDS)
public void newOutboundStreamWithSpecifiedID() {
final Http2FrameStream stream = frameCodec.newStream(9110);

assertNotNull(stream);
assertTrue(isStreamIdValid(stream.id()));

final Promise<Void> listenerExecuted = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);

channel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers(), false).stream(stream))
.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
assertTrue(future.isSuccess());
assertTrue(isStreamIdValid(stream.id()));
assertEquals(9110, stream.id());
listenerExecuted.setSuccess(null);
}
}
);
ByteBuf data = Unpooled.buffer().writeZero(100);
ChannelFuture f = channel.writeAndFlush(new DefaultHttp2DataFrame(data).stream(stream));
assertTrue(f.isSuccess());

listenerExecuted.syncUninterruptibly();
assertTrue(listenerExecuted.isSuccess());
}

@Test
public void newOutboundStreamsShouldBeBuffered() throws Exception {
setUp(Http2FrameCodecBuilder.forServer().encoderEnforceMaxConcurrentStreams(true),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ private Http2StreamChannel newOutboundStream(ChannelHandler handler) {
return new Http2StreamChannelBootstrap(clientChannel).handler(handler).open().syncUninterruptibly().getNow();
}

private Http2StreamChannel newOutboundStream(ChannelHandler handler, int streamId) {
return new Http2StreamChannelBootstrap(clientChannel).handler(handler).open(streamId)
.syncUninterruptibly().getNow();
}

@Test
public void multipleOutboundStreams() throws Exception {
Http2StreamChannel childChannel1 = newOutboundStream(new TestChannelInitializer());
Expand Down Expand Up @@ -182,6 +187,39 @@ public void multipleOutboundStreams() throws Exception {
serverLastInboundHandler.checkException();
}

@Test
public void multipleOutboundStreamsWithCustomStreamID() throws Exception {
Http2StreamChannel childChannel1 = newOutboundStream(new TestChannelInitializer(), 9951);
assertTrue(childChannel1.isActive());
assertTrue(isStreamIdValid(childChannel1.stream().id()));
Http2StreamChannel childChannel2 = newOutboundStream(new TestChannelInitializer(), 8787);
assertTrue(childChannel2.isActive());
assertTrue(isStreamIdValid(childChannel2.stream().id()));

Http2Headers headers1 = new DefaultHttp2Headers();
Http2Headers headers2 = new DefaultHttp2Headers();
// Test that streams can be made active (headers sent) in different order than the corresponding channels
// have been created.
childChannel2.writeAndFlush(new DefaultHttp2HeadersFrame(headers2));
childChannel1.writeAndFlush(new DefaultHttp2HeadersFrame(headers1));

Http2HeadersFrame headersFrame2 = serverLastInboundHandler.blockingReadInbound();
assertNotNull(headersFrame2);
assertEquals(8787, headersFrame2.stream().id());

Http2HeadersFrame headersFrame1 = serverLastInboundHandler.blockingReadInbound();
assertNotNull(headersFrame1);
assertEquals(9951, headersFrame1.stream().id());

assertEquals(8787, childChannel2.stream().id());
assertEquals(9951, childChannel1.stream().id());

childChannel1.close();
childChannel2.close();

serverLastInboundHandler.checkException();
}

@Test
public void createOutboundStream() throws Exception {
Channel childChannel = newOutboundStream(new TestChannelInitializer());
Expand Down