Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(microservices): errors thrown during eachMessage execution should be passed to kafkajs #9293

Closed
wants to merge 53 commits into from

Conversation

davidschuette
Copy link
Contributor

@davidschuette davidschuette commented Mar 4, 2022

PR Checklist

Please check if your PR fulfills the following requirements:

PR Type

What kind of change does this PR introduce?

  • Bugfix
  • Feature
  • Code style update (formatting, local variables)
  • Refactoring (no functional changes, no api changes)
  • Build related changes
  • CI related changes
  • Other... Please describe:

What is the current behavior?

Issue Number: #9283

What is the new behavior?

Errors thrown during kafka event processing are no longer being caught by the RcpProxy but passed to kafkajs's eachMessage.
KafkaContext provides function to get reference to consumer instance.

Does this PR introduce a breaking change?

  • Yes
  • No

If the previous functionality is required a custom interceptor can be used to catch the errors.

Other information

I did not add any tests for the reworked error handling. I will add them if the changes get approved.

Provide access to kafkajs commitOffset by using @ctx() decorator when handeling an event.

This commit closes Kafka commitOffsets nestjs#9283.
Error thrown by event handling method are no longer being caught by RcpExecptionFilter.
Instead errors are passed to kafkajs's eachMessage.
This results in proper interaction with kafka.

This commit closes Kafka commitOffsets nestjs#9283.
@coveralls
Copy link

coveralls commented Mar 4, 2022

Pull Request Test Coverage Report for Build a6202d37-e0ab-4c77-8747-24db9117807b

  • 93 of 103 (90.29%) changed or added relevant lines in 20 files are covered.
  • 72 unchanged lines in 8 files lost coverage.
  • Overall coverage increased (+0.1%) to 94.17%

Changes Missing Coverage Covered Lines Changed/Added Lines %
packages/microservices/client/client-tcp.ts 4 5 80.0%
packages/microservices/server/server-tcp.ts 2 3 66.67%
packages/microservices/server/server.ts 0 1 0.0%
packages/core/router/router-explorer.ts 9 11 81.82%
packages/microservices/server/server-mqtt.ts 0 2 0.0%
packages/microservices/helpers/tcp-socket.ts 29 32 90.63%
Files with Coverage Reduction New Missed Lines %
packages/microservices/client/client-kafka.ts 3 95.63%
packages/microservices/server/server-tcp.ts 4 76.74%
packages/core/router/router-explorer.ts 5 82.76%
packages/microservices/server/server-mqtt.ts 5 81.34%
packages/microservices/client/client-rmq.ts 6 84.87%
packages/microservices/client/client-tcp.ts 6 86.84%
packages/core/injector/module.ts 9 84.58%
packages/core/injector/injector.ts 34 74.13%
Totals Coverage Status
Change from base Build 0a7dce2a-d55f-4ed3-96fe-535d2466a616: 0.1%
Covered Lines: 5767
Relevant Lines: 6124

💛 - Coveralls

Provide access to native consumer of kafkajs using @ctx() decorator.
@davidschuette
Copy link
Contributor Author

@kamilmysliwiec Any thoughts on the new handling of errors in eachMessage? If this is approved I will add tests and update my PR for the docs.

@kamilmysliwiec
Copy link
Member

@davidschuette yeah this change makes a lot of sense, agree.

@davidschuette davidschuette changed the title Kafka commit offsets feat(microservices): Kafka commit offsets Mar 10, 2022
@davidschuette
Copy link
Contributor Author

@kamilmysliwiec I fixed the merge conflict. Is there anything else I need to do to get this PR approved?

@kamilmysliwiec
Copy link
Member

PR should be all set @davidschuette. I'll review it as soon as I can

@mastermatt
Copy link

I'm a fan of this PR, and I'd like to see it released ASAP. Unfortunately, I don't think it's enough to solve some scenarios. I've been hunting down an issue with how ServerKafka interacts with kafkjs's eachMessage when I found this PR. If my issue ends up being unrelated, I'll open a separate bug issue, but I'm pretty sure they're intertwined and the current change here is only a half-fix.
What I discovered was an inconsistency in what gets returned to eachMessage. What should happen, in all cases, is ServerKafka.handleMessage should return a Promise that resolves or rejects after awaiting all the user-land code in the controller and any interceptors etc. However, in the case of event handlers where an Observable is introduced into the response flow, the Promise returned to eachMessage resolves before any user-land code is run.
I believe the issue should be fixed here:

const resultOrStream = await handler(packet.data, context);
if (isObservable(resultOrStream)) {
const connectableSource = connectable(resultOrStream, {
connector: () => new Subject(),
resetOnDisconnect: false,
});
connectableSource.connect();
}

If handler returns a Promise, the await on line 118 will cause the resulting Promise of handleEvent async-ness to correctly wait and allow any errors to bubble up the stack. However, if an Observable is resolved from handler then handleEvent will return a Promise that resolves too quickly and never rejects.
Thanks to this little bit of code, simply adding Interceptors will cause any value returned from the controller to be wrapped in an Observable and cause this bug.

I think the solution is to add await lastValueFrom(connectableSource); after line 124 of Server.handleEvent, but my RxJS-foo is weak and could use a second opinion.

Comment on lines +52 to +53
// @ts-expect-error transport does not exist on type of config??
config.transport,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the case of a CustomStrategy, the transport needs to come from config.strategy.transportId

@davidschuette davidschuette changed the title feat(microservices): Kafka commit offsets feat(microservices): errors thrown during eachMessage execution should be passed to kafkajs May 1, 2022
dependabot bot and others added 8 commits May 1, 2022 22:02
Bumps [@nestjs/graphql](https://github.com/nestjs/graphql) from 10.0.3 to 10.0.4.
- [Release notes](https://github.com/nestjs/graphql/releases)
- [Changelog](https://github.com/nestjs/graphql/blob/master/CHANGELOG.md)
- [Commits](nestjs/graphql@v10.0.3...v10.0.4)

---
updated-dependencies:
- dependency-name: "@nestjs/graphql"
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Bumps [mongoose](https://github.com/Automattic/mongoose) from 6.2.2 to 6.2.3.
- [Release notes](https://github.com/Automattic/mongoose/releases)
- [Changelog](https://github.com/Automattic/mongoose/blob/master/CHANGELOG.md)
- [Commits](Automattic/mongoose@6.2.2...6.2.3)

---
updated-dependencies:
- dependency-name: mongoose
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Bumps [url-parse](https://github.com/unshiftio/url-parse) from 1.5.6 to 1.5.10.
- [Release notes](https://github.com/unshiftio/url-parse/releases)
- [Commits](unshiftio/url-parse@1.5.6...1.5.10)

---
updated-dependencies:
- dependency-name: url-parse
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
@davidschuette
Copy link
Contributor Author

davidschuette commented May 1, 2022

@mastermatt Thank you for your support on this PR.
I changed a few lines to fix the problem you described (see a09572e).
I don't know what git did with all these other commits showing up now... any help is appreciated.

@mastermatt
Copy link

I'm not quite following the expected behavior of removing the exceptionsHandler from the RPC Proxy logic.
@davidschuette can you clarify?
When I play around with this locally, the errors propagate to kafjajs' eachMessage by simplifying adding the await lastValueFrom(resultOrStream); line. However, when I add the other changes (KafkaRpcProxy) the error still makes it to kafkajs, but none of my custom exception filters fire.

And one other question, sorry for hijacking this thread, what should be returned from a filter implementing RpcExceptionFilter.catch to stop the error propagation? Or should an interceptor be used for that?

@davidschuette
Copy link
Contributor Author

davidschuette commented May 1, 2022

@mastermatt It looks like your fix is way easier than mine. I don't know how I missed that.

Do you know what this code is doing? I removed it and it doesn't break any tests.
Line 120 in packages/microservices/server/server.ts

const connectableSource = connectable(resultOrStream, {
  connector: () => new Subject(),
  resetOnDisconnect: false,
});
connectableSource.connect();

@davidschuette
Copy link
Contributor Author

@mastermatt Regarding your issue with filters. I will take a look at that problem tomorrow.

@mastermatt
Copy link

@davidschuette keeping in mind I don't have much RxJS experience, I think the connectableSource bit is there to kick off the observable if it's deferred. But since lastValueFrom calls subscribe, I think the connectableSource becomes redundant.
My local testing was also successful with only the changes from a09572e

@davidschuette
Copy link
Contributor Author

@mastermatt Can you give me a repo with the project that is giving you problems with filters?

@kamilmysliwiec
Copy link
Member

Btw @davidschuette the commit history of this PR got all messed up by pulling changes from master (instead of simply merging that branch). Can you rebase your changes?

@davidschuette
Copy link
Contributor Author

@kamilmysliwiec I will give it a try. Though I don't have much experience with rebasing.
The branch got messed up as a result of a rebase.

@gkampitakis
Copy link

Any updates on this ? Could we help somehow to have this pr merged 🤔 ?

@mastermatt
Copy link

Ignoring the current state of the rebase, let me take a stab at trying to summarize the current state of this change set:

a. Update Server#handleEvent to ensure any Observable is finished before resolving the async method.
a09572e
This change isn't error-handling specific, instead it just allows a Kafka message handler to either resolve or reject before kafkajs moves on to the next message in the batch.
For my own personal needs, this change did the trick, and I've been running a version of this in Production successfully for a few weeks now.

b. Add consumer getter to Kafka context.
b91803c
A nice feature add that allows consumers to manually interact with the kafkajs Consumer instance when some non-generic action is needed.

c. Add KafkaRpcProxy that bypasses error handling after a controller method throws an error (skips the Exception filters)
77b7bd6
This is the part I'm unsure about, because it's not clear from the docs what the expected behavior is in Nest.
@kamilmysliwiec can you clarify: what the expected (desired) behavior of Exception Filters for RPC event-style messages? In my experience, the only things that could happen, if the Controller throws, is to determine if/how the error should cause a retry, be ignored, or deliver to a dead-letter-queue.

@gkampitakis
Copy link

Really nice sum. Would it make sense to unblock this pr by splitting it into the prs that are more trivial and sure that are correct?

@mastermatt
Copy link

I would be happy to open a pr or two, if that would help, but at this point I don't think there is a blocker beyond simply not knowing the desired behavior. We need clarity on the intended design/architecture.

@kamilmysliwiec
Copy link
Member

we're tracking this here #9586

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet