netty: provide an option to lower netty allocator chunk size from 16MB to 2MB #6407

Merged: 8 commits, Nov 14, 2019
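The change hinges on one piece of arithmetic in Netty's PooledByteBufAllocator: each arena chunk spans pageSize << maxOrder bytes. With the default 8 KiB page size, Netty's default maxOrder of 11 yields 16 MiB chunks, while a maxOrder of 8 yields 2 MiB chunks, which is the value the new allocator below selects when io.netty.allocator.maxOrder is not set explicitly. A minimal sketch of the arithmetic (illustrative only, not part of this PR):

public class ChunkSizeSketch {
  public static void main(String[] args) {
    int pageSize = 8192;                // PooledByteBufAllocator's default page size
    System.out.println(pageSize << 11); // 16777216 bytes = 16 MiB, Netty's default chunk size
    System.out.println(pageSize << 8);  //  2097152 bytes =  2 MiB, the chunk size this PR targets
  }
}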
netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java (11 additions, 2 deletions)
@@ -40,6 +40,7 @@
import io.grpc.internal.ObjectPool;
import io.grpc.internal.SharedResourcePool;
import io.grpc.internal.TransportTracer;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelOption;
@@ -73,6 +74,8 @@ public final class NettyChannelBuilder
new ReflectiveChannelFactory<>(Utils.DEFAULT_CLIENT_CHANNEL_TYPE);
private static final ObjectPool<? extends EventLoopGroup> DEFAULT_EVENT_LOOP_GROUP_POOL =
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP);
private static final ObjectPool<ByteBufAllocator> ALLOCATOR_POOL =
SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR);

private final Map<ChannelOption<?>, Object> channelOptions =
new HashMap<>();
@@ -419,7 +422,7 @@ protected ClientTransportFactory buildTransportFactory() {

return new NettyTransportFactory(
negotiator, channelFactory, channelOptions,
eventLoopGroupPool, flowControlWindow, maxInboundMessageSize(),
eventLoopGroupPool, ALLOCATOR_POOL, flowControlWindow, maxInboundMessageSize(),
maxHeaderListSize, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls,
transportTracerFactory, localSocketPicker, useGetForSafeMethods);
}
@@ -534,6 +537,8 @@ private static final class NettyTransportFactory implements ClientTransportFacto
private final Map<ChannelOption<?>, ?> channelOptions;
private final ObjectPool<? extends EventLoopGroup> groupPool;
private final EventLoopGroup group;
private final ObjectPool<? extends ByteBufAllocator> allocatorPool;
private final ByteBufAllocator allocator;
private final int flowControlWindow;
private final int maxMessageSize;
private final int maxHeaderListSize;
@@ -549,6 +554,7 @@ private static final class NettyTransportFactory implements ClientTransportFacto
NettyTransportFactory(ProtocolNegotiator protocolNegotiator,
ChannelFactory<? extends Channel> channelFactory,
Map<ChannelOption<?>, ?> channelOptions, ObjectPool<? extends EventLoopGroup> groupPool,
ObjectPool<? extends ByteBufAllocator> allocatorPool,
int flowControlWindow, int maxMessageSize, int maxHeaderListSize,
long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls,
TransportTracer.Factory transportTracerFactory, LocalSocketPicker localSocketPicker,
@@ -558,6 +564,8 @@ private static final class NettyTransportFactory implements ClientTransportFacto
this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
this.groupPool = groupPool;
this.group = groupPool.getObject();
this.allocatorPool = allocatorPool;
this.allocator = allocatorPool.getObject();
this.flowControlWindow = flowControlWindow;
this.maxMessageSize = maxMessageSize;
this.maxHeaderListSize = maxHeaderListSize;
@@ -596,7 +604,7 @@ public void run() {

// TODO(carl-mastrangelo): Pass channelLogger in.
NettyClientTransport transport = new NettyClientTransport(
serverAddress, channelFactory, channelOptions, group,
serverAddress, channelFactory, channelOptions, group, allocator,
localNegotiator, flowControlWindow,
maxMessageSize, maxHeaderListSize, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos,
keepAliveWithoutCalls, options.getAuthority(), options.getUserAgent(),
@@ -617,6 +625,7 @@ public void close() {
}
closed = true;

allocatorPool.returnObject(allocator);
protocolNegotiator.close();
groupPool.returnObject(group);
}
netty/src/main/java/io/grpc/netty/NettyClientTransport.java (6 additions, 0 deletions)
@@ -17,6 +17,7 @@
package io.grpc.netty;

import static io.grpc.internal.GrpcUtil.KEEPALIVE_TIME_NANOS_DISABLED;
import static io.netty.channel.ChannelOption.ALLOCATOR;
import static io.netty.channel.ChannelOption.SO_KEEPALIVE;

import com.google.common.annotations.VisibleForTesting;
@@ -43,6 +44,7 @@
import io.grpc.internal.TransportTracer;
import io.grpc.netty.NettyChannelBuilder.LocalSocketPicker;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelFuture;
@@ -74,6 +76,7 @@ class NettyClientTransport implements ConnectionClientTransport {
private final SocketAddress remoteAddress;
private final ChannelFactory<? extends Channel> channelFactory;
private final EventLoopGroup group;
private final ByteBufAllocator allocator;
private final ProtocolNegotiator negotiator;
private final String authorityString;
private final AsciiString authority;
@@ -105,6 +108,7 @@ class NettyClientTransport implements ConnectionClientTransport {
NettyClientTransport(
SocketAddress address, ChannelFactory<? extends Channel> channelFactory,
Map<ChannelOption<?>, ?> channelOptions, EventLoopGroup group,
ByteBufAllocator allocator,
ProtocolNegotiator negotiator, int flowControlWindow, int maxMessageSize,
int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos,
boolean keepAliveWithoutCalls, String authority, @Nullable String userAgent,
@@ -115,6 +119,7 @@ class NettyClientTransport implements ConnectionClientTransport {
this.negotiationScheme = this.negotiator.scheme();
this.remoteAddress = Preconditions.checkNotNull(address, "address");
this.group = Preconditions.checkNotNull(group, "group");
this.allocator = Preconditions.checkNotNull(allocator, "allocator");
this.channelFactory = channelFactory;
this.channelOptions = Preconditions.checkNotNull(channelOptions, "channelOptions");
this.flowControlWindow = flowControlWindow;
@@ -225,6 +230,7 @@ public Runnable start(Listener transportListener) {
ChannelHandler negotiationHandler = negotiator.newHandler(handler);

Bootstrap b = new Bootstrap();
b.option(ALLOCATOR, allocator);
b.attr(LOGGER_KEY, channelLogger);
b.group(eventLoop);
b.channelFactory(channelFactory);
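The behavioral change in this file is the single new line b.option(ALLOCATOR, allocator): every Channel created by the client Bootstrap now allocates its buffers from the allocator handed to the transport instead of ByteBufAllocator.DEFAULT. A standalone sketch of that mechanism against plain Netty 4.1 (class and variable names below are illustrative, not grpc's):

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

public class AllocatorOptionSketch {
  public static void main(String[] args) {
    NioEventLoopGroup group = new NioEventLoopGroup(1);
    // Stand-in for the shared allocator the transport receives from its builder.
    ByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
    Bootstrap b = new Bootstrap()
        .group(group)
        .channel(NioSocketChannel.class)
        .option(ChannelOption.ALLOCATOR, allocator) // mirrors b.option(ALLOCATOR, allocator) above
        .handler(new ChannelInboundHandlerAdapter());
    // Channels produced by b.connect(...) would now read and write through `allocator`.
    group.shutdownGracefully();
  }
}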
netty/src/main/java/io/grpc/netty/NettyServer.java (22 additions, 5 deletions)
@@ -18,6 +18,7 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
import static io.netty.channel.ChannelOption.ALLOCATOR;
import static io.netty.channel.ChannelOption.SO_BACKLOG;
import static io.netty.channel.ChannelOption.SO_KEEPALIVE;

@@ -37,6 +38,7 @@
import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.TransportTracer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelFuture;
@@ -73,8 +75,10 @@ class NettyServer implements InternalServer, InternalWithLogId {
private final int maxStreamsPerConnection;
private final ObjectPool<? extends EventLoopGroup> bossGroupPool;
private final ObjectPool<? extends EventLoopGroup> workerGroupPool;
private final ObjectPool<? extends ByteBufAllocator> allocatorPool;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private ByteBufAllocator allocator;
private ServerListener listener;
private Channel channel;
private final int flowControlWindow;
@@ -87,7 +91,8 @@ class NettyServer implements InternalServer, InternalWithLogId {
private final long maxConnectionAgeGraceInNanos;
private final boolean permitKeepAliveWithoutCalls;
private final long permitKeepAliveTimeInNanos;
private final ReferenceCounted eventLoopReferenceCounter = new EventLoopReferenceCounter();
private final ReferenceCounted sharedResourceReferenceCounter =
new SharedResourceReferenceCounter();
private final List<? extends ServerStreamTracer.Factory> streamTracerFactories;
private final TransportTracer.Factory transportTracerFactory;
private final InternalChannelz channelz;
@@ -100,6 +105,7 @@ class NettyServer implements InternalServer, InternalWithLogId {
Map<ChannelOption<?>, ?> channelOptions,
ObjectPool<? extends EventLoopGroup> bossGroupPool,
ObjectPool<? extends EventLoopGroup> workerGroupPool,
ObjectPool<? extends ByteBufAllocator> allocatorPool,
ProtocolNegotiator protocolNegotiator,
List<? extends ServerStreamTracer.Factory> streamTracerFactories,
TransportTracer.Factory transportTracerFactory,
@@ -115,8 +121,10 @@ class NettyServer implements InternalServer, InternalWithLogId {
this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
this.bossGroupPool = checkNotNull(bossGroupPool, "bossGroupPool");
this.workerGroupPool = checkNotNull(workerGroupPool, "workerGroupPool");
this.allocatorPool = checkNotNull(allocatorPool, "allocatorPool");
this.bossGroup = bossGroupPool.getObject();
this.workerGroup = workerGroupPool.getObject();
this.allocator = allocatorPool.getObject();
this.protocolNegotiator = checkNotNull(protocolNegotiator, "protocolNegotiator");
this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
this.transportTracerFactory = transportTracerFactory;
@@ -155,6 +163,8 @@ public void start(ServerListener serverListener) throws IOException {
listener = checkNotNull(serverListener, "serverListener");

ServerBootstrap b = new ServerBootstrap();
b.option(ALLOCATOR, allocator);
b.childOption(ALLOCATOR, allocator);
b.group(bossGroup, workerGroup);
b.channelFactory(channelFactory);
// For non-socket based channel, the option will be ignored.
@@ -210,7 +220,7 @@ public void initChannel(Channel ch) {
}
// `channel` shutdown can race with `ch` initialization, so this is only safe to increment
// inside the lock.
eventLoopReferenceCounter.retain();
sharedResourceReferenceCounter.retain();
transportListener = listener.transportCreated(transport);
}

@@ -224,7 +234,7 @@ final class LoopReleaser implements ChannelFutureListener {
public void operationComplete(ChannelFuture future) throws Exception {
if (!done) {
done = true;
eventLoopReferenceCounter.release();
sharedResourceReferenceCounter.release();
}
}
}
@@ -281,7 +291,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
synchronized (NettyServer.this) {
listener.serverShutdown();
}
eventLoopReferenceCounter.release();
sharedResourceReferenceCounter.release();
}
});
try {
@@ -305,7 +315,7 @@ public String toString() {
.toString();
}

class EventLoopReferenceCounter extends AbstractReferenceCounted {
class SharedResourceReferenceCounter extends AbstractReferenceCounted {
@Override
protected void deallocate() {
try {
@@ -320,6 +330,13 @@ protected void deallocate() {
}
} finally {
workerGroup = null;
try {
if (allocator != null) {
allocatorPool.returnObject(allocator);
}
} finally {
allocator = null;
}
}
}
}
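The rename from EventLoopReferenceCounter to SharedResourceReferenceCounter reflects what the counter now guards: the boss and worker event loop groups and the pooled allocator are returned to their pools only after the listen channel and every accepted transport have released their reference. A minimal sketch of that pattern with Netty's AbstractReferenceCounted, assuming a simplified pool interface (the names are illustrative, not grpc's ObjectPool):

import io.netty.util.AbstractReferenceCounted;
import io.netty.util.ReferenceCounted;

public class SharedResourceCounterSketch {
  // Simplified stand-in for grpc's ObjectPool.
  interface Pool<T> {
    T getObject();
    void returnObject(T object);
  }

  static final class Counter extends AbstractReferenceCounted {
    private final Pool<Object> pool;
    private Object resource;

    Counter(Pool<Object> pool) {
      this.pool = pool;
      this.resource = pool.getObject(); // acquired once, held while refCnt > 0
    }

    @Override
    protected void deallocate() {
      // Runs exactly once, when the last retain() has been matched by a release().
      try {
        if (resource != null) {
          pool.returnObject(resource);
        }
      } finally {
        resource = null;
      }
    }

    @Override
    public ReferenceCounted touch(Object hint) {
      return this;
    }
  }
  // Usage mirrors NettyServer: retain() once per accepted transport, release() when
  // that transport terminates, and a final release() when the listen channel closes.
}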
netty/src/main/java/io/grpc/netty/NettyServerBuilder.java (4 additions, 1 deletion)
@@ -35,6 +35,7 @@
import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.SharedResourcePool;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
@@ -78,6 +79,8 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP);
private static final ObjectPool<? extends EventLoopGroup> DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL =
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP);
private static final ObjectPool<ByteBufAllocator> ALLOCATOR_POOL =
SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR);

private final List<SocketAddress> listenAddresses = new ArrayList<>();

@@ -534,7 +537,7 @@ protected List<NettyServer> buildTransportServers(
for (SocketAddress listenAddress : listenAddresses) {
NettyServer transportServer = new NettyServer(
listenAddress, channelFactory, channelOptions, bossEventLoopGroupPool,
workerEventLoopGroupPool, negotiator, streamTracerFactories,
workerEventLoopGroupPool, ALLOCATOR_POOL, negotiator, streamTracerFactories,
getTransportTracerFactory(), maxConcurrentCallsPerConnection, flowControlWindow,
maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
maxConnectionIdleInNanos, maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
netty/src/main/java/io/grpc/netty/Utils.java (38 additions, 0 deletions)
@@ -34,6 +34,8 @@
import io.grpc.internal.SharedResourceHolder.Resource;
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2InboundHeaders;
import io.grpc.netty.NettySocketSupport.NativeSocketOptions;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFactory;
@@ -83,6 +85,42 @@ class Utils {
public static final Resource<EventLoopGroup> DEFAULT_BOSS_EVENT_LOOP_GROUP;
public static final Resource<EventLoopGroup> DEFAULT_WORKER_EVENT_LOOP_GROUP;

public static final Resource<ByteBufAllocator> BYTE_BUF_ALLOCATOR =
new Resource<ByteBufAllocator>() {
@Override
public ByteBufAllocator create() {
if (Boolean.parseBoolean(
System.getProperty("io.grpc.netty.useCustomAllocator", "false"))) {
int maxOrder;
if (System.getProperty("io.netty.allocator.maxOrder") == null) {
// See the implementation of PooledByteBufAllocator: its DEFAULT_MAX_ORDER is 11,
// which makes the chunk size 8192 << 11 = 16 MiB. We want a 2 MiB chunk size,
// so we reduce maxOrder to 8.
maxOrder = 8;
} else {
maxOrder = PooledByteBufAllocator.defaultMaxOrder();
}
return new PooledByteBufAllocator(
PooledByteBufAllocator.defaultPreferDirect(),
PooledByteBufAllocator.defaultNumHeapArena(),
PooledByteBufAllocator.defaultNumDirectArena(),
PooledByteBufAllocator.defaultPageSize(),
maxOrder,
PooledByteBufAllocator.defaultTinyCacheSize(),
PooledByteBufAllocator.defaultSmallCacheSize(),
PooledByteBufAllocator.defaultNormalCacheSize(),
PooledByteBufAllocator.defaultUseCacheForAllThreads());
} else {
return ByteBufAllocator.DEFAULT;
}
}

@Override
public void close(ByteBufAllocator allocator) {
// PooledByteBufAllocator doesn't provide a shutdown method. Leaving it to GC.
}
};

public static final ChannelFactory<? extends ServerChannel> DEFAULT_SERVER_CHANNEL_FACTORY;
public static final Class<? extends Channel> DEFAULT_CLIENT_CHANNEL_TYPE;

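With this change the smaller chunks are opt-in: the shared allocator falls back to ByteBufAllocator.DEFAULT unless io.grpc.netty.useCustomAllocator is set to true, and an explicit io.netty.allocator.maxOrder still overrides the 2 MiB default. A usage sketch under those assumptions (the property must be set before the first channel or server is built, since the allocator resource is created lazily; the target address is illustrative):

import io.grpc.ManagedChannel;
import io.grpc.netty.NettyChannelBuilder;
import java.util.concurrent.TimeUnit;

public class CustomAllocatorUsage {
  public static void main(String[] args) throws InterruptedException {
    // Equivalent to passing -Dio.grpc.netty.useCustomAllocator=true on the JVM command line.
    System.setProperty("io.grpc.netty.useCustomAllocator", "true");
    // io.netty.allocator.maxOrder is left unset, so the resource above picks maxOrder = 8
    // (8192 << 8 = 2 MiB chunks) instead of Netty's default of 11 (16 MiB chunks).
    ManagedChannel channel = NettyChannelBuilder.forTarget("localhost:50051")
        .usePlaintext()
        .build();
    // ... issue RPCs; Netty buffers for this channel come from the shared allocator ...
    channel.shutdownNow();
    channel.awaitTermination(5, TimeUnit.SECONDS);
  }
}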
netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java (10 additions, 4 deletions)
@@ -62,9 +62,12 @@
import io.grpc.internal.ServerStreamListener;
import io.grpc.internal.ServerTransport;
import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.SharedResourceHolder;
import io.grpc.internal.SharedResourcePool;
import io.grpc.internal.TransportTracer;
import io.grpc.internal.testing.TestUtils;
import io.grpc.netty.NettyChannelBuilder.LocalSocketPicker;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelDuplexHandler;
@@ -123,6 +126,7 @@ public class NettyClientTransportTest {
private final LinkedBlockingQueue<Attributes> serverTransportAttributesList =
new LinkedBlockingQueue<>();
private final NioEventLoopGroup group = new NioEventLoopGroup(1);
private final ByteBufAllocator allocator = SharedResourceHolder.get(Utils.BYTE_BUF_ALLOCATOR);
private final EchoServerListener serverListener = new EchoServerListener();
private final InternalChannelz channelz = new InternalChannelz();
private Runnable tooManyPingsRunnable = new Runnable() {
@@ -153,6 +157,7 @@ public void teardown() throws Exception {
}

group.shutdownGracefully(0, 10, TimeUnit.SECONDS);
SharedResourceHolder.release(Utils.BYTE_BUF_ALLOCATOR, allocator);
}

@Test
@@ -190,7 +195,7 @@ public void setSoLingerChannelOption() throws IOException {
channelOptions.put(ChannelOption.SO_LINGER, soLinger);
NettyClientTransport transport = new NettyClientTransport(
address, new ReflectiveChannelFactory<>(NioSocketChannel.class), channelOptions, group,
newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
allocator, newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority,
null /* user agent */, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY,
new SocketPicker(), new FakeChannelLogger(), false);
@@ -435,7 +440,7 @@ public void failingToConstructChannelShouldFailGracefully() throws Exception {
authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());
NettyClientTransport transport = new NettyClientTransport(
address, new ReflectiveChannelFactory<>(CantConstructChannel.class),
new HashMap<ChannelOption<?>, Object>(), group,
new HashMap<ChannelOption<?>, Object>(), group, allocator,
newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1, false, authority,
null, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY, new SocketPicker(),
@@ -705,7 +710,7 @@ private NettyClientTransport newTransport(ProtocolNegotiator negotiator, int max
keepAliveTimeNano = KEEPALIVE_TIME_NANOS_DISABLED;
}
NettyClientTransport transport = new NettyClientTransport(
address, channelFactory, new HashMap<ChannelOption<?>, Object>(), group,
address, channelFactory, new HashMap<ChannelOption<?>, Object>(), group, allocator,
negotiator, DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize,
keepAliveTimeNano, keepAliveTimeoutNano,
false, authority, userAgent, tooManyPingsRunnable,
@@ -723,7 +728,8 @@ private void startServer(int maxStreamsPerConnection, int maxHeaderListSize) thr
TestUtils.testServerAddress(new InetSocketAddress(0)),
new ReflectiveChannelFactory<>(NioServerSocketChannel.class),
new HashMap<ChannelOption<?>, Object>(),
new FixedObjectPool<>(group), new FixedObjectPool<>(group), negotiator,
new FixedObjectPool<>(group), new FixedObjectPool<>(group),
SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR), negotiator,
Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(),
maxStreamsPerConnection,