From 79629036ab049025274ebcf817216dbfb8edd3ea Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Fri, 5 Nov 2021 15:35:44 +0100 Subject: [PATCH] fix(NODE-3116): reschedule unreliable async interval first (#3006) * fix(NODE-3116): reschedule unreliable async interval first Co-authored-by: Daria Pardue --- src/utils.ts | 32 ++-- test/unit/utils.test.js | 383 ++++++++++++++++++++++++++++------------ 2 files changed, 287 insertions(+), 128 deletions(-) diff --git a/src/utils.ts b/src/utils.ts index 6b1d216e7f..eb275e862d 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -954,7 +954,7 @@ export function makeInterruptibleAsyncInterval( ): InterruptibleAsyncInterval { let timerId: NodeJS.Timeout | undefined; let lastCallTime: number; - let lastWakeTime: number; + let cannotBeExpedited = false; let stopped = false; options = options ?? {}; @@ -965,10 +965,8 @@ export function makeInterruptibleAsyncInterval( function wake() { const currentTime = clock(); - const timeSinceLastWake = currentTime - lastWakeTime; - const timeSinceLastCall = currentTime - lastCallTime; - const timeUntilNextCall = interval - timeSinceLastCall; - lastWakeTime = currentTime; + const nextScheduledCallTime = lastCallTime + interval; + const timeUntilNextCall = nextScheduledCallTime - currentTime; // For the streaming protocol: there is nothing obviously stopping this // interval from being woken up again while we are waiting "infinitely" @@ -976,8 +974,17 @@ export function makeInterruptibleAsyncInterval( // never completes, the `timeUntilNextCall` will continue to grow // negatively unbounded, so it will never trigger a reschedule here. + // This is possible in virtualized environments like AWS Lambda where our + // clock is unreliable. In these cases the timer is "running" but never + // actually completes, so we want to execute immediately and then attempt + // to reschedule. + if (timeUntilNextCall < 0) { + executeAndReschedule(); + return; + } + // debounce multiple calls to wake within the `minInterval` - if (timeSinceLastWake < minInterval) { + if (cannotBeExpedited) { return; } @@ -985,14 +992,7 @@ export function makeInterruptibleAsyncInterval( // faster than the `minInterval` if (timeUntilNextCall > minInterval) { reschedule(minInterval); - } - - // This is possible in virtualized environments like AWS Lambda where our - // clock is unreliable. In these cases the timer is "running" but never - // actually completes, so we want to execute immediately and then attempt - // to reschedule. - if (timeUntilNextCall < 0) { - executeAndReschedule(); + cannotBeExpedited = true; } } @@ -1004,7 +1004,7 @@ export function makeInterruptibleAsyncInterval( } lastCallTime = 0; - lastWakeTime = 0; + cannotBeExpedited = false; } function reschedule(ms?: number) { @@ -1017,7 +1017,7 @@ export function makeInterruptibleAsyncInterval( } function executeAndReschedule() { - lastWakeTime = 0; + cannotBeExpedited = false; lastCallTime = clock(); fn(err => { diff --git a/test/unit/utils.test.js b/test/unit/utils.test.js index 36c4ef2efb..bee1f80e13 100644 --- a/test/unit/utils.test.js +++ b/test/unit/utils.test.js @@ -2,7 +2,6 @@ const { eachAsync, executeLegacyOperation, - now, makeInterruptibleAsyncInterval, BufferPool } = require('../../src/utils'); @@ -41,130 +40,290 @@ describe('utils', function () { }); }); - context('makeInterruptibleAsyncInterval', function () { - before(function () { - this.clock = sinon.useFakeTimers(); - }); + describe('#makeInterruptibleAsyncInterval', function () { + let clock, executor, fnSpy; - after(function () { - this.clock.restore(); + beforeEach(function () { + clock = sinon.useFakeTimers(); + fnSpy = sinon.spy(cb => { + cb(); + }); }); - it('should execute a method in an repeating interval', function (done) { - let lastTime = now(); - const marks = []; - const executor = makeInterruptibleAsyncInterval( - callback => { - marks.push(now() - lastTime); - lastTime = now(); - callback(); - }, - { interval: 10 } - ); - - setTimeout(() => { - expect(marks).to.eql([10, 10, 10, 10, 10]); - expect(marks.every(mark => marks[0] === mark)).to.be.true; + afterEach(function () { + if (executor) { executor.stop(); - done(); - }, 51); - - this.clock.tick(51); + } + clock.restore(); }); - it('should schedule execution sooner if requested within min interval threshold', function (done) { - let lastTime = now(); - const marks = []; - const executor = makeInterruptibleAsyncInterval( - callback => { - marks.push(now() - lastTime); - lastTime = now(); - callback(); - }, - { interval: 50, minInterval: 10 } - ); - - // immediately schedule execution - executor.wake(); - - setTimeout(() => { - expect(marks).to.eql([10, 50]); - executor.stop(); - done(); - }, 100); - - this.clock.tick(100); + context('when the immediate option is provided', function () { + it('executes the function immediately and schedules the next execution on the interval', function () { + executor = makeInterruptibleAsyncInterval(fnSpy, { + immediate: true, + minInterval: 10, + interval: 30 + }); + // expect immediate invocation + expect(fnSpy.calledOnce).to.be.true; + // advance clock by less than the scheduled interval to ensure we don't execute early + clock.tick(29); + expect(fnSpy.calledOnce).to.be.true; + // advance clock to the interval + clock.tick(1); + expect(fnSpy.calledTwice).to.be.true; + }); }); - it('should debounce multiple requests to wake the interval sooner', function (done) { - let lastTime = now(); - const marks = []; - const executor = makeInterruptibleAsyncInterval( - callback => { - marks.push(now() - lastTime); - lastTime = now(); - callback(); - }, - { interval: 50, minInterval: 10 } - ); - - for (let i = 0; i < 100; ++i) { - executor.wake(); - } - - setTimeout(() => { - expect(marks).to.eql([10, 50, 50, 50, 50]); - executor.stop(); - done(); - }, 250); - - this.clock.tick(250); + context('when the immediate option is not provided', function () { + it('executes the function on the provided interval', function () { + executor = makeInterruptibleAsyncInterval(fnSpy, { minInterval: 10, interval: 30 }); + // advance clock by less than the scheduled interval to ensure we don't execute early + clock.tick(29); + expect(fnSpy.callCount).to.equal(0); + // advance clock to the interval + clock.tick(1); + expect(fnSpy.calledOnce).to.be.true; + // advance clock by the interval + clock.tick(30); + expect(fnSpy.calledTwice).to.be.true; + }); }); - it('should immediately schedule if the clock is unreliable', function (done) { - let clockCalled = 0; - let lastTime = now(); - const marks = []; - const executor = makeInterruptibleAsyncInterval( - callback => { - marks.push(now() - lastTime); - lastTime = now(); - callback(); - }, - { - interval: 50, - minInterval: 10, - immediate: true, - clock() { - clockCalled += 1; - - // needs to happen on the third call because `wake` checks - // the `currentTime` at the beginning of the function - if (clockCalled === 3) { - return now() - 100000; + describe('#wake', function () { + context('when the time until next call is negative', () => { + // somehow we missed the execution, due to an unreliable clock + + it('should execute immediately and schedule the next execution on the interval if this is the first wake', () => { + let fakeClockHasTicked = false; + executor = makeInterruptibleAsyncInterval(fnSpy, { + minInterval: 10, + interval: 30, + clock: () => { + if (fakeClockHasTicked) { + return 81; + } + fakeClockHasTicked = true; + return 50; } + }); + + // tick the environment clock by a smaller amount than the interval + clock.tick(2); + // sanity check to make sure we haven't called execute yet + expect(fnSpy.callCount).to.equal(0); + executor.wake(); + // expect immediate execution since expected next call time was 50 + 30 = 80, but the clock shows 81 + expect(fnSpy.calledOnce).to.be.true; + // move forward by more than minInterval but less than full interval to ensure we're scheduling correctly + clock.tick(29); + expect(fnSpy.calledOnce).to.be.true; + // move forward by the full interval to make sure the scheduled call executes + clock.tick(1); + expect(fnSpy.calledTwice).to.be.true; + }); + + it('should execute immediately and schedule the next execution on the interval if this is a repeated wake and the current execution is not rescheduled', () => { + let fakeClockTickCount = 0; + executor = makeInterruptibleAsyncInterval(fnSpy, { + minInterval: 10, + interval: 30, + clock: () => { + if (fakeClockTickCount === 0) { + // on init, return arbitrary starting time + fakeClockTickCount++; + return 50; + } + if (fakeClockTickCount === 1) { + // expected execution time is 80 + // on first wake return a time so less than minInterval is left and no need to reschedule + fakeClockTickCount++; + return 71; + } + return 81; + } + }); + + // tick the clock by a small amount before and after the wake to make sure no unexpected async things are happening + clock.tick(11); + executor.wake(); + clock.tick(5); + expect(fnSpy.callCount).to.equal(0); + // call our second wake that gets the overdue timer, so expect immediate execution + executor.wake(); + expect(fnSpy.calledOnce).to.be.true; + // move forward by more than minInterval but less than full interval to ensure we're scheduling correctly + clock.tick(29); + expect(fnSpy.calledOnce).to.be.true; + // move forward by the full interval to make sure the scheduled call executes + clock.tick(1); + expect(fnSpy.calledTwice).to.be.true; + }); + + it('should execute immediately and schedule the next execution on the interval if this is a repeated wake even if the current execution is rescheduled', () => { + let fakeClockTickCount = 0; + executor = makeInterruptibleAsyncInterval(fnSpy, { + minInterval: 10, + interval: 30, + clock: () => { + if (fakeClockTickCount === 0) { + // on init, return arbitrary starting time + fakeClockTickCount++; + return 50; + } + if (fakeClockTickCount === 1) { + // expected execution time is 80 + // on first wake return a time so that more than minInterval is left + fakeClockTickCount++; + return 61; + } + return 81; + } + }); + + // tick the clock by a small amount before and after the wake to make sure no unexpected async things are happening + clock.tick(2); + executor.wake(); + clock.tick(9); + expect(fnSpy.callCount).to.equal(0); + // call our second wake that gets the overdue timer, so expect immediate execution + executor.wake(); + expect(fnSpy.calledOnce).to.be.true; + // move forward by more than minInterval but less than full interval to ensure we're scheduling correctly + clock.tick(29); + expect(fnSpy.calledOnce).to.be.true; + // move forward by the full interval to make sure the scheduled call executes + clock.tick(1); + expect(fnSpy.calledTwice).to.be.true; + }); + }); - return now(); - } - } - ); - - // force mark at 20ms, and then the unreliable system clock - // will report a very stale `lastCallTime` on this mark. - setTimeout(() => executor.wake(), 10); - - // try to wake again in another `minInterval + immediate`, now - // using a very old `lastCallTime`. This should result in an - // immediate scheduling: 0ms (immediate), 20ms (wake with minIterval) - // and then 10ms for another immediate. - setTimeout(() => executor.wake(), 30); + context('when the time until next call is less than the minInterval', () => { + // we can't make it go any faster, so we should let the scheduled execution run + + it('should execute on the interval if this is the first wake', () => { + executor = makeInterruptibleAsyncInterval(fnSpy, { + minInterval: 10, + interval: 30 + }); + // tick the environment clock so that less than minInterval is left + clock.tick(21); + executor.wake(); + // move forward to just before exepected execution time + clock.tick(8); + expect(fnSpy.callCount).to.equal(0); + // move forward to the full interval to make sure the scheduled call executes + clock.tick(1); + expect(fnSpy.calledOnce).to.be.true; + // check to make sure the next execution runs as expected + clock.tick(29); + expect(fnSpy.calledOnce).to.be.true; + clock.tick(1); + expect(fnSpy.calledTwice).to.be.true; + }); + + it('should execute on the original interval if this is a repeated wake and the current execution is not rescheduled', () => { + executor = makeInterruptibleAsyncInterval(fnSpy, { + minInterval: 10, + interval: 30 + }); + // tick the environment clock so that less than minInterval is left + clock.tick(21); + executor.wake(); + // tick the environment clock some more so that the next wake is called at a different time + clock.tick(2); + executor.wake(); + // tick to just before the expected execution time + clock.tick(6); + expect(fnSpy.callCount).to.equal(0); + // tick up to 20 for the expected execution + clock.tick(1); + expect(fnSpy.calledOnce).to.be.true; + // check to make sure the next execution runs as expected + clock.tick(29); + expect(fnSpy.calledOnce).to.be.true; + clock.tick(1); + expect(fnSpy.calledTwice).to.be.true; + }); + + it('should execute on the minInterval from the first wake if this is a repeated wake and the current execution is rescheduled', () => { + executor = makeInterruptibleAsyncInterval(fnSpy, { + minInterval: 10, + interval: 30 + }); + // tick the environment clock so that more than minInterval is left + clock.tick(13); + executor.wake(); + // the first wake should move up the execution to occur at 23 ticks from the start + // we tick 8 to get to 21, so that less than minInterval is left on the original interval expected execution + clock.tick(8); + executor.wake(); + // now we tick to just before the rescheduled execution time + clock.tick(1); + expect(fnSpy.callCount).to.equal(0); + // tick up to 23 for the expected execution + clock.tick(1); + expect(fnSpy.calledOnce).to.be.true; + // check to make sure the next execution runs as expected + clock.tick(29); + expect(fnSpy.calledOnce).to.be.true; + clock.tick(1); + expect(fnSpy.calledTwice).to.be.true; + }); + }); - setTimeout(() => { - executor.stop(); - expect(marks).to.eql([0, 20, 10, 50, 50, 50, 50]); - done(); - }, 250); - this.clock.tick(250); + context('when the time until next call is more than the minInterval', () => { + // expedite the execution to minInterval + + it('should execute on the minInterval if this is the first wake', () => { + executor = makeInterruptibleAsyncInterval(fnSpy, { + minInterval: 10, + interval: 30 + }); + // tick the environment clock so that more than minInterval is left + clock.tick(3); + executor.wake(); + // the first wake should move up the execution to occur at 13 ticks from the start + // we tick to just before the rescheduled execution time + clock.tick(9); + expect(fnSpy.callCount).to.equal(0); + // tick up to 13 for the expected execution + clock.tick(1); + expect(fnSpy.calledOnce).to.be.true; + // check to make sure the next execution runs as expected + clock.tick(29); + expect(fnSpy.calledOnce).to.be.true; + clock.tick(1); + expect(fnSpy.calledTwice).to.be.true; + }); + + it('should execute on the minInterval from the first wake if this is a repeated wake', () => { + // NOTE: under regular circumstances, if the second wake is early enough to warrant a reschedule + // then the first wake must have already warranted a reschedule + executor = makeInterruptibleAsyncInterval(fnSpy, { + minInterval: 10, + interval: 30 + }); + // tick the environment clock so that more than minInterval is left + clock.tick(3); + executor.wake(); + // the first wake should move up the execution to occur at 13 ticks from the start + // we tick a bit more so that more than minInterval is still left and call our repeated wake + clock.tick(2); + executor.wake(); + // tick up to just before the expected execution + clock.tick(7); + expect(fnSpy.callCount).to.equal(0); + // now go up to 13 + clock.tick(1); + expect(fnSpy.calledOnce).to.be.true; + // check to make sure the next execution runs as expected + clock.tick(29); + expect(fnSpy.calledOnce).to.be.true; + clock.tick(1); + expect(fnSpy.calledTwice).to.be.true; + }); + }); }); });