Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Do not allow JetStream leaders to be placed on a lameduck server. #4002

Merged
merged 1 commit into from Mar 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
103 changes: 103 additions & 0 deletions server/jetstream_cluster_3_test.go
Expand Up @@ -3403,3 +3403,106 @@ func TestJetStreamInterestLeakOnDisableJetStream(t *testing.T) {
t.Fatalf("unexpected dangling interests for JetStream assets after shutdown (%d $JSC, %d $NRG)", danglingJSC, danglingRaft)
}
}

// TestJetStreamNoLeadersDuringLameDuck checks that once a server enters
// lameduck mode its raft leaders move off of it, and that no new raft leader
// shows up on it while it is still running, even though new R3 streams keep
// being created in the cluster.
func TestJetStreamNoLeadersDuringLameDuck(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	// Grab the first server and set lameduck option directly.
	// NOTE(review): the negative grace period presumably removes the wait
	// before clients start being closed - confirm against lameDuckMode.
	s := c.servers[0]
	s.optsMu.Lock()
	s.opts.LameDuckDuration = 5 * time.Second
	s.opts.LameDuckGracePeriod = -5 * time.Second
	s.optsMu.Unlock()

	// Connect to the third server.
	nc, js := jsClientConnect(t, c.servers[2])
	defer nc.Close()

	// hasRaftLeader reports whether any raft node registered on srv is
	// currently a leader. The raft node map is read under rnMu.
	hasRaftLeader := func(srv *Server) bool {
		srv.rnMu.RLock()
		defer srv.rnMu.RUnlock()
		for _, n := range srv.raftNodes {
			if n.Leader() {
				return true
			}
		}
		return false
	}

	// allServersHaveLeaders reports whether every server in the cluster
	// hosts at least one raft leader.
	allServersHaveLeaders := func() bool {
		for _, srv := range c.servers {
			if !hasRaftLeader(srv) {
				return false
			}
		}
		return true
	}

	// Create streams until we have a leader on all the servers.
	// Each failed check adds one more R3 stream before retrying.
	var index int
	checkFor(t, 10*time.Second, time.Millisecond, func() error {
		if allServersHaveLeaders() {
			return nil
		}
		index++
		_, err := js.AddStream(&nats.StreamConfig{
			Name:     fmt.Sprintf("TEST_%d", index),
			Subjects: []string{fmt.Sprintf("foo.%d", index)},
			Replicas: 3,
		})
		require_NoError(t, err)
		return fmt.Errorf("All servers do not have at least one leader")
	})

	// Put our server into lameduck mode.
	// Need a client.
	dummy, _ := jsClientConnect(t, s)
	defer dummy.Close()
	go s.lameDuckMode()

	// Wait for all leaders to move off.
	checkFor(t, 2*time.Second, 50*time.Millisecond, func() error {
		if hasRaftLeader(s) {
			return fmt.Errorf("Server still has a leader")
		}
		return nil
	})

	// All leaders evacuated.

	// Create a go routine that will create streams constantly.
	// It stops on the first AddStream error or when qch is closed.
	qch := make(chan bool)
	done := make(chan struct{})
	go func() {
		defer close(done)
		var index int
		for {
			select {
			case <-time.After(time.Millisecond):
				index++
				_, err := js.AddStream(&nats.StreamConfig{
					Name:     fmt.Sprintf("NEW_TEST_%d", index),
					Subjects: []string{fmt.Sprintf("bar.%d", index)},
					Replicas: 3,
				})
				if err != nil {
					return
				}
			case <-qch:
				return
			}
		}
	}()
	// Signal the creator to stop and wait for it to exit. Deferred calls run
	// in LIFO order, so this completes before the connections are closed and
	// the cluster is shut down, and the goroutine never outlives the test.
	defer func() {
		close(qch)
		<-done
	}()

	// Make sure we do not have any leaders placed on the lameduck server.
	// Poll continuously for as long as the server reports it is running.
	for s.isRunning() {
		if hasRaftLeader(s) {
			t.Fatalf("Server had a leader when it should not due to lameduck mode")
		}
	}
}
10 changes: 10 additions & 0 deletions server/raft.go
Expand Up @@ -485,6 +485,12 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig) (RaftNode, error

n.debug("Started")

// Check if we need to start in observer mode due to lame duck status.
if s.isLameDuckMode() {
n.debug("Will start in observer mode due to lame duck status")
n.SetObserver(true)
}

n.Lock()
n.resetElectionTimeout()
n.llqrt = time.Now()
Expand Down Expand Up @@ -611,6 +617,9 @@ func (s *Server) shutdownRaftNodes() {
}
}

// Used in lameduck mode to move the leaders off this server.
// We also put all nodes in observer mode so new leaders
// cannot be placed on this server.
func (s *Server) transferRaftLeaders() bool {
if s == nil {
return false
Expand All @@ -631,6 +640,7 @@ func (s *Server) transferRaftLeaders() bool {
node.StepDown()
didTransfer = true
}
node.SetObserver(true)
}
return didTransfer
}
Expand Down