Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resolver: improve signaling for missing account lookups #4151

Merged
merged 6 commits into from May 14, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
75 changes: 54 additions & 21 deletions server/accounts.go
Expand Up @@ -21,6 +21,7 @@ import (
"hash/fnv"
"hash/maphash"
"io"
"io/fs"
"math"
"math/rand"
"net/http"
Expand Down Expand Up @@ -3990,20 +3991,23 @@ func (dr *DirAccResolver) Start(s *Server) error {
defer dr.Unlock()
dr.Server = s
dr.operator = opKeys
fetchTimeout := dr.fetchTimeout
dr.DirJWTStore.changed = func(pubKey string) {
if v, ok := s.accounts.Load(pubKey); ok {
if theJwt, err := dr.LoadAcc(pubKey); err != nil {
s.Errorf("update got error on load: %v", err)
s.Errorf("DirResolver - Update got error on load: %v", err)
} else {
acc := v.(*Account)
if err = s.updateAccountWithClaimJWT(acc, theJwt); err != nil {
s.Errorf("update resulted in error %v", err)
s.Errorf("DirResolver - Update for account %q resulted in error %v", pubKey, err)
} else {
if _, jsa, err := acc.checkForJetStream(); err != nil {
s.Warnf("error checking for JetStream enabled error %v", err)
if !IsNatsErr(err, JSNotEnabledForAccountErr) {
s.Warnf("DirResolver - Error checking for JetStream support for account %q: %v", pubKey, err)
}
} else if jsa == nil {
if err = s.configJetStream(acc); err != nil {
s.Errorf("updated resulted in error when configuring JetStream %v", err)
s.Errorf("DirResolver - Error configuring JetStream for account %q: %v", pubKey, err)
}
}
}
Expand All @@ -4024,7 +4028,7 @@ func (dr *DirAccResolver) Start(s *Server) error {
} else if len(tk) == accUpdateTokensOld {
pubKey = tk[accUpdateAccIdxOld]
} else {
s.Debugf("jwt update skipped due to bad subject %q", subj)
s.Debugf("DirResolver - jwt update skipped due to bad subject %q", subj)
return
}
if claim, err := jwt.DecodeAccountClaims(string(msg)); err != nil {
Expand Down Expand Up @@ -4074,31 +4078,40 @@ func (dr *DirAccResolver) Start(s *Server) error {
if len(tk) != accLookupReqTokens {
return
}
if theJWT, err := dr.DirJWTStore.LoadAcc(tk[accReqAccIndex]); err != nil {
s.Errorf("Merging resulted in error: %v", err)
accName := tk[accReqAccIndex]
if theJWT, err := dr.DirJWTStore.LoadAcc(accName); err != nil {
if errors.Is(err, fs.ErrNotExist) {
s.Debugf("DirResolver - Could not find account %q", accName)
// Reply with empty response to signal absence of JWT to others,
// but not too fast to mitigate beating actual responses with content.
time.Sleep(fetchTimeout / 10)
wallyqs marked this conversation as resolved.
Show resolved Hide resolved
s.sendInternalMsgLocked(reply, _EMPTY_, nil, nil)
} else {
s.Errorf("DirResolver - Error looking up account %q: %v", accName, err)
}
} else {
s.sendInternalMsgLocked(reply, _EMPTY_, nil, []byte(theJWT))
}
}); err != nil {
return fmt.Errorf("error setting up lookup request handling: %v", err)
}
// respond to pack requests with one or more pack messages
// an empty message signifies the end of the response responder
// an empty message signifies the end of the response responder.
if _, err := s.sysSubscribeQ(accPackReqSubj, "responder", func(_ *subscription, _ *client, _ *Account, _, reply string, theirHash []byte) {
if reply == _EMPTY_ {
return
}
ourHash := dr.DirJWTStore.Hash()
if bytes.Equal(theirHash, ourHash[:]) {
s.sendInternalMsgLocked(reply, _EMPTY_, nil, []byte{})
s.Debugf("pack request matches hash %x", ourHash[:])
s.Debugf("DirResolver - Pack request matches hash %x", ourHash[:])
} else if err := dr.DirJWTStore.PackWalk(1, func(partialPackMsg string) {
s.sendInternalMsgLocked(reply, _EMPTY_, nil, []byte(partialPackMsg))
}); err != nil {
// let them timeout
s.Errorf("pack request error: %v", err)
s.Errorf("DirResolver - Pack request error: %v", err)
} else {
s.Debugf("pack request hash %x - finished responding with hash %x", theirHash, ourHash)
s.Debugf("DirResolver - Pack request hash %x - finished responding with hash %x", theirHash, ourHash)
s.sendInternalMsgLocked(reply, _EMPTY_, nil, []byte{})
}
}); err != nil {
Expand All @@ -4119,12 +4132,12 @@ func (dr *DirAccResolver) Start(s *Server) error {
if _, err := s.sysSubscribe(packRespIb, func(_ *subscription, _ *client, _ *Account, _, _ string, msg []byte) {
hash := dr.DirJWTStore.Hash()
if len(msg) == 0 { // end of response stream
s.Debugf("Merging Finished and resulting in: %x", dr.DirJWTStore.Hash())
s.Debugf("DirResolver - Merging finished and resulting in: %x", dr.DirJWTStore.Hash())
return
} else if err := dr.DirJWTStore.Merge(string(msg)); err != nil {
s.Errorf("Merging resulted in error: %v", err)
s.Errorf("DirResolver - Merging resulted in error: %v", err)
} else {
s.Debugf("Merging succeeded and changed %x to %x", hash, dr.DirJWTStore.Hash())
s.Debugf("DirResolver - Merging succeeded and changed %x to %x", hash, dr.DirJWTStore.Hash())
}
}); err != nil {
return fmt.Errorf("error setting up pack response handling: %v", err)
Expand All @@ -4142,7 +4155,7 @@ func (dr *DirAccResolver) Start(s *Server) error {
case <-ticker.C:
}
ourHash := dr.DirJWTStore.Hash()
s.Debugf("Checking store state: %x", ourHash)
s.Debugf("DirResolver - Checking store state: %x", ourHash)
s.sendInternalMsgLocked(accPackReqSubj, packRespIb, nil, ourHash[:])
}
})
Expand Down Expand Up @@ -4223,20 +4236,38 @@ func (s *Server) fetch(res AccountResolver, name string, timeout time.Duration)
respC := make(chan []byte, 1)
accountLookupRequest := fmt.Sprintf(accLookupReqSubj, name)
s.mu.Lock()
// Resolver will wait for available routes + gateways to reply
// before serving an error in case there weren't any found.
expectedServers := len(s.routes)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The server maps all other servers in a different data struct when it received remote statsz updates etc.
Could use that since if this is an expanded supercluster with leafnodes sharing system accounts this will not account for that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To keep in mind when merging to dev, one then should use s.numRoutes() because it accounts for pooled routes, per-account, etc...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using s.sys.servers instead for the check now.

if s.gateway != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is always non nil IIRC.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, instead the test could be:

if s.gateway.enabled {
..

There is currently no API to get the number of remotes, you could add one, but otherwise you should lock:

    s.gateway.RLock()
    expectedServers += len(s.gateway.remotes)
    s.gateway.RUnlock()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could use s.sys.servers

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched to use s.sys.servers to use the detected active servers instead.

expectedServers += len(s.gateway.remotes)
}
if s.sys == nil || s.sys.replies == nil {
s.mu.Unlock()
return _EMPTY_, fmt.Errorf("eventing shut down")
}
replySubj := s.newRespInbox()
replies := s.sys.replies

// Store our handler.
replies[replySubj] = func(sub *subscription, _ *client, _ *Account, subject, _ string, msg []byte) {
replies[replySubj] = func(sub *subscription, _ *client, _ *Account, subject, reply string, msg []byte) {
clone := make([]byte, len(msg))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we check for the nil responses here?

var clone []byte
if len(msg) > 0 {
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added this check to do the copy when not empty.

copy(clone, msg)

s.mu.Lock()
wallyqs marked this conversation as resolved.
Show resolved Hide resolved
expectedServers--
if len(msg) == 0 {
wallyqs marked this conversation as resolved.
Show resolved Hide resolved
// Skip empty responses until getting all the available servers.
if expectedServers > 0 {
s.mu.Unlock()
return
}
}
// Use the first valid response if there is still interest or
// one of the empty responses to signal that it was not found.
if _, ok := replies[replySubj]; ok {
select {
case respC <- clone: // only use first response and only if there is still interest
case respC <- clone:
default:
}
}
Expand All @@ -4253,7 +4284,9 @@ func (s *Server) fetch(res AccountResolver, name string, timeout time.Duration)
case <-time.After(timeout):
err = errors.New("fetching jwt timed out")
case m := <-respC:
if err = res.Store(name, string(m)); err == nil {
if len(m) == 0 {
err = errors.New("account jwt not found")
} else if err = res.Store(name, string(m)); err == nil {
theJWT = string(m)
}
}
Expand Down Expand Up @@ -4291,9 +4324,9 @@ func (dr *CacheDirAccResolver) Start(s *Server) error {
dr.DirJWTStore.changed = func(pubKey string) {
if v, ok := s.accounts.Load(pubKey); !ok {
} else if theJwt, err := dr.LoadAcc(pubKey); err != nil {
s.Errorf("update got error on load: %v", err)
s.Errorf("DirResolver - Update got error on load: %v", err)
} else if err := s.updateAccountWithClaimJWT(v.(*Account), theJwt); err != nil {
s.Errorf("update resulted in error %v", err)
s.Errorf("DirResolver - Update resulted in error %v", err)
}
}
dr.DirJWTStore.deleted = func(pubKey string) {
Expand All @@ -4309,7 +4342,7 @@ func (dr *CacheDirAccResolver) Start(s *Server) error {
} else if len(tk) == accUpdateTokensOld {
pubKey = tk[accUpdateAccIdxOld]
} else {
s.Debugf("jwt update cache skipped due to bad subject %q", subj)
s.Debugf("DirResolver - jwt update cache skipped due to bad subject %q", subj)
return
}
if claim, err := jwt.DecodeAccountClaims(string(msg)); err != nil {
Expand Down
7 changes: 7 additions & 0 deletions server/dirstore.go
Expand Up @@ -288,6 +288,10 @@ func (store *DirJWTStore) PackWalk(maxJWTs int, cb func(partialPackMsg string))
if err != nil {
return err
}
if len(jwtBytes) == 0 {
// Skip if no contents in the JWT.
return nil
}
if exp != nil {
claim, err := jwt.DecodeGeneric(string(jwtBytes))
if err == nil && claim.Expires > 0 && claim.Expires < time.Now().Unix() {
Expand Down Expand Up @@ -406,6 +410,9 @@ func (store *DirJWTStore) load(publicKey string) (string, error) {
// write that keeps hash of all jwt in sync
// Assumes the lock is held. Does return true or an error never both.
func (store *DirJWTStore) write(path string, publicKey string, theJWT string) (bool, error) {
if len(theJWT) == 0 {
return false, fmt.Errorf("invalid JWT")
}
var newHash *[sha256.Size]byte
if store.expiration != nil {
h := sha256.Sum256([]byte(theJWT))
Expand Down