Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v3.0] Custom awaiting watch emitter #4609

Merged
merged 3 commits into from Aug 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 9 additions & 1 deletion docs/02-javascript-api.md
Expand Up @@ -223,6 +223,8 @@ watcher.on('event', event => {
// bundle object for output generation errors. As with
// "BUNDLE_END", you should call "event.result.close()" if
// present once you are done.
// If you return a Promise from your event handler, Rollup will wait until the
// Promise is resolved before continuing.
});

// This will make sure that bundles are properly closed after each run
Expand All @@ -232,7 +234,13 @@ watcher.on('event', ({ result }) => {
}
});

// stop watching
// Additionally, you can hook into the following. Again, return a Promise to
// make Rollup wait at that stage:
watcher.on('change', (id, { event }) => { /* a file was modified */ })
watcher.on('restart', () => { /* a new run was triggered */ })
watcher.on('close', () => { /* the watcher was closed, see below */ })

// to stop watching
watcher.close();
```

Expand Down
4 changes: 2 additions & 2 deletions src/Graph.ts
Expand Up @@ -87,8 +87,8 @@ export default class Graph {
const handleChange = (...args: Parameters<WatchChangeHook>) =>
this.pluginDriver.hookParallel('watchChange', args);
const handleClose = () => this.pluginDriver.hookParallel('closeWatcher', []);
watcher.onCurrentAwaited('change', handleChange);
watcher.onCurrentAwaited('close', handleClose);
watcher.onCurrentRun('change', handleChange);
watcher.onCurrentRun('close', handleClose);
}
this.pluginDriver = new PluginDriver(this, options, options.plugins, this.pluginCache);
this.acornParser = acorn.Parser.extend(...(options.acornInjectPlugins as any));
Expand Down
55 changes: 26 additions & 29 deletions src/rollup/types.d.ts
Expand Up @@ -839,40 +839,37 @@ export interface RollupWatchOptions extends InputOptions {
watch?: WatcherOptions | false;
}

interface TypedEventEmitter<T extends { [event: string]: (...args: any) => any }> {
addListener<K extends keyof T>(event: K, listener: T[K]): this;
emit<K extends keyof T>(event: K, ...args: Parameters<T[K]>): boolean;
eventNames(): Array<keyof T>;
getMaxListeners(): number;
listenerCount(type: keyof T): number;
listeners<K extends keyof T>(event: K): Array<T[K]>;
off<K extends keyof T>(event: K, listener: T[K]): this;
on<K extends keyof T>(event: K, listener: T[K]): this;
once<K extends keyof T>(event: K, listener: T[K]): this;
prependListener<K extends keyof T>(event: K, listener: T[K]): this;
prependOnceListener<K extends keyof T>(event: K, listener: T[K]): this;
rawListeners<K extends keyof T>(event: K): Array<T[K]>;
removeAllListeners<K extends keyof T>(event?: K): this;
removeListener<K extends keyof T>(event: K, listener: T[K]): this;
setMaxListeners(n: number): this;
}

export interface RollupAwaitingEmitter<T extends { [event: string]: (...args: any) => any }>
extends TypedEventEmitter<T> {
export type AwaitedEventListener<
T extends { [event: string]: (...args: any) => any },
K extends keyof T
> = (...args: Parameters<T[K]>) => void | Promise<void>;

export interface AwaitingEventEmitter<T extends { [event: string]: (...args: any) => any }> {
close(): Promise<void>;
emitAndAwait<K extends keyof T>(event: K, ...args: Parameters<T[K]>): Promise<ReturnType<T[K]>[]>;
emit<K extends keyof T>(event: K, ...args: Parameters<T[K]>): Promise<unknown>;
/**
* Removes an event listener.
*/
off<K extends keyof T>(event: K, listener: AwaitedEventListener<T, K>): this;
/**
* Registers an event listener that will be awaited before Rollup continues.
* All listeners will be awaited in parallel while rejections are tracked via
* Promise.all.
*/
on<K extends keyof T>(event: K, listener: AwaitedEventListener<T, K>): this;
/**
* Registers an event listener that will be awaited before Rollup continues
* for events emitted via emitAndAwait. All listeners will be awaited in
* parallel while rejections are tracked via Promise.all.
* Listeners are removed automatically when removeAwaited is called, which
* happens automatically after each run.
* Registers an event listener that will be awaited before Rollup continues.
* All listeners will be awaited in parallel while rejections are tracked via
* Promise.all.
* Listeners are removed automatically when removeListenersForCurrentRun is
* called, which happens automatically after each run.
*/
onCurrentAwaited<K extends keyof T>(
onCurrentRun<K extends keyof T>(
event: K,
listener: (...args: Parameters<T[K]>) => Promise<ReturnType<T[K]>>
): this;
removeAwaited(): this;
removeAllListeners(): this;
removeListenersForCurrentRun(): this;
}

export type RollupWatcherEvent =
Expand All @@ -888,7 +885,7 @@ export type RollupWatcherEvent =
| { code: 'END' }
| { code: 'ERROR'; error: RollupError; result: RollupBuild | null };

export type RollupWatcher = RollupAwaitingEmitter<{
export type RollupWatcher = AwaitingEventEmitter<{
change: (id: string, change: { event: ChangeEvent }) => void;
close: () => void;
event: (event: RollupWatcherEvent) => void;
Expand Down
88 changes: 56 additions & 32 deletions src/watch/WatchEmitter.ts
@@ -1,48 +1,72 @@
import { EventEmitter } from 'node:events';
import { AwaitedEventListener, AwaitingEventEmitter } from '../rollup/types';

type PromiseReturn<T extends (...args: any) => any> = (
...args: Parameters<T>
) => Promise<ReturnType<T>>;

export class WatchEmitter<
T extends { [event: string]: (...args: any) => any }
> extends EventEmitter {
private awaitedHandlers: {
[K in keyof T]?: PromiseReturn<T[K]>[];
export class WatchEmitter<T extends { [event: string]: (...args: any) => any }>
implements AwaitingEventEmitter<T>
{
private currentHandlers: {
[K in keyof T]?: AwaitedEventListener<T, K>[];
} = Object.create(null);
private persistentHandlers: {
[K in keyof T]?: AwaitedEventListener<T, K>[];
} = Object.create(null);

constructor() {
super();
// Allows more than 10 bundles to be watched without
// showing the `MaxListenersExceededWarning` to the user.
this.setMaxListeners(Infinity);
}

// Will be overwritten by Rollup
async close(): Promise<void> {}

emitAndAwait<K extends keyof T>(
event: K,
...args: Parameters<T[K]>
): Promise<ReturnType<T[K]>[]> {
this.emit(event as string, ...(args as any[]));
return Promise.all(this.getHandlers(event).map(handler => handler(...args)));
emit<K extends keyof T>(event: K, ...args: Parameters<T[K]>): Promise<unknown> {
return Promise.all(
this.getCurrentHandlers(event)
.concat(this.getPersistentHandlers(event))
.map(handler => handler(...args))
);
}

off<K extends keyof T>(event: K, listener: AwaitedEventListener<T, K>): this {
const listeners = this.persistentHandlers[event];
if (listeners) {
// A hack stolen from "mitt": ">>> 0" does not change numbers >= 0, but -1
// (which would remove the last array element if used unchanged) is turned
// into max_int, which is outside the array and does not change anything.
listeners.splice(listeners.indexOf(listener) >>> 0, 1);
}
return this;
}

on<K extends keyof T>(event: K, listener: AwaitedEventListener<T, K>): this {
this.getPersistentHandlers(event).push(listener);
return this;
}

onCurrentAwaited<K extends keyof T>(
event: K,
listener: (...args: Parameters<T[K]>) => Promise<ReturnType<T[K]>>
): this {
this.getHandlers(event).push(listener);
onCurrentRun<K extends keyof T>(event: K, listener: AwaitedEventListener<T, K>): this {
this.getCurrentHandlers(event).push(listener);
return this;
}

removeAwaited(): this {
this.awaitedHandlers = {};
once<K extends keyof T>(event: K, listener: AwaitedEventListener<T, K>): this {
const selfRemovingListener: AwaitedEventListener<T, K> = (...args) => {
this.off(event, selfRemovingListener);
return listener(...args);
};
this.on(event, selfRemovingListener);
return this;
}

private getHandlers<K extends keyof T>(event: K): PromiseReturn<T[K]>[] {
return this.awaitedHandlers[event] || (this.awaitedHandlers[event] = []);
removeAllListeners(): this {
this.removeListenersForCurrentRun();
this.persistentHandlers = Object.create(null);
return this;
}

removeListenersForCurrentRun(): this {
this.currentHandlers = Object.create(null);
return this;
}

private getCurrentHandlers<K extends keyof T>(event: K): AwaitedEventListener<T, K>[] {
return this.currentHandlers[event] || (this.currentHandlers[event] = []);
}

private getPersistentHandlers<K extends keyof T>(event: K): AwaitedEventListener<T, K>[] {
return this.persistentHandlers[event] || (this.persistentHandlers[event] = []);
}
}
27 changes: 14 additions & 13 deletions src/watch/watch.ts
Expand Up @@ -38,6 +38,7 @@ export class Watcher {

private buildDelay = 0;
private buildTimeout: NodeJS.Timer | null = null;
private closed = false;
private readonly invalidatedIds = new Map<string, ChangeEvent>();
private rerun = false;
private running = true;
Expand All @@ -58,11 +59,13 @@ export class Watcher {
}

async close(): Promise<void> {
if (this.closed) return;
this.closed = true;
if (this.buildTimeout) clearTimeout(this.buildTimeout);
for (const task of this.tasks) {
task.close();
}
await this.emitter.emitAndAwait('close');
await this.emitter.emit('close');
this.emitter.removeAllListeners();
}

Expand Down Expand Up @@ -91,22 +94,20 @@ export class Watcher {
this.buildTimeout = null;
try {
await Promise.all(
[...this.invalidatedIds].map(([id, event]) =>
this.emitter.emitAndAwait('change', id, { event })
)
[...this.invalidatedIds].map(([id, event]) => this.emitter.emit('change', id, { event }))
);
this.invalidatedIds.clear();
this.emitter.emit('restart');
this.emitter.removeAwaited();
await this.emitter.emit('restart');
this.emitter.removeListenersForCurrentRun();
this.run();
} catch (error: any) {
this.invalidatedIds.clear();
this.emitter.emit('event', {
await this.emitter.emit('event', {
code: 'ERROR',
error,
result: null
});
this.emitter.emit('event', {
await this.emitter.emit('event', {
code: 'END'
});
}
Expand All @@ -115,7 +116,7 @@ export class Watcher {

private async run(): Promise<void> {
this.running = true;
this.emitter.emit('event', {
await this.emitter.emit('event', {
code: 'START'
});

Expand All @@ -124,7 +125,7 @@ export class Watcher {
}

this.running = false;
this.emitter.emit('event', {
await this.emitter.emit('event', {
code: 'END'
});
if (this.rerun) {
Expand Down Expand Up @@ -197,7 +198,7 @@ export class Task {

const start = Date.now();

this.watcher.emitter.emit('event', {
await this.watcher.emitter.emit('event', {
code: 'BUNDLE_START',
input: this.options.input,
output: this.outputFiles
Expand All @@ -211,7 +212,7 @@ export class Task {
}
this.updateWatchedFiles(result);
this.skipWrite || (await Promise.all(this.outputs.map(output => result!.write(output))));
this.watcher.emitter.emit('event', {
await this.watcher.emitter.emit('event', {
code: 'BUNDLE_END',
duration: Date.now() - start,
input: this.options.input,
Expand All @@ -229,7 +230,7 @@ export class Task {
this.cache.modules = this.cache.modules.filter(module => module.id !== error.id);
}
}
this.watcher.emitter.emit('event', {
await this.watcher.emitter.emit('event', {
code: 'ERROR',
error,
result
Expand Down
53 changes: 53 additions & 0 deletions test/watch/index.js
Expand Up @@ -134,6 +134,59 @@ describe('rollup.watch', () => {
]);
});

it('waits for event listeners', async () => {
let run = 0;
const events = new Set();

await copy('test/watch/samples/basic', 'test/_tmp/input');
watcher = rollup.watch({
input: 'test/_tmp/input/main.js',
plugins: {
async writeBundle() {
if (run++ === 0) {
await wait(100);
atomicWriteFileSync('test/_tmp/input/main.js', 'export default 48;');
await wait(100);
}
if (run === 2) {
watcher.close();
}
}
},
output: {
file: 'test/_tmp/output/bundle.js',
format: 'cjs',
exports: 'auto'
}
});
await new Promise((resolve, reject) => {
let currentEvent = null;
const handleEvent = async (...args) => {
events.add(args[0]?.code);
if (currentEvent) {
watcher.close();
return reject(
new Error(
`Event ${JSON.stringify(args)} was emitted while handling ${JSON.stringify(
currentEvent
)}.`
)
);
}
currentEvent = args;
await wait(100);
currentEvent = null;
};
// This should work but should not have an effect
watcher.off('event', handleEvent);
watcher.on('event', handleEvent);
watcher.on('change', handleEvent);
watcher.on('restart', handleEvent);
watcher.on('close', resolve);
});
assert.deepStrictEqual([...events], ['START', 'BUNDLE_START', 'BUNDLE_END', 'END', undefined]);
});

it('does not fail for virtual files', async () => {
await copy('test/watch/samples/basic', 'test/_tmp/input');
watcher = rollup.watch({
Expand Down