[FIXED] Do not allow JetStream leaders to be placed on a lameduck server. (#4002)

Set existing and any new raft assets to observer mode while in a
lameduck state.

Signed-off-by: Derek Collison <derek@nats.io>
derekcollison committed Mar 30, 2023
2 parents 873ab0f + 4646f4a commit 5359d23
Showing 2 changed files with 113 additions and 0 deletions.
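Before the diffs, here is a minimal, self-contained sketch of the behavior the commit message describes. The types and names below (toyNode, toyServer, stream-A, etc.) are stand-ins for illustration only, not the actual nats-server types; the real change is in the two hunks that follow. The idea: while a server is in lameduck mode, leadership is stepped down and every existing node is flipped to observer, and any node created during lameduck starts as an observer.

package main

import "fmt"

// toyNode is a stand-in for the server's RaftNode; only the calls this
// commit relies on (Leader, StepDown, SetObserver) are modeled.
type toyNode struct {
	name     string
	leader   bool
	observer bool
}

func (n *toyNode) Leader() bool       { return n.leader }
func (n *toyNode) StepDown()          { n.leader = false }
func (n *toyNode) SetObserver(b bool) { n.observer = b }

// toyServer is a stand-in for *Server with just a lameduck flag and the
// raft nodes it hosts.
type toyServer struct {
	lameDuck bool
	nodes    []*toyNode
}

// transferRaftLeaders mirrors the second hunk: step down any leaders and
// mark every node as an observer so no new leaders land here while draining.
func (s *toyServer) transferRaftLeaders() bool {
	didTransfer := false
	for _, n := range s.nodes {
		if n.Leader() {
			n.StepDown()
			didTransfer = true
		}
		n.SetObserver(true)
	}
	return didTransfer
}

// startRaftNode mirrors the first hunk: a node created while the server is
// already in lameduck mode starts out as an observer.
func (s *toyServer) startRaftNode(name string) *toyNode {
	n := &toyNode{name: name}
	if s.lameDuck {
		n.SetObserver(true)
	}
	s.nodes = append(s.nodes, n)
	return n
}

func main() {
	s := &toyServer{nodes: []*toyNode{{name: "stream-A", leader: true}, {name: "stream-B"}}}
	s.lameDuck = true
	fmt.Println("stepped down a leader:", s.transferRaftLeaders()) // true
	n := s.startRaftNode("stream-C")
	fmt.Println("new node is observer:", n.observer) // true
}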
103 changes: 103 additions & 0 deletions server/jetstream_cluster_3_test.go
@@ -3403,3 +3403,106 @@ func TestJetStreamInterestLeakOnDisableJetStream(t *testing.T) {
t.Fatalf("unexpected dangling interests for JetStream assets after shutdown (%d $JSC, %d $NRG)", danglingJSC, danglingRaft)
}
}

func TestJetStreamNoLeadersDuringLameDuck(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	// Grab the first server and set lameduck option directly.
	s := c.servers[0]
	s.optsMu.Lock()
	s.opts.LameDuckDuration = 5 * time.Second
	s.opts.LameDuckGracePeriod = -5 * time.Second
	s.optsMu.Unlock()

	// Connect to the third server.
	nc, js := jsClientConnect(t, c.servers[2])
	defer nc.Close()

	allServersHaveLeaders := func() bool {
		haveLeader := make(map[*Server]bool)
		for _, s := range c.servers {
			s.rnMu.RLock()
			for _, n := range s.raftNodes {
				if n.Leader() {
					haveLeader[s] = true
					break
				}
			}
			s.rnMu.RUnlock()
		}
		return len(haveLeader) == len(c.servers)
	}

	// Create streams until we have a leader on all the servers.
	var index int
	checkFor(t, 10*time.Second, time.Millisecond, func() error {
		if allServersHaveLeaders() {
			return nil
		}
		index++
		_, err := js.AddStream(&nats.StreamConfig{
			Name:     fmt.Sprintf("TEST_%d", index),
			Subjects: []string{fmt.Sprintf("foo.%d", index)},
			Replicas: 3,
		})
		require_NoError(t, err)
		return fmt.Errorf("All servers do not have at least one leader")
	})

	// Put our server into lameduck mode.
	// Need a client.
	dummy, _ := jsClientConnect(t, s)
	defer dummy.Close()
	go s.lameDuckMode()

	// Wait for all leaders to move off.
	checkFor(t, 2*time.Second, 50*time.Millisecond, func() error {
		s.rnMu.RLock()
		defer s.rnMu.RUnlock()
		for _, n := range s.raftNodes {
			if n.Leader() {
				return fmt.Errorf("Server still has a leader")
			}
		}
		return nil
	})

	// All leaders evacuated.

	// Create a goroutine that will create streams constantly.
	qch := make(chan bool)
	go func() {
		var index int
		for {
			select {
			case <-time.After(time.Millisecond):
				index++
				_, err := js.AddStream(&nats.StreamConfig{
					Name:     fmt.Sprintf("NEW_TEST_%d", index),
					Subjects: []string{fmt.Sprintf("bar.%d", index)},
					Replicas: 3,
				})
				if err != nil {
					return
				}
			case <-qch:
				return
			}
		}
	}()
	defer close(qch)

	// Make sure we do not have any leaders placed on the lameduck server.
	for s.isRunning() {
		var hasLeader bool
		s.rnMu.RLock()
		for _, n := range s.raftNodes {
			hasLeader = hasLeader || n.Leader()
		}
		s.rnMu.RUnlock()
		if hasLeader {
			t.Fatalf("Server had a leader when it should not due to lameduck mode")
		}
	}
}
10 changes: 10 additions & 0 deletions server/raft.go
@@ -485,6 +485,12 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig) (RaftNode, error

n.debug("Started")

// Check if we need to start in observer mode due to lame duck status.
if s.isLameDuckMode() {
n.debug("Will start in observer mode due to lame duck status")
n.SetObserver(true)
}

n.Lock()
n.resetElectionTimeout()
n.llqrt = time.Now()
@@ -611,6 +617,9 @@ func (s *Server) shutdownRaftNodes() {
	}
}

// Used in lameduck mode to move off the leaders.
// We also put all nodes in observer mode so new leaders
// cannot be placed on this server.
func (s *Server) transferRaftLeaders() bool {
	if s == nil {
		return false
@@ -631,6 +640,7 @@ func (s *Server) transferRaftLeaders() bool {
			node.StepDown()
			didTransfer = true
		}
		node.SetObserver(true)
	}
	return didTransfer
}
