Skip to content

Commit

Permalink
feat(wrangler): Queue delivery delay support in wrangler.toml (#5413)
Browse files Browse the repository at this point in the history
* feat(wrangler): Queue delivery delay support in wrangler

* feat(C3): Added delivery controls to Queues template
  • Loading branch information
pmiguel committed Mar 28, 2024
1 parent c234dad commit 976adec
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 30 deletions.
5 changes: 5 additions & 0 deletions .changeset/thin-carpets-do.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"wrangler": minor
---

Added Queue delivery controls support in wrangler.toml
128 changes: 105 additions & 23 deletions packages/wrangler/src/__tests__/deploy.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ import writeWranglerToml from "./helpers/write-wrangler-toml";
import type { Config } from "../config";
import type { CustomDomain, CustomDomainChangeset } from "../deploy/deploy";
import type { KVNamespaceInfo } from "../kv/helpers";
import type { PostTypedConsumerBody, PutConsumerBody } from "../queues/client";
import type {
PostQueueBody,
PostTypedConsumerBody,
PutConsumerBody,
} from "../queues/client";
import type { RestRequest } from "msw";

describe("deploy", () => {
Expand Down Expand Up @@ -8985,6 +8989,8 @@ export default{
});

describe("queues", () => {
const queueId = "queue-id";
const queueName = "queue1";
it("should upload producer bindings", async () => {
writeWranglerToml({
queues: {
Expand All @@ -8998,11 +9004,15 @@ export default{
{
type: "queue",
name: "QUEUE_ONE",
queue_name: "queue1",
queue_name: queueName,
},
],
});
mockGetQueue("queue1");
mockGetQueue(queueName, queueId);
mockPutQueue(queueId, {
queue_name: queueName,
settings: {},
});

await runWrangler("deploy index.js");
expect(std.out).toMatchInlineSnapshot(`
Expand All @@ -9013,6 +9023,46 @@ export default{
Uploaded test-name (TIMINGS)
Published test-name (TIMINGS)
https://test-name.test-sub-domain.workers.dev
Producer for queue1
Current Deployment ID: Galaxy-Class
NOTE: \\"Deployment ID\\" in this output will be changed to \\"Version ID\\" in a future version of Wrangler. To learn more visit: https://developers.cloudflare.com/workers/configuration/versions-and-deployments"
`);
});

it("should update queue producers on deploy", async () => {
writeWranglerToml({
queues: {
producers: [
{
queue: queueName,
binding: "MY_QUEUE",
delivery_delay: 10,
},
],
},
});
await fs.promises.writeFile("index.js", `export default {};`);
mockSubDomainRequest();
mockUploadWorkerRequest();
mockGetQueue("queue1", queueId);
mockPutQueue(queueId, {
queue_name: queueName,
settings: {
delivery_delay: 10,
},
});
await runWrangler("deploy index.js");
expect(std.out).toMatchInlineSnapshot(`
"Total Upload: xx KiB / gzip: xx KiB
Your worker has access to the following bindings:
- Queues:
- MY_QUEUE: queue1
Uploaded test-name (TIMINGS)
Published test-name (TIMINGS)
https://test-name.test-sub-domain.workers.dev
Producer for queue1
Current Deployment ID: Galaxy-Class
Expand All @@ -9025,25 +9075,27 @@ export default{
queues: {
consumers: [
{
queue: "queue1",
queue: queueName,
dead_letter_queue: "myDLQ",
max_batch_size: 5,
max_batch_timeout: 3,
max_retries: 10,
retry_delay: 5,
},
],
},
});
await fs.promises.writeFile("index.js", `export default {};`);
mockSubDomainRequest();
mockUploadWorkerRequest();
mockGetQueue("queue1");
mockPutQueueConsumer("queue1", "test-name", {
mockGetQueue(queueName);
mockPutQueueConsumer(queueName, "test-name", {
dead_letter_queue: "myDLQ",
settings: {
batch_size: 5,
max_retries: 10,
max_wait_time_ms: 3000,
retry_delay: 5,
},
});
await runWrangler("deploy index.js");
Expand All @@ -9065,7 +9117,7 @@ export default{
queues: {
consumers: [
{
queue: "queue1",
queue: queueName,
type: "http_pull",
dead_letter_queue: "myDLQ",
max_batch_size: 5,
Expand All @@ -9079,8 +9131,8 @@ export default{
await fs.promises.writeFile("index.js", `export default {};`);
mockSubDomainRequest();
mockUploadWorkerRequest();
mockGetQueue("queue1", "queue1-queue-id");
mockPostQueueHTTPConsumer("queue1-queue-id", {
mockGetQueue(queueName, queueId);
mockPostQueueHTTPConsumer(queueId, {
type: "http_pull",
dead_letter_queue: "myDLQ",
settings: {
Expand Down Expand Up @@ -9109,7 +9161,7 @@ export default{
queues: {
consumers: [
{
queue: "queue1",
queue: queueName,
type: "http_pull",
},
],
Expand All @@ -9129,8 +9181,8 @@ export default{
errors: [],
messages: [],
result: {
queue: "queue1",
queue_id: "queue1-queue-id",
queue: queueName,
queue_id: queueId,
consumers: [
{ type: "http_pull", consumer_id: "queue1-consumer-id" },
],
Expand All @@ -9144,7 +9196,7 @@ export default{
rest.put(
`*/accounts/:accountId/queues/:queueId/consumers/:consumerId`,
async (req, res, ctx) => {
expect(req.params.queueId).toEqual("queue1-queue-id");
expect(req.params.queueId).toEqual(queueId);
expect(req.params.consumerId).toEqual("queue1-consumer-id");
expect(req.params.accountId).toEqual("some-account-id");
return res(
Expand Down Expand Up @@ -9177,7 +9229,7 @@ export default{
queues: {
consumers: [
{
queue: "queue1",
queue: queueName,
dead_letter_queue: "myDLQ",
max_batch_size: 5,
max_batch_timeout: 3,
Expand All @@ -9190,8 +9242,8 @@ export default{
await fs.promises.writeFile("index.js", `export default {};`);
mockSubDomainRequest();
mockUploadWorkerRequest();
mockGetQueue("queue1");
mockPutQueueConsumer("queue1", "test-name", {
mockGetQueue(queueName);
mockPutQueueConsumer(queueName, "test-name", {
dead_letter_queue: "myDLQ",
settings: {
batch_size: 5,
Expand Down Expand Up @@ -9219,7 +9271,7 @@ export default{
queues: {
consumers: [
{
queue: "queue1",
queue: queueName,
dead_letter_queue: "myDLQ",
max_batch_size: 5,
max_batch_timeout: 3,
Expand All @@ -9232,8 +9284,8 @@ export default{
await fs.promises.writeFile("index.js", `export default {};`);
mockSubDomainRequest();
mockUploadWorkerRequest();
mockGetQueue("queue1");
mockPutQueueConsumer("queue1", "test-name", {
mockGetQueue(queueName);
mockPutQueueConsumer(queueName, "test-name", {
dead_letter_queue: "myDLQ",
settings: {
batch_size: 5,
Expand Down Expand Up @@ -9261,7 +9313,7 @@ export default{
producers: [],
consumers: [
{
queue: "queue1",
queue: queueName,
dead_letter_queue: "myDLQ",
max_batch_size: 5,
max_batch_timeout: 3,
Expand All @@ -9285,14 +9337,14 @@ export default{
it("producer should error when a queue doesn't exist", async () => {
writeWranglerToml({
queues: {
producers: [{ queue: "queue1", binding: "QUEUE_ONE" }],
producers: [{ queue: queueName, binding: "QUEUE_ONE" }],
consumers: [],
},
});
await fs.promises.writeFile("index.js", `export default {};`);
mockSubDomainRequest();
mockUploadWorkerRequest();
mockGetQueueMissing("queue1");
mockGetQueueMissing(queueName);

await expect(
runWrangler("deploy index.js")
Expand Down Expand Up @@ -10116,7 +10168,10 @@ function mockGetQueue(expectedQueueName: string, expectedQueueId?: string) {
success: true,
errors: [],
messages: [],
result: { queue: expectedQueueName, queue_id: expectedQueueId },
result: {
queue_name: expectedQueueName,
queue_id: expectedQueueId,
},
})
);
}
Expand Down Expand Up @@ -10181,6 +10236,33 @@ function mockPutQueueConsumer(
return requests;
}

function mockPutQueue(expectedQueueId: string, expectedBody: PostQueueBody) {
const requests = { count: 0 };
msw.use(
rest.put(`*/accounts/:accountId/queues/:queueId`, async (req, res, ctx) => {
const body = await req.json();
expect(req.params.queueId).toEqual(expectedQueueId);
expect(req.params.accountId).toEqual("some-account-id");
expect(body).toEqual(expectedBody);
requests.count += 1;
return res(
ctx.json({
success: true,
errors: [],
messages: [],
result: {
queue: expectedBody.queue_name,
settings: {
delivery_delay: expectedBody.settings?.delivery_delay,
},
},
})
);
})
);
return requests;
}

function mockPostQueueHTTPConsumer(
expectedQueueId: string,
expectedBody: PostTypedConsumerBody
Expand Down
3 changes: 3 additions & 0 deletions packages/wrangler/src/config/environment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,9 @@ export interface EnvironmentNonInheritable {

/** The name of this Queue. */
queue: string;

/** The number of seconds to wait before delivering a message */
delivery_delay?: number;
}[];

/** Consumer configuration */
Expand Down
18 changes: 18 additions & 0 deletions packages/wrangler/src/config/validation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2335,6 +2335,7 @@ const validateQueueBinding: ValidatorFn = (diagnostics, field, value) => {
!validateAdditionalProperties(diagnostics, field, Object.keys(value), [
"binding",
"queue",
"delivery_delay",
])
) {
return false;
Expand All @@ -2350,6 +2351,7 @@ const validateQueueBinding: ValidatorFn = (diagnostics, field, value) => {
);
isValid = false;
}

if (
!isRequiredProperty(value, "queue", "string") ||
(value as { queue: string }).queue.length === 0
Expand All @@ -2361,6 +2363,22 @@ const validateQueueBinding: ValidatorFn = (diagnostics, field, value) => {
);
isValid = false;
}

const options: {
key: string;
type: "number" | "string" | "boolean";
}[] = [{ key: "delivery_delay", type: "number" }];
for (const optionalOpt of options) {
if (!isOptionalProperty(value, optionalOpt.key, optionalOpt.type)) {
diagnostics.errors.push(
`"${field}" should, optionally, have a ${optionalOpt.type} "${
optionalOpt.key
}" field but got ${JSON.stringify(value)}.`
);
isValid = false;
}
}

return isValid;
};

Expand Down
32 changes: 31 additions & 1 deletion packages/wrangler/src/deploy/deploy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import {
getQueue,
postTypedConsumer,
putConsumer,
putQueue,
putTypedConsumer,
} from "../queues/client";
import { syncAssets } from "../sites";
Expand All @@ -53,7 +54,11 @@ import type {
} from "../config/environment";
import type { Entry } from "../deployment-bundle/entry";
import type { CfPlacement, CfWorkerInit } from "../deployment-bundle/worker";
import type { PostTypedConsumerBody, PutConsumerBody } from "../queues/client";
import type {
PostQueueBody,
PostTypedConsumerBody,
PutConsumerBody,
} from "../queues/client";
import type { AssetPaths } from "../sites";
import type { RetrieveSourceMapFunction } from "../sourcemap";

Expand Down Expand Up @@ -1014,6 +1019,30 @@ export async function ensureQueuesExist(config: Config) {
}
}

export async function updateQueueProducers(
config: Config
): Promise<Promise<string[]>[]> {
const producers = config.queues.producers || [];
const updateProducers: Promise<string[]>[] = [];
for (const producer of producers) {
const queue = await getQueue(config, producer.queue);
const body: PostQueueBody = {
queue_name: queue.queue_name,
settings: {
delivery_delay: producer.delivery_delay,
},
};

updateProducers.push(
putQueue(config, queue.queue_id, body).then(() => [
`Producer for ${producer.queue}`,
])
);
}

return updateProducers;
}

export async function updateQueueConsumers(
config: Config
): Promise<Promise<string[]>[]> {
Expand Down Expand Up @@ -1070,6 +1099,7 @@ export async function updateQueueConsumers(
? 1000 * consumer.max_batch_timeout
: undefined,
max_concurrency: consumer.max_concurrency,
retry_delay: consumer.retry_delay,
},
};

Expand Down

0 comments on commit 976adec

Please sign in to comment.