Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc: eliminate panics in server worker implementation #6856

Merged
merged 5 commits into from Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 13 additions & 9 deletions server.go
Expand Up @@ -144,7 +144,8 @@ type Server struct {
channelzID *channelz.Identifier
czData *channelzData

serverWorkerChannel chan func()
serverWorkerChannel chan func()
serverWorkerChannelClose func()
}

type serverOptions struct {
Expand Down Expand Up @@ -623,15 +624,14 @@ func (s *Server) serverWorker() {
// connections to reduce the time spent overall on runtime.morestack.
func (s *Server) initServerWorkers() {
s.serverWorkerChannel = make(chan func())
s.serverWorkerChannelClose = grpcsync.OnceFunc(func() {
close(s.serverWorkerChannel)
})
for i := uint32(0); i < s.opts.numServerWorkers; i++ {
go s.serverWorker()
}
}

func (s *Server) stopServerWorkers() {
close(s.serverWorkerChannel)
}

// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
Expand Down Expand Up @@ -1898,15 +1898,19 @@ func (s *Server) stop(graceful bool) {
s.closeServerTransportsLocked()
}

if s.opts.numServerWorkers > 0 {
s.stopServerWorkers()
}

for len(s.conns) != 0 {
s.cv.Wait()
}
s.conns = nil

if s.opts.numServerWorkers > 0 {
// Closing the channel (only once, via grpcsync.OnceFunc) after all the
// connections have been closed above ensures that there are no
// goroutines executing the callback passed to st.HandleStreams (where
// the channel is written to).
s.serverWorkerChannelClose()
}

if s.events != nil {
s.events.Finish()
s.events = nil
Expand Down
93 changes: 93 additions & 0 deletions server_ext_test.go
Expand Up @@ -21,14 +21,20 @@ package grpc_test
import (
"context"
"io"
"runtime"
"sync"
"testing"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/status"

testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
)

// TestServer_MaxHandlers ensures that no more than MaxConcurrentStreams server
Expand Down Expand Up @@ -97,3 +103,90 @@ func (s) TestServer_MaxHandlers(t *testing.T) {
t.Fatal("Received unexpected RPC error:", err)
}
}

// Tests the case where the stream worker goroutine option is enabled, and a
// number of RPCs are initiated around the same time that Stop() is called. This
// used to result in a write to a closed channel. This test verifies that there
// is no panic.
func (s) TestStreamWorkers_RPCsAndStop(t *testing.T) {
ss := stubserver.StartTestService(t, nil, grpc.NumStreamWorkers(uint32(runtime.NumCPU())))
// This deferred stop takes care of stopping the server when one of the
// below grpc.Dials fail, and the test exits early.
defer ss.Stop()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
const numChannels = 20
const numRPCLoops = 20

// Create a bunch of clientconns and ensure that they are READY by making an
// RPC on them.
ccs := make([]*grpc.ClientConn, numChannels)
for i := 0; i < numChannels; i++ {
var err error
ccs[i], err = grpc.Dial(ss.Address, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("[iteration: %d] grpc.Dial(%s) failed: %v", i, ss.Address, err)
}
defer ccs[i].Close()
client := testgrpc.NewTestServiceClient(ccs[i])
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: delete this WaitForReady as it should not be necessary and could persist a mistaken understanding that it might be necessary. (We used to have a bug waaay back when that would have required it.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

t.Fatalf("EmptyCall() failed: %v", err)
}
}

// Make a bunch of concurrent RPCs on the above clientconns. These will
// eventually race with Stop(), and will start to fail.
var wg sync.WaitGroup
for i := 0; i < numChannels; i++ {
client := testgrpc.NewTestServiceClient(ccs[i])
for j := 0; j < numRPCLoops; j++ {
wg.Add(1)
go func(client testgrpc.TestServiceClient) {
defer wg.Done()
for {
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is probably why you're getting the deadline exceeded errors. Just leave these using the base ctx and it should be fine I think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh crap, didn't see this comment. But figure out the same anyways :). Could have saved 30m if I had looked earlier, but I think the investigation helped.

_, err := client.EmptyCall(sCtx, &testpb.Empty{})
if err == nil {
continue
}
if code := status.Code(err); code == codes.DeadlineExceeded || code == codes.Unavailable {
// Once Stop() has been called on the server, we expect
// subsequent calls to fail with Unavailable or
// DeadlineExceeded, the latter happens when the client
// channel moves to TRANSIENT_FAILURE and will never
// recover from there.
return
}
t.Errorf("EmptyCall() failed: %v", err)
return
}
}(client)
}
}

// Call Stop() concurrently with the above RPC attempts.
ss.Stop()
wg.Wait()
}

// Tests the case where the stream worker goroutine option is enabled, and both
// Stop() and GracefulStop() care called. This used to result in a close of a
// closed channel. This test verifies that there is no panic.
func (s) TestStreamWorkers_GracefulStopAndStop(t *testing.T) {
ss := stubserver.StartTestService(t, nil, grpc.NumStreamWorkers(uint32(runtime.NumCPU())))
defer ss.Stop()

if err := ss.StartClient(grpc.WithTransportCredentials(insecure.NewCredentials())); err != nil {
t.Fatalf("Failed to create client to stub server: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
client := testgrpc.NewTestServiceClient(ss.CC)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
}

ss.S.GracefulStop()
}
1 change: 1 addition & 0 deletions stream_test.go
Expand Up @@ -32,6 +32,7 @@ import (
)

const defaultTestTimeout = 10 * time.Second
const defaultTestShortTimeout = 10 * time.Millisecond

type s struct {
grpctest.Tester
Expand Down