diff --git a/packages/core/router/router-response-controller.ts b/packages/core/router/router-response-controller.ts index 008eea5a85d..4128e7fcab5 100644 --- a/packages/core/router/router-response-controller.ts +++ b/packages/core/router/router-response-controller.ts @@ -5,9 +5,9 @@ import { RequestMethod, MessageEvent, } from '@nestjs/common'; -import { isFunction, isObject } from '@nestjs/common/utils/shared.utils'; +import { isObject } from '@nestjs/common/utils/shared.utils'; import { IncomingMessage } from 'http'; -import { EMPTY, lastValueFrom, Observable } from 'rxjs'; +import { EMPTY, lastValueFrom, Observable, isObservable } from 'rxjs'; import { catchError, debounce, map } from 'rxjs/operators'; import { AdditionalHeaders, @@ -64,7 +64,7 @@ export class RouterResponseController { } public async transformToResult(resultOrDeferred: any) { - if (resultOrDeferred && isFunction(resultOrDeferred.subscribe)) { + if (isObservable(resultOrDeferred)) { return lastValueFrom(resultOrDeferred); } return resultOrDeferred; @@ -152,8 +152,8 @@ export class RouterResponseController { }); } - private assertObservable(result: any) { - if (!isFunction(result.subscribe)) { + private assertObservable(value: any) { + if (!isObservable(value)) { throw new ReferenceError( 'You must return an Observable stream to use Server-Sent Events (SSE).', ); diff --git a/packages/websockets/web-sockets-controller.ts b/packages/websockets/web-sockets-controller.ts index 75e790c8277..d01811e179b 100644 --- a/packages/websockets/web-sockets-controller.ts +++ b/packages/websockets/web-sockets-controller.ts @@ -1,9 +1,14 @@ import { Type } from '@nestjs/common/interfaces/type.interface'; import { Logger } from '@nestjs/common/services/logger.service'; -import { isFunction } from '@nestjs/common/utils/shared.utils'; import { ApplicationConfig } from '@nestjs/core/application-config'; import { MetadataScanner } from '@nestjs/core/metadata-scanner'; -import { from as fromPromise, Observable, of, Subject } from 'rxjs'; +import { + from as fromPromise, + Observable, + isObservable, + of, + Subject, +} from 'rxjs'; import { distinctUntilChanged, mergeAll } from 'rxjs/operators'; import { GATEWAY_OPTIONS, PORT_METADATA } from './constants'; import { WsContextCreator } from './context/ws-context-creator'; @@ -158,7 +163,7 @@ export class WebSocketsController { deferredResult: Promise, ): Promise> { const result = await deferredResult; - if (result && isFunction(result.subscribe)) { + if (isObservable(result)) { return result; } if (result instanceof Promise) {