Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiple consumer subject filters #3500

Merged
merged 1 commit into from Feb 13, 2023

Conversation

Jarema
Copy link
Member

@Jarema Jarema commented Sep 27, 2022

Assumptions:

  • Separate single filter and multiple filters logic to not break anything in current codebase (not all done)
  • Make it reasonable performant, but not at the cost of increasing complexity (a first pass)

What's missing:

  • Make sure all tests pass
  • Add more coverage for multiple subject filters edge cases

@ripienaar
Copy link
Contributor

This would allow bypassing the recently added filter specific consumer create subject mechanism - I personally dont think that's fit for purpose and should be remove - but anyway it needs consideration.

@derekcollison
Copy link
Member

This would allow bypassing the recently added filter specific consumer create subject mechanism - I personally dont think that's fit for purpose and should be remove - but anyway it needs consideration.

Any thoughts on how to secure without a callout architecture?

@ripienaar
Copy link
Contributor

Any thoughts on how to secure without a callout architecture?

not right now as my thinking is around the larger roadmap item, but I know the model we have now will not work long term and as we already know have significant holes and bypasses via consumer update to name one. It's unfortunate that clients already started adopting the new subject as there's no way this will work long term

@derekcollison
Copy link
Member

Any thoughts on how to secure without a callout architecture?

not right now as my thinking is around the larger roadmap item, but I know the model we have now will not work long term and as we already know have significant holes and bypasses via consumer update to name one. It's unfortunate that clients already started adopting the new subject as there's no way this will work long term

With correct permissions works quite well for what we wanted for a large customer.

@ppd0705
Copy link

ppd0705 commented Oct 26, 2022

I'm a new user of nats. in my usage scenario I really need this feature, otherwise I need to create multiple consumers.

@derekcollison
Copy link
Member

This will most likely make it in for 2.10.0 release..

In the meantime, you could have the subjects in separate streams and source them into a single one in which you connect your consumer.

Or have a common prefix for the ones you want to bundle together and use wildcard matching as it exists today.

@Jarema Jarema changed the base branch from main to dev December 14, 2022 12:17
@derekcollison
Copy link
Member

Rebase please don't merge fir feature branches.

@Jarema Jarema force-pushed the multiple-consumer-filters branch 3 times, most recently from 822e1a5 to 0090ee1 Compare January 6, 2023 09:59
@Jarema Jarema force-pushed the multiple-consumer-filters branch 2 times, most recently from 7982181 to 5f2ea90 Compare January 16, 2023 14:50
server/consumer.go Outdated Show resolved Hide resolved
server/consumer.go Outdated Show resolved Hide resolved
server/consumer.go Outdated Show resolved Hide resolved
server/consumer.go Outdated Show resolved Hide resolved
server/consumer.go Outdated Show resolved Hide resolved
server/consumer.go Outdated Show resolved Hide resolved
server/consumer.go Outdated Show resolved Hide resolved
server/consumer.go Outdated Show resolved Hide resolved
server/jetstream_test.go Outdated Show resolved Hide resolved
server/jetstream_test.go Outdated Show resolved Hide resolved
server/consumer.go Outdated Show resolved Hide resolved
server/consumer.go Outdated Show resolved Hide resolved
server/consumer.go Outdated Show resolved Hide resolved
server/consumer.go Outdated Show resolved Hide resolved
server/consumer.go Outdated Show resolved Hide resolved
server/consumer.go Outdated Show resolved Hide resolved
server/consumer.go Outdated Show resolved Hide resolved
server/consumer.go Outdated Show resolved Hide resolved
server/consumer.go Outdated Show resolved Hide resolved
server/consumer.go Outdated Show resolved Hide resolved
Copy link
Contributor

@piotrpio piotrpio left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@Jarema Jarema force-pushed the multiple-consumer-filters branch 2 times, most recently from 82ffb54 to 4d208d5 Compare January 23, 2023 13:56
@Jarema Jarema marked this pull request as ready for review January 23, 2023 13:56
@Jarema Jarema changed the title [DRAFT WIP] Multiple consumer subject filters Multiple consumer subject filters Jan 23, 2023
server/errors.json Outdated Show resolved Hide resolved
server/errors.json Outdated Show resolved Hide resolved
server/jetstream_test.go Outdated Show resolved Hide resolved
server/jetstream_test.go Outdated Show resolved Hide resolved
server/jetstream_test.go Outdated Show resolved Hide resolved
server/jetstream_test.go Outdated Show resolved Hide resolved
})
require_NoError(t, err)

js.Publish("other", []byte(fmt.Sprintf("%d", 100)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sendMsg

server/stream.go Outdated
if preq != nil && !o.isFilteredMatch(preq.Subject) {
o.mu.RUnlock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe capture in variable to simplify locking.

o.mu.RLock()
doPurge := preq == nil || o.isFilteredMatch(preq.Subject)
o.mu.RUnlock()
if doPurge {
  o.purge(fseq, lseq)
}

server/stream.go Show resolved Hide resolved
server/stream.go Outdated Show resolved Hide resolved
server/consumer.go Outdated Show resolved Hide resolved
}
}
}
if subjectFilters[0] != _EMPTY_ {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can subjectFilters be empty slice or nil?

We should check len(subjectFilters) > 0

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can't.

func subjectFilters(filter string, filters []string) []string {
	if len(filters) == 0 {
		return []string{filter}
	}
	return filters
}

Will always return slice of len >= 1, but I don't like that part of the code too, as that's implicit and simple change in that function can break it.

What about changing subjectFilters to:

// Utility for simpler if conditions in Consumer config checks.
// In future iteration, we can immediately create `o.subjf` and
// use it to validate things.
func subjectFilters(filter string, filters []string) ([]string, bool) {
	if len(filters) == 0 {
		return []string{filter}, filter == _EMPTY_
	}
	return filters, filters[0] == _EMPTY_
}

That simplifies most uses of it and removes the assumption that len() > 0.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not a fan of that, if not filters subjf should be nil, and check for len(subjf) > 0 should always return true iff there are filters.

Suggest we rework that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about..

func subjectFilters(filter string, filters []string) ([]string, bool) {
	if filter != _EMPTY_ {
		filters = append(filters, filter)
	}
	return filters, len(filters) > 0
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doing the rework now.

sub := &subjectFilter{
subject: filter,
}
if filter != _EMPTY_ && subjectHasWildcard(filter) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why could filter be empty?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we change above this can be simplified.

server/consumer.go Outdated Show resolved Hide resolved
@@ -988,8 +1019,9 @@ func (o *consumer) setLeader(isLeader bool) {
}

// Update the group on the our starting sequence if we are starting but we skipped some in the stream.
// TODO(tp): check how we should approach this for multiple filters. Ask Derek about this one.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sseq should be proper starting point, so code below should be fine regardless of multiple subjects.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can remove comment.

@@ -3942,20 +4126,37 @@ func (o *consumer) selectStartingSeqNo() {

if state.FirstSeq == 0 {
o.sseq = 1
for i := range o.subjf {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is subjf ptr now? Just use normal range if pointer vs using index.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok let's switch to updating the value?

o.subjf[i].nextSeq = 1
}
if filter.nextSeq < state.FirstSeq {
o.subjf[i].nextSeq = state.FirstSeq
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

o.subjf[i].nextSeq = state.FirstSeq
}
if filter.nextSeq > state.LastSeq {
o.subjf[i].nextSeq = state.LastSeq + 1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And here.

server/consumer.go Show resolved Hide resolved
server/stream.go Outdated
@@ -4610,6 +4612,11 @@ func (mset *stream) setConsumer(o *consumer) {
if o.cfg.FilterSubject != _EMPTY_ {
mset.numFilter++
}
for _, subject := range o.cfg.FilterSubjects {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can it be valid after checking config that an entry be == EMPTY?

Copy link
Member Author

@Jarema Jarema Jan 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you pass `filterSubjects: []string{"", "two"} it will fail the validation and return overlapping subjects error.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So here subject can not ever equal EMPTY correct?

Copy link
Member Author

@Jarema Jarema Jan 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correct, I now treat this as an error (client passing []string{""}

@@ -536,6 +545,9 @@ var (
JSStreamWrongLastSequenceErrF: {Code: 400, ErrCode: 10071, Description: "wrong last sequence: {seq}"},
JSTempStorageFailedErr: {Code: 500, ErrCode: 10072, Description: "JetStream unable to open temp storage for restore"},
JSTemplateNameNotMatchSubjectErr: {Code: 400, ErrCode: 10073, Description: "template name in subject does not match request"},
JsConsumerDuplicateFilterSubjects: {Code: 400, ErrCode: 10134, Description: "Consumer cannot have both FilterSubject and FilterSubjects specified"},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
JsConsumerDuplicateFilterSubjects: {Code: 400, ErrCode: 10134, Description: "Consumer cannot have both FilterSubject and FilterSubjects specified"},
JsConsumerDuplicateFilterSubjects: {Code: 400, ErrCode: 10134, Description: "consumer cannot have both FilterSubject and FilterSubjects specified"},

@@ -536,6 +545,9 @@ var (
JSStreamWrongLastSequenceErrF: {Code: 400, ErrCode: 10071, Description: "wrong last sequence: {seq}"},
JSTempStorageFailedErr: {Code: 500, ErrCode: 10072, Description: "JetStream unable to open temp storage for restore"},
JSTemplateNameNotMatchSubjectErr: {Code: 400, ErrCode: 10073, Description: "template name in subject does not match request"},
JsConsumerDuplicateFilterSubjects: {Code: 400, ErrCode: 10134, Description: "Consumer cannot have both FilterSubject and FilterSubjects specified"},
JsConsumerMultipleFiltersNotAllowed: {Code: 400, ErrCode: 10135, Description: "Consumer with multiple subject filters cannot use subject based API"},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
JsConsumerMultipleFiltersNotAllowed: {Code: 400, ErrCode: 10135, Description: "Consumer with multiple subject filters cannot use subject based API"},
JsConsumerMultipleFiltersNotAllowed: {Code: 400, ErrCode: 10135, Description: "consumer with multiple subject filters cannot use subject based API"},

@@ -536,6 +545,9 @@ var (
JSStreamWrongLastSequenceErrF: {Code: 400, ErrCode: 10071, Description: "wrong last sequence: {seq}"},
JSTempStorageFailedErr: {Code: 500, ErrCode: 10072, Description: "JetStream unable to open temp storage for restore"},
JSTemplateNameNotMatchSubjectErr: {Code: 400, ErrCode: 10073, Description: "template name in subject does not match request"},
JsConsumerDuplicateFilterSubjects: {Code: 400, ErrCode: 10134, Description: "Consumer cannot have both FilterSubject and FilterSubjects specified"},
JsConsumerMultipleFiltersNotAllowed: {Code: 400, ErrCode: 10135, Description: "Consumer with multiple subject filters cannot use subject based API"},
JsConsumerOverlappingSubjectFilters: {Code: 400, ErrCode: 10136, Description: "Consumer subject filters cannot overlap"},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
JsConsumerOverlappingSubjectFilters: {Code: 400, ErrCode: 10136, Description: "Consumer subject filters cannot overlap"},
JsConsumerOverlappingSubjectFilters: {Code: 400, ErrCode: 10136, Description: "consumer subject filters cannot overlap"},

@Jarema Jarema force-pushed the multiple-consumer-filters branch 5 times, most recently from 6754604 to 618027b Compare February 1, 2023 21:57
@Jarema Jarema force-pushed the multiple-consumer-filters branch 3 times, most recently from e94752e to 123ec50 Compare February 6, 2023 22:10
server/stream.go Outdated
if o.cfg.FilterSubject != _EMPTY_ {
mset.numFilter++
if o.subjf != nil {
mset.numFilter += len(o.subjf)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This just is number of filtered consumers, so still could be just ++ here.

so would suggest.

if len(o.subjf) > 0 {
  mset.numFilter++
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah! Thought it's number of filters.
Fixed and added comment to struct field.

server/stream.go Outdated
}
if newFilter != _EMPTY_ {
mset.numFilter++
mset.numFilter += len(newFilters)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would just check once all updated, if len(o.subjf) == 0 { mset.numFilter-- }

server/stream.go Outdated
func (mset *stream) partitionUnique(partitions []string) bool {
for _, partition := range partitions {
for _, o := range mset.consumers {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer tighter, so no new line here I think. Also can remove check for nil since not in fast path range setup ok.

Copy link
Member

@derekcollison derekcollison left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One small change and we should be good to go.

server/stream.go Outdated
@@ -4625,7 +4625,7 @@ func (mset *stream) numConsumers() int {
func (mset *stream) setConsumer(o *consumer) {
mset.consumers[o.name] = o
if o.subjf != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would do len(o.subjf) > 0.. I think that is more succinct.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also would match decrement version.

Copy link
Member

@derekcollison derekcollison left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@derekcollison derekcollison merged commit fcd6f87 into nats-io:dev Feb 13, 2023
JsConsumerMultipleFiltersNotAllowed ErrorIdentifier = 10137

// JsConsumerOverlappingSubjectFilters consumer subject filters cannot overlap
JsConsumerOverlappingSubjectFilters ErrorIdentifier = 10138
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be like the others above to be consistent, like:

Suggested change
JsConsumerOverlappingSubjectFilters ErrorIdentifier = 10138
JSConsumerOverlappingSubjectFilters ErrorIdentifier = 10138

JsConsumerDuplicateFilterSubjects ErrorIdentifier = 10136

// JsConsumerMultipleFiltersNotAllowed consumer with multiple subject filters cannot use subject based API
JsConsumerMultipleFiltersNotAllowed ErrorIdentifier = 10137
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
JsConsumerMultipleFiltersNotAllowed ErrorIdentifier = 10137
JSConsumerMultipleFiltersNotAllowed ErrorIdentifier = 10137

@@ -406,6 +409,15 @@ const (

// JSTemplateNameNotMatchSubjectErr template name in subject does not match request
JSTemplateNameNotMatchSubjectErr ErrorIdentifier = 10073

// JsConsumerDuplicateFilterSubjects consumer cannot have both FilterSubject and FilterSubjects specified
JsConsumerDuplicateFilterSubjects ErrorIdentifier = 10136
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
JsConsumerDuplicateFilterSubjects ErrorIdentifier = 10136
JSConsumerDuplicateFilterSubjects ErrorIdentifier = 10136

@@ -2124,3 +2150,33 @@ func NewJSTemplateNameNotMatchSubjectError(opts ...ErrorOption) *ApiError {

return ApiErrors[JSTemplateNameNotMatchSubjectErr]
}

// NewJsConsumerDuplicateFilterSubjectsError creates a new JsConsumerDuplicateFilterSubjects error: "consumer cannot have both FilterSubject and FilterSubjects specified"
func NewJsConsumerDuplicateFilterSubjectsError(opts ...ErrorOption) *ApiError {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func NewJsConsumerDuplicateFilterSubjectsError(opts ...ErrorOption) *ApiError {
func NewJSConsumerDuplicateFilterSubjectsError(opts ...ErrorOption) *ApiError {

}

// NewJsConsumerOverlappingSubjectFiltersError creates a new JsConsumerOverlappingSubjectFilters error: "consumer subject filters cannot overlap"
func NewJsConsumerOverlappingSubjectFiltersError(opts ...ErrorOption) *ApiError {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func NewJsConsumerOverlappingSubjectFiltersError(opts ...ErrorOption) *ApiError {
func NewJSConsumerOverlappingSubjectFiltersError(opts ...ErrorOption) *ApiError {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think @Jarema will do in separate PR, but great feedback!

@@ -9387,7 +9387,7 @@ func TestJetStreamPubWithSyncPerf(t *testing.T) {

func TestJetStreamConsumerPerf(t *testing.T) {
// Comment out to run, holding place for now.
t.SkipNow()
// t.SkipNow()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe meant to leave it uncommented to not run the consumer perf?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

6 participants