Skip to content

Commit

Permalink
test(rate-limit): add flow case
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Jan 11, 2023
1 parent 5a78f58 commit cc631a6
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 5 deletions.
18 changes: 18 additions & 0 deletions src/interfaces/minimal-job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,24 @@ export interface MinimalJob<
* @returns
*/
asJSONSandbox(): JobJsonSandbox;
/**
* Updates a job's data
*
* @param data - the data that will replace the current jobs data.
*/
update(data: DataType): Promise<void>;
/**
* Updates a job's progress
*
* @param progress - number or object to be saved as progress.
*/
updateProgress(progress: number | object): Promise<void>;
/**
* Logs one row of log data.
*
* @param logRow - string with log data to be logged.
*/
log(logRow: string): Promise<number>;
get queueName(): string;
/**
* @returns the prefix that is used.
Expand Down
110 changes: 108 additions & 2 deletions tests/test_rate_limiter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { default as IORedis } from 'ioredis';
import { after, every } from 'lodash';
import { beforeEach, describe, it } from 'mocha';
import { v4 } from 'uuid';
import { Queue, QueueEvents, Worker } from '../src/classes';
import { FlowProducer, Queue, QueueEvents, Worker } from '../src/classes';
import { delay, removeAllQueueData } from '../src/utils';

describe('Rate Limiter', function () {
Expand Down Expand Up @@ -107,6 +107,113 @@ describe('Rate Limiter', function () {
await worker.close();
});

describe('when using flows', () => {
it('should obey the rate limit per queue', async function () {
this.timeout(20000);
const name = 'child-job';
const parentQueueName = `parent-queue-${v4()}`;
const parentQueueEvents = new QueueEvents(parentQueueName, {
connection,
});
const numJobs = 10;

const worker = new Worker(
queueName,
async () => {
await delay(100);
},
{
connection,
concurrency: 2,
limiter: {
max: 1,
duration: 1000,
},
},
);

const parentWorker = new Worker(
parentQueueName,
async () => {
await delay(100);
},
{
connection,
concurrency: 2,
limiter: {
max: 1,
duration: 2000,
},
},
);

const flow = new FlowProducer({ connection });
const result = new Promise<void>((resolve, reject) => {
queueEvents.on(
'completed',
// after every job has been completed
after(numJobs, async () => {
await worker.close();

try {
const timeDiff = new Date().getTime() - startTime;
expect(timeDiff).to.be.gte((numJobs - 1) * 1000);
resolve();
} catch (err) {
reject(err);
}
}),
);

queueEvents.on('failed', async err => {
await worker.close();
reject(err);
});
});

const parentResult = new Promise<void>((resolve, reject) => {
parentQueueEvents.on(
'completed',
// after every job has been completed
after(numJobs / 2, async () => {
await worker.close();

try {
const timeDiff = new Date().getTime() - startTime;
expect(timeDiff).to.be.gte((numJobs / 2 - 1) * 2000);
resolve();
} catch (err) {
reject(err);
}
}),
);

parentQueueEvents.on('failed', async err => {
await worker.close();
reject(err);
});
});

const startTime = new Date().getTime();
const values = Array.from(Array(5).keys()).map(() => ({
name: 'parent-job',
queueName: parentQueueName,
data: {},
children: [
{ name, data: { idx: 0, foo: 'bar' }, queueName },
{ name, data: { idx: 1, foo: 'baz' }, queueName },
],
}));
await flow.addBulk(values);

await result;
await parentResult;
await worker.close();
await parentWorker.close();
await parentQueueEvents.close();
});
});

it('should obey the rate limit with max value greater than 1', async function () {
this.timeout(20000);

Expand Down Expand Up @@ -226,7 +333,6 @@ describe('Rate Limiter', function () {
limiter: {
max: 1,
duration: 1000,
workerDelay: true,
},
});

Expand Down
6 changes: 3 additions & 3 deletions tests/test_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
UnrecoverableError,
Worker,
} from '../src/classes';
import { KeepJobs } from '../src/interfaces';
import { KeepJobs, MinimalJob } from '../src/interfaces';
import { JobsOptions } from '../src/types';
import { delay, removeAllQueueData } from '../src/utils';

Expand Down Expand Up @@ -2336,7 +2336,7 @@ describe('workers', function () {
attemptsMade: number,
type: string,
err: Error,
job: Job,
job: MinimalJob,
) => {
switch (type) {
case 'custom1': {
Expand Down Expand Up @@ -2521,7 +2521,7 @@ describe('workers', function () {
attemptsMade: number,
type: string,
err: Error,
job: Job,
job: MinimalJob,
) => {
if (err instanceof CustomError) {
const data = job.data;
Expand Down

0 comments on commit cc631a6

Please sign in to comment.