/
kafka-concurrent.controller.ts
71 lines (63 loc) 路 1.62 KB
/
kafka-concurrent.controller.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import {
Body,
Controller,
HttpCode,
Logger,
OnModuleDestroy,
OnModuleInit,
Post,
} from '@nestjs/common';
import { Client, ClientKafka, Transport } from '@nestjs/microservices';
import { PartitionerArgs } from 'kafkajs';
import { Observable } from 'rxjs';
import { SumDto } from './dto/sum.dto';
/**
* The following function explicitly sends messages to the key representing the partition.
*/
const explicitPartitioner = () => {
return ({ message }: PartitionerArgs) => {
return parseFloat(message.headers.toPartition.toString());
};
};
@Controller()
export class KafkaConcurrentController
implements OnModuleInit, OnModuleDestroy
{
protected readonly logger = new Logger(KafkaConcurrentController.name);
@Client({
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
},
run: {
partitionsConsumedConcurrently: 3,
},
producer: {
createPartitioner: explicitPartitioner,
},
},
})
public readonly client: ClientKafka;
async onModuleInit() {
const requestPatterns = ['math.sum.sync.number.wait'];
requestPatterns.forEach(pattern => {
this.client.subscribeToResponseOf(pattern);
});
await this.client.connect();
}
async onModuleDestroy() {
await this.client.close();
}
@Post('mathSumSyncNumberWait')
@HttpCode(200)
public mathSumSyncNumberWait(@Body() data: SumDto): Observable<string> {
return this.client.send('math.sum.sync.number.wait', {
headers: {
toPartition: data.key.toString(),
},
key: data.key.toString(),
value: data.numbers,
});
}
}