Skip to content

Commit

Permalink
Merge pull request #9491 from micalevisk/fix-use-isObservable
Browse files Browse the repository at this point in the history
Use rxjs when checking if the value is an observable
  • Loading branch information
kamilmysliwiec committed May 17, 2022
2 parents ca967b8 + bc0a1de commit 19b55e9
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 18 deletions.
10 changes: 5 additions & 5 deletions packages/core/router/router-response-controller.ts
Expand Up @@ -5,9 +5,9 @@ import {
RequestMethod,
MessageEvent,
} from '@nestjs/common';
import { isFunction, isObject } from '@nestjs/common/utils/shared.utils';
import { isObject } from '@nestjs/common/utils/shared.utils';
import { IncomingMessage } from 'http';
import { EMPTY, lastValueFrom, Observable } from 'rxjs';
import { EMPTY, lastValueFrom, Observable, isObservable } from 'rxjs';
import { catchError, debounce, map } from 'rxjs/operators';
import {
AdditionalHeaders,
Expand Down Expand Up @@ -64,7 +64,7 @@ export class RouterResponseController {
}

public async transformToResult(resultOrDeferred: any) {
if (resultOrDeferred && isFunction(resultOrDeferred.subscribe)) {
if (isObservable(resultOrDeferred)) {
return lastValueFrom(resultOrDeferred);
}
return resultOrDeferred;
Expand Down Expand Up @@ -152,8 +152,8 @@ export class RouterResponseController {
});
}

private assertObservable(result: any) {
if (!isFunction(result.subscribe)) {
private assertObservable(value: any) {
if (!isObservable(value)) {
throw new ReferenceError(
'You must return an Observable stream to use Server-Sent Events (SSE).',
);
Expand Down
21 changes: 15 additions & 6 deletions packages/core/test/router/router-response-controller.spec.ts
Expand Up @@ -71,13 +71,13 @@ describe('RouterResponseController', () => {
describe('transformToResult', () => {
describe('when resultOrDeferred', () => {
describe('is Promise', () => {
it('should return Promise', async () => {
it('should return Promise that resolves to the value resolved by the input Promise', async () => {
const value = 100;
expect(
await routerResponseController.transformToResult(
Promise.resolve(value),
),
).to.be.eq(100);
).to.be.eq(value);
});
});

Expand All @@ -88,16 +88,25 @@ describe('RouterResponseController', () => {
await routerResponseController.transformToResult(
of(1, 2, 3, lastValue),
),
).to.be.eq(100);
).to.be.eq(lastValue);
});
});

describe('is value', () => {
it('should return Promise', async () => {
describe('is an object that has the method `subscribe`', () => {
it('should return a Promise that resolves to the input value', async () => {
const value = { subscribe() {} };
expect(
await routerResponseController.transformToResult(value),
).to.equal(value);
});
});

describe('is an ordinary value', () => {
it('should return a Promise that resolves to the input value', async () => {
const value = 100;
expect(
await routerResponseController.transformToResult(value),
).to.be.eq(100);
).to.be.eq(value);
});
});
});
Expand Down
19 changes: 15 additions & 4 deletions packages/websockets/test/web-sockets-controller.spec.ts
Expand Up @@ -345,7 +345,7 @@ describe('WebSocketsController', () => {
Promise.resolve(Promise.resolve(value)),
),
),
).to.be.eq(100);
).to.be.eq(value);
});
});

Expand All @@ -356,18 +356,29 @@ describe('WebSocketsController', () => {
await lastValueFrom(
await instance.pickResult(Promise.resolve(of(value))),
),
).to.be.eq(100);
).to.be.eq(value);
});
});

describe('is a value', () => {
describe('is an object that has the method `subscribe`', () => {
it('should return Promise<Observable>', async () => {
const value = { subscribe() {} };
expect(
await lastValueFrom(
await instance.pickResult(Promise.resolve(value)),
),
).to.equal(value);
});
});

describe('is an ordinary value', () => {
it('should return Promise<Observable>', async () => {
const value = 100;
expect(
await lastValueFrom(
await instance.pickResult(Promise.resolve(value)),
),
).to.be.eq(100);
).to.be.eq(value);
});
});
});
Expand Down
11 changes: 8 additions & 3 deletions packages/websockets/web-sockets-controller.ts
@@ -1,9 +1,14 @@
import { Type } from '@nestjs/common/interfaces/type.interface';
import { Logger } from '@nestjs/common/services/logger.service';
import { isFunction } from '@nestjs/common/utils/shared.utils';
import { ApplicationConfig } from '@nestjs/core/application-config';
import { MetadataScanner } from '@nestjs/core/metadata-scanner';
import { from as fromPromise, Observable, of, Subject } from 'rxjs';
import {
from as fromPromise,
Observable,
isObservable,
of,
Subject,
} from 'rxjs';
import { distinctUntilChanged, mergeAll } from 'rxjs/operators';
import { GATEWAY_OPTIONS, PORT_METADATA } from './constants';
import { WsContextCreator } from './context/ws-context-creator';
Expand Down Expand Up @@ -158,7 +163,7 @@ export class WebSocketsController {
deferredResult: Promise<any>,
): Promise<Observable<any>> {
const result = await deferredResult;
if (result && isFunction(result.subscribe)) {
if (isObservable(result)) {
return result;
}
if (result instanceof Promise) {
Expand Down

0 comments on commit 19b55e9

Please sign in to comment.