Skip to content

Commit

Permalink
[lrs_stream_report] lrs Client
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Oct 8, 2020
1 parent 9870b33 commit d9b3cee
Showing 1 changed file with 92 additions and 54 deletions.
146 changes: 92 additions & 54 deletions xds/internal/client/client_loadreport.go
Expand Up @@ -19,25 +19,12 @@ package client

import (
"context"
"sync"

"google.golang.org/grpc"
"google.golang.org/grpc/xds/internal/client/load"
)

// NodeMetadataHostnameKey is the metadata key for specifying the target name in
// the node proto of an LRS request.
const NodeMetadataHostnameKey = "PROXYLESS_CLIENT_HOSTNAME"

// lrsClient maps to one lrsServer. It contains:
// - a ClientConn to this server (only if it's different from the xds server)
// - a load.Store that contains loads only for this server
type lrsClient struct {
refCount int
cancel func()
cc *grpc.ClientConn // nil if the server is same as the xds server
loadStore *load.Store
}

// ReportLoad starts an load reporting stream to the given server. If the server
// is not an empty string, and is different from the xds server, a new
// ClientConn will be created.
Expand All @@ -51,56 +38,107 @@ func (c *Client) ReportLoad(server string) (*load.Store, func()) {
c.lrsMu.Lock()
defer c.lrsMu.Unlock()

// If there's already a client to this server, use it.
if c, ok := c.lrsClients[server]; ok {
c.refCount++
return c.loadStore, c.cancel
// If there's already a client to this server, use it. Otherwise, create
// one.
lrsC, ok := c.lrsClients[server]
if !ok {
lrsC = newLRSClient(c, server)
c.lrsClients[server] = lrsC
}

// First reporting stream to this server.
var (
cc *grpc.ClientConn
newCC bool
)
c.logger.Infof("Starting load report to server: %s", server)
if server == "" || server == c.opts.Config.BalancerName {
cc = c.cc
} else {
c.logger.Infof("LRS server is different from xDS server, starting a new ClientConn")
dopts := append([]grpc.DialOption{c.opts.Config.Creds}, c.opts.DialOpts...)
ccNew, err := grpc.Dial(server, dopts...)
if err != nil {
// An error from a non-blocking dial indicates something serious.
c.logger.Infof("xds: failed to dial load report server {%s}: %v", server, err)
return nil, func() {}
store := lrsC.ref()
return store, func() {
c.lrsMu.Lock()
defer c.lrsMu.Unlock()
if lrsC.unRef() {
// Delete the lrsClient from map if this is the last reference.
delete(c.lrsClients, server)
}
cc = ccNew
newCC = true
}
}

store := load.NewStore()
ctx, cancel := context.WithCancel(context.Background())
go c.apiClient.ReportLoad(ctx, c.cc, LoadReportingOptions{
loadStore: store,
})
// lrsClient maps to one lrsServer. It contains:
// - a ClientConn to this server (only if it's different from the xds server)
// - a load.Store that contains loads only for this server
type lrsClient struct {
parent *Client
server string

mu sync.Mutex
cc *grpc.ClientConn // nil if the server is same as the xds server
refCount int
cancelStream func()
loadStore *load.Store
}

lrsC := &lrsClient{
refCount: 1,
cancel: cancel,
loadStore: store,
// newLRSClient creates a new LRS stream to the server.
//
// If the server is different from the xDS server in the parent xDS client, a
// new ClientConn will be created.
func newLRSClient(parent *Client, server string) *lrsClient {
return &lrsClient{
parent: parent,
server: server,
refCount: 0,
}
if newCC {
lrsC.cc = cc
}

// ref increments the refCount. If this is the first ref, it starts the LRS stream.
func (lrsC *lrsClient) ref() *load.Store {
lrsC.mu.Lock()
defer lrsC.mu.Unlock()
lrsC.refCount++
if lrsC.refCount == 1 {
lrsC.startStream()
}
c.lrsClients[server] = lrsC
return lrsC.loadStore
}

return store, func() {
c.lrsMu.Lock()
defer c.lrsMu.Unlock()
lrsC.cancel()
lrsC.refCount--
if lrsC.refCount == 0 && lrsC.cc != nil {
// unRef decrements the refCount, and closes the stream if refCount reaches 0
// (and close the cc if cc is not xDS cc). It returns whether refCount reached 0
// after this call.
func (lrsC *lrsClient) unRef() (closed bool) {
lrsC.refCount--

closed = lrsC.refCount == 0
if closed {
lrsC.cancelStream()
if lrsC.cc != nil {
lrsC.cc.Close()
}
}
return closed
}

// startStream starts the LRS stream to the server. If server is not the same
// xDS server from the parent, it also creates a ClientConn.
//
// caller must hold lrsC.mu.
func (lrsC *lrsClient) startStream() {
// First reporting stream to this server.
var cc *grpc.ClientConn

lrsC.parent.logger.Infof("Starting load report to server: %s", lrsC.server)
if lrsC.server == "" || lrsC.server == lrsC.parent.opts.Config.BalancerName {
// Reuse the xDS client if server is the same.
cc = lrsC.parent.cc
} else {
lrsC.parent.logger.Infof("LRS server is different from xDS server, starting a new ClientConn")
dopts := append([]grpc.DialOption{lrsC.parent.opts.Config.Creds}, lrsC.parent.opts.DialOpts...)
ccNew, err := grpc.Dial(lrsC.server, dopts...)
if err != nil {
// An error from a non-blocking dial indicates something serious.
lrsC.parent.logger.Infof("xds: failed to dial load report server {%s}: %v", lrsC.server, err)
return
}
cc = ccNew
lrsC.cc = ccNew
}

var ctx context.Context
ctx, lrsC.cancelStream = context.WithCancel(context.Background())

// Create the store and stream.
lrsC.loadStore = load.NewStore()
go lrsC.parent.apiClient.ReportLoad(ctx, cc, LoadReportingOptions{loadStore: lrsC.loadStore})
}

0 comments on commit d9b3cee

Please sign in to comment.