This repository has been archived by the owner on Jul 31, 2023. It is now read-only.

Record a Start Time Per Time Series within a View #1220

Merged
merged 8 commits on Jul 14, 2020
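For orientation, here is a minimal sketch of what this change means for callers (the measure, view, and tag names are illustrative and not from this PR; the import paths assume the usual go.opencensus.io module): after this change, every time series within a view, one per tag signature, carries the time its first sample was recorded, exposed through the new AggregationData.StartTime method.

// Hypothetical usage sketch; names and values are illustrative only.
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"go.opencensus.io/stats"
	"go.opencensus.io/stats/view"
	"go.opencensus.io/tag"
)

func main() {
	routeKey := tag.MustNewKey("route")
	m := stats.Int64("example.com/requests", "request count", stats.UnitDimensionless)
	v := &view.View{
		Name:        "request_total",
		Measure:     m,
		Aggregation: view.Count(),
		TagKeys:     []tag.Key{routeKey},
	}
	if err := view.Register(v); err != nil {
		log.Fatal(err)
	}
	defer view.Unregister(v)

	// Two tag signatures -> two time series, each with its own start time.
	ctxA, _ := tag.New(context.Background(), tag.Insert(routeKey, "/a/"))
	stats.Record(ctxA, m.M(1))
	time.Sleep(10 * time.Millisecond)
	ctxB, _ := tag.New(context.Background(), tag.Insert(routeKey, "/b/"))
	stats.Record(ctxB, m.M(1))

	rows, err := view.RetrieveData("request_total")
	if err != nil {
		log.Fatal(err)
	}
	for _, r := range rows {
		// StartTime is new in this PR; LastValue aggregations return a zero time.
		fmt.Println(r.Tags, r.Data.StartTime())
	}
}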
plugin/ocgrpc/client_stats_handler_test.go (3 additions, 0 deletions)
@@ -318,6 +318,9 @@ func TestClientDefaultCollections(t *testing.T) {
t.Errorf("%q: RetrieveData(%q) = %v", tc.label, wantData.v().Name, err)
continue
}
for i := range gotRows {
view.ClearStart(gotRows[i].Data)
}

for _, gotRow := range gotRows {
if !containsRow(wantData.rows, gotRow) {
plugin/ocgrpc/server_stats_handler_test.go (4 additions, 0 deletions)
@@ -305,6 +305,10 @@ func TestServerDefaultCollections(t *testing.T) {
continue
}

for i := range gotRows {
view.ClearStart(gotRows[i].Data)
}

for _, gotRow := range gotRows {
if !containsRow(wantData.rows, gotRow) {
t.Errorf("%q: unwanted row for view %q: %v", tc.label, wantData.v().Name, gotRow)
plugin/ochttp/route_test.go (6 additions, 0 deletions)
@@ -53,6 +53,9 @@ func TestWithRouteTag(t *testing.T) {
view.Unregister(v) // trigger exporting

got := e.rowsForView("request_total")
for i := range got {
view.ClearStart(got[i].Data)
}
want := []*view.Row{
{Data: &view.CountData{Value: 1}, Tags: []tag.Tag{{Key: ochttp.KeyServerRoute, Value: "/a/"}}},
}
@@ -90,6 +93,9 @@ func TestSetRoute(t *testing.T) {
view.Unregister(v) // trigger exporting

got := e.rowsForView("request_total")
for i := range got {
view.ClearStart(got[i].Data)
}
want := []*view.Row{
{Data: &view.CountData{Value: 1}, Tags: []tag.Tag{{Key: ochttp.KeyServerRoute, Value: "/a/"}}},
}
stats/view/aggregation.go (10 additions, 8 deletions)
@@ -15,6 +15,8 @@

package view

import "time"

// AggType represents the type of aggregation function used on a View.
type AggType int

@@ -45,20 +47,20 @@ type Aggregation struct {
Type AggType // Type is the AggType of this Aggregation.
Buckets []float64 // Buckets are the bucket endpoints if this Aggregation represents a distribution, see Distribution.

newData func() AggregationData
newData func(time.Time) AggregationData
}

var (
aggCount = &Aggregation{
Type: AggTypeCount,
newData: func() AggregationData {
return &CountData{}
newData: func(t time.Time) AggregationData {
return &CountData{Start: t}
},
}
aggSum = &Aggregation{
Type: AggTypeSum,
newData: func() AggregationData {
return &SumData{}
newData: func(t time.Time) AggregationData {
return &SumData{Start: t}
},
}
)
@@ -103,8 +105,8 @@ func Distribution(bounds ...float64) *Aggregation {
Type: AggTypeDistribution,
Buckets: bounds,
}
agg.newData = func() AggregationData {
return newDistributionData(agg)
agg.newData = func(t time.Time) AggregationData {
return newDistributionData(agg, t)
}
return agg
}
@@ -114,7 +116,7 @@ func Distribution(bounds ...float64) *Aggregation {
func LastValue() *Aggregation {
return &Aggregation{
Type: AggTypeLastValue,
newData: func() AggregationData {
newData: func(_ time.Time) AggregationData {
return &LastValueData{}
},
}
stats/view/aggregation_data.go (49 additions, 6 deletions)
@@ -31,6 +31,7 @@ type AggregationData interface {
clone() AggregationData
equal(other AggregationData) bool
toPoint(t metricdata.Type, time time.Time) metricdata.Point
StartTime() time.Time
}

const epsilon = 1e-9
@@ -40,6 +41,7 @@ const epsilon = 1e-9
//
// Most users won't directly access count data.
type CountData struct {
Start time.Time
Value int64
}

@@ -50,7 +52,7 @@ func (a *CountData) addSample(_ float64, _ map[string]interface{}, _ time.Time)
}

func (a *CountData) clone() AggregationData {
return &CountData{Value: a.Value}
return &CountData{Value: a.Value, Start: a.Start}
}

func (a *CountData) equal(other AggregationData) bool {
@@ -59,7 +61,7 @@ func (a *CountData) equal(other AggregationData) bool {
return false
}

return a.Value == a2.Value
return a.Start.Equal(a2.Start) && a.Value == a2.Value
}

func (a *CountData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point {
@@ -71,11 +73,17 @@ func (a *CountData) toPoint(metricType metricdata.Type, t time.Time) metricdata.
}
}

// StartTime returns the start time of the data being aggregated by CountData.
func (a *CountData) StartTime() time.Time {
return a.Start
}

// SumData is the aggregated data for the Sum aggregation.
// A sum aggregation processes data and sums up the recordings.
//
// Most users won't directly access sum data.
type SumData struct {
Start time.Time
Value float64
}

@@ -86,15 +94,15 @@ func (a *SumData) addSample(v float64, _ map[string]interface{}, _ time.Time) {
}

func (a *SumData) clone() AggregationData {
return &SumData{Value: a.Value}
return &SumData{Value: a.Value, Start: a.Start}
}

func (a *SumData) equal(other AggregationData) bool {
a2, ok := other.(*SumData)
if !ok {
return false
}
return math.Pow(a.Value-a2.Value, 2) < epsilon
return a.Start.Equal(a2.Start) && math.Pow(a.Value-a2.Value, 2) < epsilon
}

func (a *SumData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point {
@@ -108,6 +116,11 @@ func (a *SumData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Po
}
}

// StartTime returns the start time of the data being aggregated by SumData.
func (a *SumData) StartTime() time.Time {
return a.Start
}

// DistributionData is the aggregated data for the
// Distribution aggregation.
//
@@ -126,16 +139,18 @@ type DistributionData struct {
// an exemplar for the associated bucket, or nil.
ExemplarsPerBucket []*metricdata.Exemplar
bounds []float64 // histogram distribution of the values
Start time.Time
}

func newDistributionData(agg *Aggregation) *DistributionData {
func newDistributionData(agg *Aggregation, t time.Time) *DistributionData {
bucketCount := len(agg.Buckets) + 1
return &DistributionData{
CountPerBucket: make([]int64, bucketCount),
ExemplarsPerBucket: make([]*metricdata.Exemplar, bucketCount),
bounds: agg.Buckets,
Min: math.MaxFloat64,
Max: math.SmallestNonzeroFloat64,
Start: t,
}
}

@@ -226,7 +241,11 @@ func (a *DistributionData) equal(other AggregationData) bool {
return false
}
}
return a.Count == a2.Count && a.Min == a2.Min && a.Max == a2.Max && math.Pow(a.Mean-a2.Mean, 2) < epsilon && math.Pow(a.variance()-a2.variance(), 2) < epsilon
return a.Start.Equal(a2.Start) &&
a.Count == a2.Count &&
a.Min == a2.Min &&
a.Max == a2.Max &&
math.Pow(a.Mean-a2.Mean, 2) < epsilon && math.Pow(a.variance()-a2.variance(), 2) < epsilon
}

func (a *DistributionData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point {
@@ -256,6 +275,11 @@ func (a *DistributionData) toPoint(metricType metricdata.Type, t time.Time) metr
}
}

// StartTime returns the start time of the data being aggregated by DistributionData.
func (a *DistributionData) StartTime() time.Time {
return a.Start
}

// LastValueData returns the last value recorded for LastValue aggregation.
type LastValueData struct {
Value float64
@@ -291,3 +315,22 @@ func (l *LastValueData) toPoint(metricType metricdata.Type, t time.Time) metricd
panic("unsupported metricdata.Type")
}
}

// StartTime returns an empty time value as start time is not recorded when using last value
// aggregation.
func (l *LastValueData) StartTime() time.Time {
return time.Time{}
}

// ClearStart clears the Start field from data if present. Useful for testing in cases where the
// start time will be nondeterministic.
func ClearStart(data AggregationData) {
switch data := data.(type) {
case *CountData:
data.Start = time.Time{}
case *SumData:
data.Start = time.Time{}
case *DistributionData:
data.Start = time.Time{}
}
}
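As a rough usage sketch of the ClearStart helper introduced above (the helper function and view name here are made up for illustration, mirroring how the test files in this PR use it), a test can zero out the nondeterministic start times before comparing retrieved rows against fixed expected values:

// Hypothetical test helper; not part of this PR.
package example

import (
	"reflect"
	"testing"

	"go.opencensus.io/stats/view"
)

func assertRows(t *testing.T, gotRows, want []*view.Row) {
	t.Helper()
	for i := range gotRows {
		// Start is set to the (nondeterministic) first-sample time,
		// so clear it before comparing against fixed expected rows.
		view.ClearStart(gotRows[i].Data)
	}
	if !reflect.DeepEqual(gotRows, want) {
		t.Errorf("rows mismatch: got %v, want %v", gotRows, want)
	}
}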
stats/view/aggregation_data_test.go (2 additions, 2 deletions)
@@ -29,7 +29,7 @@ func TestDataClone(t *testing.T) {
agg := &Aggregation{
Buckets: []float64{1, 2, 3, 4},
}
dist := newDistributionData(agg)
dist := newDistributionData(agg, time.Time{})
dist.Count = 7
dist.Max = 11
dist.Min = 1
@@ -72,7 +72,7 @@ func TestDistributionData_addSample(t *testing.T) {
agg := &Aggregation{
Buckets: []float64{1, 2},
}
dd := newDistributionData(agg)
dd := newDistributionData(agg, time.Time{})
attachments1 := map[string]interface{}{"key1": "value1"}
t1 := time.Now()
dd.addSample(0.5, attachments1, t1)
stats/view/collector.go (1 addition, 1 deletion)
@@ -35,7 +35,7 @@ type collector struct {
func (c *collector) addSample(s string, v float64, attachments map[string]interface{}, t time.Time) {
aggregator, ok := c.signatures[s]
if !ok {
aggregator = c.a.newData()
aggregator = c.a.newData(t)
c.signatures[s] = aggregator
}
aggregator.addSample(v, attachments, t)
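The collector change above is what ties the PR together: the first sample recorded for a given tag signature now seeds that series' aggregator via newData(t), so its timestamp becomes the series' Start while later samples leave it unchanged. A simplified, self-contained sketch of that bookkeeping (illustrative types only, not the repository's actual collector):

// Simplified sketch of per-series start-time bookkeeping, analogous to
// collector.addSample above. Types and names here are illustrative.
package example

import "time"

type seriesData struct {
	Start time.Time // fixed when the first sample for this signature arrives
	Count int64
}

func addSample(series map[string]*seriesData, signature string, t time.Time) {
	d, ok := series[signature]
	if !ok {
		d = &seriesData{Start: t} // new series: its start time is the first sample's time
		series[signature] = d
	}
	d.Count++ // later samples update the aggregate but leave Start unchanged
}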