diff --git a/plugin/ocgrpc/client_stats_handler_test.go b/plugin/ocgrpc/client_stats_handler_test.go index 7a2c366c0..d271b4031 100644 --- a/plugin/ocgrpc/client_stats_handler_test.go +++ b/plugin/ocgrpc/client_stats_handler_test.go @@ -318,6 +318,9 @@ func TestClientDefaultCollections(t *testing.T) { t.Errorf("%q: RetrieveData(%q) = %v", tc.label, wantData.v().Name, err) continue } + for i := range gotRows { + view.ClearStart(gotRows[i].Data) + } for _, gotRow := range gotRows { if !containsRow(wantData.rows, gotRow) { diff --git a/plugin/ocgrpc/server_stats_handler_test.go b/plugin/ocgrpc/server_stats_handler_test.go index fb45bb62a..ab14f63f2 100644 --- a/plugin/ocgrpc/server_stats_handler_test.go +++ b/plugin/ocgrpc/server_stats_handler_test.go @@ -305,6 +305,10 @@ func TestServerDefaultCollections(t *testing.T) { continue } + for i := range gotRows { + view.ClearStart(gotRows[i].Data) + } + for _, gotRow := range gotRows { if !containsRow(wantData.rows, gotRow) { t.Errorf("%q: unwanted row for view %q: %v", tc.label, wantData.v().Name, gotRow) diff --git a/plugin/ochttp/route_test.go b/plugin/ochttp/route_test.go index 2efe23fc5..50b9bb34a 100644 --- a/plugin/ochttp/route_test.go +++ b/plugin/ochttp/route_test.go @@ -53,6 +53,9 @@ func TestWithRouteTag(t *testing.T) { view.Unregister(v) // trigger exporting got := e.rowsForView("request_total") + for i := range got { + view.ClearStart(got[i].Data) + } want := []*view.Row{ {Data: &view.CountData{Value: 1}, Tags: []tag.Tag{{Key: ochttp.KeyServerRoute, Value: "/a/"}}}, } @@ -90,6 +93,9 @@ func TestSetRoute(t *testing.T) { view.Unregister(v) // trigger exporting got := e.rowsForView("request_total") + for i := range got { + view.ClearStart(got[i].Data) + } want := []*view.Row{ {Data: &view.CountData{Value: 1}, Tags: []tag.Tag{{Key: ochttp.KeyServerRoute, Value: "/a/"}}}, } diff --git a/stats/view/aggregation.go b/stats/view/aggregation.go index 9d7093728..748bd568c 100644 --- a/stats/view/aggregation.go +++ b/stats/view/aggregation.go @@ -15,6 +15,8 @@ package view +import "time" + // AggType represents the type of aggregation function used on a View. type AggType int @@ -45,20 +47,20 @@ type Aggregation struct { Type AggType // Type is the AggType of this Aggregation. Buckets []float64 // Buckets are the bucket endpoints if this Aggregation represents a distribution, see Distribution. - newData func() AggregationData + newData func(time.Time) AggregationData } var ( aggCount = &Aggregation{ Type: AggTypeCount, - newData: func() AggregationData { - return &CountData{} + newData: func(t time.Time) AggregationData { + return &CountData{Start: t} }, } aggSum = &Aggregation{ Type: AggTypeSum, - newData: func() AggregationData { - return &SumData{} + newData: func(t time.Time) AggregationData { + return &SumData{Start: t} }, } ) @@ -103,8 +105,8 @@ func Distribution(bounds ...float64) *Aggregation { Type: AggTypeDistribution, Buckets: bounds, } - agg.newData = func() AggregationData { - return newDistributionData(agg) + agg.newData = func(t time.Time) AggregationData { + return newDistributionData(agg, t) } return agg } @@ -114,7 +116,7 @@ func Distribution(bounds ...float64) *Aggregation { func LastValue() *Aggregation { return &Aggregation{ Type: AggTypeLastValue, - newData: func() AggregationData { + newData: func(_ time.Time) AggregationData { return &LastValueData{} }, } diff --git a/stats/view/aggregation_data.go b/stats/view/aggregation_data.go index f331d456e..d93b52066 100644 --- a/stats/view/aggregation_data.go +++ b/stats/view/aggregation_data.go @@ -31,6 +31,7 @@ type AggregationData interface { clone() AggregationData equal(other AggregationData) bool toPoint(t metricdata.Type, time time.Time) metricdata.Point + StartTime() time.Time } const epsilon = 1e-9 @@ -40,6 +41,7 @@ const epsilon = 1e-9 // // Most users won't directly access count data. type CountData struct { + Start time.Time Value int64 } @@ -50,7 +52,7 @@ func (a *CountData) addSample(_ float64, _ map[string]interface{}, _ time.Time) } func (a *CountData) clone() AggregationData { - return &CountData{Value: a.Value} + return &CountData{Value: a.Value, Start: a.Start} } func (a *CountData) equal(other AggregationData) bool { @@ -59,7 +61,7 @@ func (a *CountData) equal(other AggregationData) bool { return false } - return a.Value == a2.Value + return a.Start.Equal(a2.Start) && a.Value == a2.Value } func (a *CountData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point { @@ -71,11 +73,17 @@ func (a *CountData) toPoint(metricType metricdata.Type, t time.Time) metricdata. } } +// StartTime returns the start time of the data being aggregated by CountData. +func (a *CountData) StartTime() time.Time { + return a.Start +} + // SumData is the aggregated data for the Sum aggregation. // A sum aggregation processes data and sums up the recordings. // // Most users won't directly access sum data. type SumData struct { + Start time.Time Value float64 } @@ -86,7 +94,7 @@ func (a *SumData) addSample(v float64, _ map[string]interface{}, _ time.Time) { } func (a *SumData) clone() AggregationData { - return &SumData{Value: a.Value} + return &SumData{Value: a.Value, Start: a.Start} } func (a *SumData) equal(other AggregationData) bool { @@ -94,7 +102,7 @@ func (a *SumData) equal(other AggregationData) bool { if !ok { return false } - return math.Pow(a.Value-a2.Value, 2) < epsilon + return a.Start.Equal(a2.Start) && math.Pow(a.Value-a2.Value, 2) < epsilon } func (a *SumData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point { @@ -108,6 +116,11 @@ func (a *SumData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Po } } +// StartTime returns the start time of the data being aggregated by SumData. +func (a *SumData) StartTime() time.Time { + return a.Start +} + // DistributionData is the aggregated data for the // Distribution aggregation. // @@ -126,9 +139,10 @@ type DistributionData struct { // an exemplar for the associated bucket, or nil. ExemplarsPerBucket []*metricdata.Exemplar bounds []float64 // histogram distribution of the values + Start time.Time } -func newDistributionData(agg *Aggregation) *DistributionData { +func newDistributionData(agg *Aggregation, t time.Time) *DistributionData { bucketCount := len(agg.Buckets) + 1 return &DistributionData{ CountPerBucket: make([]int64, bucketCount), @@ -136,6 +150,7 @@ func newDistributionData(agg *Aggregation) *DistributionData { bounds: agg.Buckets, Min: math.MaxFloat64, Max: math.SmallestNonzeroFloat64, + Start: t, } } @@ -226,7 +241,11 @@ func (a *DistributionData) equal(other AggregationData) bool { return false } } - return a.Count == a2.Count && a.Min == a2.Min && a.Max == a2.Max && math.Pow(a.Mean-a2.Mean, 2) < epsilon && math.Pow(a.variance()-a2.variance(), 2) < epsilon + return a.Start.Equal(a2.Start) && + a.Count == a2.Count && + a.Min == a2.Min && + a.Max == a2.Max && + math.Pow(a.Mean-a2.Mean, 2) < epsilon && math.Pow(a.variance()-a2.variance(), 2) < epsilon } func (a *DistributionData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point { @@ -256,6 +275,11 @@ func (a *DistributionData) toPoint(metricType metricdata.Type, t time.Time) metr } } +// StartTime returns the start time of the data being aggregated by DistributionData. +func (a *DistributionData) StartTime() time.Time { + return a.Start +} + // LastValueData returns the last value recorded for LastValue aggregation. type LastValueData struct { Value float64 @@ -291,3 +315,22 @@ func (l *LastValueData) toPoint(metricType metricdata.Type, t time.Time) metricd panic("unsupported metricdata.Type") } } + +// StartTime returns an empty time value as start time is not recorded when using last value +// aggregation. +func (l *LastValueData) StartTime() time.Time { + return time.Time{} +} + +// ClearStart clears the Start field from data if present. Useful for testing in cases where the +// start time will be nondeterministic. +func ClearStart(data AggregationData) { + switch data := data.(type) { + case *CountData: + data.Start = time.Time{} + case *SumData: + data.Start = time.Time{} + case *DistributionData: + data.Start = time.Time{} + } +} diff --git a/stats/view/aggregation_data_test.go b/stats/view/aggregation_data_test.go index 7d09a8fe4..d0f48cfcd 100644 --- a/stats/view/aggregation_data_test.go +++ b/stats/view/aggregation_data_test.go @@ -29,7 +29,7 @@ func TestDataClone(t *testing.T) { agg := &Aggregation{ Buckets: []float64{1, 2, 3, 4}, } - dist := newDistributionData(agg) + dist := newDistributionData(agg, time.Time{}) dist.Count = 7 dist.Max = 11 dist.Min = 1 @@ -72,7 +72,7 @@ func TestDistributionData_addSample(t *testing.T) { agg := &Aggregation{ Buckets: []float64{1, 2}, } - dd := newDistributionData(agg) + dd := newDistributionData(agg, time.Time{}) attachments1 := map[string]interface{}{"key1": "value1"} t1 := time.Now() dd.addSample(0.5, attachments1, t1) diff --git a/stats/view/collector.go b/stats/view/collector.go index 8a6a2c0fd..ac22c93a2 100644 --- a/stats/view/collector.go +++ b/stats/view/collector.go @@ -35,7 +35,7 @@ type collector struct { func (c *collector) addSample(s string, v float64, attachments map[string]interface{}, t time.Time) { aggregator, ok := c.signatures[s] if !ok { - aggregator = c.a.newData() + aggregator = c.a.newData(t) c.signatures[s] = aggregator } aggregator.addSample(v, attachments, t) diff --git a/stats/view/view_test.go b/stats/view/view_test.go index 2a2bde494..23b234d70 100644 --- a/stats/view/view_test.go +++ b/stats/view/view_test.go @@ -21,6 +21,7 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "go.opencensus.io/metric/metricdata" @@ -51,6 +52,7 @@ func Test_View_MeasureFloat64_AggregationDistribution(t *testing.T) { type record struct { f float64 tags []tagString + t time.Time } type testCase struct { @@ -59,18 +61,23 @@ func Test_View_MeasureFloat64_AggregationDistribution(t *testing.T) { wantRows []*Row } + now := time.Now() + ts := make([]time.Time, 7) + for i := range ts { + ts[i] = now.Add(time.Duration(i) * time.Second) + } tcs := []testCase{ { "1", []record{ - {1, []tagString{{k1, "v1"}}}, - {5, []tagString{{k1, "v1"}}}, + {1, []tagString{{k1, "v1"}}, ts[0]}, + {5, []tagString{{k1, "v1"}}, ts[1]}, }, []*Row{ { []tag.Tag{{Key: k1, Value: "v1"}}, &DistributionData{ - Count: 2, Min: 1, Max: 5, Mean: 3, SumOfSquaredDev: 8, CountPerBucket: []int64{1, 1}, bounds: []float64{2}, ExemplarsPerBucket: []*metricdata.Exemplar{nil, nil}, + Count: 2, Min: 1, Max: 5, Mean: 3, SumOfSquaredDev: 8, CountPerBucket: []int64{1, 1}, bounds: []float64{2}, ExemplarsPerBucket: []*metricdata.Exemplar{nil, nil}, Start: ts[0], }, }, }, @@ -78,20 +85,20 @@ func Test_View_MeasureFloat64_AggregationDistribution(t *testing.T) { { "2", []record{ - {1, []tagString{{k1, "v1"}}}, - {5, []tagString{{k2, "v2"}}}, + {1, []tagString{{k1, "v1"}}, ts[0]}, + {5, []tagString{{k2, "v2"}}, ts[1]}, }, []*Row{ { []tag.Tag{{Key: k1, Value: "v1"}}, &DistributionData{ - Count: 1, Min: 1, Max: 1, Mean: 1, CountPerBucket: []int64{1, 0}, bounds: []float64{2}, ExemplarsPerBucket: []*metricdata.Exemplar{nil, nil}, + Count: 1, Min: 1, Max: 1, Mean: 1, CountPerBucket: []int64{1, 0}, bounds: []float64{2}, ExemplarsPerBucket: []*metricdata.Exemplar{nil, nil}, Start: ts[0], }, }, { []tag.Tag{{Key: k2, Value: "v2"}}, &DistributionData{ - Count: 1, Min: 5, Max: 5, Mean: 5, CountPerBucket: []int64{0, 1}, bounds: []float64{2}, ExemplarsPerBucket: []*metricdata.Exemplar{nil, nil}, + Count: 1, Min: 5, Max: 5, Mean: 5, CountPerBucket: []int64{0, 1}, bounds: []float64{2}, ExemplarsPerBucket: []*metricdata.Exemplar{nil, nil}, Start: ts[1], }, }, }, @@ -99,35 +106,35 @@ func Test_View_MeasureFloat64_AggregationDistribution(t *testing.T) { { "3", []record{ - {1, []tagString{{k1, "v1"}}}, - {5, []tagString{{k1, "v1"}, {k3, "v3"}}}, - {1, []tagString{{k1, "v1 other"}}}, - {5, []tagString{{k2, "v2"}}}, - {5, []tagString{{k1, "v1"}, {k2, "v2"}}}, + {1, []tagString{{k1, "v1"}}, ts[0]}, + {5, []tagString{{k1, "v1"}, {k3, "v3"}}, ts[1]}, + {1, []tagString{{k1, "v1 other"}}, ts[2]}, + {5, []tagString{{k2, "v2"}}, ts[3]}, + {5, []tagString{{k1, "v1"}, {k2, "v2"}}, ts[4]}, }, []*Row{ { []tag.Tag{{Key: k1, Value: "v1"}}, &DistributionData{ - Count: 2, Min: 1, Max: 5, Mean: 3, SumOfSquaredDev: 8, CountPerBucket: []int64{1, 1}, bounds: []float64{2}, ExemplarsPerBucket: []*metricdata.Exemplar{nil, nil}, + Count: 2, Min: 1, Max: 5, Mean: 3, SumOfSquaredDev: 8, CountPerBucket: []int64{1, 1}, bounds: []float64{2}, ExemplarsPerBucket: []*metricdata.Exemplar{nil, nil}, Start: ts[0], }, }, { []tag.Tag{{Key: k1, Value: "v1 other"}}, &DistributionData{ - Count: 1, Min: 1, Max: 1, Mean: 1, CountPerBucket: []int64{1, 0}, bounds: []float64{2}, ExemplarsPerBucket: []*metricdata.Exemplar{nil, nil}, + Count: 1, Min: 1, Max: 1, Mean: 1, CountPerBucket: []int64{1, 0}, bounds: []float64{2}, ExemplarsPerBucket: []*metricdata.Exemplar{nil, nil}, Start: ts[2], }, }, { []tag.Tag{{Key: k2, Value: "v2"}}, &DistributionData{ - Count: 1, Min: 5, Max: 5, Mean: 5, CountPerBucket: []int64{0, 1}, bounds: []float64{2}, ExemplarsPerBucket: []*metricdata.Exemplar{nil, nil}, + Count: 1, Min: 5, Max: 5, Mean: 5, CountPerBucket: []int64{0, 1}, bounds: []float64{2}, ExemplarsPerBucket: []*metricdata.Exemplar{nil, nil}, Start: ts[3], }, }, { []tag.Tag{{Key: k1, Value: "v1"}, {Key: k2, Value: "v2"}}, &DistributionData{ - Count: 1, Min: 5, Max: 5, Mean: 5, CountPerBucket: []int64{0, 1}, bounds: []float64{2}, ExemplarsPerBucket: []*metricdata.Exemplar{nil, nil}, + Count: 1, Min: 5, Max: 5, Mean: 5, CountPerBucket: []int64{0, 1}, bounds: []float64{2}, ExemplarsPerBucket: []*metricdata.Exemplar{nil, nil}, Start: ts[4], }, }, }, @@ -135,31 +142,31 @@ func Test_View_MeasureFloat64_AggregationDistribution(t *testing.T) { { "4", []record{ - {1, []tagString{{k1, "v1 is a very long value key"}}}, - {5, []tagString{{k1, "v1 is a very long value key"}, {k3, "v3"}}}, - {1, []tagString{{k1, "v1 is another very long value key"}}}, - {1, []tagString{{k1, "v1 is a very long value key"}, {k2, "v2 is a very long value key"}}}, - {5, []tagString{{k1, "v1 is a very long value key"}, {k2, "v2 is a very long value key"}}}, - {3, []tagString{{k1, "v1 is a very long value key"}, {k2, "v2 is a very long value key"}}}, - {3, []tagString{{k1, "v1 is a very long value key"}, {k2, "v2 is a very long value key"}}}, + {1, []tagString{{k1, "v1 is a very long value key"}}, ts[0]}, + {5, []tagString{{k1, "v1 is a very long value key"}, {k3, "v3"}}, ts[1]}, + {1, []tagString{{k1, "v1 is another very long value key"}}, ts[2]}, + {1, []tagString{{k1, "v1 is a very long value key"}, {k2, "v2 is a very long value key"}}, ts[3]}, + {5, []tagString{{k1, "v1 is a very long value key"}, {k2, "v2 is a very long value key"}}, ts[4]}, + {3, []tagString{{k1, "v1 is a very long value key"}, {k2, "v2 is a very long value key"}}, ts[5]}, + {3, []tagString{{k1, "v1 is a very long value key"}, {k2, "v2 is a very long value key"}}, ts[6]}, }, []*Row{ { []tag.Tag{{Key: k1, Value: "v1 is a very long value key"}}, &DistributionData{ - Count: 2, Min: 1, Max: 5, Mean: 3, SumOfSquaredDev: 8, CountPerBucket: []int64{1, 1}, bounds: []float64{2}, ExemplarsPerBucket: []*metricdata.Exemplar{nil, nil}, + Count: 2, Min: 1, Max: 5, Mean: 3, SumOfSquaredDev: 8, CountPerBucket: []int64{1, 1}, bounds: []float64{2}, ExemplarsPerBucket: []*metricdata.Exemplar{nil, nil}, Start: ts[0], }, }, { []tag.Tag{{Key: k1, Value: "v1 is another very long value key"}}, &DistributionData{ - Count: 1, Min: 1, Max: 1, Mean: 1, CountPerBucket: []int64{1, 0}, bounds: []float64{2}, ExemplarsPerBucket: []*metricdata.Exemplar{nil, nil}, + Count: 1, Min: 1, Max: 1, Mean: 1, CountPerBucket: []int64{1, 0}, bounds: []float64{2}, ExemplarsPerBucket: []*metricdata.Exemplar{nil, nil}, Start: ts[2], }, }, { []tag.Tag{{Key: k1, Value: "v1 is a very long value key"}, {Key: k2, Value: "v2 is a very long value key"}}, &DistributionData{ - Count: 4, Min: 1, Max: 5, Mean: 3, SumOfSquaredDev: 2.66666666666667 * 3, CountPerBucket: []int64{1, 3}, bounds: []float64{2}, ExemplarsPerBucket: []*metricdata.Exemplar{nil, nil}, + Count: 4, Min: 1, Max: 5, Mean: 3, SumOfSquaredDev: 2.66666666666667 * 3, CountPerBucket: []int64{1, 3}, bounds: []float64{2}, ExemplarsPerBucket: []*metricdata.Exemplar{nil, nil}, Start: ts[3], }, }, }, @@ -178,22 +185,13 @@ func Test_View_MeasureFloat64_AggregationDistribution(t *testing.T) { if err != nil { t.Errorf("%v: New = %v", tc.label, err) } - view.addSample(tag.FromContext(ctx), r.f, nil, time.Now()) + view.addSample(tag.FromContext(ctx), r.f, nil, r.t) } gotRows := view.collectedRows() - for i, got := range gotRows { - if !containsRow(tc.wantRows, got) { - t.Errorf("%v-%d: got row %v; want none", tc.label, i, got) - break - } - } - - for i, want := range tc.wantRows { - if !containsRow(gotRows, want) { - t.Errorf("%v-%d: got none; want row %v", tc.label, i, want) - break - } + if diff := cmp.Diff(gotRows, tc.wantRows, cmpopts.SortSlices(cmpRow)); diff != "" { + t.Errorf("%v: unexpected row (got-, want+): %s", tc.label, diff) + break } } } @@ -215,8 +213,14 @@ func Test_View_MeasureFloat64_AggregationSum(t *testing.T) { type record struct { f float64 tags []tagString + t time.Time } + now := time.Now() + ts := make([]time.Time, 5) + for i := range ts { + ts[i] = now.Add(time.Duration(i) * time.Second) + } tcs := []struct { label string records []record @@ -225,58 +229,58 @@ func Test_View_MeasureFloat64_AggregationSum(t *testing.T) { { "1", []record{ - {1, []tagString{{k1, "v1"}}}, - {5, []tagString{{k1, "v1"}}}, + {1, []tagString{{k1, "v1"}}, ts[0]}, + {5, []tagString{{k1, "v1"}}, ts[1]}, }, []*Row{ { []tag.Tag{{Key: k1, Value: "v1"}}, - &SumData{Value: 6}, + &SumData{Value: 6, Start: ts[0]}, }, }, }, { "2", []record{ - {1, []tagString{{k1, "v1"}}}, - {5, []tagString{{k2, "v2"}}}, + {1, []tagString{{k1, "v1"}}, ts[0]}, + {5, []tagString{{k2, "v2"}}, ts[1]}, }, []*Row{ { []tag.Tag{{Key: k1, Value: "v1"}}, - &SumData{Value: 1}, + &SumData{Value: 1, Start: ts[0]}, }, { []tag.Tag{{Key: k2, Value: "v2"}}, - &SumData{Value: 5}, + &SumData{Value: 5, Start: ts[1]}, }, }, }, { "3", []record{ - {1, []tagString{{k1, "v1"}}}, - {5, []tagString{{k1, "v1"}, {k3, "v3"}}}, - {1, []tagString{{k1, "v1 other"}}}, - {5, []tagString{{k2, "v2"}}}, - {5, []tagString{{k1, "v1"}, {k2, "v2"}}}, + {1, []tagString{{k1, "v1"}}, ts[0]}, + {5, []tagString{{k1, "v1"}, {k3, "v3"}}, ts[1]}, + {1, []tagString{{k1, "v1 other"}}, ts[2]}, + {5, []tagString{{k2, "v2"}}, ts[3]}, + {5, []tagString{{k1, "v1"}, {k2, "v2"}}, ts[4]}, }, []*Row{ { []tag.Tag{{Key: k1, Value: "v1"}}, - &SumData{Value: 6}, + &SumData{Value: 6, Start: ts[0]}, }, { []tag.Tag{{Key: k1, Value: "v1 other"}}, - &SumData{Value: 1}, + &SumData{Value: 1, Start: ts[2]}, }, { []tag.Tag{{Key: k2, Value: "v2"}}, - &SumData{Value: 5}, + &SumData{Value: 5, Start: ts[3]}, }, { []tag.Tag{{Key: k1, Value: "v1"}, {Key: k2, Value: "v2"}}, - &SumData{Value: 5}, + &SumData{Value: 5, Start: ts[4]}, }, }, }, @@ -294,22 +298,13 @@ func Test_View_MeasureFloat64_AggregationSum(t *testing.T) { if err != nil { t.Errorf("%v: New = %v", tt.label, err) } - view.addSample(tag.FromContext(ctx), r.f, nil, time.Now()) + view.addSample(tag.FromContext(ctx), r.f, nil, r.t) } gotRows := view.collectedRows() - for i, got := range gotRows { - if !containsRow(tt.wantRows, got) { - t.Errorf("%v-%d: got row %v; want none", tt.label, i, got) - break - } - } - - for i, want := range tt.wantRows { - if !containsRow(gotRows, want) { - t.Errorf("%v-%d: got none; want row %v", tt.label, i, want) - break - } + if diff := cmp.Diff(gotRows, tt.wantRows, cmpopts.SortSlices(cmpRow)); diff != "" { + t.Errorf("%v: unexpected row (got-, want+): %s", tt.label, diff) + break } } } @@ -367,14 +362,8 @@ func TestViewSortedKeys(t *testing.T) { } } -// containsRow returns true if rows contain r. -func containsRow(rows []*Row, r *Row) bool { - for _, x := range rows { - if r.Equal(x) { - return true - } - } - return false +func cmpRow(r1 *Row, r2 *Row) bool { + return r1.Data.StartTime().Before(r2.Data.StartTime()) } func TestRegisterUnregisterParity(t *testing.T) { diff --git a/stats/view/view_to_metric.go b/stats/view/view_to_metric.go index 5e1656a1f..57d615ec7 100644 --- a/stats/view/view_to_metric.go +++ b/stats/view/view_to_metric.go @@ -119,20 +119,15 @@ func toLabelValues(row *Row, expectedKeys []metricdata.LabelKey) []metricdata.La return labelValues } -func rowToTimeseries(v *viewInternal, row *Row, now time.Time, startTime time.Time) *metricdata.TimeSeries { +func rowToTimeseries(v *viewInternal, row *Row, now time.Time) *metricdata.TimeSeries { return &metricdata.TimeSeries{ Points: []metricdata.Point{row.Data.toPoint(v.metricDescriptor.Type, now)}, LabelValues: toLabelValues(row, v.metricDescriptor.LabelKeys), - StartTime: startTime, + StartTime: row.Data.StartTime(), } } -func viewToMetric(v *viewInternal, r *resource.Resource, now time.Time, startTime time.Time) *metricdata.Metric { - if v.metricDescriptor.Type == metricdata.TypeGaugeInt64 || - v.metricDescriptor.Type == metricdata.TypeGaugeFloat64 { - startTime = time.Time{} - } - +func viewToMetric(v *viewInternal, r *resource.Resource, now time.Time) *metricdata.Metric { rows := v.collectedRows() if len(rows) == 0 { return nil @@ -140,7 +135,7 @@ func viewToMetric(v *viewInternal, r *resource.Resource, now time.Time, startTim ts := []*metricdata.TimeSeries{} for _, row := range rows { - ts = append(ts, rowToTimeseries(v, row, now, startTime)) + ts = append(ts, rowToTimeseries(v, row, now)) } m := &metricdata.Metric{ diff --git a/stats/view/view_to_metric_test.go b/stats/view/view_to_metric_test.go index b6df3a0f7..e7676a6e2 100644 --- a/stats/view/view_to_metric_test.go +++ b/stats/view/view_to_metric_test.go @@ -253,7 +253,6 @@ func initMetricDescriptors() { } func Test_ViewToMetric(t *testing.T) { - startTime := time.Now().Add(-60 * time.Second) now := time.Now() tests := []*testToMetrics{ { @@ -277,7 +276,7 @@ func Test_ViewToMetric(t *testing.T) { }, }, LabelValues: labelValues, - StartTime: startTime, + StartTime: now, }, }, }, @@ -305,7 +304,7 @@ func Test_ViewToMetric(t *testing.T) { }, }, LabelValues: labelValues, - StartTime: startTime, + StartTime: now, }, }, }, @@ -320,7 +319,7 @@ func Test_ViewToMetric(t *testing.T) { metricdata.NewInt64Point(now, 2), }, LabelValues: labelValues, - StartTime: startTime, + StartTime: now, }, }, }, @@ -335,7 +334,7 @@ func Test_ViewToMetric(t *testing.T) { metricdata.NewInt64Point(now, 2), }, LabelValues: labelValues, - StartTime: startTime, + StartTime: now, }, }, }, @@ -350,7 +349,7 @@ func Test_ViewToMetric(t *testing.T) { metricdata.NewInt64Point(now, 6), }, LabelValues: labelValues, - StartTime: startTime, + StartTime: now, }, }, }, @@ -365,7 +364,7 @@ func Test_ViewToMetric(t *testing.T) { metricdata.NewFloat64Point(now, 6.9), }, LabelValues: labelValues, - StartTime: startTime, + StartTime: now, }, }, }, @@ -417,12 +416,10 @@ func Test_ViewToMetric(t *testing.T) { }, } - wantMetrics := []*metricdata.Metric{} for _, tc := range tests { tc.vi, _ = defaultWorker.tryRegisterView(tc.view) tc.vi.clearRows() tc.vi.subscribe() - wantMetrics = append(wantMetrics, tc.wantMetric) } for i, tc := range tests { @@ -447,7 +444,7 @@ func Test_ViewToMetric(t *testing.T) { tc.vi.addSample(tag.FromContext(ctx), v, nil, now) } - gotMetric := viewToMetric(tc.vi, nil, now, startTime) + gotMetric := viewToMetric(tc.vi, nil, now) if !cmp.Equal(gotMetric, tc.wantMetric) { // JSON format is strictly for checking the content when test fails. Do not use JSON // format to determine if the two values are same as it doesn't differentiate between @@ -461,7 +458,6 @@ func Test_ViewToMetric(t *testing.T) { // Test to verify that a metric converted from a view with Aggregation Count should always // have Dimensionless unit. func TestUnitConversionForAggCount(t *testing.T) { - startTime := time.Now().Add(-60 * time.Second) now := time.Now() tests := []*struct { name string @@ -509,7 +505,7 @@ func TestUnitConversionForAggCount(t *testing.T) { for _, tc := range tests { tc.vi.addSample(tag.FromContext(context.Background()), 5.0, nil, now) - gotMetric := viewToMetric(tc.vi, nil, now, startTime) + gotMetric := viewToMetric(tc.vi, nil, now) gotUnit := gotMetric.Descriptor.Unit if !cmp.Equal(gotUnit, tc.wantUnit) { t.Errorf("Verify Unit: %s: Got:%v Want:%v", tc.name, gotUnit, tc.wantUnit) diff --git a/stats/view/worker.go b/stats/view/worker.go index ab8bfd46d..6e8d18b7f 100644 --- a/stats/view/worker.go +++ b/stats/view/worker.go @@ -41,9 +41,9 @@ type measureRef struct { } type worker struct { - measures map[string]*measureRef - views map[string]*viewInternal - startTimes map[*viewInternal]time.Time + measures map[string]*measureRef + views map[string]*viewInternal + viewStartTimes map[*viewInternal]time.Time timer *time.Ticker c chan command @@ -244,13 +244,13 @@ func (w *worker) SetReportingPeriod(d time.Duration) { // a single process. func NewMeter() Meter { return &worker{ - measures: make(map[string]*measureRef), - views: make(map[string]*viewInternal), - startTimes: make(map[*viewInternal]time.Time), - timer: time.NewTicker(defaultReportingDuration), - c: make(chan command, 1024), - quit: make(chan bool), - done: make(chan bool), + measures: make(map[string]*measureRef), + views: make(map[string]*viewInternal), + viewStartTimes: make(map[*viewInternal]time.Time), + timer: time.NewTicker(defaultReportingDuration), + c: make(chan command, 1024), + quit: make(chan bool), + done: make(chan bool), exporters: make(map[Exporter]struct{}), } @@ -324,7 +324,7 @@ func (w *worker) tryRegisterView(v *View) (*viewInternal, error) { return x, nil } w.views[vi.view.Name] = vi - w.startTimes[vi] = time.Now() + w.viewStartTimes[vi] = time.Now() ref := w.getMeasureRef(vi.view.Measure.Name()) ref.views[vi] = struct{}{} return vi, nil @@ -334,7 +334,7 @@ func (w *worker) unregisterView(v *viewInternal) { w.mu.Lock() defer w.mu.Unlock() delete(w.views, v.view.Name) - delete(w.startTimes, v) + delete(w.viewStartTimes, v) if measure := w.measures[v.view.Measure.Name()]; measure != nil { delete(measure.views, v) } @@ -347,7 +347,7 @@ func (w *worker) reportView(v *viewInternal) { rows := v.collectedRows() viewData := &Data{ View: v.view, - Start: w.startTimes[v], + Start: w.viewStartTimes[v], End: time.Now(), Rows: rows, } @@ -371,15 +371,7 @@ func (w *worker) toMetric(v *viewInternal, now time.Time) *metricdata.Metric { return nil } - var startTime time.Time - if v.metricDescriptor.Type == metricdata.TypeGaugeInt64 || - v.metricDescriptor.Type == metricdata.TypeGaugeFloat64 { - startTime = time.Time{} - } else { - startTime = w.startTimes[v] - } - - return viewToMetric(v, w.r, now, startTime) + return viewToMetric(v, w.r, now) } // Read reads all view data and returns them as metrics. diff --git a/stats/view/worker_test.go b/stats/view/worker_test.go index 193aad0e2..fe8e35fc4 100644 --- a/stats/view/worker_test.go +++ b/stats/view/worker_test.go @@ -23,6 +23,7 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" "go.opencensus.io/resource" "go.opencensus.io/metric/metricdata" @@ -213,12 +214,12 @@ func Test_Worker_MultiExport(t *testing.T) { case *CountData: got := ts.Points[0].Value.(int64) if wantValue.Value != got { - t.Errorf("Mismatched value (want %d, got %d) for %v in %q", wantValue, got, ts, key) + t.Errorf("Mismatched value (want %d, got %d) for %v in %q", wantValue.Value, got, ts, key) } case *SumData: got := ts.Points[0].Value.(float64) if wantValue.Value != got { - t.Errorf("Mismatched value (want %f, got %f) for %v in %q", wantValue, got, ts, key) + t.Errorf("Mismatched value (want %f, got %f) for %v in %q", wantValue.Value, got, ts, key) } default: t.Errorf("Unexpected type of data: %T for %v in %q", wantValue, want[i], key) @@ -335,20 +336,22 @@ func Test_Worker_RecordFloat64(t *testing.T) { for _, w := range tc.wants { gotRows, err := RetrieveData(w.v.Name) + for i := range gotRows { + switch data := gotRows[i].Data.(type) { + case *CountData: + data.Start = time.Time{} + case *SumData: + data.Start = time.Time{} + case *DistributionData: + data.Start = time.Time{} + } + } if (err != nil) != (w.err != nil) { t.Fatalf("%s: RetrieveData(%v) = %v; want error = %v", tc.label, w.v.Name, err, w.err) } - for _, got := range gotRows { - if !containsRow(w.rows, got) { - t.Errorf("%s: got row %#v; want none", tc.label, got) - break - } - } - for _, want := range w.rows { - if !containsRow(gotRows, want) { - t.Errorf("%s: got none; want %#v'", tc.label, want) - break - } + if diff := cmp.Diff(gotRows, w.rows); diff != "" { + t.Errorf("%v: unexpected row (got-, want+): %s", tc.label, diff) + break } }