Skip to content

Commit

Permalink
Added: reload server config by sending it a message (#4307)
Browse files Browse the repository at this point in the history
 - [ ] Link to issue, e.g. `Resolves #NNN`
 - [ ] Documentation added (if applicable)
 - [x] Tests added
- [x] Branch rebased on top of current main (`git pull --rebase origin
main`)
- [x] Changes squashed to a single commit (described
[here](http://gitready.com/advanced/2009/02/10/squashing-commits-with-rebase.html))
 - [x] Build is green in Travis CI
- [ ] You have certified that the contribution is your original work and
that you license the work to the project under the [Apache 2
license](https://github.com/nats-io/nats-server/blob/main/LICENSE)

Resolves #

### Changes proposed in this pull request:

- Reload server's configuration by sending a message to
`$SYS.REQ.SERVER.{server-id}.RELOAD`
  • Loading branch information
derekcollison committed Jul 26, 2023
2 parents 2cb1512 + a5aaf78 commit f4f5a84
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 2 deletions.
19 changes: 19 additions & 0 deletions server/events.go
Expand Up @@ -60,6 +60,7 @@ const (
serverDirectReqSubj = "$SYS.REQ.SERVER.%s.%s"
serverPingReqSubj = "$SYS.REQ.SERVER.PING.%s"
serverStatsPingReqSubj = "$SYS.REQ.SERVER.PING" // use $SYS.REQ.SERVER.PING.STATSZ instead
serverReloadReqSubj = "$SYS.REQ.SERVER.%s.RELOAD" // with server ID
leafNodeConnectEventSubj = "$SYS.ACCOUNT.%s.LEAFNODE.CONNECT" // for internal use only
remoteLatencyEventSubj = "$SYS.LATENCY.M2.%s"
inboxRespSubj = "$SYS._INBOX.%s.%s"
Expand Down Expand Up @@ -1187,6 +1188,12 @@ func (s *Server) initEventTracking() {
if _, err := s.sysSubscribeInternal(accSubsSubj, s.noInlineCallback(s.debugSubscribers)); err != nil {
s.Errorf("Error setting up internal debug service for subscribers: %v", err)
}

// Listen for requests to reload the server configuration.
subject = fmt.Sprintf(serverReloadReqSubj, s.info.ID)
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.reloadConfig)); err != nil {
s.Errorf("Error setting up server reload handler: %v", err)
}
}

// UserInfo returns basic information to a user about bound account and user permissions.
Expand Down Expand Up @@ -2659,6 +2666,18 @@ func (s *Server) nsubsRequest(sub *subscription, c *client, _ *Account, subject,
s.sendInternalMsgLocked(reply, _EMPTY_, nil, nsubs)
}

func (s *Server) reloadConfig(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
if !s.eventsRunning() {
return
}

optz := &EventFilterOptions{}
s.zReq(c, reply, hdr, msg, optz, optz, func() (interface{}, error) {
// Reload the server config, as requested.
return nil, s.Reload()
})
}

// Helper to grab account name for a client.
func accForClient(c *client) string {
if c.acc != nil {
Expand Down
77 changes: 76 additions & 1 deletion server/events_test.go
Expand Up @@ -1666,7 +1666,7 @@ func TestSystemAccountWithGateways(t *testing.T) {

// If this tests fails with wrong number after 10 seconds we may have
// added a new inititial subscription for the eventing system.
checkExpectedSubs(t, 52, sa)
checkExpectedSubs(t, 53, sa)

// Create a client on B and see if we receive the event
urlb := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port)
Expand Down Expand Up @@ -2535,6 +2535,81 @@ func TestServerEventsStatszSingleServer(t *testing.T) {
checkSubsPending(t, sub, 1)
}

func TestServerEventsReload(t *testing.T) {
conf := createConfFile(t, []byte(`
listen: "127.0.0.1:-1"
accounts: {
$SYS { users [{user: "admin", password: "p1d"}]}
test { users [{user: "foo", password: "bar"}]}
}
ping_interval: "100ms"
`))
opts := LoadConfig(conf)
s := RunServer(opts)
defer s.Shutdown()
subject := fmt.Sprintf(serverReloadReqSubj, s.info.ID)

// Connect as a test user and make sure the reload endpoint is not
// accessible.
ncTest, _ := jsClientConnect(t, s, nats.UserInfo("foo", "bar"))
defer ncTest.Close()
testReply := ncTest.NewRespInbox()
sub, err := ncTest.SubscribeSync(testReply)
require_NoError(t, err)
err = ncTest.PublishRequest(subject, testReply, nil)
require_NoError(t, err)
_, err = sub.NextMsg(time.Second)
require_Error(t, err)

require_True(t, s.getOpts().PingInterval == 100*time.Millisecond)

// Connect as a system user.
nc, _ := jsClientConnect(t, s, nats.UserInfo("admin", "p1d"))
defer nc.Close()

// rewrite the config file with a different ping interval
err = os.WriteFile(conf, []byte(`
listen: "127.0.0.1:-1"
accounts: {
$SYS { users [{user: "admin", password: "p1d"}]}
test { users [{user: "foo", password: "bar"}]}
}
ping_interval: "200ms"
`), 0666)
require_NoError(t, err)

msg, err := nc.Request(subject, nil, time.Second)
require_NoError(t, err)

var apiResp = ServerAPIResponse{}
err = json.Unmarshal(msg.Data, &apiResp)
require_NoError(t, err)

require_True(t, apiResp.Data == nil)
require_True(t, apiResp.Error == nil)

// See that the ping interval has changed.
require_True(t, s.getOpts().PingInterval == 200*time.Millisecond)

// rewrite the config file with a different ping interval
err = os.WriteFile(conf, []byte(`garbage and nonsense`), 0666)
require_NoError(t, err)

// Request the server to reload and wait for the response.
msg, err = nc.Request(subject, nil, time.Second)
require_NoError(t, err)

apiResp = ServerAPIResponse{}
err = json.Unmarshal(msg.Data, &apiResp)
require_NoError(t, err)

require_True(t, apiResp.Data == nil)
require_Error(t, apiResp.Error, fmt.Errorf("Parse error on line 1: 'Expected a top-level value to end with a new line, comment or EOF, but got 'n' instead.'"))

// See that the ping interval has not changed.
require_True(t, s.getOpts().PingInterval == 200*time.Millisecond)
}

func Benchmark_GetHash(b *testing.B) {
b.StopTimer()
// Get 100 random names
Expand Down
2 changes: 1 addition & 1 deletion server/monitor_test.go
Expand Up @@ -3945,7 +3945,7 @@ func TestMonitorAccountz(t *testing.T) {
body = string(readBody(t, fmt.Sprintf("http://127.0.0.1:%d%s?acc=$SYS", s.MonitorAddr().Port, AccountzPath)))
require_Contains(t, body, `"account_detail": {`)
require_Contains(t, body, `"account_name": "$SYS",`)
require_Contains(t, body, `"subscriptions": 46,`)
require_Contains(t, body, `"subscriptions": 47,`)
require_Contains(t, body, `"is_system": true,`)
require_Contains(t, body, `"system_account": "$SYS"`)

Expand Down

0 comments on commit f4f5a84

Please sign in to comment.