Fix to stop forwarding proposals in consumers after scaling down a stream #4556

Merged: 4 commits on Sep 18, 2023
11 changes: 9 additions & 2 deletions server/consumer.go
@@ -2001,8 +2001,12 @@ func (o *consumer) loopAndForwardProposals(qch chan struct{}) {
 		return
 	}
 
-	forwardProposals := func() {
+	forwardProposals := func() error {
 		o.mu.Lock()
+		if o.node != node || node.State() != Leader {
+			o.mu.Unlock()
+			return errors.New("no longer leader")
+		}
 		proposal := o.phead
 		o.phead, o.ptail = nil, nil
 		o.mu.Unlock()
@@ -2024,6 +2028,7 @@ func (o *consumer) loopAndForwardProposals(qch chan struct{}) {
 		if len(entries) > 0 {
 			node.ProposeDirect(entries)
 		}
+		return nil
 	}
 
 	// In case we have anything pending on entry.
@@ -2035,7 +2040,9 @@ func (o *consumer) loopAndForwardProposals(qch chan struct{}) {
 			forwardProposals()
 			return
 		case <-pch:
-			forwardProposals()
+			if err := forwardProposals(); err != nil {
+				return
+			}
 		}
 	}
 }
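For context on the change above: after a replica scale-down (and later scale back up), the consumer's Raft node can be closed and replaced, but the old `loopAndForwardProposals` loop previously kept proposing on the stale node, which is what produces the `Direct proposal ignored, not leader (state: CLOSED)` debug line the test below watches for. The fix has the closure report an error when the captured node is stale or not leader, and the caller exits its loop on that error. A minimal, self-contained sketch of that guard pattern (toy types, not the actual nats-server code):

package main

import (
	"errors"
	"fmt"
)

type raftState int

const (
	leader raftState = iota
	closed
)

type raftNode struct{ state raftState }

func (n *raftNode) State() raftState { return n.state }

type consumer struct{ node *raftNode }

// forwardProposals mirrors the fixed closure: bail out when `node` (the
// node captured when the loop started) has been swapped out or is no
// longer leader, so the caller can stop forwarding.
func (o *consumer) forwardProposals(node *raftNode) error {
	if o.node != node || node.State() != leader {
		return errors.New("no longer leader")
	}
	// ... the real code drains pending proposals and calls ProposeDirect ...
	return nil
}

func main() {
	n := &raftNode{state: leader}
	c := &consumer{node: n}
	fmt.Println(c.forwardProposals(n)) // <nil>: same node, still leader

	// A replica scale-down closes the old node and installs a new one.
	n.state = closed
	c.node = &raftNode{state: leader}
	fmt.Println(c.forwardProposals(n)) // "no longer leader": the loop exits
}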
127 changes: 127 additions & 0 deletions server/jetstream_cluster_3_test.go
@@ -18,6 +18,7 @@ package server
 
 import (
 	"bytes"
+	"context"
 	"encoding/json"
 	"errors"
 	"fmt"
@@ -5682,3 +5683,129 @@ func TestJetStreamClusterDetectOrphanNRGs(t *testing.T) {
 	require_True(t, s.lookupRaftNode(sgn) == nil)
 	require_True(t, s.lookupRaftNode(ogn) == nil)
 }
+
+func TestJetStreamClusterRestartThenScaleStreamReplicas(t *testing.T) {
+	t.Skip("This test takes too long, need to make shorter")
+
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	s := c.randomNonLeader()
+	nc, js := jsClientConnect(t, s)
+	defer nc.Close()
+
+	nc2, producer := jsClientConnect(t, s)
+	defer nc2.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+	c.waitOnStreamLeader(globalAccountName, "TEST")
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	end := time.Now().Add(2 * time.Second)
+	for time.Now().Before(end) {
+		producer.Publish("foo", []byte(strings.Repeat("A", 128)))
+		time.Sleep(time.Millisecond)
+	}
+
+	var wg sync.WaitGroup
+	for i := 0; i < 5; i++ {
+		sub, err := js.PullSubscribe("foo", fmt.Sprintf("C-%d", i))
+		require_NoError(t, err)
+
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for range time.NewTicker(10 * time.Millisecond).C {
+				select {
+				case <-ctx.Done():
+					return
+				default:
+				}
+
+				msgs, err := sub.Fetch(1)
+				if err != nil && !errors.Is(err, nats.ErrTimeout) && !errors.Is(err, nats.ErrConnectionClosed) {
+					t.Logf("Pull Error: %v", err)
+				}
+				for _, msg := range msgs {
+					msg.Ack()
+				}
+			}
+		}()
+	}
+	c.lameDuckRestartAll()
+	c.waitOnStreamLeader(globalAccountName, "TEST")
+
+	// Swap the logger to try to detect the condition after the restart.
+	loggers := make([]*captureDebugLogger, 3)
+	for i, srv := range c.servers {
+		l := &captureDebugLogger{dbgCh: make(chan string, 10)}
+		loggers[i] = l
+		srv.SetLogger(l, true, false)
+	}
+	condition := `Direct proposal ignored, not leader (state: CLOSED)`
+	errCh := make(chan error, 10)
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for {
+			select {
+			case dl := <-loggers[0].dbgCh:
+				if strings.Contains(dl, condition) {
+					errCh <- fmt.Errorf(condition)
+				}
+			case dl := <-loggers[1].dbgCh:
+				if strings.Contains(dl, condition) {
+					errCh <- fmt.Errorf(condition)
+				}
+			case dl := <-loggers[2].dbgCh:
+				if strings.Contains(dl, condition) {
+					errCh <- fmt.Errorf(condition)
+				}
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+
+	// Start publishing again for a while.
+	end = time.Now().Add(2 * time.Second)
+	for time.Now().Before(end) {
+		producer.Publish("foo", []byte(strings.Repeat("A", 128)))
+		time.Sleep(time.Millisecond)
+	}
+
+	// Try to do a stream edit back to R=1 after doing all the upgrade.
+	info, _ := js.StreamInfo("TEST")
+	sconfig := info.Config
+	sconfig.Replicas = 1
+	_, err = js.UpdateStream(&sconfig)
+	require_NoError(t, err)
+
+	// Leave running for some time after the update.
+	time.Sleep(2 * time.Second)
+
+	info, _ = js.StreamInfo("TEST")
+	sconfig = info.Config
+	sconfig.Replicas = 3
+	_, err = js.UpdateStream(&sconfig)
+	require_NoError(t, err)
+
+	select {
+	case e := <-errCh:
+		t.Fatalf("Bad condition on raft node: %v", e)
+	case <-time.After(2 * time.Second):
+		// Done
+	}
+
+	// Stop goroutines and wait for them to exit.
+	cancel()
+	wg.Wait()
+}
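The detection technique in the test is worth calling out: each server's logger is swapped for one that mirrors debug lines onto a channel, and a watcher goroutine scans those lines for the bad-proposal message. A rough self-contained sketch of that pattern (hypothetical type here; the real test uses the suite's captureDebugLogger):

package main

import (
	"fmt"
	"strings"
)

type captureLogger struct{ dbgCh chan string }

// Debugf mirrors formatted debug output onto a buffered channel so a
// watcher goroutine can match specific lines.
func (l *captureLogger) Debugf(format string, v ...any) {
	select {
	case l.dbgCh <- fmt.Sprintf(format, v...):
	default: // never block the logging path; drop lines if the watcher lags
	}
}

func main() {
	l := &captureLogger{dbgCh: make(chan string, 10)}
	l.Debugf("Direct proposal ignored, not leader (state: %s)", "CLOSED")

	line := <-l.dbgCh
	fmt.Println(strings.Contains(line, "Direct proposal ignored")) // true
}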
15 changes: 15 additions & 0 deletions server/jetstream_helpers_test.go
@@ -1541,6 +1541,21 @@ func (c *cluster) restartAll() {
 	c.waitOnClusterReady()
 }
 
+func (c *cluster) lameDuckRestartAll() {
+	c.t.Helper()
+	for i, s := range c.servers {
+		s.lameDuckMode()
+		s.WaitForShutdown()
+		if !s.Running() {
+			opts := c.opts[i]
+			s, o := RunServerWithConfig(opts.ConfigFile)
+			c.servers[i] = s
+			c.opts[i] = o
+		}
+	}
+	c.waitOnClusterReady()
+}
+
 func (c *cluster) restartAllSamePorts() {
 	c.t.Helper()
 	for i, s := range c.servers {
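Unlike restartAll, this helper uses lame duck mode, which in NATS stops a server from accepting new connections and lets existing clients migrate before shutdown, and it fully stops and restarts each server before moving to the next, so the cluster only ever loses one node at a time. A toy sketch of that rolling pattern (illustrative types only, not the nats-server API):

package main

import "fmt"

type server struct {
	name    string
	running bool
}

func (s *server) lameDuck() {
	// A real server would stop accepting new connections, migrate
	// existing clients, and then shut down.
	s.running = false
}

func (s *server) restart() {
	s.running = true
	fmt.Printf("%s restarted\n", s.name)
}

func main() {
	cluster := []*server{
		{name: "s1", running: true},
		{name: "s2", running: true},
		{name: "s3", running: true},
	}
	for _, s := range cluster {
		s.lameDuck() // graceful drain and shutdown
		if !s.running {
			s.restart() // bring it back before touching the next server
		}
	}
}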