Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] Name in consumer configuration #580

Merged
merged 2 commits into from
Sep 14, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/js.c
Original file line number Diff line number Diff line change
Expand Up @@ -2128,10 +2128,11 @@ _processConsInfo(const char **dlvSubject, jsConsumerInfo *info, jsConsumerConfig
}

natsStatus
js_checkDurName(const char *dur)
js_checkConsName(const char *cons, bool isDurable)
{
if (strchr(dur, '.') != NULL)
return nats_setError(NATS_INVALID_ARG, "%s '%s' (cannot contain '.')", jsErrInvalidDurableName, dur);
if (strchr(cons, '.') != NULL)
return nats_setError(NATS_INVALID_ARG, "%s '%s' (cannot contain '.')",
(isDurable ? jsErrInvalidDurableName : jsErrInvalidConsumerName), cons);
return NATS_OK;
}

Expand Down Expand Up @@ -2243,7 +2244,7 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha
// If a durable name is specified, check that it is valid
if (!nats_IsStringEmpty(durable))
{
if ((s = js_checkDurName(durable)) != NATS_OK)
if ((s = js_checkConsName(durable, true)) != NATS_OK)
return NATS_UPDATE_ERR_STACK(s);
}

Expand Down
8 changes: 7 additions & 1 deletion src/js.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ extern const int64_t jsDefaultRequestWait;
// jsApiDurableCreateT is used to create durable consumers.
#define jsApiDurableCreateT "%.*s.CONSUMER.DURABLE.CREATE.%s.%s"

// jsApiConsumerCreateExT is used to create a named consumer.
#define jsApiConsumerCreateExT "%.*s.CONSUMER.CREATE.%s.%s"

// jsApiConsumerCreateExWithFilterT is used to create a named consumer with a filter subject.
#define jsApiConsumerCreateExWithFilterT "%.*s.CONSUMER.CREATE.%s.%s.%s"

// jsApiConsumerInfoT is used to get information about consumers.
#define jsApiConsumerInfoT "%.*s.CONSUMER.INFO.%s.%s"

Expand Down Expand Up @@ -218,7 +224,7 @@ void
js_cleanStreamState(jsStreamState *state);

natsStatus
js_checkDurName(const char *dur);
js_checkConsName(const char *cons, bool isDurable);

natsStatus
js_getMetaData(const char *reply,
Expand Down
29 changes: 27 additions & 2 deletions src/jsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -1974,6 +1974,12 @@ _marshalConsumerCreateReq(natsBuffer **new_buf, const char *stream, jsConsumerCo
// Marshal something that is always present first, so that the optionals
// will always start with a "," and we know that there will be a field before that.
IFOK(s, _marshalDeliverPolicy(buf, cfg->DeliverPolicy));
if ((s == NATS_OK) && !nats_IsStringEmpty(cfg->Name))
{
s = natsBuf_Append(buf, ",\"name\":\"", -1);
IFOK(s, natsBuf_Append(buf, cfg->Name, -1));
IFOK(s, natsBuf_AppendByte(buf, '"'));
}
if ((s == NATS_OK) && (!nats_IsStringEmpty(cfg->Description)))
{
s = natsBuf_Append(buf, ",\"description\":\"", -1);
Expand Down Expand Up @@ -2314,9 +2320,14 @@ js_AddConsumer(jsConsumerInfo **new_ci, jsCtx *js,
if (s != NATS_OK)
return NATS_UPDATE_ERR_STACK(s);

if (!nats_IsStringEmpty(cfg->Name))
{
if ((s = js_checkConsName(cfg->Name, false)) != NATS_OK)
return NATS_UPDATE_ERR_STACK(s);
}
if (!nats_IsStringEmpty(cfg->Durable))
{
if ((s = js_checkDurName(cfg->Durable)) != NATS_OK)
if ((s = js_checkConsName(cfg->Durable, true)) != NATS_OK)
return NATS_UPDATE_ERR_STACK(s);
}

Expand All @@ -2325,7 +2336,21 @@ js_AddConsumer(jsConsumerInfo **new_ci, jsCtx *js,
{
int res;

if (nats_IsStringEmpty(cfg->Durable))
// If there is a Name in the config, this takes precedence.
if (!nats_IsStringEmpty(cfg->Name))
{
// No subject filter, use <stream>.<consumer name>
// otherwise, the filter subject goes at the end.
if (nats_IsStringEmpty(cfg->FilterSubject))
res = nats_asprintf(&subj, jsApiConsumerCreateExT,
js_lenWithoutTrailingDot(o.Prefix), o.Prefix,
stream, cfg->Name);
else
res = nats_asprintf(&subj, jsApiConsumerCreateExWithFilterT,
js_lenWithoutTrailingDot(o.Prefix), o.Prefix,
stream, cfg->Name, cfg->FilterSubject);
}
else if (nats_IsStringEmpty(cfg->Durable))
res = nats_asprintf(&subj, jsApiConsumerCreateT,
js_lenWithoutTrailingDot(o.Prefix), o.Prefix,
stream);
Expand Down
1 change: 1 addition & 0 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,7 @@ typedef struct jsStreamInfo
*/
typedef struct jsConsumerConfig
{
const char *Name;
const char *Durable;
const char *Description;
jsDeliverPolicy DeliverPolicy;
Expand Down
127 changes: 127 additions & 0 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -23316,6 +23316,133 @@ test_JetStreamMgtConsumers(void)
jsConsumerInfo_Destroy(ci);
ci = NULL;

test("Create stream: ");
jsStreamConfig_Init(&scfg);
scfg.Name = "A";
scfg.Subjects = (const char*[2]){"foo", "bar"};
scfg.SubjectsLen = 2;
s = js_AddStream(NULL, js, &scfg, NULL, &jerr);
testCond((s == NATS_OK) && (jerr == 0));

test("Create sub: ");
s = natsConnection_SubscribeSync(&sub, nc, "$JS.API.CONSUMER.>");
testCond(s == NATS_OK);

test("Ephemeral with name: ");
jsConsumerConfig_Init(&cfg);
cfg.Name = "a";
cfg.AckPolicy = js_AckExplicit;
cfg.InactiveThreshold = NATS_SECONDS_TO_NANOS(1);
s = js_AddConsumer(&ci, js, "A", &cfg, NULL, &jerr);
testCond((s == NATS_OK)
&& (strcmp(ci->Name, "a") == 0)
&& (ci->Config->Durable == NULL)
&& (ci->Config->InactiveThreshold == NATS_SECONDS_TO_NANOS(1))
&& (jerr == 0));
jsConsumerInfo_Destroy(ci);
ci = NULL;

test("Check: ");
s = natsSubscription_NextMsg(&resp, sub, 1000);
testCond((s == NATS_OK)
&& (resp != NULL)
&& (strcmp(natsMsg_GetSubject(resp), "$JS.API.CONSUMER.CREATE.A.a") == 0)
&& (strstr(natsMsg_GetData(resp), "\"name\":\"a\"") != NULL));
natsMsg_Destroy(resp);
resp = NULL;

test("Durable: ");
jsConsumerConfig_Init(&cfg);
cfg.Durable = "b";
cfg.AckPolicy = js_AckExplicit;
s = js_AddConsumer(&ci, js, "A", &cfg, NULL, &jerr);
testCond((s == NATS_OK)
&& (strcmp(ci->Name, "b") == 0)
&& (strcmp(ci->Config->Durable, "b") == 0)
&& (ci->Config->InactiveThreshold == 0)
&& (jerr == 0));
jsConsumerInfo_Destroy(ci);
ci = NULL;

test("Check: ");
s = natsSubscription_NextMsg(&resp, sub, 1000);
testCond((s == NATS_OK)
&& (resp != NULL)
&& (strcmp(natsMsg_GetSubject(resp), "$JS.API.CONSUMER.DURABLE.CREATE.A.b") == 0)
&& (strstr(natsMsg_GetData(resp), "\"name\":") == NULL));
natsMsg_Destroy(resp);
resp = NULL;

test("Durable and Name same: ");
jsConsumerConfig_Init(&cfg);
cfg.Name = "b";
cfg.Durable = "b";
cfg.AckPolicy = js_AckExplicit;
s = js_AddConsumer(&ci, js, "A", &cfg, NULL, &jerr);
testCond((s == NATS_OK)
&& (strcmp(ci->Name, "b") == 0)
&& (strcmp(ci->Config->Durable, "b") == 0)
&& (ci->Config->InactiveThreshold == 0)
&& (jerr == 0));
jsConsumerInfo_Destroy(ci);
ci = NULL;

test("Check subject: ");
s = natsSubscription_NextMsg(&resp, sub, 1000);
testCond((s == NATS_OK)
&& (resp != NULL)
&& (strcmp(natsMsg_GetSubject(resp), "$JS.API.CONSUMER.CREATE.A.b") == 0)
&& (strstr(natsMsg_GetData(resp), "\"name\":\"b\"") != NULL));
natsMsg_Destroy(resp);
resp = NULL;

test("Ephemeral with filter: ");
jsConsumerConfig_Init(&cfg);
cfg.Name = "c";
cfg.AckPolicy = js_AckExplicit;
cfg.FilterSubject = "bar";
s = js_AddConsumer(&ci, js, "A", &cfg, NULL, &jerr);
testCond((s == NATS_OK)
&& (strcmp(ci->Name, "c") == 0)
&& (ci->Config->Durable == NULL)
&& (ci->Config->InactiveThreshold != 0)
&& (jerr == 0));
jsConsumerInfo_Destroy(ci);
ci = NULL;

test("Check subject: ");
s = natsSubscription_NextMsg(&resp, sub, 1000);
testCond((s == NATS_OK)
&& (resp != NULL)
&& (strcmp(natsMsg_GetSubject(resp), "$JS.API.CONSUMER.CREATE.A.c.bar") == 0)
&& (strstr(natsMsg_GetData(resp), "\"name\":\"c\"") != NULL));
natsMsg_Destroy(resp);
resp = NULL;

test("Legacy ephemeral: ");
jsConsumerConfig_Init(&cfg);
cfg.AckPolicy = js_AckExplicit;
cfg.FilterSubject = "bar";
s = js_AddConsumer(&ci, js, "A", &cfg, NULL, &jerr);
testCond((s == NATS_OK)
&& (ci->Name != NULL)
&& (ci->Config->Durable == NULL)
&& (ci->Config->InactiveThreshold != 0)
&& (jerr == 0));
jsConsumerInfo_Destroy(ci);
ci = NULL;

test("Check subject: ");
s = natsSubscription_NextMsg(&resp, sub, 1000);
testCond((s == NATS_OK)
&& (resp != NULL)
&& (strcmp(natsMsg_GetSubject(resp), "$JS.API.CONSUMER.CREATE.A") == 0)
&& (strstr(natsMsg_GetData(resp), "\"name\":") == NULL));
natsMsg_Destroy(resp);
resp = NULL;

natsSubscription_Destroy(sub);

JS_TEARDOWN;
}

Expand Down