Skip to content

Commit

Permalink
Send shutdown event on LDM so that R1 assets do not get assigned to the LDM node
Browse files Browse the repository at this point in the history

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Aug 18, 2023
1 parent ff688ab commit 95da6e6
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 0 deletions.
13 changes: 13 additions & 0 deletions server/events.go
Expand Up @@ -533,6 +533,19 @@ RESET:
}
}

// sendLDMShutdownEventLocked queues a shutdown event for this server when it
// enters lame-duck mode. It only pushes a message onto the system send queue;
// unlike sendShutdownEvent it leaves the send queue and reply handler in
// place, since there may still be a workload that needs migrating off.
// It is a no-op when the system account state or its send queue is absent.
// Lock should be held.
func (s *Server) sendLDMShutdownEventLocked() {
	sys := s.sys
	if sys == nil {
		return
	}
	queue := sys.sendq
	if queue == nil {
		return
	}
	// The same empty ServerInfo value is passed in both ServerInfo slots of
	// the pub message, exactly as the shutdown subject is formatted with this
	// server's ID.
	info := &ServerInfo{}
	subject := fmt.Sprintf(shutdownEventSubj, s.info.ID)
	queue.push(newPubMsg(nil, subject, _EMPTY_, info, nil, info, noCompression, false, true))
}

// Will send a shutdown message.
func (s *Server) sendShutdownEvent() {
s.mu.Lock()
Expand Down
84 changes: 84 additions & 0 deletions server/jetstream_cluster_3_test.go
Expand Up @@ -3379,6 +3379,90 @@ func TestJetStreamClusterNoLeadersDuringLameDuck(t *testing.T) {
}
}

// TestJetStreamClusterNoR1AssetsDuringLameDuck puts one server of a three-node
// cluster into lame duck mode, waits until a peer has marked it offline, and
// then keeps creating R1 streams through another server while asserting that
// none of them ever show up on the lame duck server.
func TestJetStreamClusterNoR1AssetsDuringLameDuck(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	// The first server is the one that will enter lame duck mode; its
	// lame duck options are set directly under the options lock.
	s := c.servers[0]
	s.optsMu.Lock()
	s.opts.LameDuckDuration = 5 * time.Second
	s.opts.LameDuckGracePeriod = -5 * time.Second
	s.optsMu.Unlock()

	// Hold a client connection on that server so it stays alive once it
	// goes into LDM.
	dummy, _ := jsClientConnect(t, s)
	defer dummy.Close()

	// All JetStream requests are issued via the third server.
	nc, js := jsClientConnect(t, c.servers[2])
	defer nc.Close()

	// Kick off lame duck mode on the first server in the background.
	go s.lameDuckMode()

	// Poll the third server's node info until it reports the first server
	// as offline, i.e. the lame duck notification has arrived.
	checkFor(t, 2*time.Second, 50*time.Millisecond, func() error {
		ldmID := s.info.ID
		observer := c.servers[2]
		observer.mu.RLock()
		defer observer.mu.RUnlock()

		var offline bool
		observer.nodeToInfo.Range(func(_, val any) bool {
			info := val.(nodeInfo)
			if info.id != ldmID {
				// Not the lame duck server, keep iterating.
				return true
			}
			offline = info.offline
			return false
		})

		if offline {
			return nil
		}
		return fmt.Errorf("first node is still online unexpectedly")
	})

	// Background producer: every 25ms create a new single-replica stream
	// until told to quit or until a create call fails.
	quit := make(chan bool)
	go func() {
		for n := 1; ; n++ {
			select {
			case <-quit:
				return
			case <-time.After(25 * time.Millisecond):
				cfg := &nats.StreamConfig{
					Name:     fmt.Sprintf("NEW_TEST_%d", n),
					Subjects: []string{fmt.Sprintf("bar.%d", n)},
					Replicas: 1,
				}
				if _, err := js.AddStream(cfg); err != nil {
					return
				}
			}
		}
	}()
	defer close(quit)

	// streamCount reports how many streams s.js.srv.gacc holds on the lame
	// duck server, or ok=false once any link in that chain is nil.
	// NOTE(review): gacc is presumably the global account; confirm.
	streamCount := func() (count int, ok bool) {
		s.rnMu.RLock()
		defer s.rnMu.RUnlock()
		if s.js == nil || s.js.srv == nil || s.js.srv.gacc == nil {
			return 0, false
		}
		return len(s.js.srv.gacc.streams()), true
	}

	// While the lame duck server is still running, it must never end up
	// hosting any of the R1 streams being created.
	for s.isRunning() {
		count, ok := streamCount()
		if !ok {
			break
		}
		if count > 0 {
			t.Fatalf("Server had an R1 asset when it should not due to lameduck mode")
		}
	}
}

// If a consumer has not been registered (possible in heavily loaded systems with lots of assets)
// it could miss the signal of a message going away. If that message was pending and expires the
// ack floor could fall below the stream first sequence. This test will force that condition and
Expand Down
1 change: 1 addition & 0 deletions server/server.go
Expand Up @@ -3557,6 +3557,7 @@ func (s *Server) lameDuckMode() {
}
s.Noticef("Entering lame duck mode, stop accepting new clients")
s.ldm = true
s.sendLDMShutdownEventLocked()
expected := 1
s.listener.Close()
s.listener = nil
Expand Down

0 comments on commit 95da6e6

Please sign in to comment.