Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] Use new server read locks. #4053

Merged
merged 1 commit into from Apr 14, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
26 changes: 13 additions & 13 deletions server/route.go
@@ -1,4 +1,4 @@
// Copyright 2013-2022 The NATS Authors
// Copyright 2013-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -530,7 +530,7 @@ func (c *client) processRouteInfo(info *Info) {
remoteID := c.route.remoteID

// Check if this is an INFO for gateways...
if info.Gateway != "" {
if info.Gateway != _EMPTY_ {
c.mu.Unlock()
// If this server has no gateway configured, report error and return.
if !s.gateway.enabled {
Expand All @@ -545,7 +545,7 @@ func (c *client) processRouteInfo(info *Info) {

// We receive an INFO from a server that informs us about another server,
// so the info.ID in the INFO protocol does not match the ID of this route.
if remoteID != "" && remoteID != info.ID {
if remoteID != _EMPTY_ && remoteID != info.ID {
c.mu.Unlock()

// Process this implicit route. We will check that it is not an explicit
Expand Down Expand Up @@ -653,7 +653,7 @@ func (c *client) processRouteInfo(info *Info) {
// The incoming INFO from the route will have IP set
// if it has Cluster.Advertise. In that case, use that
// otherwise construct it from the remote TCP address.
if info.IP == "" {
if info.IP == _EMPTY_ {
// Need to get the remote IP address.
c.mu.Lock()
switch conn := c.nc.(type) {
Expand Down Expand Up @@ -1035,7 +1035,6 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) {
acc = v.(*Account)
}
if acc == nil {
isNew := false
// if the option of retrieving accounts later exists, create an expired one.
// When a client comes along, expiration will prevent it from being used,
// cause a fetch and update the account to what is should be.
Expand All @@ -1045,6 +1044,7 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) {
}
c.Debugf("Unknown account %q for remote subject %q", accountName, sub.subject)

var isNew bool
if acc, isNew = srv.LookupOrRegisterAccount(accountName); isNew {
acc.mu.Lock()
acc.expired = true
Expand Down Expand Up @@ -1163,12 +1163,12 @@ func (c *client) addRouteSubOrUnsubProtoToBuf(buf []byte, accName string, sub *s
// complete interest for all subjects, both normal as a binary
// and queue group weights.
func (s *Server) sendSubsToRoute(route *client) {
s.mu.Lock()
// Estimated size of all protocols. It does not have to be accurate at all.
eSize := 0
// Send over our account subscriptions.
// copy accounts into array first
var eSize int
// Copy of accounts.
accs := make([]*Account, 0, 32)

s.mu.RLock()
s.accounts.Range(func(k, v interface{}) bool {
a := v.(*Account)
accs = append(accs, a)
Expand All @@ -1188,7 +1188,7 @@ func (s *Server) sendSubsToRoute(route *client) {
a.mu.RUnlock()
return true
})
s.mu.Unlock()
s.mu.RUnlock()

buf := make([]byte, 0, eSize)

Expand Down Expand Up @@ -1446,7 +1446,7 @@ func (s *Server) addRoute(c *client, info *Info) (bool, bool) {
sendInfo = len(s.routes) > 1

// If the INFO contains a Gateway URL, add it to the list for our cluster.
if info.GatewayURL != "" && s.addGatewayURL(info.GatewayURL) {
if info.GatewayURL != _EMPTY_ && s.addGatewayURL(info.GatewayURL) {
s.sendAsyncGatewayInfo()
}
}
Expand Down Expand Up @@ -1589,12 +1589,12 @@ func (s *Server) updateRouteSubscriptionMap(acc *Account, sub *subscription, del
var _routes [32]*client
routes := _routes[:0]

s.mu.Lock()
s.mu.RLock()
for _, route := range s.routes {
routes = append(routes, route)
}
trace := atomic.LoadInt32(&s.logging.trace) == 1
s.mu.Unlock()
s.mu.RUnlock()

// If we are a queue subscriber we need to make sure our updates are serialized from
// potential multiple connections. We want to make sure that the order above is preserved
Expand Down