Skip to content

Commit

Permalink
final fix
Browse files Browse the repository at this point in the history
  • Loading branch information
aranjans committed Apr 20, 2024
1 parent 2282dec commit 1500079
Showing 1 changed file with 65 additions and 41 deletions.
106 changes: 65 additions & 41 deletions test/stats_test.go
Expand Up @@ -21,7 +21,6 @@ package test
import (
"context"
"net"
"sync"
"testing"

"google.golang.org/grpc"
Expand All @@ -32,91 +31,116 @@ import (
"google.golang.org/grpc/stats"
)

// TestPeerForClientStatsHandler tests the scenario where stats handler
// (having peer as part of its struct) has peer enriched as part of
// stream context.
// TestPeerForClientStatsHandler configures a stats handler that
// verifies that peer is sent for OutPayload, InPayload, End
// stats handlers.
func (s) TestPeerForClientStatsHandler(t *testing.T) {
spy := &handlerSpy{}
statsHandler := &peerStatsHandler{}

// Define expected stats callouts and whether a peer object should be populated.
// Note:
// * Begin stats don't have peer information as the RPC begins before peer resolution.
// * PickerUpdated stats don't have peer information as the picker operates without transport-level knowledge.
expectedCallouts := map[stats.RPCStats]bool{
&stats.OutPayload{}: true,
&stats.InHeader{}: true,
&stats.OutHeader{}: true,
&stats.InTrailer{}: true,
&stats.OutTrailer{}: true,
&stats.End{}: true,
&stats.Begin{}: false,
&stats.PickerUpdated{}: false,
}

// Start server.
l, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatal(err)
}
grpcServer := grpc.NewServer()
testgrpc.RegisterTestServiceServer(grpcServer, interop.NewTestServer())
s := grpc.NewServer()
testgrpc.RegisterTestServiceServer(s, interop.NewTestServer())
errCh := make(chan error)
go func() {
errCh <- grpcServer.Serve(l)
errCh <- s.Serve(l)
}()
t.Cleanup(func() {
grpcServer.Stop()
defer func() {
s.Stop()
if err := <-errCh; err != nil {
t.Error(err)
}
})
}()

// Create client with stats handler and do some calls.
conn, err := grpc.NewClient(
cc, err := grpc.NewClient(
l.Addr().String(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithStatsHandler(spy))
grpc.WithStatsHandler(statsHandler))
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
if err := conn.Close(); err != nil {
if err := cc.Close(); err != nil {
t.Error(err)
}
})

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
client := testgrpc.NewTestServiceClient(conn)
interop.DoEmptyUnaryCall(ctx, client)
interop.DoLargeUnaryCall(ctx, client)
client := testgrpc.NewTestServiceClient(cc)
interop.DoClientStreaming(ctx, client)
interop.DoServerStreaming(ctx, client)
interop.DoPingPong(ctx, client)

// Assert if peer is populated for each stats type.
for _, callbackArgs := range spy.Args {
if callbackArgs.Peer == nil {
switch callbackArgs.RPCStats.(type) {
case *stats.Begin:
continue
case *stats.PickerUpdated:
continue
default:
}
t.Errorf("peer not populated for: %T", callbackArgs.RPCStats)

if len(getUniqueRPCStats(statsHandler.Args)) < len(expectedCallouts) {
t.Errorf("Unexpected number of stats handler callouts.")
}

for _, callbackArgs := range statsHandler.Args {
expectedPeer, found := expectedCallouts[callbackArgs.rpcStats]
// In case expectation is set to false and still we got the peer,
// then it's good to have it. So no need to assert those conditions.
if found && expectedPeer && callbackArgs.peer != nil {
continue
} else if expectedPeer && callbackArgs.peer == nil {
t.Errorf("peer not populated for: %T", callbackArgs.rpcStats)
}
}
}

// getUniqueRPCStats extracts a list of unique stats.RPCStats types from peer list of RPC callback.
func getUniqueRPCStats(args []peerStats) []stats.RPCStats {
uniqueStatsTypes := make(map[stats.RPCStats]struct{})

for _, callbackArgs := range args {
uniqueStatsTypes[callbackArgs.rpcStats] = struct{}{}
}

var uniqueStatsList []stats.RPCStats
for statsType := range uniqueStatsTypes {
uniqueStatsList = append(uniqueStatsList, statsType)
}

return uniqueStatsList
}

type peerStats struct {
RPCStats stats.RPCStats
Peer *peer.Peer
rpcStats stats.RPCStats
peer *peer.Peer
}

type handlerSpy struct {
type peerStatsHandler struct {
Args []peerStats
mu sync.Mutex
}

func (h *handlerSpy) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
func (h *peerStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
return ctx
}

func (h *handlerSpy) HandleRPC(ctx context.Context, rs stats.RPCStats) {
h.mu.Lock()
func (h *peerStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
p, _ := peer.FromContext(ctx)
h.Args = append(h.Args, peerStats{rs, p})
h.mu.Unlock()
}

func (h *handlerSpy) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
func (h *peerStatsHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
return ctx
}

func (h *handlerSpy) HandleConn(context.Context, stats.ConnStats) {}
func (h *peerStatsHandler) HandleConn(context.Context, stats.ConnStats) {}

0 comments on commit 1500079

Please sign in to comment.