From 07ec1d69e8fce0022a302ec53236a6349101aa7e Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Fri, 11 Feb 2022 14:14:23 -0800 Subject: [PATCH] comments etc --- balancer/grpclb/grpclb_remote_balancer.go | 5 +- channelz/channelz.go | 3 +- clientconn.go | 5 +- internal/channelz/id.go | 13 ++++ internal/channelz/logging.go | 16 +++-- internal/transport/http2_client.go | 15 ++-- internal/transport/http2_server.go | 15 ++-- server.go | 84 ++++++++++------------- 8 files changed, 72 insertions(+), 84 deletions(-) diff --git a/balancer/grpclb/grpclb_remote_balancer.go b/balancer/grpclb/grpclb_remote_balancer.go index 805bbbb789a..dab1959418e 100644 --- a/balancer/grpclb/grpclb_remote_balancer.go +++ b/balancer/grpclb/grpclb_remote_balancer.go @@ -35,7 +35,6 @@ import ( "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal/backoff" - "google.golang.org/grpc/internal/channelz" imetadata "google.golang.org/grpc/internal/metadata" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" @@ -240,9 +239,7 @@ func (lb *lbBalancer) newRemoteBalancerCCWrapper() { // Explicitly set pickfirst as the balancer. dopts = append(dopts, grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"pick_first"}`)) dopts = append(dopts, grpc.WithResolvers(lb.manualResolver)) - if channelz.IsOn() { - dopts = append(dopts, grpc.WithChannelzParentID(lb.opt.ChannelzParentID)) - } + dopts = append(dopts, grpc.WithChannelzParentID(lb.opt.ChannelzParentID)) // Enable Keepalive for grpclb client. dopts = append(dopts, grpc.WithKeepaliveParams(keepalive.ClientParameters{ diff --git a/channelz/channelz.go b/channelz/channelz.go index c1fb66932ca..a220c47c59a 100644 --- a/channelz/channelz.go +++ b/channelz/channelz.go @@ -31,5 +31,6 @@ package channelz import "google.golang.org/grpc/internal/channelz" -// TODO(easwars): Add comment. +// Identifier is an opaque identifier which uniquely identifies an entity in the +// channelz database. type Identifier = channelz.Identifier diff --git a/clientconn.go b/clientconn.go index 7961970d56a..f4420e7c3cb 100644 --- a/clientconn.go +++ b/clientconn.go @@ -38,6 +38,7 @@ import ( "google.golang.org/grpc/internal/backoff" "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/grpcsync" + "google.golang.org/grpc/internal/pretty" iresolver "google.golang.org/grpc/internal/resolver" "google.golang.org/grpc/internal/transport" "google.golang.org/grpc/keepalive" @@ -1320,7 +1321,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, func() { prefaceReceived.Fire() }, onGoAway, onClose) if err != nil { // newTr is either nil, or closed. - channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v. Err: %v", addr, err) + channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %s. Err: %v", pretty.ToJSON(addr), err) return err } @@ -1333,7 +1334,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne newTr.Close(transport.ErrConnClosing) if connectCtx.Err() == context.DeadlineExceeded { err := errors.New("failed to receive server preface within timeout") - channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: %v", addr, err) + channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %s: %v", pretty.ToJSON(addr), err) return err } return nil diff --git a/internal/channelz/id.go b/internal/channelz/id.go index b4e12cf8043..286a8dff429 100644 --- a/internal/channelz/id.go +++ b/internal/channelz/id.go @@ -20,6 +20,8 @@ package channelz import "fmt" +// Identifier is an opaque identifier which uniquely identifies an entity in the +// channelz database. type Identifier struct { typ RefChannelType id int64 @@ -27,6 +29,7 @@ type Identifier struct { pid *Identifier } +// Type returns the entity type corresponding to id. func (id *Identifier) Type() RefChannelType { if id == nil { return RefUnknown @@ -34,6 +37,7 @@ func (id *Identifier) Type() RefChannelType { return id.typ } +// Int returns the integer identifier corresponding to id. func (id *Identifier) Int() int64 { if id == nil { return 0 @@ -41,6 +45,12 @@ func (id *Identifier) Int() int64 { return id.id } +// String returns a string representation of the entity corresponding to id. + +// This includes some information about the parent as well. Examples: +// Top-level channel: [Channel #channel-number] +// Nested channel: [Channel #parent-channel-number Channel #channel-number] +// Sub channel: [Channel #parent-channel SubChannel #subchannel-number] func (id *Identifier) String() string { if id == nil { return "" @@ -48,6 +58,7 @@ func (id *Identifier) String() string { return id.str } +// Equal returns true if other is the same as id. func (id *Identifier) Equal(other *Identifier) bool { if (id != nil) != (other != nil) { return false @@ -58,6 +69,8 @@ func (id *Identifier) Equal(other *Identifier) bool { return id.typ == other.typ && id.id == other.id && id.pid == other.pid } +// NewIdentifierForTesting returns a new opaque identifier to be used only for +// testing purposes. func NewIdentifierForTesting(typ RefChannelType, id int64, pid *Identifier) *Identifier { return newIdentifer(typ, id, pid) } diff --git a/internal/channelz/logging.go b/internal/channelz/logging.go index f06e7cd9fb8..028bc8ac4dc 100644 --- a/internal/channelz/logging.go +++ b/internal/channelz/logging.go @@ -26,9 +26,13 @@ import ( var logger = grpclog.Component("channelz") +func withParens(id *Identifier) string { + return "[" + id.String() + "] " +} + // Info logs and adds a trace event if channelz is on. func Info(l grpclog.DepthLoggerV2, id *Identifier, args ...interface{}) { - msg := "[" + id.String() + "] " + fmt.Sprint(args...) + msg := withParens(id) + fmt.Sprint(args...) if IsOn() { AddTraceEvent(l, id, 1, &TraceEventDesc{ Desc: msg, @@ -41,7 +45,7 @@ func Info(l grpclog.DepthLoggerV2, id *Identifier, args ...interface{}) { // Infof logs and adds a trace event if channelz is on. func Infof(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...interface{}) { - msg := "[" + id.String() + "] " + fmt.Sprintf(format, args...) + msg := withParens(id) + fmt.Sprintf(format, args...) if IsOn() { AddTraceEvent(l, id, 1, &TraceEventDesc{ Desc: msg, @@ -54,7 +58,7 @@ func Infof(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...inter // Warning logs and adds a trace event if channelz is on. func Warning(l grpclog.DepthLoggerV2, id *Identifier, args ...interface{}) { - msg := "[" + id.String() + "] " + fmt.Sprint(args...) + msg := withParens(id) + fmt.Sprint(args...) if IsOn() { AddTraceEvent(l, id, 1, &TraceEventDesc{ Desc: msg, @@ -67,7 +71,7 @@ func Warning(l grpclog.DepthLoggerV2, id *Identifier, args ...interface{}) { // Warningf logs and adds a trace event if channelz is on. func Warningf(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...interface{}) { - msg := "[" + id.String() + "] " + fmt.Sprintf(format, args...) + msg := withParens(id) + fmt.Sprintf(format, args...) if IsOn() { AddTraceEvent(l, id, 1, &TraceEventDesc{ Desc: msg, @@ -80,7 +84,7 @@ func Warningf(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...in // Error logs and adds a trace event if channelz is on. func Error(l grpclog.DepthLoggerV2, id *Identifier, args ...interface{}) { - msg := "[" + id.String() + "] " + fmt.Sprint(args...) + msg := withParens(id) + fmt.Sprint(args...) if IsOn() { AddTraceEvent(l, id, 1, &TraceEventDesc{ Desc: msg, @@ -93,7 +97,7 @@ func Error(l grpclog.DepthLoggerV2, id *Identifier, args ...interface{}) { // Errorf logs and adds a trace event if channelz is on. func Errorf(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...interface{}) { - msg := "[" + id.String() + "] " + fmt.Sprintf(format, args...) + msg := withParens(id) + fmt.Sprintf(format, args...) if IsOn() { AddTraceEvent(l, id, 1, &TraceEventDesc{ Desc: msg, diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 6221fb0c22a..38ed3d566ff 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -351,14 +351,9 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts } t.statsHandler.HandleConn(t.ctx, connBegin) } - if channelz.IsOn() { - var err error - t.channelzID, err = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr)) - if err != nil { - // TODO(easwars): See if you need to return a more meaningful error. - // TODO(easwars): Also, check if the transport needs to be closed before returning. - return nil, err - } + t.channelzID, err = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr)) + if err != nil { + return nil, err } if t.keepaliveEnabled { t.kpDormancyCond = sync.NewCond(&t.mu) @@ -904,9 +899,7 @@ func (t *http2Client) Close(err error) { t.controlBuf.finish() t.cancel() t.conn.Close() - if channelz.IsOn() { - channelz.RemoveEntry(t.channelzID) - } + channelz.RemoveEntry(t.channelzID) // Append info about previous goaways if there were any, since this may be important // for understanding the root cause for this connection to be closed. _, goAwayDebugMessage := t.GetGoAwayReason() diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index 18280e62e3f..227608c7f21 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -275,14 +275,9 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, connBegin := &stats.ConnBegin{} t.stats.HandleConn(t.ctx, connBegin) } - if channelz.IsOn() { - var err error - t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr)) - if err != nil { - // TODO(easwars): See if you need to return a more meaningful error. - // TODO(easwars): Also, check if the transport needs to be closed before returning. - return nil, err - } + t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr)) + if err != nil { + return nil, err } t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1) @@ -1215,9 +1210,7 @@ func (t *http2Server) Close() { if err := t.conn.Close(); err != nil && logger.V(logLevel) { logger.Infof("transport: error closing conn during Close: %v", err) } - if channelz.IsOn() { - channelz.RemoveEntry(t.channelzID) - } + channelz.RemoveEntry(t.channelzID) // Cancel all active streams. for _, s := range streams { s.cancel() diff --git a/server.go b/server.go index 99e7c44fbb9..db60cb3251f 100644 --- a/server.go +++ b/server.go @@ -584,9 +584,7 @@ func NewServer(opt ...ServerOption) *Server { s.initServerWorkers() } - if channelz.IsOn() { - s.channelzID = channelz.RegisterServer(&channelzServer{s}, "") - } + s.channelzID = channelz.RegisterServer(&channelzServer{s}, "") return s } @@ -724,9 +722,7 @@ func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric { func (l *listenSocket) Close() error { err := l.Listener.Close() - if channelz.IsOn() { - channelz.RemoveEntry(l.channelzID) - } + channelz.RemoveEntry(l.channelzID) return err } @@ -737,52 +733,50 @@ func (l *listenSocket) Close() error { // this method returns. // Serve will return a non-nil error unless Stop or GracefulStop is called. func (s *Server) Serve(lis net.Listener) error { - s.mu.Lock() - s.printf("serving") - s.serve = true - if s.lis == nil { - // Serve called after Stop or GracefulStop. - s.mu.Unlock() - lis.Close() - return ErrServerStopped - } + if err := func() error { // Anonymous func to be able to defer the unlock. + s.mu.Lock() + defer s.mu.Unlock() - s.serveWG.Add(1) - defer func() { - s.serveWG.Done() - if s.quit.HasFired() { - // Stop or GracefulStop called; block until done and return nil. - <-s.done.Done() + s.printf("serving") + s.serve = true + if s.lis == nil { + lis.Close() + return ErrServerStopped } - }() - ls := &listenSocket{Listener: lis} - s.lis[ls] = true + s.serveWG.Add(1) + defer func() { + s.serveWG.Done() + if s.quit.HasFired() { + // Stop or GracefulStop called; block until done and return nil. + <-s.done.Done() + } + }() + + ls := &listenSocket{Listener: lis} + s.lis[ls] = true - if channelz.IsOn() { var err error ls.channelzID, err = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String()) if err != nil { - // TODO(easwars): See if you can defer this unlock somehow. - s.mu.Unlock() lis.Close() - // TODO(easwars): return a more meaningful error. return err } - } - s.mu.Unlock() - defer func() { - s.mu.Lock() - if s.lis != nil && s.lis[ls] { - ls.Close() - delete(s.lis, ls) - } - s.mu.Unlock() - }() + defer func() { + s.mu.Lock() + if s.lis != nil && s.lis[ls] { + ls.Close() + delete(s.lis, ls) + } + s.mu.Unlock() + }() + return nil + }(); err != nil { + return err + } var tempDelay time.Duration // how long to sleep on accept failure - for { rawConn, err := lis.Accept() if err != nil { @@ -1717,11 +1711,7 @@ func (s *Server) Stop() { s.done.Fire() }() - s.channelzRemoveOnce.Do(func() { - if channelz.IsOn() { - channelz.RemoveEntry(s.channelzID) - } - }) + s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelzID) }) s.mu.Lock() listeners := s.lis @@ -1759,11 +1749,7 @@ func (s *Server) GracefulStop() { s.quit.Fire() defer s.done.Fire() - s.channelzRemoveOnce.Do(func() { - if channelz.IsOn() { - channelz.RemoveEntry(s.channelzID) - } - }) + s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelzID) }) s.mu.Lock() if s.conns == nil { s.mu.Unlock()