Fix to stop forwarding proposals in consumers after scaling down a stream #4556

Merged · 4 commits · Sep 18, 2023
14 changes: 14 additions & 0 deletions server/consumer.go
@@ -1122,6 +1122,16 @@ func (o *consumer) isLeader() bool {
return true
}

func (o *consumer) clearLoopAndForward() {
o.mu.Lock()
defer o.mu.Unlock()
if o.qch != nil {
close(o.qch)
// Note can not close pch here.
Review comment (Member): Is this because of a panic when writing to the channel elsewhere?
o.qch, o.pch = nil, nil
}
}

func (o *consumer) setLeader(isLeader bool) {
o.mu.RLock()
mset := o.mset
@@ -2003,9 +2013,13 @@ func (o *consumer) loopAndForwardProposals(qch chan struct{}) {

forwardProposals := func() {
o.mu.Lock()
node, pch = o.node, o.pch
proposal := o.phead
o.phead, o.ptail = nil, nil
o.mu.Unlock()
if node == nil || pch == nil || node.State() != Leader {
return
}
// 256k max for now per batch.
const maxBatch = 256 * 1024
var entries []*Entry
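
The comment "can not close pch here" and the reviewer's question above both come down to Go channel semantics: closing a channel that other goroutines still send on makes those sends panic ("send on closed channel"), while closing a channel that is only ever received from simply unblocks the receivers. A minimal standalone sketch of that behavior, illustrative only and not code from this PR:

package main

import "fmt"

func main() {
	qch := make(chan struct{})    // only ever received from; safe to close
	pch := make(chan struct{}, 1) // other goroutines may still send here

	close(qch)
	<-qch // receiving on a closed channel returns immediately

	defer func() {
		fmt.Println("recovered:", recover()) // recovered: send on closed channel
	}()
	close(pch)
	pch <- struct{}{} // sending on a closed channel panics
}

That is presumably why clearLoopAndForward closes only qch to stop the loop, and nils out both pointers so that later callers can guard on pch == nil (as the updated forwardProposals does) instead of risking a send on a closed channel.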
2 changes: 2 additions & 0 deletions server/jetstream_cluster.go
@@ -4264,6 +4264,8 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
} else {
// Check for scale down to 1..
if rg.node != nil && len(rg.Peers) == 1 {
// Need to pop loopAndForward by closing qch and nil out both qch and pch.
o.clearLoopAndForward()
o.clearNode()
o.setLeader(true)
// Need to clear from rg too.
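
For context, the loop being "popped" here presumably blocks in a select on both channels, so closing qch is what unblocks it and lets the goroutine exit before the Raft node is cleared. A hypothetical sketch of that shape (the real loop is loopAndForwardProposals in server/consumer.go, not shown in full in this diff):

for {
	select {
	case <-qch:
		// Closed by clearLoopAndForward; stop forwarding.
		return
	case <-pch:
		// A proposal was queued; batch and forward it.
		forwardProposals()
	}
}

Skipping this step would leave the goroutine proposing to a Raft node that has already been shut down, which is exactly the "Direct proposal ignored, not leader (state: CLOSED)" condition the new test below watches for.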
125 changes: 125 additions & 0 deletions server/jetstream_cluster_3_test.go
@@ -18,6 +18,7 @@ package server

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
@@ -5682,3 +5683,127 @@ func TestJetStreamClusterDetectOrphanNRGs(t *testing.T) {
require_True(t, s.lookupRaftNode(sgn) == nil)
require_True(t, s.lookupRaftNode(ogn) == nil)
}

func TestJetStreamClusterRestartThenScaleStreamReplicas(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

s := c.randomNonLeader()
nc, js := jsClientConnect(t, s)
defer nc.Close()

nc2, producer := jsClientConnect(t, s)
defer nc2.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)
c.waitOnStreamLeader(globalAccountName, "TEST")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

end := time.Now().Add(2 * time.Second)
for time.Now().Before(end) {
producer.Publish("foo", []byte(strings.Repeat("A", 128)))
time.Sleep(time.Millisecond)
}

var wg sync.WaitGroup
for i := 0; i < 5; i++ {
sub, err := js.PullSubscribe("foo", fmt.Sprintf("C-%d", i))
require_NoError(t, err)

wg.Add(1)
go func() {
defer wg.Done()
for range time.NewTicker(10 * time.Millisecond).C {
select {
case <-ctx.Done():
return
default:
}

msgs, err := sub.Fetch(1)
if err != nil && !errors.Is(err, nats.ErrTimeout) && !errors.Is(err, nats.ErrConnectionClosed) {
t.Logf("Pull Error: %v", err)
}
for _, msg := range msgs {
msg.Ack()
}
}
}()
}
c.lameDuckRestartAll()
c.waitOnStreamLeader(globalAccountName, "TEST")

// Swap the logger to try to detect the condition after the restart.
loggers := make([]*captureDebugLogger, 3)
for i, srv := range c.servers {
l := &captureDebugLogger{dbgCh: make(chan string, 10)}
loggers[i] = l
srv.SetLogger(l, true, false)
}
condition := `Direct proposal ignored, not leader (state: CLOSED)`
errCh := make(chan error, 10)

wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case dl := <-loggers[0].dbgCh:
if strings.Contains(dl, condition) {
errCh <- fmt.Errorf(condition)
}
case dl := <-loggers[1].dbgCh:
if strings.Contains(dl, condition) {
errCh <- fmt.Errorf(condition)
}
case dl := <-loggers[2].dbgCh:
if strings.Contains(dl, condition) {
errCh <- fmt.Errorf(condition)
}
case <-ctx.Done():
return
}
}
}()

// Start publishing again for a while.
end = time.Now().Add(2 * time.Second)
for time.Now().Before(end) {
producer.Publish("foo", []byte(strings.Repeat("A", 128)))
time.Sleep(time.Millisecond)
}

// Try to do a stream edit back to R=1 after doing all the upgrade.
info, _ := js.StreamInfo("TEST")
sconfig := info.Config
sconfig.Replicas = 1
_, err = js.UpdateStream(&sconfig)
require_NoError(t, err)

// Leave running for some time after the update.
time.Sleep(2 * time.Second)

info, _ = js.StreamInfo("TEST")
sconfig = info.Config
sconfig.Replicas = 3
_, err = js.UpdateStream(&sconfig)
require_NoError(t, err)

select {
case e := <-errCh:
t.Fatalf("Bad condition on raft node: %v", e)
case <-time.After(2 * time.Second):
// Done
}

// Stop goroutines and wait for them to exit.
cancel()
wg.Wait()
}
15 changes: 15 additions & 0 deletions server/jetstream_helpers_test.go
@@ -1541,6 +1541,21 @@ func (c *cluster) restartAll() {
c.waitOnClusterReady()
}

func (c *cluster) lameDuckRestartAll() {
c.t.Helper()
for i, s := range c.servers {
s.lameDuckMode()
s.WaitForShutdown()
if !s.Running() {
opts := c.opts[i]
s, o := RunServerWithConfig(opts.ConfigFile)
c.servers[i] = s
c.opts[i] = o
}
}
c.waitOnClusterReady()
}

func (c *cluster) restartAllSamePorts() {
c.t.Helper()
for i, s := range c.servers {