Skip to content

Commit

Permalink
Passing message to record malformed event
Browse files Browse the repository at this point in the history
Signed-off-by: Muhammad Raza <muhammad.raza@matillion.com>
  • Loading branch information
raza-matillion committed Oct 23, 2023
1 parent 9c0547b commit 5e1a96b
Show file tree
Hide file tree
Showing 7 changed files with 11 additions and 8 deletions.
2 changes: 1 addition & 1 deletion observability/opencensus/v2/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ func (n fakeObservabilityServiceWithError) InboundContextDecorators() []func(con
return nil
}

func (n fakeObservabilityServiceWithError) RecordReceivedMalformedEvent(ctx context.Context, err error) {
func (n fakeObservabilityServiceWithError) RecordReceivedMalformedEvent(ctx context.Context, m binding.Message, err error) {
}

func (n fakeObservabilityServiceWithError) RecordCallingInvoker(ctx context.Context, event *event.Event) (context.Context, func(errOrResult error)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (o opencensusObservabilityService) InboundContextDecorators() []func(contex
return []func(context.Context, binding.Message) context.Context{tracePropagatorContextDecorator}
}

func (o opencensusObservabilityService) RecordReceivedMalformedEvent(ctx context.Context, err error) {
func (o opencensusObservabilityService) RecordReceivedMalformedEvent(ctx context.Context, m binding.Message, err error) {
ctx, r := NewReporter(ctx, reportReceive)
r.Error()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (o OTelObservabilityService) InboundContextDecorators() []func(context.Cont
}

// RecordReceivedMalformedEvent records the error from a malformed event in the span.
func (o OTelObservabilityService) RecordReceivedMalformedEvent(ctx context.Context, err error) {
func (o OTelObservabilityService) RecordReceivedMalformedEvent(ctx context.Context, m binding.Message, err error) {
spanName := observability.ClientSpanName + ".malformed receive"
_, span := o.tracer.Start(
ctx, spanName,
Expand Down
1 change: 1 addition & 0 deletions test/observability/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/felixge/httpsnoop v1.0.2 // indirect
github.com/google/go-cmp v0.5.6 // indirect
github.com/google/uuid v1.1.1 // indirect
github.com/json-iterator/go v1.1.10 // indirect
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cloudevents/sdk-go/v2/extensions"
"github.com/cloudevents/sdk-go/v2/protocol"
"github.com/cloudevents/sdk-go/v2/protocol/http"
"github.com/cloudevents/sdk-go/v2/test"
)

var (
Expand Down Expand Up @@ -385,7 +386,7 @@ func TestRecordReceivedMalformedEvent(t *testing.T) {
os := otelObs.NewOTelObservabilityService()

// act
os.RecordReceivedMalformedEvent(ctx, tc.expectedResult)
os.RecordReceivedMalformedEvent(ctx, test.FullMessage(), tc.expectedResult)

spans := sr.Ended()

Expand Down
4 changes: 2 additions & 2 deletions v2/client/invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ func (r *receiveInvoker) Invoke(ctx context.Context, m binding.Message, respFn p
e, eventErr := binding.ToEvent(ctx, m)
switch {
case eventErr != nil && r.fn.hasEventIn:
r.observabilityService.RecordReceivedMalformedEvent(ctx, eventErr)
r.observabilityService.RecordReceivedMalformedEvent(ctx, m, eventErr)
return respFn(ctx, nil, protocol.NewReceipt(r.ackMalformedEvent, "failed to convert Message to Event: %w", eventErr))
case r.fn != nil:
// Check if event is valid before invoking the receiver function
if e != nil {
if validationErr := e.Validate(); validationErr != nil {
r.observabilityService.RecordReceivedMalformedEvent(ctx, validationErr)
r.observabilityService.RecordReceivedMalformedEvent(ctx, m, validationErr)
return respFn(ctx, nil, protocol.NewReceipt(r.ackMalformedEvent, "validation error in incoming event: %w", validationErr))
}
}
Expand Down
5 changes: 3 additions & 2 deletions v2/client/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type ObservabilityService interface {
InboundContextDecorators() []func(context.Context, binding.Message) context.Context

// RecordReceivedMalformedEvent is invoked when an event was received but it's malformed or invalid.
RecordReceivedMalformedEvent(ctx context.Context, err error)
RecordReceivedMalformedEvent(ctx context.Context, m binding.Message, err error)
// RecordCallingInvoker is invoked before the user function is invoked.
// The returned callback will be invoked after the user finishes to process the event with the eventual processing error
// The error provided to the callback could be both a processing error, or a result
Expand All @@ -39,7 +39,8 @@ func (n noopObservabilityService) InboundContextDecorators() []func(context.Cont
return nil
}

func (n noopObservabilityService) RecordReceivedMalformedEvent(ctx context.Context, err error) {}
func (n noopObservabilityService) RecordReceivedMalformedEvent(ctx context.Context, m binding.Message, err error) {
}

func (n noopObservabilityService) RecordCallingInvoker(ctx context.Context, event *event.Event) (context.Context, func(errOrResult error)) {
return ctx, func(errOrResult error) {}
Expand Down

0 comments on commit 5e1a96b

Please sign in to comment.