Skip to content

Commit

Permalink
[v3.0] Custom awaiting watch emitter (#4609)
Browse files Browse the repository at this point in the history
* Use a custom watch emitter implementation

* Add documentation

* Improve coverage
  • Loading branch information
lukastaegert committed Sep 6, 2022
1 parent 5e8385a commit a1383e7
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 77 deletions.
10 changes: 9 additions & 1 deletion docs/02-javascript-api.md
Expand Up @@ -223,6 +223,8 @@ watcher.on('event', event => {
// bundle object for output generation errors. As with
// "BUNDLE_END", you should call "event.result.close()" if
// present once you are done.
// If you return a Promise from your event handler, Rollup will wait until the
// Promise is resolved before continuing.
});

// This will make sure that bundles are properly closed after each run
Expand All @@ -232,7 +234,13 @@ watcher.on('event', ({ result }) => {
}
});

// stop watching
// Additionally, you can hook into the following. Again, return a Promise to
// make Rollup wait at that stage:
watcher.on('change', (id, { event }) => { /* a file was modified */ })
watcher.on('restart', () => { /* a new run was triggered */ })
watcher.on('close', () => { /* the watcher was closed, see below */ })

// to stop watching
watcher.close();
```

Expand Down
4 changes: 2 additions & 2 deletions src/Graph.ts
Expand Up @@ -87,8 +87,8 @@ export default class Graph {
const handleChange = (...args: Parameters<WatchChangeHook>) =>
this.pluginDriver.hookParallel('watchChange', args);
const handleClose = () => this.pluginDriver.hookParallel('closeWatcher', []);
watcher.onCurrentAwaited('change', handleChange);
watcher.onCurrentAwaited('close', handleClose);
watcher.onCurrentRun('change', handleChange);
watcher.onCurrentRun('close', handleClose);
}
this.pluginDriver = new PluginDriver(this, options, options.plugins, this.pluginCache);
this.acornParser = acorn.Parser.extend(...(options.acornInjectPlugins as any));
Expand Down
55 changes: 26 additions & 29 deletions src/rollup/types.d.ts
Expand Up @@ -841,40 +841,37 @@ export interface RollupWatchOptions extends InputOptions {
watch?: WatcherOptions | false;
}

interface TypedEventEmitter<T extends { [event: string]: (...args: any) => any }> {
addListener<K extends keyof T>(event: K, listener: T[K]): this;
emit<K extends keyof T>(event: K, ...args: Parameters<T[K]>): boolean;
eventNames(): Array<keyof T>;
getMaxListeners(): number;
listenerCount(type: keyof T): number;
listeners<K extends keyof T>(event: K): Array<T[K]>;
off<K extends keyof T>(event: K, listener: T[K]): this;
on<K extends keyof T>(event: K, listener: T[K]): this;
once<K extends keyof T>(event: K, listener: T[K]): this;
prependListener<K extends keyof T>(event: K, listener: T[K]): this;
prependOnceListener<K extends keyof T>(event: K, listener: T[K]): this;
rawListeners<K extends keyof T>(event: K): Array<T[K]>;
removeAllListeners<K extends keyof T>(event?: K): this;
removeListener<K extends keyof T>(event: K, listener: T[K]): this;
setMaxListeners(n: number): this;
}

export interface RollupAwaitingEmitter<T extends { [event: string]: (...args: any) => any }>
extends TypedEventEmitter<T> {
export type AwaitedEventListener<
T extends { [event: string]: (...args: any) => any },
K extends keyof T
> = (...args: Parameters<T[K]>) => void | Promise<void>;

export interface AwaitingEventEmitter<T extends { [event: string]: (...args: any) => any }> {
close(): Promise<void>;
emitAndAwait<K extends keyof T>(event: K, ...args: Parameters<T[K]>): Promise<ReturnType<T[K]>[]>;
emit<K extends keyof T>(event: K, ...args: Parameters<T[K]>): Promise<unknown>;
/**
* Removes an event listener.
*/
off<K extends keyof T>(event: K, listener: AwaitedEventListener<T, K>): this;
/**
* Registers an event listener that will be awaited before Rollup continues.
* All listeners will be awaited in parallel while rejections are tracked via
* Promise.all.
*/
on<K extends keyof T>(event: K, listener: AwaitedEventListener<T, K>): this;
/**
* Registers an event listener that will be awaited before Rollup continues
* for events emitted via emitAndAwait. All listeners will be awaited in
* parallel while rejections are tracked via Promise.all.
* Listeners are removed automatically when removeAwaited is called, which
* happens automatically after each run.
* Registers an event listener that will be awaited before Rollup continues.
* All listeners will be awaited in parallel while rejections are tracked via
* Promise.all.
* Listeners are removed automatically when removeListenersForCurrentRun is
* called, which happens automatically after each run.
*/
onCurrentAwaited<K extends keyof T>(
onCurrentRun<K extends keyof T>(
event: K,
listener: (...args: Parameters<T[K]>) => Promise<ReturnType<T[K]>>
): this;
removeAwaited(): this;
removeAllListeners(): this;
removeListenersForCurrentRun(): this;
}

export type RollupWatcherEvent =
Expand All @@ -890,7 +887,7 @@ export type RollupWatcherEvent =
| { code: 'END' }
| { code: 'ERROR'; error: RollupError; result: RollupBuild | null };

export type RollupWatcher = RollupAwaitingEmitter<{
export type RollupWatcher = AwaitingEventEmitter<{
change: (id: string, change: { event: ChangeEvent }) => void;
close: () => void;
event: (event: RollupWatcherEvent) => void;
Expand Down
88 changes: 56 additions & 32 deletions src/watch/WatchEmitter.ts
@@ -1,48 +1,72 @@
import { EventEmitter } from 'node:events';
import { AwaitedEventListener, AwaitingEventEmitter } from '../rollup/types';

type PromiseReturn<T extends (...args: any) => any> = (
...args: Parameters<T>
) => Promise<ReturnType<T>>;

export class WatchEmitter<
T extends { [event: string]: (...args: any) => any }
> extends EventEmitter {
private awaitedHandlers: {
[K in keyof T]?: PromiseReturn<T[K]>[];
export class WatchEmitter<T extends { [event: string]: (...args: any) => any }>
implements AwaitingEventEmitter<T>
{
private currentHandlers: {
[K in keyof T]?: AwaitedEventListener<T, K>[];
} = Object.create(null);
private persistentHandlers: {
[K in keyof T]?: AwaitedEventListener<T, K>[];
} = Object.create(null);

constructor() {
super();
// Allows more than 10 bundles to be watched without
// showing the `MaxListenersExceededWarning` to the user.
this.setMaxListeners(Infinity);
}

// Will be overwritten by Rollup
async close(): Promise<void> {}

emitAndAwait<K extends keyof T>(
event: K,
...args: Parameters<T[K]>
): Promise<ReturnType<T[K]>[]> {
this.emit(event as string, ...(args as any[]));
return Promise.all(this.getHandlers(event).map(handler => handler(...args)));
emit<K extends keyof T>(event: K, ...args: Parameters<T[K]>): Promise<unknown> {
return Promise.all(
this.getCurrentHandlers(event)
.concat(this.getPersistentHandlers(event))
.map(handler => handler(...args))
);
}

off<K extends keyof T>(event: K, listener: AwaitedEventListener<T, K>): this {
const listeners = this.persistentHandlers[event];
if (listeners) {
// A hack stolen from "mitt": ">>> 0" does not change numbers >= 0, but -1
// (which would remove the last array element if used unchanged) is turned
// into max_int, which is outside the array and does not change anything.
listeners.splice(listeners.indexOf(listener) >>> 0, 1);
}
return this;
}

on<K extends keyof T>(event: K, listener: AwaitedEventListener<T, K>): this {
this.getPersistentHandlers(event).push(listener);
return this;
}

onCurrentAwaited<K extends keyof T>(
event: K,
listener: (...args: Parameters<T[K]>) => Promise<ReturnType<T[K]>>
): this {
this.getHandlers(event).push(listener);
onCurrentRun<K extends keyof T>(event: K, listener: AwaitedEventListener<T, K>): this {
this.getCurrentHandlers(event).push(listener);
return this;
}

removeAwaited(): this {
this.awaitedHandlers = {};
once<K extends keyof T>(event: K, listener: AwaitedEventListener<T, K>): this {
const selfRemovingListener: AwaitedEventListener<T, K> = (...args) => {
this.off(event, selfRemovingListener);
return listener(...args);
};
this.on(event, selfRemovingListener);
return this;
}

private getHandlers<K extends keyof T>(event: K): PromiseReturn<T[K]>[] {
return this.awaitedHandlers[event] || (this.awaitedHandlers[event] = []);
removeAllListeners(): this {
this.removeListenersForCurrentRun();
this.persistentHandlers = Object.create(null);
return this;
}

removeListenersForCurrentRun(): this {
this.currentHandlers = Object.create(null);
return this;
}

private getCurrentHandlers<K extends keyof T>(event: K): AwaitedEventListener<T, K>[] {
return this.currentHandlers[event] || (this.currentHandlers[event] = []);
}

private getPersistentHandlers<K extends keyof T>(event: K): AwaitedEventListener<T, K>[] {
return this.persistentHandlers[event] || (this.persistentHandlers[event] = []);
}
}
27 changes: 14 additions & 13 deletions src/watch/watch.ts
Expand Up @@ -38,6 +38,7 @@ export class Watcher {

private buildDelay = 0;
private buildTimeout: NodeJS.Timer | null = null;
private closed = false;
private readonly invalidatedIds = new Map<string, ChangeEvent>();
private rerun = false;
private running = true;
Expand All @@ -58,11 +59,13 @@ export class Watcher {
}

async close(): Promise<void> {
if (this.closed) return;
this.closed = true;
if (this.buildTimeout) clearTimeout(this.buildTimeout);
for (const task of this.tasks) {
task.close();
}
await this.emitter.emitAndAwait('close');
await this.emitter.emit('close');
this.emitter.removeAllListeners();
}

Expand Down Expand Up @@ -91,22 +94,20 @@ export class Watcher {
this.buildTimeout = null;
try {
await Promise.all(
[...this.invalidatedIds].map(([id, event]) =>
this.emitter.emitAndAwait('change', id, { event })
)
[...this.invalidatedIds].map(([id, event]) => this.emitter.emit('change', id, { event }))
);
this.invalidatedIds.clear();
this.emitter.emit('restart');
this.emitter.removeAwaited();
await this.emitter.emit('restart');
this.emitter.removeListenersForCurrentRun();
this.run();
} catch (error: any) {
this.invalidatedIds.clear();
this.emitter.emit('event', {
await this.emitter.emit('event', {
code: 'ERROR',
error,
result: null
});
this.emitter.emit('event', {
await this.emitter.emit('event', {
code: 'END'
});
}
Expand All @@ -115,7 +116,7 @@ export class Watcher {

private async run(): Promise<void> {
this.running = true;
this.emitter.emit('event', {
await this.emitter.emit('event', {
code: 'START'
});

Expand All @@ -124,7 +125,7 @@ export class Watcher {
}

this.running = false;
this.emitter.emit('event', {
await this.emitter.emit('event', {
code: 'END'
});
if (this.rerun) {
Expand Down Expand Up @@ -197,7 +198,7 @@ export class Task {

const start = Date.now();

this.watcher.emitter.emit('event', {
await this.watcher.emitter.emit('event', {
code: 'BUNDLE_START',
input: this.options.input,
output: this.outputFiles
Expand All @@ -211,7 +212,7 @@ export class Task {
}
this.updateWatchedFiles(result);
this.skipWrite || (await Promise.all(this.outputs.map(output => result!.write(output))));
this.watcher.emitter.emit('event', {
await this.watcher.emitter.emit('event', {
code: 'BUNDLE_END',
duration: Date.now() - start,
input: this.options.input,
Expand All @@ -229,7 +230,7 @@ export class Task {
this.cache.modules = this.cache.modules.filter(module => module.id !== error.id);
}
}
this.watcher.emitter.emit('event', {
await this.watcher.emitter.emit('event', {
code: 'ERROR',
error,
result
Expand Down
53 changes: 53 additions & 0 deletions test/watch/index.js
Expand Up @@ -134,6 +134,59 @@ describe('rollup.watch', () => {
]);
});

it('waits for event listeners', async () => {
let run = 0;
const events = new Set();

await copy('test/watch/samples/basic', 'test/_tmp/input');
watcher = rollup.watch({
input: 'test/_tmp/input/main.js',
plugins: {
async writeBundle() {
if (run++ === 0) {
await wait(100);
atomicWriteFileSync('test/_tmp/input/main.js', 'export default 48;');
await wait(100);
}
if (run === 2) {
watcher.close();
}
}
},
output: {
file: 'test/_tmp/output/bundle.js',
format: 'cjs',
exports: 'auto'
}
});
await new Promise((resolve, reject) => {
let currentEvent = null;
const handleEvent = async (...args) => {
events.add(args[0]?.code);
if (currentEvent) {
watcher.close();
return reject(
new Error(
`Event ${JSON.stringify(args)} was emitted while handling ${JSON.stringify(
currentEvent
)}.`
)
);
}
currentEvent = args;
await wait(100);
currentEvent = null;
};
// This should work but should not have an effect
watcher.off('event', handleEvent);
watcher.on('event', handleEvent);
watcher.on('change', handleEvent);
watcher.on('restart', handleEvent);
watcher.on('close', resolve);
});
assert.deepStrictEqual([...events], ['START', 'BUNDLE_START', 'BUNDLE_END', 'END', undefined]);
});

it('does not fail for virtual files', async () => {
await copy('test/watch/samples/basic', 'test/_tmp/input');
watcher = rollup.watch({
Expand Down

0 comments on commit a1383e7

Please sign in to comment.