Skip to content

Commit

Permalink
fix(core,websockets): use rxjs when checking if is an observable
Browse files Browse the repository at this point in the history
  • Loading branch information
micalevisk committed Apr 24, 2022
1 parent d51c728 commit bc0a1de
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 8 deletions.
10 changes: 5 additions & 5 deletions packages/core/router/router-response-controller.ts
Expand Up @@ -5,9 +5,9 @@ import {
RequestMethod,
MessageEvent,
} from '@nestjs/common';
import { isFunction, isObject } from '@nestjs/common/utils/shared.utils';
import { isObject } from '@nestjs/common/utils/shared.utils';
import { IncomingMessage } from 'http';
import { EMPTY, lastValueFrom, Observable } from 'rxjs';
import { EMPTY, lastValueFrom, Observable, isObservable } from 'rxjs';
import { catchError, debounce, map } from 'rxjs/operators';
import {
AdditionalHeaders,
Expand Down Expand Up @@ -64,7 +64,7 @@ export class RouterResponseController {
}

public async transformToResult(resultOrDeferred: any) {
if (resultOrDeferred && isFunction(resultOrDeferred.subscribe)) {
if (isObservable(resultOrDeferred)) {
return lastValueFrom(resultOrDeferred);
}
return resultOrDeferred;
Expand Down Expand Up @@ -152,8 +152,8 @@ export class RouterResponseController {
});
}

private assertObservable(result: any) {
if (!isFunction(result.subscribe)) {
private assertObservable(value: any) {
if (!isObservable(value)) {
throw new ReferenceError(
'You must return an Observable stream to use Server-Sent Events (SSE).',
);
Expand Down
11 changes: 8 additions & 3 deletions packages/websockets/web-sockets-controller.ts
@@ -1,9 +1,14 @@
import { Type } from '@nestjs/common/interfaces/type.interface';
import { Logger } from '@nestjs/common/services/logger.service';
import { isFunction } from '@nestjs/common/utils/shared.utils';
import { ApplicationConfig } from '@nestjs/core/application-config';
import { MetadataScanner } from '@nestjs/core/metadata-scanner';
import { from as fromPromise, Observable, of, Subject } from 'rxjs';
import {
from as fromPromise,
Observable,
isObservable,
of,
Subject,
} from 'rxjs';
import { distinctUntilChanged, mergeAll } from 'rxjs/operators';
import { GATEWAY_OPTIONS, PORT_METADATA } from './constants';
import { WsContextCreator } from './context/ws-context-creator';
Expand Down Expand Up @@ -158,7 +163,7 @@ export class WebSocketsController {
deferredResult: Promise<any>,
): Promise<Observable<any>> {
const result = await deferredResult;
if (result && isFunction(result.subscribe)) {
if (isObservable(result)) {
return result;
}
if (result instanceof Promise) {
Expand Down

0 comments on commit bc0a1de

Please sign in to comment.