Skip to content

Commit

Permalink
[IMPROVED] Use new server read locks. (#4053)
Browse files Browse the repository at this point in the history
 Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Apr 14, 2023
2 parents 89fc7e3 + 0fe48fe commit e881656
Showing 1 changed file with 13 additions and 13 deletions.
26 changes: 13 additions & 13 deletions server/route.go
@@ -1,4 +1,4 @@
// Copyright 2013-2022 The NATS Authors
// Copyright 2013-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -530,7 +530,7 @@ func (c *client) processRouteInfo(info *Info) {
remoteID := c.route.remoteID

// Check if this is an INFO for gateways...
if info.Gateway != "" {
if info.Gateway != _EMPTY_ {
c.mu.Unlock()
// If this server has no gateway configured, report error and return.
if !s.gateway.enabled {
Expand All @@ -545,7 +545,7 @@ func (c *client) processRouteInfo(info *Info) {

// We receive an INFO from a server that informs us about another server,
// so the info.ID in the INFO protocol does not match the ID of this route.
if remoteID != "" && remoteID != info.ID {
if remoteID != _EMPTY_ && remoteID != info.ID {
c.mu.Unlock()

// Process this implicit route. We will check that it is not an explicit
Expand Down Expand Up @@ -653,7 +653,7 @@ func (c *client) processRouteInfo(info *Info) {
// The incoming INFO from the route will have IP set
// if it has Cluster.Advertise. In that case, use that
// otherwise construct it from the remote TCP address.
if info.IP == "" {
if info.IP == _EMPTY_ {
// Need to get the remote IP address.
c.mu.Lock()
switch conn := c.nc.(type) {
Expand Down Expand Up @@ -1035,7 +1035,6 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) {
acc = v.(*Account)
}
if acc == nil {
isNew := false
// if the option of retrieving accounts later exists, create an expired one.
// When a client comes along, expiration will prevent it from being used,
// cause a fetch and update the account to what is should be.
Expand All @@ -1045,6 +1044,7 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) {
}
c.Debugf("Unknown account %q for remote subject %q", accountName, sub.subject)

var isNew bool
if acc, isNew = srv.LookupOrRegisterAccount(accountName); isNew {
acc.mu.Lock()
acc.expired = true
Expand Down Expand Up @@ -1163,12 +1163,12 @@ func (c *client) addRouteSubOrUnsubProtoToBuf(buf []byte, accName string, sub *s
// complete interest for all subjects, both normal as a binary
// and queue group weights.
func (s *Server) sendSubsToRoute(route *client) {
s.mu.Lock()
// Estimated size of all protocols. It does not have to be accurate at all.
eSize := 0
// Send over our account subscriptions.
// copy accounts into array first
var eSize int
// Copy of accounts.
accs := make([]*Account, 0, 32)

s.mu.RLock()
s.accounts.Range(func(k, v interface{}) bool {
a := v.(*Account)
accs = append(accs, a)
Expand All @@ -1188,7 +1188,7 @@ func (s *Server) sendSubsToRoute(route *client) {
a.mu.RUnlock()
return true
})
s.mu.Unlock()
s.mu.RUnlock()

buf := make([]byte, 0, eSize)

Expand Down Expand Up @@ -1446,7 +1446,7 @@ func (s *Server) addRoute(c *client, info *Info) (bool, bool) {
sendInfo = len(s.routes) > 1

// If the INFO contains a Gateway URL, add it to the list for our cluster.
if info.GatewayURL != "" && s.addGatewayURL(info.GatewayURL) {
if info.GatewayURL != _EMPTY_ && s.addGatewayURL(info.GatewayURL) {
s.sendAsyncGatewayInfo()
}
}
Expand Down Expand Up @@ -1589,12 +1589,12 @@ func (s *Server) updateRouteSubscriptionMap(acc *Account, sub *subscription, del
var _routes [32]*client
routes := _routes[:0]

s.mu.Lock()
s.mu.RLock()
for _, route := range s.routes {
routes = append(routes, route)
}
trace := atomic.LoadInt32(&s.logging.trace) == 1
s.mu.Unlock()
s.mu.RUnlock()

// If we are a queue subscriber we need to make sure our updates are serialized from
// potential multiple connections. We want to make sure that the order above is preserved
Expand Down

0 comments on commit e881656

Please sign in to comment.