-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Client load report #1200
Client load report #1200
Changes from all commits
88b75a2
81b2295
3c582a8
a69c60d
fb5afc8
9f8a106
1fc4fc1
2826455
1a58b41
679d786
db44e79
3998879
2e68c27
fa6f260
f13617e
b54242e
23ddd82
4402ebf
18c603c
c8d2d9d
f41c688
a8465ff
d868b5d
6767d6c
61f49e9
9ed4b46
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -145,6 +145,8 @@ type balancer struct { | |
done bool | ||
expTimer *time.Timer | ||
rand *rand.Rand | ||
|
||
clientStats lbpb.ClientStats | ||
} | ||
|
||
func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan []remoteBalancerInfo) error { | ||
|
@@ -281,6 +283,34 @@ func (b *balancer) processServerList(l *lbpb.ServerList, seq int) { | |
return | ||
} | ||
|
||
func (b *balancer) sendLoadReport(s *balanceLoadClientStream, interval time.Duration, done <-chan struct{}) { | ||
ticker := time.NewTicker(interval) | ||
defer ticker.Stop() | ||
for { | ||
select { | ||
case <-ticker.C: | ||
case <-done: | ||
return | ||
} | ||
b.mu.Lock() | ||
stats := b.clientStats | ||
b.clientStats = lbpb.ClientStats{} // Clear the stats. | ||
b.mu.Unlock() | ||
t := time.Now() | ||
stats.Timestamp = &lbpb.Timestamp{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FYI: there's a helper in ptypes for setting Timestamp messages. It would probably be best to use that. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The timestamp here is a copy of the ptypes Timestamp... |
||
Seconds: t.Unix(), | ||
Nanos: int32(t.Nanosecond()), | ||
} | ||
if err := s.Send(&lbpb.LoadBalanceRequest{ | ||
LoadBalanceRequestType: &lbpb.LoadBalanceRequest_ClientStats{ | ||
ClientStats: &stats, | ||
}, | ||
}); err != nil { | ||
return | ||
} | ||
} | ||
} | ||
|
||
func (b *balancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry bool) { | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
|
@@ -322,6 +352,14 @@ func (b *balancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry b | |
grpclog.Println("TODO: Delegation is not supported yet.") | ||
return | ||
} | ||
streamDone := make(chan struct{}) | ||
defer close(streamDone) | ||
b.mu.Lock() | ||
b.clientStats = lbpb.ClientStats{} // Clear client stats. | ||
b.mu.Unlock() | ||
if d := convertDuration(initResp.ClientStatsReportInterval); d > 0 { | ||
go b.sendLoadReport(stream, d, streamDone) | ||
} | ||
// Retrieve the server list. | ||
for { | ||
reply, err := stream.Recv() | ||
|
@@ -538,7 +576,32 @@ func (b *balancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Addre | |
err = ErrClientConnClosing | ||
return | ||
} | ||
seq := b.seq | ||
|
||
defer func() { | ||
if err != nil { | ||
return | ||
} | ||
put = func() { | ||
s, ok := rpcInfoFromContext(ctx) | ||
if !ok { | ||
return | ||
} | ||
b.mu.Lock() | ||
defer b.mu.Unlock() | ||
if b.done || seq < b.seq { | ||
return | ||
} | ||
b.clientStats.NumCallsFinished++ | ||
if !s.bytesSent { | ||
b.clientStats.NumCallsFinishedWithClientFailedToSend++ | ||
} else if s.bytesReceived { | ||
b.clientStats.NumCallsFinishedKnownReceived++ | ||
} | ||
} | ||
}() | ||
|
||
b.clientStats.NumCallsStarted++ | ||
if len(b.addrs) > 0 { | ||
if b.next >= len(b.addrs) { | ||
b.next = 0 | ||
|
@@ -556,6 +619,13 @@ func (b *balancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Addre | |
} | ||
if !opts.BlockingWait { | ||
b.next = next | ||
if a.dropForLoadBalancing { | ||
b.clientStats.NumCallsFinished++ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Optional: in the defer, if err != nil, do this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In that case, these two values will not be protected by the same pair of lock/unlock. |
||
b.clientStats.NumCallsFinishedWithDropForLoadBalancing++ | ||
} else if a.dropForRateLimiting { | ||
b.clientStats.NumCallsFinished++ | ||
b.clientStats.NumCallsFinishedWithDropForRateLimiting++ | ||
} | ||
b.mu.Unlock() | ||
err = Errorf(codes.Unavailable, "%s drops requests", a.addr.Addr) | ||
return | ||
|
@@ -569,6 +639,8 @@ func (b *balancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Addre | |
} | ||
if !opts.BlockingWait { | ||
if len(b.addrs) == 0 { | ||
b.clientStats.NumCallsFinished++ | ||
b.clientStats.NumCallsFinishedWithClientFailedToSend++ | ||
b.mu.Unlock() | ||
err = Errorf(codes.Unavailable, "there is no address available") | ||
return | ||
|
@@ -590,11 +662,17 @@ func (b *balancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Addre | |
for { | ||
select { | ||
case <-ctx.Done(): | ||
b.mu.Lock() | ||
b.clientStats.NumCallsFinished++ | ||
b.clientStats.NumCallsFinishedWithClientFailedToSend++ | ||
b.mu.Unlock() | ||
err = ctx.Err() | ||
return | ||
case <-ch: | ||
b.mu.Lock() | ||
if b.done { | ||
b.clientStats.NumCallsFinished++ | ||
b.clientStats.NumCallsFinishedWithClientFailedToSend++ | ||
b.mu.Unlock() | ||
err = ErrClientConnClosing | ||
return | ||
|
@@ -617,6 +695,13 @@ func (b *balancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Addre | |
} | ||
if !opts.BlockingWait { | ||
b.next = next | ||
if a.dropForLoadBalancing { | ||
b.clientStats.NumCallsFinished++ | ||
b.clientStats.NumCallsFinishedWithDropForLoadBalancing++ | ||
} else if a.dropForRateLimiting { | ||
b.clientStats.NumCallsFinished++ | ||
b.clientStats.NumCallsFinishedWithDropForRateLimiting++ | ||
} | ||
b.mu.Unlock() | ||
err = Errorf(codes.Unavailable, "drop requests for the address %s", a.addr.Addr) | ||
return | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This makes an unnecessary copy. You could instead use b.clientStats directly and clear it after calling Send.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
b.clientStats is protected by the mutex. I don't want to put the rpc call between the lock/unlock.