Skip to content

Commit

Permalink
[IMPROVE] Improvements and fixes to stream source consumer creation o…
Browse files Browse the repository at this point in the history
…n leadership change or on stream config source updates (#4009)

 - [X] Tests added
- [X] Branch rebased on top of current main (`git pull --rebase origin
main`)
- [X] Changes squashed to a single commit (described
[here](http://gitready.com/advanced/2009/02/10/squashing-commits-with-rebase.html))
 - [x] Build is green in Travis CI
- [X] You have certified that the contribution is your original work and
that you license the work to the project under the [Apache 2
license](https://github.com/nats-io/nats-server/blob/main/LICENSE)
  • Loading branch information
derekcollison committed Apr 5, 2023
2 parents ce115ab + 11d88ef commit 4525bde
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 128 deletions.
10 changes: 10 additions & 0 deletions server/errors.json
Expand Up @@ -1378,5 +1378,15 @@
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSSourceDuplicateDetected",
"code": 400,
"error_code": 10140,
"description": "duplicate source configuration detected",
"comment": "source stream, filter and transform (plus external if present) must form a unique combination",
"help": "",
"url": "",
"deprecates": ""
}
]
78 changes: 51 additions & 27 deletions server/jetstream_cluster_1_test.go
Expand Up @@ -5252,7 +5252,7 @@ func TestJetStreamClusterLeaderStepdown(t *testing.T) {
}
}

func TestJetStreamClusterSourcesFilterSubjectUpdate(t *testing.T) {
func TestJetStreamClusterSourcesFilteringAndUpdating(t *testing.T) {
c := createJetStreamClusterExplicit(t, "MSR", 5)
defer c.shutdown()

Expand Down Expand Up @@ -5295,6 +5295,7 @@ func TestJetStreamClusterSourcesFilterSubjectUpdate(t *testing.T) {
require_NoError(t, err)
defer js.DeleteStream("TEST")

// Create M stream with a single source on "foo"
_, err = js.AddStream(&nats.StreamConfig{
Name: "M",
Sources: []*nats.StreamSource{{Name: "TEST", FilterSubject: "foo"}},
Expand All @@ -5303,54 +5304,77 @@ func TestJetStreamClusterSourcesFilterSubjectUpdate(t *testing.T) {
require_NoError(t, err)
defer js.DeleteStream("M")

// check a message on "bar" doesn't get sourced
sendBatch("bar", 100)
checkSync(100, 0)
// check a message on "foo" does get sourced
sendBatch("foo", 100)
// The source stream remains at 100 msgs as it filters for foo
checkSync(200, 100)

// change filter subject
// change remove the source on "foo" and add a new source on "bar"
_, err = js.UpdateStream(&nats.StreamConfig{
Name: "M",
Sources: []*nats.StreamSource{{Name: "TEST", FilterSubject: "bar"}},
Replicas: 2,
})
require_NoError(t, err)

sendBatch("foo", 100)
// The source stream remains at 100 msgs as it filters for bar
checkSync(300, 100)
// as it is a new source (never been sourced before) it starts sourcing at the start of TEST
// and therefore sources the message on "bar" that is in TEST
checkSync(200, 200)

sendBatch("bar", 100)
checkSync(400, 200)

// test unsuspected re delivery by sending to filtered subject
// new messages on "foo" are being filtered as it's not being currently sourced
sendBatch("foo", 100)
checkSync(500, 200)
checkSync(300, 200)
// new messages on "bar" are being sourced
sendBatch("bar", 100)
checkSync(400, 300)

// change filter subject to foo, as the internal sequence number does not cover the previously filtered tail end
// re-add the source for "foo" keep the source on "bar"
_, err = js.UpdateStream(&nats.StreamConfig{
Name: "M",
Sources: []*nats.StreamSource{{Name: "TEST", FilterSubject: "foo"}},
Sources: []*nats.StreamSource{{Name: "TEST", FilterSubject: "bar"}, {Name: "TEST", FilterSubject: "foo"}},
Replicas: 2,
})
require_NoError(t, err)
// The filter was completely switched, which is why we only receive new messages
checkSync(500, 200)
sendBatch("foo", 100)
checkSync(600, 300)

// check the 'backfill' of messages on "foo" that were published while the source was inactive
checkSync(400, 400)

// causes startingSequenceForSources() to be called
nc.Close()
c.stopAll()
c.restartAll()
c.waitOnStreamLeader("$G", "TEST")
c.waitOnStreamLeader("$G", "M")

nc, js = jsClientConnect(t, c.randomServer())
defer nc.Close()

// check that it restarted the sources' consumers at the right place
checkSync(400, 400)

// check both sources are still active
sendBatch("bar", 100)
checkSync(700, 300)
checkSync(500, 500)
sendBatch("foo", 100)
checkSync(600, 600)

// change filter subject to *, as the internal sequence number does not cover the previously filtered tail end
_, err = js.UpdateStream(&nats.StreamConfig{
Name: "M",
Sources: []*nats.StreamSource{{Name: "TEST", FilterSubject: "*"}},
Replicas: 2,
})
require_NoError(t, err)
// no send was necessary as we received previously filtered messages
checkSync(700, 400)
// Check that purging the stream and does not cause the sourcing of the messages
js.PurgeStream("M")
checkSync(600, 0)

// Even after a leader change or restart
nc.Close()
c.stopAll()
c.restartAll()
c.waitOnStreamLeader("$G", "TEST")
c.waitOnStreamLeader("$G", "M")

nc, js = jsClientConnect(t, c.randomServer())
defer nc.Close()

checkSync(600, 0)
}

func TestJetStreamClusterSourcesUpdateOriginError(t *testing.T) {
Expand Down
14 changes: 14 additions & 0 deletions server/jetstream_errors_generated.go
Expand Up @@ -278,6 +278,9 @@ const (
// JSSourceConsumerSetupFailedErrF General source consumer setup failure string ({err})
JSSourceConsumerSetupFailedErrF ErrorIdentifier = 10045

// JSSourceDuplicateDetected source stream, filter and transform must form a unique combination (duplicate source configuration detected)
JSSourceDuplicateDetected ErrorIdentifier = 10140

// JSSourceMaxMessageSizeTooBigErr stream source must have max message size >= target
JSSourceMaxMessageSizeTooBigErr ErrorIdentifier = 10046

Expand Down Expand Up @@ -513,6 +516,7 @@ var (
JSSequenceNotFoundErrF: {Code: 400, ErrCode: 10043, Description: "sequence {seq} not found"},
JSSnapshotDeliverSubjectInvalidErr: {Code: 400, ErrCode: 10015, Description: "deliver subject not valid"},
JSSourceConsumerSetupFailedErrF: {Code: 500, ErrCode: 10045, Description: "{err}"},
JSSourceDuplicateDetected: {Code: 400, ErrCode: 10140, Description: "duplicate source configuration detected"},
JSSourceMaxMessageSizeTooBigErr: {Code: 400, ErrCode: 10046, Description: "stream source must have max message size >= target"},
JSStorageResourcesExceededErr: {Code: 500, ErrCode: 10047, Description: "insufficient storage resources available"},
JSStreamAssignmentErrF: {Code: 500, ErrCode: 10048, Description: "{err}"},
Expand Down Expand Up @@ -1585,6 +1589,16 @@ func NewJSSourceConsumerSetupFailedError(err error, opts ...ErrorOption) *ApiErr
}
}

// NewJSSourceDuplicateDetectedError creates a new JSSourceDuplicateDetected error: "duplicate source configuration detected"
func NewJSSourceDuplicateDetectedError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

return ApiErrors[JSSourceDuplicateDetected]
}

// NewJSSourceMaxMessageSizeTooBigError creates a new JSSourceMaxMessageSizeTooBigErr error: "stream source must have max message size >= target"
func NewJSSourceMaxMessageSizeTooBigError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
Expand Down
16 changes: 8 additions & 8 deletions server/jetstream_test.go
Expand Up @@ -11415,12 +11415,12 @@ func TestJetStreamSourceBasics(t *testing.T) {
return nil
})

m, err := js.GetMsg("MS", 1)
ss, err := js.SubscribeSync("foo2.foo", nats.BindStream("MS"))
require_NoError(t, err)

if m.Subject != "foo2.foo" {
t.Fatalf("Expected message subject foo2.foo, got %s", m.Subject)
}
// we must have at least one message on the transformed subject name (ie no timeout)
_, err = ss.NextMsg(time.Millisecond)
require_NoError(t, err)
ss.Drain()

// Test Source Updates
ncfg := &nats.StreamConfig{
Expand Down Expand Up @@ -11461,11 +11461,11 @@ func TestJetStreamSourceBasics(t *testing.T) {
return nil
})
// Double check first starting.
m, err = js.GetMsg("FMS", 1)
m, err := js.GetMsg("FMS", 1)
require_NoError(t, err)
if shdr := m.Header.Get(JSStreamSource); shdr == _EMPTY_ {
t.Fatalf("Expected a header, got none")
} else if _, sseq := streamAndSeq(shdr); sseq != 26 {
} else if _, _, sseq := streamAndSeq(shdr); sseq != 26 {
t.Fatalf("Expected header sequence of 26, got %d", sseq)
}

Expand Down Expand Up @@ -11493,7 +11493,7 @@ func TestJetStreamSourceBasics(t *testing.T) {
}
if shdr := m.Header.Get(JSStreamSource); shdr == _EMPTY_ {
t.Fatalf("Expected a header, got none")
} else if _, sseq := streamAndSeq(shdr); sseq != 11 {
} else if _, _, sseq := streamAndSeq(shdr); sseq != 11 {
t.Fatalf("Expected header sequence of 11, got %d", sseq)
}
}
Expand Down

0 comments on commit 4525bde

Please sign in to comment.