
Pass message to RecordReceivedMalformedEvent when validation fails #964

Open
raza-matillion opened this issue Oct 20, 2023 · 12 comments


@raza-matillion

raza-matillion commented Oct 20, 2023

We have encountered a scenario where we want to send all messages that fail validation to a dead letter topic to be reviewed later. Currently I can't find a way to get hold of the message when it fails validation. I think it would be useful to pass the binding.Message to the ObservabilityService method RecordReceivedMalformedEvent, which would let us log the message or send it to a dead letter topic. It is currently called with only the error:

r.observabilityService.RecordReceivedMalformedEvent(ctx, eventErr)

and
r.observabilityService.RecordReceivedMalformedEvent(ctx, validationErr)

@raza-matillion raza-matillion changed the title Pass message to RecordReceivedMalformedEvent when validation failed Pass message to RecordReceivedMalformedEvent when validation fails Oct 20, 2023
@embano1
Member

embano1 commented Oct 21, 2023

Quick question: since invoker.go returns an error in such cases, why not send that event to a configured DLQ? This SDK is not opinionated about DLQs, so you can use whatever SDK or logic you want in your app. I have not seen many cases where the observability service is responsible for dead-lettering, but I could be wrong here.

cc @duglin for thoughts

@duglin
Contributor

duglin commented Oct 21, 2023

I like the idea of a DLQ that isn't tied to the ObservabilityService.

@raza-matillion
Author

raza-matillion commented Oct 21, 2023

I agree with the idea. My plan is to use a custom error handler in the ObservabilityService that handles the error as we intend, so the DLQ logic will not live in the ObservabilityService itself:

func (o Observability) RecordReceivedMalformedEvent(ctx context.Context, m binding.Message, err error) {
	o.ErrorHandler.Handle(ctx, m, err)
}

My understanding is that invoker.go does not return an error when we use client.StartReceiver. Correct me if I am wrong.

@raza-matillion
Author

raza-matillion commented Oct 21, 2023

The message is also passed as nil to respFn; otherwise I could use client.Responder instead of client.StartReceiver:

return respFn(ctx, nil, protocol.NewReceipt(r.ackMalformedEvent, "validation error in incoming event: %w", validationErr))

@embano1
Member

embano1 commented Oct 22, 2023

So you don't know what the CloudEvent is upfront? I was assuming you know the CloudEvent that is to be sent, so that together with the returned validation error you can construct a DLQ message. Can you describe a common flow of how you or your users are using the SDK?

@raza-matillion
Author

raza-matillion commented Oct 22, 2023

No, I don't know the CloudEvent upfront. I am consuming CloudEvents in a service, and they are produced by another service. I am assuming that producers will send events in CloudEvents format. If a producer sends a malformed CloudEvent, I want to log the error and pass the malformed message to a DLQ. But if binding.ToEvent fails in invoker.go, I have no way to get hold of the malformed message so that I can pass it to the DLQ for further investigation.
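One way to keep hold of the raw bytes is to check them yourself before relying on the SDK conversion. The sketch below assumes structured-mode JSON (attributes and data together in the body; binary mode puts attributes in headers and is not covered) and checks only the four context attributes the CloudEvents spec marks REQUIRED: `id`, `source`, `specversion`, and `type`. `checkStructured` and the slice used as a DLQ are hypothetical, and this is far less thorough than the SDK's own validation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// requiredAttrs are the context attributes the CloudEvents spec marks REQUIRED.
var requiredAttrs = []string{"id", "source", "specversion", "type"}

// checkStructured reports why raw is not a plausible structured-mode JSON
// CloudEvent, or nil if every required attribute is a non-empty string.
func checkStructured(raw []byte) error {
	var attrs map[string]any
	if err := json.Unmarshal(raw, &attrs); err != nil {
		return fmt.Errorf("not a JSON object: %w", err)
	}
	for _, name := range requiredAttrs {
		s, ok := attrs[name].(string)
		if !ok || s == "" {
			return fmt.Errorf("missing or empty required attribute %q", name)
		}
	}
	return nil
}

func main() {
	var dlq [][]byte // stand-in for a dead letter topic; keeps the raw bytes

	inputs := [][]byte{
		[]byte(`{"specversion":"1.0","id":"1","source":"/svc","type":"com.example.created"}`),
		[]byte(`{"specversion":"1.0","source":"/svc","type":"com.example.created"}`),
		[]byte(`not json`),
	}
	for _, raw := range inputs {
		if err := checkStructured(raw); err != nil {
			fmt.Println("dead-lettering:", err)
			dlq = append(dlq, raw)
			continue
		}
		// Valid enough to hand to the normal CloudEvents processing path.
	}
	fmt.Println("dead-lettered:", len(dlq))
}
```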

@embano1
Member

embano1 commented Oct 22, 2023

I want to log the error and pass the malformed message to a DLQ

Shouldn't that be the responsibility of the producer/sender, though? If they send a malformed event and you return an error, i.e., no ACK, it should be their (the sender's) responsibility to handle the error, no? Or is your scenario a special case? I'm coming back to my initial feedback that using an observability service for DLQ capabilities could be considered an anti-pattern here. WDYT?

@raza-matillion
Author

raza-matillion commented Oct 22, 2023

My understanding is that the sender sends and forgets, and the consumer tries to process the message and, on failure, sends it to the DLQ.
I totally agree with your point that DLQ logic in the Observability Service is an anti-pattern. My original request is only to pass the binding.Message to RecordReceivedMalformedEvent in the Observability Service. Here is the PR if you want to have a look:
https://github.com/cloudevents/sdk-go/pull/965/files

@embano1
Member

embano1 commented Oct 22, 2023

My understanding is that the sender sends and forgets, and the consumer tries to process the message and, on failure, sends it to the DLQ.

With fire-and-forget, the sender is basically in at-most-once semantics land. So implementing DLQ on the consumer might be overkill from that perspective. If the sender uses at-least-once semantics, it needs to wait for a (N)ACK from the receiver.

Question: while not the full event/payload, do we log some metadata in the observability service for correlation purposes at all?

@raza-matillion
Author

Just logging the error message in the observability service at the moment. I don't think I have access to any other information from the actual payload to log? It would be good to have more information about the payload, since the consumer is unaware of the event/payload. Looking at another open issue, someone has encountered a similar situation:
#757

@embano1
Copy link
Member

embano1 commented Oct 22, 2023

Just logging the error message in the observability service at the moment. I don't think I have access to any other information from the actual payload to log?

Correct, we don't log any message detail.

It would be good to have more information about the payload, since the consumer is unaware of the event/payload.

Agreed, IMHO this is something we should improve. From that angle, your PR sounds good to me (not the DLQ approach). However, I'm not an expert on the observability part, and there could be issues with logging large or malicious payloads. I guess that would be true today as well, since we record a successfully transformed event to the observability service, but it is still a risk.

This is the code in invoker.go. It records no information about the message, and I'd argue it is almost useless since you don't know what really happened and there's no way to correlate.

	e, eventErr := binding.ToEvent(ctx, m)
	switch {
	case eventErr != nil && r.fn.hasEventIn:
		r.observabilityService.RecordReceivedMalformedEvent(ctx, eventErr)
		return respFn(ctx, nil, protocol.NewReceipt(r.ackMalformedEvent, "failed to convert Message to Event: %w", eventErr))
	case r.fn != nil:
		// Check if event is valid before invoking the receiver function
		if e != nil {
			if validationErr := e.Validate(); validationErr != nil {
				r.observabilityService.RecordReceivedMalformedEvent(ctx, validationErr)
				return respFn(ctx, nil, protocol.NewReceipt(r.ackMalformedEvent, "validation error in incoming event: %w", validationErr))
			}
		}

@raza-matillion
Author

raza-matillion commented Oct 22, 2023

@embano1 You are absolutely right about this block of code. Even setting my DLQ approach aside, the PR would make this block useful. I would go further and suggest passing the binding.Message to respFn where we currently pass nil. I can update my PR with this change if you agree.

respFn(ctx, nil, protocol.NewReceipt(r.ackMalformedEvent, "validation error in incoming event: %w", validationErr))


3 participants