Skip to content

Commit

Permalink
Fix for deleting consumers on restarts and non-fatal update errors.
Browse files Browse the repository at this point in the history
If there was a spurious error on restart, or possibly on an update, we could delete a consumer, which was incorrect behavior.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Feb 18, 2023
1 parent 689d521 commit 01fa89a
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 33 deletions.
80 changes: 47 additions & 33 deletions server/jetstream_cluster.go
Expand Up @@ -3466,15 +3466,17 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
if err != nil {
ll := fmt.Sprintf("Account [%s] lookup for consumer create failed: %v", accName, err)
if isMember {
// If we can not lookup the account and we are a member, send this result back to the metacontroller leader.
result := &consumerAssignmentResult{
Account: accName,
Stream: stream,
Consumer: consumerName,
Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}},
if !js.isMetaRecovering() {
// If we can not lookup the account and we are a member, send this result back to the metacontroller leader.
result := &consumerAssignmentResult{
Account: accName,
Stream: stream,
Consumer: consumerName,
Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}},
}
result.Response.Error = NewJSNoAccountError()
s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result)
}
result.Response.Error = NewJSNoAccountError()
s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result)
s.Warnf(ll)
} else {
s.Debugf(ll)
Expand Down Expand Up @@ -3598,18 +3600,20 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
// Go ahead and create or update the consumer.
mset, err := acc.lookupStream(ca.Stream)
if err != nil {
js.mu.Lock()
s.Debugf("Consumer create failed, could not locate stream '%s > %s'", ca.Client.serviceAccount(), ca.Stream)
ca.err = NewJSStreamNotFoundError()
result := &consumerAssignmentResult{
Account: ca.Client.serviceAccount(),
Stream: ca.Stream,
Consumer: ca.Name,
Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}},
}
result.Response.Error = NewJSStreamNotFoundError()
s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result)
js.mu.Unlock()
if !js.isMetaRecovering() {
js.mu.Lock()
s.Warnf("Consumer create failed, could not locate stream '%s > %s > %s'", ca.Client.serviceAccount(), ca.Stream, ca.Name)
ca.err = NewJSStreamNotFoundError()
result := &consumerAssignmentResult{
Account: ca.Client.serviceAccount(),
Stream: ca.Stream,
Consumer: ca.Name,
Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}},
}
result.Response.Error = NewJSStreamNotFoundError()
s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result)
js.mu.Unlock()
}
return
}

Expand Down Expand Up @@ -3639,20 +3643,26 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca, false)
didCreate = true
} else {
if err := o.updateConfig(ca.Config); err != nil {
// This is essentially an update that has failed.
js.mu.Lock()
result := &consumerAssignmentResult{
Account: ca.Client.serviceAccount(),
Stream: ca.Stream,
Consumer: ca.Name,
Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}},
// Call into update, ignore consumer exists error here since this means an old deliver subject is bound
// which can happen on restart etc.
if err := o.updateConfig(ca.Config); err != nil && err != NewJSConsumerNameExistError() {
// This is essentially an update that has failed. Respond back to metaleader if we are not recovering.
js.mu.RLock()
if !js.metaRecovering {
result := &consumerAssignmentResult{
Account: ca.Client.serviceAccount(),
Stream: ca.Stream,
Consumer: ca.Name,
Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}},
}
result.Response.Error = NewJSConsumerNameExistError()
s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result)
}
result.Response.Error = NewJSConsumerNameExistError()
s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result)
js.mu.Unlock()
s.Warnf("Consumer create failed during update for '%s > %s > %s': %v", ca.Client.serviceAccount(), ca.Stream, ca.Name, err)
js.mu.RUnlock()
return
}

// Check if we already had a consumer assignment and its still pending.
cca, oca := ca, o.consumerAssignment()
o.mu.RLock()
Expand Down Expand Up @@ -3715,7 +3725,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
}

var result *consumerAssignmentResult
if !hasResponded {
if !hasResponded && !js.metaRecovering {
result = &consumerAssignmentResult{
Account: ca.Client.serviceAccount(),
Stream: ca.Stream,
Expand Down Expand Up @@ -4606,12 +4616,16 @@ func (js *jetStream) processConsumerAssignmentResults(sub *subscription, c *clie
if ca := sa.consumers[result.Consumer]; ca != nil && !ca.responded {
js.srv.sendAPIErrResponse(ca.Client, acc, ca.Subject, ca.Reply, _EMPTY_, s.jsonResponse(result.Response))
ca.responded = true

// Check if this failed.
// TODO(dlc) - Could have mixed results, should track per peer.
if result.Response.Error != nil {
// Make sure this is recent response, do not delete existing consumers.
if result.Response.Error != nil && result.Response.Error != NewJSConsumerNameExistError() && time.Since(ca.Created) < 2*time.Second {
// So while we are deleting we will not respond to list/names requests.
ca.err = NewJSClusterNotAssignedError()
cc.meta.Propose(encodeDeleteConsumerAssignment(ca))
s.Warnf("Proposing to delete consumer `%s > %s > %s' due to assignment response error: %v",
result.Account, result.Stream, result.Consumer, result.Response.Error)
}
}
}
Expand Down
54 changes: 54 additions & 0 deletions server/jetstream_cluster_3_test.go
Expand Up @@ -2621,3 +2621,57 @@ func TestJetStreamClusterActiveActiveSourcedStreams(t *testing.T) {
})
require_NoError(t, err)
}

// TestJetStreamClusterUpdateConsumerShouldNotForceDeleteOnRestart verifies that
// a durable consumer can still be looked up after this sequence: one server
// from the consumer's replica list is shut down, the consumer's deliver subject
// is changed while that server is offline, and the server is then restarted.
func TestJetStreamClusterUpdateConsumerShouldNotForceDeleteOnRestart(t *testing.T) {
	const (
		streamName     = "TEST"
		durableName    = "D"
		oldDeliverSubj = "_no_bind_"
		newDeliverSubj = "_d_"
	)

	cluster := createJetStreamClusterExplicit(t, "R7S", 7)
	defer cluster.shutdown()

	firstConn, firstJS := jsClientConnect(t, cluster.randomServer())
	defer firstConn.Close()

	streamCfg := &nats.StreamConfig{
		Name:     streamName,
		Subjects: []string{"foo", "bar"},
		Replicas: 3,
	}
	_, err := firstJS.AddStream(streamCfg)
	require_NoError(t, err)

	// Create the durable with the original deliver subject.
	info, err := firstJS.AddConsumer(streamName, &nats.ConsumerConfig{
		Durable:        durableName,
		DeliverSubject: oldDeliverSubj,
	})
	require_NoError(t, err)

	// Drop the client connection, then stop the first server reported in the
	// consumer's replica list (a consumer follower).
	firstConn.Close()
	follower := cluster.serverByName(info.Cluster.Replicas[0].Name)
	follower.Shutdown()

	cluster.waitOnLeader()

	// Reconnect through whichever server the cluster hands back.
	conn, js := jsClientConnect(t, cluster.randomServer())
	defer conn.Close()

	// Move the consumer to a new deliver subject while the follower is down.
	_, err = js.UpdateConsumer(streamName, &nats.ConsumerConfig{
		Durable:        durableName,
		DeliverSubject: newDeliverSubj,
	})
	require_NoError(t, err)

	// Register interest on the new deliver subject first, then on the old one.
	for _, subj := range []string{newDeliverSubj, oldDeliverSubj} {
		_, err = conn.SubscribeSync(subj)
		require_NoError(t, err)
	}
	conn.Flush()

	// Bring the follower back and let every server catch up.
	cluster.restartServer(follower)
	cluster.waitOnAllCurrent()

	// Give a bad assignment error enough time to trigger a consumer cleanup.
	time.Sleep(time.Second)

	// The consumer must still be there.
	_, err = js.ConsumerInfo(streamName, durableName)
	require_NoError(t, err)
}

0 comments on commit 01fa89a

Please sign in to comment.