Fix to stop forwarding proposals in consumers after scaling down a stream (#4556)

Sometimes when scaling down a stream, a raft node could continue forwarding proposals after already being closed. In the debug logs this can be confirmed by many entries logging 'Direct proposal ignored, not leader (state: CLOSED)'.
wallyqs committed Sep 18, 2023
2 parents de76275 + 71b8a33 commit 2d23e9b
Showing 3 changed files with 151 additions and 2 deletions.
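
For context, here is a minimal, self-contained Go sketch of the pattern this change introduces in loopAndForwardProposals (see the consumer.go diff below): the forwarding closure reports an error once the raft node is no longer the leader, so the loop exits instead of proposing into a closed node. This is an illustration only, not the server code; fakeNode, forwardLoop, nodeState, and the main harness are hypothetical stand-ins.

package main

import (
	"errors"
	"fmt"
)

type nodeState int

const (
	stateLeader nodeState = iota
	stateClosed
)

// fakeNode stands in for the consumer's raft node; once closed it only
// ignores (and logs) any proposal that is still forwarded to it.
type fakeNode struct{ state nodeState }

func (n *fakeNode) State() nodeState { return n.state }

func (n *fakeNode) ProposeDirect(entries []string) {
	if n.state != stateLeader {
		fmt.Println("Direct proposal ignored, not leader (state: CLOSED)")
		return
	}
	fmt.Printf("forwarded %d entries\n", len(entries))
}

// forwardLoop mirrors the shape of loopAndForwardProposals after the fix:
// the forwarding closure returns an error once the node is no longer the
// leader, and the loop exits instead of proposing into a closed node.
func forwardLoop(n *fakeNode, pch <-chan []string, qch <-chan struct{}) {
	forward := func(entries []string) error {
		if n.State() != stateLeader {
			return errors.New("no longer leader")
		}
		n.ProposeDirect(entries)
		return nil
	}
	for {
		select {
		case <-qch:
			return
		case entries := <-pch:
			if err := forward(entries); err != nil {
				// Leadership lost (e.g. the node was closed by a scale-down):
				// stop forwarding rather than spin forever.
				return
			}
		}
	}
}

func main() {
	// Simulate the post-scale-down state: the node is already closed, but a
	// proposal is still pending on the channel.
	n := &fakeNode{state: stateClosed}
	pch := make(chan []string, 1)
	pch <- []string{"pending proposal"}

	// With the guard, the loop exits on the first pending proposal instead of
	// logging "Direct proposal ignored, not leader (state: CLOSED)" repeatedly.
	forwardLoop(n, pch, nil)
	fmt.Println("forward loop stopped")
}
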
11 changes: 9 additions & 2 deletions server/consumer.go
@@ -2001,8 +2001,12 @@ func (o *consumer) loopAndForwardProposals(qch chan struct{}) {
 		return
 	}

-	forwardProposals := func() {
+	forwardProposals := func() error {
 		o.mu.Lock()
+		if o.node != node || node.State() != Leader {
+			o.mu.Unlock()
+			return errors.New("no longer leader")
+		}
 		proposal := o.phead
 		o.phead, o.ptail = nil, nil
 		o.mu.Unlock()
@@ -2024,6 +2028,7 @@ func (o *consumer) loopAndForwardProposals(qch chan struct{}) {
 		if len(entries) > 0 {
 			node.ProposeDirect(entries)
 		}
+		return nil
 	}

 	// In case we have anything pending on entry.
@@ -2035,7 +2040,9 @@ func (o *consumer) loopAndForwardProposals(qch chan struct{}) {
 			forwardProposals()
 			return
 		case <-pch:
-			forwardProposals()
+			if err := forwardProposals(); err != nil {
+				return
+			}
 		}
 	}
 }
127 changes: 127 additions & 0 deletions server/jetstream_cluster_3_test.go
@@ -18,6 +18,7 @@ package server

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
@@ -5682,3 +5683,129 @@ func TestJetStreamClusterDetectOrphanNRGs(t *testing.T) {
	require_True(t, s.lookupRaftNode(sgn) == nil)
	require_True(t, s.lookupRaftNode(ogn) == nil)
}

func TestJetStreamClusterRestartThenScaleStreamReplicas(t *testing.T) {
	t.Skip("This test takes too long, need to make shorter")

	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	s := c.randomNonLeader()
	nc, js := jsClientConnect(t, s)
	defer nc.Close()

	nc2, producer := jsClientConnect(t, s)
	defer nc2.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 3,
	})
	require_NoError(t, err)
	c.waitOnStreamLeader(globalAccountName, "TEST")

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	end := time.Now().Add(2 * time.Second)
	for time.Now().Before(end) {
		producer.Publish("foo", []byte(strings.Repeat("A", 128)))
		time.Sleep(time.Millisecond)
	}

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		sub, err := js.PullSubscribe("foo", fmt.Sprintf("C-%d", i))
		require_NoError(t, err)

		wg.Add(1)
		go func() {
			defer wg.Done()
			for range time.NewTicker(10 * time.Millisecond).C {
				select {
				case <-ctx.Done():
					return
				default:
				}

				msgs, err := sub.Fetch(1)
				if err != nil && !errors.Is(err, nats.ErrTimeout) && !errors.Is(err, nats.ErrConnectionClosed) {
					t.Logf("Pull Error: %v", err)
				}
				for _, msg := range msgs {
					msg.Ack()
				}
			}
		}()
	}
	c.lameDuckRestartAll()
	c.waitOnStreamLeader(globalAccountName, "TEST")

	// Swap the logger to try to detect the condition after the restart.
	loggers := make([]*captureDebugLogger, 3)
	for i, srv := range c.servers {
		l := &captureDebugLogger{dbgCh: make(chan string, 10)}
		loggers[i] = l
		srv.SetLogger(l, true, false)
	}
	condition := `Direct proposal ignored, not leader (state: CLOSED)`
	errCh := make(chan error, 10)

	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case dl := <-loggers[0].dbgCh:
				if strings.Contains(dl, condition) {
					errCh <- fmt.Errorf(condition)
				}
			case dl := <-loggers[1].dbgCh:
				if strings.Contains(dl, condition) {
					errCh <- fmt.Errorf(condition)
				}
			case dl := <-loggers[2].dbgCh:
				if strings.Contains(dl, condition) {
					errCh <- fmt.Errorf(condition)
				}
			case <-ctx.Done():
				return
			}
		}
	}()

	// Start publishing again for a while.
	end = time.Now().Add(2 * time.Second)
	for time.Now().Before(end) {
		producer.Publish("foo", []byte(strings.Repeat("A", 128)))
		time.Sleep(time.Millisecond)
	}

	// Try to do a stream edit back to R=1 after doing all the upgrade.
	info, _ := js.StreamInfo("TEST")
	sconfig := info.Config
	sconfig.Replicas = 1
	_, err = js.UpdateStream(&sconfig)
	require_NoError(t, err)

	// Leave running for some time after the update.
	time.Sleep(2 * time.Second)

	info, _ = js.StreamInfo("TEST")
	sconfig = info.Config
	sconfig.Replicas = 3
	_, err = js.UpdateStream(&sconfig)
	require_NoError(t, err)

	select {
	case e := <-errCh:
		t.Fatalf("Bad condition on raft node: %v", e)
	case <-time.After(2 * time.Second):
		// Done
	}

	// Stop goroutines and wait for them to exit.
	cancel()
	wg.Wait()
}
15 changes: 15 additions & 0 deletions server/jetstream_helpers_test.go
@@ -1541,6 +1541,21 @@ func (c *cluster) restartAll() {
	c.waitOnClusterReady()
}

func (c *cluster) lameDuckRestartAll() {
	c.t.Helper()
	for i, s := range c.servers {
		s.lameDuckMode()
		s.WaitForShutdown()
		if !s.Running() {
			opts := c.opts[i]
			s, o := RunServerWithConfig(opts.ConfigFile)
			c.servers[i] = s
			c.opts[i] = o
		}
	}
	c.waitOnClusterReady()
}

func (c *cluster) restartAllSamePorts() {
	c.t.Helper()
	for i, s := range c.servers {
