From 18512081e0c19fbc71d4077b1fbe219ffe525f15 Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Tue, 27 Sep 2022 15:53:59 -0500 Subject: [PATCH] discard_new_per_subject functionality as described in https://github.com/nats-io/nats-architecture-and-design/issues/160 --- nats-base-client/kv.ts | 16 ++++++++++--- nats-base-client/types.ts | 5 ++++ tests/jsm_test.ts | 50 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 68 insertions(+), 3 deletions(-) diff --git a/nats-base-client/kv.ts b/nats-base-client/kv.ts index 04fc4d48..0c094718 100644 --- a/nats-base-client/kv.ts +++ b/nats-base-client/kv.ts @@ -53,7 +53,7 @@ import { import { millis, nanos } from "./jsutil.ts"; import { QueuedIterator, QueuedIteratorImpl } from "./queued_iterator.ts"; import { headers, MsgHdrs } from "./headers.ts"; -import { consumerOpts, deferred, ErrorCode } from "./mod.ts"; +import { consumerOpts, deferred, ErrorCode, NatsError } from "./mod.ts"; import { compare, Feature, parseSemVer } from "./semver.ts"; import { JetStreamManagerImpl } from "./jsm.ts"; @@ -422,8 +422,18 @@ export class Bucket implements KV, KvRemove { o.headers = h; h.set("Nats-Expected-Last-Subject-Sequence", `${opts.previousSeq}`); } - const pa = await this.js.publish(this.subjectForKey(ek), data, o); - return pa.seq; + try { + const pa = await this.js.publish(this.subjectForKey(ek), data, o); + return pa.seq; + } catch (err) { + const ne = err as NatsError; + if (ne.isJetStreamError()) { + ne.message = ne.api_error?.description!; + ne.code = `${ne.api_error?.code!}`; + return Promise.reject(ne); + } + return Promise.reject(err); + } } async get( diff --git a/nats-base-client/types.ts b/nats-base-client/types.ts index 1a29bb81..c36f367c 100644 --- a/nats-base-client/types.ts +++ b/nats-base-client/types.ts @@ -1760,6 +1760,11 @@ export interface StreamUpdateConfig { * When a Stream reach its limits either old messages are deleted or new ones are denied */ discard: DiscardPolicy; + /** + * Sets the context of the on a per subject basis. Requires {@link DiscardPolicy.New} as the + * {@link discard} policy. + */ + discard_new_per_subject: boolean; /** * Disables acknowledging messages that are received by the Stream. */ diff --git a/tests/jsm_test.ts b/tests/jsm_test.ts index 172f156d..f399278a 100644 --- a/tests/jsm_test.ts +++ b/tests/jsm_test.ts @@ -1717,3 +1717,53 @@ Deno.test("jsm - list filter", async () => { } await cleanup(ns, nc); }); + +Deno.test("jsm - discard_new_per_subject option", async () => { + const { ns, nc } = await setup( + jetstreamServerConf({}, true), + ); + + if (await notCompatible(ns, nc, "2.9.2")) { + return; + } + + const jsm = await nc.jetstreamManager(); + + // discard policy new is required + await assertRejects( + async () => { + await jsm.streams.add({ + name: "A", + discard_new_per_subject: true, + subjects: ["$KV.A.>"], + max_msgs_per_subject: 1, + }); + }, + Error, + "discard new per subject requires discard new policy to be set", + ); + + let si = await jsm.streams.add({ + name: "KV_A", + discard: DiscardPolicy.New, + discard_new_per_subject: true, + subjects: ["$KV.A.>"], + max_msgs_per_subject: 1, + }); + assertEquals(si.config.discard_new_per_subject, true); + + const js = nc.jetstream(); + + const kv = await js.views.kv("A", { bindOnly: true }); + await kv.put("B", Empty); + await assertRejects( + async () => { + (nc as NatsConnectionImpl).options.debug = true; + await kv.put("B", Empty); + }, + Error, + "maximum messages per subject exceeded", + ); + + await cleanup(ns, nc); +});