Skip to content

Commit

Permalink
Use new consumer create subject when single subject filter specified …
Browse files Browse the repository at this point in the history
…in `SubjectFilters`

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Sep 19, 2023
1 parent 07a887b commit e98dfc8
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 0 deletions.
75 changes: 75 additions & 0 deletions server/jetstream_test.go
Expand Up @@ -21853,3 +21853,78 @@ func TestJetStreamSyncInterval(t *testing.T) {
})
}
}

func TestJetStreamFilteredSubjectUsesNewConsumerCreateSubject(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, _ := jsClientConnect(t, s)
defer nc.Close()

extEndpoint := make(chan *nats.Msg, 1)
normalEndpoint := make(chan *nats.Msg, 1)

_, err := nc.ChanSubscribe(JSApiConsumerCreateEx, extEndpoint)
require_NoError(t, err)

_, err = nc.ChanSubscribe(JSApiConsumerCreate, normalEndpoint)
require_NoError(t, err)

testStreamSource := func(name string, shouldBeExtended bool, ss StreamSource) {
t.Run(name, func(t *testing.T) {
req := StreamConfig{
Name: name,
Storage: MemoryStorage,
Subjects: []string{fmt.Sprintf("foo.%s", name)},
Sources: []*StreamSource{&ss},
}
reqJson, err := json.Marshal(req)
require_NoError(t, err)

_, err = nc.Request(fmt.Sprintf(JSApiStreamCreateT, name), reqJson, time.Second)
require_NoError(t, err)

select {
case <-time.After(time.Second * 5):
t.Fatalf("Timed out waiting for receive consumer create")
case <-extEndpoint:
if !shouldBeExtended {
t.Fatalf("Expected normal consumer create, got extended")
}
case <-normalEndpoint:
if shouldBeExtended {
t.Fatalf("Expected extended consumer create, got normal")
}
}
})
}

testStreamSource("OneFilterSubject", true, StreamSource{
Name: "source",
FilterSubject: "bar.>",
})

testStreamSource("OneTransform", true, StreamSource{
Name: "source",
SubjectTransforms: []SubjectTransformConfig{
{
Source: "bar.one.>",
Destination: "bar.two.>",
},
},
})

testStreamSource("TwoTransforms", false, StreamSource{
Name: "source",
SubjectTransforms: []SubjectTransformConfig{
{
Source: "bar.one.>",
Destination: "bar.two.>",
},
{
Source: "baz.one.>",
Destination: "baz.two.>",
},
},
})
}
3 changes: 3 additions & 0 deletions server/stream.go
Expand Up @@ -2893,6 +2893,9 @@ func (mset *stream) setSourceConsumer(iname string, seq uint64, startTime time.T
if req.Config.FilterSubject != _EMPTY_ {
req.Config.Name = fmt.Sprintf("src-%s", createConsumerName())
subject = fmt.Sprintf(JSApiConsumerCreateExT, si.name, req.Config.Name, req.Config.FilterSubject)
} else if len(req.Config.FilterSubjects) == 1 {
req.Config.Name = fmt.Sprintf("src-%s", createConsumerName())
subject = fmt.Sprintf(JSApiConsumerCreateExT, si.name, req.Config.Name, req.Config.FilterSubjects[0])
} else {
subject = fmt.Sprintf(JSApiConsumerCreateT, si.name)
}
Expand Down

0 comments on commit e98dfc8

Please sign in to comment.