Skip to content

Commit

Permalink
resolver: improve signaling for missing account lookups (nats-io#4151)
Browse files Browse the repository at this point in the history
When using the nats account resolver and a JWT is not found, the client could
often get an i/o timeout error due to not receiving a timely response
before the account resolver fetch request times out. Now instead
of waiting for the fetch request to timeout, a resolver without JWTs
will notify as well that it could not find a matching JWT, waiting for a
response from all active servers.

Also included in this PR is some cleanup to the logs emitted by the
resolver.

Signed-off-by: Waldemar Quevedo <wally@nats.io>
  • Loading branch information
wallyqs authored and ReubenMathew committed May 30, 2023
1 parent 4925af0 commit 6d614b0
Show file tree
Hide file tree
Showing 3 changed files with 249 additions and 28 deletions.
73 changes: 50 additions & 23 deletions server/accounts.go
Expand Up @@ -21,6 +21,7 @@ import (
"hash/fnv"
"hash/maphash"
"io"
"io/fs"
"math"
"math/rand"
"net/http"
Expand Down Expand Up @@ -3993,17 +3994,19 @@ func (dr *DirAccResolver) Start(s *Server) error {
dr.DirJWTStore.changed = func(pubKey string) {
if v, ok := s.accounts.Load(pubKey); ok {
if theJwt, err := dr.LoadAcc(pubKey); err != nil {
s.Errorf("update got error on load: %v", err)
s.Errorf("DirResolver - Update got error on load: %v", err)
} else {
acc := v.(*Account)
if err = s.updateAccountWithClaimJWT(acc, theJwt); err != nil {
s.Errorf("update resulted in error %v", err)
s.Errorf("DirResolver - Update for account %q resulted in error %v", pubKey, err)
} else {
if _, jsa, err := acc.checkForJetStream(); err != nil {
s.Warnf("error checking for JetStream enabled error %v", err)
if !IsNatsErr(err, JSNotEnabledForAccountErr) {
s.Warnf("DirResolver - Error checking for JetStream support for account %q: %v", pubKey, err)
}
} else if jsa == nil {
if err = s.configJetStream(acc); err != nil {
s.Errorf("updated resulted in error when configuring JetStream %v", err)
s.Errorf("DirResolver - Error configuring JetStream for account %q: %v", pubKey, err)
}
}
}
Expand All @@ -4024,7 +4027,7 @@ func (dr *DirAccResolver) Start(s *Server) error {
} else if len(tk) == accUpdateTokensOld {
pubKey = tk[accUpdateAccIdxOld]
} else {
s.Debugf("jwt update skipped due to bad subject %q", subj)
s.Debugf("DirResolver - jwt update skipped due to bad subject %q", subj)
return
}
if claim, err := jwt.DecodeAccountClaims(string(msg)); err != nil {
Expand Down Expand Up @@ -4074,31 +4077,38 @@ func (dr *DirAccResolver) Start(s *Server) error {
if len(tk) != accLookupReqTokens {
return
}
if theJWT, err := dr.DirJWTStore.LoadAcc(tk[accReqAccIndex]); err != nil {
s.Errorf("Merging resulted in error: %v", err)
accName := tk[accReqAccIndex]
if theJWT, err := dr.DirJWTStore.LoadAcc(accName); err != nil {
if errors.Is(err, fs.ErrNotExist) {
s.Debugf("DirResolver - Could not find account %q", accName)
// Reply with empty response to signal absence of JWT to others.
s.sendInternalMsgLocked(reply, _EMPTY_, nil, nil)
} else {
s.Errorf("DirResolver - Error looking up account %q: %v", accName, err)
}
} else {
s.sendInternalMsgLocked(reply, _EMPTY_, nil, []byte(theJWT))
}
}); err != nil {
return fmt.Errorf("error setting up lookup request handling: %v", err)
}
// respond to pack requests with one or more pack messages
// an empty message signifies the end of the response responder
// an empty message signifies the end of the response responder.
if _, err := s.sysSubscribeQ(accPackReqSubj, "responder", func(_ *subscription, _ *client, _ *Account, _, reply string, theirHash []byte) {
if reply == _EMPTY_ {
return
}
ourHash := dr.DirJWTStore.Hash()
if bytes.Equal(theirHash, ourHash[:]) {
s.sendInternalMsgLocked(reply, _EMPTY_, nil, []byte{})
s.Debugf("pack request matches hash %x", ourHash[:])
s.Debugf("DirResolver - Pack request matches hash %x", ourHash[:])
} else if err := dr.DirJWTStore.PackWalk(1, func(partialPackMsg string) {
s.sendInternalMsgLocked(reply, _EMPTY_, nil, []byte(partialPackMsg))
}); err != nil {
// let them timeout
s.Errorf("pack request error: %v", err)
s.Errorf("DirResolver - Pack request error: %v", err)
} else {
s.Debugf("pack request hash %x - finished responding with hash %x", theirHash, ourHash)
s.Debugf("DirResolver - Pack request hash %x - finished responding with hash %x", theirHash, ourHash)
s.sendInternalMsgLocked(reply, _EMPTY_, nil, []byte{})
}
}); err != nil {
Expand All @@ -4119,12 +4129,12 @@ func (dr *DirAccResolver) Start(s *Server) error {
if _, err := s.sysSubscribe(packRespIb, func(_ *subscription, _ *client, _ *Account, _, _ string, msg []byte) {
hash := dr.DirJWTStore.Hash()
if len(msg) == 0 { // end of response stream
s.Debugf("Merging Finished and resulting in: %x", dr.DirJWTStore.Hash())
s.Debugf("DirResolver - Merging finished and resulting in: %x", dr.DirJWTStore.Hash())
return
} else if err := dr.DirJWTStore.Merge(string(msg)); err != nil {
s.Errorf("Merging resulted in error: %v", err)
s.Errorf("DirResolver - Merging resulted in error: %v", err)
} else {
s.Debugf("Merging succeeded and changed %x to %x", hash, dr.DirJWTStore.Hash())
s.Debugf("DirResolver - Merging succeeded and changed %x to %x", hash, dr.DirJWTStore.Hash())
}
}); err != nil {
return fmt.Errorf("error setting up pack response handling: %v", err)
Expand All @@ -4142,7 +4152,7 @@ func (dr *DirAccResolver) Start(s *Server) error {
case <-ticker.C:
}
ourHash := dr.DirJWTStore.Hash()
s.Debugf("Checking store state: %x", ourHash)
s.Debugf("DirResolver - Checking store state: %x", ourHash)
s.sendInternalMsgLocked(accPackReqSubj, packRespIb, nil, ourHash[:])
}
})
Expand Down Expand Up @@ -4227,20 +4237,35 @@ func (s *Server) fetch(res AccountResolver, name string, timeout time.Duration)
s.mu.Unlock()
return _EMPTY_, fmt.Errorf("eventing shut down")
}
// Resolver will wait for detected active servers to reply
// before serving an error in case there weren't any found.
expectedServers := len(s.sys.servers)
replySubj := s.newRespInbox()
replies := s.sys.replies

// Store our handler.
replies[replySubj] = func(sub *subscription, _ *client, _ *Account, subject, _ string, msg []byte) {
clone := make([]byte, len(msg))
copy(clone, msg)
var clone []byte
isEmpty := len(msg) == 0
if !isEmpty {
clone = make([]byte, len(msg))
copy(clone, msg)
}
s.mu.Lock()
defer s.mu.Unlock()
expectedServers--
// Skip empty responses until getting all the available servers.
if isEmpty && expectedServers > 0 {
return
}
// Use the first valid response if there is still interest or
// one of the empty responses to signal that it was not found.
if _, ok := replies[replySubj]; ok {
select {
case respC <- clone: // only use first response and only if there is still interest
case respC <- clone:
default:
}
}
s.mu.Unlock()
}
s.sendInternalMsg(accountLookupRequest, replySubj, nil, []byte{})
quit := s.quitCh
Expand All @@ -4253,7 +4278,9 @@ func (s *Server) fetch(res AccountResolver, name string, timeout time.Duration)
case <-time.After(timeout):
err = errors.New("fetching jwt timed out")
case m := <-respC:
if err = res.Store(name, string(m)); err == nil {
if len(m) == 0 {
err = errors.New("account jwt not found")
} else if err = res.Store(name, string(m)); err == nil {
theJWT = string(m)
}
}
Expand Down Expand Up @@ -4291,9 +4318,9 @@ func (dr *CacheDirAccResolver) Start(s *Server) error {
dr.DirJWTStore.changed = func(pubKey string) {
if v, ok := s.accounts.Load(pubKey); !ok {
} else if theJwt, err := dr.LoadAcc(pubKey); err != nil {
s.Errorf("update got error on load: %v", err)
s.Errorf("DirResolver - Update got error on load: %v", err)
} else if err := s.updateAccountWithClaimJWT(v.(*Account), theJwt); err != nil {
s.Errorf("update resulted in error %v", err)
s.Errorf("DirResolver - Update resulted in error %v", err)
}
}
dr.DirJWTStore.deleted = func(pubKey string) {
Expand All @@ -4309,7 +4336,7 @@ func (dr *CacheDirAccResolver) Start(s *Server) error {
} else if len(tk) == accUpdateTokensOld {
pubKey = tk[accUpdateAccIdxOld]
} else {
s.Debugf("jwt update cache skipped due to bad subject %q", subj)
s.Debugf("DirResolver - jwt update cache skipped due to bad subject %q", subj)
return
}
if claim, err := jwt.DecodeAccountClaims(string(msg)); err != nil {
Expand Down
7 changes: 7 additions & 0 deletions server/dirstore.go
Expand Up @@ -288,6 +288,10 @@ func (store *DirJWTStore) PackWalk(maxJWTs int, cb func(partialPackMsg string))
if err != nil {
return err
}
if len(jwtBytes) == 0 {
// Skip if no contents in the JWT.
return nil
}
if exp != nil {
claim, err := jwt.DecodeGeneric(string(jwtBytes))
if err == nil && claim.Expires > 0 && claim.Expires < time.Now().Unix() {
Expand Down Expand Up @@ -406,6 +410,9 @@ func (store *DirJWTStore) load(publicKey string) (string, error) {
// write that keeps hash of all jwt in sync
// Assumes the lock is held. Does return true or an error never both.
func (store *DirJWTStore) write(path string, publicKey string, theJWT string) (bool, error) {
if len(theJWT) == 0 {
return false, fmt.Errorf("invalid JWT")
}
var newHash *[sha256.Size]byte
if store.expiration != nil {
h := sha256.Sum256([]byte(theJWT))
Expand Down

0 comments on commit 6d614b0

Please sign in to comment.