Skip to content

Commit

Permalink
De-flake stoppable tests (#7232)
Browse files Browse the repository at this point in the history
We've had a lot of issues where `stoppable` tests are flaky because they
depend on measuring the time that something takes and hoping it's
relatively close to a particular timeout that's supposed to be
"controlling" the observed behavior.

This PR refactors the internal Stoppable object so that instead of
taking a millisecond timeout value, it takes a DOM-style AbortSignal.
This object is only used by ApolloServerPluginDrainHttpServer; we move
the setTimeout from inside Stoppable into that plugin, which now creates
an AbortController polyfilled from `node-abort-controller` (like we
already are in the usage reporting plugin; note that once we drop Node
v14 support we can switch to the built-in implementation).

Now the Stoppable test can control the grace period directly via
AbortSignals rather than indirectly based on timeouts. This lets us drop
all of the time measurements from the test. There still are some delays
but they are just of the form "wait some time and double-check that
nothing happened": the worst case scenario here is that a test passes
that should fail because the thing would have happened if you'd waited
slightly longer, but there shouldn't be any spurious failures.

Fixes #6963.

Co-authored-by: Trevor Scheer <trevor.scheer@gmail.com>
  • Loading branch information
glasser and trevor-scheer committed Dec 8, 2022
1 parent f97e553 commit 3a4823e
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 66 deletions.
5 changes: 5 additions & 0 deletions .changeset/fluffy-hats-guess.md
@@ -0,0 +1,5 @@
---
'@apollo/server': patch
---

Refactor the implementation of `ApolloServerPluginDrainHttpServer`'s grace period. This is intended to be a no-op.
3 changes: 2 additions & 1 deletion cspell-dict.txt
Expand Up @@ -36,6 +36,7 @@ concat
contravariance
contravariantly
createhash
culted
dataloaders
datasource
datasources
Expand Down Expand Up @@ -174,8 +175,8 @@ subpackage
subqueries
sumby
supergraph
supergraphs
Supergraph
supergraphs
testful
testonly
testsuite
Expand Down
131 changes: 89 additions & 42 deletions packages/server/src/__tests__/plugin/drainHttpServer/stoppable.test.ts
Expand Up @@ -36,6 +36,8 @@ import child from 'child_process';
import path from 'path';
import type { AddressInfo } from 'net';
import { describe, it, expect, afterEach, beforeEach } from '@jest/globals';
import { AbortController } from 'node-abort-controller';
import resolvable, { Resolvable } from '@josephg/resolvable';

function port(s: http.Server) {
return (s.address() as AddressInfo).port;
Expand Down Expand Up @@ -163,7 +165,7 @@ Object.keys(schemes).forEach((schemeName) => {
expect(gracefully).toBe(true);
});

it('with keep-alive connections', async () => {
it('with idle keep-alive connections', async () => {
let closed = 0;
const server = scheme.server();
const stopper = new Stopper(server);
Expand Down Expand Up @@ -192,15 +194,16 @@ Object.keys(schemes).forEach((schemeName) => {
});
});

it('with a 0.5s grace period', async () => {
it('with unfinished requests', async () => {
const server = scheme.server((_req, res) => {
res.writeHead(200);
res.write('hi');
res.write('hi'); // note lack of end()!
});
const stopper = new Stopper(server);
server.listen(0);
const p = port(server);
await a.event(server, 'listening');
// Send two requests and wait to receive headers.
await Promise.all([
request(`${schemeName}://localhost:${p}`).agent(
scheme.agent({ keepAlive: true }),
Expand All @@ -209,76 +212,120 @@ Object.keys(schemes).forEach((schemeName) => {
scheme.agent({ keepAlive: true }),
),
]);
const start = Date.now();
const closeEventPromise = a.event(server, 'close');
const gracefully = await stopper.stop(500);
let closeCalled: boolean = false;
const closeEventPromise = a.event(server, 'close').then(() => {
closeCalled = true;
});
const abortController = new AbortController();
let gracefully: boolean | null = null;
const stopPromise = stopper
.stop(abortController.signal)
.then((stopReturn) => {
gracefully = stopReturn;
});

// Wait a while (number chosen here is arbitrary). Stopping should not have happened yet.
await a.delay(500);
expect(closeCalled).toBe(false);
expect(gracefully).toBeNull();

// Now abort it; this should be sufficient for the two promises above to
// resolve.
abortController.abort();
await stopPromise;
await closeEventPromise;
// These tests are a bit flakey; we should figure out a way to usefully
// test the grace period behavior without leading to flakiness due to
// speed variation.
const elapsed = Date.now() - start;
expect(elapsed).toBeGreaterThanOrEqual(450);
expect(elapsed).toBeLessThanOrEqual(550);
expect(closeCalled).toBe(true);
expect(gracefully).toBe(false);
// It takes a moment for the `finish` events to happen.
await a.delay(20);
expect(stopper['requestCountPerSocket'].size).toBe(0);
// It takes a moment for the `finish` events to happen. Loop waiting for
// them to finish and update this data structure (if it never happens,
// we'll get a Jest timeout, which is the failure we want).
while (stopper['requestCountPerSocket'].size > 0) {
await a.delay(20);
}
});

it('with requests in-flight', async () => {
const barriers: Record<string, Resolvable<void>> = {
b250: resolvable(),
b500: resolvable(),
};
const server = scheme.server((req, res) => {
const delay = parseInt(req.url!.slice(1), 10);
res.writeHead(200);
res.write('hello');
setTimeout(() => res.end('world'), delay);
barriers[req.url!.slice(1)].then(() => res.end('world'));
});
const stopper = new Stopper(server);

server.listen(0);
const p = port(server);
await a.event(server, 'listening');
const start = Date.now();
const res = await Promise.all([
request(`${schemeName}://localhost:${p}/250`).agent(
request(`${schemeName}://localhost:${p}/b250`).agent(
scheme.agent({ keepAlive: true }),
),
request(`${schemeName}://localhost:${p}/500`).agent(
request(`${schemeName}://localhost:${p}/b500`).agent(
scheme.agent({ keepAlive: true }),
),
]);
const closeEventPromise = a.event(server, 'close');
const gracefully = await stopper.stop();
const bodies = await Promise.all(res.map((r) => r.text()));
let closeCalled: boolean = false;
const closeEventPromise = a.event(server, 'close').then(() => {
closeCalled = true;
});
let gracefully: boolean | null = null;
const stopPromise = stopper.stop().then((stopReturn) => {
gracefully = stopReturn;
});

// Wait a while. Stopping should not have happened yet,
await a.delay(250);
expect(closeCalled).toBe(false);
expect(gracefully).toBeNull();

// Let the first request resolve and wait a bit more. Stopping should
// still not have happened.
barriers.b250.resolve();
await a.delay(250);
expect(closeCalled).toBe(false);
expect(gracefully).toBeNull();

// Let the second request resolve. Then things should stop properly.
barriers.b500.resolve();
await closeEventPromise;
expect(bodies[0]).toBe('helloworld');
// These tests are a bit flakey; we should figure out a way to usefully
// test the grace period behavior without leading to flakiness due to
// speed variation.
const elapsed = Date.now() - start;
expect(elapsed).toBeGreaterThanOrEqual(400);
expect(elapsed).toBeLessThanOrEqual(600);
await stopPromise;
expect(closeCalled).toBe(true);
expect(gracefully).toBe(true);

const bodies = await Promise.all(res.map((r) => r.text()));
expect(bodies).toStrictEqual(['helloworld', 'helloworld']);
});

if (schemeName === 'http') {
it('with in-flights finishing before grace period ends', async () => {
const file = path.join(__dirname, 'stoppable', 'server.js');
const server = child.spawn('node', [file, '500']);
const server = child.spawn('node', [file]);
const [data] = await a.event(server.stdout, 'data');
const port = +data.toString();
expect(typeof port).toBe('number');
const start = Date.now();
const res = await request(
`${schemeName}://localhost:${port}/250`,
).agent(scheme.agent({ keepAlive: true }));
const body = await res.text();
const res = await request(`${schemeName}://localhost:${port}/`).agent(
scheme.agent({ keepAlive: true }),
);
let gotBody = false;
const bodyPromise = res.text().then((body: string) => {
gotBody = true;
return body;
});

// Wait a while. We shouldn't have finished reading the response yet.
await a.delay(250);
expect(gotBody).toBe(false);

// Tell the server that its request should finish.
server.kill('SIGUSR1');

const body = await bodyPromise;
expect(gotBody).toBe(true);
expect(body).toBe('helloworld');
// These tests are a bit flakey; we should figure out a way to usefully
// test the grace period behavior without leading to flakiness due to
// speed variation.
const elapsed = Date.now() - start;
expect(elapsed).toBeGreaterThanOrEqual(150);
expect(elapsed).toBeLessThanOrEqual(350);

// Wait for subprocess to go away.
await a.event(server, 'close');
});
Expand Down
@@ -1,14 +1,17 @@
// This server is run by a test in stoppable.test.js. Its HTTP server should
// only ever get one request. It will respond with a 200 and start writing its
// body and then start the Stopper process with no hard-destroy grace period. It
// will finish the request on SIGUSR1.

import http from 'http';
import { Stopper } from '../../../../../dist/esm/plugin/drainHttpServer/stoppable.js';

const grace = Number(process.argv[2] || Infinity);
let stopper;
const server = http.createServer((req, res) => {
const delay = parseInt(req.url.slice(1), 10);
res.writeHead(200);
res.write('hello');
setTimeout(() => res.end('world'), delay);
stopper.stop(grace);
process.on('SIGUSR1', () => res.end('world'));
stopper.stop();
});
stopper = new Stopper(server);
server.listen(0, () => console.log(server.address().port));
19 changes: 18 additions & 1 deletion packages/server/src/plugin/drainHttpServer/index.ts
@@ -1,4 +1,5 @@
import type http from 'http';
import { AbortController } from 'node-abort-controller';
import type { ApolloServerPlugin } from '../../externalTypes/index.js';
import { Stopper } from './stoppable.js';

Expand Down Expand Up @@ -31,7 +32,23 @@ export function ApolloServerPluginDrainHttpServer(
async serverWillStart() {
return {
async drainServer() {
await stopper.stop(options.stopGracePeriodMillis ?? 10_000);
// Note: we don't use `AbortSignal.timeout()` here because our
// polyfill doesn't support it (and even once we drop Node v14
// support, if we don't require at least Node v16.14 then the built-in
// version won't support it either).
const hardDestroyAbortController = new AbortController();
const stopGracePeriodMillis = options.stopGracePeriodMillis ?? 10_000;
let timeout: NodeJS.Timeout | undefined;
if (stopGracePeriodMillis < Infinity) {
timeout = setTimeout(
() => hardDestroyAbortController.abort(),
stopGracePeriodMillis,
);
}
await stopper.stop(hardDestroyAbortController.signal);
if (timeout) {
clearTimeout(timeout);
}
},
};
},
Expand Down
32 changes: 14 additions & 18 deletions packages/server/src/plugin/drainHttpServer/stoppable.ts
Expand Up @@ -29,6 +29,7 @@
import type http from 'http';
import https from 'https';
import type { Socket } from 'net';
import type { AbortSignal } from 'node-abort-controller';

export class Stopper {
private requestCountPerSocket = new Map<Socket, number>();
Expand Down Expand Up @@ -65,7 +66,7 @@ export class Stopper {
);
}

async stop(stopGracePeriodMillis = Infinity): Promise<boolean> {
async stop(hardDestroyAbortSignal?: AbortSignal): Promise<boolean> {
let gracefully = true;

// In the off-chance that we are calling `stop` directly from within the
Expand All @@ -75,28 +76,23 @@ export class Stopper {
await new Promise<void>((resolve) => setImmediate(resolve));
this.stopped = true;

let timeout: NodeJS.Timeout | null = null;
// Soon, hard-destroy everything.
if (stopGracePeriodMillis < Infinity) {
timeout = setTimeout(() => {
gracefully = false;
this.requestCountPerSocket.forEach((_, socket) => socket.end());
// (FYI, when importing from upstream, not sure why we need setImmediate
// here.)
setImmediate(() => {
this.requestCountPerSocket.forEach((_, socket) => socket.destroy());
});
}, stopGracePeriodMillis);
}
// When told to, hard-destroy everything.
const onAbort = () => {
gracefully = false;
this.requestCountPerSocket.forEach((_, socket) => socket.end());
// (FYI, this setImmediate was cargo-culted from the original
// implementation, but we don't understand why it's here.)
setImmediate(() => {
this.requestCountPerSocket.forEach((_, socket) => socket.destroy());
});
};
hardDestroyAbortSignal?.addEventListener('abort', onAbort);

// Close the server and create a Promise that resolves when all connections
// are closed. Note that we ignore any error from `close` here.
const closePromise = new Promise<void>((resolve) =>
this.server.close(() => {
if (timeout) {
clearTimeout(timeout);
timeout = null;
}
hardDestroyAbortSignal?.removeEventListener('abort', onAbort);
resolve();
}),
);
Expand Down

0 comments on commit 3a4823e

Please sign in to comment.