diff --git a/src/publisher/message-queues.ts b/src/publisher/message-queues.ts index a9fe4224d..3ff88be25 100644 --- a/src/publisher/message-queues.ts +++ b/src/publisher/message-queues.ts @@ -354,6 +354,14 @@ export class OrderedQueue extends MessageQueue { * @fires OrderedQueue#drain */ async publish(): Promise { + // If there's nothing to flush, don't try, just short-circuit to the drain event. + // This can happen if we get a publish() call after already being drained, in + // the case that topic.flush() pulls a reference to us before we get deleted. + if (!this.batches.length) { + this.emit('drain'); + return; + } + this.inFlight = true; if (this.pending) { diff --git a/test/publisher/message-queues.ts b/test/publisher/message-queues.ts index f28b4e33b..4c62917e7 100644 --- a/test/publisher/message-queues.ts +++ b/test/publisher/message-queues.ts @@ -729,6 +729,17 @@ describe('Message Queues', () => { assert.strictEqual(spy.callCount, 1); }); + + it('should emit "drain" if already empty on publish', async () => { + const spy = sandbox.spy(); + sandbox.stub(queue, '_publish').resolves(); + + queue.on('drain', spy); + await queue.publish(); + await queue.publish(); + + assert.strictEqual(spy.callCount, 2); + }); }); describe('resumePublishing', () => {