Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVE] Improvements and fixes to stream source consumer creation on leadership change or on stream config source updates #4009

Merged
merged 2 commits into from Apr 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions server/errors.json
Expand Up @@ -1378,5 +1378,15 @@
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSSourceDuplicateDetected",
"code": 400,
"error_code": 10140,
"description": "duplicate source configuration detected",
"comment": "source stream, filter and transform (plus external if present) must form a unique combination",
"help": "",
"url": "",
"deprecates": ""
}
]
78 changes: 51 additions & 27 deletions server/jetstream_cluster_1_test.go
Expand Up @@ -5252,7 +5252,7 @@ func TestJetStreamClusterLeaderStepdown(t *testing.T) {
}
}

func TestJetStreamClusterSourcesFilterSubjectUpdate(t *testing.T) {
func TestJetStreamClusterSourcesFilteringAndUpdating(t *testing.T) {
c := createJetStreamClusterExplicit(t, "MSR", 5)
defer c.shutdown()

Expand Down Expand Up @@ -5295,6 +5295,7 @@ func TestJetStreamClusterSourcesFilterSubjectUpdate(t *testing.T) {
require_NoError(t, err)
defer js.DeleteStream("TEST")

// Create M stream with a single source on "foo"
_, err = js.AddStream(&nats.StreamConfig{
Name: "M",
Sources: []*nats.StreamSource{{Name: "TEST", FilterSubject: "foo"}},
Expand All @@ -5303,54 +5304,77 @@ func TestJetStreamClusterSourcesFilterSubjectUpdate(t *testing.T) {
require_NoError(t, err)
defer js.DeleteStream("M")

// check a message on "bar" doesn't get sourced
sendBatch("bar", 100)
checkSync(100, 0)
// check a message on "foo" does get sourced
sendBatch("foo", 100)
// The source stream remains at 100 msgs as it filters for foo
checkSync(200, 100)

// change filter subject
// change remove the source on "foo" and add a new source on "bar"
_, err = js.UpdateStream(&nats.StreamConfig{
Name: "M",
Sources: []*nats.StreamSource{{Name: "TEST", FilterSubject: "bar"}},
Replicas: 2,
})
require_NoError(t, err)

sendBatch("foo", 100)
// The source stream remains at 100 msgs as it filters for bar
checkSync(300, 100)
// as it is a new source (never been sourced before) it starts sourcing at the start of TEST
// and therefore sources the message on "bar" that is in TEST
checkSync(200, 200)

sendBatch("bar", 100)
checkSync(400, 200)

// test unsuspected re delivery by sending to filtered subject
// new messages on "foo" are being filtered as it's not being currently sourced
sendBatch("foo", 100)
checkSync(500, 200)
checkSync(300, 200)
// new messages on "bar" are being sourced
sendBatch("bar", 100)
checkSync(400, 300)

// change filter subject to foo, as the internal sequence number does not cover the previously filtered tail end
// re-add the source for "foo" keep the source on "bar"
_, err = js.UpdateStream(&nats.StreamConfig{
Name: "M",
Sources: []*nats.StreamSource{{Name: "TEST", FilterSubject: "foo"}},
Sources: []*nats.StreamSource{{Name: "TEST", FilterSubject: "bar"}, {Name: "TEST", FilterSubject: "foo"}},
Replicas: 2,
})
require_NoError(t, err)
// The filter was completely switched, which is why we only receive new messages
checkSync(500, 200)
sendBatch("foo", 100)
checkSync(600, 300)

// check the 'backfill' of messages on "foo" that were published while the source was inactive
checkSync(400, 400)

// causes startingSequenceForSources() to be called
nc.Close()
c.stopAll()
c.restartAll()
c.waitOnStreamLeader("$G", "TEST")
c.waitOnStreamLeader("$G", "M")

nc, js = jsClientConnect(t, c.randomServer())
defer nc.Close()

// check that it restarted the sources' consumers at the right place
checkSync(400, 400)

// check both sources are still active
sendBatch("bar", 100)
checkSync(700, 300)
checkSync(500, 500)
sendBatch("foo", 100)
checkSync(600, 600)

// change filter subject to *, as the internal sequence number does not cover the previously filtered tail end
_, err = js.UpdateStream(&nats.StreamConfig{
Name: "M",
Sources: []*nats.StreamSource{{Name: "TEST", FilterSubject: "*"}},
Replicas: 2,
})
require_NoError(t, err)
// no send was necessary as we received previously filtered messages
checkSync(700, 400)
// Check that purging the stream and does not cause the sourcing of the messages
js.PurgeStream("M")
checkSync(600, 0)

// Even after a leader change or restart
nc.Close()
c.stopAll()
c.restartAll()
c.waitOnStreamLeader("$G", "TEST")
c.waitOnStreamLeader("$G", "M")

nc, js = jsClientConnect(t, c.randomServer())
defer nc.Close()

checkSync(600, 0)
}

func TestJetStreamClusterSourcesUpdateOriginError(t *testing.T) {
Expand Down
14 changes: 14 additions & 0 deletions server/jetstream_errors_generated.go
Expand Up @@ -278,6 +278,9 @@ const (
// JSSourceConsumerSetupFailedErrF General source consumer setup failure string ({err})
JSSourceConsumerSetupFailedErrF ErrorIdentifier = 10045

// JSSourceDuplicateDetected source stream, filter and transform must form a unique combination (duplicate source configuration detected)
JSSourceDuplicateDetected ErrorIdentifier = 10140

// JSSourceMaxMessageSizeTooBigErr stream source must have max message size >= target
JSSourceMaxMessageSizeTooBigErr ErrorIdentifier = 10046

Expand Down Expand Up @@ -513,6 +516,7 @@ var (
JSSequenceNotFoundErrF: {Code: 400, ErrCode: 10043, Description: "sequence {seq} not found"},
JSSnapshotDeliverSubjectInvalidErr: {Code: 400, ErrCode: 10015, Description: "deliver subject not valid"},
JSSourceConsumerSetupFailedErrF: {Code: 500, ErrCode: 10045, Description: "{err}"},
JSSourceDuplicateDetected: {Code: 400, ErrCode: 10140, Description: "duplicate source configuration detected"},
JSSourceMaxMessageSizeTooBigErr: {Code: 400, ErrCode: 10046, Description: "stream source must have max message size >= target"},
JSStorageResourcesExceededErr: {Code: 500, ErrCode: 10047, Description: "insufficient storage resources available"},
JSStreamAssignmentErrF: {Code: 500, ErrCode: 10048, Description: "{err}"},
Expand Down Expand Up @@ -1585,6 +1589,16 @@ func NewJSSourceConsumerSetupFailedError(err error, opts ...ErrorOption) *ApiErr
}
}

// NewJSSourceDuplicateDetectedError creates a new JSSourceDuplicateDetected error: "duplicate source configuration detected"
func NewJSSourceDuplicateDetectedError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

return ApiErrors[JSSourceDuplicateDetected]
}

// NewJSSourceMaxMessageSizeTooBigError creates a new JSSourceMaxMessageSizeTooBigErr error: "stream source must have max message size >= target"
func NewJSSourceMaxMessageSizeTooBigError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
Expand Down
16 changes: 8 additions & 8 deletions server/jetstream_test.go
Expand Up @@ -11415,12 +11415,12 @@ func TestJetStreamSourceBasics(t *testing.T) {
return nil
})

m, err := js.GetMsg("MS", 1)
ss, err := js.SubscribeSync("foo2.foo", nats.BindStream("MS"))
require_NoError(t, err)

if m.Subject != "foo2.foo" {
t.Fatalf("Expected message subject foo2.foo, got %s", m.Subject)
}
// we must have at least one message on the transformed subject name (ie no timeout)
_, err = ss.NextMsg(time.Millisecond)
require_NoError(t, err)
ss.Drain()

// Test Source Updates
ncfg := &nats.StreamConfig{
Expand Down Expand Up @@ -11461,11 +11461,11 @@ func TestJetStreamSourceBasics(t *testing.T) {
return nil
})
// Double check first starting.
m, err = js.GetMsg("FMS", 1)
m, err := js.GetMsg("FMS", 1)
require_NoError(t, err)
if shdr := m.Header.Get(JSStreamSource); shdr == _EMPTY_ {
t.Fatalf("Expected a header, got none")
} else if _, sseq := streamAndSeq(shdr); sseq != 26 {
} else if _, _, sseq := streamAndSeq(shdr); sseq != 26 {
t.Fatalf("Expected header sequence of 26, got %d", sseq)
}

Expand Down Expand Up @@ -11493,7 +11493,7 @@ func TestJetStreamSourceBasics(t *testing.T) {
}
if shdr := m.Header.Get(JSStreamSource); shdr == _EMPTY_ {
t.Fatalf("Expected a header, got none")
} else if _, sseq := streamAndSeq(shdr); sseq != 11 {
} else if _, _, sseq := streamAndSeq(shdr); sseq != 11 {
t.Fatalf("Expected header sequence of 11, got %d", sseq)
}
}
Expand Down