From 8ab6ff1c841a56200db20f029b3a317dbc39813b Mon Sep 17 00:00:00 2001 From: Gustavo Henke Date: Mon, 25 Mar 2024 08:43:05 +1100 Subject: [PATCH] [breaking] Abort commands not running when max processes < N (#460) --- src/completion-listener.spec.ts | 33 ++++++++++- src/completion-listener.ts | 57 +++++++++++++----- src/concurrently.spec.ts | 10 ++++ src/concurrently.ts | 18 ++++-- src/flow-control/kill-on-signal.spec.ts | 39 +++++-------- src/flow-control/kill-on-signal.ts | 11 +++- src/flow-control/kill-others.spec.ts | 78 ++++++++++++------------- src/flow-control/kill-others.ts | 6 ++ src/index.ts | 8 ++- 9 files changed, 173 insertions(+), 87 deletions(-) diff --git a/src/completion-listener.spec.ts b/src/completion-listener.spec.ts index 054d5926..c4931798 100644 --- a/src/completion-listener.spec.ts +++ b/src/completion-listener.spec.ts @@ -21,12 +21,41 @@ const createController = (successCondition?: SuccessCondition) => scheduler, }); -const emitFakeCloseEvent = (command: FakeCommand, event?: Partial) => - command.close.next(createFakeCloseEvent({ ...event, command, index: command.index })); +const emitFakeCloseEvent = (command: FakeCommand, event?: Partial) => { + const fakeEvent = createFakeCloseEvent({ ...event, command, index: command.index }); + command.state = 'exited'; + command.close.next(fakeEvent); + return fakeEvent; +}; const flushPromises = () => new Promise((resolve) => setTimeout(resolve, 0)); describe('listen', () => { + it('completes only when commands emit a close event, returns close event', async () => { + const abortCtrl = new AbortController(); + const result = createController('all').listen(commands, abortCtrl.signal); + + commands[0].state = 'started'; + abortCtrl.abort(); + + const event = emitFakeCloseEvent(commands[0]); + scheduler.flush(); + + await expect(result).resolves.toHaveLength(1); + await expect(result).resolves.toEqual([event]); + }); + + it('completes when abort signal is received and command is stopped, returns nothing', async () => { + const abortCtrl = new AbortController(); + // Use success condition = first to test index access when there are no close events + const result = createController('first').listen([new FakeCommand()], abortCtrl.signal); + + abortCtrl.abort(); + scheduler.flush(); + + await expect(result).resolves.toHaveLength(0); + }); + it('check for success once all commands have emitted at least a single close event', async () => { const finallyCallback = jest.fn(); const result = createController().listen(commands).finally(finallyCallback); diff --git a/src/completion-listener.ts b/src/completion-listener.ts index eaf72c21..e03414e1 100644 --- a/src/completion-listener.ts +++ b/src/completion-listener.ts @@ -1,5 +1,5 @@ import * as Rx from 'rxjs'; -import { filter, map, switchMap, take } from 'rxjs/operators'; +import { delay, filter, map, switchMap, take } from 'rxjs/operators'; import { CloseEvent, Command } from './command'; @@ -48,6 +48,11 @@ export class CompletionListener { } private isSuccess(events: CloseEvent[]) { + if (!events.length) { + // When every command was aborted, consider a success. + return true; + } + if (this.successCondition === 'first') { return events[0].exitCode === 0; } else if (this.successCondition === 'last') { @@ -56,7 +61,7 @@ export class CompletionListener { const commandSyntaxMatch = this.successCondition.match(/^!?command-(.+)$/); if (commandSyntaxMatch == null) { - // If not a `command-` syntax, then it's an 'all' condition, or it's treated as such. + // If not a `command-` syntax, then it's an 'all' condition or it's treated as such. return events.every(({ exitCode }) => exitCode === 0); } @@ -73,7 +78,7 @@ export class CompletionListener { (event) => targetCommandsEvents.includes(event) || event.exitCode === 0, ); } - // Only the specified commands must exit successfully + // Only the specified commands must exit succesfully return ( targetCommandsEvents.length > 0 && targetCommandsEvents.every((event) => event.exitCode === 0) @@ -84,23 +89,47 @@ export class CompletionListener { * Given a list of commands, wait for all of them to exit and then evaluate their exit codes. * * @returns A Promise that resolves if the success condition is met, or rejects otherwise. + * In either case, the value is a list of close events for commands that spawned. + * Commands that didn't spawn are filtered out. */ - listen(commands: Command[]): Promise { - const closeStreams = commands.map((command) => command.close); + listen(commands: Command[], abortSignal?: AbortSignal): Promise { + const abort = + abortSignal && + Rx.fromEvent(abortSignal, 'abort', { once: true }).pipe( + // The abort signal must happen before commands are killed, otherwise new commands + // might spawn. Because of this, it's not be possible to capture the close events + // without an immediate delay + delay(0, this.scheduler), + map(() => undefined), + ); + const closeStreams = commands.map((command) => + abort + ? // Commands that have been started must close. + Rx.race(command.close, abort.pipe(filter(() => command.state === 'stopped'))) + : command.close, + ); return Rx.lastValueFrom( Rx.combineLatest(closeStreams).pipe( - filter(() => commands.every((command) => command.state !== 'started')), - map((exitInfos) => - exitInfos.sort( - (first, second) => - first.timings.endDate.getTime() - second.timings.endDate.getTime(), + filter((events) => + commands.every( + (command, i) => command.state !== 'started' || events[i] === undefined, ), ), - switchMap((exitInfos) => - this.isSuccess(exitInfos) - ? this.emitWithScheduler(Rx.of(exitInfos)) - : this.emitWithScheduler(Rx.throwError(() => exitInfos)), + map((events) => + events + // Filter out aborts, since they cannot be sorted and are considered success condition anyways + .filter((event): event is CloseEvent => event != null) + // Sort according to exit time + .sort( + (first, second) => + first.timings.endDate.getTime() - second.timings.endDate.getTime(), + ), + ), + switchMap((events) => + this.isSuccess(events) + ? this.emitWithScheduler(Rx.of(events)) + : this.emitWithScheduler(Rx.throwError(() => events)), ), take(1), ), diff --git a/src/concurrently.spec.ts b/src/concurrently.spec.ts index dcd65b0b..a07a6399 100644 --- a/src/concurrently.spec.ts +++ b/src/concurrently.spec.ts @@ -111,6 +111,16 @@ it('spawns commands up to percent based limit at once', () => { expect(spawn).toHaveBeenCalledWith('qux', expect.objectContaining({})); }); +it('does not spawn further commands on abort signal aborted', () => { + const abortController = new AbortController(); + create(['foo', 'bar'], { maxProcesses: 1, abortSignal: abortController.signal }); + expect(spawn).toHaveBeenCalledTimes(1); + + abortController.abort(); + processes[0].emit('close', 0, null); + expect(spawn).toHaveBeenCalledTimes(1); +}); + it('runs controllers with the commands', () => { create(['echo', '"echo wrapped"']); diff --git a/src/concurrently.ts b/src/concurrently.ts index 2c5b9ec8..e995939d 100644 --- a/src/concurrently.ts +++ b/src/concurrently.ts @@ -43,7 +43,8 @@ export type ConcurrentlyResult = { * A promise that resolves when concurrently ran successfully according to the specified * success condition, or reject otherwise. * - * Both the resolved and rejected value is the list of all command's close events. + * Both the resolved and rejected value is a list of all the close events for commands that + * spawned; commands that didn't spawn are filtered out. */ result: Promise; }; @@ -105,6 +106,11 @@ export type ConcurrentlyOptions = { */ successCondition?: SuccessCondition; + /** + * A signal to stop spawning further processes. + */ + abortSignal?: AbortSignal; + /** * Which flow controllers should be applied on commands spawned by concurrently. * Defaults to an empty array. @@ -217,11 +223,11 @@ export function concurrently( : Number(options.maxProcesses)) || commandsLeft.length, ); for (let i = 0; i < maxProcesses; i++) { - maybeRunMore(commandsLeft); + maybeRunMore(commandsLeft, options.abortSignal); } const result = new CompletionListener({ successCondition: options.successCondition }) - .listen(commands) + .listen(commands, options.abortSignal) .finally(() => { handleResult.onFinishCallbacks.forEach((onFinish) => onFinish()); }); @@ -263,14 +269,14 @@ function parseCommand(command: CommandInfo, parsers: CommandParser[]) { ); } -function maybeRunMore(commandsLeft: Command[]) { +function maybeRunMore(commandsLeft: Command[], abortSignal?: AbortSignal) { const command = commandsLeft.shift(); - if (!command) { + if (!command || abortSignal?.aborted) { return; } command.start(); command.close.subscribe(() => { - maybeRunMore(commandsLeft); + maybeRunMore(commandsLeft, abortSignal); }); } diff --git a/src/flow-control/kill-on-signal.spec.ts b/src/flow-control/kill-on-signal.spec.ts index 0780fcc5..020e3ca8 100644 --- a/src/flow-control/kill-on-signal.spec.ts +++ b/src/flow-control/kill-on-signal.spec.ts @@ -7,10 +7,12 @@ import { KillOnSignal } from './kill-on-signal'; let commands: Command[]; let controller: KillOnSignal; let process: EventEmitter; +let abortController: AbortController; beforeEach(() => { process = new EventEmitter(); commands = [new FakeCommand(), new FakeCommand()]; - controller = new KillOnSignal({ process }); + abortController = new AbortController(); + controller = new KillOnSignal({ process, abortController }); }); it('returns commands that keep non-close streams from original commands', () => { @@ -51,29 +53,20 @@ it('returns commands that keep non-SIGINT exit codes', () => { expect(callback).toHaveBeenCalledWith(expect.objectContaining({ exitCode: 1 })); }); -it('kills all commands on SIGINT', () => { - controller.handle(commands); - process.emit('SIGINT'); - - expect(process.listenerCount('SIGINT')).toBe(1); - expect(commands[0].kill).toHaveBeenCalledWith('SIGINT'); - expect(commands[1].kill).toHaveBeenCalledWith('SIGINT'); -}); - -it('kills all commands on SIGTERM', () => { - controller.handle(commands); - process.emit('SIGTERM'); +describe.each(['SIGINT', 'SIGTERM', 'SIGHUP'])('on %s', (signal) => { + it('kills all commands', () => { + controller.handle(commands); + process.emit(signal); - expect(process.listenerCount('SIGTERM')).toBe(1); - expect(commands[0].kill).toHaveBeenCalledWith('SIGTERM'); - expect(commands[1].kill).toHaveBeenCalledWith('SIGTERM'); -}); + expect(process.listenerCount(signal)).toBe(1); + expect(commands[0].kill).toHaveBeenCalledWith(signal); + expect(commands[1].kill).toHaveBeenCalledWith(signal); + }); -it('kills all commands on SIGHUP', () => { - controller.handle(commands); - process.emit('SIGHUP'); + it('sends abort signal', () => { + controller.handle(commands); + process.emit(signal); - expect(process.listenerCount('SIGHUP')).toBe(1); - expect(commands[0].kill).toHaveBeenCalledWith('SIGHUP'); - expect(commands[1].kill).toHaveBeenCalledWith('SIGHUP'); + expect(abortController.signal.aborted).toBe(true); + }); }); diff --git a/src/flow-control/kill-on-signal.ts b/src/flow-control/kill-on-signal.ts index 599409aa..eff7eca9 100644 --- a/src/flow-control/kill-on-signal.ts +++ b/src/flow-control/kill-on-signal.ts @@ -10,9 +10,17 @@ import { FlowController } from './flow-controller'; */ export class KillOnSignal implements FlowController { private readonly process: EventEmitter; + private readonly abortController?: AbortController; - constructor({ process }: { process: EventEmitter }) { + constructor({ + process, + abortController, + }: { + process: EventEmitter; + abortController?: AbortController; + }) { this.process = process; + this.abortController = abortController; } handle(commands: Command[]) { @@ -20,6 +28,7 @@ export class KillOnSignal implements FlowController { (['SIGINT', 'SIGTERM', 'SIGHUP'] as NodeJS.Signals[]).forEach((signal) => { this.process.on(signal, () => { caughtSignal = signal; + this.abortController?.abort(); commands.forEach((command) => command.kill(signal)); }); }); diff --git a/src/flow-control/kill-others.spec.ts b/src/flow-control/kill-others.spec.ts index a96e9179..3da3639c 100644 --- a/src/flow-control/kill-others.spec.ts +++ b/src/flow-control/kill-others.spec.ts @@ -14,14 +14,17 @@ beforeAll(() => { let commands: FakeCommand[]; let logger: Logger; +let abortController: AbortController; beforeEach(() => { commands = [new FakeCommand(), new FakeCommand()]; logger = createMockInstance(Logger); + abortController = new AbortController(); }); const createWithConditions = (conditions: ProcessCloseCondition[], killSignal?: string) => new KillOthers({ logger, + abortController, conditions, killSignal, }); @@ -41,26 +44,45 @@ it('does not kill others if condition does not match', () => { expect(commands[1].kill).not.toHaveBeenCalled(); }); -it('kills other killable processes on success', () => { - createWithConditions(['success']).handle(commands); - commands[1].isKillable = true; - commands[0].close.next(createFakeCloseEvent({ exitCode: 0 })); +describe.each(['success', 'failure'] as const)('on %s', (condition) => { + const exitCode = condition === 'success' ? 0 : 1; + const inversedCode = exitCode === 1 ? 0 : 1; - expect(logger.logGlobalEvent).toHaveBeenCalledTimes(1); - expect(logger.logGlobalEvent).toHaveBeenCalledWith('Sending SIGTERM to other processes..'); - expect(commands[0].kill).not.toHaveBeenCalled(); - expect(commands[1].kill).toHaveBeenCalledWith(undefined); -}); + it('kills other killable processes', () => { + createWithConditions([condition]).handle(commands); + commands[1].isKillable = true; + commands[0].close.next(createFakeCloseEvent({ exitCode })); -it('kills other killable processes on success, with specified signal', () => { - createWithConditions(['success'], 'SIGKILL').handle(commands); - commands[1].isKillable = true; - commands[0].close.next(createFakeCloseEvent({ exitCode: 0 })); + expect(logger.logGlobalEvent).toHaveBeenCalledTimes(1); + expect(logger.logGlobalEvent).toHaveBeenCalledWith('Sending SIGTERM to other processes..'); + expect(commands[0].kill).not.toHaveBeenCalled(); + expect(commands[1].kill).toHaveBeenCalledWith(undefined); + }); - expect(logger.logGlobalEvent).toHaveBeenCalledTimes(1); - expect(logger.logGlobalEvent).toHaveBeenCalledWith('Sending SIGKILL to other processes..'); - expect(commands[0].kill).not.toHaveBeenCalled(); - expect(commands[1].kill).toHaveBeenCalledWith('SIGKILL'); + it('kills other killable processes on success, with specified signal', () => { + createWithConditions([condition], 'SIGKILL').handle(commands); + commands[1].isKillable = true; + commands[0].close.next(createFakeCloseEvent({ exitCode })); + + expect(logger.logGlobalEvent).toHaveBeenCalledTimes(1); + expect(logger.logGlobalEvent).toHaveBeenCalledWith('Sending SIGKILL to other processes..'); + expect(commands[0].kill).not.toHaveBeenCalled(); + expect(commands[1].kill).toHaveBeenCalledWith('SIGKILL'); + }); + + it('sends abort signal on condition match', () => { + createWithConditions([condition]).handle(commands); + commands[0].close.next(createFakeCloseEvent({ exitCode })); + + expect(abortController.signal.aborted).toBe(true); + }); + + it('does not send abort signal on condition mismatch', () => { + createWithConditions([condition]).handle(commands); + commands[0].close.next(createFakeCloseEvent({ exitCode: inversedCode })); + + expect(abortController.signal.aborted).toBe(false); + }); }); it('does nothing if called without conditions', () => { @@ -73,28 +95,6 @@ it('does nothing if called without conditions', () => { expect(commands[1].kill).not.toHaveBeenCalled(); }); -it('kills other killable processes on failure', () => { - createWithConditions(['failure']).handle(commands); - commands[1].isKillable = true; - commands[0].close.next(createFakeCloseEvent({ exitCode: 1 })); - - expect(logger.logGlobalEvent).toHaveBeenCalledTimes(1); - expect(logger.logGlobalEvent).toHaveBeenCalledWith('Sending SIGTERM to other processes..'); - expect(commands[0].kill).not.toHaveBeenCalled(); - expect(commands[1].kill).toHaveBeenCalledWith(undefined); -}); - -it('kills other killable processes on failure, with specified signal', () => { - createWithConditions(['failure'], 'SIGKILL').handle(commands); - commands[1].isKillable = true; - commands[0].close.next(createFakeCloseEvent({ exitCode: 1 })); - - expect(logger.logGlobalEvent).toHaveBeenCalledTimes(1); - expect(logger.logGlobalEvent).toHaveBeenCalledWith('Sending SIGKILL to other processes..'); - expect(commands[0].kill).not.toHaveBeenCalled(); - expect(commands[1].kill).toHaveBeenCalledWith('SIGKILL'); -}); - it('does not try to kill processes already dead', () => { createWithConditions(['failure']).handle(commands); commands[0].close.next(createFakeCloseEvent({ exitCode: 1 })); diff --git a/src/flow-control/kill-others.ts b/src/flow-control/kill-others.ts index abc20e06..f693c879 100644 --- a/src/flow-control/kill-others.ts +++ b/src/flow-control/kill-others.ts @@ -12,19 +12,23 @@ export type ProcessCloseCondition = 'failure' | 'success'; */ export class KillOthers implements FlowController { private readonly logger: Logger; + private readonly abortController?: AbortController; private readonly conditions: ProcessCloseCondition[]; private readonly killSignal: string | undefined; constructor({ logger, + abortController, conditions, killSignal, }: { logger: Logger; + abortController?: AbortController; conditions: ProcessCloseCondition | ProcessCloseCondition[]; killSignal: string | undefined; }) { this.logger = logger; + this.abortController = abortController; this.conditions = _.castArray(conditions); this.killSignal = killSignal; } @@ -49,6 +53,8 @@ export class KillOthers implements FlowController { closeStates.forEach((closeState) => closeState.subscribe(() => { + this.abortController?.abort(); + const killableCommands = commands.filter((command) => Command.canKill(command)); if (killableCommands.length) { this.logger.logGlobalEvent( diff --git a/src/index.ts b/src/index.ts index 72f35e72..b95ad194 100644 --- a/src/index.ts +++ b/src/index.ts @@ -18,7 +18,7 @@ import { LogTimings } from './flow-control/log-timings'; import { RestartDelay, RestartProcess } from './flow-control/restart-process'; import { Logger } from './logger'; -export type ConcurrentlyOptions = BaseConcurrentlyOptions & { +export type ConcurrentlyOptions = Omit & { // Logger options /** * Which command(s) should have their output hidden. @@ -103,6 +103,8 @@ export function concurrently( timestampFormat: options.timestampFormat, }); + const abortController = new AbortController(); + return createConcurrently(commands, { maxProcesses: options.maxProcesses, raw: options.raw, @@ -111,6 +113,7 @@ export function concurrently( logger, outputStream: options.outputStream || process.stdout, group: options.group, + abortSignal: abortController.signal, controllers: [ new LogError({ logger }), new LogOutput({ logger }), @@ -122,7 +125,7 @@ export function concurrently( options.inputStream || (options.handleInput ? process.stdin : undefined), pauseInputStreamOnFinish: options.pauseInputStreamOnFinish, }), - new KillOnSignal({ process }), + new KillOnSignal({ process, abortController }), new RestartProcess({ logger, delay: options.restartDelay, @@ -132,6 +135,7 @@ export function concurrently( logger, conditions: options.killOthers || [], killSignal: options.killSignal, + abortController, }), new LogTimings({ logger: options.timings ? logger : undefined,