Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] Handle R1 consumers that are recreated with the same name after they become inactive. #4216

Merged
merged 1 commit on Jun 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -6,7 +6,7 @@ require (
github.com/klauspost/compress v1.16.5
github.com/minio/highwayhash v1.0.2
github.com/nats-io/jwt/v2 v2.4.1
github.com/nats-io/nats.go v1.24.0
github.com/nats-io/nats.go v1.26.0
github.com/nats-io/nkeys v0.4.4
github.com/nats-io/nuid v1.0.1
go.uber.org/automaxprocs v1.5.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Expand Up @@ -15,8 +15,8 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4=
github.com/nats-io/jwt/v2 v2.4.1/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
github.com/nats-io/nats.go v1.24.0 h1:CRiD8L5GOQu/DcfkmgBcTTIQORMwizF+rPk6T0RaHVQ=
github.com/nats-io/nats.go v1.24.0/go.mod h1:dVQF+BK3SzUZpwyzHedXsvH3EO38aVKuOPkkHlv5hXA=
github.com/nats-io/nats.go v1.26.0 h1:fWJTYPnZ8DzxIaqIHOAMfColuznchnd5Ab5dbJpgPIE=
github.com/nats-io/nats.go v1.26.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc=
github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA=
github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
Expand Down
5 changes: 3 additions & 2 deletions server/consumer.go
Expand Up @@ -1417,9 +1417,10 @@ func (o *consumer) deleteNotActive() {
defer ticker.Stop()
for range ticker.C {
js.mu.RLock()
ca := js.consumerAssignment(acc, stream, name)
nca := js.consumerAssignment(acc, stream, name)
js.mu.RUnlock()
if ca != nil {
// Make sure this is not a new consumer with the same name.
if nca != nil && nca == ca {
s.Warnf("Consumer assignment for '%s > %s > %s' not cleaned up, retrying", acc, stream, name)
meta.ForwardProposal(removeEntry)
} else {
Expand Down
8 changes: 6 additions & 2 deletions server/jetstream_cluster.go
Expand Up @@ -3902,8 +3902,12 @@ func (js *jetStream) processConsumerRemoval(ca *consumerAssignment) {
var needDelete bool
if accStreams := cc.streams[ca.Client.serviceAccount()]; accStreams != nil {
if sa := accStreams[ca.Stream]; sa != nil && sa.consumers != nil && sa.consumers[ca.Name] != nil {
needDelete = true
delete(sa.consumers, ca.Name)
oca := sa.consumers[ca.Name]
// Make sure this removal is for what we have, otherwise ignore.
if ca.Group != nil && oca.Group != nil && ca.Group.Name == oca.Group.Name {
needDelete = true
delete(sa.consumers, ca.Name)
}
}
}
js.mu.Unlock()
Expand Down
97 changes: 97 additions & 0 deletions server/jetstream_cluster_3_test.go
Expand Up @@ -27,6 +27,7 @@ import (
"reflect"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -4398,3 +4399,99 @@ func TestJetStreamClusterPurgeExReplayAfterRestart(t *testing.T) {
si.State.FirstSeq, si.State.LastSeq)
}
}

// TestJetStreamClusterConsumerCleanupWithSameName verifies that an R1
// consumer which is removed (e.g. for inactivity) and then recreated under
// the same name is not repeatedly torn down by cleanup of the old, stale
// consumer assignment. The flow: deliver messages, shut down the consumer's
// leader, recreate the consumer with the same name on "consumer not active",
// restart the old server, and confirm delivery resumes with no extra errors.
func TestJetStreamClusterConsumerCleanupWithSameName(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3F", 3)
	defer c.shutdown()

	// Client based API
	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	// R3 stream, so the stream itself survives the single server shutdown below.
	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "UPDATES",
		Subjects: []string{"DEVICE.*"},
		Replicas: 3,
	})
	require_NoError(t, err)

	// Create a consumer that will be an R1 that we will auto-recreate but using the same name.
	// We want to make sure that the system does not continually try to cleanup the new one from the old one.

	// Track the sequence for restart etc.
	var seq atomic.Uint64

	// Ack each message and record the highest stream sequence seen so far,
	// so a recreated consumer can resume from the right place.
	msgCB := func(msg *nats.Msg) {
		msg.AckSync()
		meta, err := msg.Metadata()
		require_NoError(t, err)
		seq.Store(meta.Sequence.Stream)
	}

	// Block until the callback has processed the message with the given
	// stream sequence (polls for up to 10s).
	waitOnSeqDelivered := func(expected uint64) {
		checkFor(t, 10*time.Second, 200*time.Millisecond, func() error {
			received := seq.Load()
			if received == expected {
				return nil
			}
			return fmt.Errorf("Seq is %d, want %d", received, expected)
		})
	}

	// (Re)create the R1, memory-backed consumer named "dlc", starting at the
	// sequence just past the last one we processed. The short
	// InactiveThreshold is what makes the server remove it once its
	// subscription goes away.
	doSub := func() {
		_, err = js.Subscribe(
			"DEVICE.22",
			msgCB,
			nats.ConsumerName("dlc"),
			nats.SkipConsumerLookup(),
			nats.StartSequence(seq.Load()+1),
			nats.MaxAckPending(1), // One at a time.
			nats.ManualAck(),
			nats.ConsumerReplicas(1),
			nats.ConsumerMemoryStorage(),
			nats.MaxDeliver(1),
			nats.InactiveThreshold(time.Second),
			nats.IdleHeartbeat(250*time.Millisecond),
		)
		require_NoError(t, err)
	}

	// Track any errors for consumer not active so we can recreate the consumer.
	errCh := make(chan error, 10)
	nc.SetErrorHandler(func(c *nats.Conn, s *nats.Subscription, err error) {
		if errors.Is(err, nats.ErrConsumerNotActive) {
			s.Unsubscribe()
			errCh <- err
			doSub()
		}
	})

	doSub()

	sendStreamMsg(t, nc, "DEVICE.22", "update-1")
	sendStreamMsg(t, nc, "DEVICE.22", "update-2")
	sendStreamMsg(t, nc, "DEVICE.22", "update-3")
	waitOnSeqDelivered(3)

	// Shutdown the consumer's leader.
	s := c.consumerLeader(globalAccountName, "UPDATES", "dlc")
	s.Shutdown()
	c.waitOnStreamLeader(globalAccountName, "UPDATES")

	// In case our client connection was to the same server.
	// NOTE: this rebinds nc; the earlier defer still closes the first
	// connection since the receiver was evaluated when deferred.
	nc, _ = jsClientConnect(t, c.randomServer())
	defer nc.Close()

	sendStreamMsg(t, nc, "DEVICE.22", "update-4")
	sendStreamMsg(t, nc, "DEVICE.22", "update-5")
	sendStreamMsg(t, nc, "DEVICE.22", "update-6")

	// Wait for the consumer not active error.
	<-errCh
	// Now restart server with the old consumer.
	c.restartServer(s)
	// Wait on all messages delivered.
	waitOnSeqDelivered(6)
	// Make sure no other errors showed up
	require_True(t, len(errCh) == 0)
}