Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix updating a non unique consumer on workqueue stream not returning an error #4654

Merged
merged 1 commit into from Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 8 additions & 1 deletion server/consumer.go
Expand Up @@ -767,6 +767,13 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
if action == ActionCreate && !reflect.DeepEqual(*config, eo.config()) {
return nil, NewJSConsumerAlreadyExistsError()
}
// Check for overlapping subjects.
if mset.cfg.Retention == WorkQueuePolicy {
subjects := gatherSubjectFilters(config.FilterSubject, config.FilterSubjects)
if !mset.partitionUnique(cName, subjects) {
return nil, NewJSConsumerWQConsumerNotUniqueError()
}
}
err := eo.updateConfig(config)
if err == nil {
return eo, nil
Expand Down Expand Up @@ -805,7 +812,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
if len(subjects) == 0 {
mset.mu.Unlock()
return nil, NewJSConsumerWQMultipleUnfilteredError()
} else if !mset.partitionUnique(subjects) {
} else if !mset.partitionUnique(cName, subjects) {
// Prior to v2.9.7, on a stream with WorkQueue policy, the servers
// were not catching the error of having multiple consumers with
// overlapping filter subjects depending on the scope, for instance
Expand Down
75 changes: 75 additions & 0 deletions server/jetstream_consumer_test.go
Expand Up @@ -534,6 +534,81 @@ func TestJetStreamConsumerActions(t *testing.T) {
require_Error(t, err)
}

func TestJetStreamConsumerActionsOnWorkQueuePolicyStream(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, _ := jsClientConnect(t, s)
defer nc.Close()
acc := s.GlobalAccount()

mset, err := acc.addStream(&StreamConfig{
Name: "TEST",
Retention: WorkQueuePolicy,
Subjects: []string{"one", "two", "three", "four", "five.>"},
})
require_NoError(t, err)

_, err = mset.addConsumerWithAction(&ConsumerConfig{
Durable: "C1",
FilterSubjects: []string{"one", "two"},
AckPolicy: AckExplicit,
}, ActionCreate)
require_NoError(t, err)

_, err = mset.addConsumerWithAction(&ConsumerConfig{
Durable: "C2",
FilterSubjects: []string{"three", "four"},
AckPolicy: AckExplicit,
}, ActionCreate)
require_NoError(t, err)

_, err = mset.addConsumerWithAction(&ConsumerConfig{
Durable: "C3",
FilterSubjects: []string{"five.*"},
AckPolicy: AckExplicit,
}, ActionCreate)
require_NoError(t, err)

// Updating a consumer by removing a previous subject filter.
_, err = mset.addConsumerWithAction(&ConsumerConfig{
Durable: "C1",
FilterSubjects: []string{"one"}, // Remove a subject.
AckPolicy: AckExplicit,
}, ActionUpdate)
require_NoError(t, err)

// Updating a consumer without overlapping subjects.
_, err = mset.addConsumerWithAction(&ConsumerConfig{
Durable: "C2",
FilterSubjects: []string{"three", "four", "two"}, // Add previously removed subject.
AckPolicy: AckExplicit,
}, ActionUpdate)
require_NoError(t, err)

// Creating a consumer with overlapping subjects should return an error.
_, err = mset.addConsumerWithAction(&ConsumerConfig{
Durable: "C4",
FilterSubjects: []string{"one", "two", "three", "four"},
AckPolicy: AckExplicit,
}, ActionCreate)
require_Error(t, err)
if !IsNatsErr(err, JSConsumerWQConsumerNotUniqueErr) {
t.Errorf("want error %q, got %q", ApiErrors[JSConsumerWQConsumerNotUniqueErr], err)
}

// Updating a consumer with overlapping subjects should return an error.
_, err = mset.addConsumerWithAction(&ConsumerConfig{
Durable: "C3",
FilterSubjects: []string{"one", "two", "three", "four"},
AckPolicy: AckExplicit,
}, ActionUpdate)
require_Error(t, err)
if !IsNatsErr(err, JSConsumerWQConsumerNotUniqueErr) {
t.Errorf("want error %q, got %q", ApiErrors[JSConsumerWQConsumerNotUniqueErr], err)
}
}

func TestJetStreamConsumerActionsViaAPI(t *testing.T) {

s := RunBasicJetStreamServer(t)
Expand Down
8 changes: 6 additions & 2 deletions server/stream.go
Expand Up @@ -5266,9 +5266,13 @@ func (mset *stream) Store() StreamStore {

// Determines if the new proposed partition is unique amongst all consumers.
// Lock should be held.
func (mset *stream) partitionUnique(partitions []string) bool {
func (mset *stream) partitionUnique(name string, partitions []string) bool {
for _, partition := range partitions {
for _, o := range mset.consumers {
for n, o := range mset.consumers {
// Skip the consumer being checked.
if n == name {
continue
}
if o.subjf == nil {
return false
}
Expand Down