Skip to content

Commit

Permalink
[ADDED] Multiple routes and ability to have per-account routes (#4001)
Browse files Browse the repository at this point in the history
New configuration fields:
```
cluster {
   ...
   pool_size: 5
   accounts: ["A", "B"]
}
```

The configuration `pool_size` in the example above means that this
server will create 5 routes to a remote server, assuming that that
server has the same `pool_size` setting.

Accounts (which are not part of the `accounts[]` configuration)
are assigned a specific route in this pool, and this will be the
same route on all servers in the cluster.

Accounts that are defined in the `accounts` field will each have
a dedicated route connection. This will allow suppression of the
account name in some of the route protocols, reducing bytes transmitted
which may increase performance.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
derekcollison committed Apr 3, 2023
2 parents 01a2c04 + 83c5c01 commit 1ae51b2
Show file tree
Hide file tree
Showing 40 changed files with 5,015 additions and 949 deletions.
104 changes: 95 additions & 9 deletions server/accounts.go
Expand Up @@ -91,8 +91,14 @@ type Account struct {
tags jwt.TagList
nameTag string
lastLimErr int64
routePoolIdx int
}

const (
accDedicatedRoute = -1
accTransitioningToDedicatedRoute = -2
)

// Account based limits.
type limits struct {
mpay int32
Expand Down Expand Up @@ -238,8 +244,7 @@ func (a *Account) String() string {

// Used to create shallow copies of accounts for transfer
// from opts to real accounts in server struct.
func (a *Account) shallowCopy() *Account {
na := NewAccount(a.Name)
func (a *Account) shallowCopy(na *Account) {
na.Nkey = a.Nkey
na.Issuer = a.Issuer

Expand Down Expand Up @@ -279,12 +284,14 @@ func (a *Account) shallowCopy() *Account {
}
}
}
na.mappings = a.mappings
if len(na.mappings) > 0 && na.prand == nil {
na.prand = rand.New(rand.NewSource(time.Now().UnixNano()))
}
// JetStream
na.jsLimits = a.jsLimits
// Server config account limits.
na.limits = a.limits

return na
}

// nextEventID uses its own lock for better concurrency.
Expand Down Expand Up @@ -1331,19 +1338,19 @@ func (a *Account) sendTrackingLatency(si *serviceImport, responder *client) bool
// FIXME(dlc) - We need to clean these up but this should happen
// already with the auto-expire logic.
if responder != nil && responder.kind != CLIENT {
si.acc.mu.Lock()
a.mu.Lock()
if si.m1 != nil {
m1, m2 := sl, si.m1
m1.merge(m2)
si.acc.mu.Unlock()
a.mu.Unlock()
a.srv.sendInternalAccountMsg(a, si.latency.subject, m1)
a.mu.Lock()
si.rc = nil
a.mu.Unlock()
return true
}
si.m1 = sl
si.acc.mu.Unlock()
a.mu.Unlock()
return false
} else {
a.srv.sendInternalAccountMsg(a, si.latency.subject, sl)
Expand Down Expand Up @@ -2747,7 +2754,12 @@ func (a *Account) checkStreamImportsEqual(b *Account) bool {
return true
}

// Returns true if `a` and `b` stream exports are the same.
// Acquires `a` read lock, but `b` is assumed to not be accessed
// by anyone but the caller (`b` is not registered anywhere).
func (a *Account) checkStreamExportsEqual(b *Account) bool {
a.mu.RLock()
defer a.mu.RUnlock()
if len(a.exports.streams) != len(b.exports.streams) {
return false
}
Expand All @@ -2756,14 +2768,29 @@ func (a *Account) checkStreamExportsEqual(b *Account) bool {
if !ok {
return false
}
if !reflect.DeepEqual(aea, bea) {
if !isStreamExportEqual(aea, bea) {
return false
}
}
return true
}

func isStreamExportEqual(a, b *streamExport) bool {
if a == nil && b == nil {
return true
}
if (a == nil && b != nil) || (a != nil && b == nil) {
return false
}
return isExportAuthEqual(&a.exportAuth, &b.exportAuth)
}

// Returns true if `a` and `b` service exports are the same.
// Acquires `a` read lock, but `b` is assumed to not be accessed
// by anyone but the caller (`b` is not registered anywhere).
func (a *Account) checkServiceExportsEqual(b *Account) bool {
a.mu.RLock()
defer a.mu.RUnlock()
if len(a.exports.services) != len(b.exports.services) {
return false
}
Expand All @@ -2772,7 +2799,66 @@ func (a *Account) checkServiceExportsEqual(b *Account) bool {
if !ok {
return false
}
if !reflect.DeepEqual(aea, bea) {
if !isServiceExportEqual(aea, bea) {
return false
}
}
return true
}

func isServiceExportEqual(a, b *serviceExport) bool {
if a == nil && b == nil {
return true
}
if (a == nil && b != nil) || (a != nil && b == nil) {
return false
}
if !isExportAuthEqual(&a.exportAuth, &b.exportAuth) {
return false
}
if a.acc.Name != b.acc.Name {
return false
}
if a.respType != b.respType {
return false
}
if a.latency != nil || b.latency != nil {
if (a.latency != nil && b.latency == nil) || (a.latency == nil && b.latency != nil) {
return false
}
if a.latency.sampling != b.latency.sampling {
return false
}
if a.latency.subject != b.latency.subject {
return false
}
}
return true
}

// Returns true if `a` and `b` exportAuth structures are equal.
// Both `a` and `b` are guaranteed to be non-nil.
// Locking is handled by the caller.
func isExportAuthEqual(a, b *exportAuth) bool {
if a.tokenReq != b.tokenReq {
return false
}
if a.accountPos != b.accountPos {
return false
}
if len(a.approved) != len(b.approved) {
return false
}
for ak, av := range a.approved {
if bv, ok := b.approved[ak]; !ok || av.Name != bv.Name {
return false
}
}
if len(a.actsRevoked) != len(b.actsRevoked) {
return false
}
for ak, av := range a.actsRevoked {
if bv, ok := b.actsRevoked[ak]; !ok || av != bv {
return false
}
}
Expand Down
11 changes: 7 additions & 4 deletions server/auth.go
Expand Up @@ -180,13 +180,14 @@ func (p *Permissions) clone() *Permissions {
// Lock is assumed held.
func (s *Server) checkAuthforWarnings() {
warn := false
if s.opts.Password != _EMPTY_ && !isBcrypt(s.opts.Password) {
opts := s.getOpts()
if opts.Password != _EMPTY_ && !isBcrypt(opts.Password) {
warn = true
}
for _, u := range s.users {
// Skip warn if using TLS certs based auth
// unless a password has been left in the config.
if u.Password == _EMPTY_ && s.opts.TLSMap {
if u.Password == _EMPTY_ && opts.TLSMap {
continue
}
// Check if this is our internal sys client created on the fly.
Expand Down Expand Up @@ -277,8 +278,10 @@ func (s *Server) configureAuthorization() {

// Check for server configured auth callouts.
if opts.AuthCallout != nil {
s.mu.Unlock()
// Make sure we have a valid account and auth_users.
_, err := s.lookupAccount(opts.AuthCallout.Account)
s.mu.Lock()
if err != nil {
s.Errorf("Authorization callout account %q not valid", opts.AuthCallout.Account)
}
Expand Down Expand Up @@ -1198,8 +1201,8 @@ func (s *Server) isRouterAuthorized(c *client) bool {

// Check custom auth first, then TLS map if enabled
// then single user/pass.
if s.opts.CustomRouterAuthentication != nil {
return s.opts.CustomRouterAuthentication.Check(c)
if opts.CustomRouterAuthentication != nil {
return opts.CustomRouterAuthentication.Check(c)
}

if opts.Cluster.TLSMap || opts.Cluster.TLSCheckKnownURLs {
Expand Down
3 changes: 3 additions & 0 deletions server/auth_callout_test.go
Expand Up @@ -558,6 +558,7 @@ func TestAuthCalloutOperatorModeBasics(t *testing.T) {
}

ac := NewAuthTest(t, conf, handler, nats.UserCredentials(creds))
defer ac.Cleanup()
resp, err := ac.authClient.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
response := ServerAPIResponse{Data: &UserInfo{}}
Expand Down Expand Up @@ -795,6 +796,7 @@ func TestAuthCalloutOperatorModeEncryption(t *testing.T) {
}

ac := NewAuthTest(t, conf, handler, nats.UserCredentials(creds))
defer ac.Cleanup()

// Bearer token etc..
// This is used by all users, and the customization will be in other connect args.
Expand Down Expand Up @@ -1382,6 +1384,7 @@ func TestAuthCalloutOperator_AnyAccount(t *testing.T) {
}

ac := NewAuthTest(t, conf, handler, nats.UserCredentials(creds))
defer ac.Cleanup()
resp, err := ac.authClient.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
response := ServerAPIResponse{Data: &UserInfo{}}
Expand Down

0 comments on commit 1ae51b2

Please sign in to comment.