Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Subscription interest issue due to configuration reload #4130

Merged
merged 4 commits on May 3, 2023 (source and target branch names were links lost in extraction)
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
92 changes: 86 additions & 6 deletions server/accounts.go
Expand Up @@ -262,8 +262,7 @@ func (a *Account) String() string {

// Used to create shallow copies of accounts for transfer
// from opts to real accounts in server struct.
func (a *Account) shallowCopy() *Account {
na := NewAccount(a.Name)
func (a *Account) shallowCopy(na *Account) {
na.Nkey = a.Nkey
na.Issuer = a.Issuer

Expand Down Expand Up @@ -303,12 +302,14 @@ func (a *Account) shallowCopy() *Account {
}
}
}
na.mappings = a.mappings
if len(na.mappings) > 0 && na.prand == nil {
na.prand = rand.New(rand.NewSource(time.Now().UnixNano()))
}
// JetStream
na.jsLimits = a.jsLimits
// Server config account limits.
na.limits = a.limits

return na
}

// nextEventID uses its own lock for better concurrency.
Expand Down Expand Up @@ -2834,7 +2835,12 @@ func (a *Account) checkStreamImportsEqual(b *Account) bool {
return true
}

// Returns true if `a` and `b` stream exports are the same.
// Acquires `a` read lock, but `b` is assumed to not be accessed
// by anyone but the caller (`b` is not registered anywhere).
func (a *Account) checkStreamExportsEqual(b *Account) bool {
a.mu.RLock()
defer a.mu.RUnlock()
if len(a.exports.streams) != len(b.exports.streams) {
return false
}
Expand All @@ -2843,14 +2849,29 @@ func (a *Account) checkStreamExportsEqual(b *Account) bool {
if !ok {
return false
}
if !reflect.DeepEqual(aea, bea) {
if !isStreamExportEqual(aea, bea) {
return false
}
}
return true
}

// isStreamExportEqual reports whether two stream exports are equivalent.
// Two nil exports are considered equal. Beyond presence, a stream export
// is compared solely by its embedded exportAuth settings.
// Locking is handled by the caller.
func isStreamExportEqual(a, b *streamExport) bool {
	if a == nil && b == nil {
		return true
	}
	// At this point at least one is non-nil, so a single nil means mismatch.
	if a == nil || b == nil {
		return false
	}
	return isExportAuthEqual(&a.exportAuth, &b.exportAuth)
}

// Returns true if `a` and `b` service exports are the same.
// Acquires `a` read lock, but `b` is assumed to not be accessed
// by anyone but the caller (`b` is not registered anywhere).
func (a *Account) checkServiceExportsEqual(b *Account) bool {
a.mu.RLock()
defer a.mu.RUnlock()
if len(a.exports.services) != len(b.exports.services) {
return false
}
Expand All @@ -2859,7 +2880,66 @@ func (a *Account) checkServiceExportsEqual(b *Account) bool {
if !ok {
return false
}
if !reflect.DeepEqual(aea, bea) {
if !isServiceExportEqual(aea, bea) {
return false
}
}
return true
}

// isServiceExportEqual reports whether two service exports are equivalent:
// same auth settings, same owning account name, same response type, and
// same latency tracking configuration (sampling and subject).
// NOTE(review): assumes a.acc and b.acc are non-nil — confirm with callers.
// Locking is handled by the caller.
func isServiceExportEqual(a, b *serviceExport) bool {
	if a == nil && b == nil {
		return true
	}
	// At this point at least one is non-nil, so a single nil means mismatch.
	if a == nil || b == nil {
		return false
	}
	if !isExportAuthEqual(&a.exportAuth, &b.exportAuth) {
		return false
	}
	if a.acc.Name != b.acc.Name {
		return false
	}
	if a.respType != b.respType {
		return false
	}
	if a.latency != nil || b.latency != nil {
		// One side tracking latency while the other is not is a mismatch;
		// past this check both latency pointers are non-nil.
		if a.latency == nil || b.latency == nil {
			return false
		}
		if a.latency.sampling != b.latency.sampling {
			return false
		}
		if a.latency.subject != b.latency.subject {
			return false
		}
	}
	return true
}

// Returns true if `a` and `b` exportAuth structures are equal.
// Both `a` and `b` are guaranteed to be non-nil.
// Locking is handled by the caller.
func isExportAuthEqual(a, b *exportAuth) bool {
if a.tokenReq != b.tokenReq {
return false
}
if a.accountPos != b.accountPos {
return false
}
if len(a.approved) != len(b.approved) {
return false
}
for ak, av := range a.approved {
if bv, ok := b.approved[ak]; !ok || av.Name != bv.Name {
return false
}
}
if len(a.actsRevoked) != len(b.actsRevoked) {
return false
}
for ak, av := range a.actsRevoked {
if bv, ok := b.actsRevoked[ak]; !ok || av != bv {
return false
}
}
Expand Down
5 changes: 3 additions & 2 deletions server/auth.go
Expand Up @@ -171,13 +171,14 @@ func (p *Permissions) clone() *Permissions {
// Lock is assumed held.
func (s *Server) checkAuthforWarnings() {
warn := false
if s.opts.Password != _EMPTY_ && !isBcrypt(s.opts.Password) {
opts := s.getOpts()
if opts.Password != _EMPTY_ && !isBcrypt(opts.Password) {
warn = true
}
for _, u := range s.users {
// Skip warn if using TLS certs based auth
// unless a password has been left in the config.
if u.Password == _EMPTY_ && s.opts.TLSMap {
if u.Password == _EMPTY_ && opts.TLSMap {
continue
}
// Check if this is our internal sys client created on the fly.
Expand Down
15 changes: 11 additions & 4 deletions server/client.go
Expand Up @@ -4732,12 +4732,19 @@ func (c *client) kindString() string {
// an older one.
func (c *client) swapAccountAfterReload() {
c.mu.Lock()
defer c.mu.Unlock()
if c.srv == nil {
srv := c.srv
an := c.acc.GetName()
c.mu.Unlock()
if srv == nil {
return
}
acc, _ := c.srv.LookupAccount(c.acc.Name)
c.acc = acc
if acc, _ := srv.LookupAccount(an); acc != nil {
c.mu.Lock()
if c.acc != acc {
c.acc = acc
}
c.mu.Unlock()
}
}

// processSubsOnConfigReload removes any subscriptions the client has that are no
Expand Down
65 changes: 0 additions & 65 deletions server/client_test.go
Expand Up @@ -1543,71 +1543,6 @@ func TestClientOutboundQueueCoalesce(t *testing.T) {
}
}

// This test ensures that outbound queues don't cause a run on
// memory when sending something to lots of clients.
func TestClientOutboundQueueMemory(t *testing.T) {
	opts := DefaultOptions()
	s := RunServer(opts)
	defer s.Shutdown()

	// Heap snapshots taken before and after the fan-out publish.
	var before runtime.MemStats
	var after runtime.MemStats

	var err error
	clients := make([]*nats.Conn, 50000)
	wait := &sync.WaitGroup{}
	wait.Add(len(clients))

	for i := 0; i < len(clients); i++ {
		// In-process connections avoid real sockets, keeping 50k clients cheap.
		clients[i], err = nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port), nats.InProcessServer(s))
		if err != nil {
			t.Fatalf("Error on connect: %v", err)
		}
		// Deferred closes all run at test end; acceptable for a test.
		defer clients[i].Close()

		clients[i].Subscribe("test", func(m *nats.Msg) {
			wait.Done()
		})
	}

	// Settle the heap before taking the baseline measurement.
	runtime.GC()
	runtime.ReadMemStats(&before)

	nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port), nats.InProcessServer(s))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc.Close()

	// One ~48KB message fanned out to every subscriber.
	var m [48000]byte
	if err = nc.Publish("test", m[:]); err != nil {
		t.Fatal(err)
	}

	// Wait until every subscriber callback has fired.
	wait.Wait()

	runtime.GC()
	runtime.ReadMemStats(&after)

	hb, ha := float64(before.HeapAlloc), float64(after.HeapAlloc)
	ms := float64(len(m))
	diff := float64(ha) - float64(hb)
	inc := (diff / float64(hb)) * 100

	fmt.Printf("Message size: %.1fKB\n", ms/1024)
	fmt.Printf("Subscribed clients: %d\n", len(clients))
	fmt.Printf("Heap allocs before: %.1fMB\n", hb/1024/1024)
	fmt.Printf("Heap allocs after: %.1fMB\n", ha/1024/1024)
	fmt.Printf("Heap allocs delta: %.1f%%\n", inc)

	// Failure threshold intentionally disabled until a sensible bound is chosen.
	// TODO: What threshold makes sense here for a failure?
	/*
		if inc > 10 {
			t.Fatalf("memory increase was %.1f%% (should be <= 10%%)", inc)
		}
	*/
}

func TestClientTraceRace(t *testing.T) {
opts := DefaultOptions()
s := RunServer(opts)
Expand Down
2 changes: 1 addition & 1 deletion server/events.go
Expand Up @@ -1717,7 +1717,7 @@ func (s *Server) remoteConnsUpdate(sub *subscription, c *client, _ *Account, sub

// This will import any system level exports.
func (s *Server) registerSystemImports(a *Account) {
if a == nil || !s.eventsEnabled() {
if a == nil || !s.EventsEnabled() {
return
}
sacc := s.SystemAccount()
Expand Down
2 changes: 1 addition & 1 deletion server/jetstream_cluster.go
Expand Up @@ -725,7 +725,7 @@ func (js *jetStream) setupMetaGroup() error {

// If we are soliciting leafnode connections and we are sharing a system account and do not disable it with a hint,
// we want to move to observer mode so that we extend the solicited cluster or supercluster but do not form our own.
cfg.Observer = s.canExtendOtherDomain() && s.opts.JetStreamExtHint != jsNoExtend
cfg.Observer = s.canExtendOtherDomain() && s.getOpts().JetStreamExtHint != jsNoExtend

var bootstrap bool
if ps, err := readPeerState(storeDir); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion server/leafnode.go
Expand Up @@ -697,7 +697,7 @@ func (s *Server) startLeafNodeAcceptLoop() {
s.leafNodeInfo = info
// Possibly override Host/Port and set IP based on Cluster.Advertise
if err := s.setLeafNodeInfoHostPortAndIP(); err != nil {
s.Fatalf("Error setting leafnode INFO with LeafNode.Advertise value of %s, err=%v", s.opts.LeafNode.Advertise, err)
s.Fatalf("Error setting leafnode INFO with LeafNode.Advertise value of %s, err=%v", opts.LeafNode.Advertise, err)
l.Close()
s.mu.Unlock()
return
Expand Down
2 changes: 1 addition & 1 deletion server/nkey.go
Expand Up @@ -34,7 +34,7 @@ func (s *Server) NonceRequired() bool {
// nonceRequired tells us if we should send a nonce.
// Lock should be held on entry (it guards s.nkeys and s.trustedKeys);
// options are read via getOpts(), which is safe across config reloads.
// The diff span carried both the stale `s.opts.…` return and the new
// `s.getOpts().…` return; only the reload-safe version is kept.
func (s *Server) nonceRequired() bool {
	return s.getOpts().AlwaysEnableNonce || len(s.nkeys) > 0 || s.trustedKeys != nil
}

// Generate a nonce for INFO challenge.
Expand Down
69 changes: 67 additions & 2 deletions server/norace_test.go
Expand Up @@ -5839,7 +5839,7 @@ func TestNoRaceEncodeConsumerStateBug(t *testing.T) {
}

// Performance impact on stream ingress with large number of consumers.
func TestJetStreamLargeNumConsumersPerfImpact(t *testing.T) {
func TestNoRaceJetStreamLargeNumConsumersPerfImpact(t *testing.T) {
skip(t)

s := RunBasicJetStreamServer(t)
Expand Down Expand Up @@ -5931,7 +5931,7 @@ func TestJetStreamLargeNumConsumersPerfImpact(t *testing.T) {
}

// Performance impact on large number of consumers but sparse delivery.
func TestJetStreamLargeNumConsumersSparseDelivery(t *testing.T) {
func TestNoRaceJetStreamLargeNumConsumersSparseDelivery(t *testing.T) {
skip(t)

s := RunBasicJetStreamServer(t)
Expand Down Expand Up @@ -7864,3 +7864,68 @@ func TestNoRaceJetStreamClusterLeafnodeConnectPerf(t *testing.T) {
nc.Close()
}
}

// This test ensures that outbound queues don't cause a run on
// memory when sending something to lots of clients.
func TestNoRaceClientOutboundQueueMemory(t *testing.T) {
	opts := DefaultOptions()
	s := RunServer(opts)
	defer s.Shutdown()

	// Heap snapshots taken before and after the fan-out publish.
	var before runtime.MemStats
	var after runtime.MemStats

	var err error
	clients := make([]*nats.Conn, 50000)
	wait := &sync.WaitGroup{}
	wait.Add(len(clients))

	for i := 0; i < len(clients); i++ {
		// In-process connections avoid real sockets, keeping 50k clients cheap.
		clients[i], err = nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port), nats.InProcessServer(s))
		if err != nil {
			t.Fatalf("Error on connect: %v", err)
		}
		// Deferred closes all run at test end; acceptable for a test.
		defer clients[i].Close()

		// Fail fast on subscribe errors instead of silently hanging on wait.Wait().
		if _, err := clients[i].Subscribe("test", func(m *nats.Msg) {
			wait.Done()
		}); err != nil {
			t.Fatalf("Error on subscribe: %v", err)
		}
	}

	// Settle the heap before taking the baseline measurement.
	runtime.GC()
	runtime.ReadMemStats(&before)

	nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port), nats.InProcessServer(s))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc.Close()

	// One ~48KB message fanned out to every subscriber.
	var m [48000]byte
	if err = nc.Publish("test", m[:]); err != nil {
		t.Fatal(err)
	}

	// Wait until every subscriber callback has fired.
	wait.Wait()

	runtime.GC()
	runtime.ReadMemStats(&after)

	// hb and ha are already float64; the original re-wrapped them in
	// redundant float64(...) conversions below.
	hb, ha := float64(before.HeapAlloc), float64(after.HeapAlloc)
	ms := float64(len(m))
	diff := ha - hb
	inc := (diff / hb) * 100

	fmt.Printf("Message size: %.1fKB\n", ms/1024)
	fmt.Printf("Subscribed clients: %d\n", len(clients))
	fmt.Printf("Heap allocs before: %.1fMB\n", hb/1024/1024)
	fmt.Printf("Heap allocs after: %.1fMB\n", ha/1024/1024)
	fmt.Printf("Heap allocs delta: %.1f%%\n", inc)

	// Failure threshold intentionally disabled until a sensible bound is chosen.
	// TODO: What threshold makes sense here for a failure?
	/*
		if inc > 10 {
			t.Fatalf("memory increase was %.1f%% (should be <= 10%%)", inc)
		}
	*/
}