New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(microservices): errors thrown during eachMessage execution should be passed to kafkajs #9293
Conversation
Provide access to kafkajs commitOffset by using @ctx() decorator when handeling an event. This commit closes Kafka commitOffsets nestjs#9283.
Error thrown by event handling method are no longer being caught by RcpExecptionFilter. Instead errors are passed to kafkajs's eachMessage. This results in proper interaction with kafka. This commit closes Kafka commitOffsets nestjs#9283.
Provide access to native consumer of kafkajs using @ctx() decorator.
@kamilmysliwiec Any thoughts on the new handling of errors in |
@davidschuette yeah this change makes a lot of sense, agree. |
@kamilmysliwiec I fixed the merge conflict. Is there anything else I need to do to get this PR approved? |
PR should be all set @davidschuette. I'll review it as soon as I can |
I'm a fan of this PR, and I'd like to see it released ASAP. Unfortunately, I don't think it's enough to solve some scenarios. I've been hunting down an issue with how nest/packages/microservices/server/server.ts Lines 118 to 125 in 26e9ed6
If handler returns a Promise, the await on line 118 will cause the resulting Promise of handleEvent async-ness to correctly wait and allow any errors to bubble up the stack. However, if an Observable is resolved from handler then handleEvent will return a Promise that resolves too quickly and never rejects.Thanks to this little bit of code, simply adding Interceptors will cause any value returned from the controller to be wrapped in an Observable and cause this bug. I think the solution is to add |
// @ts-expect-error transport does not exist on type of config?? | ||
config.transport, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the case of a CustomStrategy
, the transport needs to come from config.strategy.transportId
Bumps [@nestjs/graphql](https://github.com/nestjs/graphql) from 10.0.3 to 10.0.4. - [Release notes](https://github.com/nestjs/graphql/releases) - [Changelog](https://github.com/nestjs/graphql/blob/master/CHANGELOG.md) - [Commits](nestjs/graphql@v10.0.3...v10.0.4) --- updated-dependencies: - dependency-name: "@nestjs/graphql" dependency-type: direct:development update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <support@github.com>
Bumps [mongoose](https://github.com/Automattic/mongoose) from 6.2.2 to 6.2.3. - [Release notes](https://github.com/Automattic/mongoose/releases) - [Changelog](https://github.com/Automattic/mongoose/blob/master/CHANGELOG.md) - [Commits](Automattic/mongoose@6.2.2...6.2.3) --- updated-dependencies: - dependency-name: mongoose dependency-type: direct:development update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <support@github.com>
Bumps [url-parse](https://github.com/unshiftio/url-parse) from 1.5.6 to 1.5.10. - [Release notes](https://github.com/unshiftio/url-parse/releases) - [Commits](unshiftio/url-parse@1.5.6...1.5.10) --- updated-dependencies: - dependency-name: url-parse dependency-type: indirect ... Signed-off-by: dependabot[bot] <support@github.com>
9147851
to
a09572e
Compare
@mastermatt Thank you for your support on this PR. |
I'm not quite following the expected behavior of removing the And one other question, sorry for hijacking this thread, what should be returned from a filter implementing |
@mastermatt It looks like your fix is way easier than mine. I don't know how I missed that. Do you know what this code is doing? I removed it and it doesn't break any tests. const connectableSource = connectable(resultOrStream, {
connector: () => new Subject(),
resetOnDisconnect: false,
});
connectableSource.connect(); |
@mastermatt Regarding your issue with filters. I will take a look at that problem tomorrow. |
@davidschuette keeping in mind I don't have much RxJS experience, I think the |
@mastermatt Can you give me a repo with the project that is giving you problems with filters? |
Btw @davidschuette the commit history of this PR got all messed up by pulling changes from |
@kamilmysliwiec I will give it a try. Though I don't have much experience with rebasing. |
Any updates on this ? Could we help somehow to have this pr merged 🤔 ? |
Ignoring the current state of the rebase, let me take a stab at trying to summarize the current state of this change set: a. Update b. Add consumer getter to Kafka context. c. Add |
Really nice sum. Would it make sense to unblock this pr by splitting it into the prs that are more trivial and sure that are correct? |
I would be happy to open a pr or two, if that would help, but at this point I don't think there is a blocker beyond simply not knowing the desired behavior. We need clarity on the intended design/architecture. |
we're tracking this here #9586 |
PR Checklist
Please check if your PR fulfills the following requirements:
PR Type
What kind of change does this PR introduce?
What is the current behavior?
Issue Number: #9283
What is the new behavior?
Errors thrown during kafka event processing are no longer being caught by the
RcpProxy
but passed to kafkajs'seachMessage
.KafkaContext
provides function to get reference to consumer instance.Does this PR introduce a breaking change?
If the previous functionality is required a custom interceptor can be used to catch the errors.
Other information
I did not add any tests for the reworked error handling. I will add them if the changes get approved.