New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Multiple consumer subject filters #3500
Conversation
This would allow bypassing the recently added filter specific consumer create subject mechanism - I personally dont think that's fit for purpose and should be remove - but anyway it needs consideration. |
Any thoughts on how to secure without a callout architecture? |
not right now as my thinking is around the larger roadmap item, but I know the model we have now will not work long term and as we already know have significant holes and bypasses via consumer update to name one. It's unfortunate that clients already started adopting the new subject as there's no way this will work long term |
With correct permissions works quite well for what we wanted for a large customer. |
I'm a new user of nats. in my usage scenario I really need this feature, otherwise I need to create multiple consumers. |
This will most likely make it in for 2.10.0 release.. In the meantime, you could have the subjects in separate streams and source them into a single one in which you connect your consumer. Or have a common prefix for the ones you want to bundle together and use wildcard matching as it exists today. |
Rebase please don't merge fir feature branches. |
822e1a5
to
0090ee1
Compare
7982181
to
5f2ea90
Compare
756885f
to
3a36f36
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
82ffb54
to
4d208d5
Compare
server/jetstream_test.go
Outdated
}) | ||
require_NoError(t, err) | ||
|
||
js.Publish("other", []byte(fmt.Sprintf("%d", 100))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sendMsg
server/stream.go
Outdated
if preq != nil && !o.isFilteredMatch(preq.Subject) { | ||
o.mu.RUnlock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe capture in variable to simplify locking.
o.mu.RLock()
doPurge := preq == nil || o.isFilteredMatch(preq.Subject)
o.mu.RUnlock()
if doPurge {
o.purge(fseq, lseq)
}
c7d7201
to
03d702c
Compare
server/consumer.go
Outdated
} | ||
} | ||
} | ||
if subjectFilters[0] != _EMPTY_ { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can subjectFilters be empty slice or nil?
We should check len(subjectFilters) > 0
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It can't.
func subjectFilters(filter string, filters []string) []string {
if len(filters) == 0 {
return []string{filter}
}
return filters
}
Will always return slice of len >= 1, but I don't like that part of the code too, as that's implicit and simple change in that function can break it.
What about changing subjectFilters
to:
// Utility for simpler if conditions in Consumer config checks.
// In future iteration, we can immediately create `o.subjf` and
// use it to validate things.
func subjectFilters(filter string, filters []string) ([]string, bool) {
if len(filters) == 0 {
return []string{filter}, filter == _EMPTY_
}
return filters, filters[0] == _EMPTY_
}
That simplifies most uses of it and removes the assumption that len() > 0.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not a fan of that, if not filters subjf should be nil, and check for len(subjf) > 0 should always return true iff there are filters.
Suggest we rework that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about..
func subjectFilters(filter string, filters []string) ([]string, bool) {
if filter != _EMPTY_ {
filters = append(filters, filter)
}
return filters, len(filters) > 0
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doing the rework now.
server/consumer.go
Outdated
sub := &subjectFilter{ | ||
subject: filter, | ||
} | ||
if filter != _EMPTY_ && subjectHasWildcard(filter) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why could filter be empty?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we change above this can be simplified.
server/consumer.go
Outdated
@@ -988,8 +1019,9 @@ func (o *consumer) setLeader(isLeader bool) { | |||
} | |||
|
|||
// Update the group on the our starting sequence if we are starting but we skipped some in the stream. | |||
// TODO(tp): check how we should approach this for multiple filters. Ask Derek about this one. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sseq should be proper starting point, so code below should be fine regardless of multiple subjects.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can remove comment.
server/consumer.go
Outdated
@@ -3942,20 +4126,37 @@ func (o *consumer) selectStartingSeqNo() { | |||
|
|||
if state.FirstSeq == 0 { | |||
o.sseq = 1 | |||
for i := range o.subjf { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is subjf ptr now? Just use normal range if pointer vs using index.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok let's switch to updating the value?
server/consumer.go
Outdated
o.subjf[i].nextSeq = 1 | ||
} | ||
if filter.nextSeq < state.FirstSeq { | ||
o.subjf[i].nextSeq = state.FirstSeq |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here.
server/consumer.go
Outdated
o.subjf[i].nextSeq = state.FirstSeq | ||
} | ||
if filter.nextSeq > state.LastSeq { | ||
o.subjf[i].nextSeq = state.LastSeq + 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And here.
server/stream.go
Outdated
@@ -4610,6 +4612,11 @@ func (mset *stream) setConsumer(o *consumer) { | |||
if o.cfg.FilterSubject != _EMPTY_ { | |||
mset.numFilter++ | |||
} | |||
for _, subject := range o.cfg.FilterSubjects { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can it be valid after checking config that an entry be == EMPTY?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if you pass `filterSubjects: []string{"", "two"} it will fail the validation and return overlapping subjects error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So here subject can not ever equal EMPTY correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
correct, I now treat this as an error (client passing []string{""}
server/jetstream_errors_generated.go
Outdated
@@ -536,6 +545,9 @@ var ( | |||
JSStreamWrongLastSequenceErrF: {Code: 400, ErrCode: 10071, Description: "wrong last sequence: {seq}"}, | |||
JSTempStorageFailedErr: {Code: 500, ErrCode: 10072, Description: "JetStream unable to open temp storage for restore"}, | |||
JSTemplateNameNotMatchSubjectErr: {Code: 400, ErrCode: 10073, Description: "template name in subject does not match request"}, | |||
JsConsumerDuplicateFilterSubjects: {Code: 400, ErrCode: 10134, Description: "Consumer cannot have both FilterSubject and FilterSubjects specified"}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
JsConsumerDuplicateFilterSubjects: {Code: 400, ErrCode: 10134, Description: "Consumer cannot have both FilterSubject and FilterSubjects specified"}, | |
JsConsumerDuplicateFilterSubjects: {Code: 400, ErrCode: 10134, Description: "consumer cannot have both FilterSubject and FilterSubjects specified"}, |
server/jetstream_errors_generated.go
Outdated
@@ -536,6 +545,9 @@ var ( | |||
JSStreamWrongLastSequenceErrF: {Code: 400, ErrCode: 10071, Description: "wrong last sequence: {seq}"}, | |||
JSTempStorageFailedErr: {Code: 500, ErrCode: 10072, Description: "JetStream unable to open temp storage for restore"}, | |||
JSTemplateNameNotMatchSubjectErr: {Code: 400, ErrCode: 10073, Description: "template name in subject does not match request"}, | |||
JsConsumerDuplicateFilterSubjects: {Code: 400, ErrCode: 10134, Description: "Consumer cannot have both FilterSubject and FilterSubjects specified"}, | |||
JsConsumerMultipleFiltersNotAllowed: {Code: 400, ErrCode: 10135, Description: "Consumer with multiple subject filters cannot use subject based API"}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
JsConsumerMultipleFiltersNotAllowed: {Code: 400, ErrCode: 10135, Description: "Consumer with multiple subject filters cannot use subject based API"}, | |
JsConsumerMultipleFiltersNotAllowed: {Code: 400, ErrCode: 10135, Description: "consumer with multiple subject filters cannot use subject based API"}, |
server/jetstream_errors_generated.go
Outdated
@@ -536,6 +545,9 @@ var ( | |||
JSStreamWrongLastSequenceErrF: {Code: 400, ErrCode: 10071, Description: "wrong last sequence: {seq}"}, | |||
JSTempStorageFailedErr: {Code: 500, ErrCode: 10072, Description: "JetStream unable to open temp storage for restore"}, | |||
JSTemplateNameNotMatchSubjectErr: {Code: 400, ErrCode: 10073, Description: "template name in subject does not match request"}, | |||
JsConsumerDuplicateFilterSubjects: {Code: 400, ErrCode: 10134, Description: "Consumer cannot have both FilterSubject and FilterSubjects specified"}, | |||
JsConsumerMultipleFiltersNotAllowed: {Code: 400, ErrCode: 10135, Description: "Consumer with multiple subject filters cannot use subject based API"}, | |||
JsConsumerOverlappingSubjectFilters: {Code: 400, ErrCode: 10136, Description: "Consumer subject filters cannot overlap"}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
JsConsumerOverlappingSubjectFilters: {Code: 400, ErrCode: 10136, Description: "Consumer subject filters cannot overlap"}, | |
JsConsumerOverlappingSubjectFilters: {Code: 400, ErrCode: 10136, Description: "consumer subject filters cannot overlap"}, |
6754604
to
618027b
Compare
e94752e
to
123ec50
Compare
server/stream.go
Outdated
if o.cfg.FilterSubject != _EMPTY_ { | ||
mset.numFilter++ | ||
if o.subjf != nil { | ||
mset.numFilter += len(o.subjf) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This just is number of filtered consumers, so still could be just ++ here.
so would suggest.
if len(o.subjf) > 0 {
mset.numFilter++
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah! Thought it's number of filters.
Fixed and added comment to struct field.
server/stream.go
Outdated
} | ||
if newFilter != _EMPTY_ { | ||
mset.numFilter++ | ||
mset.numFilter += len(newFilters) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would just check once all updated, if len(o.subjf) == 0 { mset.numFilter-- }
server/stream.go
Outdated
func (mset *stream) partitionUnique(partitions []string) bool { | ||
for _, partition := range partitions { | ||
for _, o := range mset.consumers { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prefer tighter, so no new line here I think. Also can remove check for nil since not in fast path range setup ok.
75a2476
to
33eb6a9
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One small change and we should be good to go.
server/stream.go
Outdated
@@ -4625,7 +4625,7 @@ func (mset *stream) numConsumers() int { | |||
func (mset *stream) setConsumer(o *consumer) { | |||
mset.consumers[o.name] = o | |||
if o.subjf != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would do len(o.subjf) > 0.. I think that is more succinct.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also would match decrement version.
33eb6a9
to
af338d0
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
JsConsumerMultipleFiltersNotAllowed ErrorIdentifier = 10137 | ||
|
||
// JsConsumerOverlappingSubjectFilters consumer subject filters cannot overlap | ||
JsConsumerOverlappingSubjectFilters ErrorIdentifier = 10138 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be like the others above to be consistent, like:
JsConsumerOverlappingSubjectFilters ErrorIdentifier = 10138 | |
JSConsumerOverlappingSubjectFilters ErrorIdentifier = 10138 |
JsConsumerDuplicateFilterSubjects ErrorIdentifier = 10136 | ||
|
||
// JsConsumerMultipleFiltersNotAllowed consumer with multiple subject filters cannot use subject based API | ||
JsConsumerMultipleFiltersNotAllowed ErrorIdentifier = 10137 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
JsConsumerMultipleFiltersNotAllowed ErrorIdentifier = 10137 | |
JSConsumerMultipleFiltersNotAllowed ErrorIdentifier = 10137 |
@@ -406,6 +409,15 @@ const ( | |||
|
|||
// JSTemplateNameNotMatchSubjectErr template name in subject does not match request | |||
JSTemplateNameNotMatchSubjectErr ErrorIdentifier = 10073 | |||
|
|||
// JsConsumerDuplicateFilterSubjects consumer cannot have both FilterSubject and FilterSubjects specified | |||
JsConsumerDuplicateFilterSubjects ErrorIdentifier = 10136 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
JsConsumerDuplicateFilterSubjects ErrorIdentifier = 10136 | |
JSConsumerDuplicateFilterSubjects ErrorIdentifier = 10136 |
@@ -2124,3 +2150,33 @@ func NewJSTemplateNameNotMatchSubjectError(opts ...ErrorOption) *ApiError { | |||
|
|||
return ApiErrors[JSTemplateNameNotMatchSubjectErr] | |||
} | |||
|
|||
// NewJsConsumerDuplicateFilterSubjectsError creates a new JsConsumerDuplicateFilterSubjects error: "consumer cannot have both FilterSubject and FilterSubjects specified" | |||
func NewJsConsumerDuplicateFilterSubjectsError(opts ...ErrorOption) *ApiError { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
func NewJsConsumerDuplicateFilterSubjectsError(opts ...ErrorOption) *ApiError { | |
func NewJSConsumerDuplicateFilterSubjectsError(opts ...ErrorOption) *ApiError { |
} | ||
|
||
// NewJsConsumerOverlappingSubjectFiltersError creates a new JsConsumerOverlappingSubjectFilters error: "consumer subject filters cannot overlap" | ||
func NewJsConsumerOverlappingSubjectFiltersError(opts ...ErrorOption) *ApiError { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
func NewJsConsumerOverlappingSubjectFiltersError(opts ...ErrorOption) *ApiError { | |
func NewJSConsumerOverlappingSubjectFiltersError(opts ...ErrorOption) *ApiError { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think @Jarema will do in separate PR, but great feedback!
@@ -9387,7 +9387,7 @@ func TestJetStreamPubWithSyncPerf(t *testing.T) { | |||
|
|||
func TestJetStreamConsumerPerf(t *testing.T) { | |||
// Comment out to run, holding place for now. | |||
t.SkipNow() | |||
// t.SkipNow() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe meant to leave it uncommented to not run the consumer perf?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch!
Assumptions:
What's missing: