From 796d6c8cf5547ee55cba171e468d0f7b379cc7be Mon Sep 17 00:00:00 2001 From: Lukas Taegert-Atkinson Date: Tue, 30 Aug 2022 14:06:38 +0200 Subject: [PATCH] [v3.0] Custom awaiting watch emitter (#4609) * Use a custom watch emitter implementation * Add documentation * Improve coverage --- docs/02-javascript-api.md | 10 ++++- src/Graph.ts | 4 +- src/rollup/types.d.ts | 55 ++++++++++++------------ src/watch/WatchEmitter.ts | 88 +++++++++++++++++++++++++-------------- src/watch/watch.ts | 27 ++++++------ test/watch/index.js | 53 +++++++++++++++++++++++ 6 files changed, 160 insertions(+), 77 deletions(-) diff --git a/docs/02-javascript-api.md b/docs/02-javascript-api.md index 87bdc5b995e..c4cc17bfae8 100755 --- a/docs/02-javascript-api.md +++ b/docs/02-javascript-api.md @@ -223,6 +223,8 @@ watcher.on('event', event => { // bundle object for output generation errors. As with // "BUNDLE_END", you should call "event.result.close()" if // present once you are done. + // If you return a Promise from your event handler, Rollup will wait until the + // Promise is resolved before continuing. }); // This will make sure that bundles are properly closed after each run @@ -232,7 +234,13 @@ watcher.on('event', ({ result }) => { } }); -// stop watching +// Additionally, you can hook into the following. Again, return a Promise to +// make Rollup wait at that stage: +watcher.on('change', (id, { event }) => { /* a file was modified */ }) +watcher.on('restart', () => { /* a new run was triggered */ }) +watcher.on('close', () => { /* the watcher was closed, see below */ }) + +// to stop watching watcher.close(); ``` diff --git a/src/Graph.ts b/src/Graph.ts index eda8e66e2dd..4b68bad11fc 100644 --- a/src/Graph.ts +++ b/src/Graph.ts @@ -87,8 +87,8 @@ export default class Graph { const handleChange = (...args: Parameters) => this.pluginDriver.hookParallel('watchChange', args); const handleClose = () => this.pluginDriver.hookParallel('closeWatcher', []); - watcher.onCurrentAwaited('change', handleChange); - watcher.onCurrentAwaited('close', handleClose); + watcher.onCurrentRun('change', handleChange); + watcher.onCurrentRun('close', handleClose); } this.pluginDriver = new PluginDriver(this, options, options.plugins, this.pluginCache); this.acornParser = acorn.Parser.extend(...(options.acornInjectPlugins as any)); diff --git a/src/rollup/types.d.ts b/src/rollup/types.d.ts index 2600129111f..f03e6fdb480 100644 --- a/src/rollup/types.d.ts +++ b/src/rollup/types.d.ts @@ -833,40 +833,37 @@ export interface RollupWatchOptions extends InputOptions { watch?: WatcherOptions | false; } -interface TypedEventEmitter any }> { - addListener(event: K, listener: T[K]): this; - emit(event: K, ...args: Parameters): boolean; - eventNames(): Array; - getMaxListeners(): number; - listenerCount(type: keyof T): number; - listeners(event: K): Array; - off(event: K, listener: T[K]): this; - on(event: K, listener: T[K]): this; - once(event: K, listener: T[K]): this; - prependListener(event: K, listener: T[K]): this; - prependOnceListener(event: K, listener: T[K]): this; - rawListeners(event: K): Array; - removeAllListeners(event?: K): this; - removeListener(event: K, listener: T[K]): this; - setMaxListeners(n: number): this; -} - -export interface RollupAwaitingEmitter any }> - extends TypedEventEmitter { +export type AwaitedEventListener< + T extends { [event: string]: (...args: any) => any }, + K extends keyof T +> = (...args: Parameters) => void | Promise; + +export interface AwaitingEventEmitter any }> { close(): Promise; - emitAndAwait(event: K, ...args: Parameters): Promise[]>; + emit(event: K, ...args: Parameters): Promise; + /** + * Removes an event listener. + */ + off(event: K, listener: AwaitedEventListener): this; + /** + * Registers an event listener that will be awaited before Rollup continues. + * All listeners will be awaited in parallel while rejections are tracked via + * Promise.all. + */ + on(event: K, listener: AwaitedEventListener): this; /** - * Registers an event listener that will be awaited before Rollup continues - * for events emitted via emitAndAwait. All listeners will be awaited in - * parallel while rejections are tracked via Promise.all. - * Listeners are removed automatically when removeAwaited is called, which - * happens automatically after each run. + * Registers an event listener that will be awaited before Rollup continues. + * All listeners will be awaited in parallel while rejections are tracked via + * Promise.all. + * Listeners are removed automatically when removeListenersForCurrentRun is + * called, which happens automatically after each run. */ - onCurrentAwaited( + onCurrentRun( event: K, listener: (...args: Parameters) => Promise> ): this; - removeAwaited(): this; + removeAllListeners(): this; + removeListenersForCurrentRun(): this; } export type RollupWatcherEvent = @@ -882,7 +879,7 @@ export type RollupWatcherEvent = | { code: 'END' } | { code: 'ERROR'; error: RollupError; result: RollupBuild | null }; -export type RollupWatcher = RollupAwaitingEmitter<{ +export type RollupWatcher = AwaitingEventEmitter<{ change: (id: string, change: { event: ChangeEvent }) => void; close: () => void; event: (event: RollupWatcherEvent) => void; diff --git a/src/watch/WatchEmitter.ts b/src/watch/WatchEmitter.ts index f56bb664203..564f2abfe43 100644 --- a/src/watch/WatchEmitter.ts +++ b/src/watch/WatchEmitter.ts @@ -1,48 +1,72 @@ -import { EventEmitter } from 'node:events'; +import { AwaitedEventListener, AwaitingEventEmitter } from '../rollup/types'; -type PromiseReturn any> = ( - ...args: Parameters -) => Promise>; - -export class WatchEmitter< - T extends { [event: string]: (...args: any) => any } -> extends EventEmitter { - private awaitedHandlers: { - [K in keyof T]?: PromiseReturn[]; +export class WatchEmitter any }> + implements AwaitingEventEmitter +{ + private currentHandlers: { + [K in keyof T]?: AwaitedEventListener[]; + } = Object.create(null); + private persistentHandlers: { + [K in keyof T]?: AwaitedEventListener[]; } = Object.create(null); - - constructor() { - super(); - // Allows more than 10 bundles to be watched without - // showing the `MaxListenersExceededWarning` to the user. - this.setMaxListeners(Infinity); - } // Will be overwritten by Rollup async close(): Promise {} - emitAndAwait( - event: K, - ...args: Parameters - ): Promise[]> { - this.emit(event as string, ...(args as any[])); - return Promise.all(this.getHandlers(event).map(handler => handler(...args))); + emit(event: K, ...args: Parameters): Promise { + return Promise.all( + this.getCurrentHandlers(event) + .concat(this.getPersistentHandlers(event)) + .map(handler => handler(...args)) + ); + } + + off(event: K, listener: AwaitedEventListener): this { + const listeners = this.persistentHandlers[event]; + if (listeners) { + // A hack stolen from "mitt": ">>> 0" does not change numbers >= 0, but -1 + // (which would remove the last array element if used unchanged) is turned + // into max_int, which is outside the array and does not change anything. + listeners.splice(listeners.indexOf(listener) >>> 0, 1); + } + return this; + } + + on(event: K, listener: AwaitedEventListener): this { + this.getPersistentHandlers(event).push(listener); + return this; } - onCurrentAwaited( - event: K, - listener: (...args: Parameters) => Promise> - ): this { - this.getHandlers(event).push(listener); + onCurrentRun(event: K, listener: AwaitedEventListener): this { + this.getCurrentHandlers(event).push(listener); return this; } - removeAwaited(): this { - this.awaitedHandlers = {}; + once(event: K, listener: AwaitedEventListener): this { + const selfRemovingListener: AwaitedEventListener = (...args) => { + this.off(event, selfRemovingListener); + return listener(...args); + }; + this.on(event, selfRemovingListener); return this; } - private getHandlers(event: K): PromiseReturn[] { - return this.awaitedHandlers[event] || (this.awaitedHandlers[event] = []); + removeAllListeners(): this { + this.removeListenersForCurrentRun(); + this.persistentHandlers = Object.create(null); + return this; + } + + removeListenersForCurrentRun(): this { + this.currentHandlers = Object.create(null); + return this; + } + + private getCurrentHandlers(event: K): AwaitedEventListener[] { + return this.currentHandlers[event] || (this.currentHandlers[event] = []); + } + + private getPersistentHandlers(event: K): AwaitedEventListener[] { + return this.persistentHandlers[event] || (this.persistentHandlers[event] = []); } } diff --git a/src/watch/watch.ts b/src/watch/watch.ts index 22f34b5195c..907c4a64e55 100644 --- a/src/watch/watch.ts +++ b/src/watch/watch.ts @@ -38,6 +38,7 @@ export class Watcher { private buildDelay = 0; private buildTimeout: NodeJS.Timer | null = null; + private closed = false; private readonly invalidatedIds = new Map(); private rerun = false; private running = true; @@ -58,11 +59,13 @@ export class Watcher { } async close(): Promise { + if (this.closed) return; + this.closed = true; if (this.buildTimeout) clearTimeout(this.buildTimeout); for (const task of this.tasks) { task.close(); } - await this.emitter.emitAndAwait('close'); + await this.emitter.emit('close'); this.emitter.removeAllListeners(); } @@ -91,22 +94,20 @@ export class Watcher { this.buildTimeout = null; try { await Promise.all( - [...this.invalidatedIds].map(([id, event]) => - this.emitter.emitAndAwait('change', id, { event }) - ) + [...this.invalidatedIds].map(([id, event]) => this.emitter.emit('change', id, { event })) ); this.invalidatedIds.clear(); - this.emitter.emit('restart'); - this.emitter.removeAwaited(); + await this.emitter.emit('restart'); + this.emitter.removeListenersForCurrentRun(); this.run(); } catch (error: any) { this.invalidatedIds.clear(); - this.emitter.emit('event', { + await this.emitter.emit('event', { code: 'ERROR', error, result: null }); - this.emitter.emit('event', { + await this.emitter.emit('event', { code: 'END' }); } @@ -115,7 +116,7 @@ export class Watcher { private async run(): Promise { this.running = true; - this.emitter.emit('event', { + await this.emitter.emit('event', { code: 'START' }); @@ -124,7 +125,7 @@ export class Watcher { } this.running = false; - this.emitter.emit('event', { + await this.emitter.emit('event', { code: 'END' }); if (this.rerun) { @@ -197,7 +198,7 @@ export class Task { const start = Date.now(); - this.watcher.emitter.emit('event', { + await this.watcher.emitter.emit('event', { code: 'BUNDLE_START', input: this.options.input, output: this.outputFiles @@ -211,7 +212,7 @@ export class Task { } this.updateWatchedFiles(result); this.skipWrite || (await Promise.all(this.outputs.map(output => result!.write(output)))); - this.watcher.emitter.emit('event', { + await this.watcher.emitter.emit('event', { code: 'BUNDLE_END', duration: Date.now() - start, input: this.options.input, @@ -229,7 +230,7 @@ export class Task { this.cache.modules = this.cache.modules.filter(module => module.id !== error.id); } } - this.watcher.emitter.emit('event', { + await this.watcher.emitter.emit('event', { code: 'ERROR', error, result diff --git a/test/watch/index.js b/test/watch/index.js index b6bc33a1e94..85f357c6483 100644 --- a/test/watch/index.js +++ b/test/watch/index.js @@ -134,6 +134,59 @@ describe('rollup.watch', () => { ]); }); + it('waits for event listeners', async () => { + let run = 0; + const events = new Set(); + + await copy('test/watch/samples/basic', 'test/_tmp/input'); + watcher = rollup.watch({ + input: 'test/_tmp/input/main.js', + plugins: { + async writeBundle() { + if (run++ === 0) { + await wait(100); + atomicWriteFileSync('test/_tmp/input/main.js', 'export default 48;'); + await wait(100); + } + if (run === 2) { + watcher.close(); + } + } + }, + output: { + file: 'test/_tmp/output/bundle.js', + format: 'cjs', + exports: 'auto' + } + }); + await new Promise((resolve, reject) => { + let currentEvent = null; + const handleEvent = async (...args) => { + events.add(args[0]?.code); + if (currentEvent) { + watcher.close(); + return reject( + new Error( + `Event ${JSON.stringify(args)} was emitted while handling ${JSON.stringify( + currentEvent + )}.` + ) + ); + } + currentEvent = args; + await wait(100); + currentEvent = null; + }; + // This should work but should not have an effect + watcher.off('event', handleEvent); + watcher.on('event', handleEvent); + watcher.on('change', handleEvent); + watcher.on('restart', handleEvent); + watcher.on('close', resolve); + }); + assert.deepStrictEqual([...events], ['START', 'BUNDLE_START', 'BUNDLE_END', 'END', undefined]); + }); + it('does not fail for virtual files', async () => { await copy('test/watch/samples/basic', 'test/_tmp/input'); watcher = rollup.watch({