Skip to content

Commit

Permalink
[processor/transform] Wire up metrics processing (#10100)
Browse files Browse the repository at this point in the history
* Started wiring up metrics

* Started messing with tests

* Add NumberDataPoint function tests

* Finished metric function tests

* Added processor tests

* Added another processor test

* change DataType Getter to string

* Update readme

* Update config and factory tests

* Add more test data

* Updated readme

* clean up

* Update readme

* ran make gotidy

* fix test data

* cleanup processors

* Remove ability to set data type

Co-authored-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
TylerHelmuth and bogdandrutu committed May 18, 2022
1 parent 6efea3e commit 76d58ec
Show file tree
Hide file tree
Showing 17 changed files with 2,204 additions and 87 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -17,6 +17,7 @@

### 💡 Enhancements 💡

- `transformprocessor`: Add transformation of metrics (#10100)
- `kubeletstatsreceiver`: Update receiver to use new Metrics Builder. All emitted metrics remain the same. (#9744)

### 🧰 Bug fixes 🧰
Expand Down
51 changes: 33 additions & 18 deletions processor/transformprocessor/README.md
@@ -1,19 +1,20 @@
# Transform Processor

| Status | |
| ------------------------ | ---------------- |
| Stability | [In development] |
| Supported pipeline types | traces, logs |
| Distributions | none |
| Status | |
| ------------------------ | --------------------- |
| Stability | [In development] |
| Supported pipeline types | traces, metrics, logs |
| Distributions | none |

The transform processor modifies telemetry based on configuration using the Telemetry Query Language.
The transform processor modifies telemetry based on configuration using the [Telemetry Query Language](https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/processing.md#telemetry-query-language).
It takes a list of queries which are performed in the order specified in the config.

Queries are composed of the following parts
- Path expressions: Fields within the incoming data can be referenced using expressions composed of the names as defined
in the OTLP protobuf definition. e.g., `status.code`, `attributes["http.method"]`. If the path expression begins with
`resource.` or `instrumentation_library.`, it will reference those values.
`resource.` or `instrumentation_library.`, it will reference those values. For metrics, `name`, `description`, `unit`, `type`, `is_monotonic`, and `aggregation_temporality` are accessed via `metric.`
- The name `instrumentation_library` within OpenTelemetry is currently under discussion and may be changed in the future.
- Metric data types are `None`, `Gauge`, `Sum`, `Histogram`, `ExponentialHistogram`, and `Summary`
- Literals: Strings, ints, and floats can be referenced as literal values
- Function invocations: Functions can be invoked with arguments matching the function's expected arguments
- Where clause: Telemetry to modify can be filtered by appending `where a <op> b`, with `a` and `b` being any of the above.
Expand Down Expand Up @@ -46,11 +47,6 @@ exporters:

processors:
transform:
logs:
queries:
- set(severity_text, "FAIL") where body == "request failed"
- keep_keys(resource.attributes, "service.name", "service.namespace", "cloud.region")
- set(body, attributes["http.route"])
traces:
queries:
- set(status.code, 1) where attributes["http.path"] == "/health"
Expand All @@ -60,6 +56,18 @@ processors:
- limit(resource.attributes, 100)
- truncate_all(attributes, 4096)
- truncate_all(resource.attributes, 4096)
metrics:
queries:
- set(metric.description, "Sum") where metric.type == "Sum"
- keep_keys(resource.attributes, "host.name")
- limit(attributes, 100)
- truncate_all(attributes, 4096)
- truncate_all(resource.attributes, 4096)
logs:
queries:
- set(severity_text, "FAIL") where body == "request failed"
- keep_keys(resource.attributes, "service.name", "service.namespace", "cloud.region")
- set(body, attributes["http.route"])
service:
pipelines:
logs:
Expand All @@ -74,11 +82,6 @@ service:

This processor will perform the operations in order for

All logs

1) Set severity text to FAIL if the body contains a string text "request failed"
2) Keep only `service.name`, `service.namespace`, `cloud.region` resource attributes
3) Set `body` to the `http.route` attribute if it is set

All spans

Expand All @@ -90,4 +93,16 @@ All spans
6) Truncate all span attributes such that no string value has more than 4096 characters.
7) Truncate all resource attributes such that no string value has more than 4096 characters.

[In development]: https://github.com/open-telemetry/opentelemetry-collector#in-development
All metrics and their data points

1) Set metric description to "Sum" if the metric type is "Sum"
2) Keep only the `host.name` resource attributes
4) Limit all data point attributes such that each data point has no more than 100 attributes.
6) Truncate all data point attributes such that no string value has more than 4096 characters.
7) Truncate all resource attributes such that no string value has more than 4096 characters.

All logs

1) Set severity text to FAIL if the body contains a string text "request failed"
2) Keep only `service.name`, `service.namespace`, `cloud.region` resource attributes
3) Set `body` to the `http.route` attribute if it is set
26 changes: 13 additions & 13 deletions processor/transformprocessor/config.go
Expand Up @@ -18,41 +18,41 @@ import (
"go.opentelemetry.io/collector/config"
"go.uber.org/multierr"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces"
)

type LogsConfig struct {
type SignalConfig struct {
Queries []string `mapstructure:"queries"`

// The functions that have been registered in the extension for logs processing.
functions map[string]interface{} `mapstructure:"-"`
}

type TracesConfig struct {
Queries []string `mapstructure:"queries"`

// The functions that have been registered in the extension for traces processing.
// The functions that have been registered in the extension for processing.
functions map[string]interface{} `mapstructure:"-"`
}

type Config struct {
config.ProcessorSettings `mapstructure:",squash"`

Logs LogsConfig `mapstructure:"logs"`
Traces TracesConfig `mapstructure:"traces"`
Logs SignalConfig `mapstructure:"logs"`
Traces SignalConfig `mapstructure:"traces"`
Metrics SignalConfig `mapstructure:"metrics"`
}

var _ config.Processor = (*Config)(nil)

func (c *Config) Validate() error {
var errors error
_, err := common.ParseQueries(c.Logs.Queries, c.Logs.functions, logs.ParsePath)
_, err := common.ParseQueries(c.Traces.Queries, c.Traces.functions, traces.ParsePath)
if err != nil {
errors = multierr.Append(errors, err)
}
_, err = common.ParseQueries(c.Metrics.Queries, c.Metrics.functions, metrics.ParsePath)
if err != nil {
errors = multierr.Append(errors, err)
}
_, err = common.ParseQueries(c.Traces.Queries, c.Traces.functions, traces.ParsePath)
_, err = common.ParseQueries(c.Logs.Queries, c.Logs.functions, logs.ParsePath)
if err != nil {
errors = multierr.Append(errors, err)
}
Expand Down
37 changes: 27 additions & 10 deletions processor/transformprocessor/config_test.go
Expand Up @@ -24,8 +24,9 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/service/servicetest"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs"
)

func TestLoadingConfig(t *testing.T) {
Expand All @@ -41,22 +42,30 @@ func TestLoadingConfig(t *testing.T) {
p0 := cfg.Processors[config.NewComponentID(typeStr)]
assert.Equal(t, p0, &Config{
ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)),
Logs: LogsConfig{
Traces: SignalConfig{
Queries: []string{
`set(body, "bear") where attributes["http.path"] == "/animal"`,
`set(name, "bear") where attributes["http.path"] == "/animal"`,
`keep_keys(attributes, "http.method", "http.path")`,
},

functions: logs.DefaultFunctions(),
functions: traces.DefaultFunctions(),
},
Traces: TracesConfig{
Metrics: SignalConfig{
Queries: []string{
`set(name, "bear") where attributes["http.path"] == "/animal"`,
`set(metric.name, "bear") where attributes["http.path"] == "/animal"`,
`keep_keys(attributes, "http.method", "http.path")`,
},

functions: traces.DefaultFunctions(),
},
Logs: SignalConfig{
Queries: []string{
`set(body, "bear") where attributes["http.path"] == "/animal"`,
`keep_keys(attributes, "http.method", "http.path")`,
},

functions: logs.DefaultFunctions(),
},
})
}

Expand All @@ -67,19 +76,27 @@ func TestLoadInvalidConfig(t *testing.T) {
factory := NewFactory()
factories.Processors[typeStr] = factory

cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid_config_bad_syntax_log.yaml"), factories)
cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid_config_bad_syntax_trace.yaml"), factories)
assert.Error(t, err)
assert.NotNil(t, cfg)

cfg, err = servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid_config_unknown_function_log.yaml"), factories)
cfg, err = servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid_config_unknown_function_trace.yaml"), factories)
assert.Error(t, err)
assert.NotNil(t, cfg)

cfg, err = servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid_config_bad_syntax_trace.yaml"), factories)
cfg, err = servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid_config_bad_syntax_metric.yaml"), factories)
assert.Error(t, err)
assert.NotNil(t, cfg)

cfg, err = servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid_config_unknown_function_trace.yaml"), factories)
cfg, err = servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid_config_unknown_function_metric.yaml"), factories)
assert.Error(t, err)
assert.NotNil(t, cfg)

cfg, err = servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid_config_bad_syntax_log.yaml"), factories)
assert.Error(t, err)
assert.NotNil(t, cfg)

cfg, err = servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid_config_unknown_function_log.yaml"), factories)
assert.Error(t, err)
assert.NotNil(t, cfg)
}
31 changes: 29 additions & 2 deletions processor/transformprocessor/factory.go
Expand Up @@ -23,6 +23,8 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor/processorhelper"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces"
)
Expand All @@ -39,22 +41,28 @@ func NewFactory() component.ProcessorFactory {
createDefaultConfig,
component.WithLogsProcessor(createLogsProcessor),
component.WithTracesProcessor(createTracesProcessor),
component.WithMetricsProcessor(createMetricsProcessor),
)
}

func createDefaultConfig() config.Processor {
return &Config{
ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)),
Logs: LogsConfig{
Logs: SignalConfig{
Queries: []string{},

functions: logs.DefaultFunctions(),
},
Traces: TracesConfig{
Traces: SignalConfig{
Queries: []string{},

functions: traces.DefaultFunctions(),
},
Metrics: SignalConfig{
Queries: []string{},

functions: metrics.DefaultFunctions(),
},
}
}

Expand Down Expand Up @@ -95,3 +103,22 @@ func createTracesProcessor(
proc.ProcessTraces,
processorhelper.WithCapabilities(processorCapabilities))
}

func createMetricsProcessor(
_ context.Context,
settings component.ProcessorCreateSettings,
cfg config.Processor,
nextConsumer consumer.Metrics,
) (component.MetricsProcessor, error) {
oCfg := cfg.(*Config)

proc, err := metrics.NewProcessor(oCfg.Metrics.Queries, oCfg.Metrics.functions, settings)
if err != nil {
return nil, fmt.Errorf("invalid config for \"transform\" processor %w", err)
}
return processorhelper.NewMetricsProcessor(
cfg,
nextConsumer,
proc.ProcessMetrics,
processorhelper.WithCapabilities(processorCapabilities))
}

0 comments on commit 76d58ec

Please sign in to comment.