Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

De-flake stoppable tests #7232

Merged
merged 2 commits into from Dec 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fluffy-hats-guess.md
@@ -0,0 +1,5 @@
---
'@apollo/server': patch
---

Refactor the implementation of `ApolloServerPluginDrainHttpServer`'s grace period. This is intended to be a no-op.
3 changes: 2 additions & 1 deletion cspell-dict.txt
Expand Up @@ -36,6 +36,7 @@ concat
contravariance
contravariantly
createhash
culted
dataloaders
datasource
datasources
Expand Down Expand Up @@ -174,8 +175,8 @@ subpackage
subqueries
sumby
supergraph
supergraphs
Supergraph
supergraphs
testful
testonly
testsuite
Expand Down
131 changes: 89 additions & 42 deletions packages/server/src/__tests__/plugin/drainHttpServer/stoppable.test.ts
Expand Up @@ -36,6 +36,8 @@ import child from 'child_process';
import path from 'path';
import type { AddressInfo } from 'net';
import { describe, it, expect, afterEach, beforeEach } from '@jest/globals';
import { AbortController } from 'node-abort-controller';
import resolvable, { Resolvable } from '@josephg/resolvable';

function port(s: http.Server) {
return (s.address() as AddressInfo).port;
Expand Down Expand Up @@ -163,7 +165,7 @@ Object.keys(schemes).forEach((schemeName) => {
expect(gracefully).toBe(true);
});

it('with keep-alive connections', async () => {
it('with idle keep-alive connections', async () => {
let closed = 0;
const server = scheme.server();
const stopper = new Stopper(server);
Expand Down Expand Up @@ -192,15 +194,16 @@ Object.keys(schemes).forEach((schemeName) => {
});
});

it('with a 0.5s grace period', async () => {
it('with unfinished requests', async () => {
const server = scheme.server((_req, res) => {
res.writeHead(200);
res.write('hi');
res.write('hi'); // note lack of end()!
});
const stopper = new Stopper(server);
server.listen(0);
const p = port(server);
await a.event(server, 'listening');
// Send two requests and wait to receive headers.
await Promise.all([
request(`${schemeName}://localhost:${p}`).agent(
scheme.agent({ keepAlive: true }),
Expand All @@ -209,76 +212,120 @@ Object.keys(schemes).forEach((schemeName) => {
scheme.agent({ keepAlive: true }),
),
]);
const start = Date.now();
const closeEventPromise = a.event(server, 'close');
const gracefully = await stopper.stop(500);
let closeCalled: boolean = false;
const closeEventPromise = a.event(server, 'close').then(() => {
closeCalled = true;
});
const abortController = new AbortController();
let gracefully: boolean | null = null;
const stopPromise = stopper
.stop(abortController.signal)
.then((stopReturn) => {
gracefully = stopReturn;
});

// Wait a while (number chosen here is arbitrary). Stopping should not have happened yet.
await a.delay(500);
expect(closeCalled).toBe(false);
expect(gracefully).toBeNull();

// Now abort it; this should be sufficient for the two promises above to
// resolve.
abortController.abort();
await stopPromise;
await closeEventPromise;
// These tests are a bit flakey; we should figure out a way to usefully
// test the grace period behavior without leading to flakiness due to
// speed variation.
const elapsed = Date.now() - start;
expect(elapsed).toBeGreaterThanOrEqual(450);
expect(elapsed).toBeLessThanOrEqual(550);
expect(closeCalled).toBe(true);
expect(gracefully).toBe(false);
// It takes a moment for the `finish` events to happen.
await a.delay(20);
expect(stopper['requestCountPerSocket'].size).toBe(0);
// It takes a moment for the `finish` events to happen. Loop waiting for
// them to finish and update this data structure (if it never happens,
// we'll get a Jest timeout, which is the failure we want).
while (stopper['requestCountPerSocket'].size > 0) {
await a.delay(20);
}
});

it('with requests in-flight', async () => {
const barriers: Record<string, Resolvable<void>> = {
b250: resolvable(),
b500: resolvable(),
};
const server = scheme.server((req, res) => {
const delay = parseInt(req.url!.slice(1), 10);
res.writeHead(200);
res.write('hello');
setTimeout(() => res.end('world'), delay);
barriers[req.url!.slice(1)].then(() => res.end('world'));
});
const stopper = new Stopper(server);

server.listen(0);
const p = port(server);
await a.event(server, 'listening');
const start = Date.now();
const res = await Promise.all([
request(`${schemeName}://localhost:${p}/250`).agent(
request(`${schemeName}://localhost:${p}/b250`).agent(
scheme.agent({ keepAlive: true }),
),
request(`${schemeName}://localhost:${p}/500`).agent(
request(`${schemeName}://localhost:${p}/b500`).agent(
scheme.agent({ keepAlive: true }),
),
]);
const closeEventPromise = a.event(server, 'close');
const gracefully = await stopper.stop();
const bodies = await Promise.all(res.map((r) => r.text()));
let closeCalled: boolean = false;
const closeEventPromise = a.event(server, 'close').then(() => {
closeCalled = true;
});
let gracefully: boolean | null = null;
const stopPromise = stopper.stop().then((stopReturn) => {
gracefully = stopReturn;
});

// Wait a while. Stopping should not have happened yet.
await a.delay(250);
expect(closeCalled).toBe(false);
expect(gracefully).toBeNull();

// Let the first request resolve and wait a bit more. Stopping should
// still not have happened.
barriers.b250.resolve();
await a.delay(250);
expect(closeCalled).toBe(false);
expect(gracefully).toBeNull();

// Let the second request resolve. Then things should stop properly.
barriers.b500.resolve();
await closeEventPromise;
expect(bodies[0]).toBe('helloworld');
// These tests are a bit flakey; we should figure out a way to usefully
// test the grace period behavior without leading to flakiness due to
// speed variation.
const elapsed = Date.now() - start;
expect(elapsed).toBeGreaterThanOrEqual(400);
expect(elapsed).toBeLessThanOrEqual(600);
await stopPromise;
expect(closeCalled).toBe(true);
expect(gracefully).toBe(true);

const bodies = await Promise.all(res.map((r) => r.text()));
expect(bodies).toStrictEqual(['helloworld', 'helloworld']);
});

if (schemeName === 'http') {
it('with in-flights finishing before grace period ends', async () => {
const file = path.join(__dirname, 'stoppable', 'server.js');
const server = child.spawn('node', [file, '500']);
const server = child.spawn('node', [file]);
const [data] = await a.event(server.stdout, 'data');
const port = +data.toString();
expect(typeof port).toBe('number');
const start = Date.now();
const res = await request(
`${schemeName}://localhost:${port}/250`,
).agent(scheme.agent({ keepAlive: true }));
const body = await res.text();
const res = await request(`${schemeName}://localhost:${port}/`).agent(
scheme.agent({ keepAlive: true }),
);
let gotBody = false;
const bodyPromise = res.text().then((body: string) => {
gotBody = true;
return body;
});

// Wait a while. We shouldn't have finished reading the response yet.
await a.delay(250);
expect(gotBody).toBe(false);

// Tell the server that its request should finish.
server.kill('SIGUSR1');

const body = await bodyPromise;
expect(gotBody).toBe(true);
expect(body).toBe('helloworld');
// These tests are a bit flakey; we should figure out a way to usefully
// test the grace period behavior without leading to flakiness due to
// speed variation.
const elapsed = Date.now() - start;
expect(elapsed).toBeGreaterThanOrEqual(150);
expect(elapsed).toBeLessThanOrEqual(350);

// Wait for subprocess to go away.
await a.event(server, 'close');
});
Expand Down
@@ -1,14 +1,17 @@
// This server is run by a test in stoppable.test.ts. Its HTTP server should
// only ever get one request. It will respond with a 200 and start writing its
// body and then start the Stopper process with no hard-destroy grace period. It
// will finish the request on SIGUSR1.

import http from 'http';
import { Stopper } from '../../../../../dist/esm/plugin/drainHttpServer/stoppable.js';

const grace = Number(process.argv[2] || Infinity);
let stopper;
const server = http.createServer((req, res) => {
const delay = parseInt(req.url.slice(1), 10);
res.writeHead(200);
res.write('hello');
setTimeout(() => res.end('world'), delay);
stopper.stop(grace);
process.on('SIGUSR1', () => res.end('world'));
stopper.stop();
});
stopper = new Stopper(server);
server.listen(0, () => console.log(server.address().port));
19 changes: 18 additions & 1 deletion packages/server/src/plugin/drainHttpServer/index.ts
@@ -1,4 +1,5 @@
import type http from 'http';
import { AbortController } from 'node-abort-controller';
import type { ApolloServerPlugin } from '../../externalTypes/index.js';
import { Stopper } from './stoppable.js';

Expand Down Expand Up @@ -31,7 +32,23 @@ export function ApolloServerPluginDrainHttpServer(
async serverWillStart() {
return {
async drainServer() {
await stopper.stop(options.stopGracePeriodMillis ?? 10_000);
// Note: we don't use `AbortSignal.timeout()` here because our
// polyfill doesn't support it (and even once we drop Node v14
// support, if we don't require at least Node v16.14 then the built-in
// version won't support it either).
const hardDestroyAbortController = new AbortController();
const stopGracePeriodMillis = options.stopGracePeriodMillis ?? 10_000;
let timeout: NodeJS.Timeout | undefined;
if (stopGracePeriodMillis < Infinity) {
timeout = setTimeout(
() => hardDestroyAbortController.abort(),
stopGracePeriodMillis,
);
}
await stopper.stop(hardDestroyAbortController.signal);
if (timeout) {
clearTimeout(timeout);
}
},
};
},
Expand Down
32 changes: 14 additions & 18 deletions packages/server/src/plugin/drainHttpServer/stoppable.ts
Expand Up @@ -29,6 +29,7 @@
import type http from 'http';
import https from 'https';
import type { Socket } from 'net';
import type { AbortSignal } from 'node-abort-controller';

export class Stopper {
private requestCountPerSocket = new Map<Socket, number>();
Expand Down Expand Up @@ -65,7 +66,7 @@ export class Stopper {
);
}

async stop(stopGracePeriodMillis = Infinity): Promise<boolean> {
async stop(hardDestroyAbortSignal?: AbortSignal): Promise<boolean> {
let gracefully = true;

// In the off-chance that we are calling `stop` directly from within the
Expand All @@ -75,28 +76,23 @@ export class Stopper {
await new Promise<void>((resolve) => setImmediate(resolve));
this.stopped = true;

let timeout: NodeJS.Timeout | null = null;
// Soon, hard-destroy everything.
if (stopGracePeriodMillis < Infinity) {
timeout = setTimeout(() => {
gracefully = false;
this.requestCountPerSocket.forEach((_, socket) => socket.end());
// (FYI, when importing from upstream, not sure why we need setImmediate
// here.)
setImmediate(() => {
this.requestCountPerSocket.forEach((_, socket) => socket.destroy());
});
}, stopGracePeriodMillis);
}
// When told to, hard-destroy everything.
const onAbort = () => {
gracefully = false;
this.requestCountPerSocket.forEach((_, socket) => socket.end());
// (FYI, this setImmediate was cargo-culted from the original
// implementation, but we don't understand why it's here.)
setImmediate(() => {
this.requestCountPerSocket.forEach((_, socket) => socket.destroy());
});
};
hardDestroyAbortSignal?.addEventListener('abort', onAbort);

// Close the server and create a Promise that resolves when all connections
// are closed. Note that we ignore any error from `close` here.
const closePromise = new Promise<void>((resolve) =>
this.server.close(() => {
if (timeout) {
clearTimeout(timeout);
timeout = null;
}
hardDestroyAbortSignal?.removeEventListener('abort', onAbort);
resolve();
}),
);
Expand Down