Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] ServiceImport Reply Optimizations #4591

Merged
merged 4 commits into from Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 11 additions & 8 deletions server/accounts.go
Expand Up @@ -1702,21 +1702,19 @@ func (a *Account) addReverseRespMapEntry(acc *Account, reply, from string) {
// This will be called from checkForReverseEntry when the reply arg is a wildcard subject.
// This will usually be called in a go routine since we need to walk all the entries.
func (a *Account) checkForReverseEntries(reply string, checkInterest, recursed bool) {
a.mu.RLock()
if len(a.imports.rrMap) == 0 {
a.mu.RUnlock()
if subjectIsLiteral(reply) {
a._checkForReverseEntry(reply, nil, checkInterest, recursed)
return
}

if subjectIsLiteral(reply) {
a.mu.RLock()
if len(a.imports.rrMap) == 0 {
a.mu.RUnlock()
a._checkForReverseEntry(reply, nil, checkInterest, recursed)
return
}

var _rs [64]string
rs := _rs[:0]

if n := len(a.imports.rrMap); n > cap(rs) {
rs = make([]string, 0, n)
}
Expand All @@ -1726,9 +1724,14 @@ func (a *Account) checkForReverseEntries(reply string, checkInterest, recursed b
}
a.mu.RUnlock()

// subjectIsSubsetMatch is heavy so make sure we do this without the lock.
tsa := [32]string{}
tts := tokenizeSubjectIntoSlice(tsa[:0], reply)

rsa := [32]string{}
for _, r := range rs {
if subjectIsSubsetMatch(r, reply) {
rts := tokenizeSubjectIntoSlice(rsa[:0], r)
// isSubsetMatchTokenized is heavy so make sure we do this without the lock.
if isSubsetMatchTokenized(rts, tts) {
a._checkForReverseEntry(r, nil, checkInterest, recursed)
}
}
Expand Down
30 changes: 23 additions & 7 deletions server/client.go
Expand Up @@ -3014,6 +3014,10 @@ func queueMatches(queue string, qsubs [][]*subscription) bool {

// Low level unsubscribe for a given client.
func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool) {
if s := c.srv; s != nil && s.isShuttingDown() {
return
}

c.mu.Lock()
if !force && sub.max > 0 && sub.nm < sub.max {
c.Debugf(
Expand Down Expand Up @@ -3067,7 +3071,8 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool
}

// Now check to see if this was part of a respMap entry for service imports.
if acc != nil {
// We can skip subscriptions on reserved replies.
if acc != nil && !isReservedReply(sub.subject) {
acc.checkForReverseEntry(string(sub.subject), nil, true)
}
}
Expand Down Expand Up @@ -5077,6 +5082,23 @@ func (c *client) closeConnection(reason ClosedState) {
c.out.stc = nil
}

// If we have remote latency tracking running shut that down.
if c.rrTracking != nil {
c.rrTracking.ptmr.Stop()
c.rrTracking = nil
}

// If we are shutting down, no need to do all the accounting on subs, etc.
if reason == ServerShutdown {
s := c.srv
c.mu.Unlock()
if s != nil {
// Unregister
s.removeClient(c)
}
return
}

var (
kind = c.kind
srv = c.srv
Expand All @@ -5101,12 +5123,6 @@ func (c *client) closeConnection(reason ClosedState) {
spoke = c.isSpokeLeafNode()
}

// If we have remote latency tracking running shut that down.
if c.rrTracking != nil {
c.rrTracking.ptmr.Stop()
c.rrTracking = nil
}

c.mu.Unlock()

// Remove client's or leaf node or jetstream subscriptions.
Expand Down
10 changes: 5 additions & 5 deletions server/gateway.go
Expand Up @@ -473,6 +473,10 @@ func (s *Server) startGateways() {
// This starts the gateway accept loop in a go routine, unless it
// is detected that the server has already been shutdown.
func (s *Server) startGatewayAcceptLoop() {
if s.isShuttingDown() {
return
}

// Snapshot server options.
opts := s.getOpts()

Expand All @@ -482,10 +486,6 @@ func (s *Server) startGatewayAcceptLoop() {
}

s.mu.Lock()
if s.shutdown {
s.mu.Unlock()
return
}
hp := net.JoinHostPort(opts.Gateway.Host, strconv.Itoa(port))
l, e := natsListen("tcp", hp)
s.gatewayListenerErr = e
Expand Down Expand Up @@ -1575,7 +1575,7 @@ func (s *Server) addGatewayURL(urlStr string) bool {
// Returns true if the URL has been removed, false otherwise.
// Server lock held on entry
func (s *Server) removeGatewayURL(urlStr string) bool {
if s.shutdown {
if s.isShuttingDown() {
return false
}
s.gateway.Lock()
Expand Down
8 changes: 6 additions & 2 deletions server/jetstream_cluster.go
Expand Up @@ -198,11 +198,15 @@ func (s *Server) trackedJetStreamServers() (js, total int) {
}

func (s *Server) getJetStreamCluster() (*jetStream, *jetStreamCluster) {
if s.isShuttingDown() {
return nil, nil
}

s.mu.RLock()
shutdown, js := s.shutdown, s.js
js := s.js
s.mu.RUnlock()

if shutdown || js == nil {
if js == nil {
return nil, nil
}

Expand Down
8 changes: 4 additions & 4 deletions server/leafnode.go
Expand Up @@ -681,11 +681,11 @@ func (s *Server) startLeafNodeAcceptLoop() {
port = 0
}

s.mu.Lock()
if s.shutdown {
s.mu.Unlock()
if s.isShuttingDown() {
return
}

s.mu.Lock()
hp := net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(port))
l, e := natsListen("tcp", hp)
s.leafNodeListenerErr = e
Expand Down Expand Up @@ -878,7 +878,7 @@ func (s *Server) addLeafNodeURL(urlStr string) bool {
func (s *Server) removeLeafNodeURL(urlStr string) bool {
// Don't need to do this if we are removing the route connection because
// we are shuting down...
if s.shutdown {
if s.isShuttingDown() {
return false
}
if s.leafURLsMap.removeUrl(urlStr) {
Expand Down
10 changes: 5 additions & 5 deletions server/mqtt.go
Expand Up @@ -425,6 +425,10 @@ type mqttParsedPublishNATSHeader struct {
}

func (s *Server) startMQTT() {
if s.isShuttingDown() {
return
}

sopts := s.getOpts()
o := &sopts.MQTT

Expand All @@ -437,10 +441,6 @@ func (s *Server) startMQTT() {
}
hp := net.JoinHostPort(o.Host, strconv.Itoa(port))
s.mu.Lock()
if s.shutdown {
s.mu.Unlock()
return
}
s.mqtt.sessmgr.sessions = make(map[string]*mqttAccountSessionManager)
hl, err = net.Listen("tcp", hp)
s.mqtt.listenerErr = err
Expand Down Expand Up @@ -500,7 +500,7 @@ func (s *Server) createMQTTClient(conn net.Conn, ws *websocket) *client {

s.mu.Lock()
if !s.isRunning() || s.ldm {
if s.shutdown {
if s.isShuttingDown() {
conn.Close()
}
s.mu.Unlock()
Expand Down
10 changes: 5 additions & 5 deletions server/route.go
Expand Up @@ -977,7 +977,7 @@ func (s *Server) updateRemoteRoutePerms(c *client, info *Info) {
func (s *Server) sendAsyncInfoToClients(regCli, wsCli bool) {
// If there are no clients supporting async INFO protocols, we are done.
// Also don't send if we are shutting down...
if s.cproto == 0 || s.shutdown {
if s.cproto == 0 || s.isShuttingDown() {
return
}
info := s.copyInfo()
Expand Down Expand Up @@ -2302,6 +2302,10 @@ func (s *Server) updateRouteSubscriptionMap(acc *Account, sub *subscription, del
// is detected that the server has already been shutdown.
// It will also start soliciting explicit routes.
func (s *Server) startRouteAcceptLoop() {
if s.isShuttingDown() {
return
}

// Snapshot server options.
opts := s.getOpts()

Expand All @@ -2316,10 +2320,6 @@ func (s *Server) startRouteAcceptLoop() {
clusterName := s.ClusterName()

s.mu.Lock()
if s.shutdown {
s.mu.Unlock()
return
}
s.Noticef("Cluster name is %s", clusterName)
if s.isClusterNameDynamic() {
s.Warnf("Cluster name was dynamically generated, consider setting one")
Expand Down
2 changes: 0 additions & 2 deletions server/routes_test.go
Expand Up @@ -2777,9 +2777,7 @@ func TestRoutePoolAndPerAccountWithServiceLatencyNoDataRace(t *testing.T) {
defer nc.Close()

// The service listener.
// serviceTime := 25 * time.Millisecond
natsSub(t, nc, "req.echo", func(msg *nats.Msg) {
// time.Sleep(serviceTime)
msg.Respond(msg.Data)
})

Expand Down