-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Client load report #1200
Client load report #1200
Changes from 24 commits
88b75a2
81b2295
3c582a8
a69c60d
fb5afc8
9f8a106
1fc4fc1
2826455
1a58b41
679d786
db44e79
3998879
2e68c27
fa6f260
f13617e
b54242e
23ddd82
4402ebf
18c603c
c8d2d9d
f41c688
a8465ff
d868b5d
6767d6c
61f49e9
9ed4b46
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -93,11 +93,7 @@ func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTran | |
} | ||
|
||
// sendRequest writes out various information of an RPC such as Context and Message. | ||
func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor, callHdr *transport.CallHdr, t transport.ClientTransport, args interface{}, opts *transport.Options) (_ *transport.Stream, err error) { | ||
stream, err := t.NewStream(ctx, callHdr) | ||
if err != nil { | ||
return nil, err | ||
} | ||
func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor, callHdr *transport.CallHdr, stream *transport.Stream, t transport.ClientTransport, args interface{}, opts *transport.Options) (err error) { | ||
defer func() { | ||
if err != nil { | ||
// If err is connection error, t will be closed, no need to close stream here. | ||
|
@@ -120,7 +116,7 @@ func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor, | |
} | ||
outBuf, err := encode(dopts.codec, args, compressor, cbuf, outPayload) | ||
if err != nil { | ||
return nil, Errorf(codes.Internal, "grpc: %v", err) | ||
return Errorf(codes.Internal, "grpc: %v", err) | ||
} | ||
err = t.Write(stream, outBuf, opts) | ||
if err == nil && outPayload != nil { | ||
|
@@ -131,10 +127,10 @@ func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor, | |
// does not exist.) so that t.Write could get io.EOF from wait(...). Leave the following | ||
// recvResponse to get the final status. | ||
if err != nil && err != io.EOF { | ||
return nil, err | ||
return err | ||
} | ||
// Sent successfully. | ||
return stream, nil | ||
return nil | ||
} | ||
|
||
// Invoke sends the RPC request on the wire and returns after response is received. | ||
|
@@ -183,6 +179,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli | |
} | ||
}() | ||
} | ||
ctx = newContextWithRPCStats(ctx) | ||
sh := cc.dopts.copts.StatsHandler | ||
if sh != nil { | ||
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method}) | ||
|
@@ -246,9 +243,33 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli | |
if c.traceInfo.tr != nil { | ||
c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true) | ||
} | ||
stream, err = sendRequest(ctx, cc.dopts, cc.dopts.cp, callHdr, t, args, topts) | ||
stream, err = t.NewStream(ctx, callHdr) | ||
if err != nil { | ||
if _, ok := err.(transport.ConnectionError); ok && put != nil { | ||
// If error is connection error, transport was sending data on wire, | ||
// and we are not sure if anything has been sent on wire. | ||
// If error is not connection error, we are sure nothing has been sent. | ||
updateRPCStatsInContext(ctx, rpcStats{bytesSent: true, bytesReceived: false}) | ||
} | ||
if put != nil { | ||
put() | ||
put = nil | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why nil out put? Unless I'm reading it wrong, it's local to this loop, and we either return or continue after this. (Same below.) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed. |
||
} | ||
if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about: if _, ok := err.(transport.ConnectionError); (ok || err == transport.ErrStreamDrain) && !c.failFast {
continue
}
return toRPCErr(err) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
if c.failFast { | ||
return toRPCErr(err) | ||
} | ||
continue | ||
} | ||
return toRPCErr(err) | ||
} | ||
err = sendRequest(ctx, cc.dopts, cc.dopts.cp, callHdr, stream, t, args, topts) | ||
if err != nil { | ||
if put != nil { | ||
updateRPCStatsInContext(ctx, rpcStats{ | ||
bytesSent: stream.BytesSent(), | ||
bytesReceived: stream.BytesReceived(), | ||
}) | ||
put() | ||
put = nil | ||
} | ||
|
@@ -266,6 +287,10 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli | |
err = recvResponse(ctx, cc.dopts, t, &c, stream, reply) | ||
if err != nil { | ||
if put != nil { | ||
updateRPCStatsInContext(ctx, rpcStats{ | ||
bytesSent: stream.BytesSent(), | ||
bytesReceived: stream.BytesReceived(), | ||
}) | ||
put() | ||
put = nil | ||
} | ||
|
@@ -277,11 +302,19 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli | |
} | ||
return toRPCErr(err) | ||
} | ||
updateRPCStatsInContext(ctx, rpcStats{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is this update here? Presumably nobody will care about the stats until put is called. (It's also not guarded by a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed. |
||
bytesSent: stream.BytesSent(), | ||
bytesReceived: stream.BytesReceived(), | ||
}) | ||
if c.traceInfo.tr != nil { | ||
c.traceInfo.tr.LazyLog(&payload{sent: false, msg: reply}, true) | ||
} | ||
t.CloseStream(stream, nil) | ||
if put != nil { | ||
updateRPCStatsInContext(ctx, rpcStats{ | ||
bytesSent: stream.BytesSent(), | ||
bytesReceived: stream.BytesReceived(), | ||
}) | ||
put() | ||
put = nil | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -145,6 +145,8 @@ type balancer struct { | |
done bool | ||
expTimer *time.Timer | ||
rand *rand.Rand | ||
|
||
clientStats lbpb.ClientStats | ||
} | ||
|
||
func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan []remoteBalancerInfo) error { | ||
|
@@ -281,6 +283,34 @@ func (b *balancer) processServerList(l *lbpb.ServerList, seq int) { | |
return | ||
} | ||
|
||
func (b *balancer) sendLoadReport(s *balanceLoadClientStream, interval time.Duration, done <-chan struct{}) { | ||
ticker := time.NewTicker(interval) | ||
defer ticker.Stop() | ||
for { | ||
select { | ||
case <-ticker.C: | ||
case <-done: | ||
return | ||
} | ||
b.mu.Lock() | ||
stats := b.clientStats | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This makes an unnecessary copy. You could instead use b.clientStats directly and clear it after calling Send. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. b.clientStats is protected by the mutex. I don't want to put the rpc call between the lock/unlock. |
||
b.clientStats = lbpb.ClientStats{} // Clear the stats. | ||
b.mu.Unlock() | ||
t := time.Now() | ||
stats.Timestamp = &lbpb.Timestamp{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FYI: there's a helper in ptypes for setting Timestamp messages. It would probably be best to use that. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The timestamp here is a copy of the ptypes Timestamp... |
||
Seconds: t.Unix(), | ||
Nanos: int32(t.Nanosecond()), | ||
} | ||
if err := s.Send(&lbpb.LoadBalanceRequest{ | ||
LoadBalanceRequestType: &lbpb.LoadBalanceRequest_ClientStats{ | ||
ClientStats: &stats, | ||
}, | ||
}); err != nil { | ||
return | ||
} | ||
} | ||
} | ||
|
||
func (b *balancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry bool) { | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
|
@@ -322,6 +352,14 @@ func (b *balancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry b | |
grpclog.Println("TODO: Delegation is not supported yet.") | ||
return | ||
} | ||
streamDone := make(chan struct{}) | ||
defer close(streamDone) | ||
b.mu.Lock() | ||
b.clientStats = lbpb.ClientStats{} // Clear client stats. | ||
b.mu.Unlock() | ||
if d := convertDuration(initResp.ClientStatsReportInterval); d > 0 { | ||
go b.sendLoadReport(stream, d, streamDone) | ||
} | ||
// Retrieve the server list. | ||
for { | ||
reply, err := stream.Recv() | ||
|
@@ -538,7 +576,31 @@ func (b *balancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Addre | |
err = ErrClientConnClosing | ||
return | ||
} | ||
seq := b.seq | ||
|
||
defer func() { | ||
if err == nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Preferred go-ism: if err != nil {
return
} And outdent the following code. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
put = func() { | ||
s, ok := rpcStatsFromContext(ctx) | ||
if !ok { | ||
return | ||
} | ||
b.mu.Lock() | ||
defer b.mu.Unlock() | ||
if b.done || seq < b.seq { | ||
return | ||
} | ||
b.clientStats.NumCallsFinished++ | ||
if !s.bytesSent { | ||
b.clientStats.NumCallsFinishedWithClientFailedToSend++ | ||
} else if s.bytesReceived { | ||
b.clientStats.NumCallsFinishedKnownReceived++ | ||
} | ||
} | ||
} | ||
}() | ||
|
||
b.clientStats.NumCallsStarted++ | ||
if len(b.addrs) > 0 { | ||
if b.next >= len(b.addrs) { | ||
b.next = 0 | ||
|
@@ -556,6 +618,13 @@ func (b *balancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Addre | |
} | ||
if !opts.BlockingWait { | ||
b.next = next | ||
if a.dropForLoadBalancing { | ||
b.clientStats.NumCallsFinished++ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Optional: in the defer, if err != nil, do this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In that case, these two values will not be protected by the same pair of lock/unlock. |
||
b.clientStats.NumCallsFinishedWithDropForLoadBalancing++ | ||
} else if a.dropForRateLimiting { | ||
b.clientStats.NumCallsFinished++ | ||
b.clientStats.NumCallsFinishedWithDropForRateLimiting++ | ||
} | ||
b.mu.Unlock() | ||
err = Errorf(codes.Unavailable, "%s drops requests", a.addr.Addr) | ||
return | ||
|
@@ -569,6 +638,8 @@ func (b *balancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Addre | |
} | ||
if !opts.BlockingWait { | ||
if len(b.addrs) == 0 { | ||
b.clientStats.NumCallsFinished++ | ||
b.clientStats.NumCallsFinishedWithClientFailedToSend++ | ||
b.mu.Unlock() | ||
err = Errorf(codes.Unavailable, "there is no address available") | ||
return | ||
|
@@ -591,10 +662,14 @@ func (b *balancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Addre | |
select { | ||
case <-ctx.Done(): | ||
err = ctx.Err() | ||
b.clientStats.NumCallsFinished++ | ||
b.clientStats.NumCallsFinishedWithClientFailedToSend++ | ||
return | ||
case <-ch: | ||
b.mu.Lock() | ||
if b.done { | ||
b.clientStats.NumCallsFinished++ | ||
b.clientStats.NumCallsFinishedWithClientFailedToSend++ | ||
b.mu.Unlock() | ||
err = ErrClientConnClosing | ||
return | ||
|
@@ -617,6 +692,13 @@ func (b *balancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Addre | |
} | ||
if !opts.BlockingWait { | ||
b.next = next | ||
if a.dropForLoadBalancing { | ||
b.clientStats.NumCallsFinished++ | ||
b.clientStats.NumCallsFinishedWithDropForLoadBalancing++ | ||
} else if a.dropForRateLimiting { | ||
b.clientStats.NumCallsFinished++ | ||
b.clientStats.NumCallsFinishedWithDropForRateLimiting++ | ||
} | ||
b.mu.Unlock() | ||
err = Errorf(codes.Unavailable, "drop requests for the addreess %s", a.addr.Addr) | ||
return | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe combine the ifs here:
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done