Skip to content

Commit

Permalink
Changes based on code review
Browse files Browse the repository at this point in the history
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Mar 30, 2023
1 parent 9cb6af2 commit bf0df0d
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 28 deletions.
2 changes: 1 addition & 1 deletion server/events_test.go
Expand Up @@ -1883,7 +1883,7 @@ func TestServerEventsStatsZ(t *testing.T) {
if m.Stats.Received.Msgs < 1 {
t.Fatalf("Did not match received msgs of >=1, got %d", m.Stats.Received.Msgs)
}
// Default pool size + 1 for systemt account
// Default pool size + 1 for system account
expectedRoutes := DEFAULT_ROUTE_POOL_SIZE + 1
if lr := len(m.Stats.Routes); lr != expectedRoutes {
t.Fatalf("Expected %d routes, but got %d", expectedRoutes, lr)
Expand Down
4 changes: 2 additions & 2 deletions server/gateway_test.go
Expand Up @@ -5113,7 +5113,7 @@ func TestGatewaySendReplyAcrossGateways(t *testing.T) {
oa1 := testGatewayOptionsFromToWithServers(t, "A", "B", sb)
oa1.Cluster.PoolSize = test.poolSize
if test.peracc {
oa1.Cluster.Accounts = []string{"ACC"}
oa1.Cluster.PinnedAccounts = []string{"ACC"}
}
oa1.Accounts = []*Account{NewAccount("ACC")}
oa1.Users = []*User{{Username: "user", Password: "pwd", Account: oa1.Accounts[0]}}
Expand All @@ -5132,7 +5132,7 @@ func TestGatewaySendReplyAcrossGateways(t *testing.T) {
oa2 := testGatewayOptionsFromToWithServers(t, "A", "B", sb)
oa2.Cluster.PoolSize = test.poolSize
if test.peracc {
oa2.Cluster.Accounts = []string{"ACC"}
oa2.Cluster.PinnedAccounts = []string{"ACC"}
}
oa2.Accounts = []*Account{NewAccount("ACC")}
oa2.Users = []*User{{Username: "user", Password: "pwd", Account: oa2.Accounts[0]}}
Expand Down
8 changes: 4 additions & 4 deletions server/opts.go
Expand Up @@ -77,7 +77,7 @@ type ClusterOpts struct {
NoAdvertise bool `json:"-"`
ConnectRetries int `json:"-"`
PoolSize int `json:"-"`
Accounts []string `json:"-"`
PinnedAccounts []string `json:"-"`

// Not exported (used in tests)
resolver netResolver
Expand Down Expand Up @@ -1600,7 +1600,7 @@ func parseCluster(v interface{}, opts *Options, errors *[]error, warnings *[]err
case "pool_size":
opts.Cluster.PoolSize = int(mv.(int64))
case "accounts":
opts.Cluster.Accounts, _ = parseStringArray("accounts", tk, &lt, mv, errors, warnings)
opts.Cluster.PinnedAccounts, _ = parseStringArray("accounts", tk, &lt, mv, errors, warnings)
default:
if !tk.IsUsedVariable() {
err := &unknownConfigFieldErr{
Expand Down Expand Up @@ -4692,14 +4692,14 @@ func setBaselineOptions(opts *Options) {
}
if sysAccName != _EMPTY_ {
var found bool
for _, acc := range opts.Cluster.Accounts {
for _, acc := range opts.Cluster.PinnedAccounts {
if acc == sysAccName {
found = true
break
}
}
if !found {
opts.Cluster.Accounts = append(opts.Cluster.Accounts, sysAccName)
opts.Cluster.PinnedAccounts = append(opts.Cluster.PinnedAccounts, sysAccName)
}
}
}
Expand Down
12 changes: 6 additions & 6 deletions server/reload.go
Expand Up @@ -400,17 +400,17 @@ func (c *clusterOption) IsClusterPoolSizeOrAccountsChange() bool {
func (c *clusterOption) diffPoolAndAccounts(old *ClusterOpts) {
c.poolSizeChanged = c.newValue.PoolSize != old.PoolSize
addLoop:
for _, na := range c.newValue.Accounts {
for _, oa := range old.Accounts {
for _, na := range c.newValue.PinnedAccounts {
for _, oa := range old.PinnedAccounts {
if na == oa {
continue addLoop
}
}
c.accsAdded = append(c.accsAdded, na)
}
removeLoop:
for _, oa := range old.Accounts {
for _, na := range c.newValue.Accounts {
for _, oa := range old.PinnedAccounts {
for _, na := range c.newValue.PinnedAccounts {
if oa == na {
continue removeLoop
}
Expand Down Expand Up @@ -463,7 +463,7 @@ func (r *routesOption) Apply(server *Server) {

// Add routes.
server.mu.Lock()
server.solicitRoutes(r.add, server.getOpts().Cluster.Accounts)
server.solicitRoutes(r.add, server.getOpts().Cluster.PinnedAccounts)
server.mu.Unlock()

server.Noticef("Reloaded: cluster routes")
Expand Down Expand Up @@ -2135,7 +2135,7 @@ func (s *Server) reloadClusterPoolAndAccounts(co *clusterOption, opts *Options)
// We should always have at least the system account with a dedicated route,
// but in case we have a configuration that disables pooling and without
// a system account, possibly set the accRoutes to nil.
if len(opts.Cluster.Accounts) == 0 {
if len(opts.Cluster.PinnedAccounts) == 0 {
s.accRoutes = nil
}
// Now deal with pool size updates.
Expand Down
13 changes: 10 additions & 3 deletions server/route.go
Expand Up @@ -1056,7 +1056,7 @@ func (c *client) parseUnsubProto(arg []byte) (string, []byte, []byte, error) {
case subjIdx + 2:
queue = args[subjIdx+1]
default:
return "", nil, nil, fmt.Errorf("parse error: '%s'", arg)
return _EMPTY_, nil, nil, fmt.Errorf("parse error: '%s'", arg)
}
if accountName == _EMPTY_ {
accountName = string(args[0])
Expand Down Expand Up @@ -1238,6 +1238,10 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) {
if accInProto {
sub.sid = arg[accPos : accPos+len(accountName)+1+len(sub.subject)+1+len(sub.queue)]
} else {
// It is unfortunate that we have to do this, but the gain of not
// having the account name in message protocols outweighs the
// penalty of having to do this here for the processing of a
// subscription.
sub.sid = append(sub.sid, accountName...)
sub.sid = append(sub.sid, ' ')
sub.sid = append(sub.sid, sub.subject...)
Expand Down Expand Up @@ -1382,7 +1386,7 @@ func (s *Server) sendSubsToRoute(route *client, idx int, account string) {
}
}
// Send over our account subscriptions.
accs := make([]*Account, 0, 32)
accs := make([]*Account, 0, 1024)
if idx < 0 || account != _EMPTY_ {
if ai, ok := s.accounts.Load(account); ok {
a := ai.(*Account)
Expand Down Expand Up @@ -1868,6 +1872,9 @@ func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string
select {
case <-time.After(time.Duration(rand.Intn(100)) * time.Millisecond):
case <-s.quitCh:
// Doing this here and not as a defer because connectToRoute is also
// calling s.grWG.Done() on exit, so we do this only if we don't
// invoke connectToRoute().
s.grWG.Done()
return
}
Expand Down Expand Up @@ -2238,7 +2245,7 @@ func (s *Server) startRouteAcceptLoop() {
go s.acceptConnections(l, "Route", func(conn net.Conn) { s.createRoute(conn, nil, _EMPTY_) }, nil)

// Solicit Routes if applicable. This will not block.
s.solicitRoutes(opts.Routes, opts.Cluster.Accounts)
s.solicitRoutes(opts.Routes, opts.Cluster.PinnedAccounts)

s.mu.Unlock()
}
Expand Down
2 changes: 1 addition & 1 deletion server/routes_test.go
Expand Up @@ -2457,7 +2457,7 @@ func TestRoutePerAccountConnectRace(t *testing.T) {
o.Accounts = []*Account{NewAccount("A")}
o.NoSystemAccount = true
o.Cluster.PoolSize = 1
o.Cluster.Accounts = []string{"A"}
o.Cluster.PinnedAccounts = []string{"A"}
o.Cluster.Name = "local"
o.Cluster.Port = port
o.Routes = RoutesFromStr("nats://127.0.0.1:1234,nats://127.0.0.1:1235,nats://127.0.0.1:1236")
Expand Down
20 changes: 9 additions & 11 deletions server/server.go
Expand Up @@ -590,9 +590,9 @@ func (s *Server) initRouteStructures(opts *Options) {
// If we have per-account routes, we create accRoutes and initialize it
// with nil values. The presence of an account as the key will allow us
// to know if a given account is supposed to have dedicated routes.
if l := len(opts.Cluster.Accounts); l > 0 {
if l := len(opts.Cluster.PinnedAccounts); l > 0 {
s.accRoutes = make(map[string]map[string]*client, l)
for _, acc := range opts.Cluster.Accounts {
for _, acc := range opts.Cluster.PinnedAccounts {
s.accRoutes[acc] = make(map[string]*client)
}
}
Expand Down Expand Up @@ -701,14 +701,14 @@ func validateCluster(o *Options) error {
// Set this here so we do not consider it dynamic.
o.Cluster.Name = o.Gateway.Name
}
if l := len(o.Cluster.Accounts); l > 0 {
if l := len(o.Cluster.PinnedAccounts); l > 0 {
if o.Cluster.PoolSize < 0 {
return fmt.Errorf("pool_size cannot be negative if accounts are specified")
return fmt.Errorf("pool_size cannot be negative if pinned accounts are specified")
}
m := make(map[string]struct{}, l)
for _, a := range o.Cluster.Accounts {
for _, a := range o.Cluster.PinnedAccounts {
if _, exists := m[a]; exists {
return fmt.Errorf("found duplicate account name %q in accounts list %q", a, o.Cluster.Accounts)
return fmt.Errorf("found duplicate account name %q in pinned accounts list %q", a, o.Cluster.PinnedAccounts)
}
m[a] = struct{}{}
}
Expand Down Expand Up @@ -912,10 +912,8 @@ func (s *Server) configureAccounts(reloading bool) (map[string]struct{}, error)
for _, si := range acc.imports.services {
if v, ok := s.accounts.Load(si.acc.Name); ok {
si.acc = v.(*Account)
// TODO: Not sure if it is possible for an account to have a
// service import from itself, but if that is the case,
// we are already lock, otherwise use locking to protect
// the call to si.acc.getServiceExport().
// It is possible to allow for latency tracking inside your
// own account, so lock only when not the same account.
if si.acc == acc {
si.se = si.acc.getServiceExport(si.to)
continue
Expand Down Expand Up @@ -1876,7 +1874,7 @@ func (s *Server) Start() {
if len(opts.TrustedOperators) == 1 && opts.SystemAccount != _EMPTY_ && opts.SystemAccount != DEFAULT_SYSTEM_ACCOUNT {
opts := s.getOpts()
_, isMemResolver := ar.(*MemAccResolver)
if v, ok := s.accounts.Load(opts.SystemAccount); !isMemResolver && ok && v.(*Account).claimJWT == "" {
if v, ok := s.accounts.Load(opts.SystemAccount); !isMemResolver && ok && v.(*Account).claimJWT == _EMPTY_ {
s.Noticef("Using bootstrapping system account")
s.startGoRoutine(func() {
defer s.grWG.Done()
Expand Down

0 comments on commit bf0df0d

Please sign in to comment.