Skip to content

Commit

Permalink
resolver: wait for all available servers to respond on fetch
Browse files Browse the repository at this point in the history
Signed-off-by: Waldemar Quevedo <wally@nats.io>
  • Loading branch information
wallyqs committed May 13, 2023
1 parent 811ef75 commit 94b8d0b
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 7 deletions.
24 changes: 18 additions & 6 deletions server/accounts.go
Expand Up @@ -4082,12 +4082,10 @@ func (dr *DirAccResolver) Start(s *Server) error {
if theJWT, err := dr.DirJWTStore.LoadAcc(accName); err != nil {
if errors.Is(err, fs.ErrNotExist) {
s.Debugf("DirResolver - Could not find account %q", accName)
go func() {
// Reply with empty response to signal absence of JWT to others,
// but not too fast to mitigate beating actual responses with content.
time.Sleep(fetchTimeout / 10)
s.sendInternalMsgLocked(reply, _EMPTY_, nil, nil)
}()
// Reply with empty response to signal absence of JWT to others,
// but not too fast to mitigate beating actual responses with content.
time.Sleep(fetchTimeout / 10)
s.sendInternalMsgLocked(reply, _EMPTY_, nil, nil)
} else {
s.Errorf("DirResolver - Error looking up account %q: %v", accName, err)
}
Expand Down Expand Up @@ -4238,6 +4236,12 @@ func (s *Server) fetch(res AccountResolver, name string, timeout time.Duration)
respC := make(chan []byte, 1)
accountLookupRequest := fmt.Sprintf(accLookupReqSubj, name)
s.mu.Lock()
// Resolver will wait for available routes + gateways to reply
// before serving an error in case there weren't any found.
expectedServers := len(s.routes)
if s.gateway != nil {
expectedServers += len(s.gateway.remotes)
}
if s.sys == nil || s.sys.replies == nil {
s.mu.Unlock()
return _EMPTY_, fmt.Errorf("eventing shut down")
Expand All @@ -4251,6 +4255,14 @@ func (s *Server) fetch(res AccountResolver, name string, timeout time.Duration)
copy(clone, msg)

s.mu.Lock()
expectedServers--
if len(msg) == 0 {
// Skip empty responses until getting all the available servers.
if expectedServers > 0 {
s.mu.Unlock()
return
}
}
// Use the first valid response if there is still interest or
// one of the empty responses to signal that it was not found.
if _, ok := replies[replySubj]; ok {
Expand Down
2 changes: 1 addition & 1 deletion server/jwt_test.go
Expand Up @@ -6774,7 +6774,7 @@ func TestJWTAccountNATSResolverWrongCreds(t *testing.T) {

// Check that trying to connect with bad credentials should not hang until the fetch timeout
// and instead return a faster response when an account is not found.
_, err := nats.Connect(sC.ClientURL(), nats.UserCredentials(cCreds), nats.Timeout(1*time.Second))
_, err := nats.Connect(sC.ClientURL(), nats.UserCredentials(cCreds), nats.Timeout(500*time.Second))
if err != nil && !errors.Is(err, nats.ErrAuthorization) {
t.Fatalf("Expected auth error: %v", err)
}
Expand Down

0 comments on commit 94b8d0b

Please sign in to comment.