Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Attempted implemented graceful shutdown #1855

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
39 changes: 36 additions & 3 deletions src/lease-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import {EventEmitter} from 'events';
import {AckError, Message, Subscriber} from './subscriber';
import {defaultOptions} from './default-options';
import defer = require('p-defer');

export interface FlowControlOptions {
allowExcessMessages?: boolean;
Expand Down Expand Up @@ -65,6 +66,7 @@ export class LeaseManager extends EventEmitter {
private _pending: Message[];
private _subscriber: Subscriber;
private _timer?: NodeJS.Timeout;
private _onDrain?: defer.DeferredPromise<void>;
constructor(sub: Subscriber, options = {}) {
super();

Expand Down Expand Up @@ -119,6 +121,18 @@ export class LeaseManager extends EventEmitter {
this.emit('full');
}
}

/**
* Only clear the messages that are not currently in-flight, helps with
* graceful shutdown.
*/
clearNonDispensedMessages(): void {
for (const message of this._messages) {
if (!message.dispensed) {
this.remove(message);
}
}
}
/**
* Removes ALL messages from inventory.
* @private
Expand Down Expand Up @@ -146,6 +160,15 @@ export class LeaseManager extends EventEmitter {
const {maxBytes, maxMessages} = this._options;
return this.size >= maxMessages! || this.bytes >= maxBytes!;
}
/**
* Returns a promise that resolves when all messages have drained.
*/
onDrain(): Promise<void> {
if (!this._onDrain) {
this._onDrain = defer();
}
return this._onDrain.promise;
}
/**
* Removes a message from the inventory. Stopping the deadline extender if no
* messages are left over.
Expand Down Expand Up @@ -174,8 +197,15 @@ export class LeaseManager extends EventEmitter {
this._dispense(this._pending.shift()!);
}

if (this.size === 0 && this._isLeasing) {
this._cancelExtension();
if (this.size === 0) {
if (this._isLeasing) {
this._cancelExtension();
}

if (this._onDrain) {
this._onDrain.resolve();
delete this._onDrain;
}
}
}
/**
Expand Down Expand Up @@ -240,7 +270,10 @@ export class LeaseManager extends EventEmitter {
*/
private _dispense(message: Message): void {
if (this._subscriber.isOpen) {
process.nextTick(() => this._subscriber.emit('message', message));
message.dispensed = true;
process.nextTick(() => {
this._subscriber.emit('message', message);
});
}
}
/**
Expand Down
18 changes: 16 additions & 2 deletions src/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ export class Message {
orderingKey?: string;
publishTime: PreciseDate;
received: number;
dispensed: boolean;
private _handled: boolean;
private _length: number;
private _subscriber: Subscriber;
Expand Down Expand Up @@ -182,6 +183,11 @@ export class Message {
*/
this.received = Date.now();

/**
* Indicates that a message is currently being processed by user code.
*/
this.dispensed = false;

this._handled = false;
this._length = this.data.length;
this._subscriber = sub;
Expand Down Expand Up @@ -563,6 +569,7 @@ export class Subscriber extends EventEmitter {
*/
async ack(message: Message): Promise<void> {
const ackTimeSeconds = (Date.now() - message.received) / 1000;

this.updateAckDeadline(ackTimeSeconds);

// Ignore this in this version of the method (but hook catch
Expand Down Expand Up @@ -607,7 +614,9 @@ export class Subscriber extends EventEmitter {

this.isOpen = false;
this._stream.destroy();
this._inventory.clear();

// Clear only the messages that have not begun processing
this._inventory.clearNonDispensedMessages();

await this._waitForFlush();

Expand Down Expand Up @@ -902,8 +911,13 @@ export class Subscriber extends EventEmitter {
* @returns {Promise}
*/
private async _waitForFlush(): Promise<void> {
const promises: Array<Promise<void>> = [];
// If there are messages in flight, lets wait for them to drain so that we can then
// wait on their ACKs.
if (this._inventory.size) {
await this._inventory.onDrain();
}

const promises: Array<Promise<void>> = [];
if (this._acks.numPendingRequests) {
promises.push(this._acks.onFlush());
this._acks.flush();
Expand Down
48 changes: 48 additions & 0 deletions test/lease-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class FakeSubscriber extends EventEmitter {
class FakeMessage {
length = 20;
received: number;
dispensed = false;
constructor() {
this.received = Date.now();
}
Expand Down Expand Up @@ -169,6 +170,18 @@ describe('LeaseManager', () => {
leaseManager.add(fakeMessage);
});

it('should set dispensed to true after dispatching message', done => {
const fakeMessage = new FakeMessage() as {} as Message;

leaseManager.setOptions({allowExcessMessages: false});
leaseManager.add(fakeMessage);

subscriber.on('message', message => {
assert.strictEqual(message.dispensed, true);
done();
});
});

it('should not dispatch the message if the inventory is full', done => {
const fakeMessage = new FakeMessage() as {} as Message;

Expand Down Expand Up @@ -387,6 +400,28 @@ describe('LeaseManager', () => {
});
});

describe('clearNonDispensedMessages', () => {
it('should only clear messages with dispensed=false', () => {
leaseManager.setOptions({
maxMessages: 2,
maxBytes: 10,
allowExcessMessages: false,
});

const message = new FakeMessage();
const message2 = new FakeMessage();

message.dispensed = true;
message2.dispensed = false;
leaseManager.add(message as {} as Message);
leaseManager.add(message2 as {} as Message);

leaseManager.clearNonDispensedMessages();
assert.strictEqual(leaseManager.size, 1);
assert.strictEqual(leaseManager.bytes, message.length);
});
});

describe('isFull', () => {
it('should return true if the maxMessages threshold is hit', () => {
const maxMessages = 1;
Expand Down Expand Up @@ -506,6 +541,19 @@ describe('LeaseManager', () => {

assert.strictEqual(stub.callCount, 0);
});

it('should resolve onDrain if no messages are left', done => {
const message = new FakeMessage() as {} as Message;

const onDrain = leaseManager.onDrain();

onDrain.then(() => {
done();
});

leaseManager.add(message);
leaseManager.remove(message);
});
});

describe('setOptions', () => {
Expand Down
29 changes: 27 additions & 2 deletions test/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ class FakeLeaseManager extends EventEmitter {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
add(message: s.Message): void {}
clear(): void {}
clearNonDispensedMessages(): void {}
async onDrain(): Promise<void> {}
get size(): number {
return 0;
}
// eslint-disable-next-line @typescript-eslint/no-unused-vars
remove(message: s.Message): void {}
}
Expand Down Expand Up @@ -543,9 +548,9 @@ describe('Subscriber', () => {
assert.strictEqual(stub.callCount, 1);
});

it('should clear the inventory', () => {
it('should clear the inventory of non-dispensed messages', () => {
const inventory: FakeLeaseManager = stubs.get('inventory');
const stub = sandbox.stub(inventory, 'clear');
const stub = sandbox.stub(inventory, 'clearNonDispensedMessages');

subscriber.close();
assert.strictEqual(stub.callCount, 1);
Expand Down Expand Up @@ -621,6 +626,26 @@ describe('Subscriber', () => {
assert.strictEqual(ackOnDrain.callCount, 1);
assert.strictEqual(modAckOnDrain.callCount, 1);
});

it('should wait for dispensed messages to drain', async () => {
const inventory: FakeLeaseManager = stubs.get('inventory');
const inventoryOnDrain = sandbox.stub(inventory, 'onDrain');

const ackQueue: FakeAckQueue = stubs.get('ackQueue');
const modAckQueue: FakeModAckQueue = stubs.get('modAckQueue');
const ackOnDrain = sandbox.stub(ackQueue, 'onDrain').resolves();
const modAckOnDrain = sandbox.stub(modAckQueue, 'onDrain').resolves();

ackQueue.numInFlightRequests = 1;
modAckQueue.numInFlightRequests = 1;
sandbox.stub(inventory, 'size').get(() => 1);

await subscriber.close();

assert.strictEqual(ackOnDrain.callCount, 1);
assert.strictEqual(modAckOnDrain.callCount, 1);
assert.strictEqual(inventoryOnDrain.callCount, 1);
});
});
});

Expand Down