lrs: handle multiple clusters in LRS stream #3935

Merged
merged 4 commits into from Oct 26, 2020

@menghanl menghanl commented Oct 7, 2020

  • xdsclient.ReportLoad
    • creates one LRS stream and one load.Store for each server address
    • now takes just the server address and returns the load.Store
    • the EDS and LRS balancing policies are updated to recreate the LRS stream only when the server address changes
  • LRS stream (v2 and v3)
    • set feature send_all_clusters in node
    • handle resp.Clusters and only send load for those clusters
    • handle resp.SendAllClusters and send load for all clusters
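
The cluster selection described in the list above can be sketched roughly as follows. This is a minimal illustration, not the code from this PR: `loadResponse` is a hypothetical stand-in for the two response fields mentioned above, `known` stands in for the per-cluster data held by the load store, and skipping clusters the client has no data for is an assumption.

```go
package main

import (
	"fmt"
	"sort"
)

// loadResponse is a hypothetical stand-in for the two LRS response fields
// named in the description (resp.Clusters and resp.SendAllClusters).
type loadResponse struct {
	Clusters        []string
	SendAllClusters bool
}

// clustersToReport returns the sorted names of the clusters whose load should
// be sent. known maps cluster name to some recorded load value.
func clustersToReport(resp loadResponse, known map[string]uint64) []string {
	var out []string
	if resp.SendAllClusters {
		// The server asked for everything the client knows about.
		for name := range known {
			out = append(out, name)
		}
	} else {
		// Only report clusters the server asked for; clusters without any
		// recorded data are skipped (an assumption of this sketch).
		for _, name := range resp.Clusters {
			if _, ok := known[name]; ok {
				out = append(out, name)
			}
		}
	}
	sort.Strings(out)
	return out
}

func main() {
	known := map[string]uint64{"cluster-a": 3, "cluster-b": 5, "cluster-c": 1}
	fmt.Println(clustersToReport(loadResponse{Clusters: []string{"cluster-b", "cluster-x"}}, known))
	fmt.Println(clustersToReport(loadResponse{SendAllClusters: true}, known))
}
```

Sorting is only there to make the output deterministic, since Go map iteration order is random.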

@menghanl menghanl requested a review from easwars October 7, 2020 18:34
@menghanl menghanl force-pushed the lrs_stream_report branch 2 times, most recently from d5de61e to 985f6e5 Compare October 7, 2020 18:44
@easwars easwars left a comment

I haven't added many comments. Based on our offline discussion about having the lrsClient contain all LRS functionality, I will wait for that change to be made before adding other comments. Thanks.

xds/internal/client/client.go (outdated, resolved)
xds/internal/client/client_loadreport.go (outdated, resolved)
xds/internal/client/transport_helper.go (outdated, resolved)
@easwars easwars assigned menghanl and unassigned easwars Oct 7, 2020
@menghanl menghanl added this to the 1.33 Release milestone Oct 8, 2020
menghanl commented Oct 8, 2020

lrsClient done. PTAL.

@menghanl menghanl assigned easwars and unassigned menghanl Oct 8, 2020
xds/internal/client/client_loadreport.go (resolved)
xds/internal/client/client_loadreport.go (outdated, resolved)
xds/internal/client/client_loadreport.go (resolved)
xds/internal/client/client_loadreport.go (outdated, resolved)
if err != nil {
// An error from a non-blocking dial indicates something serious.
c.logger.Infof("xds: failed to dial load report server {%s}: %v", server, err)
return func() {}
lrsC.parent.logger.Infof("xds: failed to dial load report server {%s}: %v", lrsC.server, err)
Contributor

Can we do something better than just throwing a log when the Dial fails? This seems to be irrecoverable at this point, because even if there is another call to ReportLoad we won't retry the Dial since the refCount will be non-zero.

Contributor Author

This should just panic... If this non-blocking dial fails, there's no way the LRS will keep working...

Contributor

Hmm ... but we are only throwing an Info log here. Am I missing something?

Contributor Author

I didn't want to add panic...

What I meant was, this should never happen (for real????). But we still Info just in case..

xds/internal/client/client.go (resolved)
xds/internal/client/transport_helper.go (outdated, resolved)
xds/internal/client/client_loadreport_test.go (resolved)
xds/internal/client/client_loadreport_test.go (outdated, resolved)
xds/internal/client/client_loadreport_test.go (outdated, resolved)
@easwars easwars assigned menghanl and unassigned easwars Oct 9, 2020
@menghanl menghanl left a comment

Thanks for the review. All fixed. PTAL.

xds/internal/client/client_loadreport.go (resolved)
xds/internal/client/client_loadreport.go (outdated, resolved)
loadWrapper *loadStoreWrapper
// loadOriginal is the load.Store for reporting loads to lrsServerName. It's
// returned by the client.
loadOriginal *load.Store
Contributor Author

startLoadReport is only called when the LRS server changes.

We keep this around for cases where only the EDS service name changes (so we don't restart LRS, but we get a new PerClusterReport with the new service name).

xds/internal/balancer/lrs/balancer.go (outdated, resolved)
xds/internal/client/transport_helper.go (outdated, resolved)
xds/internal/balancer/edsbalancer/xds_client_wrapper.go (outdated, resolved)
xds/internal/balancer/lrs/balancer.go (outdated, resolved)
xds/internal/client/client_loadreport_test.go (outdated, resolved)
@menghanl menghanl assigned easwars and unassigned menghanl Oct 9, 2020
loadWrapper *loadStoreWrapper
// loadOriginal is the load.Store for reporting loads to lrsServerName. It's
// returned by the client.
loadOriginal *load.Store
Contributor

We could instead cache the load.Store returned by the call to xdsclient.ReportLoad() in the loadWrapper itself, and split the update() method into two, setStore() and setServiceName(), which will be called as appropriate when handling the update. What do you think about this approach?
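
A rough sketch of what the suggested split could look like. The `store` and `perClusterStore` types below are hypothetical stand-ins for the types in package load, and the wrapper's fields and method bodies are assumptions; only the names `loadStoreWrapper`, `setStore`, `setServiceName`, and `CallStarted` come from this thread.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a hypothetical stand-in for load.Store; it counts started calls
// per cluster/service/locality key.
type store struct {
	mu     sync.Mutex
	counts map[string]int
}

func newStore() *store { return &store{counts: make(map[string]int)} }

// perCluster returns a handle scoped to one cluster and EDS service name.
func (s *store) perCluster(cluster, service string) *perClusterStore {
	return &perClusterStore{s: s, prefix: cluster + "/" + service + "/"}
}

type perClusterStore struct {
	s      *store
	prefix string
}

// CallStarted is a no-op on a nil receiver, so callers need no nil check.
func (p *perClusterStore) CallStarted(locality string) {
	if p == nil {
		return
	}
	p.s.mu.Lock()
	defer p.s.mu.Unlock()
	p.s.counts[p.prefix+locality]++
}

// loadStoreWrapper caches the store and re-derives the per-cluster handle
// whenever the store or the service name changes.
type loadStoreWrapper struct {
	mu         sync.RWMutex
	cluster    string
	service    string
	store      *store
	perCluster *perClusterStore
}

// setStore is meant for the case where the LRS server, and so the store, changes.
func (w *loadStoreWrapper) setStore(s *store) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.store = s
	w.refreshLocked()
}

// setServiceName is meant for the case where only the EDS service name changes.
func (w *loadStoreWrapper) setServiceName(service string) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.service = service
	w.refreshLocked()
}

// refreshLocked rebuilds the per-cluster handle; w.mu must be held.
func (w *loadStoreWrapper) refreshLocked() {
	if w.store == nil {
		w.perCluster = nil
		return
	}
	w.perCluster = w.store.perCluster(w.cluster, w.service)
}

func (w *loadStoreWrapper) CallStarted(locality string) {
	w.mu.RLock()
	defer w.mu.RUnlock()
	w.perCluster.CallStarted(locality)
}

func main() {
	w := &loadStoreWrapper{cluster: "cluster-a"}
	w.CallStarted("zone-1") // no store yet: silently dropped
	s := newStore()
	w.setStore(s)
	w.setServiceName("eds-1")
	w.CallStarted("zone-1")
	w.setServiceName("eds-2") // store is kept, only the handle changes
	w.CallStarted("zone-1")
	w.CallStarted("zone-1")
	fmt.Println(s.counts["cluster-a/eds-1/zone-1"], s.counts["cluster-a/eds-2/zone-1"])
}
```

With this shape the balancer would not need a separate field for the original store, because the wrapper owns it.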

}
}

if updateLoadStore {
Contributor

Probably a copy-paste gotcha. You have the same conditional statement twice.

Contributor Author

This whole if statement is deleted.

}

if attr == nil {
return fmt.Errorf("failed to get xdsClient from attributes: attributes is nil")
Contributor

Should these errors be prefixed with xds: or lrs:? I'm a little confused about the policy for these error messages. I guess we don't have to do that for log statements since the prefix logger takes care of it.

Contributor Author

Added lrs:.

The caller (which is the implementation of the balancer interface) could wrap the error, but that would only add the prefix, which doesn't seem worth it. I added it here.

// Report to the same address should not create new ClientConn.
store1, lrsCancel1 := xdsC.ReportLoad(fs.Address)
defer lrsCancel1()
ctx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
Contributor

Could you please create a different context here, so that we can use the context with the defaultTestTimeout deadline for other things which need it.

	sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
	defer sCancel()
	if u, err := fs.NewConnChan.Receive(sCtx); err != context.DeadlineExceeded {

Contributor Author

Done

// Report to a different address should create new ClientConn.
store2, lrsCancel2 := xdsC.ReportLoad(fs2.Address)
defer lrsCancel2()
ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
Contributor

If we do the above, we won't need to reinitialize the context with the defaultTestTimeout here.

Contributor Author

Done

ClusterServiceName: "eds",
TotalDroppedRequests: 1,
DroppedRequests: []*endpointpb.ClusterStats_DroppedRequests{{Category: "test", DroppedCount: 1}},
}); !proto.Equal(want, receivedLoad[0]) {
Contributor

Would it be possible to use protocmp.Diff, or whatever the equivalent is in the protobuf package, and pass it to cmp.Diff so that we can print a useful error message?

Contributor Author

Done.

@@ -115,7 +115,7 @@ func (c *testAPIClient) RemoveWatch(resourceType ResourceType, resourceName stri
 	c.removeWatches[resourceType].Send(resourceName)
 }

-func (c *testAPIClient) ReportLoad(ctx context.Context, cc *grpc.ClientConn, opts LoadReportingOptions) {
+func (c *testAPIClient) reportLoad(_ context.Context, _ *grpc.ClientConn, _ loadReportingOptions) {
Contributor

The underscores could be removed I think.

Contributor Author

Done

@easwars easwars assigned menghanl and unassigned easwars Oct 15, 2020
@menghanl menghanl force-pushed the lrs_stream_report branch 2 times, most recently from dec9231 to 13f8352 Compare October 20, 2020 21:26
@menghanl menghanl assigned easwars and unassigned menghanl Oct 20, 2020
@@ -177,7 +177,9 @@ func (x *edsBalancer) handleGRPCUpdate(update interface{}) {
 		return
 	}

-	x.client.handleUpdate(cfg, u.ResolverState.Attributes)
+	if err := x.client.handleUpdate(cfg, u.ResolverState.Attributes); err != nil {
+		x.logger.Infof("failed to update xds clients: %v", err)
Contributor

Errorf or Warningf instead?

Contributor Author

Changed to warning

func (lsw *loadStoreWrapper) CallStarted(locality string) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
if lsw.perCluster == nil {
Contributor

The perClusterStore type in package load already checks for nil. Why is this required?

Contributor Author

Right, I forgot.
Deleted.

}

var dopts []grpc.DialOption
if dialer := c.bbo.Dialer; dialer != nil {
dopts = []grpc.DialOption{grpc.WithContextDialer(dialer)}
}

// TODO: there's no long a need to read bootstrap file and create a new xds
Contributor

s/long/longer/

Contributor Author

Done

func (lsw *loadStoreWrapper) CallStarted(locality string) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
if lsw.perCluster == nil {
Contributor

Same comment about nil checks.

Contributor Author

Removed

@easwars easwars assigned menghanl and unassigned easwars Oct 21, 2020
@menghanl menghanl assigned menghanl and unassigned easwars Oct 22, 2020