Fixed cluster permissions configuration reload (#4183)
This is a rework of the incorrect changes made in PR #4001, which did not
handle changes to export permissions. The test was modified to also cover
export permission changes.

This affects only the `dev` branch.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
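
For reference, the kind of cluster permissions block this reload path handles looks like the following (adapted from the reload test in this commit; the literal subject "foo" is simply the value the test uses):

    cluster {
        name: "local"
        listen: 127.0.0.1:-1
        permissions {
            # Subjects this server accepts from the remote side of the route.
            import {
                allow: ">"
            }
            # Subjects this server forwards over the route.
            export {
                allow: "foo"
            }
        }
    }

Reloading either list is what triggers reloadClusterPermissions below.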
derekcollison committed May 19, 2023
2 parents 646ae9e + 7cf00c8 commit c9d6900
Showing 4 changed files with 270 additions and 84 deletions.
136 changes: 73 additions & 63 deletions server/reload.go
@@ -2029,11 +2029,8 @@ func (s *Server) clientHasMovedToDifferentAccount(c *client) bool {
// import subjects.
func (s *Server) reloadClusterPermissions(oldPerms *RoutePermissions) {
s.mu.Lock()
var (
infoJSON []byte
newPerms = s.getOpts().Cluster.Permissions
routes = make(map[uint64]*client, s.numRoutes())
)
newPerms := s.getOpts().Cluster.Permissions
routes := make(map[uint64]*client, s.numRoutes())
// Get all connected routes
s.forEachRoute(func(route *client) {
route.mu.Lock()
@@ -2048,8 +2045,7 @@ func (s *Server) reloadClusterPermissions(oldPerms *RoutePermissions) {
s.routeInfo.Import = newPerms.Import
s.routeInfo.Export = newPerms.Export
}
// Copy the current route's INFO struct. We will need to modify it per-account
routeInfo := s.routeInfo
infoJSON := generateInfoJSON(&s.routeInfo)
s.mu.Unlock()

// Close connections for routes that don't understand async INFO.
@@ -2075,17 +2071,46 @@ func (s *Server) reloadClusterPermissions(oldPerms *RoutePermissions) {
newPermsTester := &client{}
newPermsTester.setRoutePermissions(newPerms)

// For a given account and list of remotes, will send an INFO protocol so
// that remote updates its route permissions and will also send the RS+ or
// RS- for subscriptions that become or are no longer permitted.
update := func(accName string, sl *Sublist, remotes []*client) {
var (
_localSubs [4096]*subscription
localSubs = _localSubs[:0]
subsNeedSUB []*subscription
subsNeedUNSUB []*subscription
deleteRoutedSubs []*subscription
)
var (
_localSubs [4096]*subscription
subsNeedSUB = map[*client][]*subscription{}
subsNeedUNSUB = map[*client][]*subscription{}
deleteRoutedSubs []*subscription
)

getRouteForAccount := func(accName string, poolIdx int) *client {
for _, r := range routes {
r.mu.Lock()
ok := (poolIdx >= 0 && poolIdx == r.route.poolIdx) || (string(r.route.accName) == accName) || r.route.noPool
r.mu.Unlock()
if ok {
return r
}
}
return nil
}

// First set the new permissions on all routes.
for _, route := range routes {
route.mu.Lock()
route.setRoutePermissions(newPerms)
route.mu.Unlock()
}

// Then, go over all accounts and gather local subscriptions that need to be
// sent over as SUB or removed as UNSUB, and routed subscriptions that need
// to be dropped due to export permissions.
s.accounts.Range(func(_, v interface{}) bool {
acc := v.(*Account)
acc.mu.RLock()
accName, sl, poolIdx := acc.Name, acc.sl, acc.routePoolIdx
acc.mu.RUnlock()
// Get the route handling this account. If no route or sublist, bail out.
route := getRouteForAccount(accName, poolIdx)
if route == nil || sl == nil {
return true
}
localSubs := _localSubs[:0]
sl.localSubs(&localSubs, false)

// Go through all local subscriptions
@@ -2097,64 +2122,49 @@ func (s *Server) reloadClusterPermissions(oldPerms *RoutePermissions) {
if canImportNow {
// If we could not before, then will need to send a SUB protocol.
if !couldImportThen {
subsNeedSUB = append(subsNeedSUB, sub)
subsNeedSUB[route] = append(subsNeedSUB[route], sub)
}
} else if couldImportThen {
// We were previously able to import this sub, but now
// we can't so we need to send an UNSUB protocol
subsNeedUNSUB = append(subsNeedUNSUB, sub)
subsNeedUNSUB[route] = append(subsNeedUNSUB[route], sub)
}
}

for _, route := range remotes {
// We do this manually here, not invoke generateRouteInfoJSON()
routeInfo.RemoteAccount = accName
infoJSON = generateInfoJSON(&routeInfo)

route.mu.Lock()
route.setRoutePermissions(newPerms)
for _, sub := range route.subs {
// If we can't export, we need to drop the subscriptions that
// we have on behalf of this route.
subj := string(sub.subject)
if !route.canExport(subj) {
delete(route.subs, string(sub.sid))
deleteRoutedSubs = append(deleteRoutedSubs, sub)
}
deleteRoutedSubs = deleteRoutedSubs[:0]
route.mu.Lock()
for key, sub := range route.subs {
if an := strings.Fields(key)[0]; an != accName {
continue
}
// If we can't export, we need to drop the subscriptions that
// we have on behalf of this route.
subj := string(sub.subject)
if !route.canExport(subj) {
delete(route.subs, string(sub.sid))
deleteRoutedSubs = append(deleteRoutedSubs, sub)
}
// Send an update INFO, which will allow remote server to show
// our current route config in monitoring and resend subscriptions
// that we now possibly allow with a change of Export permissions.
route.enqueueProto(infoJSON)
// Now send SUB and UNSUB protocols as needed.
route.sendRouteSubProtos(subsNeedSUB, false, nil)
route.sendRouteUnSubProtos(subsNeedUNSUB, false, nil)
route.mu.Unlock()
}
route.mu.Unlock()
// Remove as a batch all the subs that we have removed from each route.
sl.RemoveBatch(deleteRoutedSubs)
}
return true
})

// Now go over all accounts and invoke the update function defined above.
s.accounts.Range(func(_, v interface{}) bool {
acc := v.(*Account)
acc.mu.RLock()
accName, sl, poolIdx := acc.Name, acc.sl, acc.routePoolIdx
acc.mu.RUnlock()
if sl == nil {
return true
// Send an update INFO, which will allow remote server to show
// our current route config in monitoring and resend subscriptions
// that we now possibly allow with a change of Export permissions.
for _, route := range routes {
route.mu.Lock()
route.enqueueProto(infoJSON)
// Now send SUB and UNSUB protocols as needed.
if subs, ok := subsNeedSUB[route]; ok && len(subs) > 0 {
route.sendRouteSubProtos(subs, false, nil)
}
var remotes []*client
for _, r := range routes {
r.mu.Lock()
if (poolIdx >= 0 && poolIdx == r.route.poolIdx) || (string(r.route.accName) == accName) {
remotes = append(remotes, r)
}
r.mu.Unlock()
if unsubs, ok := subsNeedUNSUB[route]; ok && len(unsubs) > 0 {
route.sendRouteUnSubProtos(unsubs, false, nil)
}
update(accName, sl, remotes)
return true
})
route.mu.Unlock()
}
}

func (s *Server) reloadClusterPoolAndAccounts(co *clusterOption, opts *Options) {
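Standing back from the diff, the import-side decision above reduces to comparing each local subscription against the old and the new permissions and queueing a SUB or UNSUB for the delta. Below is a minimal, self-contained Go sketch of that decision; the sub type, allows helper, and literal subjects are hypothetical simplifications, not the server's actual types (the server evaluates permissions through setRoutePermissions on throwaway client values, as the hunk above shows):

    package main

    import "fmt"

    // sub is a hypothetical stand-in for the server's *subscription type.
    type sub struct{ subject string }

    // allows is a deliberately simplified permission check: a literal
    // allow-list plus the ">" full wildcard. The real server evaluates
    // NATS wildcard subjects through the client's permission sublists.
    func allows(allowList []string, subject string) bool {
        for _, a := range allowList {
            if a == ">" || a == subject {
                return true
            }
        }
        return false
    }

    func main() {
        // Old and new import allow-lists, as if taken from a config reload.
        oldAllow := []string{"foo"}
        newAllow := []string{"foo", "bar"}

        // Local subscriptions that may need to be (un)advertised to routes.
        localSubs := []sub{{"foo"}, {"bar"}, {"baz"}}

        var needSUB, needUNSUB []sub
        for _, s := range localSubs {
            couldImportThen := allows(oldAllow, s.subject)
            canImportNow := allows(newAllow, s.subject)
            switch {
            case canImportNow && !couldImportThen:
                // Newly permitted: advertise it to the remote with a SUB.
                needSUB = append(needSUB, s)
            case !canImportNow && couldImportThen:
                // No longer permitted: withdraw it with an UNSUB.
                needUNSUB = append(needUNSUB, s)
            }
        }

        fmt.Println("send SUB for:", needSUB)     // [{bar}]
        fmt.Println("send UNSUB for:", needUNSUB) // []
    }

The export side is handled differently, as the diff shows: subscriptions held on behalf of a route whose subjects no longer pass canExport are deleted and batch-removed from the sublist, and the updated INFO lets the remote re-send subscriptions that a loosened export policy now permits.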
160 changes: 157 additions & 3 deletions server/reload_test.go
@@ -4525,12 +4525,12 @@ func TestConfigReloadRouteImportPermissionsWithAccounts(t *testing.T) {
poolSize string
accounts string
}{
{"regular", _EMPTY_, _EMPTY_},
{"regular", "pool_size: -1", _EMPTY_},
{"pooling", "pool_size: 5", _EMPTY_},
{"per-account", _EMPTY_, "accounts: [\"A\"]"},
{"pool and per-account", "pool_size: 3", "accounts: [\"A\"]"},
} {
t.Run(test.name, func(t *testing.T) {
t.Run("import "+test.name, func(t *testing.T) {
confATemplate := `
server_name: "A"
port: -1
@@ -4604,6 +4604,7 @@ func TestConfigReloadRouteImportPermissionsWithAccounts(t *testing.T) {
natsFlush(t, ncA)

checkSubInterest(t, srvb, "A", "foo", 2*time.Second)
checkSubNoInterest(t, srvb, "A", "bar", 2*time.Second)

ncB := natsConnect(t, srvb.ClientURL(), nats.UserInfo("user1", "pwd"))
defer ncB.Close()
@@ -4613,7 +4614,7 @@ func TestConfigReloadRouteImportPermissionsWithAccounts(t *testing.T) {
if expected {
natsNexMsg(t, sub, time.Second)
} else {
if msg, err := sub.NextMsg(250 * time.Millisecond); err == nil {
if msg, err := sub.NextMsg(50 * time.Millisecond); err == nil {
t.Fatalf("Should not have gotten the message, got %s/%s", msg.Subject, msg.Data)
}
}
@@ -4634,6 +4635,7 @@ func TestConfigReloadRouteImportPermissionsWithAccounts(t *testing.T) {

checkClusterFormed(t, srva, srvb)

checkSubNoInterest(t, srvb, "A", "foo", 2*time.Second)
checkSubInterest(t, srvb, "A", "bar", 2*time.Second)

// Should not receive on foo
@@ -4653,6 +4655,158 @@ func TestConfigReloadRouteImportPermissionsWithAccounts(t *testing.T) {
checkClusterFormed(t, srva, srvb)

checkSubInterest(t, srvb, "A", "foo", 2*time.Second)
checkSubNoInterest(t, srvb, "A", "bar", 2*time.Second)

// Should receive on "foo"
natsPub(t, ncB, "foo", []byte("foo3"))
check(sub1Foo, true)
check(sub2Foo, true)
// But make sure there are no more than what we expect
check(sub1Foo, false)
check(sub2Foo, false)

// And now "bar" should fail
natsPub(t, ncB, "bar", []byte("bar3"))
check(sub1Bar, false)
check(sub2Bar, false)
})
}
// Check export now
for _, test := range []struct {
name string
poolSize string
accounts string
}{
{"regular", "pool_size: -1", _EMPTY_},
{"pooling", "pool_size: 5", _EMPTY_},
{"per-account", _EMPTY_, "accounts: [\"A\"]"},
{"pool and per-account", "pool_size: 3", "accounts: [\"A\"]"},
} {
t.Run("export "+test.name, func(t *testing.T) {
confATemplate := `
server_name: "A"
port: -1
accounts {
A { users: [{user: "user1", password: "pwd"}] }
B { users: [{user: "user2", password: "pwd"}] }
C { users: [{user: "user3", password: "pwd"}] }
D { users: [{user: "user4", password: "pwd"}] }
}
cluster {
name: "local"
listen: 127.0.0.1:-1
permissions {
import {
allow: ">"
}
export {
allow: %s
}
}
%s
%s
}
`
confA := createConfFile(t, []byte(fmt.Sprintf(confATemplate, `"foo"`, test.poolSize, test.accounts)))
srva, optsA := RunServerWithConfig(confA)
defer srva.Shutdown()

confBTemplate := `
server_name: "B"
port: -1
accounts {
A { users: [{user: "user1", password: "pwd"}] }
B { users: [{user: "user2", password: "pwd"}] }
C { users: [{user: "user3", password: "pwd"}] }
D { users: [{user: "user4", password: "pwd"}] }
}
cluster {
listen: 127.0.0.1:-1
name: "local"
permissions {
import {
allow: ">"
}
export {
allow: %s
}
}
routes = [
"nats://127.0.0.1:%d"
]
%s
%s
}
`
confB := createConfFile(t, []byte(fmt.Sprintf(confBTemplate, `"foo"`, optsA.Cluster.Port, test.poolSize, test.accounts)))
srvb, _ := RunServerWithConfig(confB)
defer srvb.Shutdown()

checkClusterFormed(t, srva, srvb)

ncA := natsConnect(t, srva.ClientURL(), nats.UserInfo("user1", "pwd"))
defer ncA.Close()

sub1Foo := natsSubSync(t, ncA, "foo")
sub2Foo := natsSubSync(t, ncA, "foo")

sub1Bar := natsSubSync(t, ncA, "bar")
sub2Bar := natsSubSync(t, ncA, "bar")

natsFlush(t, ncA)

checkSubInterest(t, srvb, "A", "foo", 2*time.Second)
checkSubNoInterest(t, srvb, "A", "bar", 2*time.Second)

ncB := natsConnect(t, srvb.ClientURL(), nats.UserInfo("user1", "pwd"))
defer ncB.Close()

check := func(sub *nats.Subscription, expected bool) {
t.Helper()
if expected {
natsNexMsg(t, sub, time.Second)
} else {
if msg, err := sub.NextMsg(50 * time.Millisecond); err == nil {
t.Fatalf("Should not have gotten the message, got %s/%s", msg.Subject, msg.Data)
}
}
}

// Should receive on "foo"
natsPub(t, ncB, "foo", []byte("foo1"))
check(sub1Foo, true)
check(sub2Foo, true)

// But not on "bar"
natsPub(t, ncB, "bar", []byte("bar1"))
check(sub1Bar, false)
check(sub2Bar, false)

reloadUpdateConfig(t, srva, confA, fmt.Sprintf(confATemplate, `["foo", "bar"]`, test.poolSize, test.accounts))
reloadUpdateConfig(t, srvb, confB, fmt.Sprintf(confBTemplate, `["foo", "bar"]`, optsA.Cluster.Port, test.poolSize, test.accounts))

checkClusterFormed(t, srva, srvb)

checkSubInterest(t, srvb, "A", "foo", 2*time.Second)
checkSubInterest(t, srvb, "A", "bar", 2*time.Second)

// Should receive on foo and bar
natsPub(t, ncB, "foo", []byte("foo2"))
check(sub1Foo, true)
check(sub2Foo, true)

natsPub(t, ncB, "bar", []byte("bar2"))
check(sub1Bar, true)
check(sub2Bar, true)

// Remove "bar"
reloadUpdateConfig(t, srva, confA, fmt.Sprintf(confATemplate, `"foo"`, test.poolSize, test.accounts))
reloadUpdateConfig(t, srvb, confB, fmt.Sprintf(confBTemplate, `"foo"`, optsA.Cluster.Port, test.poolSize, test.accounts))

checkClusterFormed(t, srva, srvb)

checkSubInterest(t, srvb, "A", "foo", 2*time.Second)
checkSubNoInterest(t, srvb, "A", "bar", 2*time.Second)

// Should receive on "foo"
natsPub(t, ncB, "foo", []byte("foo3"))
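To exercise the modified test on its own, a standard go test invocation from the repository root should work (assuming the usual nats-server layout, where these files live under server/):

    go test ./server -run TestConfigReloadRouteImportPermissionsWithAccounts -count=1 -v

The -run filter picks up both the "import ..." and "export ..." subtests added above.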
