Skip to content

Commit

Permalink
Fix updating a non unique consumer on workqueue stream not returning …
Browse files Browse the repository at this point in the history
…an error (#4654)

This is a possible fix for #4653.

Changes made:
1. Added tests for creating and updating consumers on a work queue
stream with overlapping subjects.
2. Check for overlapping subjects before
[updating](https://github.com/nats-io/nats-server/blob/a25af02c7348d1e39cce66a5c275e1740fb0ca81/server/consumer.go#L770)
the consumer config.
3. Changed [`func (*stream).partitionUnique(partitions []string)
bool`](https://github.com/nats-io/nats-server/blob/a25af02c7348d1e39cce66a5c275e1740fb0ca81/server/stream.go#L5269)
to accept the consumer name being checked so we can skip it while
checking for overlapping subjects (Required for
[`FilterSubjects`](https://github.com/nats-io/nats-server/blob/a25af02c7348d1e39cce66a5c275e1740fb0ca81/server/consumer.go#L75)
updates), wasn't needed before because the checks were made on creation
only.

There's only 1 thing that I'm not sure about.

In the [current work queue stream conflict
checks](https://github.com/nats-io/nats-server/blob/a25af02c7348d1e39cce66a5c275e1740fb0ca81/server/consumer.go#L796),
the consumer config `Direct` is being checked if `false`, should we also
make this check before the update?

Signed-off-by: Pierre Mdawar <pierre@mdawar.dev>
  • Loading branch information
derekcollison committed Oct 12, 2023
2 parents ea0843f + c46d809 commit 1e8f6bf
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 3 deletions.
9 changes: 8 additions & 1 deletion server/consumer.go
Expand Up @@ -767,6 +767,13 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
if action == ActionCreate && !reflect.DeepEqual(*config, eo.config()) {
return nil, NewJSConsumerAlreadyExistsError()
}
// Check for overlapping subjects.
if mset.cfg.Retention == WorkQueuePolicy {
subjects := gatherSubjectFilters(config.FilterSubject, config.FilterSubjects)
if !mset.partitionUnique(cName, subjects) {
return nil, NewJSConsumerWQConsumerNotUniqueError()
}
}
err := eo.updateConfig(config)
if err == nil {
return eo, nil
Expand Down Expand Up @@ -805,7 +812,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
if len(subjects) == 0 {
mset.mu.Unlock()
return nil, NewJSConsumerWQMultipleUnfilteredError()
} else if !mset.partitionUnique(subjects) {
} else if !mset.partitionUnique(cName, subjects) {
// Prior to v2.9.7, on a stream with WorkQueue policy, the servers
// were not catching the error of having multiple consumers with
// overlapping filter subjects depending on the scope, for instance
Expand Down
75 changes: 75 additions & 0 deletions server/jetstream_consumer_test.go
Expand Up @@ -534,6 +534,81 @@ func TestJetStreamConsumerActions(t *testing.T) {
require_Error(t, err)
}

func TestJetStreamConsumerActionsOnWorkQueuePolicyStream(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, _ := jsClientConnect(t, s)
defer nc.Close()
acc := s.GlobalAccount()

mset, err := acc.addStream(&StreamConfig{
Name: "TEST",
Retention: WorkQueuePolicy,
Subjects: []string{"one", "two", "three", "four", "five.>"},
})
require_NoError(t, err)

_, err = mset.addConsumerWithAction(&ConsumerConfig{
Durable: "C1",
FilterSubjects: []string{"one", "two"},
AckPolicy: AckExplicit,
}, ActionCreate)
require_NoError(t, err)

_, err = mset.addConsumerWithAction(&ConsumerConfig{
Durable: "C2",
FilterSubjects: []string{"three", "four"},
AckPolicy: AckExplicit,
}, ActionCreate)
require_NoError(t, err)

_, err = mset.addConsumerWithAction(&ConsumerConfig{
Durable: "C3",
FilterSubjects: []string{"five.*"},
AckPolicy: AckExplicit,
}, ActionCreate)
require_NoError(t, err)

// Updating a consumer by removing a previous subject filter.
_, err = mset.addConsumerWithAction(&ConsumerConfig{
Durable: "C1",
FilterSubjects: []string{"one"}, // Remove a subject.
AckPolicy: AckExplicit,
}, ActionUpdate)
require_NoError(t, err)

// Updating a consumer without overlapping subjects.
_, err = mset.addConsumerWithAction(&ConsumerConfig{
Durable: "C2",
FilterSubjects: []string{"three", "four", "two"}, // Add previously removed subject.
AckPolicy: AckExplicit,
}, ActionUpdate)
require_NoError(t, err)

// Creating a consumer with overlapping subjects should return an error.
_, err = mset.addConsumerWithAction(&ConsumerConfig{
Durable: "C4",
FilterSubjects: []string{"one", "two", "three", "four"},
AckPolicy: AckExplicit,
}, ActionCreate)
require_Error(t, err)
if !IsNatsErr(err, JSConsumerWQConsumerNotUniqueErr) {
t.Errorf("want error %q, got %q", ApiErrors[JSConsumerWQConsumerNotUniqueErr], err)
}

// Updating a consumer with overlapping subjects should return an error.
_, err = mset.addConsumerWithAction(&ConsumerConfig{
Durable: "C3",
FilterSubjects: []string{"one", "two", "three", "four"},
AckPolicy: AckExplicit,
}, ActionUpdate)
require_Error(t, err)
if !IsNatsErr(err, JSConsumerWQConsumerNotUniqueErr) {
t.Errorf("want error %q, got %q", ApiErrors[JSConsumerWQConsumerNotUniqueErr], err)
}
}

func TestJetStreamConsumerActionsViaAPI(t *testing.T) {

s := RunBasicJetStreamServer(t)
Expand Down
8 changes: 6 additions & 2 deletions server/stream.go
Expand Up @@ -5266,9 +5266,13 @@ func (mset *stream) Store() StreamStore {

// Determines if the new proposed partition is unique amongst all consumers.
// Lock should be held.
func (mset *stream) partitionUnique(partitions []string) bool {
func (mset *stream) partitionUnique(name string, partitions []string) bool {
for _, partition := range partitions {
for _, o := range mset.consumers {
for n, o := range mset.consumers {
// Skip the consumer being checked.
if n == name {
continue
}
if o.subjf == nil {
return false
}
Expand Down

0 comments on commit 1e8f6bf

Please sign in to comment.