Failed retries do not work in kafka. #4241

Closed
LaThortilla opened this issue Mar 6, 2020 · 6 comments
Labels
needs triage This issue has not been looked into

Comments

@LaThortilla

Bug Report

Current behavior

When an exception is thrown while consuming an event, retries are not performed the way plain kafkajs performs them.

Input Code

Message Controller ./src/user.api.controller.ts

count = 0;

@MessagePattern('user.error')
userError(@Ctx() context: KafkaContext, @Payload() payload): any {
  if (this.count <= 3) {
    this.count++;
    console.log("Error emulated in the attempt" + this.count);
    throw "Error emulated in the attempt" + this.count;
  } else {
    console.log("Error counter:" + this.count, "Message:", payload.value);
  }
}

./main.ts

// Init Kafka microservice
const microservice = app.connectMicroservice({
  transport: Transport.KAFKA,
  options: {
    client: {
      retry: {
        retries: 50,
        initialRetryTime: 1,
        maxRetryTime: 2000,
      },
      brokers: ['localhost:9092'],
    },
  },
});
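For reference, the retry block above is kafkajs client configuration (retries, initialRetryTime, maxRetryTime). The sketch below shows roughly what delay schedule those values imply. It is a simplified, deterministic model under stated assumptions: each delay doubles (kafkajs documents a multiplier option that defaults to 2) and is capped at maxRetryTime, while the random jitter kafkajs applies through its factor option is deliberately left out. retryDelays is a hypothetical helper, not a kafkajs function.

```typescript
// Simplified, deterministic sketch of exponential backoff.
// Assumption: kafkajs also randomizes each delay (its `factor` option); omitted here.
interface RetryOptions {
  retries: number;
  initialRetryTime: number; // ms
  maxRetryTime: number; // ms, upper bound for any single delay
  multiplier?: number; // assumed growth factor per retry (kafkajs default is 2)
}

// Hypothetical helper: returns the delay (ms) before each retry.
function retryDelays({
  retries,
  initialRetryTime,
  maxRetryTime,
  multiplier = 2,
}: RetryOptions): number[] {
  const delays: number[] = [];
  let delay = initialRetryTime;
  for (let attempt = 0; attempt < retries; attempt++) {
    delays.push(Math.min(delay, maxRetryTime)); // cap at maxRetryTime
    delay *= multiplier; // grow exponentially for the next retry
  }
  return delays;
}

const delays = retryDelays({ retries: 50, initialRetryTime: 1, maxRetryTime: 2000 });
// First 13 delays: 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2000, 2000
console.log(delays.slice(0, 13));
```

With initialRetryTime: 1 the early retries happen almost immediately, and the cap of 2000 ms is reached after about a dozen retries.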

Output (only one attempt is made):

Error emulated in the attempt1 
[Nest] 39544   - 2020-03-06 1:01:19 PM   [RpcExceptionsHandler] Error emulated in the attempt1 +5025ms

Expected behavior

The handler should be retried according to the configured retry policy and only fail once the retries are exhausted.

Running equivalent code directly with kafkajs produces the expected behavior:

index.js

const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'kafka-test-client',
  brokers: ['localhost:9092'],
});
const consumer = kafka.consumer({ groupId: 'user-consumer' });

const run = async () => {
  let count = 0;
  // Consuming
  await consumer.connect();
  await consumer.subscribe({ topic: 'user.error' /*, fromBeginning: true */ });
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      count++;
      if (count <= 3) {
        // Throwing makes kafkajs retry the same message (see output below)
        console.log("Error emulated in the attemp " + count);
        throw "Error emulated in the attemp " + count;
      }
      console.log({
        partition,
        offset: message.offset,
        value: message.value.toString(),
      });
    },
  });
};

run().catch((e) => console.error(e));

Output:

{"level":"INFO","timestamp":"2020-03-06T18:55:13.416Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"user-consumer"}
{"level":"INFO","timestamp":"2020-03-06T18:55:38.503Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"user-consumer","memberId":"kafka-test-client-7c1f223a-8727-4dc6-9908-4dc8ed0553e2","leaderId":"kafka-test-client-7c1f223a-8727-4dc6-9908-4dc8ed0553e2","isLeader":true,"memberAssignment":{"user.error":[0]},"groupProtocol":"RoundRobinAssigner","duration":25083}
Error emulated in the attemp 1
{"level":"ERROR","timestamp":"2020-03-06T18:55:38.539Z","logger":"kafkajs","message":"[Runner] Error when calling eachMessage","topic":"user.error","partition":0,"offset":"5"}
Error emulated in the attemp 2
{"level":"ERROR","timestamp":"2020-03-06T18:55:38.809Z","logger":"kafkajs","message":"[Runner] Error when calling eachMessage","topic":"user.error","partition":0,"offset":"5"}
Error emulated in the attemp 3
{"level":"ERROR","timestamp":"2020-03-06T18:55:39.350Z","logger":"kafkajs","message":"[Runner] Error when calling eachMessage","topic":"user.error","partition":0,"offset":"5"}
{ partition: 0, offset: '5', value: 'Test Message' }

The output shows each failed attempt, followed by the successfully received message.

Possible Solution

Do not swallow the error thrown inside the handler; propagate it to kafkajs's eachMessage callback instead.
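The kafkajs output above suggests why this matters: kafkajs re-delivers a message only when eachMessage rejects. The following is a hypothetical, dependency-free model of that contract (consumeWithRetries, flakyHandler and swallowErrors are illustrative names, not kafkajs or Nest APIs). When a wrapper catches and logs the handler's error, the retry loop sees a success, so only one attempt is made, which matches the Nest output in this report.

```typescript
// Hypothetical model of a consumer loop: it retries only when the
// message callback rejects. Not the real kafkajs Runner.
type Handler = (value: string) => Promise<void>;

async function consumeWithRetries(
  eachMessage: Handler,
  value: string,
  retries: number,
): Promise<number> {
  let attempts = 0;
  for (;;) {
    attempts++;
    try {
      await eachMessage(value);
      return attempts; // resolved: treat the message as processed
    } catch (err) {
      if (attempts > retries) throw err; // retries exhausted: give up
    }
  }
}

// Fails on its first three calls, like the reproduction in this issue.
function flakyHandler(): Handler {
  let count = 0;
  return async (value) => {
    count++;
    if (count <= 3) throw new Error(`Error emulated in attempt ${count}`);
    console.log(`Processed "${value}" on attempt ${count}`);
  };
}

// Catches and logs the error instead of rethrowing it.
function swallowErrors(handler: Handler): Handler {
  return async (value) => {
    try {
      await handler(value);
    } catch (err) {
      console.error("Caught and logged:", (err as Error).message);
    }
  };
}

async function main(): Promise<void> {
  const propagated = await consumeWithRetries(flakyHandler(), "Test Message", 5);
  const swallowed = await consumeWithRetries(swallowErrors(flakyHandler()), "Test Message", 5);
  console.log({ propagated, swallowed }); // { propagated: 4, swallowed: 1 }
}

main();
```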

Environment


"@nestjs/common": "^6.10.14",
"@nestjs/config": "^0.2.3",
"@nestjs/core": "^6.10.14",
"@nestjs/microservices": "^6.11.7",
"kafkajs": "^1.12.0",

LaThortilla added the needs triage label on Mar 6, 2020
@LaThortilla
Author

I found a workaround, which may or may not be the definitive solution.

Apply an exception filter with @UseFilters:
https://docs.nestjs.com/exception-filters

Inside the exception filter, an error must be thrown so that the error follows the same flow as it does with plain kafkajs.

Example exception filter:

import { ArgumentsHost, Catch, RpcExceptionFilter } from '@nestjs/common';
import { Observable } from 'rxjs';

@Catch()
export class NoExceptionFilter implements RpcExceptionFilter<any> {
    catch(exception: any, host: ArgumentsHost): Observable<any> {
        console.log("Catching the RpcException error in the filter");
        // Throw instead of returning an error Observable, so the error escapes the filter
        throw "The error has been caught";
        //return throwError(exception.getError());
    }
}
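The idea behind this workaround can be modelled without Nest. In the sketch below (hypothetical names, not Nest's actual RpcProxy or filter classes), whatever a filter returns becomes the handler's result, and only an exception thrown by the filter itself escapes to the caller. This roughly mirrors how Nest's default RPC filter returns an error Observable instead of throwing. Later comments in this thread report that the filter approach did not work for them, so treat this as an illustration of the intent rather than a guaranteed fix.

```typescript
// Hypothetical, dependency-free model of a filter sitting between a handler
// and the transport. Illustrative only; not Nest's real implementation.
type Filter = (exception: unknown) => unknown;

function withFilter(
  handler: () => Promise<unknown>,
  filter: Filter,
): () => Promise<unknown> {
  return async () => {
    try {
      return await handler();
    } catch (exception) {
      // The filter's return value becomes the result; only a throw escapes.
      return filter(exception);
    }
  };
}

const failing = async (): Promise<never> => {
  throw new Error("Error emulated");
};

// Roughly like the default filter: the error is turned into a returned value.
const containingFilter: Filter = (e) => ({ error: (e as Error).message });
// Like NoExceptionFilter: the filter rethrows.
const rethrowingFilter: Filter = (e) => {
  throw e;
};

// Does the handler's error reach whoever called the wrapped handler?
async function reachesCaller(filter: Filter): Promise<boolean> {
  try {
    await withFilter(failing, filter)();
    return false;
  } catch {
    return true;
  }
}

reachesCaller(containingFilter).then((r) => console.log("containing filter:", r)); // false
reachesCaller(rethrowingFilter).then((r) => console.log("rethrowing filter:", r)); // true
```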

How to apply it to each consumer:

count = 0;

@UseFilters(new NoExceptionFilter())
@MessagePattern('user.error')
userError(@Ctx() context: KafkaContext, @Payload() payload): any {
  if (this.count <= 3) {
    this.count++;
    console.log("Error emulated in the attempt" + this.count);
    throw "Error emulated in the attempt" + this.count;
  } else {
    console.log("Error counter:" + this.count, "Message:", payload.value);
  }
}

@ghost

ghost commented Jan 26, 2022

Hi, I'm dealing with the same issue, and your solution doesn't work for me.
Does anyone know how to solve this?

@tak1n

tak1n commented Feb 18, 2022

We are facing the same issue, and the solution outlined by @LaThortilla didn't work for us either.

We tried the following filter:

import { ArgumentsHost, Catch } from '@nestjs/common';
import { BaseRpcExceptionFilter } from '@nestjs/microservices';
import { Observable, throwError } from 'rxjs';

@Catch()
export class ThrowExceptionFilter implements BaseRpcExceptionFilter {
  handleUnknownError(exception: any, status: string): Observable<never> {
    throw new Error('Method not implemented.');
  }

  isError(exception: any): exception is Error {
    throw new Error('Method not implemented.');
  }

  catch(exception: any, host: ArgumentsHost): Observable<any> {
    console.log('Catched exception');
    console.dir(exception, { depth: null });
    throw new Error('Test Error');
    // return throwError(() => new Error('test Error'));
  }
}

Log:

[Nest] 1 - 02/18/2022, 9:26:25 AM LOG [NestFactory] Starting Nest application...
Fri, Feb 18 2022 10:26:25 am | [Nest] 1 - 02/18/2022, 9:26:25 AM LOG [InstanceLoader] TypedConfigModule dependencies initialized +105ms
Fri, Feb 18 2022 10:26:25 am | [Nest] 1 - 02/18/2022, 9:26:25 AM LOG [InstanceLoader] ConfigurationModule dependencies initialized +1ms
Fri, Feb 18 2022 10:26:30 am | [Nest] 1 - 02/18/2022, 9:26:30 AM LOG [NestFactory] Starting Nest application... +4874ms
Fri, Feb 18 2022 10:26:30 am | [Nest] 1 - 02/18/2022, 9:26:30 AM LOG [InstanceLoader] TypedConfigModule dependencies initialized +1ms
Fri, Feb 18 2022 10:26:30 am | [Nest] 1 - 02/18/2022, 9:26:30 AM LOG [InstanceLoader] JwtModule dependencies initialized +0ms
Fri, Feb 18 2022 10:26:30 am | [Nest] 1 - 02/18/2022, 9:26:30 AM LOG [InstanceLoader] ContextModule dependencies initialized +1ms
Fri, Feb 18 2022 10:26:30 am | [Nest] 1 - 02/18/2022, 9:26:30 AM LOG [InstanceLoader] ConfigurationModule dependencies initialized +0ms
Fri, Feb 18 2022 10:26:30 am | [Nest] 1 - 02/18/2022, 9:26:30 AM LOG [InstanceLoader] KafkaClientModule dependencies initialized +1ms
Fri, Feb 18 2022 10:26:30 am | [Nest] 1 - 02/18/2022, 9:26:30 AM LOG [InstanceLoader] PrismaModule dependencies initialized +0ms
Fri, Feb 18 2022 10:26:30 am | [Nest] 1 - 02/18/2022, 9:26:30 AM LOG [InstanceLoader] KeycloakModule dependencies initialized +1ms
Fri, Feb 18 2022 10:26:30 am | [Nest] 1 - 02/18/2022, 9:26:30 AM LOG [InstanceLoader] KafkaModule dependencies initialized +0ms
Fri, Feb 18 2022 10:26:30 am | [Nest] 1 - 02/18/2022, 9:26:30 AM LOG [ServerKafka] INFO [Consumer] Starting {"timestamp":"2022-02-18T09:26:25.775Z","logger":"kafkajs","groupId":"cam-server"}
Fri, Feb 18 2022 10:26:30 am | [Nest] 1 - 02/18/2022, 9:26:30 AM LOG [ServerKafka] INFO [ConsumerGroup] Consumer has joined the group {"timestamp":"2022-02-18T09:26:30.134Z","logger":"kafkajs","groupId":"cam-server","memberId":"cam-server-a5e73e6f-d65e-41a4-ab77-2c4fe802be04","leaderId":"cam-server-18830af7-fe61-48d8-98a1-b95d779dcb4e","isLeader":false,"memberAssignment":{},"groupProtocol":"RoundRobinAssigner","duration":4358}
Fri, Feb 18 2022 10:26:30 am | [Nest] 1 - 02/18/2022, 9:26:30 AM LOG [NestMicroservice] Nest microservice successfully started +1ms
Fri, Feb 18 2022 10:26:30 am | Kafka App is listening
Fri, Feb 18 2022 10:27:35 am | [Nest] 1 - 02/18/2022, 9:27:35 AM ERROR [ServerKafka] ERROR [Connection] Response Heartbeat(key: 12, version: 3) {"timestamp":"2022-02-18T09:27:35.210Z","logger":"kafkajs","broker":"kafka-cluster-kafka-0.kafka-cluster-kafka-brokers.kafka.svc:9092","clientId":"cam-server","error":"The group is rebalancing, so a rejoin is needed","correlationId":16,"size":10}
Fri, Feb 18 2022 10:27:35 am | [Nest] 1 - 02/18/2022, 9:27:35 AM ERROR [ServerKafka] ERROR [Runner] The group is rebalancing, re-joining {"timestamp":"2022-02-18T09:27:35.214Z","logger":"kafkajs","groupId":"cam-server","memberId":"cam-server-a5e73e6f-d65e-41a4-ab77-2c4fe802be04","error":"The group is rebalancing, so a rejoin is needed","retryCount":0,"retryTime":291}
Fri, Feb 18 2022 10:27:35 am | [Nest] 1 - 02/18/2022, 9:27:35 AM LOG [ServerKafka] INFO [ConsumerGroup] Consumer has joined the group {"timestamp":"2022-02-18T09:27:35.228Z","logger":"kafkajs","groupId":"cam-server","memberId":"cam-server-a5e73e6f-d65e-41a4-ab77-2c4fe802be04","leaderId":"cam-server-a5e73e6f-d65e-41a4-ab77-2c4fe802be04","isLeader":true,"memberAssignment":{"keycloak":[0]},"groupProtocol":"RoundRobinAssigner","duration":13}
Fri, Feb 18 2022 10:29:56 am | Catched exception
Fri, Feb 18 2022 10:29:56 am | NotFoundException: User with id xxx not found
Fri, Feb 18 2022 10:29:56 am | at KeycloakService.getUser (/home/node/app/dist/keycloak/keycloak.service.js:350:19)
Fri, Feb 18 2022 10:29:56 am | at processTicksAndRejections (node:internal/process/task_queues:96:5)
Fri, Feb 18 2022 10:29:56 am | at async UsersService.buildUserPayload (/home/node/app/dist/users/users.service.js:43:22)
Fri, Feb 18 2022 10:29:56 am | at async UsersService.handleRegisterEvent (/home/node/app/dist/users/users.service.js:27:29)
Fri, Feb 18 2022 10:29:56 am | at async UsersKafkaController.handleKeycloakEvent (/home/node/app/dist/users/users.kafka.controller.js:26:13) {
Fri, Feb 18 2022 10:29:56 am | response: {
Fri, Feb 18 2022 10:29:56 am | statusCode: 404,
Fri, Feb 18 2022 10:29:56 am | message: 'User with id xxx not found',
Fri, Feb 18 2022 10:29:56 am | error: 'Not Found'
Fri, Feb 18 2022 10:29:56 am | },
Fri, Feb 18 2022 10:29:56 am | status: 404
Fri, Feb 18 2022 10:29:56 am | }

Without the custom filter, the RpcExceptionsHandler catches the exception:

[Nest] 1 - 02/18/2022, 10:45:08 AM ERROR [RpcExceptionsHandler] User with id xxx not found
Fri, Feb 18 2022 11:45:08 am | NotFoundException: User with id xxx not found
Fri, Feb 18 2022 11:45:08 am | at KeycloakService.getUser (/home/node/app/dist/keycloak/keycloak.service.js:350:19)
Fri, Feb 18 2022 11:45:08 am | at processTicksAndRejections (node:internal/process/task_queues:96:5)
Fri, Feb 18 2022 11:45:08 am | at async UsersService.buildUserPayload (/home/node/app/dist/users/users.service.js:43:22)
Fri, Feb 18 2022 11:45:08 am | at async UsersService.handleRegisterEvent (/home/node/app/dist/users/users.service.js:27:29)
Fri, Feb 18 2022 11:45:08 am | at async UsersKafkaController.handleKeycloakEvent (/home/node/app/dist/users/users.kafka.controller.js:25:13)

We are using the event-based approach (https://docs.nestjs.com/microservices/basics#event) for our Kafka consumers. I will try to put together a reproduction repository in the next few days.

@AKCELEPATOP

AKCELEPATOP commented Apr 11, 2022

Hi. The main problem is in Nest's standard event handling. When a handler returns an Observable, Nest wraps it in a connectable and connects it, which does not throw the error in the context of kafkajs's eachMessage function. You should create a custom transport strategy that extends ServerKafka and overrides the handleEvent method.
Example:

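import {
  BaseRpcContext,
  CustomTransportStrategy,
  ReadPacket,
  ServerKafka,
} from '@nestjs/microservices';
import { NO_EVENT_HANDLER } from '@nestjs/microservices/constants';
import { connectable, isObservable, lastValueFrom, Subject } from 'rxjs';

export class AppServerKafka
  extends ServerKafka
  implements CustomTransportStrategy
{
  public async handleEvent(
    pattern: string,
    packet: ReadPacket,
    context: BaseRpcContext,
  ): Promise<any> {
    const handler = this.getHandlerByPattern(pattern);
    if (!handler) {
      return this.logger.error(
        `${NO_EVENT_HANDLER} Event pattern: ${JSON.stringify(pattern)}.`,
      );
    }

    const resultOrStream = await handler(packet.data, context);
    if (isObservable(resultOrStream)) {
      if (handler.extras?.durable) {
        // Awaiting the stream rejects on error, so the error reaches eachMessage.
        // defaultValue avoids an EmptyError if the stream completes without emitting.
        return lastValueFrom(resultOrStream, { defaultValue: undefined });
      }

      // Default behaviour: connect and return without waiting for the stream
      const connectableSource = connectable(resultOrStream, {
        connector: () => new Subject(),
        resetOnDisconnect: false,
      });
      connectableSource.connect();
    }
  }
}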

...

@EventPattern('<topic>', Transport.KAFKA, {
    durable: true,
})

...

const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
       strategy: new AppServerKafka(<kafka_options>),
    },
);
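The difference between the default path and the durable path above can be illustrated with a small, dependency-free model. Stream, lastValue and the two handleEvent functions below are hypothetical stand-ins for an RxJS Observable, lastValueFrom, and the two branches of handleEvent; they are not Nest or RxJS code. Subscribing and returning immediately lets the returned promise resolve even though the stream errored, whereas awaiting the stream's last value turns the error into a rejection that a consumer callback such as kafkajs's eachMessage can see.

```typescript
// Minimal stand-in for an Observable: a function that drives observer callbacks.
interface Observer<T> {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
}
type Stream<T> = (observer: Observer<T>) => void;

// A stream that errors immediately, similar in spirit to rxjs `throwError`.
const failingStream: Stream<unknown> = (o) => o.error(new Error("handler failed"));

// Default-style: subscribe and return at once; nobody observes the error.
async function handleEventFireAndForget(stream: Stream<unknown>): Promise<void> {
  stream({ next() {}, error() {}, complete() {} });
}

// Like lastValueFrom with a default of undefined: resolve on complete, reject on error.
function lastValue<T>(stream: Stream<T>): Promise<T | undefined> {
  return new Promise((resolve, reject) => {
    let last: T | undefined;
    stream({ next: (v) => { last = v; }, error: reject, complete: () => resolve(last) });
  });
}

// Durable-style: wait for the stream, so its error rejects this promise.
async function handleEventDurable(stream: Stream<unknown>): Promise<void> {
  await lastValue(stream);
}

// Hypothetical consumer callback: reports whether the failure surfaced.
async function runEachMessage(
  handle: (s: Stream<unknown>) => Promise<void>,
): Promise<"resolved" | "rejected"> {
  try {
    await handle(failingStream);
    return "resolved";
  } catch {
    return "rejected";
  }
}

runEachMessage(handleEventFireAndForget).then((r) => console.log("default:", r)); // resolved
runEachMessage(handleEventDurable).then((r) => console.log("durable:", r)); // rejected
```

Only the "rejected" outcome gives a retrying consumer something to act on, which is why the override awaits the stream for durable handlers.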

@mastermatt

Related to #9293

@DmitryValko

I'm using the latest version now (NestJS v9.3.10) and still have the same error.

Projects
None yet
Development

No branches or pull requests

5 participants