Skip to content

Commit

Permalink
Merge pull request #3881 from nats-io/bad-consumer-delete
Browse files Browse the repository at this point in the history
[FIXED] Do not delete consumers on restarts with non-fatal update errors.
  • Loading branch information
derekcollison committed Feb 18, 2023
2 parents 7f5bef4 + 6a62ac4 commit 35526b0
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 31 deletions.
75 changes: 44 additions & 31 deletions server/jetstream_cluster.go
Expand Up @@ -3465,15 +3465,17 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
if err != nil {
ll := fmt.Sprintf("Account [%s] lookup for consumer create failed: %v", accName, err)
if isMember {
// If we can not lookup the account and we are a member, send this result back to the metacontroller leader.
result := &consumerAssignmentResult{
Account: accName,
Stream: stream,
Consumer: consumerName,
Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}},
if !js.isMetaRecovering() {
// If we can not lookup the account and we are a member, send this result back to the metacontroller leader.
result := &consumerAssignmentResult{
Account: accName,
Stream: stream,
Consumer: consumerName,
Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}},
}
result.Response.Error = NewJSNoAccountError()
s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result)
}
result.Response.Error = NewJSNoAccountError()
s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result)
s.Warnf(ll)
} else {
s.Debugf(ll)
Expand Down Expand Up @@ -3598,18 +3600,20 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
// Go ahead and create or update the consumer.
mset, err := acc.lookupStream(stream)
if err != nil {
js.mu.Lock()
s.Debugf("Consumer create failed, could not locate stream '%s > %s'", ca.Client.serviceAccount(), ca.Stream)
ca.err = NewJSStreamNotFoundError()
result := &consumerAssignmentResult{
Account: ca.Client.serviceAccount(),
Stream: ca.Stream,
Consumer: ca.Name,
Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}},
}
result.Response.Error = NewJSStreamNotFoundError()
s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result)
js.mu.Unlock()
if !js.isMetaRecovering() {
js.mu.Lock()
s.Warnf("Consumer create failed, could not locate stream '%s > %s > %s'", ca.Client.serviceAccount(), ca.Stream, ca.Name)
ca.err = NewJSStreamNotFoundError()
result := &consumerAssignmentResult{
Account: ca.Client.serviceAccount(),
Stream: ca.Stream,
Consumer: ca.Name,
Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}},
}
result.Response.Error = NewJSStreamNotFoundError()
s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result)
js.mu.Unlock()
}
return
}

Expand Down Expand Up @@ -3644,17 +3648,22 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
// Only update if config is really different.
cfg := o.config()
if !reflect.DeepEqual(&cfg, ca.Config) {
if err := o.updateConfig(ca.Config); err != nil {
// This is essentially an update that has failed.
// Call into update, ignore consumer exists error here since this means an old deliver subject is bound
// which can happen on restart etc.
if err := o.updateConfig(ca.Config); err != nil && err != NewJSConsumerNameExistError() {
// This is essentially an update that has failed. Respond back to metaleader if we are not recovering.
js.mu.RLock()
result := &consumerAssignmentResult{
Account: accName,
Stream: stream,
Consumer: consumer,
Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}},
if !js.metaRecovering {
result := &consumerAssignmentResult{
Account: accName,
Stream: stream,
Consumer: consumer,
Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}},
}
result.Response.Error = NewJSConsumerNameExistError()
s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result)
}
result.Response.Error = NewJSConsumerNameExistError()
s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result)
s.Warnf("Consumer create failed during update for '%s > %s > %s': %v", ca.Client.serviceAccount(), ca.Stream, ca.Name, err)
js.mu.RUnlock()
return
}
Expand Down Expand Up @@ -3721,7 +3730,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
}

var result *consumerAssignmentResult
if !hasResponded {
if !hasResponded && !js.metaRecovering {
result = &consumerAssignmentResult{
Account: ca.Client.serviceAccount(),
Stream: ca.Stream,
Expand Down Expand Up @@ -4615,12 +4624,16 @@ func (js *jetStream) processConsumerAssignmentResults(sub *subscription, c *clie
if ca := sa.consumers[result.Consumer]; ca != nil && !ca.responded {
js.srv.sendAPIErrResponse(ca.Client, acc, ca.Subject, ca.Reply, _EMPTY_, s.jsonResponse(result.Response))
ca.responded = true

// Check if this failed.
// TODO(dlc) - Could have mixed results, should track per peer.
if result.Response.Error != nil {
// Make sure this is recent response, do not delete existing consumers.
if result.Response.Error != nil && result.Response.Error != NewJSConsumerNameExistError() && time.Since(ca.Created) < 2*time.Second {
// So while we are deleting we will not respond to list/names requests.
ca.err = NewJSClusterNotAssignedError()
cc.meta.Propose(encodeDeleteConsumerAssignment(ca))
s.Warnf("Proposing to delete consumer `%s > %s > %s' due to assignment response error: %v",
result.Account, result.Stream, result.Consumer, result.Response.Error)
}
}
}
Expand Down
54 changes: 54 additions & 0 deletions server/jetstream_cluster_3_test.go
Expand Up @@ -2628,3 +2628,57 @@ func TestJetStreamClusterActiveActiveSourcedStreams(t *testing.T) {
})
require_NoError(t, err)
}

func TestJetStreamClusterUpdateConsumerShouldNotForceDeleteOnRestart(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R7S", 7)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo", "bar"},
Replicas: 3,
})
require_NoError(t, err)

ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "D",
DeliverSubject: "_no_bind_",
})
require_NoError(t, err)

// Shutdown a consumer follower.
nc.Close()
s := c.serverByName(ci.Cluster.Replicas[0].Name)
s.Shutdown()

c.waitOnLeader()

nc, js = jsClientConnect(t, c.randomServer())
defer nc.Close()

// Change delivery subject.
_, err = js.UpdateConsumer("TEST", &nats.ConsumerConfig{
Durable: "D",
DeliverSubject: "_d_",
})
require_NoError(t, err)

// Create interest in new and old deliver subject.
_, err = nc.SubscribeSync("_d_")
require_NoError(t, err)
_, err = nc.SubscribeSync("_no_bind_")
require_NoError(t, err)
nc.Flush()

c.restartServer(s)
c.waitOnAllCurrent()

// Wait on bad error that would cleanup consumer.
time.Sleep(time.Second)

_, err = js.ConsumerInfo("TEST", "D")
require_NoError(t, err)
}

0 comments on commit 35526b0

Please sign in to comment.