Skip to content

Commit

Permalink
feat(microservices): migrate redis transporter to internally use iore…
Browse files Browse the repository at this point in the history
…dis package
  • Loading branch information
kamilmysliwiec committed May 17, 2022
1 parent 61bcb7f commit 40e2755
Show file tree
Hide file tree
Showing 9 changed files with 305 additions and 499 deletions.
2 changes: 1 addition & 1 deletion integration/microservices/e2e/disconnected-client.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ describe('Disconnected client', () => {
.send({
transport: Transport.REDIS,
options: {
url: 'redis://localhost:3333',
port: '3333',
},
})
.expect(408);
Expand Down
5 changes: 3 additions & 2 deletions integration/microservices/src/disconnected.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {
Controller,
InternalServerErrorException,
Post,
RequestTimeoutException,
RequestTimeoutException
} from '@nestjs/common';
import { ClientProxyFactory } from '@nestjs/microservices';
import { Observable, throwError } from 'rxjs';
Expand All @@ -24,7 +24,8 @@ export class DisconnectedClientController {
return throwError(() =>
code === 'ECONNREFUSED' ||
code === 'CONN_ERR' ||
code === 'CONNECTION_REFUSED'
code === 'CONNECTION_REFUSED' ||
error.message === 'Connection is closed.'
? new RequestTimeoutException('ECONNREFUSED')
: new InternalServerErrorException(),
);
Expand Down
73 changes: 64 additions & 9 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 5 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@
"@codechecks/client": "0.1.12",
"@commitlint/cli": "17.0.0",
"@commitlint/config-angular": "17.0.0",
"@fastify/cors": "7.0.0",
"@fastify/formbody": "6.0.0",
"@fastify/multipart": "6.0.0",
"@fastify/static": "5.0.0",
"@grpc/grpc-js": "1.6.7",
"@grpc/proto-loader": "0.6.12",
"@nestjs/apollo": "10.0.11",
Expand All @@ -89,7 +93,6 @@
"@types/http-errors": "1.8.2",
"@types/mocha": "9.1.1",
"@types/node": "17.0.34",
"@types/redis": "4.0.11",
"@types/reflect-metadata": "0.1.0",
"@types/sinon": "10.0.11",
"@types/socket.io": "3.0.2",
Expand Down Expand Up @@ -122,10 +125,6 @@
"eventsource": "2.0.2",
"fancy-log": "2.0.0",
"fastify": "3.29.0",
"@fastify/cors": "7.0.0",
"@fastify/formbody": "6.0.0",
"@fastify/multipart": "6.0.0",
"@fastify/static": "5.0.0",
"graphql": "15.8.0",
"graphql-tools": "8.2.9",
"gulp": "4.0.2",
Expand All @@ -137,6 +136,7 @@
"http-errors": "2.0.0",
"husky": "8.0.1",
"imports-loader": "3.1.1",
"ioredis": "5.0.4",
"json-loader": "0.5.7",
"kafkajs": "2.0.0",
"lerna": "3.0.0",
Expand All @@ -156,7 +156,6 @@
"nyc": "15.1.0",
"point-of-view": "5.3.0",
"prettier": "2.6.2",
"redis": "3.1.2",
"rxjs-compat": "6.6.7",
"sinon": "14.0.0",
"sinon-chai": "3.7.0",
Expand Down
97 changes: 32 additions & 65 deletions packages/microservices/client/client-redis.ts
Original file line number Diff line number Diff line change
@@ -1,48 +1,31 @@
import { Logger } from '@nestjs/common/services/logger.service';
import { loadPackage } from '@nestjs/common/utils/load-package.util';
import {
EmptyError,
fromEvent,
lastValueFrom,
merge,
Subject,
zip,
} from 'rxjs';
import { share, take, tap } from 'rxjs/operators';
import {
CONNECT_EVENT,
ECONNREFUSED,
ERROR_EVENT,
MESSAGE_EVENT,
REDIS_DEFAULT_URL,
REDIS_DEFAULT_HOST,
REDIS_DEFAULT_PORT,
} from '../constants';
import {
ClientOpts,
RedisClient,
RetryStrategyOptions,
} from '../external/redis.interface';
import { ReadPacket, RedisOptions, WritePacket } from '../interfaces';
import { ClientProxy } from './client-proxy';

let redisPackage: any = {};
type Redis = any;

let redisPackage = {} as any;

export class ClientRedis extends ClientProxy {
protected readonly logger = new Logger(ClientProxy.name);
protected readonly subscriptionsCount = new Map<string, number>();
protected readonly url: string;
protected pubClient: RedisClient;
protected subClient: RedisClient;
protected pubClient: Redis;
protected subClient: Redis;
protected connection: Promise<any>;
protected isExplicitlyTerminated = false;

constructor(protected readonly options: RedisOptions['options']) {
super();
this.url =
this.getOptionsProp(options, 'url') ||
(!this.getOptionsProp(options, 'host') && REDIS_DEFAULT_URL);

redisPackage = loadPackage('redis', ClientRedis.name, () =>
require('redis'),
redisPackage = loadPackage('ioredis', ClientRedis.name, () =>
require('ioredis'),
);

this.initializeSerializer(options);
Expand All @@ -64,73 +47,57 @@ export class ClientRedis extends ClientProxy {
this.isExplicitlyTerminated = true;
}

public connect(): Promise<any> {
public async connect(): Promise<any> {
if (this.pubClient && this.subClient) {
return this.connection;
}
const error$ = new Subject<Error>();

this.pubClient = this.createClient(error$);
this.subClient = this.createClient(error$);
this.pubClient = this.createClient();
this.subClient = this.createClient();
this.handleError(this.pubClient);
this.handleError(this.subClient);

const pubConnect$ = fromEvent(this.pubClient, CONNECT_EVENT);
const subClient$ = fromEvent(this.subClient, CONNECT_EVENT);
this.connection = Promise.all([
this.subClient.connect(),
this.pubClient.connect(),
]);
await this.connection;

this.connection = lastValueFrom(
merge(error$, zip(pubConnect$, subClient$)).pipe(
take(1),
tap(() =>
this.subClient.on(MESSAGE_EVENT, this.createResponseCallback()),
),
share(),
),
).catch(err => {
if (err instanceof EmptyError) {
return;
}
throw err;
});
this.subClient.on(MESSAGE_EVENT, this.createResponseCallback());
return this.connection;
}

public createClient(error$: Subject<Error>): RedisClient {
return redisPackage.createClient({
...this.getClientOptions(error$),
url: this.url,
public createClient(): Redis {
return new redisPackage({
host: REDIS_DEFAULT_HOST,
port: REDIS_DEFAULT_PORT,
...this.getClientOptions(),
lazyConnect: true,
});
}

public handleError(client: RedisClient) {
public handleError(client: Redis) {
client.addListener(ERROR_EVENT, (err: any) => this.logger.error(err));
}

public getClientOptions(error$: Subject<Error>): Partial<ClientOpts> {
const retry_strategy = (options: RetryStrategyOptions) =>
this.createRetryStrategy(options, error$);
public getClientOptions(): Partial<RedisOptions['options']> {
const retryStrategy = (times: number) => this.createRetryStrategy(times);

return {
...(this.options || {}),
retry_strategy,
retryStrategy,
};
}

public createRetryStrategy(
options: RetryStrategyOptions,
error$: Subject<Error>,
): undefined | number | Error {
if (options.error && (options.error as any).code === ECONNREFUSED) {
error$.error(options.error);
}
public createRetryStrategy(times: number): undefined | number {
if (this.isExplicitlyTerminated) {
return undefined;
}
if (
!this.getOptionsProp(this.options, 'retryAttempts') ||
options.attempt > this.getOptionsProp(this.options, 'retryAttempts')
times > this.getOptionsProp(this.options, 'retryAttempts')
) {
return new Error('Retry time exhausted');
this.logger.error('Retry time exhausted');
return;
}
return this.getOptionsProp(this.options, 'retryDelay') || 0;
}
Expand Down
5 changes: 4 additions & 1 deletion packages/microservices/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ import { ROUTE_ARGS_METADATA } from '@nestjs/common/constants';

export const TCP_DEFAULT_PORT = 3000;
export const TCP_DEFAULT_HOST = 'localhost';
export const REDIS_DEFAULT_URL = 'redis://localhost:6379';

export const REDIS_DEFAULT_PORT = 6379;
export const REDIS_DEFAULT_HOST = 'localhost';

export const NATS_DEFAULT_URL = 'nats://localhost:4222';
export const MQTT_DEFAULT_URL = 'mqtt://localhost:1883';
export const GRPC_DEFAULT_URL = 'localhost:5000';
Expand Down

0 comments on commit 40e2755

Please sign in to comment.