Skip to content

Commit

Permalink
Send shutdown event on LDM (#4405)
Browse files Browse the repository at this point in the history
If we send an event when entering lame duck mode, other nodes will mark
the server as offline immediately, therefore R1 assets will not be
placed onto that node. This is not a problem with R3 or higher because
an LDM server operates as a Raft observer only and therefore cannot take
the leadership role from an election, but R1 assets can in theory be
placed onto any node that is not marked as offline.

A final shutdown event will still be sent when the server actually shuts
down so there is no change there.

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Aug 21, 2023
2 parents 0712628 + d720a69 commit 3377f04
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 2 deletions.
21 changes: 21 additions & 0 deletions server/events.go
Expand Up @@ -56,6 +56,7 @@ const (
connsRespSubj = "$SYS._INBOX_.%s"
accConnsEventSubjNew = "$SYS.ACCOUNT.%s.SERVER.CONNS"
accConnsEventSubjOld = "$SYS.SERVER.ACCOUNT.%s.CONNS" // kept for backward compatibility
lameDuckEventSubj = "$SYS.SERVER.%s.LAMEDUCK"
shutdownEventSubj = "$SYS.SERVER.%s.SHUTDOWN"
authErrorEventSubj = "$SYS.SERVER.%s.CLIENT.AUTH.ERR"
serverStatsSubj = "$SYS.SERVER.%s.STATSZ"
Expand Down Expand Up @@ -533,6 +534,19 @@ RESET:
}
}

// Will send a shutdown message for lame-duck. Unlike sendShutdownEvent, this will
// not close off the send queue or reply handler, as we may still have a workload
// that needs migrating off.
// Lock should be held.
func (s *Server) sendLDMShutdownEventLocked() {
if s.sys == nil || s.sys.sendq == nil {
return
}
subj := fmt.Sprintf(lameDuckEventSubj, s.info.ID)
si := &ServerInfo{}
s.sys.sendq.push(newPubMsg(nil, subj, _EMPTY_, si, nil, si, noCompression, false, true))
}

// Will send a shutdown message.
func (s *Server) sendShutdownEvent() {
s.mu.Lock()
Expand Down Expand Up @@ -944,6 +958,13 @@ func (s *Server) initEventTracking() {
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerShutdown)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
}
// Listen for servers entering lame-duck mode.
// NOTE: This currently is handled in the same way as a server shutdown, but has
// a different subject in case we need to handle differently in future.
subject = fmt.Sprintf(lameDuckEventSubj, "*")
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerShutdown)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
}
// Listen for account claims updates.
subscribeToUpdate := true
if s.accResolver != nil {
Expand Down
2 changes: 1 addition & 1 deletion server/events_test.go
Expand Up @@ -1666,7 +1666,7 @@ func TestSystemAccountWithGateways(t *testing.T) {

// If this tests fails with wrong number after 10 seconds we may have
// added a new inititial subscription for the eventing system.
checkExpectedSubs(t, 45, sa)
checkExpectedSubs(t, 46, sa)

// Create a client on B and see if we receive the event
urlb := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port)
Expand Down
84 changes: 84 additions & 0 deletions server/jetstream_cluster_3_test.go
Expand Up @@ -3379,6 +3379,90 @@ func TestJetStreamClusterNoLeadersDuringLameDuck(t *testing.T) {
}
}

func TestJetStreamClusterNoR1AssetsDuringLameDuck(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

// Grab the first server and set lameduck option directly.
s := c.servers[0]
s.optsMu.Lock()
s.opts.LameDuckDuration = 5 * time.Second
s.opts.LameDuckGracePeriod = -5 * time.Second
s.optsMu.Unlock()

// Connect to the server to keep it alive when we go into LDM.
dummy, _ := jsClientConnect(t, s)
defer dummy.Close()

// Connect to the third server.
nc, js := jsClientConnect(t, c.servers[2])
defer nc.Close()

// Now put the first server into lame duck mode.
go s.lameDuckMode()

// Wait for news to arrive that the first server has gone into
// lame duck mode and been marked offline.
checkFor(t, 2*time.Second, 50*time.Millisecond, func() error {
id := s.info.ID
s := c.servers[2]
s.mu.RLock()
defer s.mu.RUnlock()

var isOffline bool
s.nodeToInfo.Range(func(_, v any) bool {
ni := v.(nodeInfo)
if ni.id == id {
isOffline = ni.offline
return false
}
return true
})

if !isOffline {
return fmt.Errorf("first node is still online unexpectedly")
}
return nil
})

// Create a go routine that will create streams constantly.
qch := make(chan bool)
go func() {
var index int
for {
select {
case <-time.After(time.Millisecond * 25):
index++
_, err := js.AddStream(&nats.StreamConfig{
Name: fmt.Sprintf("NEW_TEST_%d", index),
Subjects: []string{fmt.Sprintf("bar.%d", index)},
Replicas: 1,
})
if err != nil {
return
}
case <-qch:
return
}
}
}()
defer close(qch)

// Make sure we do not have any R1 assets placed on the lameduck server.
for s.isRunning() {
s.rnMu.RLock()
if s.js == nil || s.js.srv == nil || s.js.srv.gacc == nil {
s.rnMu.RUnlock()
break
}
hasAsset := len(s.js.srv.gacc.streams()) > 0
s.rnMu.RUnlock()
if hasAsset {
t.Fatalf("Server had an R1 asset when it should not due to lameduck mode")
}
}
}

// If a consumer has not been registered (possible in heavily loaded systems with lots of assets)
// it could miss the signal of a message going away. If that message was pending and expires the
// ack floor could fall below the stream first sequence. This test will force that condition and
Expand Down
2 changes: 1 addition & 1 deletion server/monitor_test.go
Expand Up @@ -3942,7 +3942,7 @@ func TestMonitorAccountz(t *testing.T) {
body = string(readBody(t, fmt.Sprintf("http://127.0.0.1:%d%s?acc=$SYS", s.MonitorAddr().Port, AccountzPath)))
require_Contains(t, body, `"account_detail": {`)
require_Contains(t, body, `"account_name": "$SYS",`)
require_Contains(t, body, `"subscriptions": 40,`)
require_Contains(t, body, `"subscriptions": 41,`)
require_Contains(t, body, `"is_system": true,`)
require_Contains(t, body, `"system_account": "$SYS"`)

Expand Down
1 change: 1 addition & 0 deletions server/server.go
Expand Up @@ -3557,6 +3557,7 @@ func (s *Server) lameDuckMode() {
}
s.Noticef("Entering lame duck mode, stop accepting new clients")
s.ldm = true
s.sendLDMShutdownEventLocked()
expected := 1
s.listener.Close()
s.listener = nil
Expand Down

0 comments on commit 3377f04

Please sign in to comment.