Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rewrite links with rxjs #1592

Closed
wants to merge 17 commits into from
1 change: 1 addition & 0 deletions examples/express-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"abort-controller": "^3.0.0",
"express": "^4.17.1",
"node-fetch": "^2.6.1",
"rxjs": "^7.5.4",
"typescript": "4.4.4",
"zod": "^3.0.0"
},
Expand Down
2 changes: 1 addition & 1 deletion examples/express-server/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { loggerLink } from '@trpc/client/links/loggerLink';
import AbortController from 'abort-controller';
import fetch from 'node-fetch';
import type { AppRouter } from './server';
import { tap } from '@trpc/client/rx/operators';
import { tap } from 'rxjs';

// polyfill
global.AbortController = AbortController;
Expand Down
4 changes: 2 additions & 2 deletions examples/fastify-server/src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ async function start() {
let randomNumberCount = 0;

await new Promise<void>((resolve) => {
const unsub = anon.client.subscription('sub:randomNumber', null, {
const subscription = anon.client.subscription('sub:randomNumber', null, {
next(data) {
console.log('>>> anon:sub:randomNumber:received:', data);
randomNumberCount++;

if (randomNumberCount > 3) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've noticed this pattern in more than one spot. Consider using take(3) instead. It's the same basic idea.

unsub();
subscription.unsubscribe();
resolve();
}
},
Expand Down
1 change: 1 addition & 0 deletions examples/next-prisma-todomvc/next-env.d.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/// <reference types="next" />
/// <reference types="next/types/global" />
/// <reference types="next/image-types/global" />

// NOTE: This file should not be edited
Expand Down
11 changes: 5 additions & 6 deletions examples/standalone-server/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,22 @@ async function main() {
console.log('createPostResponse', createPostRes);

let count = 0;
const unsub = client.subscription('randomNumber', null, {
const subscription = client.subscription('randomNumber', null, {
next(data) {
// ^ note that `data` here is inferred
console.log('received', data);
count++;
if (count > 3) {
// stop after 3 pulls
unsub();
subscription.unsubscribe();

console.log('unsubscribing and closing websocket');
wsClient.close();
}
},
error(err) {
console.error('error', err);
},
complete() {
console.log('completed called - closing websocket');
wsClient.close();
},
});
}

Expand Down
21 changes: 8 additions & 13 deletions packages/client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
"require": "./dist/index.js",
"default": "./dist/index.js"
},
"./rx": {
"import": "./dist/rx.mjs",
"require": "./dist/rx.js",
"default": "./dist/rx.js"
},
"./links/httpBatchLink": {
"import": "./dist/links/httpBatchLink.mjs",
"require": "./dist/links/httpBatchLink.js",
Expand Down Expand Up @@ -48,30 +53,20 @@
"import": "./dist/links/wsLink.mjs",
"require": "./dist/links/wsLink.js",
"default": "./dist/links/wsLink.js"
},
"./rx": {
"import": "./dist/rx/index.mjs",
"require": "./dist/rx/index.js",
"default": "./dist/rx/index.js"
},
"./rx/operators": {
"import": "./dist/rx/operators/index.mjs",
"require": "./dist/rx/operators/index.js",
"default": "./dist/rx/operators/index.js"
}
},
"files": [
"README.md",
"dist",
"links",
"src",
"rx"
"src"
],
"scripts": {
"build": "rollup -c"
},
"dependencies": {
"@babel/runtime": "^7.9.0"
"@babel/runtime": "^7.9.0",
"rxjs": "^7.5.4"
sachinraja marked this conversation as resolved.
Show resolved Hide resolved
},
"peerDependencies": {
"@trpc/server": "^9.12.0"
Expand Down
3 changes: 1 addition & 2 deletions packages/client/rollup.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@ import { getRollupConfig } from '../../scripts/rollup';
const config = getRollupConfig({
input: [
'src/index.ts',
'src/rx.ts',
'src/links/httpBatchLink.ts',
'src/links/httpLink.ts',
'src/links/loggerLink.ts',
'src/links/splitLink.ts',
'src/links/transformerLink.ts',
'src/links/wsLink.ts',
'src/rx/index.ts',
'src/rx/operators/index.ts',
],
});

Expand Down
2 changes: 1 addition & 1 deletion packages/client/rx/index.js
Original file line number Diff line number Diff line change
@@ -1 +1 @@
module.exports = require('../dist/rx/index');
module.exports = require('../dist/rx');
1 change: 0 additions & 1 deletion packages/client/rx/operators/index.d.ts

This file was deleted.

1 change: 0 additions & 1 deletion packages/client/rx/operators/index.js

This file was deleted.

18 changes: 6 additions & 12 deletions packages/client/src/internals/TRPCClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@ import {
OperationLink,
TRPCLink,
} from '../links/types';
import { UnsubscribeFn } from '../rx';
import { inferObservableValue } from '../rx/observable';
import { share } from '../rx/operators';
import { Observer } from '../rx/types';
import { observableToPromise } from '../rx/util/observableToPromise';
import { inferObservableValue, observableToPromise } from '../rx';
import { share } from 'rxjs/operators';
sachinraja marked this conversation as resolved.
Show resolved Hide resolved
import { Observer, Subscription } from 'rxjs';
import { TRPCClientError } from '../TRPCClientError';
import { getAbortController } from './fetchHelpers';

Expand Down Expand Up @@ -216,16 +214,15 @@ export class TRPCClient<TRouter extends AnyRouter> {
>(
path: TPath,
input: TInput,
opts: TRPCRequestOptions &
Partial<Observer<TRPCResult<TOutput>, TRPCClientError<TRouter>>>,
): UnsubscribeFn {
opts: TRPCRequestOptions & Partial<Observer<TRPCResult<TOutput>>>,
): Subscription {
const observable$ = this.$request<TInput, TOutput>({
type: 'subscription',
path,
input,
context: opts.context,
});
const observer = observable$.subscribe({
return observable$.subscribe({
next(result) {
if ('error' in result.data) {
const err = TRPCClientError.from(result.data, {
Expand All @@ -244,8 +241,5 @@ export class TRPCClient<TRouter extends AnyRouter> {
opts.complete?.();
},
});
return () => {
observer.unsubscribe();
};
}
}
6 changes: 3 additions & 3 deletions packages/client/src/links/dedupeLink.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { waitFor } from '@testing-library/dom';
import { AnyRouter } from '@trpc/server';
import { OperationLink } from '..';
import { observable } from '../rx/observable';
import { Observable } from 'rxjs';
import { dedupeLink } from './dedupeLink';
import { createChain } from './internals/createChain';

Expand All @@ -12,7 +12,7 @@ test('dedupeLink', async () => {
// "dedupe link"
dedupeLink()(null as any),
({ op }) => {
return observable((subscribe) => {
return new Observable((subscribe) => {
endingLinkTriggered();
const timer = setTimeout(() => {
timerTriggered();
Expand Down Expand Up @@ -75,7 +75,7 @@ test('dedupe - cancel one does not cancel the other', async () => {
// "dedupe link"
dedupeLink()(null as any),
({ op }) => {
return observable((subscribe) => {
return new Observable((subscribe) => {
endingLinkTriggered();
const timer = setTimeout(() => {
timerTriggered();
Expand Down
55 changes: 28 additions & 27 deletions packages/client/src/links/dedupeLink.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
import { AnyRouter } from '@trpc/server';
import { observable } from '../rx/observable';
import { share } from '../rx/operators';
import { Observable } from '../rx/types';
import { TRPCLink } from './types';
import { Observable } from 'rxjs';
import { share } from 'rxjs';
sachinraja marked this conversation as resolved.
Show resolved Hide resolved
import { OperationResult, TRPCLink } from './types';

export function dedupeLink<
TRouter extends AnyRouter = AnyRouter,
>(): TRPCLink<TRouter> {
// initialized config
return () => {
// initialized in app
const pending: Record<string, Observable<any, any>> = {};
const pending: Record<string, Observable<any>> = {};
return ({ op, next }) => {
// initialized for request

Expand All @@ -21,31 +20,33 @@ export function dedupeLink<
const key = JSON.stringify([op.path, op.input]);
if (pending[key]) {
// console.log('hooking into pending', { op });
return observable((observer) => pending[key].subscribe(observer));
return new Observable((observer) => pending[key].subscribe(observer));
}

const shared$ = observable((observer) => {
function reset() {
delete pending[key];
}
const next$ = next(op).subscribe({
next(v) {
observer.next(v);
},
error(e) {
const shared$ = new Observable<OperationResult<TRouter, unknown>>(
(observer) => {
function reset() {
delete pending[key];
}
const next$ = next(op).subscribe({
next(v) {
observer.next(v);
},
error(e) {
reset();
observer.error(e);
},
complete() {
reset();
sachinraja marked this conversation as resolved.
Show resolved Hide resolved
observer.complete();
},
});
return () => {
reset();
observer.error(e);
},
complete() {
reset();
observer.complete();
},
});
return () => {
reset();
next$.unsubscribe();
};
}).pipe(share());
next$.unsubscribe();
};
},
).pipe(share());
pending[key] = shared$;
return shared$;
};
Expand Down
4 changes: 2 additions & 2 deletions packages/client/src/links/httpBatchLink.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { AnyRouter, ProcedureType } from '@trpc/server';
import { dataLoader } from '../internals/dataLoader';
import { observable } from '../rx/observable';
import { Observable } from 'rxjs';
import { TRPCLink } from './types';
import { HTTPLinkOptions, httpRequest, ResponseShape } from './httpUtils';

Expand Down Expand Up @@ -52,7 +52,7 @@ export function httpBatchLink<TRouter extends AnyRouter>(

const loaders = { query, subscription, mutation };
return ({ op }) => {
return observable((observer) => {
return new Observable((observer) => {
const loader = loaders[op.type];
const { promise, cancel } = loader.load(op);

Expand Down
4 changes: 2 additions & 2 deletions packages/client/src/links/httpLink.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { AnyRouter } from '@trpc/server';
import { observable } from '../rx/observable';
import { Observable } from 'rxjs';
import { TRPCLink } from './types';
import { HTTPLinkOptions, httpRequest } from './httpUtils';

Expand All @@ -9,7 +9,7 @@ export function httpLink<TRouter extends AnyRouter>(
const { url } = opts;
return (runtime) =>
({ op }) =>
observable((observer) => {
new Observable((observer) => {
const { path, input, type } = op;
const { promise, cancel } = httpRequest({
runtime,
Expand Down
14 changes: 7 additions & 7 deletions packages/client/src/links/internals/createChain.test.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
import { AnyRouter } from '@trpc/server/src';
import { createChain } from './createChain';
import { observable } from '../../rx/observable';
import { Observable } from 'rxjs';

describe('chain', () => {
test('trivial', () => {
const result$ = createChain<AnyRouter, unknown, unknown>({
links: [
({ next, op }) => {
return observable((observer) => {
const next$ = next(op).subscribe(observer);
return new Observable((observer) => {
const subscription = next(op).subscribe(observer);
return () => {
next$.unsubscribe();
subscription.unsubscribe();
};
});
},
({ op }) => {
return observable((observer) => {
return new Observable((observer) => {
observer.next({
context: {},
data: {
Expand Down Expand Up @@ -51,7 +51,7 @@ describe('chain', () => {
const result$ = createChain<AnyRouter, unknown, unknown>({
links: [
({ next, op }) => {
return observable((observer) => {
return new Observable((observer) => {
observer.next({
context: {},
data: {
Expand All @@ -69,7 +69,7 @@ describe('chain', () => {
});
},
({ op }) => {
return observable((observer) => {
return new Observable((observer) => {
observer.next({
data: {
id: null,
Expand Down
4 changes: 2 additions & 2 deletions packages/client/src/links/internals/createChain.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { AnyRouter } from '@trpc/server';
import { observable } from '../../rx/observable';
import { Observable } from 'rxjs';
import { Operation, OperationLink, OperationResultObservable } from '../types';

/** @internal */
Expand All @@ -11,7 +11,7 @@ export function createChain<
links: OperationLink<TRouter, TInput, TOutput>[];
op: Operation<TInput>;
}): OperationResultObservable<TRouter, TOutput> {
return observable((observer) => {
return new Observable((observer) => {
function execute(index = 0, op = opts.op) {
const next = opts.links[index];
const next$ = next({
Expand Down
5 changes: 2 additions & 3 deletions packages/client/src/links/loggerLink.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { AnyRouter } from '@trpc/server';
import { TRPCClientError } from '..';
import { observable } from '../rx/observable';
import { tap } from '../rx/operators';
import { Observable, tap } from 'rxjs';
import { Operation, OperationResult, TRPCLink } from './types';

type ConsoleEsque = {
Expand Down Expand Up @@ -108,7 +107,7 @@ export function loggerLink<TRouter extends AnyRouter = AnyRouter>(

return () => {
return ({ op, next }) => {
return observable((observer) => {
return new Observable((observer) => {
// ->
enabled({ ...op, direction: 'up' }) &&
logger({
Expand Down