[FIXED] Subscription interest issue due to configuration reload (#4130)
This impacts only deployments with accounts defined in the configuration
file (as opposed to operator mode). During a configuration reload, new
accounts and sublists were created, to be replaced later with the existing
ones. That left a window of time in which a subscription could be added to
(or a removal attempted from) the "wrong" sublist. This could lead to route
subscriptions seemingly not being forwarded.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
derekcollison committed May 3, 2023
2 parents b61e411 + 8a4ead2 commit 793db74
Showing 15 changed files with 444 additions and 236 deletions.
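
The window described in the commit message is easiest to see in miniature. The following is a hypothetical, simplified model (none of these types are the server's actual structures): the reload temporarily installs a brand-new sublist, a client subscribes through it, and the reload then restores the existing sublist, losing the interest.

package main

import (
	"fmt"
	"sync"
)

// sublist stands in for the server's per-account subscription structure.
type sublist struct {
	mu   sync.Mutex
	subs map[string]bool
}

func (sl *sublist) insert(subject string) {
	sl.mu.Lock()
	defer sl.mu.Unlock()
	sl.subs[subject] = true
}

// account holds a pointer to its sublist.
type account struct {
	mu sync.Mutex
	sl *sublist
}

func main() {
	existing := &sublist{subs: map[string]bool{}}
	acc := &account{sl: existing}

	// Reload: parsing the config temporarily installs a brand-new sublist...
	acc.mu.Lock()
	acc.sl = &sublist{subs: map[string]bool{}}
	acc.mu.Unlock()

	// ...and in that window a client subscribes through the account,
	// landing on the sublist that is about to be thrown away.
	acc.mu.Lock()
	sl := acc.sl
	acc.mu.Unlock()
	sl.insert("foo")

	// Reload finishes by restoring the existing sublist: interest is lost.
	acc.mu.Lock()
	acc.sl = existing
	acc.mu.Unlock()

	fmt.Println("interest visible after reload:", acc.sl.subs["foo"]) // false
}

The committed fix below avoids the swap altogether: the parsed configuration is copied into the already-registered account, so the sublist never changes identity.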
92 changes: 86 additions & 6 deletions server/accounts.go
@@ -262,8 +262,7 @@ func (a *Account) String() string {

 // Used to create shallow copies of accounts for transfer
 // from opts to real accounts in server struct.
-func (a *Account) shallowCopy() *Account {
-	na := NewAccount(a.Name)
+func (a *Account) shallowCopy(na *Account) {
 	na.Nkey = a.Nkey
 	na.Issuer = a.Issuer

@@ -303,12 +302,14 @@ func (a *Account) shallowCopy() *Account {
 			}
 		}
 	}
+	na.mappings = a.mappings
+	if len(na.mappings) > 0 && na.prand == nil {
+		na.prand = rand.New(rand.NewSource(time.Now().UnixNano()))
+	}
 	// JetStream
 	na.jsLimits = a.jsLimits
 	// Server config account limits.
 	na.limits = a.limits
-
-	return na
 }

 // nextEventID uses its own lock for better concurrency.
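The signature change above is the heart of the fix: instead of allocating a replacement account with NewAccount and swapping it in, reload now copies the parsed options into the account that is already registered, so every pointer to it, its sublist included, stays valid. A minimal sketch of the pattern with hypothetical names, not the server's API:

package main

import "fmt"

type config struct{ maxSubs int }

type acct struct {
	maxSubs int
	subs    map[string]bool // stands in for the account's sublist
}

// Before: build a replacement; callers must swap pointers, creating the
// window in which a subscription can land on the doomed object.
func fromConfig(c config) *acct {
	return &acct{maxSubs: c.maxSubs, subs: map[string]bool{}}
}

// After: copy into the registered account in place; no swap, no window,
// and the subs map keeps its identity.
func copyInto(c config, existing *acct) {
	existing.maxSubs = c.maxSubs
}

func main() {
	registered := &acct{maxSubs: 10, subs: map[string]bool{"foo": true}}
	copyInto(config{maxSubs: 20}, registered)
	// Limits updated, subscription map untouched and still the same object.
	fmt.Println(registered.maxSubs, registered.subs["foo"]) // 20 true
}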
@@ -2834,7 +2835,12 @@ func (a *Account) checkStreamImportsEqual(b *Account) bool {
 	return true
 }

+// Returns true if `a` and `b` stream exports are the same.
+// Acquires `a` read lock, but `b` is assumed to not be accessed
+// by anyone but the caller (`b` is not registered anywhere).
 func (a *Account) checkStreamExportsEqual(b *Account) bool {
+	a.mu.RLock()
+	defer a.mu.RUnlock()
 	if len(a.exports.streams) != len(b.exports.streams) {
 		return false
 	}
@@ -2843,14 +2849,29 @@ func (a *Account) checkStreamExportsEqual(b *Account) bool {
 		if !ok {
 			return false
 		}
-		if !reflect.DeepEqual(aea, bea) {
+		if !isStreamExportEqual(aea, bea) {
 			return false
 		}
 	}
 	return true
 }

+func isStreamExportEqual(a, b *streamExport) bool {
+	if a == nil && b == nil {
+		return true
+	}
+	if (a == nil && b != nil) || (a != nil && b == nil) {
+		return false
+	}
+	return isExportAuthEqual(&a.exportAuth, &b.exportAuth)
+}
+
+// Returns true if `a` and `b` service exports are the same.
+// Acquires `a` read lock, but `b` is assumed to not be accessed
+// by anyone but the caller (`b` is not registered anywhere).
 func (a *Account) checkServiceExportsEqual(b *Account) bool {
+	a.mu.RLock()
+	defer a.mu.RUnlock()
 	if len(a.exports.services) != len(b.exports.services) {
 		return false
 	}
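Replacing reflect.DeepEqual with purpose-built comparisons in this and the following hunk is not just style: after the shallowCopy change, semantically equal exports can hang off different *Account instances, and DeepEqual, which recurses into everything reachable (locks and internal maps included), would report them unequal. A hypothetical, simplified demonstration:

package main

import (
	"fmt"
	"reflect"
	"sync"
)

// owner is a stand-in for *Account: it carries internal state that differs
// between the old instance and its freshly copied replacement.
type owner struct {
	Name string
	mu   sync.Mutex
	subs map[string]bool
}

// export is a stand-in for streamExport/serviceExport.
type export struct {
	subject string
	acc     *owner
}

func main() {
	oldAcc := &owner{Name: "A", subs: map[string]bool{"x": true}}
	newAcc := &owner{Name: "A", subs: map[string]bool{}} // fresh after reload

	e1 := export{subject: "orders.*", acc: oldAcc}
	e2 := export{subject: "orders.*", acc: newAcc}

	// DeepEqual recurses through acc and sees the differing subs maps.
	fmt.Println(reflect.DeepEqual(e1, e2)) // false

	// A targeted comparison of the fields that define equality succeeds.
	fmt.Println(e1.subject == e2.subject && e1.acc.Name == e2.acc.Name) // true
}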
@@ -2859,7 +2880,66 @@ func (a *Account) checkServiceExportsEqual(b *Account) bool {
 		if !ok {
 			return false
 		}
-		if !reflect.DeepEqual(aea, bea) {
+		if !isServiceExportEqual(aea, bea) {
 			return false
 		}
 	}
 	return true
 }

+func isServiceExportEqual(a, b *serviceExport) bool {
+	if a == nil && b == nil {
+		return true
+	}
+	if (a == nil && b != nil) || (a != nil && b == nil) {
+		return false
+	}
+	if !isExportAuthEqual(&a.exportAuth, &b.exportAuth) {
+		return false
+	}
+	if a.acc.Name != b.acc.Name {
+		return false
+	}
+	if a.respType != b.respType {
+		return false
+	}
+	if a.latency != nil || b.latency != nil {
+		if (a.latency != nil && b.latency == nil) || (a.latency == nil && b.latency != nil) {
+			return false
+		}
+		if a.latency.sampling != b.latency.sampling {
+			return false
+		}
+		if a.latency.subject != b.latency.subject {
+			return false
+		}
+	}
+	return true
+}
+
+// Returns true if `a` and `b` exportAuth structures are equal.
+// Both `a` and `b` are guaranteed to be non-nil.
+// Locking is handled by the caller.
+func isExportAuthEqual(a, b *exportAuth) bool {
+	if a.tokenReq != b.tokenReq {
+		return false
+	}
+	if a.accountPos != b.accountPos {
+		return false
+	}
+	if len(a.approved) != len(b.approved) {
+		return false
+	}
+	for ak, av := range a.approved {
+		if bv, ok := b.approved[ak]; !ok || av.Name != bv.Name {
+			return false
+		}
+	}
+	if len(a.actsRevoked) != len(b.actsRevoked) {
+		return false
+	}
+	for ak, av := range a.actsRevoked {
+		if bv, ok := b.actsRevoked[ak]; !ok || av != bv {
+			return false
+		}
+	}
+	return true
+}
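A side note on the paired nil checks above: `(a == nil && b != nil) || (a != nil && b == nil)` is equivalent to comparing the two nil-ness values directly. A sketch of the more compact form (a stylistic observation, not part of the commit; requires Go 1.18 generics):

// Reports whether exactly one of a, b is nil; equivalent to the commit's
// two-clause condition.
func nilMismatch[T any](a, b *T) bool {
	return (a == nil) != (b == nil)
}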
5 changes: 3 additions & 2 deletions server/auth.go
@@ -171,13 +171,14 @@ func (p *Permissions) clone() *Permissions {
 // Lock is assumed held.
 func (s *Server) checkAuthforWarnings() {
 	warn := false
-	if s.opts.Password != _EMPTY_ && !isBcrypt(s.opts.Password) {
+	opts := s.getOpts()
+	if opts.Password != _EMPTY_ && !isBcrypt(opts.Password) {
 		warn = true
 	}
 	for _, u := range s.users {
 		// Skip warn if using TLS certs based auth
 		// unless a password has been left in the config.
-		if u.Password == _EMPTY_ && s.opts.TLSMap {
+		if u.Password == _EMPTY_ && opts.TLSMap {
 			continue
 		}
 		// Check if this is our internal sys client created on the fly.
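checkAuthforWarnings is one of several places this commit moves from reading s.opts fields directly to taking a snapshot via s.getOpts(); the same change appears below in jetstream_cluster.go and nkey.go. A simplified sketch of why such an accessor helps under reload (the server's actual implementation may differ):

package main

import "sync"

type Options struct {
	Password string
	TLSMap   bool
}

type Server struct {
	mu   sync.RWMutex
	opts *Options
}

// getOpts returns a consistent snapshot; callers read from the returned
// pointer instead of racing with a reload on s.opts itself.
func (s *Server) getOpts() *Options {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.opts
}

// reload swaps in a fully built Options; readers holding an older
// snapshot keep seeing a coherent (if stale) view.
func (s *Server) reload(newOpts *Options) {
	s.mu.Lock()
	s.opts = newOpts
	s.mu.Unlock()
}

func main() {
	s := &Server{opts: &Options{Password: "old"}}
	snap := s.getOpts()
	s.reload(&Options{Password: "new"})
	_ = snap.Password // still "old", but never a torn mix of old and new
}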
15 changes: 11 additions & 4 deletions server/client.go
@@ -4732,12 +4732,19 @@ func (c *client) kindString() string {
 // an older one.
 func (c *client) swapAccountAfterReload() {
 	c.mu.Lock()
-	defer c.mu.Unlock()
-	if c.srv == nil {
+	srv := c.srv
+	an := c.acc.GetName()
+	c.mu.Unlock()
+	if srv == nil {
 		return
 	}
-	acc, _ := c.srv.LookupAccount(c.acc.Name)
-	c.acc = acc
+	if acc, _ := srv.LookupAccount(an); acc != nil {
+		c.mu.Lock()
+		if c.acc != acc {
+			c.acc = acc
+		}
+		c.mu.Unlock()
+	}
 }

// processSubsOnConfigReload removes any subscriptions the client has that are no
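
The rewritten swapAccountAfterReload fixes two things at once: it no longer holds the client lock across LookupAccount (which can take server and account locks, risking lock-order inversions), and a failed lookup no longer overwrites c.acc with nil. A generic sketch of the pattern with hypothetical types:

package main

import "sync"

type item struct{ name string }

type registry struct {
	mu    sync.Mutex
	items map[string]*item
}

func (r *registry) lookup(name string) *item {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.items[name]
}

type holder struct {
	mu  sync.Mutex
	reg *registry
	cur *item
}

// refresh snapshots under its own lock, releases it before calling into the
// registry, and only re-locks to publish a non-nil result.
func (h *holder) refresh() {
	h.mu.Lock()
	reg, name := h.reg, h.cur.name
	h.mu.Unlock()
	if reg == nil {
		return
	}
	if it := reg.lookup(name); it != nil { // nil: keep the old reference
		h.mu.Lock()
		if h.cur != it {
			h.cur = it
		}
		h.mu.Unlock()
	}
}

func main() {
	it := &item{name: "A"}
	r := &registry{items: map[string]*item{"A": it}}
	h := &holder{reg: r, cur: &item{name: "A"}}
	h.refresh() // h.cur now points at the registered instance
}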
65 changes: 0 additions & 65 deletions server/client_test.go
@@ -1543,71 +1543,6 @@ func TestClientOutboundQueueCoalesce(t *testing.T) {
 	}
 }

-// This test ensures that outbound queues don't cause a run on
-// memory when sending something to lots of clients.
-func TestClientOutboundQueueMemory(t *testing.T) {
-	opts := DefaultOptions()
-	s := RunServer(opts)
-	defer s.Shutdown()
-
-	var before runtime.MemStats
-	var after runtime.MemStats
-
-	var err error
-	clients := make([]*nats.Conn, 50000)
-	wait := &sync.WaitGroup{}
-	wait.Add(len(clients))
-
-	for i := 0; i < len(clients); i++ {
-		clients[i], err = nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port), nats.InProcessServer(s))
-		if err != nil {
-			t.Fatalf("Error on connect: %v", err)
-		}
-		defer clients[i].Close()
-
-		clients[i].Subscribe("test", func(m *nats.Msg) {
-			wait.Done()
-		})
-	}
-
-	runtime.GC()
-	runtime.ReadMemStats(&before)
-
-	nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port), nats.InProcessServer(s))
-	if err != nil {
-		t.Fatalf("Error on connect: %v", err)
-	}
-	defer nc.Close()
-
-	var m [48000]byte
-	if err = nc.Publish("test", m[:]); err != nil {
-		t.Fatal(err)
-	}
-
-	wait.Wait()
-
-	runtime.GC()
-	runtime.ReadMemStats(&after)
-
-	hb, ha := float64(before.HeapAlloc), float64(after.HeapAlloc)
-	ms := float64(len(m))
-	diff := float64(ha) - float64(hb)
-	inc := (diff / float64(hb)) * 100
-
-	fmt.Printf("Message size: %.1fKB\n", ms/1024)
-	fmt.Printf("Subscribed clients: %d\n", len(clients))
-	fmt.Printf("Heap allocs before: %.1fMB\n", hb/1024/1024)
-	fmt.Printf("Heap allocs after: %.1fMB\n", ha/1024/1024)
-	fmt.Printf("Heap allocs delta: %.1f%%\n", inc)
-
-	// TODO: What threshold makes sense here for a failure?
-	/*
-		if inc > 10 {
-			t.Fatalf("memory increase was %.1f%% (should be <= 10%%)", inc)
-		}
-	*/
-}
-
 func TestClientTraceRace(t *testing.T) {
 	opts := DefaultOptions()
 	s := RunServer(opts)
2 changes: 1 addition & 1 deletion server/events.go
@@ -1717,7 +1717,7 @@ func (s *Server) remoteConnsUpdate(sub *subscription, c *client, _ *Account, sub

 // This will import any system level exports.
 func (s *Server) registerSystemImports(a *Account) {
-	if a == nil || !s.eventsEnabled() {
+	if a == nil || !s.EventsEnabled() {
 		return
 	}
 	sacc := s.SystemAccount()
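The small-looking change from eventsEnabled to EventsEnabled matters for locking: by the convention visible in nkey.go below (NonceRequired vs. nonceRequired), the unexported method assumes the server lock is already held while the exported one acquires it, and registerSystemImports can run during reload without the lock. A sketch of the convention (assumed from the naming pattern; the server's actual method bodies differ):

package main

import "sync"

type Server struct {
	mu  sync.RWMutex
	sys *struct{} // placeholder for system-events state
}

// EventsEnabled is safe to call from anywhere: it takes the lock itself.
func (s *Server) EventsEnabled() bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.eventsEnabled()
}

// eventsEnabled assumes the lock is held on entry.
func (s *Server) eventsEnabled() bool {
	return s.sys != nil
}

func main() {
	s := &Server{}
	_ = s.EventsEnabled() // false until system events are configured
}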
2 changes: 1 addition & 1 deletion server/jetstream_cluster.go
@@ -725,7 +725,7 @@ func (js *jetStream) setupMetaGroup() error {

 	// If we are soliciting leafnode connections and we are sharing a system account and do not disable it with a hint,
 	// we want to move to observer mode so that we extend the solicited cluster or supercluster but do not form our own.
-	cfg.Observer = s.canExtendOtherDomain() && s.opts.JetStreamExtHint != jsNoExtend
+	cfg.Observer = s.canExtendOtherDomain() && s.getOpts().JetStreamExtHint != jsNoExtend

 	var bootstrap bool
 	if ps, err := readPeerState(storeDir); err != nil {
2 changes: 1 addition & 1 deletion server/leafnode.go
@@ -697,7 +697,7 @@ func (s *Server) startLeafNodeAcceptLoop() {
 	s.leafNodeInfo = info
 	// Possibly override Host/Port and set IP based on Cluster.Advertise
 	if err := s.setLeafNodeInfoHostPortAndIP(); err != nil {
-		s.Fatalf("Error setting leafnode INFO with LeafNode.Advertise value of %s, err=%v", s.opts.LeafNode.Advertise, err)
+		s.Fatalf("Error setting leafnode INFO with LeafNode.Advertise value of %s, err=%v", opts.LeafNode.Advertise, err)
 		l.Close()
 		s.mu.Unlock()
 		return
2 changes: 1 addition & 1 deletion server/nkey.go
@@ -34,7 +34,7 @@ func (s *Server) NonceRequired() bool {

 // nonceRequired tells us if we should send a nonce.
 // Lock should be held on entry.
 func (s *Server) nonceRequired() bool {
-	return s.opts.AlwaysEnableNonce || len(s.nkeys) > 0 || s.trustedKeys != nil
+	return s.getOpts().AlwaysEnableNonce || len(s.nkeys) > 0 || s.trustedKeys != nil
 }

 // Generate a nonce for INFO challenge.
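For context on nonceRequired: when nkeys or trusted operator keys are in play, the server embeds a random nonce in its INFO (see the comment above) and requires the client to sign it, which defeats replaying a captured CONNECT. An illustrative round trip using the nats-io/nkeys library; the server's actual wire handling is more involved:

package main

import (
	"fmt"

	"github.com/nats-io/nkeys"
)

func main() {
	// Client side: a user keypair; only the public key is ever shared.
	user, _ := nkeys.CreateUser()
	pub, _ := user.PublicKey()

	// Server side: a fresh random nonce per connection (fixed here for brevity).
	nonce := []byte("3vlmsVDl1Uoi5xSC")

	// Client signs the nonce and sends the signature in its CONNECT.
	sig, _ := user.Sign(nonce)

	// Server verifies using only the public key.
	verifier, _ := nkeys.FromPublicKey(pub)
	fmt.Println("signature valid:", verifier.Verify(nonce, sig) == nil)
}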
69 changes: 67 additions & 2 deletions server/norace_test.go
@@ -5839,7 +5839,7 @@ func TestNoRaceEncodeConsumerStateBug(t *testing.T) {
 }

 // Performance impact on stream ingress with large number of consumers.
-func TestJetStreamLargeNumConsumersPerfImpact(t *testing.T) {
+func TestNoRaceJetStreamLargeNumConsumersPerfImpact(t *testing.T) {
 	skip(t)

 	s := RunBasicJetStreamServer(t)
@@ -5931,7 +5931,7 @@ func TestJetStreamLargeNumConsumersPerfImpact(t *testing.T) {
 }

 // Performance impact on large number of consumers but sparse delivery.
-func TestJetStreamLargeNumConsumersSparseDelivery(t *testing.T) {
+func TestNoRaceJetStreamLargeNumConsumersSparseDelivery(t *testing.T) {
 	skip(t)

 	s := RunBasicJetStreamServer(t)
@@ -7864,3 +7864,68 @@ func TestNoRaceJetStreamClusterLeafnodeConnectPerf(t *testing.T) {
 		nc.Close()
 	}
 }
+
+// This test ensures that outbound queues don't cause a run on
+// memory when sending something to lots of clients.
+func TestNoRaceClientOutboundQueueMemory(t *testing.T) {
+	opts := DefaultOptions()
+	s := RunServer(opts)
+	defer s.Shutdown()
+
+	var before runtime.MemStats
+	var after runtime.MemStats
+
+	var err error
+	clients := make([]*nats.Conn, 50000)
+	wait := &sync.WaitGroup{}
+	wait.Add(len(clients))
+
+	for i := 0; i < len(clients); i++ {
+		clients[i], err = nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port), nats.InProcessServer(s))
+		if err != nil {
+			t.Fatalf("Error on connect: %v", err)
+		}
+		defer clients[i].Close()
+
+		clients[i].Subscribe("test", func(m *nats.Msg) {
+			wait.Done()
+		})
+	}
+
+	runtime.GC()
+	runtime.ReadMemStats(&before)
+
+	nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port), nats.InProcessServer(s))
+	if err != nil {
+		t.Fatalf("Error on connect: %v", err)
+	}
+	defer nc.Close()
+
+	var m [48000]byte
+	if err = nc.Publish("test", m[:]); err != nil {
+		t.Fatal(err)
+	}
+
+	wait.Wait()
+
+	runtime.GC()
+	runtime.ReadMemStats(&after)
+
+	hb, ha := float64(before.HeapAlloc), float64(after.HeapAlloc)
+	ms := float64(len(m))
+	diff := float64(ha) - float64(hb)
+	inc := (diff / float64(hb)) * 100
+
+	fmt.Printf("Message size: %.1fKB\n", ms/1024)
+	fmt.Printf("Subscribed clients: %d\n", len(clients))
+	fmt.Printf("Heap allocs before: %.1fMB\n", hb/1024/1024)
+	fmt.Printf("Heap allocs after: %.1fMB\n", ha/1024/1024)
+	fmt.Printf("Heap allocs delta: %.1f%%\n", inc)
+
+	// TODO: What threshold makes sense here for a failure?
+	/*
+		if inc > 10 {
+			t.Fatalf("memory increase was %.1f%% (should be <= 10%%)", inc)
+		}
+	*/
+}
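
The renamed tests and the relocated queue-memory test all gain the NoRace prefix because norace_test.go is excluded from race-detector runs via a build constraint; a test that opens 50,000 connections is far too slow under -race. The constraint looks roughly like this (the exact tags may differ between versions):

//go:build !race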
