Skip to content

Commit

Permalink
Merge pull request #580 from nats-io/js_cc_secure
Browse files Browse the repository at this point in the history
[ADDED] `Name` in consumer configuration
  • Loading branch information
kozlovic committed Sep 14, 2022
2 parents 1192c01 + 2179f21 commit 35abb18
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 7 deletions.
9 changes: 5 additions & 4 deletions src/js.c
Original file line number Diff line number Diff line change
Expand Up @@ -2128,10 +2128,11 @@ _processConsInfo(const char **dlvSubject, jsConsumerInfo *info, jsConsumerConfig
}

natsStatus
js_checkDurName(const char *dur)
js_checkConsName(const char *cons, bool isDurable)
{
if (strchr(dur, '.') != NULL)
return nats_setError(NATS_INVALID_ARG, "%s '%s' (cannot contain '.')", jsErrInvalidDurableName, dur);
if (strchr(cons, '.') != NULL)
return nats_setError(NATS_INVALID_ARG, "%s '%s' (cannot contain '.')",
(isDurable ? jsErrInvalidDurableName : jsErrInvalidConsumerName), cons);
return NATS_OK;
}

Expand Down Expand Up @@ -2243,7 +2244,7 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha
// If a durable name is specified, check that it is valid
if (!nats_IsStringEmpty(durable))
{
if ((s = js_checkDurName(durable)) != NATS_OK)
if ((s = js_checkConsName(durable, true)) != NATS_OK)
return NATS_UPDATE_ERR_STACK(s);
}

Expand Down
8 changes: 7 additions & 1 deletion src/js.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ extern const int64_t jsDefaultRequestWait;
// jsApiDurableCreateT is used to create durable consumers.
#define jsApiDurableCreateT "%.*s.CONSUMER.DURABLE.CREATE.%s.%s"

// jsApiConsumerCreateExT is used to create a named consumer.
#define jsApiConsumerCreateExT "%.*s.CONSUMER.CREATE.%s.%s"

// jsApiConsumerCreateExWithFilterT is used to create a named consumer with a filter subject.
#define jsApiConsumerCreateExWithFilterT "%.*s.CONSUMER.CREATE.%s.%s.%s"

// jsApiConsumerInfoT is used to get information about consumers.
#define jsApiConsumerInfoT "%.*s.CONSUMER.INFO.%s.%s"

Expand Down Expand Up @@ -218,7 +224,7 @@ void
js_cleanStreamState(jsStreamState *state);

natsStatus
js_checkDurName(const char *dur);
js_checkConsName(const char *cons, bool isDurable);

natsStatus
js_getMetaData(const char *reply,
Expand Down
29 changes: 27 additions & 2 deletions src/jsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -1974,6 +1974,12 @@ _marshalConsumerCreateReq(natsBuffer **new_buf, const char *stream, jsConsumerCo
// Marshal something that is always present first, so that the optionals
// will always start with a "," and we know that there will be a field before that.
IFOK(s, _marshalDeliverPolicy(buf, cfg->DeliverPolicy));
if ((s == NATS_OK) && !nats_IsStringEmpty(cfg->Name))
{
s = natsBuf_Append(buf, ",\"name\":\"", -1);
IFOK(s, natsBuf_Append(buf, cfg->Name, -1));
IFOK(s, natsBuf_AppendByte(buf, '"'));
}
if ((s == NATS_OK) && (!nats_IsStringEmpty(cfg->Description)))
{
s = natsBuf_Append(buf, ",\"description\":\"", -1);
Expand Down Expand Up @@ -2314,9 +2320,14 @@ js_AddConsumer(jsConsumerInfo **new_ci, jsCtx *js,
if (s != NATS_OK)
return NATS_UPDATE_ERR_STACK(s);

if (!nats_IsStringEmpty(cfg->Name))
{
if ((s = js_checkConsName(cfg->Name, false)) != NATS_OK)
return NATS_UPDATE_ERR_STACK(s);
}
if (!nats_IsStringEmpty(cfg->Durable))
{
if ((s = js_checkDurName(cfg->Durable)) != NATS_OK)
if ((s = js_checkConsName(cfg->Durable, true)) != NATS_OK)
return NATS_UPDATE_ERR_STACK(s);
}

Expand All @@ -2325,7 +2336,21 @@ js_AddConsumer(jsConsumerInfo **new_ci, jsCtx *js,
{
int res;

if (nats_IsStringEmpty(cfg->Durable))
// If there is a Name in the config, this takes precedence.
if (!nats_IsStringEmpty(cfg->Name))
{
// No subject filter, use <stream>.<consumer name>
// otherwise, the filter subject goes at the end.
if (nats_IsStringEmpty(cfg->FilterSubject))
res = nats_asprintf(&subj, jsApiConsumerCreateExT,
js_lenWithoutTrailingDot(o.Prefix), o.Prefix,
stream, cfg->Name);
else
res = nats_asprintf(&subj, jsApiConsumerCreateExWithFilterT,
js_lenWithoutTrailingDot(o.Prefix), o.Prefix,
stream, cfg->Name, cfg->FilterSubject);
}
else if (nats_IsStringEmpty(cfg->Durable))
res = nats_asprintf(&subj, jsApiConsumerCreateT,
js_lenWithoutTrailingDot(o.Prefix), o.Prefix,
stream);
Expand Down
1 change: 1 addition & 0 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,7 @@ typedef struct jsStreamInfo
*/
typedef struct jsConsumerConfig
{
const char *Name;
const char *Durable;
const char *Description;
jsDeliverPolicy DeliverPolicy;
Expand Down
146 changes: 146 additions & 0 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -23316,6 +23316,152 @@ test_JetStreamMgtConsumers(void)
jsConsumerInfo_Destroy(ci);
ci = NULL;

test("Create stream: ");
jsStreamConfig_Init(&scfg);
scfg.Name = "A";
scfg.Subjects = (const char*[2]){"foo", "bar"};
scfg.SubjectsLen = 2;
s = js_AddStream(NULL, js, &scfg, NULL, &jerr);
testCond((s == NATS_OK) && (jerr == 0));

test("Create sub: ");
s = natsConnection_SubscribeSync(&sub, nc, "$JS.API.CONSUMER.>");
testCond(s == NATS_OK);

test("Ephemeral with name: ");
jsConsumerConfig_Init(&cfg);
cfg.Name = "a";
cfg.AckPolicy = js_AckExplicit;
cfg.InactiveThreshold = NATS_SECONDS_TO_NANOS(1);
s = js_AddConsumer(&ci, js, "A", &cfg, NULL, &jerr);
testCond((s == NATS_OK)
&& (strcmp(ci->Name, "a") == 0)
&& (ci->Config->Durable == NULL)
&& (ci->Config->InactiveThreshold == NATS_SECONDS_TO_NANOS(1))
&& (jerr == 0));
jsConsumerInfo_Destroy(ci);
ci = NULL;

test("Check: ");
s = natsSubscription_NextMsg(&resp, sub, 1000);
testCond((s == NATS_OK)
&& (resp != NULL)
&& (strcmp(natsMsg_GetSubject(resp), "$JS.API.CONSUMER.CREATE.A.a") == 0)
&& (strstr(natsMsg_GetData(resp), "\"name\":\"a\"") != NULL));
natsMsg_Destroy(resp);
resp = NULL;

test("Durable: ");
jsConsumerConfig_Init(&cfg);
cfg.Durable = "b";
cfg.AckPolicy = js_AckExplicit;
s = js_AddConsumer(&ci, js, "A", &cfg, NULL, &jerr);
testCond((s == NATS_OK)
&& (strcmp(ci->Name, "b") == 0)
&& (strcmp(ci->Config->Durable, "b") == 0)
&& (ci->Config->InactiveThreshold == 0)
&& (jerr == 0));
jsConsumerInfo_Destroy(ci);
ci = NULL;

test("Check: ");
s = natsSubscription_NextMsg(&resp, sub, 1000);
testCond((s == NATS_OK)
&& (resp != NULL)
&& (strcmp(natsMsg_GetSubject(resp), "$JS.API.CONSUMER.DURABLE.CREATE.A.b") == 0)
&& (strstr(natsMsg_GetData(resp), "\"name\":") == NULL));
natsMsg_Destroy(resp);
resp = NULL;

test("Durable and Name same: ");
jsConsumerConfig_Init(&cfg);
cfg.Name = "b";
cfg.Durable = "b";
cfg.AckPolicy = js_AckExplicit;
s = js_AddConsumer(&ci, js, "A", &cfg, NULL, &jerr);
testCond((s == NATS_OK)
&& (strcmp(ci->Name, "b") == 0)
&& (strcmp(ci->Config->Durable, "b") == 0)
&& (ci->Config->InactiveThreshold == 0)
&& (jerr == 0));
jsConsumerInfo_Destroy(ci);
ci = NULL;

test("Check subject: ");
s = natsSubscription_NextMsg(&resp, sub, 1000);
testCond((s == NATS_OK)
&& (resp != NULL)
&& (strcmp(natsMsg_GetSubject(resp), "$JS.API.CONSUMER.CREATE.A.b") == 0)
&& (strstr(natsMsg_GetData(resp), "\"name\":\"b\"") != NULL));
natsMsg_Destroy(resp);
resp = NULL;

test("Durable and Name different: ");
jsConsumerConfig_Init(&cfg);
cfg.Name = "x";
cfg.Durable = "y";
cfg.AckPolicy = js_AckExplicit;
s = js_AddConsumer(&ci, js, "A", &cfg, NULL, &jerr);
testCond((s == NATS_ERR)
&& (jerr == JSConsumerDurableNameNotMatchSubjectErr)
&& (strstr(nats_GetLastError(NULL), "consumer name in subject does not match durable name in request") != NULL));

test("Check subject: ");
s = natsSubscription_NextMsg(&resp, sub, 1000);
testCond((s == NATS_OK)
&& (resp != NULL)
&& (strcmp(natsMsg_GetSubject(resp), "$JS.API.CONSUMER.CREATE.A.x") == 0)
&& (strstr(natsMsg_GetData(resp), "\"name\":\"x\"") != NULL));
natsMsg_Destroy(resp);
resp = NULL;

test("Ephemeral with filter: ");
jsConsumerConfig_Init(&cfg);
cfg.Name = "c";
cfg.AckPolicy = js_AckExplicit;
cfg.FilterSubject = "bar";
s = js_AddConsumer(&ci, js, "A", &cfg, NULL, &jerr);
testCond((s == NATS_OK)
&& (strcmp(ci->Name, "c") == 0)
&& (ci->Config->Durable == NULL)
&& (ci->Config->InactiveThreshold != 0)
&& (jerr == 0));
jsConsumerInfo_Destroy(ci);
ci = NULL;

test("Check subject: ");
s = natsSubscription_NextMsg(&resp, sub, 1000);
testCond((s == NATS_OK)
&& (resp != NULL)
&& (strcmp(natsMsg_GetSubject(resp), "$JS.API.CONSUMER.CREATE.A.c.bar") == 0)
&& (strstr(natsMsg_GetData(resp), "\"name\":\"c\"") != NULL));
natsMsg_Destroy(resp);
resp = NULL;

test("Legacy ephemeral: ");
jsConsumerConfig_Init(&cfg);
cfg.AckPolicy = js_AckExplicit;
cfg.FilterSubject = "bar";
s = js_AddConsumer(&ci, js, "A", &cfg, NULL, &jerr);
testCond((s == NATS_OK)
&& (ci->Name != NULL)
&& (ci->Config->Durable == NULL)
&& (ci->Config->InactiveThreshold != 0)
&& (jerr == 0));
jsConsumerInfo_Destroy(ci);
ci = NULL;

test("Check subject: ");
s = natsSubscription_NextMsg(&resp, sub, 1000);
testCond((s == NATS_OK)
&& (resp != NULL)
&& (strcmp(natsMsg_GetSubject(resp), "$JS.API.CONSUMER.CREATE.A") == 0)
&& (strstr(natsMsg_GetData(resp), "\"name\":") == NULL));
natsMsg_Destroy(resp);
resp = NULL;

natsSubscription_Destroy(sub);

JS_TEARDOWN;
}

Expand Down

0 comments on commit 35abb18

Please sign in to comment.