netty: add an internal option to disable native buffer (#6619)
This is needed for internal rollout, where the native memory usage from Netty makes tasks more prone to exceeding memory limits.
zhangkun83 committed Jan 21, 2020
1 parent e7d7c5b commit 74cde7e
Showing 7 changed files with 67 additions and 36 deletions.
@@ -43,6 +43,10 @@ public static void setTracingEnabled(NettyServerBuilder builder, boolean value)
builder.setTracingEnabled(value);
}

public static void setForceHeapBuffer(NettyServerBuilder builder, boolean value) {
builder.setForceHeapBuffer(value);
}

/**
* Sets {@link io.grpc.Channel} and {@link io.netty.channel.EventLoopGroup}s to Nio. A major
* benefit over using existing setters is gRPC will manage the life cycle of {@link
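For context, a minimal usage sketch of the new option. It assumes the static accessor shown in the hunk above lives in an internal accessor class named InternalNettyServerBuilder (the file name is not visible in this view, so that class name is an assumption), and it uses a placeholder service:

import io.grpc.BindableService;
import io.grpc.Server;
import io.grpc.netty.InternalNettyServerBuilder;  // assumed location of the accessor shown above
import io.grpc.netty.NettyServerBuilder;

Server startWithHeapBuffers(BindableService service) throws java.io.IOException {
  NettyServerBuilder builder = NettyServerBuilder.forPort(8080);
  // Internal-only switch added by this commit: keep the custom pooled allocator
  // but serve heap buffers, so Netty allocates no native (direct) memory.
  InternalNettyServerBuilder.setForceHeapBuffer(builder, true);
  return builder.addService(service).build().start();
}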
@@ -226,7 +226,7 @@ public Runnable start(Listener transportListener) {
ChannelHandler negotiationHandler = negotiator.newHandler(handler);

Bootstrap b = new Bootstrap();
- b.option(ALLOCATOR, Utils.getByteBufAllocator());
+ b.option(ALLOCATOR, Utils.getByteBufAllocator(false));
b.attr(LOGGER_KEY, channelLogger);
b.group(eventLoop);
b.channelFactory(channelFactory);
7 changes: 5 additions & 2 deletions netty/src/main/java/io/grpc/netty/NettyServer.java
@@ -73,6 +73,7 @@ class NettyServer implements InternalServer, InternalWithLogId {
private final int maxStreamsPerConnection;
private final ObjectPool<? extends EventLoopGroup> bossGroupPool;
private final ObjectPool<? extends EventLoopGroup> workerGroupPool;
private final boolean forceHeapBuffer;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private ServerListener listener;
@@ -100,6 +101,7 @@ class NettyServer implements InternalServer, InternalWithLogId {
Map<ChannelOption<?>, ?> channelOptions,
ObjectPool<? extends EventLoopGroup> bossGroupPool,
ObjectPool<? extends EventLoopGroup> workerGroupPool,
boolean forceHeapBuffer,
ProtocolNegotiator protocolNegotiator,
List<? extends ServerStreamTracer.Factory> streamTracerFactories,
TransportTracer.Factory transportTracerFactory,
@@ -115,6 +117,7 @@ class NettyServer implements InternalServer, InternalWithLogId {
this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
this.bossGroupPool = checkNotNull(bossGroupPool, "bossGroupPool");
this.workerGroupPool = checkNotNull(workerGroupPool, "workerGroupPool");
this.forceHeapBuffer = forceHeapBuffer;
this.bossGroup = bossGroupPool.getObject();
this.workerGroup = workerGroupPool.getObject();
this.protocolNegotiator = checkNotNull(protocolNegotiator, "protocolNegotiator");
@@ -155,8 +158,8 @@ public void start(ServerListener serverListener) throws IOException {
listener = checkNotNull(serverListener, "serverListener");

ServerBootstrap b = new ServerBootstrap();
- b.option(ALLOCATOR, Utils.getByteBufAllocator());
- b.childOption(ALLOCATOR, Utils.getByteBufAllocator());
+ b.option(ALLOCATOR, Utils.getByteBufAllocator(forceHeapBuffer));
+ b.childOption(ALLOCATOR, Utils.getByteBufAllocator(forceHeapBuffer));
b.group(bossGroup, workerGroup);
b.channelFactory(channelFactory);
// For non-socket based channel, the option will be ignored.
10 changes: 9 additions & 1 deletion netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
@@ -89,6 +89,7 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
DEFAULT_BOSS_EVENT_LOOP_GROUP_POOL;
private ObjectPool<? extends EventLoopGroup> workerEventLoopGroupPool =
DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL;
private boolean forceHeapBuffer;
private SslContext sslContext;
private ProtocolNegotiator protocolNegotiator;
private int maxConcurrentCallsPerConnection = Integer.MAX_VALUE;
@@ -268,6 +269,13 @@ NettyServerBuilder workerEventLoopGroupPool(
return this;
}

/**
* Force using heap buffer when custom allocator is enabled.
*/
void setForceHeapBuffer(boolean value) {
forceHeapBuffer = value;
}

/**
* Sets the TLS context to use for encryption. Providing a context enables encryption. It must
* have been configured with {@link GrpcSslContexts}, but options could have been overridden.
@@ -542,7 +550,7 @@ protected List<NettyServer> buildTransportServers(
for (SocketAddress listenAddress : listenAddresses) {
NettyServer transportServer = new NettyServer(
listenAddress, channelFactory, channelOptions, bossEventLoopGroupPool,
- workerEventLoopGroupPool, negotiator, streamTracerFactories,
+ workerEventLoopGroupPool, forceHeapBuffer, negotiator, streamTracerFactories,
getTransportTracerFactory(), maxConcurrentCallsPerConnection, flowControlWindow,
maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
maxConnectionIdleInNanos, maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
74 changes: 43 additions & 31 deletions netty/src/main/java/io/grpc/netty/Utils.java
@@ -87,35 +87,13 @@ class Utils {
public static final Resource<EventLoopGroup> DEFAULT_WORKER_EVENT_LOOP_GROUP;

// This class is initialized on first use, thus provides delayed allocator creation.
private static final class ByteBufAllocatorHolder {
private static final ByteBufAllocator allocator;

static {
if (Boolean.parseBoolean(
System.getProperty("io.grpc.netty.useCustomAllocator", "true"))) {
int maxOrder;
if (System.getProperty("io.netty.allocator.maxOrder") == null) {
// See the implementation of PooledByteBufAllocator. DEFAULT_MAX_ORDER in there is
// 11, which makes chunk size to be 8192 << 11 = 16 MiB. We want the chunk size to be
// 2MiB, thus reducing the maxOrder to 8.
maxOrder = 8;
} else {
maxOrder = PooledByteBufAllocator.defaultMaxOrder();
}
allocator = new PooledByteBufAllocator(
PooledByteBufAllocator.defaultPreferDirect(),
PooledByteBufAllocator.defaultNumHeapArena(),
PooledByteBufAllocator.defaultNumDirectArena(),
PooledByteBufAllocator.defaultPageSize(),
maxOrder,
PooledByteBufAllocator.defaultTinyCacheSize(),
PooledByteBufAllocator.defaultSmallCacheSize(),
PooledByteBufAllocator.defaultNormalCacheSize(),
PooledByteBufAllocator.defaultUseCacheForAllThreads());
} else {
allocator = ByteBufAllocator.DEFAULT;
}
}
private static final class ByteBufAllocatorPreferDirectHolder {
private static final ByteBufAllocator allocator = createByteBufAllocator(true);
}

// This class is initialized on first use, thus provides delayed allocator creation.
private static final class ByteBufAllocatorPreferHeapHolder {
private static final ByteBufAllocator allocator = createByteBufAllocator(false);
}

public static final ChannelFactory<? extends ServerChannel> DEFAULT_SERVER_CHANNEL_FACTORY;
@@ -144,8 +122,42 @@ private static final class ByteBufAllocatorHolder {
}
}

- public static ByteBufAllocator getByteBufAllocator() {
- return ByteBufAllocatorHolder.allocator;
+ public static ByteBufAllocator getByteBufAllocator(boolean forceHeapBuffer) {
if (Boolean.parseBoolean(
System.getProperty("io.grpc.netty.useCustomAllocator", "true"))) {
if (forceHeapBuffer || !PooledByteBufAllocator.defaultPreferDirect()) {
return ByteBufAllocatorPreferHeapHolder.allocator;
} else {
return ByteBufAllocatorPreferDirectHolder.allocator;
}
} else {
return ByteBufAllocator.DEFAULT;
}
}

private static ByteBufAllocator createByteBufAllocator(boolean preferDirect) {
int maxOrder;
if (System.getProperty("io.netty.allocator.maxOrder") == null) {
// See the implementation of PooledByteBufAllocator. DEFAULT_MAX_ORDER in there is
// 11, which makes chunk size to be 8192 << 11 = 16 MiB. We want the chunk size to be
// 2MiB, thus reducing the maxOrder to 8.
maxOrder = 8;
} else {
maxOrder = PooledByteBufAllocator.defaultMaxOrder();
}
return new PooledByteBufAllocator(
preferDirect,
PooledByteBufAllocator.defaultNumHeapArena(),
// Assuming neither gRPC nor netty are using allocator.directBuffer() to request
// specifically for direct buffers, which is true as I just checked, setting arenas to 0
// will make sure no direct buffer is ever created.
preferDirect ? PooledByteBufAllocator.defaultNumDirectArena() : 0,
PooledByteBufAllocator.defaultPageSize(),
maxOrder,
PooledByteBufAllocator.defaultTinyCacheSize(),
PooledByteBufAllocator.defaultSmallCacheSize(),
PooledByteBufAllocator.defaultNormalCacheSize(),
PooledByteBufAllocator.defaultUseCacheForAllThreads());
}

public static Metadata convertHeaders(Http2Headers http2Headers) {
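To make the arithmetic and the heap-only behavior described in the comments above concrete, here is a small standalone sketch. The 8 KiB page size and maxOrder of 11 are Netty's documented defaults (assumed here), and the constructor arguments mirror the createByteBufAllocator method in this change:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public class HeapOnlyAllocatorDemo {
  public static void main(String[] args) {
    int pageSize = 8192;                    // Netty's default page size
    System.out.println(pageSize << 11);     // 16777216 bytes = 16 MiB chunk (default maxOrder 11)
    System.out.println(pageSize << 8);      // 2097152 bytes = 2 MiB chunk (maxOrder 8, as above)

    // preferDirect=false plus zero direct arenas: plain buffer() calls are served
    // from heap arenas, so no native memory is pooled by this allocator.
    PooledByteBufAllocator heapOnly = new PooledByteBufAllocator(
        /* preferDirect= */ false,
        PooledByteBufAllocator.defaultNumHeapArena(),
        /* nDirectArena= */ 0,
        pageSize,
        /* maxOrder= */ 8);
    ByteBuf buf = heapOnly.buffer(1024);
    System.out.println(buf.isDirect());     // false
    buf.release();
  }
}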
@@ -769,7 +769,7 @@ private void startServer(int maxStreamsPerConnection, int maxHeaderListSize) thr
TestUtils.testServerAddress(new InetSocketAddress(0)),
new ReflectiveChannelFactory<>(NioServerSocketChannel.class),
new HashMap<ChannelOption<?>, Object>(),
- new FixedObjectPool<>(group), new FixedObjectPool<>(group), negotiator,
+ new FixedObjectPool<>(group), new FixedObjectPool<>(group), false, negotiator,
Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(),
maxStreamsPerConnection,
4 changes: 4 additions & 0 deletions netty/src/test/java/io/grpc/netty/NettyServerTest.java
@@ -94,6 +94,7 @@ class TestProtocolNegotiator implements ProtocolNegotiator {
new HashMap<ChannelOption<?>, Object>(),
new FixedObjectPool<>(eventLoop),
new FixedObjectPool<>(eventLoop),
false,
protocolNegotiator,
Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(),
@@ -138,6 +139,7 @@ public void getPort_notStarted() {
new HashMap<ChannelOption<?>, Object>(),
new FixedObjectPool<>(eventLoop),
new FixedObjectPool<>(eventLoop),
false,
ProtocolNegotiators.plaintext(),
Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(),
@@ -176,6 +178,7 @@ public void childChannelOptions() throws Exception {
channelOptions,
new FixedObjectPool<>(eventLoop),
new FixedObjectPool<>(eventLoop),
false,
ProtocolNegotiators.plaintext(),
Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(),
@@ -226,6 +229,7 @@ public void channelzListenSocket() throws Exception {
new HashMap<ChannelOption<?>, Object>(),
new FixedObjectPool<>(eventLoop),
new FixedObjectPool<>(eventLoop),
false,
ProtocolNegotiators.plaintext(),
Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(),
