Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc: update clientStreamAttempt context with transport context #7096

Merged
merged 4 commits into from May 2, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
106 changes: 65 additions & 41 deletions test/stats_test.go
Expand Up @@ -21,7 +21,6 @@ package test
import (
"context"
"net"
"sync"
"testing"

"google.golang.org/grpc"
Expand All @@ -32,91 +31,116 @@ import (
"google.golang.org/grpc/stats"
)

// TestPeerForClientStatsHandler tests the scenario where stats handler
// (having peer as part of its struct) has peer enriched as part of
// stream context.
// TestPeerForClientStatsHandler configures a stats handler that
// verifies that peer is sent for OutPayload, InPayload, End
// stats handlers.
zasweq marked this conversation as resolved.
Show resolved Hide resolved
func (s) TestPeerForClientStatsHandler(t *testing.T) {
spy := &handlerSpy{}
statsHandler := &peerStatsHandler{}
zasweq marked this conversation as resolved.
Show resolved Hide resolved

// Define expected stats callouts and whether a peer object should be populated.
// Note:
// * Begin stats don't have peer information as the RPC begins before peer resolution.
// * PickerUpdated stats don't have peer information as the picker operates without transport-level knowledge.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please wrap comments to 80 characters. The guidance for this repo is comments wrapped to 80 characters, but we do not have a length maximum for code line characters.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

expectedCallouts := map[stats.RPCStats]bool{
&stats.OutPayload{}: true,
&stats.InHeader{}: true,
&stats.OutHeader{}: true,
&stats.InTrailer{}: true,
&stats.OutTrailer{}: true,
&stats.End{}: true,
&stats.Begin{}: false,
&stats.PickerUpdated{}: false,
}

// Start server.
l, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatal(err)
}
grpcServer := grpc.NewServer()
testgrpc.RegisterTestServiceServer(grpcServer, interop.NewTestServer())
s := grpc.NewServer()
testgrpc.RegisterTestServiceServer(s, interop.NewTestServer())
errCh := make(chan error)
go func() {
errCh <- grpcServer.Serve(l)
errCh <- s.Serve(l)
}()
t.Cleanup(func() {
grpcServer.Stop()
defer func() {
s.Stop()
if err := <-errCh; err != nil {
t.Error(err)
}
})
}()

// Create client with stats handler and do some calls.
conn, err := grpc.NewClient(
cc, err := grpc.NewClient(
l.Addr().String(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithStatsHandler(spy))
grpc.WithStatsHandler(statsHandler))
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
if err := conn.Close(); err != nil {
if err := cc.Close(); err != nil {
t.Error(err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

optional: This t.Error is not really in the essence of the test. I think maybe we can get rid of it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}
})

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
client := testgrpc.NewTestServiceClient(conn)
interop.DoEmptyUnaryCall(ctx, client)
interop.DoLargeUnaryCall(ctx, client)
client := testgrpc.NewTestServiceClient(cc)
interop.DoClientStreaming(ctx, client)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be useful to have a subtest for each RPC type (unary, bidi, etc)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesnt seem to be resolved?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Zach suggested here to have single RPC type.

interop.DoServerStreaming(ctx, client)
interop.DoPingPong(ctx, client)

// Assert if peer is populated for each stats type.
for _, callbackArgs := range spy.Args {
if callbackArgs.Peer == nil {
switch callbackArgs.RPCStats.(type) {
case *stats.Begin:
continue
case *stats.PickerUpdated:
continue
default:
}
t.Errorf("peer not populated for: %T", callbackArgs.RPCStats)

if len(getUniqueRPCStats(statsHandler.Args)) < len(expectedCallouts) {
t.Errorf("Unexpected number of stats handler callouts.")
}
zasweq marked this conversation as resolved.
Show resolved Hide resolved

for _, callbackArgs := range statsHandler.Args {
expectedPeer, found := expectedCallouts[callbackArgs.rpcStats]
// In case expectation is set to false and still we got the peer,
// then it's good to have it. So no need to assert those conditions.
if found && expectedPeer && callbackArgs.peer != nil {
continue
} else if expectedPeer && callbackArgs.peer == nil {
t.Errorf("peer not populated for: %T", callbackArgs.rpcStats)
}
}
}

// getUniqueRPCStats extracts a list of unique stats.RPCStats types from peer list of RPC callback.
func getUniqueRPCStats(args []peerStats) []stats.RPCStats {
uniqueStatsTypes := make(map[stats.RPCStats]struct{})

for _, callbackArgs := range args {
uniqueStatsTypes[callbackArgs.rpcStats] = struct{}{}
}

var uniqueStatsList []stats.RPCStats
for statsType := range uniqueStatsTypes {
uniqueStatsList = append(uniqueStatsList, statsType)
}

return uniqueStatsList
}

type peerStats struct {
RPCStats stats.RPCStats
Peer *peer.Peer
rpcStats stats.RPCStats
peer *peer.Peer
}

type handlerSpy struct {
type peerStatsHandler struct {
Args []peerStats
zasweq marked this conversation as resolved.
Show resolved Hide resolved
mu sync.Mutex
}

func (h *handlerSpy) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
func (h *peerStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
return ctx
}

func (h *handlerSpy) HandleRPC(ctx context.Context, rs stats.RPCStats) {
h.mu.Lock()
func (h *peerStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
p, _ := peer.FromContext(ctx)
h.Args = append(h.Args, peerStats{rs, p})
h.mu.Unlock()
}

func (h *handlerSpy) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
func (h *peerStatsHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
return ctx
}

func (h *handlerSpy) HandleConn(context.Context, stats.ConnStats) {}
func (h *peerStatsHandler) HandleConn(context.Context, stats.ConnStats) {}