Skip to content

Commit

Permalink
netty: support setting options of boss in NettyServer (grpc#6947)
Browse files Browse the repository at this point in the history
  • Loading branch information
asdf2014 authored and dfawley committed Jan 15, 2021
1 parent b705c4e commit 1993e40
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 10 deletions.
12 changes: 12 additions & 0 deletions netty/src/main/java/io/grpc/netty/NettyServer.java
Expand Up @@ -69,6 +69,7 @@ class NettyServer implements InternalServer, InternalWithLogId {
private final SocketAddress address;
private final ChannelFactory<? extends ServerChannel> channelFactory;
private final Map<ChannelOption<?>, ?> channelOptions;
private final Map<ChannelOption<?>, ?> childChannelOptions;
private final ProtocolNegotiator protocolNegotiator;
private final int maxStreamsPerConnection;
private final ObjectPool<? extends EventLoopGroup> bossGroupPool;
Expand Down Expand Up @@ -99,6 +100,7 @@ class NettyServer implements InternalServer, InternalWithLogId {
NettyServer(
SocketAddress address, ChannelFactory<? extends ServerChannel> channelFactory,
Map<ChannelOption<?>, ?> channelOptions,
Map<ChannelOption<?>, ?> childChannelOptions,
ObjectPool<? extends EventLoopGroup> bossGroupPool,
ObjectPool<? extends EventLoopGroup> workerGroupPool,
boolean forceHeapBuffer,
Expand All @@ -115,6 +117,8 @@ class NettyServer implements InternalServer, InternalWithLogId {
this.channelFactory = checkNotNull(channelFactory, "channelFactory");
checkNotNull(channelOptions, "channelOptions");
this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
checkNotNull(childChannelOptions, "childChannelOptions");
this.childChannelOptions = new HashMap<ChannelOption<?>, Object>(childChannelOptions);
this.bossGroupPool = checkNotNull(bossGroupPool, "bossGroupPool");
this.workerGroupPool = checkNotNull(workerGroupPool, "workerGroupPool");
this.forceHeapBuffer = forceHeapBuffer;
Expand Down Expand Up @@ -168,6 +172,14 @@ public void start(ServerListener serverListener) throws IOException {

if (channelOptions != null) {
for (Map.Entry<ChannelOption<?>, ?> entry : channelOptions.entrySet()) {
@SuppressWarnings("unchecked")
ChannelOption<Object> key = (ChannelOption<Object>) entry.getKey();
b.option(key, entry.getValue());
}
}

if (childChannelOptions != null) {
for (Map.Entry<ChannelOption<?>, ?> entry : childChannelOptions.entrySet()) {
@SuppressWarnings("unchecked")
ChannelOption<Object> key = (ChannelOption<Object>) entry.getKey();
b.childOption(key, entry.getValue());
Expand Down
27 changes: 20 additions & 7 deletions netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
Expand Up @@ -85,6 +85,7 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
private ChannelFactory<? extends ServerChannel> channelFactory =
Utils.DEFAULT_SERVER_CHANNEL_FACTORY;
private final Map<ChannelOption<?>, Object> channelOptions = new HashMap<>();
private final Map<ChannelOption<?>, Object> childChannelOptions = new HashMap<>();
private ObjectPool<? extends EventLoopGroup> bossEventLoopGroupPool =
DEFAULT_BOSS_EVENT_LOOP_GROUP_POOL;
private ObjectPool<? extends EventLoopGroup> workerEventLoopGroupPool =
Expand Down Expand Up @@ -189,10 +190,21 @@ public NettyServerBuilder channelFactory(ChannelFactory<? extends ServerChannel>
* Specifies a channel option. As the underlying channel as well as network implementation may
* ignore this value applications should consider it a hint.
*
* @since 1.30.0
*/
public <T> NettyServerBuilder withOption(ChannelOption<T> option, T value) {
this.channelOptions.put(option, value);
return this;
}

/**
* Specifies a child channel option. As the underlying channel as well as network implementation
* may ignore this value applications should consider it a hint.
*
* @since 1.9.0
*/
public <T> NettyServerBuilder withChildOption(ChannelOption<T> option, T value) {
this.channelOptions.put(option, value);
this.childChannelOptions.put(option, value);
return this;
}

Expand Down Expand Up @@ -549,12 +561,13 @@ protected List<NettyServer> buildTransportServers(
List<NettyServer> transportServers = new ArrayList<>(listenAddresses.size());
for (SocketAddress listenAddress : listenAddresses) {
NettyServer transportServer = new NettyServer(
listenAddress, channelFactory, channelOptions, bossEventLoopGroupPool,
workerEventLoopGroupPool, forceHeapBuffer, negotiator, streamTracerFactories,
getTransportTracerFactory(), maxConcurrentCallsPerConnection, flowControlWindow,
maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
maxConnectionIdleInNanos, maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, getChannelz());
listenAddress, channelFactory, channelOptions, childChannelOptions,
bossEventLoopGroupPool, workerEventLoopGroupPool, forceHeapBuffer, negotiator,
streamTracerFactories, getTransportTracerFactory(), maxConcurrentCallsPerConnection,
flowControlWindow, maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos,
keepAliveTimeoutInNanos, maxConnectionIdleInNanos, maxConnectionAgeInNanos,
maxConnectionAgeGraceInNanos, permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos,
getChannelz());
transportServers.add(transportServer);
}
return Collections.unmodifiableList(transportServers);
Expand Down
Expand Up @@ -769,6 +769,7 @@ private void startServer(int maxStreamsPerConnection, int maxHeaderListSize) thr
TestUtils.testServerAddress(new InetSocketAddress(0)),
new ReflectiveChannelFactory<>(NioServerSocketChannel.class),
new HashMap<ChannelOption<?>, Object>(),
new HashMap<ChannelOption<?>, Object>(),
new FixedObjectPool<>(group), new FixedObjectPool<>(group), false, negotiator,
Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(),
Expand Down
10 changes: 7 additions & 3 deletions netty/src/test/java/io/grpc/netty/NettyServerTest.java
Expand Up @@ -92,6 +92,7 @@ class TestProtocolNegotiator implements ProtocolNegotiator {
addr,
new ReflectiveChannelFactory<>(NioServerSocketChannel.class),
new HashMap<ChannelOption<?>, Object>(),
new HashMap<ChannelOption<?>, Object>(),
new FixedObjectPool<>(eventLoop),
new FixedObjectPool<>(eventLoop),
false,
Expand Down Expand Up @@ -137,6 +138,7 @@ public void getPort_notStarted() {
addr,
new ReflectiveChannelFactory<>(NioServerSocketChannel.class),
new HashMap<ChannelOption<?>, Object>(),
new HashMap<ChannelOption<?>, Object>(),
new FixedObjectPool<>(eventLoop),
new FixedObjectPool<>(eventLoop),
false,
Expand All @@ -161,9 +163,9 @@ public void childChannelOptions() throws Exception {
final int originalLowWaterMark = 2097169;
final int originalHighWaterMark = 2097211;

Map<ChannelOption<?>, Object> channelOptions = new HashMap<>();
Map<ChannelOption<?>, Object> childChannelOptions = new HashMap<>();

channelOptions.put(ChannelOption.WRITE_BUFFER_WATER_MARK,
childChannelOptions.put(ChannelOption.WRITE_BUFFER_WATER_MARK,
new WriteBufferWaterMark(originalLowWaterMark, originalHighWaterMark));

final AtomicInteger lowWaterMark = new AtomicInteger(0);
Expand All @@ -175,7 +177,8 @@ public void childChannelOptions() throws Exception {
NettyServer ns = new NettyServer(
addr,
new ReflectiveChannelFactory<>(NioServerSocketChannel.class),
channelOptions,
new HashMap<ChannelOption<?>, Object>(),
childChannelOptions,
new FixedObjectPool<>(eventLoop),
new FixedObjectPool<>(eventLoop),
false,
Expand Down Expand Up @@ -227,6 +230,7 @@ public void channelzListenSocket() throws Exception {
addr,
new ReflectiveChannelFactory<>(NioServerSocketChannel.class),
new HashMap<ChannelOption<?>, Object>(),
new HashMap<ChannelOption<?>, Object>(),
new FixedObjectPool<>(eventLoop),
new FixedObjectPool<>(eventLoop),
false,
Expand Down

0 comments on commit 1993e40

Please sign in to comment.