Skip to content

Commit

Permalink
Adds a new 'LDM' SYS server API service that listens on `$SYS.REQ.SER…
Browse files Browse the repository at this point in the history
…VER.%s.LDM` (where %s is the server_id) and takes a JSON payload containing either an "id" or a "name" field. "id" sends an LDM Info message to the client connection id, "name" sends an LDM Info message to _all_ of the clients connected to the server with that name.

Signed-off-by: Jean-Noël Moyne <jnmoyne@gmail.com>
  • Loading branch information
jnmoyne committed Jul 8, 2023
1 parent f24f068 commit 218ad85
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 5 deletions.
24 changes: 24 additions & 0 deletions server/events.go
Expand Up @@ -55,6 +55,7 @@ const (
accConnsEventSubjOld = "$SYS.SERVER.ACCOUNT.%s.CONNS" // kept for backward compatibility
shutdownEventSubj = "$SYS.SERVER.%s.SHUTDOWN"
clientKickReqSubj = "$SYS.REQ.SERVER.%s.KICK"
clientLDMReqSubj = "$SYS.REQ.SERVER.%s.LDM"
authErrorEventSubj = "$SYS.SERVER.%s.CLIENT.AUTH.ERR"
authErrorAccountEventSubj = "$SYS.ACCOUNT.CLIENT.AUTH.ERR"
serverStatsSubj = "$SYS.SERVER.%s.STATSZ"
Expand Down Expand Up @@ -1213,6 +1214,11 @@ func (s *Server) initEventTracking() {
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.kickClient)); err != nil {
s.Errorf("Error setting up client kick service: %v", err)
}
// Client connection LDM
subject = fmt.Sprintf(clientLDMReqSubj, s.info.ID)
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.ldmClient)); err != nil {
s.Errorf("Error setting up client LDM service: %v", err)
}
}

// UserInfo returns basic information to a user about bound account and user permissions.
Expand Down Expand Up @@ -2690,6 +2696,11 @@ type kickClientReq struct {
Id uint64 `json:"id"`
}

type ldmClientReq struct {
Name string `json:"name"`
Id uint64 `json:"id"`
}

func (s *Server) kickClient(_ *subscription, _ *client, _ *Account, subject, reply string, hdr, msg []byte) {
var req kickClientReq
if err := json.Unmarshal(msg, &req); err != nil {
Expand All @@ -2703,6 +2714,19 @@ func (s *Server) kickClient(_ *subscription, _ *client, _ *Account, subject, rep
}
}

func (s *Server) ldmClient(_ *subscription, _ *client, _ *Account, subject, reply string, hdr, msg []byte) {
var req ldmClientReq
if err := json.Unmarshal(msg, &req); err != nil {
s.sys.client.Errorf("Error unmarshalling kick client request: %v", err)
return
}
if req.Id != 0 {
s.LDMClientByID(req.Id)
} else if req.Name != _EMPTY_ {
s.LDMClientsByName(req.Name)
}
}

// Helper to grab account name for a client.
func accForClient(c *client) string {
if c.acc != nil {
Expand Down
34 changes: 29 additions & 5 deletions server/server.go
Expand Up @@ -4170,17 +4170,41 @@ func (s *Server) changeRateLimitLogInterval(d time.Duration) {
}
}

// Disconnects a client by cid
func (s *Server) DisconnectClientByID(id uint64) {
client := s.clients[id]
if client != nil {
client.closeConnection(Kicked)
}
}

// Disconnects a client by name
func (s *Server) DisconnectClientsByName(name string) {
for _, c := range s.clients {
if c.GetName() == name {
c.closeConnection(Kicked)
s.DisconnectClientByID(c.cid)
}
}
}

// Disconnects a client by cid
func (s *Server) DisconnectClientByID(id uint64) {
client := s.clients[id]
client.closeConnection(Kicked)
func (s *Server) LDMClientByID(id uint64) {
info := s.copyInfo()
info.LameDuckMode = true

c := s.clients[id]
if c != nil && c.opts.Protocol >= ClientProtoInfo &&
c.flags.isSet(firstPongSent) {
// sendInfo takes care of checking if the connection is still
// valid or not, so don't duplicate tests here.
s.Debugf("sending LDM info to cid=%d", id)
c.enqueueProto(c.generateClientInfoJSON(info))
}
}

func (s *Server) LDMClientsByName(name string) {
for _, c := range s.clients {
if c.GetName() == name {
s.LDMClientByID(c.cid)
}
}
}

0 comments on commit 218ad85

Please sign in to comment.