Skip to content

Commit

Permalink
Low-hanging fruit for performance improvements in usage reporting (#7101
Browse files Browse the repository at this point in the history
)

- Drop references to unencoded and uncompressed versions of reports as
soon as they are no longer needed, to hopefully allow garbage collection
to be more effective during the HTTP POSTs.
- Simplify compression code to no longer require a workaround for Node
v6 (we require Node v14!), and use `util.promisify`.
- Add a timeout to the report POST. The default is 30 seconds, which is
  currently the timeout enforced by Apollo's load balancers; a smaller
  number may be advisable for high traffic users facing memory
  constraints. This uses the AbortController API (which should be
  supported by most Fetcher implementations). We use the
  `node-abort-controller` polyfill (same polyfill we chose for
  `@apollo/gateway`) because Node's built-in global AbortController
  requires an experimental flag in Node v14.

Fixes #7100.
  • Loading branch information
glasser committed Oct 31, 2022
1 parent 4881198 commit e4e7738
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 43 deletions.
5 changes: 5 additions & 0 deletions .changeset/purple-kiwis-lay.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@apollo/server': patch
---

Manage memory more efficiently in the usage reporting plugin by allowing large objects to be garbage collected more quickly.
5 changes: 5 additions & 0 deletions .changeset/quick-weeks-shake.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@apollo/server': minor
---

The usage reporting plugin now defaults to a 30 second timeout for each attempt to send reports to Apollo Server instead of no timeout; the timeout can be adjusted with the new `requestTimeoutMs` option to `ApolloServerPluginUsageReporting`. (Apollo's servers already enforced a 30 second timeout, so this is unlikely to break any existing use cases.)
12 changes: 12 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 18 additions & 9 deletions packages/integration-testsuite/src/apolloServerTests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2225,21 +2225,20 @@ export function defineIntegrationTestSuiteApolloServerTests(

describe('graphql server functions even when Apollo servers are down', () => {
async function testWithStatus(
status: number,
status: number | 'cannot-connect' | 'timeout',
expectedRequestCount: number,
) {
const networkError = status === 0;

const { closeServer, fakeUsageReportingUrl, writeResponseResolve } =
await makeFakeUsageReportingServer({
status,
// the 444 case shouldn't ever get to actually sending 444
status: typeof status === 'number' ? status : 444,
waitWriteResponse: true,
});

try {
// To simulate a network error, we create and close the server.
// This lets us still generate a port that is hopefully unused.
if (networkError) {
if (status == 'cannot-connect') {
await closeServer();
}

Expand Down Expand Up @@ -2277,6 +2276,8 @@ export function defineIntegrationTestSuiteApolloServerTests(
reportErrorFunction(error: Error) {
reportErrorPromiseResolve(error);
},
// Make sure the timeout test actually finishes in time
requestTimeoutMs: status === 'timeout' ? 10 : undefined,
}),
],
});
Expand All @@ -2292,27 +2293,32 @@ export function defineIntegrationTestSuiteApolloServerTests(
});
expect(result.data.something).toBe('hello');

if (!networkError) {
if (typeof status === 'number') {
// Allow reporting to return its response (for every retry).
// (But not if we're intentionally timing out!)
writeResponseResolve();
}

// Make sure we can get the error from reporting.
const sendingError = await reportErrorPromise;
expect(sendingError).toBeTruthy();
if (networkError) {
if (status === 'cannot-connect') {
expect(sendingError.message).toContain(
'Error sending report to Apollo servers',
);
expect(sendingError.message).toContain('ECONNREFUSED');
} else if (status === 'timeout') {
expect(sendingError.message).toBe(
'Error sending report to Apollo servers: The user aborted a request.',
);
} else {
expect(sendingError.message).toBe(
`Error sending report to Apollo servers: HTTP status ${status}, Important text in the body`,
);
}
expect(requestCount).toBe(expectedRequestCount);
} finally {
if (!networkError) {
if (status !== 'cannot-connect') {
await closeServer();
}
}
Expand All @@ -2322,7 +2328,10 @@ export function defineIntegrationTestSuiteApolloServerTests(
await testWithStatus(500, 3);
});
it('with network error', async () => {
await testWithStatus(0, 3);
await testWithStatus('cannot-connect', 3);
});
it('with timeout', async () => {
await testWithStatus('timeout', 3);
});
it('with non-retryable error', async () => {
await testWithStatus(400, 1);
Expand Down
1 change: 1 addition & 0 deletions packages/server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
"loglevel": "^1.6.8",
"lru-cache": "^7.10.1",
"negotiator": "^0.6.3",
"node-abort-controller": "^3.0.1",
"node-fetch": "^2.6.7",
"uuid": "^9.0.0",
"whatwg-mimetype": "^3.0.0"
Expand Down
6 changes: 6 additions & 0 deletions packages/server/src/plugin/usageReporting/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,12 @@ export interface ApolloServerPluginUsageReportingOptions<
* Minimum back-off for retries. Defaults to 100ms.
*/
minimumRetryDelayMs?: number;
/**
* Default timeout for each individual attempt to send a report to Apollo.
* (This is for each HTTP POST, not for all potential retries.) Defaults to 30
* seconds (30000ms).
*/
requestTimeoutMs?: number;
/**
* A logger interface to be used for output and errors. When not provided
* it will default to the server's own `logger` implementation and use
Expand Down
74 changes: 40 additions & 34 deletions packages/server/src/plugin/usageReporting/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import {
import retry from 'async-retry';
import { GraphQLSchema, printSchema } from 'graphql';
import type LRUCache from 'lru-cache';
import { AbortController } from 'node-abort-controller';
import fetch from 'node-fetch';
import os from 'os';
import { promisify } from 'util';
import { gzip } from 'zlib';
import type {
ApolloServerPlugin,
Expand Down Expand Up @@ -38,6 +40,8 @@ import { packageVersion } from '../../generated/packageVersion.js';
import { computeCoreSchemaHash } from '../../utils/computeCoreSchemaHash.js';
import type { HeaderMap } from '../../utils/HeaderMap.js';

const gzipPromise = promisify(gzip);

const reportHeaderDefaults = {
hostname: os.hostname(),
agentVersion: `@apollo/server@${packageVersion}`,
Expand Down Expand Up @@ -237,7 +241,7 @@ export function ApolloServerPluginUsageReporting<TContext extends BaseContext>(

// Needs to be an arrow function to be confident that key is defined.
const sendReport = async (executableSchemaId: string): Promise<void> => {
const report = getAndDeleteReport(executableSchemaId);
let report = getAndDeleteReport(executableSchemaId);
if (
!report ||
(Object.keys(report.tracesPerQuery).length === 0 &&
Expand All @@ -254,9 +258,12 @@ export function ApolloServerPluginUsageReporting<TContext extends BaseContext>(

const protobufError = Report.verify(report);
if (protobufError) {
throw new Error(`Error encoding report: ${protobufError}`);
throw new Error(`Error verifying report: ${protobufError}`);
}
const message = Report.encode(report).finish();
let message: Uint8Array | null = Report.encode(report).finish();
// Let the original protobuf object be garbage collected (helpful if the
// HTTP request hangs).
report = null;

// Potential follow-up: we can compare message.length to
// report.sizeEstimator.bytes and use it to "learn" if our estimation is
Expand All @@ -271,45 +278,44 @@ export function ApolloServerPluginUsageReporting<TContext extends BaseContext>(
);
}

const compressed = await new Promise<Buffer>((resolve, reject) => {
// The protobuf library gives us a Uint8Array. Node 8's zlib lets us
// pass it directly; convert for the sake of Node 6. (No support right
// now for Node 4, which lacks Buffer.from.)
const messageBuffer = Buffer.from(
message.buffer as ArrayBuffer,
message.byteOffset,
message.byteLength,
);
gzip(messageBuffer, (err, gzipResult) => {
if (err) {
reject(err);
} else {
resolve(gzipResult);
}
});
});
const compressed = await gzipPromise(message);
// Let the uncompressed message be garbage collected (helpful if the
// HTTP request is slow).
message = null;

// Wrap fetcher with async-retry for automatic retrying
const fetcher: Fetcher = options.fetcher ?? fetch;
const response: FetcherResponse = await retry(
// Retry on network errors and 5xx HTTP
// responses.
async () => {
const curResponse = await fetcher(
(options.endpointUrl ||
'https://usage-reporting.api.apollographql.com') +
'/api/ingress/traces',
{
method: 'POST',
headers: {
'user-agent': 'ApolloServerPluginUsageReporting',
'x-api-key': key,
'content-encoding': 'gzip',
accept: 'application/json',
// Note that once we require Node v16 we can use its global
// AbortController instead of the one from `node-abort-controller`.
const controller = new AbortController();
const abortTimeout = setTimeout(() => {
controller.abort();
}, options.requestTimeoutMs ?? 30_000);
let curResponse;
try {
curResponse = await fetcher(
(options.endpointUrl ||
'https://usage-reporting.api.apollographql.com') +
'/api/ingress/traces',
{
method: 'POST',
headers: {
'user-agent': 'ApolloServerPluginUsageReporting',
'x-api-key': key,
'content-encoding': 'gzip',
accept: 'application/json',
},
body: compressed,
signal: controller.signal,
},
body: compressed,
},
);
);
} finally {
clearTimeout(abortTimeout);
}

if (curResponse.status >= 500 && curResponse.status < 600) {
throw new Error(
Expand Down

0 comments on commit e4e7738

Please sign in to comment.