forked from nestjs/nest
-
Notifications
You must be signed in to change notification settings - Fork 0
/
kafka-parser.ts
61 lines (53 loc) · 1.58 KB
/
kafka-parser.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import { isNil } from '@nestjs/common/utils/shared.utils';
import { KafkaParserConfig } from '../interfaces';
/**
 * Turns raw Kafka messages (binary key / value / header values) into
 * friendlier JavaScript values: strings, or parsed JSON objects and arrays.
 */
export class KafkaParser {
  /** When true, `parse` leaves the message `value` untouched instead of decoding it. */
  protected readonly keepBinary: boolean;

  /**
   * @param config Optional parser settings; `keepBinary` defaults to `false`.
   */
  constructor(config?: KafkaParserConfig) {
    this.keepBinary = (config && config.keepBinary) || false;
  }

  /**
   * Returns a decoded copy of a Kafka message.
   *
   * The `value` is decoded unless `keepBinary` is enabled, the `key` is decoded
   * when present, and every header value is decoded. The input message is never
   * mutated.
   *
   * @param data Raw message, expected to carry `key`, `value` and `headers`.
   * @returns A shallow copy of `data` with decoded fields and a `headers`
   * object that is always defined.
   */
  public parse<T = any>(data: any): T {
    // Duplicate the object to not modify the original one (would break KafkaJS retries).
    // Spreading null/undefined headers yields `{}`, so `result.headers` is always
    // an object and needs no separate fallback.
    const result = {
      ...data,
      headers: { ...data.headers },
    };

    if (!this.keepBinary) {
      result.value = this.decode(data.value);
    }

    if (!isNil(data.key)) {
      result.key = this.decode(data.key);
    }

    if (!isNil(data.headers)) {
      for (const headerName of Object.keys(data.headers)) {
        result.headers[headerName] = this.decode(data.headers[headerName]);
      }
    }

    return result;
  }

  /**
   * Decodes a single raw value.
   *
   * @param value Raw value to decode.
   * @returns `null` for null/undefined input; the untouched buffer when it
   * starts with a zero byte; the parsed JSON when the text starts with `{` or
   * `[` and is valid JSON; otherwise the value converted to a string.
   */
  public decode(value: Buffer): object | string | null | Buffer {
    if (isNil(value)) {
      return null;
    }

    // A value with the "leading zero byte" indicates the schema payload.
    // The "content" is possibly binary and should not be touched & parsed.
    if (
      Buffer.isBuffer(value) &&
      value.length > 0 &&
      value.readUInt8(0) === 0
    ) {
      return value;
    }

    // Stringify once and reuse the text for both the prefix check and the parse.
    const text = value.toString();
    const startChar = text.charAt(0);

    // only try to parse objects and arrays
    if (startChar === '{' || startChar === '[') {
      try {
        return JSON.parse(text);
      } catch {
        // Best effort: the text only looked like JSON, so hand it back as-is.
      }
    }
    return text;
  }
}