Skip to content

Commit

Permalink
netty: provide an option to lower netty allocator chunk size from 16M…
Browse files Browse the repository at this point in the history
…B to 2MB (grpc#6407)

This would reduce the amount of direct buffer allocations, especially with light traffic. This should mitigate internal issue b/143075435

The change is currently optional and is only effective if system property "io.grpc.netty.useCustomAllocator" is set to "true" ignoring the case.

Internal benchmark results (median of 5 runs) doesn't show any significant change:
```
                          Before (STDEV)           After (STDEV)
grpc-java-java-multi-qps-integrity_only
Actual QPS               717,848 (7,445)         715,061 (2,122) 
QPS per Client CPU        23,768   (799)          23,842   (295)

grpc-java-java-multi-throughput-integrity_only
Actual QPS                35,631   (204)          35,298    (25) 
QPS per Client CPU         3,362    (56)           3,316    (18)

grpc-java-java-single-latency-integrity_only
Median latency (us)          130  (1.82)             125  (5.36)

grpc-java-java-single-throughput-integrity_only
Actual QPS                    593 (5.14)             587  (3.76)
QPS per Client CPU            502 (4.51)             494  (6.92)

```
  • Loading branch information
zhangkun83 authored and ericgribkoff committed Dec 6, 2019
1 parent 3074d50 commit 6a0e39c
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 12 deletions.
13 changes: 11 additions & 2 deletions netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.grpc.internal.ObjectPool;
import io.grpc.internal.SharedResourcePool;
import io.grpc.internal.TransportTracer;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelOption;
Expand Down Expand Up @@ -74,6 +75,8 @@ public final class NettyChannelBuilder
new ReflectiveChannelFactory<>(Utils.DEFAULT_CLIENT_CHANNEL_TYPE);
private static final ObjectPool<? extends EventLoopGroup> DEFAULT_EVENT_LOOP_GROUP_POOL =
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP);
private static final ObjectPool<ByteBufAllocator> ALLOCATOR_POOL =
SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR);

private final Map<ChannelOption<?>, Object> channelOptions =
new HashMap<>();
Expand Down Expand Up @@ -420,7 +423,7 @@ protected ClientTransportFactory buildTransportFactory() {

return new NettyTransportFactory(
negotiator, channelFactory, channelOptions,
eventLoopGroupPool, flowControlWindow, maxInboundMessageSize(),
eventLoopGroupPool, ALLOCATOR_POOL, flowControlWindow, maxInboundMessageSize(),
maxHeaderListSize, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls,
transportTracerFactory, localSocketPicker, useGetForSafeMethods);
}
Expand Down Expand Up @@ -535,6 +538,8 @@ private static final class NettyTransportFactory implements ClientTransportFacto
private final Map<ChannelOption<?>, ?> channelOptions;
private final ObjectPool<? extends EventLoopGroup> groupPool;
private final EventLoopGroup group;
private final ObjectPool<? extends ByteBufAllocator> allocatorPool;
private final ByteBufAllocator allocator;
private final int flowControlWindow;
private final int maxMessageSize;
private final int maxHeaderListSize;
Expand All @@ -550,6 +555,7 @@ private static final class NettyTransportFactory implements ClientTransportFacto
NettyTransportFactory(ProtocolNegotiator protocolNegotiator,
ChannelFactory<? extends Channel> channelFactory,
Map<ChannelOption<?>, ?> channelOptions, ObjectPool<? extends EventLoopGroup> groupPool,
ObjectPool<? extends ByteBufAllocator> allocatorPool,
int flowControlWindow, int maxMessageSize, int maxHeaderListSize,
long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls,
TransportTracer.Factory transportTracerFactory, LocalSocketPicker localSocketPicker,
Expand All @@ -559,6 +565,8 @@ private static final class NettyTransportFactory implements ClientTransportFacto
this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
this.groupPool = groupPool;
this.group = groupPool.getObject();
this.allocatorPool = allocatorPool;
this.allocator = allocatorPool.getObject();
this.flowControlWindow = flowControlWindow;
this.maxMessageSize = maxMessageSize;
this.maxHeaderListSize = maxHeaderListSize;
Expand Down Expand Up @@ -597,7 +605,7 @@ public void run() {

// TODO(carl-mastrangelo): Pass channelLogger in.
NettyClientTransport transport = new NettyClientTransport(
serverAddress, channelFactory, channelOptions, group,
serverAddress, channelFactory, channelOptions, group, allocator,
localNegotiator, flowControlWindow,
maxMessageSize, maxHeaderListSize, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos,
keepAliveWithoutCalls, options.getAuthority(), options.getUserAgent(),
Expand All @@ -618,6 +626,7 @@ public void close() {
}
closed = true;

allocatorPool.returnObject(allocator);
protocolNegotiator.close();
groupPool.returnObject(group);
}
Expand Down
6 changes: 6 additions & 0 deletions netty/src/main/java/io/grpc/netty/NettyClientTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.grpc.netty;

import static io.grpc.internal.GrpcUtil.KEEPALIVE_TIME_NANOS_DISABLED;
import static io.netty.channel.ChannelOption.ALLOCATOR;
import static io.netty.channel.ChannelOption.SO_KEEPALIVE;

import com.google.common.annotations.VisibleForTesting;
Expand All @@ -43,6 +44,7 @@
import io.grpc.internal.TransportTracer;
import io.grpc.netty.NettyChannelBuilder.LocalSocketPicker;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelFuture;
Expand Down Expand Up @@ -74,6 +76,7 @@ class NettyClientTransport implements ConnectionClientTransport {
private final SocketAddress remoteAddress;
private final ChannelFactory<? extends Channel> channelFactory;
private final EventLoopGroup group;
private final ByteBufAllocator allocator;
private final ProtocolNegotiator negotiator;
private final String authorityString;
private final AsciiString authority;
Expand Down Expand Up @@ -105,6 +108,7 @@ class NettyClientTransport implements ConnectionClientTransport {
NettyClientTransport(
SocketAddress address, ChannelFactory<? extends Channel> channelFactory,
Map<ChannelOption<?>, ?> channelOptions, EventLoopGroup group,
ByteBufAllocator allocator,
ProtocolNegotiator negotiator, int flowControlWindow, int maxMessageSize,
int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos,
boolean keepAliveWithoutCalls, String authority, @Nullable String userAgent,
Expand All @@ -115,6 +119,7 @@ class NettyClientTransport implements ConnectionClientTransport {
this.negotiationScheme = this.negotiator.scheme();
this.remoteAddress = Preconditions.checkNotNull(address, "address");
this.group = Preconditions.checkNotNull(group, "group");
this.allocator = Preconditions.checkNotNull(allocator, "allocator");
this.channelFactory = channelFactory;
this.channelOptions = Preconditions.checkNotNull(channelOptions, "channelOptions");
this.flowControlWindow = flowControlWindow;
Expand Down Expand Up @@ -225,6 +230,7 @@ public Runnable start(Listener transportListener) {
ChannelHandler negotiationHandler = negotiator.newHandler(handler);

Bootstrap b = new Bootstrap();
b.option(ALLOCATOR, allocator);
b.attr(LOGGER_KEY, channelLogger);
b.group(eventLoop);
b.channelFactory(channelFactory);
Expand Down
27 changes: 22 additions & 5 deletions netty/src/main/java/io/grpc/netty/NettyServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
import static io.netty.channel.ChannelOption.ALLOCATOR;
import static io.netty.channel.ChannelOption.SO_BACKLOG;
import static io.netty.channel.ChannelOption.SO_KEEPALIVE;

Expand All @@ -37,6 +38,7 @@
import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.TransportTracer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelFuture;
Expand Down Expand Up @@ -73,8 +75,10 @@ class NettyServer implements InternalServer, InternalWithLogId {
private final int maxStreamsPerConnection;
private final ObjectPool<? extends EventLoopGroup> bossGroupPool;
private final ObjectPool<? extends EventLoopGroup> workerGroupPool;
private final ObjectPool<? extends ByteBufAllocator> allocatorPool;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private ByteBufAllocator allocator;
private ServerListener listener;
private Channel channel;
private final int flowControlWindow;
Expand All @@ -87,7 +91,8 @@ class NettyServer implements InternalServer, InternalWithLogId {
private final long maxConnectionAgeGraceInNanos;
private final boolean permitKeepAliveWithoutCalls;
private final long permitKeepAliveTimeInNanos;
private final ReferenceCounted eventLoopReferenceCounter = new EventLoopReferenceCounter();
private final ReferenceCounted sharedResourceReferenceCounter =
new SharedResourceReferenceCounter();
private final List<? extends ServerStreamTracer.Factory> streamTracerFactories;
private final TransportTracer.Factory transportTracerFactory;
private final InternalChannelz channelz;
Expand All @@ -100,6 +105,7 @@ class NettyServer implements InternalServer, InternalWithLogId {
Map<ChannelOption<?>, ?> channelOptions,
ObjectPool<? extends EventLoopGroup> bossGroupPool,
ObjectPool<? extends EventLoopGroup> workerGroupPool,
ObjectPool<? extends ByteBufAllocator> allocatorPool,
ProtocolNegotiator protocolNegotiator,
List<? extends ServerStreamTracer.Factory> streamTracerFactories,
TransportTracer.Factory transportTracerFactory,
Expand All @@ -115,8 +121,10 @@ class NettyServer implements InternalServer, InternalWithLogId {
this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
this.bossGroupPool = checkNotNull(bossGroupPool, "bossGroupPool");
this.workerGroupPool = checkNotNull(workerGroupPool, "workerGroupPool");
this.allocatorPool = checkNotNull(allocatorPool, "allocatorPool");
this.bossGroup = bossGroupPool.getObject();
this.workerGroup = workerGroupPool.getObject();
this.allocator = allocatorPool.getObject();
this.protocolNegotiator = checkNotNull(protocolNegotiator, "protocolNegotiator");
this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
this.transportTracerFactory = transportTracerFactory;
Expand Down Expand Up @@ -155,6 +163,8 @@ public void start(ServerListener serverListener) throws IOException {
listener = checkNotNull(serverListener, "serverListener");

ServerBootstrap b = new ServerBootstrap();
b.option(ALLOCATOR, allocator);
b.childOption(ALLOCATOR, allocator);
b.group(bossGroup, workerGroup);
b.channelFactory(channelFactory);
// For non-socket based channel, the option will be ignored.
Expand Down Expand Up @@ -210,7 +220,7 @@ public void initChannel(Channel ch) {
}
// `channel` shutdown can race with `ch` initialization, so this is only safe to increment
// inside the lock.
eventLoopReferenceCounter.retain();
sharedResourceReferenceCounter.retain();
transportListener = listener.transportCreated(transport);
}

Expand All @@ -224,7 +234,7 @@ final class LoopReleaser implements ChannelFutureListener {
public void operationComplete(ChannelFuture future) throws Exception {
if (!done) {
done = true;
eventLoopReferenceCounter.release();
sharedResourceReferenceCounter.release();
}
}
}
Expand Down Expand Up @@ -281,7 +291,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
synchronized (NettyServer.this) {
listener.serverShutdown();
}
eventLoopReferenceCounter.release();
sharedResourceReferenceCounter.release();
}
});
try {
Expand All @@ -305,7 +315,7 @@ public String toString() {
.toString();
}

class EventLoopReferenceCounter extends AbstractReferenceCounted {
class SharedResourceReferenceCounter extends AbstractReferenceCounted {
@Override
protected void deallocate() {
try {
Expand All @@ -320,6 +330,13 @@ protected void deallocate() {
}
} finally {
workerGroup = null;
try {
if (allocator != null) {
allocatorPool.returnObject(allocator);
}
} finally {
allocator = null;
}
}
}
}
Expand Down
5 changes: 4 additions & 1 deletion netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.SharedResourcePool;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
Expand Down Expand Up @@ -79,6 +80,8 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP);
private static final ObjectPool<? extends EventLoopGroup> DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL =
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP);
private static final ObjectPool<ByteBufAllocator> ALLOCATOR_POOL =
SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR);

private final List<SocketAddress> listenAddresses = new ArrayList<>();

Expand Down Expand Up @@ -541,7 +544,7 @@ protected List<NettyServer> buildTransportServers(
for (SocketAddress listenAddress : listenAddresses) {
NettyServer transportServer = new NettyServer(
listenAddress, channelFactory, channelOptions, bossEventLoopGroupPool,
workerEventLoopGroupPool, negotiator, streamTracerFactories,
workerEventLoopGroupPool, ALLOCATOR_POOL, negotiator, streamTracerFactories,
getTransportTracerFactory(), maxConcurrentCallsPerConnection, flowControlWindow,
maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
maxConnectionIdleInNanos, maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
Expand Down
38 changes: 38 additions & 0 deletions netty/src/main/java/io/grpc/netty/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import io.grpc.internal.SharedResourceHolder.Resource;
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2InboundHeaders;
import io.grpc.netty.NettySocketSupport.NativeSocketOptions;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFactory;
Expand Down Expand Up @@ -83,6 +85,42 @@ class Utils {
public static final Resource<EventLoopGroup> DEFAULT_BOSS_EVENT_LOOP_GROUP;
public static final Resource<EventLoopGroup> DEFAULT_WORKER_EVENT_LOOP_GROUP;

public static final Resource<ByteBufAllocator> BYTE_BUF_ALLOCATOR =
new Resource<ByteBufAllocator>() {
@Override
public ByteBufAllocator create() {
if (Boolean.parseBoolean(
System.getProperty("io.grpc.netty.useCustomAllocator", "false"))) {
int maxOrder;
if (System.getProperty("io.netty.allocator.maxOrder") == null) {
// See the implementation of PooledByteBufAllocator. DEFAULT_MAX_ORDER in there is
// 11, which makes chunk size to be 8192 << 11 = 16 MiB. We want the chunk size to be
// 2MiB, thus reducing the maxOrder to 8.
maxOrder = 8;
} else {
maxOrder = PooledByteBufAllocator.defaultMaxOrder();
}
return new PooledByteBufAllocator(
PooledByteBufAllocator.defaultPreferDirect(),
PooledByteBufAllocator.defaultNumHeapArena(),
PooledByteBufAllocator.defaultNumDirectArena(),
PooledByteBufAllocator.defaultPageSize(),
maxOrder,
PooledByteBufAllocator.defaultTinyCacheSize(),
PooledByteBufAllocator.defaultSmallCacheSize(),
PooledByteBufAllocator.defaultNormalCacheSize(),
PooledByteBufAllocator.defaultUseCacheForAllThreads());
} else {
return ByteBufAllocator.DEFAULT;
}
}

@Override
public void close(ByteBufAllocator allocator) {
// PooledByteBufAllocator doesn't provide a shutdown method. Leaving it to GC.
}
};

public static final ChannelFactory<? extends ServerChannel> DEFAULT_SERVER_CHANNEL_FACTORY;
public static final Class<? extends Channel> DEFAULT_CLIENT_CHANNEL_TYPE;

Expand Down
14 changes: 10 additions & 4 deletions netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,12 @@
import io.grpc.internal.ServerStreamListener;
import io.grpc.internal.ServerTransport;
import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.SharedResourceHolder;
import io.grpc.internal.SharedResourcePool;
import io.grpc.internal.TransportTracer;
import io.grpc.internal.testing.TestUtils;
import io.grpc.netty.NettyChannelBuilder.LocalSocketPicker;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelDuplexHandler;
Expand Down Expand Up @@ -123,6 +126,7 @@ public class NettyClientTransportTest {
private final LinkedBlockingQueue<Attributes> serverTransportAttributesList =
new LinkedBlockingQueue<>();
private final NioEventLoopGroup group = new NioEventLoopGroup(1);
private final ByteBufAllocator allocator = SharedResourceHolder.get(Utils.BYTE_BUF_ALLOCATOR);
private final EchoServerListener serverListener = new EchoServerListener();
private final InternalChannelz channelz = new InternalChannelz();
private Runnable tooManyPingsRunnable = new Runnable() {
Expand Down Expand Up @@ -153,6 +157,7 @@ public void teardown() throws Exception {
}

group.shutdownGracefully(0, 10, TimeUnit.SECONDS);
SharedResourceHolder.release(Utils.BYTE_BUF_ALLOCATOR, allocator);
}

@Test
Expand Down Expand Up @@ -190,7 +195,7 @@ public void setSoLingerChannelOption() throws IOException {
channelOptions.put(ChannelOption.SO_LINGER, soLinger);
NettyClientTransport transport = new NettyClientTransport(
address, new ReflectiveChannelFactory<>(NioSocketChannel.class), channelOptions, group,
newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
allocator, newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority,
null /* user agent */, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY,
new SocketPicker(), new FakeChannelLogger(), false);
Expand Down Expand Up @@ -435,7 +440,7 @@ public void failingToConstructChannelShouldFailGracefully() throws Exception {
authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());
NettyClientTransport transport = new NettyClientTransport(
address, new ReflectiveChannelFactory<>(CantConstructChannel.class),
new HashMap<ChannelOption<?>, Object>(), group,
new HashMap<ChannelOption<?>, Object>(), group, allocator,
newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1, false, authority,
null, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY, new SocketPicker(),
Expand Down Expand Up @@ -705,7 +710,7 @@ private NettyClientTransport newTransport(ProtocolNegotiator negotiator, int max
keepAliveTimeNano = KEEPALIVE_TIME_NANOS_DISABLED;
}
NettyClientTransport transport = new NettyClientTransport(
address, channelFactory, new HashMap<ChannelOption<?>, Object>(), group,
address, channelFactory, new HashMap<ChannelOption<?>, Object>(), group, allocator,
negotiator, DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize,
keepAliveTimeNano, keepAliveTimeoutNano,
false, authority, userAgent, tooManyPingsRunnable,
Expand All @@ -723,7 +728,8 @@ private void startServer(int maxStreamsPerConnection, int maxHeaderListSize) thr
TestUtils.testServerAddress(new InetSocketAddress(0)),
new ReflectiveChannelFactory<>(NioServerSocketChannel.class),
new HashMap<ChannelOption<?>, Object>(),
new FixedObjectPool<>(group), new FixedObjectPool<>(group), negotiator,
new FixedObjectPool<>(group), new FixedObjectPool<>(group),
SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR), negotiator,
Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(),
maxStreamsPerConnection,
Expand Down

0 comments on commit 6a0e39c

Please sign in to comment.