From 8c346d00e573c4247427e64be0ce47fe9f75ac84 Mon Sep 17 00:00:00 2001 From: ZHANG Dapeng Date: Thu, 16 Jan 2020 11:53:37 -0800 Subject: [PATCH] netty: fix a race for channelz at server transport creation A race condition was reported by a user in #6601: `ServerImpl.start()` calls `NettyServer.start()` while holding `ServerImpl.lock`. `NettyServer.start()` awaits a submitted runnable in the event loop. However, this pending runnable may never be executed because the event loop might be executing some other task, like `ServerListenerImpl.transportCreated()`, that is trying to acquire `ServerImpl.lock`, causing a deadlock. This PR resolves the particular issue reported in #6601 for a server with a single port, but `NettyServer` (https://github.com/grpc/grpc-java/blob/v1.26.0/netty/src/main/java/io/grpc/netty/NettyServer.java#L251) and `ServerImpl` (https://github.com/grpc/grpc-java/blob/v1.26.0/core/src/main/java/io/grpc/internal/ServerImpl.java#L184) in general still have the same potential risk of deadlock, which needs a further fix. --- .../main/java/io/grpc/netty/NettyServer.java | 23 ++++------ .../java/io/grpc/netty/NettyServerTest.java | 46 +++++++++++++------ 2 files changed, 40 insertions(+), 29 deletions(-) diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java index 5c400198817..478687b6abc 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServer.java +++ b/netty/src/main/java/io/grpc/netty/NettyServer.java @@ -56,7 +56,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; @@ -93,9 +92,8 @@ class NettyServer implements InternalServer, InternalWithLogId { private final List streamTracerFactories; private final TransportTracer.Factory transportTracerFactory; private final InternalChannelz channelz; - // Only modified in event loop but safe to read any time. 
Set at startup and unset at shutdown. - private final AtomicReference> listenSocketStats = - new AtomicReference<>(); + // Only modified in event loop but safe to read any time. + private volatile InternalInstrumented listenSocketStats; NettyServer( SocketAddress address, ChannelFactory channelFactory, @@ -149,7 +147,7 @@ public SocketAddress getListenSocketAddress() { @Override public InternalInstrumented getListenSocketStats() { - return listenSocketStats.get(); + return listenSocketStats; } @Override @@ -251,19 +249,13 @@ public void operationComplete(ChannelFuture future) throws Exception { throw new IOException("Failed to bind", future.cause()); } channel = future.channel(); - Future channelzFuture = channel.eventLoop().submit(new Runnable() { + channel.eventLoop().execute(new Runnable() { @Override public void run() { - InternalInstrumented listenSocket = new ListenSocket(channel); - listenSocketStats.set(listenSocket); - channelz.addListenSocket(listenSocket); + listenSocketStats = new ListenSocket(channel); + channelz.addListenSocket(listenSocketStats); } }); - try { - channelzFuture.await(); - } catch (InterruptedException ex) { - throw new RuntimeException("Interrupted while registering listen socket to channelz", ex); - } } @Override @@ -278,7 +270,8 @@ public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { log.log(Level.WARNING, "Error shutting down server", future.cause()); } - InternalInstrumented stats = listenSocketStats.getAndSet(null); + InternalInstrumented stats = listenSocketStats; + listenSocketStats = null; if (stats != null) { channelz.removeListenSocket(stats); } diff --git a/netty/src/test/java/io/grpc/netty/NettyServerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerTest.java index b873785e4ff..141cb9972b9 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerTest.java @@ -29,16 +29,19 @@ import io.grpc.InternalInstrumented; 
import io.grpc.Metadata; import io.grpc.ServerStreamTracer; +import io.grpc.internal.FixedObjectPool; import io.grpc.internal.ServerListener; import io.grpc.internal.ServerStream; import io.grpc.internal.ServerTransport; import io.grpc.internal.ServerTransportListener; -import io.grpc.internal.SharedResourcePool; import io.grpc.internal.TransportTracer; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelOption; +import io.netty.channel.ReflectiveChannelFactory; import io.netty.channel.WriteBufferWaterMark; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.util.AsciiString; import java.net.InetSocketAddress; import java.net.Socket; @@ -48,6 +51,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.junit.After; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -55,6 +59,13 @@ @RunWith(JUnit4.class) public class NettyServerTest { private final InternalChannelz channelz = new InternalChannelz(); + private final NioEventLoopGroup eventLoop = new NioEventLoopGroup(1); + + @After + public void tearDown() throws Exception { + eventLoop.shutdownGracefully(0, 0, TimeUnit.SECONDS); + eventLoop.awaitTermination(5, TimeUnit.SECONDS); + } @Test public void startStop() throws Exception { @@ -79,10 +90,10 @@ class TestProtocolNegotiator implements ProtocolNegotiator { TestProtocolNegotiator protocolNegotiator = new TestProtocolNegotiator(); NettyServer ns = new NettyServer( addr, - Utils.DEFAULT_SERVER_CHANNEL_FACTORY, + new ReflectiveChannelFactory<>(NioServerSocketChannel.class), new HashMap, Object>(), - SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP), - SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP), + new FixedObjectPool<>(eventLoop), + new FixedObjectPool<>(eventLoop), 
protocolNegotiator, Collections.emptyList(), TransportTracer.getDefaultFactory(), @@ -119,14 +130,14 @@ public void serverShutdown() { } @Test - public void getPort_notStarted() throws Exception { + public void getPort_notStarted() { InetSocketAddress addr = new InetSocketAddress(0); NettyServer ns = new NettyServer( addr, - Utils.DEFAULT_SERVER_CHANNEL_FACTORY, + new ReflectiveChannelFactory<>(NioServerSocketChannel.class), new HashMap, Object>(), - SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP), - SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP), + new FixedObjectPool<>(eventLoop), + new FixedObjectPool<>(eventLoop), ProtocolNegotiators.plaintext(), Collections.emptyList(), TransportTracer.getDefaultFactory(), @@ -161,10 +172,10 @@ public void childChannelOptions() throws Exception { InetSocketAddress addr = new InetSocketAddress(0); NettyServer ns = new NettyServer( addr, - Utils.DEFAULT_SERVER_CHANNEL_FACTORY, + new ReflectiveChannelFactory<>(NioServerSocketChannel.class), channelOptions, - SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP), - SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP), + new FixedObjectPool<>(eventLoop), + new FixedObjectPool<>(eventLoop), ProtocolNegotiators.plaintext(), Collections.emptyList(), TransportTracer.getDefaultFactory(), @@ -211,10 +222,10 @@ public void channelzListenSocket() throws Exception { InetSocketAddress addr = new InetSocketAddress(0); NettyServer ns = new NettyServer( addr, - Utils.DEFAULT_SERVER_CHANNEL_FACTORY, + new ReflectiveChannelFactory<>(NioServerSocketChannel.class), new HashMap, Object>(), - SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP), - SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP), + new FixedObjectPool<>(eventLoop), + new FixedObjectPool<>(eventLoop), ProtocolNegotiators.plaintext(), Collections.emptyList(), TransportTracer.getDefaultFactory(), @@ -239,8 +250,15 @@ public void 
serverShutdown() { shutdownCompleted.set(null); } }); + assertThat(((InetSocketAddress) ns.getListenSocketAddress()).getPort()).isGreaterThan(0); + // SocketStats won't be available until the event loop task of adding SocketStats created by + // ns.start() completes. So submit a noop task and await until it's drained. + eventLoop.submit(new Runnable() { + @Override + public void run() {} + }).await(5, TimeUnit.SECONDS); InternalInstrumented listenSocket = ns.getListenSocketStats(); assertSame(listenSocket, channelz.getSocket(id(listenSocket)));