From d9b3ceeebcb0efd313c2ad39122c6d12e899761f Mon Sep 17 00:00:00 2001
From: Menghan Li
Date: Thu, 8 Oct 2020 11:17:32 -0700
Subject: [PATCH] [lrs_stream_report] lrs Client

---
Review note: unRef now skips a nil cancelStream, which is what startStream
leaves behind when it fails to dial the LRS server. The commit id and the
index blob hashes in this patch predate that change.

 xds/internal/client/client_loadreport.go | 149 ++++++++++++++---------
 1 file changed, 95 insertions(+), 54 deletions(-)

diff --git a/xds/internal/client/client_loadreport.go b/xds/internal/client/client_loadreport.go
index a10c89f8241..8c8cf2664f9 100644
--- a/xds/internal/client/client_loadreport.go
+++ b/xds/internal/client/client_loadreport.go
@@ -19,25 +19,12 @@ package client
 
 import (
 	"context"
+	"sync"
 
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/xds/internal/client/load"
 )
 
-// NodeMetadataHostnameKey is the metadata key for specifying the target name in
-// the node proto of an LRS request.
-const NodeMetadataHostnameKey = "PROXYLESS_CLIENT_HOSTNAME"
-
-// lrsClient maps to one lrsServer. It contains:
-// - a ClientConn to this server (only if it's different from the xds server)
-// - a load.Store that contains loads only for this server
-type lrsClient struct {
-	refCount  int
-	cancel    func()
-	cc        *grpc.ClientConn // nil if the server is same as the xds server
-	loadStore *load.Store
-}
-
 // ReportLoad starts an load reporting stream to the given server. If the server
 // is not an empty string, and is different from the xds server, a new
 // ClientConn will be created.
@@ -51,56 +38,110 @@ func (c *Client) ReportLoad(server string) (*load.Store, func()) {
 	c.lrsMu.Lock()
 	defer c.lrsMu.Unlock()
 
-	// If there's already a client to this server, use it.
-	if c, ok := c.lrsClients[server]; ok {
-		c.refCount++
-		return c.loadStore, c.cancel
+	// If there's already a client to this server, use it. Otherwise, create
+	// one.
+	lrsC, ok := c.lrsClients[server]
+	if !ok {
+		lrsC = newLRSClient(c, server)
+		c.lrsClients[server] = lrsC
 	}
 
-	// First reporting stream to this server.
-	var (
-		cc    *grpc.ClientConn
-		newCC bool
-	)
-	c.logger.Infof("Starting load report to server: %s", server)
-	if server == "" || server == c.opts.Config.BalancerName {
-		cc = c.cc
-	} else {
-		c.logger.Infof("LRS server is different from xDS server, starting a new ClientConn")
-		dopts := append([]grpc.DialOption{c.opts.Config.Creds}, c.opts.DialOpts...)
-		ccNew, err := grpc.Dial(server, dopts...)
-		if err != nil {
-			// An error from a non-blocking dial indicates something serious.
-			c.logger.Infof("xds: failed to dial load report server {%s}: %v", server, err)
-			return nil, func() {}
+	store := lrsC.ref()
+	return store, func() {
+		c.lrsMu.Lock()
+		defer c.lrsMu.Unlock()
+		if lrsC.unRef() {
+			// Delete the lrsClient from map if this is the last reference.
+			delete(c.lrsClients, server)
 		}
-		cc = ccNew
-		newCC = true
 	}
+}
 
-	store := load.NewStore()
-	ctx, cancel := context.WithCancel(context.Background())
-	go c.apiClient.ReportLoad(ctx, c.cc, LoadReportingOptions{
-		loadStore: store,
-	})
+// lrsClient maps to one lrsServer. It contains:
+// - a ClientConn to this server (only if it's different from the xds server)
+// - a load.Store that contains loads only for this server
+type lrsClient struct {
+	parent *Client
+	server string
+
+	mu           sync.Mutex
+	cc           *grpc.ClientConn // nil if the server is same as the xds server
+	refCount     int
+	cancelStream func()
+	loadStore    *load.Store
+}
 
-	lrsC := &lrsClient{
-		refCount:  1,
-		cancel:    cancel,
-		loadStore: store,
+// newLRSClient creates an lrsClient for the server with a zero refCount.
+//
+// No stream or ClientConn is created here; both are set up by startStream on
+// the first call to ref.
+func newLRSClient(parent *Client, server string) *lrsClient {
+	return &lrsClient{
+		parent:   parent,
+		server:   server,
+		refCount: 0,
 	}
-	if newCC {
-		lrsC.cc = cc
+}
+
+// ref increments the refCount. If this is the first ref, it starts the LRS stream.
+func (lrsC *lrsClient) ref() *load.Store {
+	lrsC.mu.Lock()
+	defer lrsC.mu.Unlock()
+	lrsC.refCount++
+	if lrsC.refCount == 1 {
+		lrsC.startStream()
 	}
-	c.lrsClients[server] = lrsC
+	return lrsC.loadStore
+}
 
-	return store, func() {
-		c.lrsMu.Lock()
-		defer c.lrsMu.Unlock()
-		lrsC.cancel()
-		lrsC.refCount--
-		if lrsC.refCount == 0 && lrsC.cc != nil {
+// unRef decrements the refCount, and cancels the stream if refCount reaches 0
+// (and closes the cc if it is not the xDS cc). It returns whether refCount
+// reached 0 after this call.
+func (lrsC *lrsClient) unRef() (closed bool) {
+	lrsC.refCount--
+
+	closed = lrsC.refCount == 0
+	if closed {
+		// cancelStream is nil if startStream failed to dial the server.
+		if lrsC.cancelStream != nil {
+			lrsC.cancelStream()
+		}
+		if lrsC.cc != nil {
 			lrsC.cc.Close()
 		}
 	}
+	return closed
+}
+
+// startStream starts the LRS stream to the server. If server is not the same
+// xDS server from the parent, it also creates a ClientConn.
+//
+// caller must hold lrsC.mu.
+func (lrsC *lrsClient) startStream() {
+	// First reporting stream to this server.
+	var cc *grpc.ClientConn
+
+	lrsC.parent.logger.Infof("Starting load report to server: %s", lrsC.server)
+	if lrsC.server == "" || lrsC.server == lrsC.parent.opts.Config.BalancerName {
+		// Reuse the xDS client if server is the same.
+		cc = lrsC.parent.cc
+	} else {
+		lrsC.parent.logger.Infof("LRS server is different from xDS server, starting a new ClientConn")
+		dopts := append([]grpc.DialOption{lrsC.parent.opts.Config.Creds}, lrsC.parent.opts.DialOpts...)
+		ccNew, err := grpc.Dial(lrsC.server, dopts...)
+		if err != nil {
+			// An error from a non-blocking dial indicates something serious.
+			lrsC.parent.logger.Infof("xds: failed to dial load report server {%s}: %v", lrsC.server, err)
+			return
+		}
+		cc = ccNew
+		lrsC.cc = ccNew
+	}
+
+	var ctx context.Context
+	ctx, lrsC.cancelStream = context.WithCancel(context.Background())
+
+	// Create the store and stream.
+	lrsC.loadStore = load.NewStore()
+	go lrsC.parent.apiClient.ReportLoad(ctx, cc, LoadReportingOptions{loadStore: lrsC.loadStore})
 }