Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added: reload server config by sending it a message #4307

Merged
merged 8 commits into from Jul 26, 2023
19 changes: 19 additions & 0 deletions server/events.go
Expand Up @@ -60,6 +60,7 @@ const (
serverDirectReqSubj = "$SYS.REQ.SERVER.%s.%s"
serverPingReqSubj = "$SYS.REQ.SERVER.PING.%s"
serverStatsPingReqSubj = "$SYS.REQ.SERVER.PING" // use $SYS.REQ.SERVER.PING.STATSZ instead
serverReloadReqSubj = "$SYS.REQ.SERVER.%s.RELOAD" // with server ID
derekcollison marked this conversation as resolved.
Show resolved Hide resolved
leafNodeConnectEventSubj = "$SYS.ACCOUNT.%s.LEAFNODE.CONNECT" // for internal use only
remoteLatencyEventSubj = "$SYS.LATENCY.M2.%s"
inboxRespSubj = "$SYS._INBOX.%s.%s"
Expand Down Expand Up @@ -1207,6 +1208,12 @@ func (s *Server) initEventTracking() {
if _, err := s.sysSubscribeInternal(accSubsSubj, s.noInlineCallback(s.debugSubscribers)); err != nil {
s.Errorf("Error setting up internal debug service for subscribers: %v", err)
}

// Listen for requests to reload the server configuration.
subject = fmt.Sprintf(serverReloadReqSubj, s.info.ID)
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.reloadConfig)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this error log can probably be improved, I dont think internal tracking will tell people what this is about.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ripienaar thx for catching the incomplete cut&paste, improved the message.

}
}

// UserInfo returns basic information to a user about bound account and user permissions.
Expand Down Expand Up @@ -2679,6 +2686,18 @@ func (s *Server) nsubsRequest(sub *subscription, c *client, _ *Account, subject,
s.sendInternalMsgLocked(reply, _EMPTY_, nil, nsubs)
}

func (s *Server) reloadConfig(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
if !s.eventsRunning() {
return
}

optz := &EventFilterOptions{}
s.zReq(c, reply, hdr, msg, optz, optz, func() (interface{}, error) {
derekcollison marked this conversation as resolved.
Show resolved Hide resolved
// Reload the server config, as requested.
return nil, s.Reload()
})
}

// Helper to grab account name for a client.
func accForClient(c *client) string {
if c.acc != nil {
Expand Down
91 changes: 90 additions & 1 deletion server/events_test.go
Expand Up @@ -1666,7 +1666,7 @@ func TestSystemAccountWithGateways(t *testing.T) {

// If this tests fails with wrong number after 10 seconds we may have
// added a new inititial subscription for the eventing system.
checkExpectedSubs(t, 52, sa)
checkExpectedSubs(t, 53, sa)

// Create a client on B and see if we receive the event
urlb := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port)
Expand Down Expand Up @@ -2535,6 +2535,95 @@ func TestServerEventsStatszSingleServer(t *testing.T) {
checkSubsPending(t, sub, 1)
}

func TestServerEventsReload(t *testing.T) {
conf := createConfFile(t, []byte(`
listen: "127.0.0.1:-1"
accounts: {
$SYS { users [{user: "admin", password: "p1d"}]}
test { users [{user: "foo", password: "bar"}]}
}
ping_interval: "100ms"
`))
opts := LoadConfig(conf)
opts.Trace = true
levb marked this conversation as resolved.
Show resolved Hide resolved
opts.Debug = true
opts.TraceVerbose = true
s := RunServer(opts)
defer s.Shutdown()
subject := fmt.Sprintf(serverReloadReqSubj, s.info.ID)

// Connect as a test user and make sure the reload endpoint is not
// accessible.
ncTest, _ := jsClientConnect(t, s, nats.UserInfo("foo", "bar"))
defer ncTest.Close()
testReply := ncTest.NewRespInbox()
sub, err := ncTest.SubscribeSync(testReply)
require_NoError(t, err)
err = ncTest.PublishRequest(subject, testReply, nil)
require_NoError(t, err)
_, err = sub.NextMsg(time.Second)
require_Error(t, err)

require_True(t, s.getOpts().PingInterval == 100*time.Millisecond)

// Connect as a system user.
nc, _ := jsClientConnect(t, s, nats.UserInfo("admin", "p1d"))
defer nc.Close()

// rewrite the config file with a different ping interval
err = os.WriteFile(conf, []byte(`
listen: "127.0.0.1:-1"
accounts: {
$SYS { users [{user: "admin", password: "p1d"}]}
test { users [{user: "foo", password: "bar"}]}
}
ping_interval: "200ms"
`), 0666)
require_NoError(t, err)

// Request the server to reload and wait for the response.
reply := nc.NewRespInbox()
sub, err = nc.SubscribeSync(reply)
require_NoError(t, err)
err = nc.PublishRequest(subject, reply, nil)
derekcollison marked this conversation as resolved.
Show resolved Hide resolved
levb marked this conversation as resolved.
Show resolved Hide resolved
require_NoError(t, err)
msg, err := sub.NextMsg(time.Second)
require_NoError(t, err)

apiResp := ServerAPIResponse{}
err = json.Unmarshal(msg.Data, &apiResp)
require_NoError(t, err)

require_True(t, apiResp.Data == nil)
require_True(t, apiResp.Error == nil)

// See that the ping interval has changed.
require_True(t, s.getOpts().PingInterval == 200*time.Millisecond)

// rewrite the config file with a different ping interval
err = os.WriteFile(conf, []byte(`garbage and nonsense`), 0666)
require_NoError(t, err)

// Request the server to reload and wait for the response.
reply = nc.NewRespInbox()
sub, err = nc.SubscribeSync(reply)
require_NoError(t, err)
err = nc.PublishRequest(subject, reply, nil)
require_NoError(t, err)
msg, err = sub.NextMsg(time.Second)
require_NoError(t, err)

apiResp = ServerAPIResponse{}
levb marked this conversation as resolved.
Show resolved Hide resolved
err = json.Unmarshal(msg.Data, &apiResp)
require_NoError(t, err)

require_True(t, apiResp.Data == nil)
require_Error(t, apiResp.Error, fmt.Errorf("Parse error on line 1: 'Expected a top-level value to end with a new line, comment or EOF, but got 'n' instead.'"))

// See that the ping interval has not changed.
require_True(t, s.getOpts().PingInterval == 200*time.Millisecond)
}

func Benchmark_GetHash(b *testing.B) {
b.StopTimer()
// Get 100 random names
Expand Down
2 changes: 1 addition & 1 deletion server/monitor_test.go
Expand Up @@ -3945,7 +3945,7 @@ func TestMonitorAccountz(t *testing.T) {
body = string(readBody(t, fmt.Sprintf("http://127.0.0.1:%d%s?acc=$SYS", s.MonitorAddr().Port, AccountzPath)))
require_Contains(t, body, `"account_detail": {`)
require_Contains(t, body, `"account_name": "$SYS",`)
require_Contains(t, body, `"subscriptions": 46,`)
require_Contains(t, body, `"subscriptions": 47,`)
require_Contains(t, body, `"is_system": true,`)
require_Contains(t, body, `"system_account": "$SYS"`)

Expand Down