Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resolver: improve signaling for missing account lookups #4151

Merged
merged 6 commits into from May 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
73 changes: 50 additions & 23 deletions server/accounts.go
Expand Up @@ -21,6 +21,7 @@ import (
"hash/fnv"
"hash/maphash"
"io"
"io/fs"
"math"
"math/rand"
"net/http"
Expand Down Expand Up @@ -3993,17 +3994,19 @@ func (dr *DirAccResolver) Start(s *Server) error {
dr.DirJWTStore.changed = func(pubKey string) {
if v, ok := s.accounts.Load(pubKey); ok {
if theJwt, err := dr.LoadAcc(pubKey); err != nil {
s.Errorf("update got error on load: %v", err)
s.Errorf("DirResolver - Update got error on load: %v", err)
} else {
acc := v.(*Account)
if err = s.updateAccountWithClaimJWT(acc, theJwt); err != nil {
s.Errorf("update resulted in error %v", err)
s.Errorf("DirResolver - Update for account %q resulted in error %v", pubKey, err)
} else {
if _, jsa, err := acc.checkForJetStream(); err != nil {
s.Warnf("error checking for JetStream enabled error %v", err)
if !IsNatsErr(err, JSNotEnabledForAccountErr) {
s.Warnf("DirResolver - Error checking for JetStream support for account %q: %v", pubKey, err)
}
} else if jsa == nil {
if err = s.configJetStream(acc); err != nil {
s.Errorf("updated resulted in error when configuring JetStream %v", err)
s.Errorf("DirResolver - Error configuring JetStream for account %q: %v", pubKey, err)
}
}
}
Expand All @@ -4024,7 +4027,7 @@ func (dr *DirAccResolver) Start(s *Server) error {
} else if len(tk) == accUpdateTokensOld {
pubKey = tk[accUpdateAccIdxOld]
} else {
s.Debugf("jwt update skipped due to bad subject %q", subj)
s.Debugf("DirResolver - jwt update skipped due to bad subject %q", subj)
return
}
if claim, err := jwt.DecodeAccountClaims(string(msg)); err != nil {
Expand Down Expand Up @@ -4074,31 +4077,38 @@ func (dr *DirAccResolver) Start(s *Server) error {
if len(tk) != accLookupReqTokens {
return
}
if theJWT, err := dr.DirJWTStore.LoadAcc(tk[accReqAccIndex]); err != nil {
s.Errorf("Merging resulted in error: %v", err)
accName := tk[accReqAccIndex]
if theJWT, err := dr.DirJWTStore.LoadAcc(accName); err != nil {
if errors.Is(err, fs.ErrNotExist) {
s.Debugf("DirResolver - Could not find account %q", accName)
// Reply with empty response to signal absence of JWT to others.
s.sendInternalMsgLocked(reply, _EMPTY_, nil, nil)
} else {
s.Errorf("DirResolver - Error looking up account %q: %v", accName, err)
}
} else {
s.sendInternalMsgLocked(reply, _EMPTY_, nil, []byte(theJWT))
}
}); err != nil {
return fmt.Errorf("error setting up lookup request handling: %v", err)
}
// respond to pack requests with one or more pack messages
// an empty message signifies the end of the response responder
// an empty message signifies the end of the response responder.
if _, err := s.sysSubscribeQ(accPackReqSubj, "responder", func(_ *subscription, _ *client, _ *Account, _, reply string, theirHash []byte) {
if reply == _EMPTY_ {
return
}
ourHash := dr.DirJWTStore.Hash()
if bytes.Equal(theirHash, ourHash[:]) {
s.sendInternalMsgLocked(reply, _EMPTY_, nil, []byte{})
s.Debugf("pack request matches hash %x", ourHash[:])
s.Debugf("DirResolver - Pack request matches hash %x", ourHash[:])
} else if err := dr.DirJWTStore.PackWalk(1, func(partialPackMsg string) {
s.sendInternalMsgLocked(reply, _EMPTY_, nil, []byte(partialPackMsg))
}); err != nil {
// let them timeout
s.Errorf("pack request error: %v", err)
s.Errorf("DirResolver - Pack request error: %v", err)
} else {
s.Debugf("pack request hash %x - finished responding with hash %x", theirHash, ourHash)
s.Debugf("DirResolver - Pack request hash %x - finished responding with hash %x", theirHash, ourHash)
s.sendInternalMsgLocked(reply, _EMPTY_, nil, []byte{})
}
}); err != nil {
Expand All @@ -4119,12 +4129,12 @@ func (dr *DirAccResolver) Start(s *Server) error {
if _, err := s.sysSubscribe(packRespIb, func(_ *subscription, _ *client, _ *Account, _, _ string, msg []byte) {
hash := dr.DirJWTStore.Hash()
if len(msg) == 0 { // end of response stream
s.Debugf("Merging Finished and resulting in: %x", dr.DirJWTStore.Hash())
s.Debugf("DirResolver - Merging finished and resulting in: %x", dr.DirJWTStore.Hash())
return
} else if err := dr.DirJWTStore.Merge(string(msg)); err != nil {
s.Errorf("Merging resulted in error: %v", err)
s.Errorf("DirResolver - Merging resulted in error: %v", err)
} else {
s.Debugf("Merging succeeded and changed %x to %x", hash, dr.DirJWTStore.Hash())
s.Debugf("DirResolver - Merging succeeded and changed %x to %x", hash, dr.DirJWTStore.Hash())
}
}); err != nil {
return fmt.Errorf("error setting up pack response handling: %v", err)
Expand All @@ -4142,7 +4152,7 @@ func (dr *DirAccResolver) Start(s *Server) error {
case <-ticker.C:
}
ourHash := dr.DirJWTStore.Hash()
s.Debugf("Checking store state: %x", ourHash)
s.Debugf("DirResolver - Checking store state: %x", ourHash)
s.sendInternalMsgLocked(accPackReqSubj, packRespIb, nil, ourHash[:])
}
})
Expand Down Expand Up @@ -4227,20 +4237,35 @@ func (s *Server) fetch(res AccountResolver, name string, timeout time.Duration)
s.mu.Unlock()
return _EMPTY_, fmt.Errorf("eventing shut down")
}
// Resolver will wait for detected active servers to reply
// before serving an error in case there weren't any found.
expectedServers := len(s.sys.servers)
replySubj := s.newRespInbox()
replies := s.sys.replies

// Store our handler.
replies[replySubj] = func(sub *subscription, _ *client, _ *Account, subject, _ string, msg []byte) {
clone := make([]byte, len(msg))
copy(clone, msg)
var clone []byte
isEmpty := len(msg) == 0
if !isEmpty {
clone = make([]byte, len(msg))
copy(clone, msg)
}
s.mu.Lock()
wallyqs marked this conversation as resolved.
Show resolved Hide resolved
defer s.mu.Unlock()
expectedServers--
// Skip empty responses until getting all the available servers.
if isEmpty && expectedServers > 0 {
return
}
// Use the first valid response if there is still interest or
// one of the empty responses to signal that it was not found.
if _, ok := replies[replySubj]; ok {
select {
case respC <- clone: // only use first response and only if there is still interest
case respC <- clone:
default:
}
}
s.mu.Unlock()
}
s.sendInternalMsg(accountLookupRequest, replySubj, nil, []byte{})
quit := s.quitCh
Expand All @@ -4253,7 +4278,9 @@ func (s *Server) fetch(res AccountResolver, name string, timeout time.Duration)
case <-time.After(timeout):
err = errors.New("fetching jwt timed out")
case m := <-respC:
if err = res.Store(name, string(m)); err == nil {
if len(m) == 0 {
err = errors.New("account jwt not found")
} else if err = res.Store(name, string(m)); err == nil {
theJWT = string(m)
}
}
Expand Down Expand Up @@ -4291,9 +4318,9 @@ func (dr *CacheDirAccResolver) Start(s *Server) error {
dr.DirJWTStore.changed = func(pubKey string) {
if v, ok := s.accounts.Load(pubKey); !ok {
} else if theJwt, err := dr.LoadAcc(pubKey); err != nil {
s.Errorf("update got error on load: %v", err)
s.Errorf("DirResolver - Update got error on load: %v", err)
} else if err := s.updateAccountWithClaimJWT(v.(*Account), theJwt); err != nil {
s.Errorf("update resulted in error %v", err)
s.Errorf("DirResolver - Update resulted in error %v", err)
}
}
dr.DirJWTStore.deleted = func(pubKey string) {
Expand All @@ -4309,7 +4336,7 @@ func (dr *CacheDirAccResolver) Start(s *Server) error {
} else if len(tk) == accUpdateTokensOld {
pubKey = tk[accUpdateAccIdxOld]
} else {
s.Debugf("jwt update cache skipped due to bad subject %q", subj)
s.Debugf("DirResolver - jwt update cache skipped due to bad subject %q", subj)
return
}
if claim, err := jwt.DecodeAccountClaims(string(msg)); err != nil {
Expand Down
7 changes: 7 additions & 0 deletions server/dirstore.go
Expand Up @@ -288,6 +288,10 @@ func (store *DirJWTStore) PackWalk(maxJWTs int, cb func(partialPackMsg string))
if err != nil {
return err
}
if len(jwtBytes) == 0 {
// Skip if no contents in the JWT.
return nil
}
if exp != nil {
claim, err := jwt.DecodeGeneric(string(jwtBytes))
if err == nil && claim.Expires > 0 && claim.Expires < time.Now().Unix() {
Expand Down Expand Up @@ -406,6 +410,9 @@ func (store *DirJWTStore) load(publicKey string) (string, error) {
// write that keeps hash of all jwt in sync
// Assumes the lock is held. Does return true or an error never both.
func (store *DirJWTStore) write(path string, publicKey string, theJWT string) (bool, error) {
if len(theJWT) == 0 {
return false, fmt.Errorf("invalid JWT")
}
var newHash *[sha256.Size]byte
if store.expiration != nil {
h := sha256.Sum256([]byte(theJWT))
Expand Down