Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed cluster permissions configuration reload #4183

Merged
Merged 2 commits on May 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
136 changes: 73 additions & 63 deletions server/reload.go
Expand Up @@ -2029,11 +2029,8 @@ func (s *Server) clientHasMovedToDifferentAccount(c *client) bool {
// import subjects.
func (s *Server) reloadClusterPermissions(oldPerms *RoutePermissions) {
s.mu.Lock()
var (
infoJSON []byte
newPerms = s.getOpts().Cluster.Permissions
routes = make(map[uint64]*client, s.numRoutes())
)
newPerms := s.getOpts().Cluster.Permissions
routes := make(map[uint64]*client, s.numRoutes())
// Get all connected routes
s.forEachRoute(func(route *client) {
route.mu.Lock()
Expand All @@ -2048,8 +2045,7 @@ func (s *Server) reloadClusterPermissions(oldPerms *RoutePermissions) {
s.routeInfo.Import = newPerms.Import
s.routeInfo.Export = newPerms.Export
}
// Copy the current route's INFO struct. We will need to modify it per-account
routeInfo := s.routeInfo
infoJSON := generateInfoJSON(&s.routeInfo)
s.mu.Unlock()

// Close connections for routes that don't understand async INFO.
Expand All @@ -2075,17 +2071,46 @@ func (s *Server) reloadClusterPermissions(oldPerms *RoutePermissions) {
newPermsTester := &client{}
newPermsTester.setRoutePermissions(newPerms)

// For a given account and list of remotes, will send an INFO protocol so
// that remote updates its route permissions and will also send the RS+ or
// RS- for subscriptions that become or are no longer permitted.
update := func(accName string, sl *Sublist, remotes []*client) {
var (
_localSubs [4096]*subscription
localSubs = _localSubs[:0]
subsNeedSUB []*subscription
subsNeedUNSUB []*subscription
deleteRoutedSubs []*subscription
)
var (
_localSubs [4096]*subscription
subsNeedSUB = map[*client][]*subscription{}
subsNeedUNSUB = map[*client][]*subscription{}
deleteRoutedSubs []*subscription
)

getRouteForAccount := func(accName string, poolIdx int) *client {
for _, r := range routes {
r.mu.Lock()
ok := (poolIdx >= 0 && poolIdx == r.route.poolIdx) || (string(r.route.accName) == accName) || r.route.noPool
r.mu.Unlock()
if ok {
return r
}
}
return nil
}

// First set the new permissions on all routes.
for _, route := range routes {
route.mu.Lock()
route.setRoutePermissions(newPerms)
route.mu.Unlock()
}

// Then, go over all accounts and gather local subscriptions that need to be
// sent over as SUB or removed as UNSUB, and routed subscriptions that need
// to be dropped due to export permissions.
s.accounts.Range(func(_, v interface{}) bool {
acc := v.(*Account)
acc.mu.RLock()
accName, sl, poolIdx := acc.Name, acc.sl, acc.routePoolIdx
acc.mu.RUnlock()
// Get the route handling this account. If no route or sublist, bail out.
route := getRouteForAccount(accName, poolIdx)
if route == nil || sl == nil {
return true
}
localSubs := _localSubs[:0]
sl.localSubs(&localSubs, false)

// Go through all local subscriptions
Expand All @@ -2097,64 +2122,49 @@ func (s *Server) reloadClusterPermissions(oldPerms *RoutePermissions) {
if canImportNow {
// If we could not before, then will need to send a SUB protocol.
if !couldImportThen {
subsNeedSUB = append(subsNeedSUB, sub)
subsNeedSUB[route] = append(subsNeedSUB[route], sub)
}
} else if couldImportThen {
// We were previously able to import this sub, but now
// we can't so we need to send an UNSUB protocol
subsNeedUNSUB = append(subsNeedUNSUB, sub)
subsNeedUNSUB[route] = append(subsNeedUNSUB[route], sub)
}
}

for _, route := range remotes {
// We do this manually here, not invoke generateRouteInfoJSON()
routeInfo.RemoteAccount = accName
infoJSON = generateInfoJSON(&routeInfo)

route.mu.Lock()
route.setRoutePermissions(newPerms)
for _, sub := range route.subs {
// If we can't export, we need to drop the subscriptions that
// we have on behalf of this route.
subj := string(sub.subject)
if !route.canExport(subj) {
delete(route.subs, string(sub.sid))
deleteRoutedSubs = append(deleteRoutedSubs, sub)
}
deleteRoutedSubs = deleteRoutedSubs[:0]
route.mu.Lock()
for key, sub := range route.subs {
if an := strings.Fields(key)[0]; an != accName {
continue
}
// If we can't export, we need to drop the subscriptions that
// we have on behalf of this route.
subj := string(sub.subject)
if !route.canExport(subj) {
delete(route.subs, string(sub.sid))
deleteRoutedSubs = append(deleteRoutedSubs, sub)
}
// Send an update INFO, which will allow remote server to show
// our current route config in monitoring and resend subscriptions
// that we now possibly allow with a change of Export permissions.
route.enqueueProto(infoJSON)
// Now send SUB and UNSUB protocols as needed.
route.sendRouteSubProtos(subsNeedSUB, false, nil)
route.sendRouteUnSubProtos(subsNeedUNSUB, false, nil)
route.mu.Unlock()
}
route.mu.Unlock()
// Remove as a batch all the subs that we have removed from each route.
sl.RemoveBatch(deleteRoutedSubs)
}
return true
})

// Now go over all accounts and invoke the update function defined above.
s.accounts.Range(func(_, v interface{}) bool {
acc := v.(*Account)
acc.mu.RLock()
accName, sl, poolIdx := acc.Name, acc.sl, acc.routePoolIdx
acc.mu.RUnlock()
if sl == nil {
return true
// Send an update INFO, which will allow remote server to show
// our current route config in monitoring and resend subscriptions
// that we now possibly allow with a change of Export permissions.
for _, route := range routes {
route.mu.Lock()
route.enqueueProto(infoJSON)
// Now send SUB and UNSUB protocols as needed.
if subs, ok := subsNeedSUB[route]; ok && len(subs) > 0 {
route.sendRouteSubProtos(subs, false, nil)
}
var remotes []*client
for _, r := range routes {
r.mu.Lock()
if (poolIdx >= 0 && poolIdx == r.route.poolIdx) || (string(r.route.accName) == accName) {
remotes = append(remotes, r)
}
r.mu.Unlock()
if unsubs, ok := subsNeedUNSUB[route]; ok && len(unsubs) > 0 {
route.sendRouteUnSubProtos(unsubs, false, nil)
}
update(accName, sl, remotes)
return true
})
route.mu.Unlock()
}
}

func (s *Server) reloadClusterPoolAndAccounts(co *clusterOption, opts *Options) {
Expand Down
160 changes: 157 additions & 3 deletions server/reload_test.go
Expand Up @@ -4525,12 +4525,12 @@ func TestConfigReloadRouteImportPermissionsWithAccounts(t *testing.T) {
poolSize string
accounts string
}{
{"regular", _EMPTY_, _EMPTY_},
{"regular", "pool_size: -1", _EMPTY_},
{"pooling", "pool_size: 5", _EMPTY_},
{"per-account", _EMPTY_, "accounts: [\"A\"]"},
{"pool and per-account", "pool_size: 3", "accounts: [\"A\"]"},
} {
t.Run(test.name, func(t *testing.T) {
t.Run("import "+test.name, func(t *testing.T) {
confATemplate := `
server_name: "A"
port: -1
Expand Down Expand Up @@ -4604,6 +4604,7 @@ func TestConfigReloadRouteImportPermissionsWithAccounts(t *testing.T) {
natsFlush(t, ncA)

checkSubInterest(t, srvb, "A", "foo", 2*time.Second)
checkSubNoInterest(t, srvb, "A", "bar", 2*time.Second)

ncB := natsConnect(t, srvb.ClientURL(), nats.UserInfo("user1", "pwd"))
defer ncB.Close()
Expand All @@ -4613,7 +4614,7 @@ func TestConfigReloadRouteImportPermissionsWithAccounts(t *testing.T) {
if expected {
natsNexMsg(t, sub, time.Second)
} else {
if msg, err := sub.NextMsg(250 * time.Millisecond); err == nil {
if msg, err := sub.NextMsg(50 * time.Millisecond); err == nil {
t.Fatalf("Should not have gotten the message, got %s/%s", msg.Subject, msg.Data)
}
}
Expand All @@ -4634,6 +4635,7 @@ func TestConfigReloadRouteImportPermissionsWithAccounts(t *testing.T) {

checkClusterFormed(t, srva, srvb)

checkSubNoInterest(t, srvb, "A", "foo", 2*time.Second)
checkSubInterest(t, srvb, "A", "bar", 2*time.Second)

// Should not receive on foo
Expand All @@ -4653,6 +4655,158 @@ func TestConfigReloadRouteImportPermissionsWithAccounts(t *testing.T) {
checkClusterFormed(t, srva, srvb)

checkSubInterest(t, srvb, "A", "foo", 2*time.Second)
checkSubNoInterest(t, srvb, "A", "bar", 2*time.Second)

// Should receive on "foo"
natsPub(t, ncB, "foo", []byte("foo3"))
check(sub1Foo, true)
check(sub2Foo, true)
// But make sure there are no more than what we expect
check(sub1Foo, false)
check(sub2Foo, false)

// And now "bar" should fail
natsPub(t, ncB, "bar", []byte("bar3"))
check(sub1Bar, false)
check(sub2Bar, false)
})
}
// Check export now
for _, test := range []struct {
name string
poolSize string
accounts string
}{
{"regular", "pool_size: -1", _EMPTY_},
{"pooling", "pool_size: 5", _EMPTY_},
{"per-account", _EMPTY_, "accounts: [\"A\"]"},
{"pool and per-account", "pool_size: 3", "accounts: [\"A\"]"},
} {
t.Run("export "+test.name, func(t *testing.T) {
confATemplate := `
server_name: "A"
port: -1
accounts {
A { users: [{user: "user1", password: "pwd"}] }
B { users: [{user: "user2", password: "pwd"}] }
C { users: [{user: "user3", password: "pwd"}] }
D { users: [{user: "user4", password: "pwd"}] }
}
cluster {
name: "local"
listen: 127.0.0.1:-1
permissions {
import {
allow: ">"
}
export {
allow: %s
}
}
%s
%s
}
`
confA := createConfFile(t, []byte(fmt.Sprintf(confATemplate, `"foo"`, test.poolSize, test.accounts)))
srva, optsA := RunServerWithConfig(confA)
defer srva.Shutdown()

confBTemplate := `
server_name: "B"
port: -1
accounts {
A { users: [{user: "user1", password: "pwd"}] }
B { users: [{user: "user2", password: "pwd"}] }
C { users: [{user: "user3", password: "pwd"}] }
D { users: [{user: "user4", password: "pwd"}] }
}
cluster {
listen: 127.0.0.1:-1
name: "local"
permissions {
import {
allow: ">"
}
export {
allow: %s
}
}
routes = [
"nats://127.0.0.1:%d"
]
%s
%s
}
`
confB := createConfFile(t, []byte(fmt.Sprintf(confBTemplate, `"foo"`, optsA.Cluster.Port, test.poolSize, test.accounts)))
srvb, _ := RunServerWithConfig(confB)
defer srvb.Shutdown()

checkClusterFormed(t, srva, srvb)

ncA := natsConnect(t, srva.ClientURL(), nats.UserInfo("user1", "pwd"))
defer ncA.Close()

sub1Foo := natsSubSync(t, ncA, "foo")
sub2Foo := natsSubSync(t, ncA, "foo")

sub1Bar := natsSubSync(t, ncA, "bar")
sub2Bar := natsSubSync(t, ncA, "bar")

natsFlush(t, ncA)

checkSubInterest(t, srvb, "A", "foo", 2*time.Second)
checkSubNoInterest(t, srvb, "A", "bar", 2*time.Second)

ncB := natsConnect(t, srvb.ClientURL(), nats.UserInfo("user1", "pwd"))
defer ncB.Close()

check := func(sub *nats.Subscription, expected bool) {
t.Helper()
if expected {
natsNexMsg(t, sub, time.Second)
} else {
if msg, err := sub.NextMsg(50 * time.Millisecond); err == nil {
t.Fatalf("Should not have gotten the message, got %s/%s", msg.Subject, msg.Data)
}
}
}

// Should receive on "foo"
natsPub(t, ncB, "foo", []byte("foo1"))
check(sub1Foo, true)
check(sub2Foo, true)

// But not on "bar"
natsPub(t, ncB, "bar", []byte("bar1"))
check(sub1Bar, false)
check(sub2Bar, false)

reloadUpdateConfig(t, srva, confA, fmt.Sprintf(confATemplate, `["foo", "bar"]`, test.poolSize, test.accounts))
reloadUpdateConfig(t, srvb, confB, fmt.Sprintf(confBTemplate, `["foo", "bar"]`, optsA.Cluster.Port, test.poolSize, test.accounts))

checkClusterFormed(t, srva, srvb)

checkSubInterest(t, srvb, "A", "foo", 2*time.Second)
checkSubInterest(t, srvb, "A", "bar", 2*time.Second)

// Should receive on foo and bar
natsPub(t, ncB, "foo", []byte("foo2"))
check(sub1Foo, true)
check(sub2Foo, true)

natsPub(t, ncB, "bar", []byte("bar2"))
check(sub1Bar, true)
check(sub2Bar, true)

// Remove "bar"
reloadUpdateConfig(t, srva, confA, fmt.Sprintf(confATemplate, `"foo"`, test.poolSize, test.accounts))
reloadUpdateConfig(t, srvb, confB, fmt.Sprintf(confBTemplate, `"foo"`, optsA.Cluster.Port, test.poolSize, test.accounts))

checkClusterFormed(t, srva, srvb)

checkSubInterest(t, srvb, "A", "foo", 2*time.Second)
checkSubNoInterest(t, srvb, "A", "bar", 2*time.Second)

// Should receive on "foo"
natsPub(t, ncB, "foo", []byte("foo3"))
Expand Down