Skip to content

Commit

Permalink
WIP: Wrap everything
Browse files Browse the repository at this point in the history
  • Loading branch information
mwear committed Oct 24, 2023
1 parent c0a39e7 commit 1946829
Show file tree
Hide file tree
Showing 10 changed files with 342 additions and 22 deletions.
25 changes: 25 additions & 0 deletions .chloggen/core-exporters-status.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: This PR adds a WithStatusReporting option to exporterhelper to opt-in to automatic status reporting and updates the core exporters to use it.
å
# One or more tracking issues or pull requests related to the change
issues: [7682]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
25 changes: 25 additions & 0 deletions component/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package component // import "go.opentelemetry.io/collector/component"

import (
"time"

"go.opentelemetry.io/collector/component/componenterror"
)

type Status int32
Expand Down Expand Up @@ -98,6 +100,29 @@ func NewFatalErrorEvent(err error) *StatusEvent {
return ev
}

// NewEventFromError returns a StatusEvent with the following rules:
// - If the error is nil, it returns an event with StatusOK.
// - If the error is a componenterror, the event status will be the
// status matching the corresponding error type.
// - If the error is not a component error, the event will be created using
// the fallback status.
func NewEventFromError(err error, fallback Status) *StatusEvent {
switch {
case err == nil:
return NewStatusEvent(StatusOK)
case componenterror.IsRecoverable(err):
return NewRecoverableErrorEvent(err)
case componenterror.IsPermanent(err):
return NewPermanentErrorEvent(err)
case componenterror.IsFatal(err):
return NewFatalErrorEvent(err)
default:
ev := NewStatusEvent(fallback)
ev.err = err
return ev
}
}

// StatusFunc is the expected type of ReportComponentStatus for component.TelemetrySettings
type StatusFunc func(*StatusEvent) error

Expand Down
1 change: 1 addition & 0 deletions otelcol/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ func TestComponentStatusWatcher(t *testing.T) {
// An unhealthy processor asynchronously reports a recoverable error.
expectedStatuses := []component.Status{
component.StatusStarting,
component.StatusOK,
component.StatusRecoverableError,
}

Expand Down
6 changes: 4 additions & 2 deletions processor/processortest/unhealthy_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@ type unhealthyProcessor struct {
}

func (p unhealthyProcessor) Start(_ context.Context, _ component.Host) error {
go func() {
_ = p.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusRecoverableError))
defer func() {
go func() {
_ = p.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusRecoverableError))
}()
}()
return nil
}
13 changes: 2 additions & 11 deletions service/internal/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"gonum.org/v1/gonum/graph/topo"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
Expand Down Expand Up @@ -385,11 +386,7 @@ func (g *Graph) StartAll(ctx context.Context, host component.Host) error {
continue
}

instanceID := g.instanceIDs[node.ID()]
_ = g.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStarting))

if compErr := comp.Start(ctx, host); compErr != nil {
_ = g.telemetry.ReportComponentStatus(instanceID, component.NewPermanentErrorEvent(compErr))
if compErr := comp.Start(ctx, host); compErr != nil && !componenterror.IsRecoverable(err) {
return compErr
}
}
Expand All @@ -416,16 +413,10 @@ func (g *Graph) ShutdownAll(ctx context.Context) error {
continue
}

instanceID := g.instanceIDs[node.ID()]
_ = g.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStopping))

if compErr := comp.Shutdown(ctx); compErr != nil {
errs = multierr.Append(errs, compErr)
_ = g.telemetry.ReportComponentStatus(instanceID, component.NewPermanentErrorEvent(compErr))
continue
}

_ = g.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStopped))
}
return errs
}
Expand Down
55 changes: 46 additions & 9 deletions service/internal/graph/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/service/internal/capabilityconsumer"
"go.opentelemetry.io/collector/service/internal/components"
statuswrappers "go.opentelemetry.io/collector/service/internal/status/wrappers"
)

const (
Expand Down Expand Up @@ -82,19 +83,28 @@ func (n *receiverNode) buildComponent(ctx context.Context,
for _, next := range nexts {
consumers = append(consumers, next.(consumer.Traces))
}
n.Component, err = builder.CreateTraces(ctx, set, fanoutconsumer.NewTraces(consumers))
comp, err := builder.CreateTraces(ctx, set, fanoutconsumer.NewTraces(consumers))
if err == nil {
n.Component = statuswrappers.WrapComponent(comp, &set.TelemetrySettings)
}
case component.DataTypeMetrics:
var consumers []consumer.Metrics
for _, next := range nexts {
consumers = append(consumers, next.(consumer.Metrics))
}
n.Component, err = builder.CreateMetrics(ctx, set, fanoutconsumer.NewMetrics(consumers))
comp, err := builder.CreateMetrics(ctx, set, fanoutconsumer.NewMetrics(consumers))
if err == nil {
n.Component = statuswrappers.WrapComponent(comp, &set.TelemetrySettings)
}
case component.DataTypeLogs:
var consumers []consumer.Logs
for _, next := range nexts {
consumers = append(consumers, next.(consumer.Logs))
}
n.Component, err = builder.CreateLogs(ctx, set, fanoutconsumer.NewLogs(consumers))
comp, err := builder.CreateLogs(ctx, set, fanoutconsumer.NewLogs(consumers))
if err == nil {
n.Component = statuswrappers.WrapComponent(comp, &set.TelemetrySettings)
}
default:
return fmt.Errorf("error creating receiver %q for data type %q is not supported", set.ID, n.pipelineType)
}
Expand Down Expand Up @@ -138,11 +148,20 @@ func (n *processorNode) buildComponent(ctx context.Context,
var err error
switch n.pipelineID.Type() {
case component.DataTypeTraces:
n.Component, err = builder.CreateTraces(ctx, set, next.(consumer.Traces))
comp, err := builder.CreateTraces(ctx, set, next.(consumer.Traces))
if err == nil {
n.Component = statuswrappers.WrapTracesProcessor(comp, &set.TelemetrySettings)
}
case component.DataTypeMetrics:
n.Component, err = builder.CreateMetrics(ctx, set, next.(consumer.Metrics))
comp, err := builder.CreateMetrics(ctx, set, next.(consumer.Metrics))
if err == nil {
n.Component = statuswrappers.WrapMetricsProcessor(comp, &set.TelemetrySettings)
}
case component.DataTypeLogs:
n.Component, err = builder.CreateLogs(ctx, set, next.(consumer.Logs))
comp, err := builder.CreateLogs(ctx, set, next.(consumer.Logs))
if err == nil {
n.Component = statuswrappers.WrapLogsProcessor(comp, &set.TelemetrySettings)
}
default:
return fmt.Errorf("error creating processor %q in pipeline %q, data type %q is not supported", set.ID, n.pipelineID, n.pipelineID.Type())
}
Expand Down Expand Up @@ -186,11 +205,20 @@ func (n *exporterNode) buildComponent(
var err error
switch n.pipelineType {
case component.DataTypeTraces:
n.Component, err = builder.CreateTraces(ctx, set)
comp, err := builder.CreateTraces(ctx, set)
if err == nil {
n.Component = statuswrappers.WrapTracesExporter(comp, &set.TelemetrySettings)
}
case component.DataTypeMetrics:
n.Component, err = builder.CreateMetrics(ctx, set)
comp, err := builder.CreateMetrics(ctx, set)
if err == nil {
n.Component = statuswrappers.WrapMetricsExporter(comp, &set.TelemetrySettings)
}
case component.DataTypeLogs:
n.Component, err = builder.CreateLogs(ctx, set)
comp, err := builder.CreateLogs(ctx, set)
if err == nil {
n.Component = statuswrappers.WrapLogsExporter(comp, &set.TelemetrySettings)
}
default:
return fmt.Errorf("error creating exporter %q for data type %q is not supported", set.ID, n.pipelineType)
}
Expand Down Expand Up @@ -252,6 +280,7 @@ func (n *connectorNode) buildComponent(
if err != nil {
return err
}
conn = statuswrappers.WrapTracesConnector(conn, &set.TelemetrySettings)
n.Component = conn
// When connecting pipelines of the same data type, the connector must
// inherit the capabilities of pipelines in which it is acting as a receiver.
Expand All @@ -261,12 +290,14 @@ func (n *connectorNode) buildComponent(
if err != nil {
return err
}
conn = statuswrappers.WrapMetricsConnector(conn, &set.TelemetrySettings)
n.Component, n.baseConsumer = conn, conn
case component.DataTypeLogs:
conn, err := builder.CreateLogsToTraces(ctx, set, next)
if err != nil {
return err
}
conn = statuswrappers.WrapLogsConnector(conn, &set.TelemetrySettings)
n.Component, n.baseConsumer = conn, conn
}

Expand All @@ -285,12 +316,14 @@ func (n *connectorNode) buildComponent(
if err != nil {
return err
}
conn = statuswrappers.WrapTracesConnector(conn, &set.TelemetrySettings)
n.Component, n.baseConsumer = conn, conn
case component.DataTypeMetrics:
conn, err := builder.CreateMetricsToMetrics(ctx, set, next)
if err != nil {
return err
}
conn = statuswrappers.WrapMetricsConnector(conn, &set.TelemetrySettings)
n.Component = conn
// When connecting pipelines of the same data type, the connector must
// inherit the capabilities of pipelines in which it is acting as a receiver.
Expand All @@ -300,6 +333,7 @@ func (n *connectorNode) buildComponent(
if err != nil {
return err
}
conn = statuswrappers.WrapLogsConnector(conn, &set.TelemetrySettings)
n.Component, n.baseConsumer = conn, conn
}
case component.DataTypeLogs:
Expand All @@ -317,18 +351,21 @@ func (n *connectorNode) buildComponent(
if err != nil {
return err
}
conn = statuswrappers.WrapTracesConnector(conn, &set.TelemetrySettings)
n.Component, n.baseConsumer = conn, conn
case component.DataTypeMetrics:
conn, err := builder.CreateMetricsToLogs(ctx, set, next)
if err != nil {
return err
}
conn = statuswrappers.WrapMetricsConnector(conn, &set.TelemetrySettings)
n.Component, n.baseConsumer = conn, conn
case component.DataTypeLogs:
conn, err := builder.CreateLogsToLogs(ctx, set, next)
if err != nil {
return err
}
conn = statuswrappers.WrapLogsConnector(conn, &set.TelemetrySettings)
n.Component = conn
// When connecting pipelines of the same data type, the connector must
// inherit the capabilities of pipelines in which it is acting as a receiver.
Expand Down
47 changes: 47 additions & 0 deletions service/internal/status/wrappers/component.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package wrappers // import "go.opentelemetry.io/collector/service/internal/status/wrappers"

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
)

type componentWrapper struct {
component component.Component
telemetry *component.TelemetrySettings
}

func (cw *componentWrapper) Start(ctx context.Context, host component.Host) error {
_ = cw.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStarting))
err := cw.component.Start(ctx, host)
_ = cw.telemetry.ReportComponentStatus(
component.NewEventFromError(err, component.StatusPermanentError),
)
if err != nil && !componenterror.IsRecoverable(err) {
return err
}
return nil
}

func (cw *componentWrapper) Shutdown(ctx context.Context) error {
cw.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStopping))
if err := cw.component.Shutdown(ctx); err != nil {
_ = cw.telemetry.ReportComponentStatus(
component.NewEventFromError(err, component.StatusPermanentError),
)
return err
}
_ = cw.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStopped))
return nil
}

func WrapComponent(c component.Component, telemetry *component.TelemetrySettings) component.Component {
return &componentWrapper{
component: c,
telemetry: telemetry,
}
}
64 changes: 64 additions & 0 deletions service/internal/status/wrappers/logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package wrappers // import "go.opentelemetry.io/collector/service/internal/status/wrappers"

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/processor"
)

type logsConsumer struct {
consumer.Logs
telemetry *component.TelemetrySettings
}

func (lc *logsConsumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
err := lc.Logs.ConsumeLogs(ctx, ld)
_ = lc.telemetry.ReportComponentStatus(
component.NewEventFromError(err, component.StatusRecoverableError))
return err
}

func WrapLogsConsumer(lc consumer.Logs, telemetry *component.TelemetrySettings) consumer.Logs {
return &logsConsumer{
Logs: lc,
telemetry: telemetry,
}
}

func WrapLogsExporter(le exporter.Logs, telemetry *component.TelemetrySettings) exporter.Logs {
return &struct {
component.Component
consumer.Logs
}{
Component: WrapComponent(le, telemetry),
Logs: WrapLogsConsumer(le, telemetry),
}
}

func WrapLogsProcessor(lp processor.Logs, telemetry *component.TelemetrySettings) processor.Logs {
return &struct {
component.Component
consumer.Logs
}{
Component: WrapComponent(lp, telemetry),
Logs: WrapLogsConsumer(lp, telemetry),
}
}

func WrapLogsConnector(lc connector.Logs, telemetry *component.TelemetrySettings) connector.Logs {
return &struct {
component.Component
consumer.Logs
}{
Component: WrapComponent(lc, telemetry),
Logs: WrapLogsConsumer(lc, telemetry),
}
}

0 comments on commit 1946829

Please sign in to comment.