Skip to content

Commit

Permalink
Make sure that o.subjf is nil or checked if empty
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
  • Loading branch information
Jarema committed Jun 1, 2023
1 parent a8adeb7 commit 1c4f664
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 3 deletions.
11 changes: 8 additions & 3 deletions server/consumer.go
Expand Up @@ -1735,7 +1735,12 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
o.mu.Lock()

// When we're done with signaling, we can replace the subjects.
o.subjf = newSubjf
// If filters were removed, set `o.subjf` to nil.
if len(newSubjf) == 0 {
o.subjf = nil
} else {
o.subjf = newSubjf
}
}

// Record new config for others that do not need special handling.
Expand Down Expand Up @@ -3247,7 +3252,7 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
store := o.mset.store

// If no filters are specified, optimize to fetch just non-filtered messages.
if o.subjf == nil {
if len(o.subjf) == 0 {
// Grab next message applicable to us.
// We will unlock here in case lots of contention, e.g. WQ.
o.mu.Unlock()
Expand Down Expand Up @@ -3308,7 +3313,7 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
// even if len == 0 or 1.
// TODO(tp): we should have sort based off generics for server
// to avoid reflection.
if o.subjf != nil && len(o.subjf) > 1 {
if len(o.subjf) > 1 {
sort.Slice(o.subjf, func(i, j int) bool {
if o.subjf[j].pmsg != nil && o.subjf[i].pmsg == nil {
return false
Expand Down
50 changes: 50 additions & 0 deletions server/jetstream_consumer_test.go
Expand Up @@ -28,6 +28,56 @@ import (
"github.com/nats-io/nats.go"
)

func TestJetStreamConsumerMultipleFiltersRemoveFilters(t *testing.T) {

s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()
acc := s.GlobalAccount()

mset, err := acc.addStream(&StreamConfig{
Name: "TEST",
Retention: LimitsPolicy,
Subjects: []string{"one", "two", "three"},
MaxAge: time.Second * 90,
})
require_NoError(t, err)

_, err = mset.addConsumer(&ConsumerConfig{
Durable: "consumer",
FilterSubjects: []string{"one", "two"},
})
require_NoError(t, err)

sendStreamMsg(t, nc, "one", "data")
sendStreamMsg(t, nc, "two", "data")
sendStreamMsg(t, nc, "three", "data")

consumer, err := js.PullSubscribe("", "consumer", nats.Bind("TEST", "consumer"))
require_NoError(t, err)

msgs, err := consumer.Fetch(1)
require_NoError(t, err)
require_True(t, len(msgs) == 1)

_, err = mset.addConsumer(&ConsumerConfig{
Durable: "consumer",
FilterSubjects: []string{},
})
require_NoError(t, err)

msgs, err = consumer.Fetch(1)
require_NoError(t, err)
require_True(t, len(msgs) == 1)

msgs, err = consumer.Fetch(1)
require_NoError(t, err)
require_True(t, len(msgs) == 1)

}

func TestJetStreamConsumerMultipleFiltersRace(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()
Expand Down

0 comments on commit 1c4f664

Please sign in to comment.