diff --git a/.changeset/fluffy-hats-guess.md b/.changeset/fluffy-hats-guess.md new file mode 100644 index 00000000000..9ae150a380d --- /dev/null +++ b/.changeset/fluffy-hats-guess.md @@ -0,0 +1,5 @@ +--- +'@apollo/server': patch +--- + +Refactor the implementation of `ApolloServerPluginDrainHttpServer`'s grace period. This is intended to be a no-op. diff --git a/cspell-dict.txt b/cspell-dict.txt index 965722f11df..b3227461419 100644 --- a/cspell-dict.txt +++ b/cspell-dict.txt @@ -36,6 +36,7 @@ concat contravariance contravariantly createhash +culted dataloaders datasource datasources @@ -174,8 +175,8 @@ subpackage subqueries sumby supergraph -supergraphs Supergraph +supergraphs testful testonly testsuite diff --git a/packages/server/src/__tests__/plugin/drainHttpServer/stoppable.test.ts b/packages/server/src/__tests__/plugin/drainHttpServer/stoppable.test.ts index e315beb469a..b44c56af7a6 100644 --- a/packages/server/src/__tests__/plugin/drainHttpServer/stoppable.test.ts +++ b/packages/server/src/__tests__/plugin/drainHttpServer/stoppable.test.ts @@ -36,6 +36,8 @@ import child from 'child_process'; import path from 'path'; import type { AddressInfo } from 'net'; import { describe, it, expect, afterEach, beforeEach } from '@jest/globals'; +import { AbortController } from 'node-abort-controller'; +import resolvable, { Resolvable } from '@josephg/resolvable'; function port(s: http.Server) { return (s.address() as AddressInfo).port; @@ -163,7 +165,7 @@ Object.keys(schemes).forEach((schemeName) => { expect(gracefully).toBe(true); }); - it('with keep-alive connections', async () => { + it('with idle keep-alive connections', async () => { let closed = 0; const server = scheme.server(); const stopper = new Stopper(server); @@ -192,15 +194,16 @@ Object.keys(schemes).forEach((schemeName) => { }); }); - it('with a 0.5s grace period', async () => { + it('with unfinished requests', async () => { const server = scheme.server((_req, res) => { res.writeHead(200); - res.write('hi'); + res.write('hi'); // note lack of end()! }); const stopper = new Stopper(server); server.listen(0); const p = port(server); await a.event(server, 'listening'); + // Send two requests and wait to receive headers. await Promise.all([ request(`${schemeName}://localhost:${p}`).agent( scheme.agent({ keepAlive: true }), @@ -209,76 +212,120 @@ Object.keys(schemes).forEach((schemeName) => { scheme.agent({ keepAlive: true }), ), ]); - const start = Date.now(); - const closeEventPromise = a.event(server, 'close'); - const gracefully = await stopper.stop(500); + let closeCalled: boolean = false; + const closeEventPromise = a.event(server, 'close').then(() => { + closeCalled = true; + }); + const abortController = new AbortController(); + let gracefully: boolean | null = null; + const stopPromise = stopper + .stop(abortController.signal) + .then((stopReturn) => { + gracefully = stopReturn; + }); + + // Wait a while (number chosen here is arbitrary). Stopping should not have happened yet. + await a.delay(500); + expect(closeCalled).toBe(false); + expect(gracefully).toBeNull(); + + // Now abort it; this should be sufficient for the two promises above to + // resolve. + abortController.abort(); + await stopPromise; await closeEventPromise; - // These tests are a bit flakey; we should figure out a way to usefully - // test the grace period behavior without leading to flakiness due to - // speed variation. - const elapsed = Date.now() - start; - expect(elapsed).toBeGreaterThanOrEqual(450); - expect(elapsed).toBeLessThanOrEqual(550); + expect(closeCalled).toBe(true); expect(gracefully).toBe(false); - // It takes a moment for the `finish` events to happen. - await a.delay(20); - expect(stopper['requestCountPerSocket'].size).toBe(0); + // It takes a moment for the `finish` events to happen. Loop waiting for + // them to finish and update this data structure (if it never happens, + // we'll get a Jest timeout, which is the failure we want). + while (stopper['requestCountPerSocket'].size > 0) { + await a.delay(20); + } }); it('with requests in-flight', async () => { + const barriers: Record> = { + b250: resolvable(), + b500: resolvable(), + }; const server = scheme.server((req, res) => { - const delay = parseInt(req.url!.slice(1), 10); res.writeHead(200); res.write('hello'); - setTimeout(() => res.end('world'), delay); + barriers[req.url!.slice(1)].then(() => res.end('world')); }); const stopper = new Stopper(server); server.listen(0); const p = port(server); await a.event(server, 'listening'); - const start = Date.now(); const res = await Promise.all([ - request(`${schemeName}://localhost:${p}/250`).agent( + request(`${schemeName}://localhost:${p}/b250`).agent( scheme.agent({ keepAlive: true }), ), - request(`${schemeName}://localhost:${p}/500`).agent( + request(`${schemeName}://localhost:${p}/b500`).agent( scheme.agent({ keepAlive: true }), ), ]); - const closeEventPromise = a.event(server, 'close'); - const gracefully = await stopper.stop(); - const bodies = await Promise.all(res.map((r) => r.text())); + let closeCalled: boolean = false; + const closeEventPromise = a.event(server, 'close').then(() => { + closeCalled = true; + }); + let gracefully: boolean | null = null; + const stopPromise = stopper.stop().then((stopReturn) => { + gracefully = stopReturn; + }); + + // Wait a while. Stopping should not have happened yet, + await a.delay(250); + expect(closeCalled).toBe(false); + expect(gracefully).toBeNull(); + + // Let the first request resolve and wait a bit more. Stopping should + // still not have happened. + barriers.b250.resolve(); + await a.delay(250); + expect(closeCalled).toBe(false); + expect(gracefully).toBeNull(); + + // Let the second request resolve. Then things should stop properly. + barriers.b500.resolve(); await closeEventPromise; - expect(bodies[0]).toBe('helloworld'); - // These tests are a bit flakey; we should figure out a way to usefully - // test the grace period behavior without leading to flakiness due to - // speed variation. - const elapsed = Date.now() - start; - expect(elapsed).toBeGreaterThanOrEqual(400); - expect(elapsed).toBeLessThanOrEqual(600); + await stopPromise; + expect(closeCalled).toBe(true); expect(gracefully).toBe(true); + + const bodies = await Promise.all(res.map((r) => r.text())); + expect(bodies).toStrictEqual(['helloworld', 'helloworld']); }); if (schemeName === 'http') { it('with in-flights finishing before grace period ends', async () => { const file = path.join(__dirname, 'stoppable', 'server.js'); - const server = child.spawn('node', [file, '500']); + const server = child.spawn('node', [file]); const [data] = await a.event(server.stdout, 'data'); const port = +data.toString(); expect(typeof port).toBe('number'); - const start = Date.now(); - const res = await request( - `${schemeName}://localhost:${port}/250`, - ).agent(scheme.agent({ keepAlive: true })); - const body = await res.text(); + const res = await request(`${schemeName}://localhost:${port}/`).agent( + scheme.agent({ keepAlive: true }), + ); + let gotBody = false; + const bodyPromise = res.text().then((body: string) => { + gotBody = true; + return body; + }); + + // Wait a while. We shouldn't have finished reading the response yet. + await a.delay(250); + expect(gotBody).toBe(false); + + // Tell the server that its request should finish. + server.kill('SIGUSR1'); + + const body = await bodyPromise; + expect(gotBody).toBe(true); expect(body).toBe('helloworld'); - // These tests are a bit flakey; we should figure out a way to usefully - // test the grace period behavior without leading to flakiness due to - // speed variation. - const elapsed = Date.now() - start; - expect(elapsed).toBeGreaterThanOrEqual(150); - expect(elapsed).toBeLessThanOrEqual(350); + // Wait for subprocess to go away. await a.event(server, 'close'); }); diff --git a/packages/server/src/__tests__/plugin/drainHttpServer/stoppable/server.js b/packages/server/src/__tests__/plugin/drainHttpServer/stoppable/server.js index 414505de6f8..7943953dede 100644 --- a/packages/server/src/__tests__/plugin/drainHttpServer/stoppable/server.js +++ b/packages/server/src/__tests__/plugin/drainHttpServer/stoppable/server.js @@ -1,14 +1,17 @@ +// This server is run by a test in stoppable.test.js. Its HTTP server should +// only ever get one request. It will respond with a 200 and start writing its +// body and then start the Stopper process with no hard-destroy grace period. It +// will finish the request on SIGUSR1. + import http from 'http'; import { Stopper } from '../../../../../dist/esm/plugin/drainHttpServer/stoppable.js'; -const grace = Number(process.argv[2] || Infinity); let stopper; const server = http.createServer((req, res) => { - const delay = parseInt(req.url.slice(1), 10); res.writeHead(200); res.write('hello'); - setTimeout(() => res.end('world'), delay); - stopper.stop(grace); + process.on('SIGUSR1', () => res.end('world')); + stopper.stop(); }); stopper = new Stopper(server); server.listen(0, () => console.log(server.address().port)); diff --git a/packages/server/src/plugin/drainHttpServer/index.ts b/packages/server/src/plugin/drainHttpServer/index.ts index 4493869f6e2..7223e70408e 100644 --- a/packages/server/src/plugin/drainHttpServer/index.ts +++ b/packages/server/src/plugin/drainHttpServer/index.ts @@ -1,4 +1,5 @@ import type http from 'http'; +import { AbortController } from 'node-abort-controller'; import type { ApolloServerPlugin } from '../../externalTypes/index.js'; import { Stopper } from './stoppable.js'; @@ -31,7 +32,23 @@ export function ApolloServerPluginDrainHttpServer( async serverWillStart() { return { async drainServer() { - await stopper.stop(options.stopGracePeriodMillis ?? 10_000); + // Note: we don't use `AbortSignal.timeout()` here because our + // polyfill doesn't support it (and even once we drop Node v14 + // support, if we don't require at least Node v16.14 then the built-in + // version won't support it either). + const hardDestroyAbortController = new AbortController(); + const stopGracePeriodMillis = options.stopGracePeriodMillis ?? 10_000; + let timeout: NodeJS.Timeout | undefined; + if (stopGracePeriodMillis < Infinity) { + timeout = setTimeout( + () => hardDestroyAbortController.abort(), + stopGracePeriodMillis, + ); + } + await stopper.stop(hardDestroyAbortController.signal); + if (timeout) { + clearTimeout(timeout); + } }, }; }, diff --git a/packages/server/src/plugin/drainHttpServer/stoppable.ts b/packages/server/src/plugin/drainHttpServer/stoppable.ts index 27f69e56177..852bbc961af 100644 --- a/packages/server/src/plugin/drainHttpServer/stoppable.ts +++ b/packages/server/src/plugin/drainHttpServer/stoppable.ts @@ -29,6 +29,7 @@ import type http from 'http'; import https from 'https'; import type { Socket } from 'net'; +import type { AbortSignal } from 'node-abort-controller'; export class Stopper { private requestCountPerSocket = new Map(); @@ -65,7 +66,7 @@ export class Stopper { ); } - async stop(stopGracePeriodMillis = Infinity): Promise { + async stop(hardDestroyAbortSignal?: AbortSignal): Promise { let gracefully = true; // In the off-chance that we are calling `stop` directly from within the @@ -75,28 +76,23 @@ export class Stopper { await new Promise((resolve) => setImmediate(resolve)); this.stopped = true; - let timeout: NodeJS.Timeout | null = null; - // Soon, hard-destroy everything. - if (stopGracePeriodMillis < Infinity) { - timeout = setTimeout(() => { - gracefully = false; - this.requestCountPerSocket.forEach((_, socket) => socket.end()); - // (FYI, when importing from upstream, not sure why we need setImmediate - // here.) - setImmediate(() => { - this.requestCountPerSocket.forEach((_, socket) => socket.destroy()); - }); - }, stopGracePeriodMillis); - } + // When told to, hard-destroy everything. + const onAbort = () => { + gracefully = false; + this.requestCountPerSocket.forEach((_, socket) => socket.end()); + // (FYI, this setImmediate was cargo-culted from the original + // implementation, but we don't understand why it's here.) + setImmediate(() => { + this.requestCountPerSocket.forEach((_, socket) => socket.destroy()); + }); + }; + hardDestroyAbortSignal?.addEventListener('abort', onAbort); // Close the server and create a Promise that resolves when all connections // are closed. Note that we ignore any error from `close` here. const closePromise = new Promise((resolve) => this.server.close(() => { - if (timeout) { - clearTimeout(timeout); - timeout = null; - } + hardDestroyAbortSignal?.removeEventListener('abort', onAbort); resolve(); }), );