diff --git a/internal/transport/controlbuf.go b/internal/transport/controlbuf.go index 04ddaeaa33a..036160f07ec 100644 --- a/internal/transport/controlbuf.go +++ b/internal/transport/controlbuf.go @@ -406,7 +406,6 @@ func (c *controlBuffer) get(block bool) (interface{}, error) { select { case <-c.ch: case <-c.done: - c.finish() return nil, ErrConnClosing } } @@ -431,6 +430,14 @@ func (c *controlBuffer) finish() { hdr.onOrphaned(ErrConnClosing) } } + // In case throttle() is currently in flight, it needs to be unblocked. + // Otherwise, the transport may not close, since the transport is closed by + // the reader encountering the connection error. + ch, _ := c.trfChan.Load().(*chan struct{}) + if ch != nil { + close(*ch) + } + c.trfChan.Store((*chan struct{})(nil)) c.mu.Unlock() } diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 78c53ca70f8..119f01e3ebc 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -402,6 +402,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts // Do not close the transport. Let reader goroutine handle it since // there might be data in the buffers. t.conn.Close() + t.controlBuf.finish() close(t.writerDone) }() return t, nil diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index 43a17833d86..21a3c852615 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -295,6 +295,7 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err } } t.conn.Close() + t.controlBuf.finish() close(t.writerDone) }() go t.keepalive() diff --git a/test/end2end_test.go b/test/end2end_test.go index eb91d09afdf..76ff07a27c2 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -71,6 +71,7 @@ import ( "google.golang.org/grpc/stats" "google.golang.org/grpc/status" "google.golang.org/grpc/tap" + "google.golang.org/grpc/test/bufconn" testpb "google.golang.org/grpc/test/grpc_testing" "google.golang.org/grpc/testdata" ) @@ -7524,3 +7525,74 @@ func (s) TestCanceledRPCCallOptionRace(t *testing.T) { } wg.Wait() } + +func (s) TestClientSettingsFloodCloseConn(t *testing.T) { + // Tests that the server properly closes its transport if the client floods + // settings frames and then closes the connection. + + // Minimize buffer sizes to stimulate failure condition more quickly. + s := grpc.NewServer(grpc.WriteBufferSize(20)) + l := bufconn.Listen(20) + go s.Serve(l) + + // Dial our server and handshake. + conn, err := l.Dial() + if err != nil { + t.Fatalf("Error dialing bufconn: %v", err) + } + + n, err := conn.Write([]byte(http2.ClientPreface)) + if err != nil || n != len(http2.ClientPreface) { + t.Fatalf("Error writing client preface: %v, %v", n, err) + } + + fr := http2.NewFramer(conn, conn) + f, err := fr.ReadFrame() + if err != nil { + t.Fatalf("Error reading initial settings frame: %v", err) + } + if _, ok := f.(*http2.SettingsFrame); ok { + if err := fr.WriteSettingsAck(); err != nil { + t.Fatalf("Error writing settings ack: %v", err) + } + } else { + t.Fatalf("Error reading initial settings frame: type=%T", f) + } + + // Confirm settings can be written, and that an ack is read. + if err = fr.WriteSettings(); err != nil { + t.Fatalf("Error writing settings frame: %v", err) + } + if f, err = fr.ReadFrame(); err != nil { + t.Fatalf("Error reading frame: %v", err) + } + if sf, ok := f.(*http2.SettingsFrame); !ok || !sf.IsAck() { + t.Fatalf("Unexpected frame: %v", f) + } + + // Flood settings frames until a timeout occurs, indiciating the server has + // stopped reading from the connection, then close the conn. + for { + conn.SetWriteDeadline(time.Now().Add(50 * time.Millisecond)) + if err := fr.WriteSettings(); err != nil { + if to, ok := err.(interface{ Timeout() bool }); !ok || !to.Timeout() { + t.Fatalf("Received unexpected write error: %v", err) + } + break + } + } + conn.Close() + + // If the server does not handle this situation correctly, it will never + // close the transport. This is because its loopyWriter.run() will have + // exited, and thus not handle the goAway the draining process initiates. + // Also, we would see a goroutine leak in this case, as the reader would be + // blocked on the controlBuf's throttle() method indefinitely. + + timer := time.AfterFunc(5*time.Second, func() { + t.Errorf("Timeout waiting for GracefulStop to return") + s.Stop() + }) + s.GracefulStop() + timer.Stop() +}