From 3a4823e0d85afb51b7fb82a9f3a525c1957eab5d Mon Sep 17 00:00:00 2001 From: David Glasser Date: Thu, 8 Dec 2022 11:05:59 -0800 Subject: [PATCH] De-flake `stoppable` tests (#7232) We've had a lot of issues where `stoppable` tests are flaky because they depend on measuring the time that something takes and hoping it's relatively close to a particular timeout that's supposed to be "controlling" the observed behavior. This PR refactors the internal Stoppable object so that instead of taking a millisecond timeout value, it takes a DOM-style AbortSignal. This object is only used by ApolloServerPluginDrainHttpServer; we move the setTimeout from inside Stoppable into that plugin, which now creates an AbortController polyfilled from `node-abort-controller` (like we already are in the usage reporting plugin; note that once we drop Node v14 support we can switch to the built-in implementation). Now the Stoppable test can control the grace period directly via AbortSignals rather than indirectly based on timeouts. This lets us drop all of the time measurements from the test. There still are some delays but they are just of the form "wait some time and double-check that nothing happened": the worst case scenario here is that a test passes that should fail because the thing would have happened if you'd waited slightly longer, but there shouldn't be any spurious failures. Fixes #6963. Co-authored-by: Trevor Scheer --- .changeset/fluffy-hats-guess.md | 5 + cspell-dict.txt | 3 +- .../plugin/drainHttpServer/stoppable.test.ts | 131 ++++++++++++------ .../drainHttpServer/stoppable/server.js | 11 +- .../src/plugin/drainHttpServer/index.ts | 19 ++- .../src/plugin/drainHttpServer/stoppable.ts | 32 ++--- 6 files changed, 135 insertions(+), 66 deletions(-) create mode 100644 .changeset/fluffy-hats-guess.md diff --git a/.changeset/fluffy-hats-guess.md b/.changeset/fluffy-hats-guess.md new file mode 100644 index 00000000000..9ae150a380d --- /dev/null +++ b/.changeset/fluffy-hats-guess.md @@ -0,0 +1,5 @@ +--- +'@apollo/server': patch +--- + +Refactor the implementation of `ApolloServerPluginDrainHttpServer`'s grace period. This is intended to be a no-op. diff --git a/cspell-dict.txt b/cspell-dict.txt index 965722f11df..b3227461419 100644 --- a/cspell-dict.txt +++ b/cspell-dict.txt @@ -36,6 +36,7 @@ concat contravariance contravariantly createhash +culted dataloaders datasource datasources @@ -174,8 +175,8 @@ subpackage subqueries sumby supergraph -supergraphs Supergraph +supergraphs testful testonly testsuite diff --git a/packages/server/src/__tests__/plugin/drainHttpServer/stoppable.test.ts b/packages/server/src/__tests__/plugin/drainHttpServer/stoppable.test.ts index e315beb469a..b44c56af7a6 100644 --- a/packages/server/src/__tests__/plugin/drainHttpServer/stoppable.test.ts +++ b/packages/server/src/__tests__/plugin/drainHttpServer/stoppable.test.ts @@ -36,6 +36,8 @@ import child from 'child_process'; import path from 'path'; import type { AddressInfo } from 'net'; import { describe, it, expect, afterEach, beforeEach } from '@jest/globals'; +import { AbortController } from 'node-abort-controller'; +import resolvable, { Resolvable } from '@josephg/resolvable'; function port(s: http.Server) { return (s.address() as AddressInfo).port; @@ -163,7 +165,7 @@ Object.keys(schemes).forEach((schemeName) => { expect(gracefully).toBe(true); }); - it('with keep-alive connections', async () => { + it('with idle keep-alive connections', async () => { let closed = 0; const server = scheme.server(); const stopper = new Stopper(server); @@ -192,15 +194,16 @@ Object.keys(schemes).forEach((schemeName) => { }); }); - it('with a 0.5s grace period', async () => { + it('with unfinished requests', async () => { const server = scheme.server((_req, res) => { res.writeHead(200); - res.write('hi'); + res.write('hi'); // note lack of end()! }); const stopper = new Stopper(server); server.listen(0); const p = port(server); await a.event(server, 'listening'); + // Send two requests and wait to receive headers. await Promise.all([ request(`${schemeName}://localhost:${p}`).agent( scheme.agent({ keepAlive: true }), @@ -209,76 +212,120 @@ Object.keys(schemes).forEach((schemeName) => { scheme.agent({ keepAlive: true }), ), ]); - const start = Date.now(); - const closeEventPromise = a.event(server, 'close'); - const gracefully = await stopper.stop(500); + let closeCalled: boolean = false; + const closeEventPromise = a.event(server, 'close').then(() => { + closeCalled = true; + }); + const abortController = new AbortController(); + let gracefully: boolean | null = null; + const stopPromise = stopper + .stop(abortController.signal) + .then((stopReturn) => { + gracefully = stopReturn; + }); + + // Wait a while (number chosen here is arbitrary). Stopping should not have happened yet. + await a.delay(500); + expect(closeCalled).toBe(false); + expect(gracefully).toBeNull(); + + // Now abort it; this should be sufficient for the two promises above to + // resolve. + abortController.abort(); + await stopPromise; await closeEventPromise; - // These tests are a bit flakey; we should figure out a way to usefully - // test the grace period behavior without leading to flakiness due to - // speed variation. - const elapsed = Date.now() - start; - expect(elapsed).toBeGreaterThanOrEqual(450); - expect(elapsed).toBeLessThanOrEqual(550); + expect(closeCalled).toBe(true); expect(gracefully).toBe(false); - // It takes a moment for the `finish` events to happen. - await a.delay(20); - expect(stopper['requestCountPerSocket'].size).toBe(0); + // It takes a moment for the `finish` events to happen. Loop waiting for + // them to finish and update this data structure (if it never happens, + // we'll get a Jest timeout, which is the failure we want). + while (stopper['requestCountPerSocket'].size > 0) { + await a.delay(20); + } }); it('with requests in-flight', async () => { + const barriers: Record> = { + b250: resolvable(), + b500: resolvable(), + }; const server = scheme.server((req, res) => { - const delay = parseInt(req.url!.slice(1), 10); res.writeHead(200); res.write('hello'); - setTimeout(() => res.end('world'), delay); + barriers[req.url!.slice(1)].then(() => res.end('world')); }); const stopper = new Stopper(server); server.listen(0); const p = port(server); await a.event(server, 'listening'); - const start = Date.now(); const res = await Promise.all([ - request(`${schemeName}://localhost:${p}/250`).agent( + request(`${schemeName}://localhost:${p}/b250`).agent( scheme.agent({ keepAlive: true }), ), - request(`${schemeName}://localhost:${p}/500`).agent( + request(`${schemeName}://localhost:${p}/b500`).agent( scheme.agent({ keepAlive: true }), ), ]); - const closeEventPromise = a.event(server, 'close'); - const gracefully = await stopper.stop(); - const bodies = await Promise.all(res.map((r) => r.text())); + let closeCalled: boolean = false; + const closeEventPromise = a.event(server, 'close').then(() => { + closeCalled = true; + }); + let gracefully: boolean | null = null; + const stopPromise = stopper.stop().then((stopReturn) => { + gracefully = stopReturn; + }); + + // Wait a while. Stopping should not have happened yet, + await a.delay(250); + expect(closeCalled).toBe(false); + expect(gracefully).toBeNull(); + + // Let the first request resolve and wait a bit more. Stopping should + // still not have happened. + barriers.b250.resolve(); + await a.delay(250); + expect(closeCalled).toBe(false); + expect(gracefully).toBeNull(); + + // Let the second request resolve. Then things should stop properly. + barriers.b500.resolve(); await closeEventPromise; - expect(bodies[0]).toBe('helloworld'); - // These tests are a bit flakey; we should figure out a way to usefully - // test the grace period behavior without leading to flakiness due to - // speed variation. - const elapsed = Date.now() - start; - expect(elapsed).toBeGreaterThanOrEqual(400); - expect(elapsed).toBeLessThanOrEqual(600); + await stopPromise; + expect(closeCalled).toBe(true); expect(gracefully).toBe(true); + + const bodies = await Promise.all(res.map((r) => r.text())); + expect(bodies).toStrictEqual(['helloworld', 'helloworld']); }); if (schemeName === 'http') { it('with in-flights finishing before grace period ends', async () => { const file = path.join(__dirname, 'stoppable', 'server.js'); - const server = child.spawn('node', [file, '500']); + const server = child.spawn('node', [file]); const [data] = await a.event(server.stdout, 'data'); const port = +data.toString(); expect(typeof port).toBe('number'); - const start = Date.now(); - const res = await request( - `${schemeName}://localhost:${port}/250`, - ).agent(scheme.agent({ keepAlive: true })); - const body = await res.text(); + const res = await request(`${schemeName}://localhost:${port}/`).agent( + scheme.agent({ keepAlive: true }), + ); + let gotBody = false; + const bodyPromise = res.text().then((body: string) => { + gotBody = true; + return body; + }); + + // Wait a while. We shouldn't have finished reading the response yet. + await a.delay(250); + expect(gotBody).toBe(false); + + // Tell the server that its request should finish. + server.kill('SIGUSR1'); + + const body = await bodyPromise; + expect(gotBody).toBe(true); expect(body).toBe('helloworld'); - // These tests are a bit flakey; we should figure out a way to usefully - // test the grace period behavior without leading to flakiness due to - // speed variation. - const elapsed = Date.now() - start; - expect(elapsed).toBeGreaterThanOrEqual(150); - expect(elapsed).toBeLessThanOrEqual(350); + // Wait for subprocess to go away. await a.event(server, 'close'); }); diff --git a/packages/server/src/__tests__/plugin/drainHttpServer/stoppable/server.js b/packages/server/src/__tests__/plugin/drainHttpServer/stoppable/server.js index 414505de6f8..7943953dede 100644 --- a/packages/server/src/__tests__/plugin/drainHttpServer/stoppable/server.js +++ b/packages/server/src/__tests__/plugin/drainHttpServer/stoppable/server.js @@ -1,14 +1,17 @@ +// This server is run by a test in stoppable.test.js. Its HTTP server should +// only ever get one request. It will respond with a 200 and start writing its +// body and then start the Stopper process with no hard-destroy grace period. It +// will finish the request on SIGUSR1. + import http from 'http'; import { Stopper } from '../../../../../dist/esm/plugin/drainHttpServer/stoppable.js'; -const grace = Number(process.argv[2] || Infinity); let stopper; const server = http.createServer((req, res) => { - const delay = parseInt(req.url.slice(1), 10); res.writeHead(200); res.write('hello'); - setTimeout(() => res.end('world'), delay); - stopper.stop(grace); + process.on('SIGUSR1', () => res.end('world')); + stopper.stop(); }); stopper = new Stopper(server); server.listen(0, () => console.log(server.address().port)); diff --git a/packages/server/src/plugin/drainHttpServer/index.ts b/packages/server/src/plugin/drainHttpServer/index.ts index 4493869f6e2..7223e70408e 100644 --- a/packages/server/src/plugin/drainHttpServer/index.ts +++ b/packages/server/src/plugin/drainHttpServer/index.ts @@ -1,4 +1,5 @@ import type http from 'http'; +import { AbortController } from 'node-abort-controller'; import type { ApolloServerPlugin } from '../../externalTypes/index.js'; import { Stopper } from './stoppable.js'; @@ -31,7 +32,23 @@ export function ApolloServerPluginDrainHttpServer( async serverWillStart() { return { async drainServer() { - await stopper.stop(options.stopGracePeriodMillis ?? 10_000); + // Note: we don't use `AbortSignal.timeout()` here because our + // polyfill doesn't support it (and even once we drop Node v14 + // support, if we don't require at least Node v16.14 then the built-in + // version won't support it either). + const hardDestroyAbortController = new AbortController(); + const stopGracePeriodMillis = options.stopGracePeriodMillis ?? 10_000; + let timeout: NodeJS.Timeout | undefined; + if (stopGracePeriodMillis < Infinity) { + timeout = setTimeout( + () => hardDestroyAbortController.abort(), + stopGracePeriodMillis, + ); + } + await stopper.stop(hardDestroyAbortController.signal); + if (timeout) { + clearTimeout(timeout); + } }, }; }, diff --git a/packages/server/src/plugin/drainHttpServer/stoppable.ts b/packages/server/src/plugin/drainHttpServer/stoppable.ts index 27f69e56177..852bbc961af 100644 --- a/packages/server/src/plugin/drainHttpServer/stoppable.ts +++ b/packages/server/src/plugin/drainHttpServer/stoppable.ts @@ -29,6 +29,7 @@ import type http from 'http'; import https from 'https'; import type { Socket } from 'net'; +import type { AbortSignal } from 'node-abort-controller'; export class Stopper { private requestCountPerSocket = new Map(); @@ -65,7 +66,7 @@ export class Stopper { ); } - async stop(stopGracePeriodMillis = Infinity): Promise { + async stop(hardDestroyAbortSignal?: AbortSignal): Promise { let gracefully = true; // In the off-chance that we are calling `stop` directly from within the @@ -75,28 +76,23 @@ export class Stopper { await new Promise((resolve) => setImmediate(resolve)); this.stopped = true; - let timeout: NodeJS.Timeout | null = null; - // Soon, hard-destroy everything. - if (stopGracePeriodMillis < Infinity) { - timeout = setTimeout(() => { - gracefully = false; - this.requestCountPerSocket.forEach((_, socket) => socket.end()); - // (FYI, when importing from upstream, not sure why we need setImmediate - // here.) - setImmediate(() => { - this.requestCountPerSocket.forEach((_, socket) => socket.destroy()); - }); - }, stopGracePeriodMillis); - } + // When told to, hard-destroy everything. + const onAbort = () => { + gracefully = false; + this.requestCountPerSocket.forEach((_, socket) => socket.end()); + // (FYI, this setImmediate was cargo-culted from the original + // implementation, but we don't understand why it's here.) + setImmediate(() => { + this.requestCountPerSocket.forEach((_, socket) => socket.destroy()); + }); + }; + hardDestroyAbortSignal?.addEventListener('abort', onAbort); // Close the server and create a Promise that resolves when all connections // are closed. Note that we ignore any error from `close` here. const closePromise = new Promise((resolve) => this.server.close(() => { - if (timeout) { - clearTimeout(timeout); - timeout = null; - } + hardDestroyAbortSignal?.removeEventListener('abort', onAbort); resolve(); }), );