Skip to content

Commit

Permalink
Simplify RetryLink, fix potential memory leak (#11424)
Browse files Browse the repository at this point in the history
fixes #11393
  • Loading branch information
phryneas committed Dec 20, 2023
1 parent 6d46ab9 commit 62f3b6d
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 108 deletions.
13 changes: 13 additions & 0 deletions .changeset/curvy-seas-hope.md
@@ -0,0 +1,13 @@
---
"@apollo/client": minor
---

Simplify RetryLink, fix potential memory leak

Historically, `RetryLink` would keep a `values` array of all previous values,
in case the operation would get an additional subscriber at a later point in time.
In practice, this could lead to a memory leak (#11393) and did not serve any
further purpose, as the resulting observable would only be subscribed to by
Apollo Client itself, and only once - it would be wrapped in a `Concast` before
being exposed to the user, and that `Concast` would handle subscribers on its
own.
3 changes: 3 additions & 0 deletions src/link/batch-http/__tests__/batchHttpLink.ts
Expand Up @@ -524,6 +524,9 @@ describe("SharedHttpTest", () => {
expect(subscriber.next).toHaveBeenCalledTimes(2);
expect(subscriber.complete).toHaveBeenCalledTimes(2);
expect(subscriber.error).not.toHaveBeenCalled();
// only one call because batchHttpLink can handle more than one subscriber
// without starting a new request
expect(fetchMock.calls().length).toBe(1);
resolve();
}, 50);
});
Expand Down
1 change: 1 addition & 0 deletions src/link/http/__tests__/HttpLink.ts
Expand Up @@ -634,6 +634,7 @@ describe("HttpLink", () => {
expect(subscriber.next).toHaveBeenCalledTimes(2);
expect(subscriber.complete).toHaveBeenCalledTimes(2);
expect(subscriber.error).not.toHaveBeenCalled();
expect(fetchMock.calls().length).toBe(2);
resolve();
}, 50);
});
Expand Down
21 changes: 16 additions & 5 deletions src/link/retry/__tests__/retryLink.ts
Expand Up @@ -92,7 +92,12 @@ describe("RetryLink", () => {
expect(unsubscribeStub).toHaveBeenCalledTimes(1);
});

it("supports multiple subscribers to the same request", async () => {
it("multiple subscribers will trigger multiple requests", async () => {
const subscriber = {
next: jest.fn(console.log),
error: jest.fn(console.error),
complete: jest.fn(console.info),
};
const retry = new RetryLink({
delay: { initial: 1 },
attempts: { max: 5 },
Expand All @@ -102,13 +107,19 @@ describe("RetryLink", () => {
stub.mockReturnValueOnce(fromError(standardError));
stub.mockReturnValueOnce(fromError(standardError));
stub.mockReturnValueOnce(Observable.of(data));
stub.mockReturnValueOnce(fromError(standardError));
stub.mockReturnValueOnce(fromError(standardError));
stub.mockReturnValueOnce(Observable.of(data));
const link = ApolloLink.from([retry, stub]);

const observable = execute(link, { query });
const [result1, result2] = (await waitFor(observable, observable)) as any;
expect(result1.values).toEqual([data]);
expect(result2.values).toEqual([data]);
expect(stub).toHaveBeenCalledTimes(3);
observable.subscribe(subscriber);
observable.subscribe(subscriber);
await new Promise((resolve) => setTimeout(resolve, 3500));
expect(subscriber.next).toHaveBeenNthCalledWith(1, data);
expect(subscriber.next).toHaveBeenNthCalledWith(2, data);
expect(subscriber.complete).toHaveBeenCalledTimes(2);
expect(stub).toHaveBeenCalledTimes(6);
});

it("retries independently for concurrent requests", async () => {
Expand Down
121 changes: 18 additions & 103 deletions src/link/retry/retryLink.ts
@@ -1,14 +1,12 @@
import type { Operation, FetchResult, NextLink } from "../core/index.js";
import { ApolloLink } from "../core/index.js";
import type {
Observer,
ObservableSubscription,
} from "../../utilities/index.js";
import type { ObservableSubscription } from "../../utilities/index.js";
import { Observable } from "../../utilities/index.js";
import type { DelayFunction, DelayFunctionOptions } from "./delayFunction.js";
import { buildDelayFunction } from "./delayFunction.js";
import type { RetryFunction, RetryFunctionOptions } from "./retryFunction.js";
import { buildRetryFunction } from "./retryFunction.js";
import type { SubscriptionObserver } from "zen-observable-ts";

export namespace RetryLink {
export interface Options {
Expand All @@ -27,78 +25,18 @@ export namespace RetryLink {
/**
* Tracking and management of operations that may be (or currently are) retried.
*/
class RetryableOperation<TValue = any> {
class RetryableOperation {
private retryCount: number = 0;
private values: any[] = [];
private error: any;
private complete = false;
private canceled = false;
private observers: (Observer<TValue> | null)[] = [];
private currentSubscription: ObservableSubscription | null = null;
private timerId: number | undefined;

constructor(
private observer: SubscriptionObserver<FetchResult>,
private operation: Operation,
private nextLink: NextLink,
private forward: NextLink,
private delayFor: DelayFunction,
private retryIf: RetryFunction
) {}

/**
* Register a new observer for this operation.
*
* If the operation has previously emitted other events, they will be
* immediately triggered for the observer.
*/
public subscribe(observer: Observer<TValue>) {
if (this.canceled) {
throw new Error(
`Subscribing to a retryable link that was canceled is not supported`
);
}
this.observers.push(observer);

// If we've already begun, catch this observer up.
for (const value of this.values) {
observer.next!(value);
}

if (this.complete) {
observer.complete!();
} else if (this.error) {
observer.error!(this.error);
}
}

/**
* Remove a previously registered observer from this operation.
*
* If no observers remain, the operation will stop retrying, and unsubscribe
* from its downstream link.
*/
public unsubscribe(observer: Observer<TValue>) {
const index = this.observers.indexOf(observer);
if (index < 0) {
throw new Error(
`RetryLink BUG! Attempting to unsubscribe unknown observer!`
);
}
// Note that we are careful not to change the order of length of the array,
// as we are often mid-iteration when calling this method.
this.observers[index] = null;

// If this is the last observer, we're done.
if (this.observers.every((o) => o === null)) {
this.cancel();
}
}

/**
* Start the initial request.
*/
public start() {
if (this.currentSubscription) return; // Already started.

) {
this.try();
}

Expand All @@ -112,33 +50,16 @@ class RetryableOperation<TValue = any> {
clearTimeout(this.timerId);
this.timerId = undefined;
this.currentSubscription = null;
this.canceled = true;
}

private try() {
this.currentSubscription = this.nextLink(this.operation).subscribe({
next: this.onNext,
this.currentSubscription = this.forward(this.operation).subscribe({
next: this.observer.next.bind(this.observer),
error: this.onError,
complete: this.onComplete,
complete: this.observer.complete.bind(this.observer),
});
}

private onNext = (value: any) => {
this.values.push(value);
for (const observer of this.observers) {
if (!observer) continue;
observer.next!(value);
}
};

private onComplete = () => {
this.complete = true;
for (const observer of this.observers) {
if (!observer) continue;
observer.complete!();
}
};

private onError = async (error: any) => {
this.retryCount += 1;

Expand All @@ -153,11 +74,7 @@ class RetryableOperation<TValue = any> {
return;
}

this.error = error;
for (const observer of this.observers) {
if (!observer) continue;
observer.error!(error);
}
this.observer.error(error);
};

private scheduleRetry(delay: number) {
Expand Down Expand Up @@ -189,18 +106,16 @@ export class RetryLink extends ApolloLink {
operation: Operation,
nextLink: NextLink
): Observable<FetchResult> {
const retryable = new RetryableOperation(
operation,
nextLink,
this.delayFor,
this.retryIf
);
retryable.start();

return new Observable((observer) => {
retryable.subscribe(observer);
const retryable = new RetryableOperation(
observer,
operation,
nextLink,
this.delayFor,
this.retryIf
);
return () => {
retryable.unsubscribe(observer);
retryable.cancel();
};
});
}
Expand Down

0 comments on commit 62f3b6d

Please sign in to comment.