Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Do not delete consumers on restarts with non-fatal update errors. #3881

Merged
merged 3 commits into from Feb 18, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
80 changes: 47 additions & 33 deletions server/jetstream_cluster.go
Expand Up @@ -3466,15 +3466,17 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
if err != nil {
ll := fmt.Sprintf("Account [%s] lookup for consumer create failed: %v", accName, err)
if isMember {
// If we can not lookup the account and we are a member, send this result back to the metacontroller leader.
result := &consumerAssignmentResult{
Account: accName,
Stream: stream,
Consumer: consumerName,
Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}},
if !js.isMetaRecovering() {
// If we can not lookup the account and we are a member, send this result back to the metacontroller leader.
result := &consumerAssignmentResult{
Account: accName,
Stream: stream,
Consumer: consumerName,
Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}},
}
result.Response.Error = NewJSNoAccountError()
s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result)
}
result.Response.Error = NewJSNoAccountError()
s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result)
s.Warnf(ll)
} else {
s.Debugf(ll)
Expand Down Expand Up @@ -3598,18 +3600,20 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
// Go ahead and create or update the consumer.
mset, err := acc.lookupStream(ca.Stream)
if err != nil {
js.mu.Lock()
s.Debugf("Consumer create failed, could not locate stream '%s > %s'", ca.Client.serviceAccount(), ca.Stream)
ca.err = NewJSStreamNotFoundError()
result := &consumerAssignmentResult{
Account: ca.Client.serviceAccount(),
Stream: ca.Stream,
Consumer: ca.Name,
Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}},
}
result.Response.Error = NewJSStreamNotFoundError()
s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result)
js.mu.Unlock()
if !js.isMetaRecovering() {
js.mu.Lock()
s.Warnf("Consumer create failed, could not locate stream '%s > %s > %s'", ca.Client.serviceAccount(), ca.Stream, ca.Name)
ca.err = NewJSStreamNotFoundError()
result := &consumerAssignmentResult{
Account: ca.Client.serviceAccount(),
Stream: ca.Stream,
Consumer: ca.Name,
Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}},
}
result.Response.Error = NewJSStreamNotFoundError()
s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result)
js.mu.Unlock()
}
return
}

Expand Down Expand Up @@ -3639,20 +3643,26 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca, false)
didCreate = true
} else {
if err := o.updateConfig(ca.Config); err != nil {
// This is essentially an update that has failed.
js.mu.Lock()
result := &consumerAssignmentResult{
Account: ca.Client.serviceAccount(),
Stream: ca.Stream,
Consumer: ca.Name,
Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}},
// Call into update, ignore consumer exists error here since this means an old deliver subject is bound
// which can happen on restart etc.
if err := o.updateConfig(ca.Config); err != nil && err != NewJSConsumerNameExistError() {
// This is essentially an update that has failed. Respond back to metaleader if we are not recovering.
js.mu.RLock()
if !js.metaRecovering {
result := &consumerAssignmentResult{
Account: ca.Client.serviceAccount(),
Stream: ca.Stream,
Consumer: ca.Name,
Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}},
}
result.Response.Error = NewJSConsumerNameExistError()
s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result)
}
result.Response.Error = NewJSConsumerNameExistError()
s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result)
js.mu.Unlock()
s.Warnf("Consumer create failed during update for '%s > %s > %s': %v", ca.Client.serviceAccount(), ca.Stream, ca.Name, err)
js.mu.RUnlock()
return
}

// Check if we already had a consumer assignment and its still pending.
cca, oca := ca, o.consumerAssignment()
o.mu.RLock()
Expand Down Expand Up @@ -3715,7 +3725,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
}

var result *consumerAssignmentResult
if !hasResponded {
if !hasResponded && !js.metaRecovering {
result = &consumerAssignmentResult{
Account: ca.Client.serviceAccount(),
Stream: ca.Stream,
Expand Down Expand Up @@ -4606,12 +4616,16 @@ func (js *jetStream) processConsumerAssignmentResults(sub *subscription, c *clie
if ca := sa.consumers[result.Consumer]; ca != nil && !ca.responded {
js.srv.sendAPIErrResponse(ca.Client, acc, ca.Subject, ca.Reply, _EMPTY_, s.jsonResponse(result.Response))
ca.responded = true

// Check if this failed.
// TODO(dlc) - Could have mixed results, should track per peer.
if result.Response.Error != nil {
// Make sure this is recent response, do not delete existing consumers.
if result.Response.Error != nil && result.Response.Error != NewJSConsumerNameExistError() && time.Since(ca.Created) < 2*time.Second {
// So while we are deleting we will not respond to list/names requests.
ca.err = NewJSClusterNotAssignedError()
cc.meta.Propose(encodeDeleteConsumerAssignment(ca))
s.Warnf("Proposing to delete consumer `%s > %s > %s' due to assignment response error: %v",
result.Account, result.Stream, result.Consumer, result.Response.Error)
}
}
}
Expand Down
54 changes: 54 additions & 0 deletions server/jetstream_cluster_3_test.go
Expand Up @@ -2621,3 +2621,57 @@ func TestJetStreamClusterActiveActiveSourcedStreams(t *testing.T) {
})
require_NoError(t, err)
}

func TestJetStreamClusterUpdateConsumerShouldNotForceDeleteOnRestart(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R7S", 7)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo", "bar"},
Replicas: 3,
})
require_NoError(t, err)

ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "D",
DeliverSubject: "_no_bind_",
})
require_NoError(t, err)

// Shutdown a consumer follower.
nc.Close()
s := c.serverByName(ci.Cluster.Replicas[0].Name)
s.Shutdown()

c.waitOnLeader()

nc, js = jsClientConnect(t, c.randomServer())
defer nc.Close()

// Change delivery subject.
_, err = js.UpdateConsumer("TEST", &nats.ConsumerConfig{
Durable: "D",
DeliverSubject: "_d_",
})
require_NoError(t, err)

// Create interest in new and old deliver subject.
_, err = nc.SubscribeSync("_d_")
require_NoError(t, err)
_, err = nc.SubscribeSync("_no_bind_")
require_NoError(t, err)
nc.Flush()

c.restartServer(s)
c.waitOnAllCurrent()

// Wait on bad error that would cleanup consumer.
time.Sleep(time.Second)

_, err = js.ConsumerInfo("TEST", "D")
require_NoError(t, err)
}