Skip to content

Commit

Permalink
Merge pull request #739 from nats-io/move-account-lookup
Browse files Browse the repository at this point in the history
[changed] JetStream function to not lookup account info & moved it
  • Loading branch information
matthiashanel committed Jun 2, 2021
2 parents da90d22 + efed691 commit 1267a88
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 30 deletions.
21 changes: 1 addition & 20 deletions js.go
Expand Up @@ -167,6 +167,7 @@ const (
)

// JetStream returns a JetStreamContext for messaging and stream management.
// Errors are only returned if inconsistent options are provided.
func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) {
js := &js{
nc: nc,
Expand All @@ -181,26 +182,6 @@ func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) {
return nil, err
}
}

// If we have check recently we can avoid another account lookup here.
// We want these to be lighweight and created at will.
nc.mu.Lock()
now := time.Now()
checkAccount := now.Sub(nc.jsLastCheck) > defaultAccountCheck
if checkAccount {
nc.jsLastCheck = now
}
nc.mu.Unlock()

if checkAccount {
if _, err := js.AccountInfo(); err != nil {
if err == ErrNoResponders {
err = ErrJetStreamNotEnabled
}
return nil, err
}
}

return js, nil
}

Expand Down
6 changes: 6 additions & 0 deletions jsm.go
Expand Up @@ -170,6 +170,8 @@ type accountInfoResponse struct {
}

// AccountInfo retrieves info about the JetStream usage from the current account.
// If JetStream is not enabled, this will return ErrJetStreamNotEnabled
// Other errors can happen but are generally considered retryable
func (js *js) AccountInfo(opts ...JSOpt) (*AccountInfo, error) {
o, cancel, err := getJSContextOpts(js.opts, opts...)
if err != nil {
Expand All @@ -181,6 +183,10 @@ func (js *js) AccountInfo(opts ...JSOpt) (*AccountInfo, error) {

resp, err := js.nc.RequestWithContext(o.ctx, js.apiSubj(apiAccountInfo), nil)
if err != nil {
// todo maybe nats server should never have no responder on this subject and always respond if they know there is no js to be had
if err == ErrNoResponders {
err = ErrJetStreamNotEnabled
}
return nil, err
}
var info accountInfoResponse
Expand Down
3 changes: 0 additions & 3 deletions nats.go
Expand Up @@ -496,9 +496,6 @@ type Conn struct {
respMux *Subscription // A single response subscription
respMap map[string]chan *Msg // Request map for the response msg channels
respRand *rand.Rand // Used for generating suffix

// JetStream Contexts last account check.
jsLastCheck time.Time
}

type natsReader struct {
Expand Down
29 changes: 22 additions & 7 deletions test/js_test.go
Expand Up @@ -42,7 +42,11 @@ func TestJetStreamNotEnabled(t *testing.T) {
}
defer nc.Close()

if _, err := nc.JetStream(); err != nats.ErrJetStreamNotEnabled {
js, err := nc.JetStream()
if err != nil {
t.Fatalf("Got error during initialization %v", err)
}
if _, err = js.AccountInfo(); err != nats.ErrJetStreamNotEnabled {
t.Fatalf("Did not get the proper error, got %v", err)
}
}
Expand Down Expand Up @@ -73,7 +77,11 @@ func TestJetStreamNotAccountEnabled(t *testing.T) {
}
defer nc.Close()

if _, err := nc.JetStream(); err != nats.ErrJetStreamNotEnabled {
js, err := nc.JetStream()
if err != nil {
t.Fatalf("Got error during initialization %v", err)
}
if _, err = js.AccountInfo(); err != nats.ErrJetStreamNotEnabled {
t.Fatalf("Did not get the proper error, got %v", err)
}
}
Expand Down Expand Up @@ -1835,7 +1843,7 @@ func TestJetStreamManagement(t *testing.T) {
if info.Limits.MaxConsumers != -1 {
t.Errorf("Expected to not have consumer limits, got: %v", info.Limits.MaxConsumers)
}
if info.API.Total != 15 {
if info.API.Total != 14 {
t.Errorf("Expected 15 API calls, got: %v", info.API.Total)
}
if info.API.Errors != 1 {
Expand All @@ -1861,6 +1869,7 @@ func testJetStreamManagement_GetMsg(t *testing.T, srvs ...*jsServer) {
}
defer nc.Close()

// constructor
js, err := nc.JetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
Expand Down Expand Up @@ -4113,12 +4122,14 @@ func withJSClusterAndStream(t *testing.T, clusterName string, size int, stream *

timeout := time.Now().Add(10 * time.Second)
for time.Now().Before(timeout) {
var jsm nats.JetStreamManager
jsm, err = nc.JetStream()
jsm, err := nc.JetStream()
if err != nil {
t.Fatal(err)
}
_, err = jsm.AccountInfo()
if err != nil {
// Backoff for a bit until cluster and resources are ready.
time.Sleep(500 * time.Millisecond)
continue
}

_, err = jsm.AddStream(stream)
Expand All @@ -4140,7 +4151,11 @@ func waitForJSReady(t *testing.T, nc *nats.Conn) {
var err error
timeout := time.Now().Add(10 * time.Second)
for time.Now().Before(timeout) {
_, err = nc.JetStream()
js, err := nc.JetStream()
if err != nil {
t.Fatal(err)
}
_, err = js.AccountInfo()
if err != nil {
// Backoff for a bit until cluster ready.
time.Sleep(250 * time.Millisecond)
Expand Down

0 comments on commit 1267a88

Please sign in to comment.