Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] [JS] discard_new_per_subject functionality #373

Merged
merged 1 commit into from
Sep 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 13 additions & 3 deletions nats-base-client/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ import {
import { millis, nanos } from "./jsutil.ts";
import { QueuedIterator, QueuedIteratorImpl } from "./queued_iterator.ts";
import { headers, MsgHdrs } from "./headers.ts";
import { consumerOpts, deferred, ErrorCode } from "./mod.ts";
import { consumerOpts, deferred, ErrorCode, NatsError } from "./mod.ts";
import { compare, Feature, parseSemVer } from "./semver.ts";
import { JetStreamManagerImpl } from "./jsm.ts";

Expand Down Expand Up @@ -422,8 +422,18 @@ export class Bucket implements KV, KvRemove {
o.headers = h;
h.set("Nats-Expected-Last-Subject-Sequence", `${opts.previousSeq}`);
}
const pa = await this.js.publish(this.subjectForKey(ek), data, o);
return pa.seq;
try {
const pa = await this.js.publish(this.subjectForKey(ek), data, o);
return pa.seq;
} catch (err) {
const ne = err as NatsError;
if (ne.isJetStreamError()) {
ne.message = ne.api_error?.description!;
ne.code = `${ne.api_error?.code!}`;
return Promise.reject(ne);
}
return Promise.reject(err);
}
}

async get(
Expand Down
5 changes: 5 additions & 0 deletions nats-base-client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1760,6 +1760,11 @@ export interface StreamUpdateConfig {
* When a Stream reach its limits either old messages are deleted or new ones are denied
*/
discard: DiscardPolicy;
/**
* Sets the context of the on a per subject basis. Requires {@link DiscardPolicy.New} as the
* {@link discard} policy.
*/
discard_new_per_subject: boolean;
/**
* Disables acknowledging messages that are received by the Stream.
*/
Expand Down
50 changes: 50 additions & 0 deletions tests/jsm_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1717,3 +1717,53 @@ Deno.test("jsm - list filter", async () => {
}
await cleanup(ns, nc);
});

Deno.test("jsm - discard_new_per_subject option", async () => {
const { ns, nc } = await setup(
jetstreamServerConf({}, true),
);

if (await notCompatible(ns, nc, "2.9.2")) {
return;
}

const jsm = await nc.jetstreamManager();

// discard policy new is required
await assertRejects(
async () => {
await jsm.streams.add({
name: "A",
discard_new_per_subject: true,
subjects: ["$KV.A.>"],
max_msgs_per_subject: 1,
});
},
Error,
"discard new per subject requires discard new policy to be set",
);

let si = await jsm.streams.add({
name: "KV_A",
discard: DiscardPolicy.New,
discard_new_per_subject: true,
subjects: ["$KV.A.>"],
max_msgs_per_subject: 1,
});
assertEquals(si.config.discard_new_per_subject, true);

const js = nc.jetstream();

const kv = await js.views.kv("A", { bindOnly: true });
await kv.put("B", Empty);
await assertRejects(
async () => {
(nc as NatsConnectionImpl).options.debug = true;
await kv.put("B", Empty);
},
Error,
"maximum messages per subject exceeded",
);

await cleanup(ns, nc);
});