Skip to content

Commit

Permalink
Update not found delay to be 1/10th of fetch timeout
Browse files Browse the repository at this point in the history
Also update logs based on comments

Signed-off-by: Waldemar Quevedo <wally@nats.io>
  • Loading branch information
wallyqs committed May 12, 2023
1 parent bffebd4 commit f58d3d0
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 19 deletions.
36 changes: 18 additions & 18 deletions server/accounts.go
Expand Up @@ -3995,19 +3995,19 @@ func (dr *DirAccResolver) Start(s *Server) error {
dr.DirJWTStore.changed = func(pubKey string) {
if v, ok := s.accounts.Load(pubKey); ok {
if theJwt, err := dr.LoadAcc(pubKey); err != nil {
s.Errorf("RESOLVER - Update got error on load: %v", err)
s.Errorf("DirResolver - Update got error on load: %v", err)
} else {
acc := v.(*Account)
if err = s.updateAccountWithClaimJWT(acc, theJwt); err != nil {
s.Errorf("RESOLVER - Update for account %q resulted in error %v", pubKey, err)
s.Errorf("DirResolver - Update for account %q resulted in error %v", pubKey, err)
} else {
if _, jsa, err := acc.checkForJetStream(); err != nil {
if !IsNatsErr(err, JSNotEnabledForAccountErr) {
s.Warnf("RESOLVER - Error checking for JetStream support for account %q: %v", pubKey, err)
s.Warnf("DirResolver - Error checking for JetStream support for account %q: %v", pubKey, err)
}
} else if jsa == nil {
if err = s.configJetStream(acc); err != nil {
s.Errorf("RESOLVER - Error configuring JetStream for account %q: %v", pubKey, err)
s.Errorf("DirResolver - Error configuring JetStream for account %q: %v", pubKey, err)
}
}
}
Expand All @@ -4028,7 +4028,7 @@ func (dr *DirAccResolver) Start(s *Server) error {
} else if len(tk) == accUpdateTokensOld {
pubKey = tk[accUpdateAccIdxOld]
} else {
s.Debugf("RESOLVER - JWT update skipped due to bad subject %q", subj)
s.Debugf("DirResolver - jwt update skipped due to bad subject %q", subj)
return
}
if claim, err := jwt.DecodeAccountClaims(string(msg)); err != nil {
Expand Down Expand Up @@ -4081,15 +4081,15 @@ func (dr *DirAccResolver) Start(s *Server) error {
accName := tk[accReqAccIndex]
if theJWT, err := dr.DirJWTStore.LoadAcc(accName); err != nil {
if errors.Is(err, fs.ErrNotExist) {
s.Debugf("RESOLVER - Could not find account %q", accName)
s.Debugf("DirResolver - Could not find account %q", accName)
go func() {
// Reply with empty response to signal absence of JWT to others,
// but not too fast to mitigate beating actual responses with content.
time.Sleep(fetchTimeout / 2)
time.Sleep(fetchTimeout / 10)
s.sendInternalMsgLocked(reply, _EMPTY_, nil, nil)
}()
} else {
s.Errorf("RESOLVER - Error looking up account %q: %v", accName, err)
s.Errorf("DirResolver - Error looking up account %q: %v", accName, err)
}
} else {
s.sendInternalMsgLocked(reply, _EMPTY_, nil, []byte(theJWT))
Expand All @@ -4106,14 +4106,14 @@ func (dr *DirAccResolver) Start(s *Server) error {
ourHash := dr.DirJWTStore.Hash()
if bytes.Equal(theirHash, ourHash[:]) {
s.sendInternalMsgLocked(reply, _EMPTY_, nil, []byte{})
s.Debugf("RESOLVER - Pack request matches hash %x", ourHash[:])
s.Debugf("DirResolver - Pack request matches hash %x", ourHash[:])
} else if err := dr.DirJWTStore.PackWalk(1, func(partialPackMsg string) {
s.sendInternalMsgLocked(reply, _EMPTY_, nil, []byte(partialPackMsg))
}); err != nil {
// let them timeout
s.Errorf("RESOLVER - Pack request error: %v", err)
s.Errorf("DirResolver - Pack request error: %v", err)
} else {
s.Debugf("RESOLVER - Pack request hash %x - finished responding with hash %x", theirHash, ourHash)
s.Debugf("DirResolver - Pack request hash %x - finished responding with hash %x", theirHash, ourHash)
s.sendInternalMsgLocked(reply, _EMPTY_, nil, []byte{})
}
}); err != nil {
Expand All @@ -4134,12 +4134,12 @@ func (dr *DirAccResolver) Start(s *Server) error {
if _, err := s.sysSubscribe(packRespIb, func(_ *subscription, _ *client, _ *Account, _, _ string, msg []byte) {
hash := dr.DirJWTStore.Hash()
if len(msg) == 0 { // end of response stream
s.Debugf("RESOLVER - Merging finished and resulting in: %x", dr.DirJWTStore.Hash())
s.Debugf("DirResolver - Merging finished and resulting in: %x", dr.DirJWTStore.Hash())
return
} else if err := dr.DirJWTStore.Merge(string(msg)); err != nil {
s.Errorf("RESOLVER - Merging resulted in error: %v", err)
s.Errorf("DirResolver - Merging resulted in error: %v", err)
} else {
s.Debugf("RESOLVER - Merging succeeded and changed %x to %x", hash, dr.DirJWTStore.Hash())
s.Debugf("DirResolver - Merging succeeded and changed %x to %x", hash, dr.DirJWTStore.Hash())
}
}); err != nil {
return fmt.Errorf("error setting up pack response handling: %v", err)
Expand All @@ -4157,7 +4157,7 @@ func (dr *DirAccResolver) Start(s *Server) error {
case <-ticker.C:
}
ourHash := dr.DirJWTStore.Hash()
s.Debugf("RESOLVER - Checking store state: %x", ourHash)
s.Debugf("DirResolver - Checking store state: %x", ourHash)
s.sendInternalMsgLocked(accPackReqSubj, packRespIb, nil, ourHash[:])
}
})
Expand Down Expand Up @@ -4312,9 +4312,9 @@ func (dr *CacheDirAccResolver) Start(s *Server) error {
dr.DirJWTStore.changed = func(pubKey string) {
if v, ok := s.accounts.Load(pubKey); !ok {
} else if theJwt, err := dr.LoadAcc(pubKey); err != nil {
s.Errorf("RESOLVER - Update got error on load: %v", err)
s.Errorf("DirResolver - Update got error on load: %v", err)
} else if err := s.updateAccountWithClaimJWT(v.(*Account), theJwt); err != nil {
s.Errorf("RESOLVER - Update resulted in error %v", err)
s.Errorf("DirResolver - Update resulted in error %v", err)
}
}
dr.DirJWTStore.deleted = func(pubKey string) {
Expand All @@ -4330,7 +4330,7 @@ func (dr *CacheDirAccResolver) Start(s *Server) error {
} else if len(tk) == accUpdateTokensOld {
pubKey = tk[accUpdateAccIdxOld]
} else {
s.Debugf("RESOLVER - JWT update cache skipped due to bad subject %q", subj)
s.Debugf("DirResolver - jwt update cache skipped due to bad subject %q", subj)
return
}
if claim, err := jwt.DecodeAccountClaims(string(msg)); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion server/jwt_test.go
Expand Up @@ -3747,7 +3747,7 @@ func TestJWTAccountNATSResolverCrossClusterFetch(t *testing.T) {
`, ojwt, syspub, dirBA, sAA.opts.Gateway.Port)))
sBA, _ := RunServerWithConfig(confBA)
defer sBA.Shutdown()
// Create Sever BA (using no_advertise to prevent fail over)
// Create Server BA (using no_advertise to prevent fail over)
confBB := createConfFile(t, []byte(fmt.Sprintf(`
listen: 127.0.0.1:-1
server_name: srv-B-B
Expand Down

0 comments on commit f58d3d0

Please sign in to comment.