Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated status reporting via typed errors #8709

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
35 changes: 35 additions & 0 deletions component/componenterror/fatal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package componenterror // import "go.opentelemetry.io/collector/component/componenterror"

import "errors"

// fatal indicates a component is in an error state that it cannot recover from and the collector
// should terminate as a result.
type fatal struct {
	err error
}

// NewFatal wraps an error to indicate that a component has encountered a fatal error,
// i.e. it is in an error state that it cannot recover from and the collector should terminate.
func NewFatal(err error) error {
	return fatal{err: err}
}

// Error returns the wrapped error's message prefixed with "Fatal error: ".
// A nil wrapped error is rendered as "<nil>" rather than causing a nil dereference.
func (f fatal) Error() string {
	if f.err == nil {
		return "Fatal error: <nil>"
	}
	return "Fatal error: " + f.err.Error()
}

// Unwrap returns the wrapped error for functions Is and As in standard package errors.
func (f fatal) Unwrap() error {
	return f.err
}

// IsFatal checks if an error was wrapped with the NewFatal function.
// It reports false for a nil error.
func IsFatal(err error) bool {
	// errors.As already returns false when err is nil, so no separate guard is needed.
	var target fatal
	return errors.As(err, &target)
}
44 changes: 44 additions & 0 deletions component/componenterror/fatal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package componenterror

import (
"errors"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestIsFatal(t *testing.T) {
var err error
assert.False(t, IsFatal(err))

err = errors.New("testError")
assert.False(t, IsFatal(err))

err = NewFatal(err)
assert.True(t, IsFatal(err))

err = fmt.Errorf("%w", err)
assert.True(t, IsFatal(err))
}

// TestFatal_Unwrap verifies that an error wrapped by NewFatal can still be
// extracted with errors.As and comes back equal to the original.
func TestFatal_Unwrap(t *testing.T) {
	var original error = testErrorType{"testError"}
	require.False(t, IsFatal(original))

	// Wrap the testErrorType value in a fatal error.
	wrapped := NewFatal(original)
	require.True(t, IsFatal(wrapped))

	// The extraction target starts as the zero value, so it differs from the original.
	var extracted testErrorType
	require.NotEqual(t, original, extracted)

	// errors.As follows Unwrap and copies the inner error into extracted.
	require.True(t, errors.As(wrapped, &extracted))
	require.Equal(t, original, extracted)
}
34 changes: 34 additions & 0 deletions component/componenterror/permanent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package componenterror // import "go.opentelemetry.io/collector/component/componenterror"

import "errors"

// permanent indicates a component is in an error state that it cannot recover from.
type permanent struct {
	err error
}

// NewPermanent wraps an error to indicate that a component has encountered a permanent error,
// i.e. it is in an error state that it cannot recover from.
func NewPermanent(err error) error {
	return permanent{err: err}
}

// Error returns the wrapped error's message prefixed with "Permanent error: ".
// A nil wrapped error is rendered as "<nil>" rather than causing a nil dereference.
func (p permanent) Error() string {
	if p.err == nil {
		return "Permanent error: <nil>"
	}
	return "Permanent error: " + p.err.Error()
}

// Unwrap returns the wrapped error for functions Is and As in standard package errors.
func (p permanent) Unwrap() error {
	return p.err
}

// IsPermanent checks if an error was wrapped with the NewPermanent function.
// It reports false for a nil error.
func IsPermanent(err error) bool {
	// errors.As already returns false when err is nil, so no separate guard is needed.
	var target permanent
	return errors.As(err, &target)
}
52 changes: 52 additions & 0 deletions component/componenterror/permanent_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package componenterror

import (
"errors"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// testErrorType is a minimal custom error type used by the tests to check that
// a wrapped error can still be extracted with errors.As.
type testErrorType struct {
	s string
}

// Error returns the message stored in the error, so the s field is actually
// used and test failures print a meaningful message instead of an empty string.
func (e testErrorType) Error() string {
	return e.s
}

func TestIsPermanent(t *testing.T) {
var err error
assert.False(t, IsPermanent(err))

err = errors.New("testError")
assert.False(t, IsPermanent(err))

err = NewPermanent(err)
assert.True(t, IsPermanent(err))

err = fmt.Errorf("%w", err)
assert.True(t, IsPermanent(err))
}

// TestPermanent_Unwrap verifies that an error wrapped by NewPermanent can still
// be extracted with errors.As and comes back equal to the original.
func TestPermanent_Unwrap(t *testing.T) {
	var original error = testErrorType{"testError"}
	require.False(t, IsPermanent(original))

	// Wrap the testErrorType value in a permanent error.
	wrapped := NewPermanent(original)
	require.True(t, IsPermanent(wrapped))

	// The extraction target starts as the zero value, so it differs from the original.
	var extracted testErrorType
	require.NotEqual(t, original, extracted)

	// errors.As follows Unwrap and copies the inner error into extracted.
	require.True(t, errors.As(wrapped, &extracted))
	require.Equal(t, original, extracted)
}
33 changes: 33 additions & 0 deletions component/componenterror/recoverable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package componenterror // import "go.opentelemetry.io/collector/component/componenterror"

import "errors"

// recoverable indicates a component is in a recoverable error state.
type recoverable struct {
	err error
}

// NewRecoverable wraps an error to indicate that a component has encountered a recoverable error.
func NewRecoverable(err error) error {
	return recoverable{err: err}
}

// Error returns the wrapped error's message prefixed with "Recoverable error: ".
// A nil wrapped error is rendered as "<nil>" rather than causing a nil dereference.
func (r recoverable) Error() string {
	if r.err == nil {
		return "Recoverable error: <nil>"
	}
	return "Recoverable error: " + r.err.Error()
}

// Unwrap returns the wrapped error for functions Is and As in standard package errors.
func (r recoverable) Unwrap() error {
	return r.err
}

// IsRecoverable checks if an error was wrapped with the NewRecoverable function.
// It reports false for a nil error.
func IsRecoverable(err error) bool {
	// errors.As already returns false when err is nil, so no separate guard is needed.
	var target recoverable
	return errors.As(err, &target)
}
44 changes: 44 additions & 0 deletions component/componenterror/recoverable_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package componenterror

import (
"errors"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestIsRecoverable(t *testing.T) {
var err error
assert.False(t, IsRecoverable(err))

err = errors.New("testError")
assert.False(t, IsRecoverable(err))

err = NewRecoverable(err)
assert.True(t, IsRecoverable(err))

err = fmt.Errorf("%w", err)
assert.True(t, IsRecoverable(err))
}

// TestRecoverable_Unwrap verifies that an error wrapped by NewRecoverable can
// still be extracted with errors.As and comes back equal to the original.
func TestRecoverable_Unwrap(t *testing.T) {
	var original error = testErrorType{"testError"}
	require.False(t, IsRecoverable(original))

	// Wrap the testErrorType value in a recoverable error.
	wrapped := NewRecoverable(original)
	require.True(t, IsRecoverable(wrapped))

	// The extraction target starts as the zero value, so it differs from the original.
	var extracted testErrorType
	require.NotEqual(t, original, extracted)

	// errors.As follows Unwrap and copies the inner error into extracted.
	require.True(t, errors.As(wrapped, &extracted))
	require.Equal(t, original, extracted)
}
25 changes: 25 additions & 0 deletions component/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package component // import "go.opentelemetry.io/collector/component"

import (
"time"

"go.opentelemetry.io/collector/component/componenterror"
)

type Status int32
Expand Down Expand Up @@ -98,6 +100,29 @@ func NewFatalErrorEvent(err error) *StatusEvent {
return ev
}

// NewEventFromError builds a StatusEvent from err:
//   - A nil error yields an event with StatusOK.
//   - An error recognized by componenterror yields the event for the matching
//     error type. The checks run in the order recoverable, permanent, fatal,
//     and the first match wins.
//   - Any other error yields an event with the fallback status and err attached.
func NewEventFromError(err error, fallback Status) *StatusEvent {
	if err == nil {
		return NewStatusEvent(StatusOK)
	}
	if componenterror.IsRecoverable(err) {
		return NewRecoverableErrorEvent(err)
	}
	if componenterror.IsPermanent(err) {
		return NewPermanentErrorEvent(err)
	}
	if componenterror.IsFatal(err) {
		return NewFatalErrorEvent(err)
	}

	// Not a typed component error: use the caller's fallback status but keep the error.
	event := NewStatusEvent(fallback)
	event.err = err
	return event
}

// StatusFunc is the expected type of ReportComponentStatus for component.TelemetrySettings
type StatusFunc func(*StatusEvent) error

Expand Down
1 change: 1 addition & 0 deletions otelcol/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ func TestComponentStatusWatcher(t *testing.T) {
// An unhealthy processor asynchronously reports a recoverable error.
expectedStatuses := []component.Status{
component.StatusStarting,
component.StatusOK,
component.StatusRecoverableError,
}

Expand Down
6 changes: 4 additions & 2 deletions processor/processortest/unhealthy_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@ type unhealthyProcessor struct {
}

func (p unhealthyProcessor) Start(_ context.Context, _ component.Host) error {
go func() {
_ = p.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusRecoverableError))
defer func() {
go func() {
_ = p.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusRecoverableError))
}()
}()
return nil
}
13 changes: 2 additions & 11 deletions service/internal/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"gonum.org/v1/gonum/graph/topo"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
Expand Down Expand Up @@ -385,11 +386,7 @@ func (g *Graph) StartAll(ctx context.Context, host component.Host) error {
continue
}

instanceID := g.instanceIDs[node.ID()]
_ = g.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStarting))

if compErr := comp.Start(ctx, host); compErr != nil {
_ = g.telemetry.ReportComponentStatus(instanceID, component.NewPermanentErrorEvent(compErr))
// Abort startup unless the component marked its own start failure as recoverable.
// The check must inspect compErr, the error returned by Start; the previous code
// passed a different variable (err), so the returned error was never classified.
if compErr := comp.Start(ctx, host); compErr != nil && !componenterror.IsRecoverable(compErr) {
	return compErr
}
}
Expand All @@ -416,16 +413,10 @@ func (g *Graph) ShutdownAll(ctx context.Context) error {
continue
}

instanceID := g.instanceIDs[node.ID()]
_ = g.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStopping))

if compErr := comp.Shutdown(ctx); compErr != nil {
errs = multierr.Append(errs, compErr)
_ = g.telemetry.ReportComponentStatus(instanceID, component.NewPermanentErrorEvent(compErr))
continue
}

_ = g.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStopped))
}
return errs
}
Expand Down