Skip to content
This repository has been archived by the owner on Jul 31, 2023. It is now read-only.

Commit

Permalink
Record a Start Time Per Time Series within a View (#1220)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-mi committed Jul 14, 2020
1 parent 8e242ed commit af30f77
Show file tree
Hide file tree
Showing 12 changed files with 182 additions and 149 deletions.
3 changes: 3 additions & 0 deletions plugin/ocgrpc/client_stats_handler_test.go
Expand Up @@ -318,6 +318,9 @@ func TestClientDefaultCollections(t *testing.T) {
t.Errorf("%q: RetrieveData(%q) = %v", tc.label, wantData.v().Name, err)
continue
}
for i := range gotRows {
view.ClearStart(gotRows[i].Data)
}

for _, gotRow := range gotRows {
if !containsRow(wantData.rows, gotRow) {
Expand Down
4 changes: 4 additions & 0 deletions plugin/ocgrpc/server_stats_handler_test.go
Expand Up @@ -305,6 +305,10 @@ func TestServerDefaultCollections(t *testing.T) {
continue
}

for i := range gotRows {
view.ClearStart(gotRows[i].Data)
}

for _, gotRow := range gotRows {
if !containsRow(wantData.rows, gotRow) {
t.Errorf("%q: unwanted row for view %q: %v", tc.label, wantData.v().Name, gotRow)
Expand Down
6 changes: 6 additions & 0 deletions plugin/ochttp/route_test.go
Expand Up @@ -53,6 +53,9 @@ func TestWithRouteTag(t *testing.T) {
view.Unregister(v) // trigger exporting

got := e.rowsForView("request_total")
for i := range got {
view.ClearStart(got[i].Data)
}
want := []*view.Row{
{Data: &view.CountData{Value: 1}, Tags: []tag.Tag{{Key: ochttp.KeyServerRoute, Value: "/a/"}}},
}
Expand Down Expand Up @@ -90,6 +93,9 @@ func TestSetRoute(t *testing.T) {
view.Unregister(v) // trigger exporting

got := e.rowsForView("request_total")
for i := range got {
view.ClearStart(got[i].Data)
}
want := []*view.Row{
{Data: &view.CountData{Value: 1}, Tags: []tag.Tag{{Key: ochttp.KeyServerRoute, Value: "/a/"}}},
}
Expand Down
18 changes: 10 additions & 8 deletions stats/view/aggregation.go
Expand Up @@ -15,6 +15,8 @@

package view

import "time"

// AggType represents the type of aggregation function used on a View.
type AggType int

Expand Down Expand Up @@ -45,20 +47,20 @@ type Aggregation struct {
Type AggType // Type is the AggType of this Aggregation.
Buckets []float64 // Buckets are the bucket endpoints if this Aggregation represents a distribution, see Distribution.

newData func() AggregationData
newData func(time.Time) AggregationData
}

var (
aggCount = &Aggregation{
Type: AggTypeCount,
newData: func() AggregationData {
return &CountData{}
newData: func(t time.Time) AggregationData {
return &CountData{Start: t}
},
}
aggSum = &Aggregation{
Type: AggTypeSum,
newData: func() AggregationData {
return &SumData{}
newData: func(t time.Time) AggregationData {
return &SumData{Start: t}
},
}
)
Expand Down Expand Up @@ -103,8 +105,8 @@ func Distribution(bounds ...float64) *Aggregation {
Type: AggTypeDistribution,
Buckets: bounds,
}
agg.newData = func() AggregationData {
return newDistributionData(agg)
agg.newData = func(t time.Time) AggregationData {
return newDistributionData(agg, t)
}
return agg
}
Expand All @@ -114,7 +116,7 @@ func Distribution(bounds ...float64) *Aggregation {
func LastValue() *Aggregation {
return &Aggregation{
Type: AggTypeLastValue,
newData: func() AggregationData {
newData: func(_ time.Time) AggregationData {
return &LastValueData{}
},
}
Expand Down
55 changes: 49 additions & 6 deletions stats/view/aggregation_data.go
Expand Up @@ -31,6 +31,7 @@ type AggregationData interface {
clone() AggregationData
equal(other AggregationData) bool
toPoint(t metricdata.Type, time time.Time) metricdata.Point
StartTime() time.Time
}

const epsilon = 1e-9
Expand All @@ -40,6 +41,7 @@ const epsilon = 1e-9
//
// Most users won't directly access count data.
type CountData struct {
Start time.Time
Value int64
}

Expand All @@ -50,7 +52,7 @@ func (a *CountData) addSample(_ float64, _ map[string]interface{}, _ time.Time)
}

func (a *CountData) clone() AggregationData {
return &CountData{Value: a.Value}
return &CountData{Value: a.Value, Start: a.Start}
}

func (a *CountData) equal(other AggregationData) bool {
Expand All @@ -59,7 +61,7 @@ func (a *CountData) equal(other AggregationData) bool {
return false
}

return a.Value == a2.Value
return a.Start.Equal(a2.Start) && a.Value == a2.Value
}

func (a *CountData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point {
Expand All @@ -71,11 +73,17 @@ func (a *CountData) toPoint(metricType metricdata.Type, t time.Time) metricdata.
}
}

// StartTime returns the start time of the data being aggregated by CountData.
func (a *CountData) StartTime() time.Time {
return a.Start
}

// SumData is the aggregated data for the Sum aggregation.
// A sum aggregation processes data and sums up the recordings.
//
// Most users won't directly access sum data.
type SumData struct {
Start time.Time
Value float64
}

Expand All @@ -86,15 +94,15 @@ func (a *SumData) addSample(v float64, _ map[string]interface{}, _ time.Time) {
}

func (a *SumData) clone() AggregationData {
return &SumData{Value: a.Value}
return &SumData{Value: a.Value, Start: a.Start}
}

func (a *SumData) equal(other AggregationData) bool {
a2, ok := other.(*SumData)
if !ok {
return false
}
return math.Pow(a.Value-a2.Value, 2) < epsilon
return a.Start.Equal(a2.Start) && math.Pow(a.Value-a2.Value, 2) < epsilon
}

func (a *SumData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point {
Expand All @@ -108,6 +116,11 @@ func (a *SumData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Po
}
}

// StartTime returns the start time of the data being aggregated by SumData.
func (a *SumData) StartTime() time.Time {
return a.Start
}

// DistributionData is the aggregated data for the
// Distribution aggregation.
//
Expand All @@ -126,16 +139,18 @@ type DistributionData struct {
// an exemplar for the associated bucket, or nil.
ExemplarsPerBucket []*metricdata.Exemplar
bounds []float64 // histogram distribution of the values
Start time.Time
}

func newDistributionData(agg *Aggregation) *DistributionData {
func newDistributionData(agg *Aggregation, t time.Time) *DistributionData {
bucketCount := len(agg.Buckets) + 1
return &DistributionData{
CountPerBucket: make([]int64, bucketCount),
ExemplarsPerBucket: make([]*metricdata.Exemplar, bucketCount),
bounds: agg.Buckets,
Min: math.MaxFloat64,
Max: math.SmallestNonzeroFloat64,
Start: t,
}
}

Expand Down Expand Up @@ -226,7 +241,11 @@ func (a *DistributionData) equal(other AggregationData) bool {
return false
}
}
return a.Count == a2.Count && a.Min == a2.Min && a.Max == a2.Max && math.Pow(a.Mean-a2.Mean, 2) < epsilon && math.Pow(a.variance()-a2.variance(), 2) < epsilon
return a.Start.Equal(a2.Start) &&
a.Count == a2.Count &&
a.Min == a2.Min &&
a.Max == a2.Max &&
math.Pow(a.Mean-a2.Mean, 2) < epsilon && math.Pow(a.variance()-a2.variance(), 2) < epsilon
}

func (a *DistributionData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point {
Expand Down Expand Up @@ -256,6 +275,11 @@ func (a *DistributionData) toPoint(metricType metricdata.Type, t time.Time) metr
}
}

// StartTime returns the start time of the data being aggregated by DistributionData.
func (a *DistributionData) StartTime() time.Time {
return a.Start
}

// LastValueData returns the last value recorded for LastValue aggregation.
type LastValueData struct {
Value float64
Expand Down Expand Up @@ -291,3 +315,22 @@ func (l *LastValueData) toPoint(metricType metricdata.Type, t time.Time) metricd
panic("unsupported metricdata.Type")
}
}

// StartTime returns an empty time value as start time is not recorded when using last value
// aggregation.
func (l *LastValueData) StartTime() time.Time {
return time.Time{}
}

// ClearStart clears the Start field from data if present. Useful for testing in cases where the
// start time will be nondeterministic.
func ClearStart(data AggregationData) {
switch data := data.(type) {
case *CountData:
data.Start = time.Time{}
case *SumData:
data.Start = time.Time{}
case *DistributionData:
data.Start = time.Time{}
}
}
4 changes: 2 additions & 2 deletions stats/view/aggregation_data_test.go
Expand Up @@ -29,7 +29,7 @@ func TestDataClone(t *testing.T) {
agg := &Aggregation{
Buckets: []float64{1, 2, 3, 4},
}
dist := newDistributionData(agg)
dist := newDistributionData(agg, time.Time{})
dist.Count = 7
dist.Max = 11
dist.Min = 1
Expand Down Expand Up @@ -72,7 +72,7 @@ func TestDistributionData_addSample(t *testing.T) {
agg := &Aggregation{
Buckets: []float64{1, 2},
}
dd := newDistributionData(agg)
dd := newDistributionData(agg, time.Time{})
attachments1 := map[string]interface{}{"key1": "value1"}
t1 := time.Now()
dd.addSample(0.5, attachments1, t1)
Expand Down
2 changes: 1 addition & 1 deletion stats/view/collector.go
Expand Up @@ -35,7 +35,7 @@ type collector struct {
func (c *collector) addSample(s string, v float64, attachments map[string]interface{}, t time.Time) {
aggregator, ok := c.signatures[s]
if !ok {
aggregator = c.a.newData()
aggregator = c.a.newData(t)
c.signatures[s] = aggregator
}
aggregator.addSample(v, attachments, t)
Expand Down

0 comments on commit af30f77

Please sign in to comment.