Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(microservices): errors thrown during eachMessage execution should be passed to kafkajs #9293

Closed
wants to merge 53 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
5b24f81
feat(microservices): add commit offset to kafka context
davidschuette Mar 4, 2022
77b7bd6
feat(microservices): dont catch errors thrown by kafka event handling
davidschuette Mar 4, 2022
c229a75
fix(microservices): fix import path
davidschuette Mar 4, 2022
b91803c
feat(microservices): add consumer getter to kafka context
davidschuette Mar 7, 2022
0b93404
refactor(microservices): simplify function call
davidschuette Mar 10, 2022
37d83ee
test(microservices): add tests for kafka-rpc-proxy
davidschuette Mar 10, 2022
50cfa23
chore(deps-dev): bump @nestjs/graphql from 10.0.3 to 10.0.4
dependabot[bot] Feb 21, 2022
d0a22ae
docs(websockets): update GatewayMetadata pingTimeout default value to…
yevgeniypak Feb 21, 2022
06a4b0b
docs(websockets): add numeric separators to numeric literals
yevgeniypak Feb 22, 2022
61272da
chore(deps): update dependency @babel/cli to v7.17.6
renovate-bot Feb 22, 2022
8cf980c
chore(deps-dev): bump mongoose from 6.2.2 to 6.2.3
dependabot[bot] Feb 22, 2022
d454c7f
chore(deps): update dependency @nestjs/schematics to v8.0.7
renovate-bot Feb 28, 2022
9a89f50
chore(deps): bump url-parse from 1.5.6 to 1.5.10
dependabot[bot] Feb 28, 2022
14b55f8
feat(microservices): Add options param to serializer
guiruiz Feb 15, 2022
838adbb
fix(common): Providing typing for optional error handling in middleware
brian-pinwheel Feb 23, 2022
1b2b69b
feat(common): extend streamable-file header support
davidschuette Feb 21, 2022
02d1766
feat(common): add length header to streamable-file
davidschuette Feb 23, 2022
d54f539
feat(core): support optional dependencies in factory providers
kamilmysliwiec Feb 18, 2022
63285c6
fix(core): fix optional factory provider condition
kamilmysliwiec Feb 21, 2022
ac81c2f
test(): add more tests around optional factory dependency
kamilmysliwiec Feb 21, 2022
05d3eb3
feat(common): add abstract type to catch decorator
jeean Feb 16, 2022
d4cadfa
fix(core): use class refs as keys (container)
micalevisk Feb 28, 2022
560b95c
test(integration): add tests for self-injections providers
micalevisk Feb 28, 2022
55ca49c
feat(common): Add custom versioning support
adworacz Nov 16, 2021
58aa164
chore(): minor formatting changes
kamilmysliwiec Mar 1, 2022
05b63ac
fix(microservices): remove options object from packets (rmq and mqtt)
kamilmysliwiec Mar 1, 2022
1e4b970
fix(microservices): mqtt shared subscription support
tensoar Sep 22, 2021
69b9363
test(microservices): change mqtt docker image
tensoar Jan 2, 2022
f187eed
test(microservices): Open websocket in mosquitto
tensoar Jan 2, 2022
fb8f5e4
fix(core): address compilation errors
kamilmysliwiec Mar 1, 2022
20409a0
refactor(microservices): move tcp socket logic to an abstract class
jeanbmar Jan 30, 2022
9e76e72
feat(microservices): allow use of custom tcp sockets
jeanbmar Jan 30, 2022
71381cb
chore(@nestjs) publish v8.4.0 release
kamilmysliwiec Mar 1, 2022
0093ad2
chore(): update package.json and readmes
kamilmysliwiec Mar 1, 2022
2b7564e
fix(gateways): add the socket.io types dependency
capitantrueno Feb 24, 2022
417af63
fix(gateways): add the socket.io dependency
capitantrueno Feb 24, 2022
ce31cfe
sample(23-code-first): add missing directive declaration
kamilmysliwiec Mar 3, 2022
22ac987
chore: add .devcontainer to gitignore
Mar 5, 2022
eb1abb1
chore(deps): update dependency @nestjs/schematics to v8.0.8
renovate-bot Mar 8, 2022
89a2dc7
chore(deps): update dependency mysql to v8.0.28
renovate-bot Mar 14, 2022
365532e
chore(deps): update dependency @types/node to v16.11.26
renovate-bot Mar 14, 2022
38f08ca
fix(core): use context module for nested transient providers
kamilmysliwiec Mar 11, 2022
f08ed93
feat: allow custom log formatters
Mar 10, 2022
50c54d7
refactor: extract colorize
Mar 11, 2022
b7ea263
feat: extract protected method for pid
Mar 11, 2022
aeb3158
fix(microservices): tcp client parallel connections issue
kamilmysliwiec Mar 7, 2022
8fca491
feat(common): do not use color in CLI if not supported
jonahsnider Mar 1, 2022
f4b6eea
fix(common): fix condition
jonahsnider Mar 1, 2022
a79ac03
style(common): format
jonahsnider Mar 1, 2022
3f59c09
style(common): format
jonahsnider Mar 2, 2022
2fb8bf8
fix(core): apply global middleware to routes excluded from prefix
kamilmysliwiec Mar 14, 2022
75eb3f0
chore(@nestjs) publish v8.4.1 release
kamilmysliwiec Mar 14, 2022
a09572e
fix(microservices): fix handling of errors when using observables
davidschuette May 1, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 14 additions & 0 deletions packages/microservices/context/kafka-rpc-proxy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { Observable } from 'rxjs';
import { RpcExceptionsHandler } from '../exceptions/rpc-exceptions-handler';
import { RpcProxy } from './rpc-proxy';

export class KafkaRpcProxy extends RpcProxy {
public create(
targetCallback: (...args: unknown[]) => Promise<Observable<any>>,
exceptionsHandler: RpcExceptionsHandler,
): (...args: unknown[]) => Promise<Observable<unknown>> {
return (...args: unknown[]) => {
return targetCallback(...args);
};
}
}
14 changes: 13 additions & 1 deletion packages/microservices/ctx-host/kafka.context.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import { KafkaMessage } from '../external/kafka.interface';
import { BaseRpcContext } from './base-rpc.context';

type KafkaContextArgs = [KafkaMessage, number, string];
type KafkaContextArgs = [
message: KafkaMessage,
partition: number,
topic: string,
commitOffset: () => Promise<void>,
];

export class KafkaContext extends BaseRpcContext<KafkaContextArgs> {
constructor(args: KafkaContextArgs) {
Expand All @@ -28,4 +33,11 @@ export class KafkaContext extends BaseRpcContext<KafkaContextArgs> {
getTopic() {
return this.args[2];
}

/**
* Commit the offset of this message.
*/
commitOffset() {
return this.args[3]();
}
davidschuette marked this conversation as resolved.
Show resolved Hide resolved
}
11 changes: 9 additions & 2 deletions packages/microservices/microservices-module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import { PipesContextCreator } from '@nestjs/core/pipes/pipes-context-creator';
import { ClientProxyFactory } from './client';
import { ClientsContainer } from './container';
import { ExceptionFiltersContext } from './context/exception-filters-context';
import { KafkaRpcProxy } from './context/kafka-rpc-proxy';
import { RpcContextCreator } from './context/rpc-context-creator';
import { RpcProxy } from './context/rpc-proxy';
import { Transport } from './enums';
import { CustomTransportStrategy } from './interfaces';
import { ListenersController } from './listeners-controller';
import { Server } from './server/server';
Expand All @@ -23,8 +25,13 @@ export class MicroservicesModule {
private readonly clientsContainer = new ClientsContainer();
private listenersController: ListenersController;

public register(container: NestContainer, config: ApplicationConfig) {
const rpcProxy = new RpcProxy();
public register(
container: NestContainer,
config: ApplicationConfig,
transport?: Transport,
) {
const rpcProxy =
transport === Transport.KAFKA ? new KafkaRpcProxy() : new RpcProxy();
const exceptionFiltersContext = new ExceptionFiltersContext(
container,
config,
Expand Down
7 changes: 6 additions & 1 deletion packages/microservices/nest-microservice.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@ export class NestMicroservice
) {
super(container);

this.microservicesModule.register(container, this.applicationConfig);
this.microservicesModule.register(
container,
this.applicationConfig,
// @ts-expect-error transport does not exist on type of config??
config.transport,
Comment on lines +52 to +53

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the case of a CustomStrategy, the transport needs to come from config.strategy.transportId

);
this.createServer(config);
this.selectContextModule();
}
Expand Down
9 changes: 8 additions & 1 deletion packages/microservices/server/server-kafka.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { Logger } from '@nestjs/common/services/logger.service';
import { isNil } from '@nestjs/common/utils/shared.utils';
import { Observable } from 'rxjs';
import {
KAFKA_DEFAULT_BROKER,
KAFKA_DEFAULT_CLIENT,
Expand Down Expand Up @@ -162,6 +161,14 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
rawMessage,
payload.partition,
payload.topic,
() =>
this.consumer.commitOffsets([
{
offset: (parseInt(payload.message.offset, 10) + 1).toString(),
partition: payload.partition,
topic: payload.topic,
},
]),
]);
// if the correlation id or reply topic is not set
// then this is an event (events could still have correlation id)
Expand Down
14 changes: 12 additions & 2 deletions packages/microservices/test/ctx-host/kafka.context.spec.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import { expect } from 'chai';
import * as sinon from 'sinon';
import { KafkaContext } from '../../ctx-host';
import { KafkaMessage } from '../../external/kafka.interface';

describe('KafkaContext', () => {
const args = ['test', { test: true }];
const testFunc = sinon.spy();
const args = ['test', { test: true }, undefined, testFunc];
let context: KafkaContext;

beforeEach(() => {
context = new KafkaContext(args as [KafkaMessage, number, string]);
context = new KafkaContext(
args as [KafkaMessage, number, string, () => Promise<void>],
);
});
describe('getTopic', () => {
it('should return topic', () => {
Expand All @@ -24,4 +28,10 @@ describe('KafkaContext', () => {
expect(context.getMessage()).to.be.eql(args[0]);
});
});
describe('commitOffset', () => {
it('should be called once', () => {
context.commitOffset();
expect(testFunc.called).to.be.true;
});
});
});