Skip to content

Commit

Permalink
[IMPROVED] ServiceImport Reply Optimizations (#4591)
Browse files Browse the repository at this point in the history
We added some small performance tweak to the func
checkForReverseEntries. In addition, we move the shutdown bool for the
server to an atomic so we could efficiently check it when doing unsubs.
If the server is going away there is really no need since the other side
will do its own thing when the connection goes away. And finally we do
not have to range over the account rrMap if the subscription going away
is a reserved reply.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Sep 27, 2023
2 parents 1700f56 + 75236a5 commit 4c17eeb
Show file tree
Hide file tree
Showing 11 changed files with 91 additions and 78 deletions.
19 changes: 11 additions & 8 deletions server/accounts.go
Expand Up @@ -1702,21 +1702,19 @@ func (a *Account) addReverseRespMapEntry(acc *Account, reply, from string) {
// This will be called from checkForReverseEntry when the reply arg is a wildcard subject.
// This will usually be called in a go routine since we need to walk all the entries.
func (a *Account) checkForReverseEntries(reply string, checkInterest, recursed bool) {
a.mu.RLock()
if len(a.imports.rrMap) == 0 {
a.mu.RUnlock()
if subjectIsLiteral(reply) {
a._checkForReverseEntry(reply, nil, checkInterest, recursed)
return
}

if subjectIsLiteral(reply) {
a.mu.RLock()
if len(a.imports.rrMap) == 0 {
a.mu.RUnlock()
a._checkForReverseEntry(reply, nil, checkInterest, recursed)
return
}

var _rs [64]string
rs := _rs[:0]

if n := len(a.imports.rrMap); n > cap(rs) {
rs = make([]string, 0, n)
}
Expand All @@ -1726,9 +1724,14 @@ func (a *Account) checkForReverseEntries(reply string, checkInterest, recursed b
}
a.mu.RUnlock()

// subjectIsSubsetMatch is heavy so make sure we do this without the lock.
tsa := [32]string{}
tts := tokenizeSubjectIntoSlice(tsa[:0], reply)

rsa := [32]string{}
for _, r := range rs {
if subjectIsSubsetMatch(r, reply) {
rts := tokenizeSubjectIntoSlice(rsa[:0], r)
// isSubsetMatchTokenized is heavy so make sure we do this without the lock.
if isSubsetMatchTokenized(rts, tts) {
a._checkForReverseEntry(r, nil, checkInterest, recursed)
}
}
Expand Down
30 changes: 23 additions & 7 deletions server/client.go
Expand Up @@ -3014,6 +3014,10 @@ func queueMatches(queue string, qsubs [][]*subscription) bool {

// Low level unsubscribe for a given client.
func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool) {
if s := c.srv; s != nil && s.isShuttingDown() {
return
}

c.mu.Lock()
if !force && sub.max > 0 && sub.nm < sub.max {
c.Debugf(
Expand Down Expand Up @@ -3067,7 +3071,8 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool
}

// Now check to see if this was part of a respMap entry for service imports.
if acc != nil {
// We can skip subscriptions on reserved replies.
if acc != nil && !isReservedReply(sub.subject) {
acc.checkForReverseEntry(string(sub.subject), nil, true)
}
}
Expand Down Expand Up @@ -5077,6 +5082,23 @@ func (c *client) closeConnection(reason ClosedState) {
c.out.stc = nil
}

// If we have remote latency tracking running shut that down.
if c.rrTracking != nil {
c.rrTracking.ptmr.Stop()
c.rrTracking = nil
}

// If we are shutting down, no need to do all the accounting on subs, etc.
if reason == ServerShutdown {
s := c.srv
c.mu.Unlock()
if s != nil {
// Unregister
s.removeClient(c)
}
return
}

var (
kind = c.kind
srv = c.srv
Expand All @@ -5101,12 +5123,6 @@ func (c *client) closeConnection(reason ClosedState) {
spoke = c.isSpokeLeafNode()
}

// If we have remote latency tracking running shut that down.
if c.rrTracking != nil {
c.rrTracking.ptmr.Stop()
c.rrTracking = nil
}

c.mu.Unlock()

// Remove client's or leaf node or jetstream subscriptions.
Expand Down
10 changes: 5 additions & 5 deletions server/gateway.go
Expand Up @@ -473,6 +473,10 @@ func (s *Server) startGateways() {
// This starts the gateway accept loop in a go routine, unless it
// is detected that the server has already been shutdown.
func (s *Server) startGatewayAcceptLoop() {
if s.isShuttingDown() {
return
}

// Snapshot server options.
opts := s.getOpts()

Expand All @@ -482,10 +486,6 @@ func (s *Server) startGatewayAcceptLoop() {
}

s.mu.Lock()
if s.shutdown {
s.mu.Unlock()
return
}
hp := net.JoinHostPort(opts.Gateway.Host, strconv.Itoa(port))
l, e := natsListen("tcp", hp)
s.gatewayListenerErr = e
Expand Down Expand Up @@ -1575,7 +1575,7 @@ func (s *Server) addGatewayURL(urlStr string) bool {
// Returns true if the URL has been removed, false otherwise.
// Server lock held on entry
func (s *Server) removeGatewayURL(urlStr string) bool {
if s.shutdown {
if s.isShuttingDown() {
return false
}
s.gateway.Lock()
Expand Down
8 changes: 6 additions & 2 deletions server/jetstream_cluster.go
Expand Up @@ -198,11 +198,15 @@ func (s *Server) trackedJetStreamServers() (js, total int) {
}

func (s *Server) getJetStreamCluster() (*jetStream, *jetStreamCluster) {
if s.isShuttingDown() {
return nil, nil
}

s.mu.RLock()
shutdown, js := s.shutdown, s.js
js := s.js
s.mu.RUnlock()

if shutdown || js == nil {
if js == nil {
return nil, nil
}

Expand Down
8 changes: 4 additions & 4 deletions server/leafnode.go
Expand Up @@ -681,11 +681,11 @@ func (s *Server) startLeafNodeAcceptLoop() {
port = 0
}

s.mu.Lock()
if s.shutdown {
s.mu.Unlock()
if s.isShuttingDown() {
return
}

s.mu.Lock()
hp := net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(port))
l, e := natsListen("tcp", hp)
s.leafNodeListenerErr = e
Expand Down Expand Up @@ -878,7 +878,7 @@ func (s *Server) addLeafNodeURL(urlStr string) bool {
func (s *Server) removeLeafNodeURL(urlStr string) bool {
// Don't need to do this if we are removing the route connection because
// we are shuting down...
if s.shutdown {
if s.isShuttingDown() {
return false
}
if s.leafURLsMap.removeUrl(urlStr) {
Expand Down
10 changes: 5 additions & 5 deletions server/mqtt.go
Expand Up @@ -425,6 +425,10 @@ type mqttParsedPublishNATSHeader struct {
}

func (s *Server) startMQTT() {
if s.isShuttingDown() {
return
}

sopts := s.getOpts()
o := &sopts.MQTT

Expand All @@ -437,10 +441,6 @@ func (s *Server) startMQTT() {
}
hp := net.JoinHostPort(o.Host, strconv.Itoa(port))
s.mu.Lock()
if s.shutdown {
s.mu.Unlock()
return
}
s.mqtt.sessmgr.sessions = make(map[string]*mqttAccountSessionManager)
hl, err = net.Listen("tcp", hp)
s.mqtt.listenerErr = err
Expand Down Expand Up @@ -500,7 +500,7 @@ func (s *Server) createMQTTClient(conn net.Conn, ws *websocket) *client {

s.mu.Lock()
if !s.isRunning() || s.ldm {
if s.shutdown {
if s.isShuttingDown() {
conn.Close()
}
s.mu.Unlock()
Expand Down
10 changes: 5 additions & 5 deletions server/route.go
Expand Up @@ -977,7 +977,7 @@ func (s *Server) updateRemoteRoutePerms(c *client, info *Info) {
func (s *Server) sendAsyncInfoToClients(regCli, wsCli bool) {
// If there are no clients supporting async INFO protocols, we are done.
// Also don't send if we are shutting down...
if s.cproto == 0 || s.shutdown {
if s.cproto == 0 || s.isShuttingDown() {
return
}
info := s.copyInfo()
Expand Down Expand Up @@ -2302,6 +2302,10 @@ func (s *Server) updateRouteSubscriptionMap(acc *Account, sub *subscription, del
// is detected that the server has already been shutdown.
// It will also start soliciting explicit routes.
func (s *Server) startRouteAcceptLoop() {
if s.isShuttingDown() {
return
}

// Snapshot server options.
opts := s.getOpts()

Expand All @@ -2316,10 +2320,6 @@ func (s *Server) startRouteAcceptLoop() {
clusterName := s.ClusterName()

s.mu.Lock()
if s.shutdown {
s.mu.Unlock()
return
}
s.Noticef("Cluster name is %s", clusterName)
if s.isClusterNameDynamic() {
s.Warnf("Cluster name was dynamically generated, consider setting one")
Expand Down
2 changes: 0 additions & 2 deletions server/routes_test.go
Expand Up @@ -2777,9 +2777,7 @@ func TestRoutePoolAndPerAccountWithServiceLatencyNoDataRace(t *testing.T) {
defer nc.Close()

// The service listener.
// serviceTime := 25 * time.Millisecond
natsSub(t, nc, "req.echo", func(msg *nats.Msg) {
// time.Sleep(serviceTime)
msg.Respond(msg.Data)
})

Expand Down

0 comments on commit 4c17eeb

Please sign in to comment.