diff --git a/plugin/ocgrpc/client_metrics.go b/plugin/ocgrpc/client_metrics.go index 49fde3d8c..fb3c19d6b 100644 --- a/plugin/ocgrpc/client_metrics.go +++ b/plugin/ocgrpc/client_metrics.go @@ -28,6 +28,7 @@ var ( ClientReceivedMessagesPerRPC = stats.Int64("grpc.io/client/received_messages_per_rpc", "Number of response messages received per RPC (always 1 for non-streaming RPCs).", stats.UnitDimensionless) ClientReceivedBytesPerRPC = stats.Int64("grpc.io/client/received_bytes_per_rpc", "Total bytes received across all response messages per RPC.", stats.UnitBytes) ClientRoundtripLatency = stats.Float64("grpc.io/client/roundtrip_latency", "Time between first byte of request sent to last byte of response received, or terminal error.", stats.UnitMilliseconds) + ClientStartedRPCs = stats.Int64("grpc.io/client/started_rpcs", "Number of started client RPCs.", stats.UnitDimensionless) ClientServerLatency = stats.Float64("grpc.io/client/server_latency", `Propagated from the server and should have the same value as "grpc.io/server/latency".`, stats.UnitMilliseconds) ) @@ -70,6 +71,14 @@ var ( Aggregation: view.Count(), } + ClientStartedRPCsView = &view.View{ + Measure: ClientStartedRPCs, + Name: "grpc.io/client/started_rpcs", + Description: "Number of started client RPCs.", + TagKeys: []tag.Key{KeyClientMethod}, + Aggregation: view.Count(), + } + ClientSentMessagesPerRPCView = &view.View{ Measure: ClientSentMessagesPerRPC, Name: "grpc.io/client/sent_messages_per_rpc", diff --git a/plugin/ocgrpc/end_to_end_test.go b/plugin/ocgrpc/end_to_end_test.go index 8715079d7..d106f4354 100644 --- a/plugin/ocgrpc/end_to_end_test.go +++ b/plugin/ocgrpc/end_to_end_test.go @@ -40,6 +40,8 @@ func TestEndToEnd_Single(t *testing.T) { ocgrpc.ClientReceivedMessagesPerRPCView, ocgrpc.ServerSentMessagesPerRPCView, ocgrpc.ClientSentMessagesPerRPCView, + ocgrpc.ServerStartedRPCsView, + ocgrpc.ClientStartedRPCsView, } view.Register(extraViews...) defer view.Unregister(extraViews...) @@ -63,10 +65,14 @@ func TestEndToEnd_Single(t *testing.T) { if err != nil { t.Fatal(err) } + checkCount(t, ocgrpc.ClientStartedRPCsView, 1, clientMethodTag) + checkCount(t, ocgrpc.ServerStartedRPCsView, 1, serverMethodTag) checkCount(t, ocgrpc.ClientCompletedRPCsView, 1, clientMethodTag, clientStatusOKTag) checkCount(t, ocgrpc.ServerCompletedRPCsView, 1, serverMethodTag, serverStatusOKTag) _, _ = client.Single(ctx, &testpb.FooRequest{Fail: true}) + checkCount(t, ocgrpc.ClientStartedRPCsView, 2, clientMethodTag) + checkCount(t, ocgrpc.ServerStartedRPCsView, 2, serverMethodTag) checkCount(t, ocgrpc.ClientCompletedRPCsView, 1, clientMethodTag, serverStatusUnknownTag) checkCount(t, ocgrpc.ServerCompletedRPCsView, 1, serverMethodTag, clientStatusUnknownTag) @@ -101,6 +107,7 @@ func TestEndToEnd_Single(t *testing.T) { func TestEndToEnd_Stream(t *testing.T) { view.Register(ocgrpc.DefaultClientViews...) defer view.Unregister(ocgrpc.DefaultClientViews...) + view.Register(ocgrpc.DefaultServerViews...) defer view.Unregister(ocgrpc.DefaultServerViews...) @@ -109,6 +116,8 @@ func TestEndToEnd_Stream(t *testing.T) { ocgrpc.ClientReceivedMessagesPerRPCView, ocgrpc.ServerSentMessagesPerRPCView, ocgrpc.ClientSentMessagesPerRPCView, + ocgrpc.ClientStartedRPCsView, + ocgrpc.ServerStartedRPCsView, } view.Register(extraViews...) defer view.Unregister(extraViews...) @@ -146,6 +155,8 @@ func TestEndToEnd_Stream(t *testing.T) { t.Fatal(err) } + checkCount(t, ocgrpc.ClientStartedRPCsView, 1, clientMethodTag) + checkCount(t, ocgrpc.ServerStartedRPCsView, 1, serverMethodTag) checkCount(t, ocgrpc.ClientCompletedRPCsView, 1, clientMethodTag, clientStatusOKTag) checkCount(t, ocgrpc.ServerCompletedRPCsView, 1, serverMethodTag, serverStatusOKTag) @@ -183,6 +194,7 @@ func getCount(t *testing.T, v *view.View, tags ...tag.Tag) (int64, bool) { return 0, false } } + rows, err := view.RetrieveData(v.Name) if err != nil { t.Fatal(err) diff --git a/plugin/ocgrpc/server_metrics.go b/plugin/ocgrpc/server_metrics.go index b2059824a..fe0e97108 100644 --- a/plugin/ocgrpc/server_metrics.go +++ b/plugin/ocgrpc/server_metrics.go @@ -27,6 +27,7 @@ var ( ServerReceivedBytesPerRPC = stats.Int64("grpc.io/server/received_bytes_per_rpc", "Total bytes received across all messages per RPC.", stats.UnitBytes) ServerSentMessagesPerRPC = stats.Int64("grpc.io/server/sent_messages_per_rpc", "Number of messages sent in each RPC. Has value 1 for non-streaming RPCs.", stats.UnitDimensionless) ServerSentBytesPerRPC = stats.Int64("grpc.io/server/sent_bytes_per_rpc", "Total bytes sent in across all response messages per RPC.", stats.UnitBytes) + ServerStartedRPCs = stats.Int64("grpc.io/server/started_rpcs", "Number of started server RPCs.", stats.UnitDimensionless) ServerLatency = stats.Float64("grpc.io/server/server_latency", "Time between first byte of request received to last byte of response sent, or terminal error.", stats.UnitMilliseconds) ) @@ -73,6 +74,14 @@ var ( Aggregation: view.Count(), } + ServerStartedRPCsView = &view.View{ + Measure: ServerStartedRPCs, + Name: "grpc.io/server/started_rpcs", + Description: "Number of started server RPCs.", + TagKeys: []tag.Key{KeyServerMethod}, + Aggregation: view.Count(), + } + ServerReceivedMessagesPerRPCView = &view.View{ Name: "grpc.io/server/received_messages_per_rpc", Description: "Distribution of messages received count per RPC, by method.", diff --git a/plugin/ocgrpc/stats_common.go b/plugin/ocgrpc/stats_common.go index 89cac9c4e..9cb27320c 100644 --- a/plugin/ocgrpc/stats_common.go +++ b/plugin/ocgrpc/stats_common.go @@ -82,8 +82,10 @@ func methodName(fullname string) string { // statsHandleRPC processes the RPC events. func statsHandleRPC(ctx context.Context, s stats.RPCStats) { switch st := s.(type) { - case *stats.Begin, *stats.OutHeader, *stats.InHeader, *stats.InTrailer, *stats.OutTrailer: + case *stats.OutHeader, *stats.InHeader, *stats.InTrailer, *stats.OutTrailer: // do nothing for client + case *stats.Begin: + handleRPCBegin(ctx, st) case *stats.OutPayload: handleRPCOutPayload(ctx, st) case *stats.InPayload: @@ -95,6 +97,25 @@ func statsHandleRPC(ctx context.Context, s stats.RPCStats) { } } +func handleRPCBegin(ctx context.Context, s *stats.Begin) { + d, ok := ctx.Value(rpcDataKey).(*rpcData) + if !ok { + if grpclog.V(2) { + grpclog.Infoln("Failed to retrieve *rpcData from context.") + } + } + + if s.IsClient() { + ocstats.RecordWithOptions(ctx, + ocstats.WithTags(tag.Upsert(KeyClientMethod, methodName(d.method))), + ocstats.WithMeasurements(ClientStartedRPCs.M(1))) + } else { + ocstats.RecordWithOptions(ctx, + ocstats.WithTags(tag.Upsert(KeyClientMethod, methodName(d.method))), + ocstats.WithMeasurements(ServerStartedRPCs.M(1))) + } +} + func handleRPCOutPayload(ctx context.Context, s *stats.OutPayload) { d, ok := ctx.Value(rpcDataKey).(*rpcData) if !ok { diff --git a/stats/view/worker_test.go b/stats/view/worker_test.go index fe8e35fc4..af81fbb44 100644 --- a/stats/view/worker_test.go +++ b/stats/view/worker_test.go @@ -180,14 +180,14 @@ func Test_Worker_MultiExport(t *testing.T) { // Format is Resource.Labels encoded as string, then wantPartialData := map[string][]*Row{ - makeKey(nil, count.Name): []*Row{ + makeKey(nil, count.Name): { {[]tag.Tag{{Key: key, Value: "a"}}, &CountData{Value: 2}}, {[]tag.Tag{{Key: key, Value: "b"}}, &CountData{Value: 1}}, }, - makeKey(nil, sum.Name): []*Row{ + makeKey(nil, sum.Name): { {nil, &SumData{Value: 7.5}}, }, - makeKey(&extraResource, count.Name): []*Row{ + makeKey(&extraResource, count.Name): { {[]tag.Tag{{Key: key, Value: "b"}}, &CountData{Value: 1}}, }, }