Skip to content

Commit

Permalink
[FEAT] discard_new_per_subject functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed Sep 28, 2022
1 parent 6bdf6ff commit cdee523
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 3 deletions.
16 changes: 13 additions & 3 deletions nats-base-client/kv.ts
Expand Up @@ -53,7 +53,7 @@ import {
import { millis, nanos } from "./jsutil.ts";
import { QueuedIterator, QueuedIteratorImpl } from "./queued_iterator.ts";
import { headers, MsgHdrs } from "./headers.ts";
import { consumerOpts, deferred, ErrorCode } from "./mod.ts";
import { consumerOpts, deferred, ErrorCode, NatsError } from "./mod.ts";
import { compare, Feature, parseSemVer } from "./semver.ts";
import { JetStreamManagerImpl } from "./jsm.ts";

Expand Down Expand Up @@ -422,8 +422,18 @@ export class Bucket implements KV, KvRemove {
o.headers = h;
h.set("Nats-Expected-Last-Subject-Sequence", `${opts.previousSeq}`);
}
const pa = await this.js.publish(this.subjectForKey(ek), data, o);
return pa.seq;
try {
const pa = await this.js.publish(this.subjectForKey(ek), data, o);
return pa.seq;
} catch (err) {
const ne = err as NatsError;
if (ne.isJetStreamError()) {
ne.message = ne.api_error?.description!;
ne.code = `${ne.api_error?.code!}`;
return Promise.reject(ne);
}
return Promise.reject(err);
}
}

async get(
Expand Down
5 changes: 5 additions & 0 deletions nats-base-client/types.ts
Expand Up @@ -1760,6 +1760,11 @@ export interface StreamUpdateConfig {
* When a Stream reach its limits either old messages are deleted or new ones are denied
*/
discard: DiscardPolicy;
/**
* Sets the context of the on a per subject basis. Requires {@link DiscardPolicy.New} as the
* {@link discard} policy.
*/
discard_new_per_subject: boolean;
/**
* Disables acknowledging messages that are received by the Stream.
*/
Expand Down
50 changes: 50 additions & 0 deletions tests/jsm_test.ts
Expand Up @@ -1717,3 +1717,53 @@ Deno.test("jsm - list filter", async () => {
}
await cleanup(ns, nc);
});

Deno.test("jsm - discard_new_per_subject option", async () => {
const { ns, nc } = await setup(
jetstreamServerConf({}, true),
);

if (await notCompatible(ns, nc, "2.9.2")) {
return;
}

const jsm = await nc.jetstreamManager();

// discard policy new is required
await assertRejects(
async () => {
await jsm.streams.add({
name: "A",
discard_new_per_subject: true,
subjects: ["$KV.A.>"],
max_msgs_per_subject: 1,
});
},
Error,
"discard new per subject requires discard new policy to be set",
);

let si = await jsm.streams.add({
name: "KV_A",
discard: DiscardPolicy.New,
discard_new_per_subject: true,
subjects: ["$KV.A.>"],
max_msgs_per_subject: 1,
});
assertEquals(si.config.discard_new_per_subject, true);

const js = nc.jetstream();

const kv = await js.views.kv("A", { bindOnly: true });
await kv.put("B", Empty);
await assertRejects(
async () => {
(nc as NatsConnectionImpl).options.debug = true;
await kv.put("B", Empty);
},
Error,
"maximum messages per subject exceeded",
);

await cleanup(ns, nc);
});

0 comments on commit cdee523

Please sign in to comment.