Skip to content
This repository has been archived by the owner on Apr 12, 2022. It is now read-only.

Commit

Permalink
When Grouparoo versions change, clear redis and resque (#2289)
Browse files Browse the repository at this point in the history
* When Grouparoo versions change, clear redis and resque

* fix open handles in test

* stop timers after test

* create instance in hook
  • Loading branch information
evantahler committed Sep 12, 2021
1 parent 25b4d0b commit 0a3d670
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 9 deletions.
46 changes: 41 additions & 5 deletions core/__tests__/initializers/resque.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { helper } from "@grouparoo/spec-helper";
import { specHelper, api } from "actionhero";
import { specHelper, api, task } from "actionhero";
import { Redis } from "ioredis";
import { ResqueInitializer } from "../../src/initializers/resque";
import { getCoreVersion } from "../../src/modules/pluginDetails";

jest.mock("../../src/config/tasks.ts", () => ({
__esModule: true,
Expand All @@ -27,21 +29,24 @@ jest.mock("../../src/config/tasks.ts", () => ({

describe("initializers/resque", () => {
helper.grouparooTestServer({ truncate: true });
let instance: ResqueInitializer;

beforeEach(async () => {
helper.sleep(1000);
await helper.sleep(1000);
await api.resque.queue.connection.redis.flushdb();
helper.sleep(1000);
await helper.sleep(1000);
});

beforeEach(async () => (instance = new ResqueInitializer()));
afterEach(async () => await instance.stop());

test("it will check for missing periodic tasks if the resque leader", async () => {
api.resque.scheduler.leader = true;

await api.resque.queue.connection.redis.flushdb();
let found = await specHelper.findEnqueuedTasks("status");
expect(found.length).toBe(0);

const instance = new ResqueInitializer();
await instance.recheckPeriodicTasks();

found = await specHelper.findEnqueuedTasks("status");
Expand All @@ -55,10 +60,41 @@ describe("initializers/resque", () => {
let found = await specHelper.findEnqueuedTasks("status");
expect(found.length).toBe(0);

const instance = new ResqueInitializer();
await instance.recheckPeriodicTasks();

found = await specHelper.findEnqueuedTasks("status");
expect(found.length).toBe(0);
});

describe("version changes", () => {
let client: Redis;
let grouparooVersion: string;

beforeAll(async () => {
client = api.redis.clients.client;
grouparooVersion = getCoreVersion();
});

test("it will clear resque if the grouparoo version changes", async () => {
await client.set("grouparoo:version", "x");
await task.enqueue("record:export", { recordId: "foo" });

await instance.start();

const foundTasks = await specHelper.findEnqueuedTasks("record:export");
expect(foundTasks.length).toBe(0);
expect(await client.get("grouparoo:version")).toEqual(grouparooVersion);
});

test("it will not clear resque if the grouparoo version is the same", async () => {
await client.set("grouparoo:version", grouparooVersion);
await task.enqueue("record:export", { recordId: "foo" });

await instance.start();

const foundTasks = await specHelper.findEnqueuedTasks("record:export");
expect(foundTasks.length).toBe(1);
expect(await client.get("grouparoo:version")).toEqual(grouparooVersion);
});
});
});
10 changes: 10 additions & 0 deletions core/src/initializers/resque.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import { Initializer, route, task, api, log } from "actionhero";
import { getCoreVersion } from "../modules/pluginDetails";
import { Reset } from "../modules/reset";

const grouparooVersionKey = "grouparoo:version";
let taskRecheckInterval: NodeJS.Timeout;

export class ResqueInitializer extends Initializer {
constructor() {
super();
this.name = `@grouparoo/resque`;
this.loadPriority = 1000;
this.startPriority = 100;
}

async initialize() {
Expand Down Expand Up @@ -93,6 +97,12 @@ export class ResqueInitializer extends Initializer {
async start() {
const delay = 1000 * 60 * 60 * 1; // 1 hour
taskRecheckInterval = setInterval(this.recheckPeriodicTasks, delay);

const client = api.redis.clients.client;
const currentVersion = getCoreVersion();
const previousVersion = await client.get(grouparooVersionKey);
if (currentVersion !== previousVersion) await Reset.clearRedis();
await client.set(grouparooVersionKey, currentVersion);
}

async stop() {
Expand Down
21 changes: 17 additions & 4 deletions core/src/modules/reset.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,23 @@ export namespace Reset {
}

async function deleteKeys(pattern: string) {
const keys = await api.resque.queue.connection.redis.keys(pattern);
return Promise.all(
keys.map((k) => api.resque.queue.connection.redis.del(k))
);
const client = api.resque.queue.connection.redis;

const result: number = await new Promise((resolve, reject) => {
let count = 0;
const scanStream = client.scanStream({ match: pattern });
scanStream.once("error", (error) => reject(error));
scanStream.once("end", () => resolve(count));

scanStream.on("data", async (keys: string[]) => {
scanStream.pause();
await Promise.all(keys.map((k) => client.del(k)));
count += keys.length;
scanStream.resume();
});
});

return result;
}

async function clearFailedTasks() {
Expand Down

0 comments on commit 0a3d670

Please sign in to comment.