[FIXED] Fixed an issue with consumer states growing and causing instability. #3980

Merged: 2 commits, merged on Mar 19, 2023
12 changes: 4 additions & 8 deletions server/filestore.go
@@ -6685,21 +6685,15 @@ func (o *consumerFileStore) UpdateAcks(dseq, sseq uint64) error {
 		delete(o.state.Pending, sseq)
 		dseq = p.Sequence // Use the original.
 	}
-	// Now remove from redelivered.
-	if len(o.state.Redelivered) > 0 {
-		delete(o.state.Redelivered, sseq)
-	}
-
 	if len(o.state.Pending) == 0 {
 		o.state.AckFloor.Consumer = o.state.Delivered.Consumer
 		o.state.AckFloor.Stream = o.state.Delivered.Stream
 	} else if dseq == o.state.AckFloor.Consumer+1 {
-		first := o.state.AckFloor.Consumer == 0
 		o.state.AckFloor.Consumer = dseq
 		o.state.AckFloor.Stream = sseq
 
-		if !first && o.state.Delivered.Consumer > dseq {
-			for ss := sseq + 1; ss < o.state.Delivered.Stream; ss++ {
+		if o.state.Delivered.Consumer > dseq {
+			for ss := sseq + 1; ss <= o.state.Delivered.Stream; ss++ {
 				if p, ok := o.state.Pending[ss]; ok {
 					if p.Sequence > 0 {
 						o.state.AckFloor.Consumer = p.Sequence - 1
@@ -6710,6 +6704,8 @@ func (o *consumerFileStore) UpdateAcks(dseq, sseq uint64) error {
 			}
 		}
 	}
+	// We do these regardless.
+	delete(o.state.Redelivered, sseq)
 
 	o.kickFlusher()
 	return nil
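For context on the fix above: the ack-floor walk must advance past every contiguously acked sequence up to and including the last delivered one (hence `<` becoming `<=`, and the `!first` guard being dropped), and the Redelivered entry is now pruned on every ack rather than only in the earlier branch. A minimal standalone sketch of the walk, with hypothetical types rather than the server's actual consumerFileStore state:

```go
package main

import "fmt"

// ackFloor returns the highest stream sequence s such that every
// sequence <= s has been acknowledged (i.e. is no longer pending).
// A toy version of the walk in UpdateAcks, not the real state types.
func ackFloor(floor, lastDelivered uint64, pending map[uint64]bool) uint64 {
	// Walk up to and including lastDelivered; stopping one short
	// (the old `<` bound) leaves the floor stale when the last
	// delivered sequence has itself been acked.
	for ss := floor + 1; ss <= lastDelivered; ss++ {
		if pending[ss] {
			break
		}
		floor = ss
	}
	return floor
}

func main() {
	// Sequences 1..5 delivered, 1 already acked, 4 still pending.
	pending := map[uint64]bool{4: true}
	fmt.Println(ackFloor(1, 5, pending)) // 3: 2 and 3 are acked, 4 is not
}
```

With nothing pending, the old `<` bound would stop this sketch at 4 instead of reaching 5.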
4 changes: 2 additions & 2 deletions server/filestore_test.go
@@ -2776,8 +2776,8 @@ func TestFileStoreConsumerDeliveredAndAckUpdates(t *testing.T) {
 		}
 	}
 
-	testAck(1, 100, 1, 100)
-	testAck(3, 130, 1, 100)
+	testAck(1, 100, 1, 109)
+	testAck(3, 130, 1, 109)
 	testAck(2, 110, 3, 149) // We do not track explicit state on previous stream floors, so we take last known -1
 	testAck(5, 165, 3, 149)
 	testAck(4, 150, 5, 165)
31 changes: 31 additions & 0 deletions server/jetstream_cluster.go
@@ -281,6 +281,37 @@ func (s *Server) JetStreamStepdownStream(account, stream string) error {
 	return nil
 }
 
+func (s *Server) JetStreamStepdownConsumer(account, stream, consumer string) error {
+	js, cc := s.getJetStreamCluster()
+	if js == nil {
+		return NewJSNotEnabledError()
+	}
+	if cc == nil {
+		return NewJSClusterNotActiveError()
+	}
+	// Grab account
+	acc, err := s.LookupAccount(account)
+	if err != nil {
+		return err
+	}
+	// Grab stream
+	mset, err := acc.lookupStream(stream)
+	if err != nil {
+		return err
+	}
+
+	o := mset.lookupConsumer(consumer)
+	if o == nil {
+		return NewJSConsumerNotFoundError()
+	}
+
+	if node := o.raftNode(); node != nil && node.Leader() {
+		node.StepDown()
+	}
+
+	return nil
+}
+
 func (s *Server) JetStreamSnapshotStream(account, stream string) error {
 	js, cc := s.getJetStreamCluster()
 	if js == nil {
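As a hedged sketch of how the new helper can be driven from a clustered test (consumerLeader and waitOnConsumerLeader are existing helpers in this test suite; the stream and consumer names are placeholders):

```go
// Force the current consumer leader to step down, then wait for a
// new leader to be elected before continuing.
func rotateConsumerLeader(t *testing.T, c *cluster) {
	t.Helper()
	cl := c.consumerLeader(globalAccountName, "TEST", "C")
	if cl == nil {
		t.Fatal("no consumer leader")
	}
	if err := cl.JetStreamStepdownConsumer(globalAccountName, "TEST", "C"); err != nil {
		t.Fatalf("stepdown failed: %v", err)
	}
	c.waitOnConsumerLeader(globalAccountName, "TEST", "C")
}
```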
119 changes: 118 additions & 1 deletion server/jetstream_cluster_3_test.go
@@ -1,4 +1,4 @@
-// Copyright 2022 The NATS Authors
+// Copyright 2022-2023 The NATS Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -3112,3 +3112,120 @@ func TestJetStreamClusterInterestBasedStreamAndConsumerSnapshots(t *testing.T) {
 	require_NoError(t, err)
 	require_True(t, si.State.Msgs == 0)
 }
+
+func TestJetStreamClusterConsumerFollowerStoreStateAckFloorBug(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Replicas: 3,
+		Subjects: []string{"foo"},
+	})
+	require_NoError(t, err)
+
+	sub, err := js.PullSubscribe(_EMPTY_, "C", nats.BindStream("TEST"), nats.ManualAck())
+	require_NoError(t, err)
+
+	num := 100
+	for i := 0; i < num; i++ {
+		sendStreamMsg(t, nc, "foo", "data")
+	}
+
+	// This one prevents the state for pending from reaching 0 and resetting, which would not show the bug.
+	sendStreamMsg(t, nc, "foo", "data")
+
+	// Ack all but one and out of order and make sure all consumers have the same stored state.
+	msgs, err := sub.Fetch(num, nats.MaxWait(time.Second))
+	require_NoError(t, err)
+	require_True(t, len(msgs) == num)
+
+	_, err = sub.Fetch(1, nats.MaxWait(time.Second))
+	require_NoError(t, err)
+
+	rand.Shuffle(len(msgs), func(i, j int) { msgs[i], msgs[j] = msgs[j], msgs[i] })
+	for _, m := range msgs {
+		m.AckSync()
+	}
+
+	checkConsumerState := func(delivered, ackFloor nats.SequenceInfo, numAckPending int) {
+		expectedDelivered := uint64(num) + 1
+		if delivered.Stream != expectedDelivered || delivered.Consumer != expectedDelivered {
+			t.Fatalf("Wrong delivered, expected %d got %+v", expectedDelivered, delivered)
+		}
+		expectedAck := uint64(num)
+		if ackFloor.Stream != expectedAck || ackFloor.Consumer != expectedAck {
+			t.Fatalf("Wrong ackFloor, expected %d got %+v", expectedAck, ackFloor)
+		}
+		require_True(t, numAckPending == 1)
+	}
+
+	ci, err := js.ConsumerInfo("TEST", "C")
+	require_NoError(t, err)
+	checkConsumerState(ci.Delivered, ci.AckFloor, ci.NumAckPending)
+
+	// Check each consumer on each server for its store state and make sure it matches as well.
+	for _, s := range c.servers {
+		mset, err := s.GlobalAccount().lookupStream("TEST")
+		require_NoError(t, err)
+		require_NotNil(t, mset)
+		o := mset.lookupConsumer("C")
+		require_NotNil(t, o)
+
+		state, err := o.store.State()
+		require_NoError(t, err)
+
+		delivered := nats.SequenceInfo{Stream: state.Delivered.Stream, Consumer: state.Delivered.Consumer}
+		ackFloor := nats.SequenceInfo{Stream: state.AckFloor.Stream, Consumer: state.AckFloor.Consumer}
+		checkConsumerState(delivered, ackFloor, len(state.Pending))
+	}
+
+	// Now stepdown the consumer and move its leader and check the state after transition.
+	// Make the restarted server the eventual leader.
+	seen := make(map[*Server]bool)
+	cl := c.consumerLeader(globalAccountName, "TEST", "C")
+	require_NotNil(t, cl)
+	seen[cl] = true
+
+	allSeen := func() bool {
+		for _, s := range c.servers {
+			if !seen[s] {
+				return false
+			}
+		}
+		return true
+	}
+
+	checkAllLeaders := func() {
+		t.Helper()
+		checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
+			c.waitOnConsumerLeader(globalAccountName, "TEST", "C")
+			if allSeen() {
+				return nil
+			}
+			cl := c.consumerLeader(globalAccountName, "TEST", "C")
+			seen[cl] = true
+			ci, err := js.ConsumerInfo("TEST", "C")
+			require_NoError(t, err)
+			checkConsumerState(ci.Delivered, ci.AckFloor, ci.NumAckPending)
+			cl.JetStreamStepdownConsumer(globalAccountName, "TEST", "C")
+			return fmt.Errorf("Not all servers have been consumer leader yet")
+		})
+	}
+
+	checkAllLeaders()
+
+	// Now restart all servers and check again.
+	c.stopAll()
+	c.restartAll()
+	c.waitOnLeader()
+
+	nc, js = jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	seen = make(map[*Server]bool)
+	checkAllLeaders()
+}
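For readers without the server test harness, a client-side sketch of the ack pattern this test exercises, using the nats.go client against an already-running 3-node JetStream cluster (connection URL and error handling are simplified):

```go
package main

import (
	"log"
	"math/rand"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()
	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// R3 stream; assumes a local 3-node JetStream cluster.
	if _, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 3,
	}); err != nil {
		log.Fatal(err)
	}

	// 101 messages: one stays unacked so pending never drains to zero.
	for i := 0; i < 101; i++ {
		if _, err := js.Publish("foo", []byte("data")); err != nil {
			log.Fatal(err)
		}
	}

	sub, err := js.PullSubscribe("", "C", nats.BindStream("TEST"), nats.ManualAck())
	if err != nil {
		log.Fatal(err)
	}
	msgs, err := sub.Fetch(100, nats.MaxWait(time.Second))
	if err != nil {
		log.Fatal(err)
	}

	// Ack out of order, mirroring the shuffle in the test above; before
	// this fix, follower consumer stores could diverge on the ack floor.
	rand.Shuffle(len(msgs), func(i, j int) { msgs[i], msgs[j] = msgs[j], msgs[i] })
	for _, m := range msgs {
		if err := m.AckSync(); err != nil {
			log.Fatal(err)
		}
	}
}
```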
60 changes: 45 additions & 15 deletions server/jetstream_helpers_test.go
@@ -1,4 +1,4 @@
-// Copyright 2020-2022 The NATS Authors
+// Copyright 2020-2023 The NATS Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -43,6 +43,15 @@ func init() {
 	lostQuorumCheck = 4 * hbInterval
 }
 
+// Used to setup clusters of clusters for tests.
+type cluster struct {
+	servers  []*Server
+	opts     []*Options
+	name     string
+	t        testing.TB
+	nproxies []*netProxy
+}
+
 // Used to setup superclusters for tests.
 type supercluster struct {
 	t *testing.T
@@ -351,6 +360,9 @@ type gwProxy struct {
 	down int
 }
 
+// For use in normal clusters.
+type clusterProxy = gwProxy
+
 // Maps cluster names to proxy settings.
 type gwProxyMap map[string]*gwProxy
@@ -692,9 +704,19 @@ func createJetStreamCluster(t testing.TB, tmpl string, clusterName, snPre string
 
 type modifyCb func(serverName, clusterName, storeDir, conf string) string
 
-func createJetStreamClusterAndModHook(t testing.TB, tmpl string, clusterName, snPre string, numServers int, portStart int, waitOnReady bool, modify modifyCb) *cluster {
+func createJetStreamClusterAndModHook(t testing.TB, tmpl, cName, snPre string, numServers int, portStart int, waitOnReady bool, modify modifyCb) *cluster {
+	return createJetStreamClusterEx(t, tmpl, cName, snPre, numServers, portStart, waitOnReady, modify, nil)
+}
+
+func createJetStreamClusterWithNetProxy(t testing.TB, cName string, numServers int, cnp *clusterProxy) *cluster {
+	startPorts := []int{7_122, 9_122, 11_122, 15_122}
+	port := startPorts[rand.Intn(len(startPorts))]
+	return createJetStreamClusterEx(t, jsClusterTempl, cName, _EMPTY_, numServers, port, true, nil, cnp)
+}
+
+func createJetStreamClusterEx(t testing.TB, tmpl, cName, snPre string, numServers int, portStart int, wait bool, modify modifyCb, cnp *clusterProxy) *cluster {
 	t.Helper()
-	if clusterName == _EMPTY_ || numServers < 1 {
+	if cName == _EMPTY_ || numServers < 1 {
 		t.Fatalf("Bad params")
 	}
 
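As a usage illustration for the new createJetStreamClusterWithNetProxy helper (the test name and values here are hypothetical; rtt, up, and down are the gwProxy fields reused via the clusterProxy alias above):

```go
// Hypothetical test: a 3-node cluster whose route connections ride
// through proxies with 25ms RTT and ~10MB/s of bandwidth each way.
func TestClusterOverConstrainedRoutes(t *testing.T) {
	cnp := &clusterProxy{rtt: 25 * time.Millisecond, up: 10 * 1024 * 1024, down: 10 * 1024 * 1024}
	c := createJetStreamClusterWithNetProxy(t, "SLOW3", 3, cnp)
	defer c.shutdown()
	// ... exercise stream/consumer behavior under slow routes here.
}
```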
@@ -715,20 +737,32 @@ func createJetStreamClusterAndModHook(t testing.TB, tmpl string, clusterName, sn
 
 	// Build out the routes that will be shared with all configs.
 	var routes []string
+	var nproxies []*netProxy
 	for cp := portStart; cp < portStart+numServers; cp++ {
-		routes = append(routes, fmt.Sprintf("nats-route://127.0.0.1:%d", cp))
+		routeURL := fmt.Sprintf("nats-route://127.0.0.1:%d", cp)
+		if cnp != nil {
+			np := createNetProxy(cnp.rtt, cnp.up, cnp.down, routeURL, false)
+			nproxies = append(nproxies, np)
+			routeURL = np.routeURL()
+		}
+		routes = append(routes, routeURL)
 	}
 	routeConfig := strings.Join(routes, ",")
 
 	// Go ahead and build configurations and start servers.
-	c := &cluster{servers: make([]*Server, 0, numServers), opts: make([]*Options, 0, numServers), name: clusterName}
+	c := &cluster{servers: make([]*Server, 0, numServers), opts: make([]*Options, 0, numServers), name: cName, nproxies: nproxies}
+
+	// Start any proxies.
+	for _, np := range nproxies {
+		np.start()
+	}
 
 	for cp := portStart; cp < portStart+numServers; cp++ {
 		storeDir := t.TempDir()
 		sn := fmt.Sprintf("%sS-%d", snPre, cp-portStart+1)
-		conf := fmt.Sprintf(tmpl, sn, storeDir, clusterName, cp, routeConfig)
+		conf := fmt.Sprintf(tmpl, sn, storeDir, cName, cp, routeConfig)
 		if modify != nil {
-			conf = modify(sn, clusterName, storeDir, conf)
+			conf = modify(sn, cName, storeDir, conf)
 		}
 		s, o := RunServerWithConfig(createConfFile(t, []byte(conf)))
 		c.servers = append(c.servers, s)
@@ -738,7 +772,7 @@ func createJetStreamClusterAndModHook(t testing.TB, tmpl string, clusterName, sn
 
 	// Wait til we are formed and have a leader.
 	c.checkClusterFormed()
-	if waitOnReady {
+	if wait {
 		c.waitOnClusterReady()
 	}
 
@@ -1621,18 +1655,14 @@ func (np *netProxy) loop(rtt time.Duration, tbw int, r, w net.Conn) {
 
 	rl := rate.NewLimiter(rate.Limit(tbw), rbl)
 
-	for fr := true; ; {
-		sr := time.Now()
+	for {
 		n, err := r.Read(buf[:])
 		if err != nil {
 			return
 		}
-		// RTT delays
-		if fr || time.Since(sr) > 250*time.Millisecond {
-			fr = false
-			if delay > 0 {
-				time.Sleep(delay)
-			}
-		}
+		if delay > 0 {
+			time.Sleep(delay)
+		}
 		if err := rl.WaitN(ctx, n); err != nil {
 			return
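The simplified proxy loop now pairs a fixed per-read sleep (the simulated RTT) with token-bucket throttling from golang.org/x/time/rate. A self-contained sketch of that pattern (function name and sizing are illustrative, not the helper's actual tunables; it assumes bps is at least the buffer size so WaitN never exceeds the burst):

```go
package main

import (
	"context"
	"io"
	"time"

	"golang.org/x/time/rate"
)

// copyLimited copies r to w at roughly bps bytes per second and adds a
// fixed delay per read to simulate link RTT, mirroring netProxy.loop.
func copyLimited(w io.Writer, r io.Reader, bps int, delay time.Duration) error {
	ctx := context.Background()
	rl := rate.NewLimiter(rate.Limit(bps), bps) // burst: one second of budget
	buf := make([]byte, 32*1024)                // requires bps >= 32*1024
	for {
		n, err := r.Read(buf)
		if err != nil {
			return err
		}
		if delay > 0 {
			time.Sleep(delay)
		}
		// Block until n bytes of token budget are available.
		if err := rl.WaitN(ctx, n); err != nil {
			return err
		}
		if _, err := w.Write(buf[:n]); err != nil {
			return err
		}
	}
}
```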