From efed691619ec9a7107fe24f6d264a685efa8ec15 Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Tue, 1 Jun 2021 13:42:17 -0400 Subject: [PATCH] [changed] JetStream function to be a constructor only. Removded then lookup of account info. now already existing js.AccountInfo has to be called separately. Signed-off-by: Matthias Hanel --- js.go | 21 +-------------------- jsm.go | 6 ++++++ nats.go | 3 --- test/js_test.go | 31 +++++++++++++++++++++++-------- 4 files changed, 30 insertions(+), 31 deletions(-) diff --git a/js.go b/js.go index 7b70bb1e1..d0fda20ce 100644 --- a/js.go +++ b/js.go @@ -167,6 +167,7 @@ const ( ) // JetStream returns a JetStreamContext for messaging and stream management. +// Errors are only returned if inconsistent options are provided. func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) { js := &js{ nc: nc, @@ -181,26 +182,6 @@ func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) { return nil, err } } - - // If we have check recently we can avoid another account lookup here. - // We want these to be lighweight and created at will. - nc.mu.Lock() - now := time.Now() - checkAccount := now.Sub(nc.jsLastCheck) > defaultAccountCheck - if checkAccount { - nc.jsLastCheck = now - } - nc.mu.Unlock() - - if checkAccount { - if _, err := js.AccountInfo(); err != nil { - if err == ErrNoResponders { - err = ErrJetStreamNotEnabled - } - return nil, err - } - } - return js, nil } diff --git a/jsm.go b/jsm.go index b6a3f8b7a..4d5457d09 100644 --- a/jsm.go +++ b/jsm.go @@ -170,6 +170,8 @@ type accountInfoResponse struct { } // AccountInfo retrieves info about the JetStream usage from the current account. +// If JetStream is not enabled, this will return ErrJetStreamNotEnabled +// Other errors can happen but are generally considered retryable func (js *js) AccountInfo(opts ...JSOpt) (*AccountInfo, error) { o, cancel, err := getJSContextOpts(js.opts, opts...) if err != nil { @@ -181,6 +183,10 @@ func (js *js) AccountInfo(opts ...JSOpt) (*AccountInfo, error) { resp, err := js.nc.RequestWithContext(o.ctx, js.apiSubj(apiAccountInfo), nil) if err != nil { + // todo maybe nats server should never have no responder on this subject and always respond if they know there is no js to be had + if err == ErrNoResponders { + err = ErrJetStreamNotEnabled + } return nil, err } var info accountInfoResponse diff --git a/nats.go b/nats.go index 88f528109..170094692 100644 --- a/nats.go +++ b/nats.go @@ -496,9 +496,6 @@ type Conn struct { respMux *Subscription // A single response subscription respMap map[string]chan *Msg // Request map for the response msg channels respRand *rand.Rand // Used for generating suffix - - // JetStream Contexts last account check. - jsLastCheck time.Time } type natsReader struct { diff --git a/test/js_test.go b/test/js_test.go index 625ac81c6..d12fd08c7 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -42,7 +42,11 @@ func TestJetStreamNotEnabled(t *testing.T) { } defer nc.Close() - if _, err := nc.JetStream(); err != nats.ErrJetStreamNotEnabled { + js, err := nc.JetStream() + if err != nil { + t.Fatalf("Got error during initialization %v", err) + } + if _, err = js.AccountInfo(); err != nats.ErrJetStreamNotEnabled { t.Fatalf("Did not get the proper error, got %v", err) } } @@ -73,7 +77,11 @@ func TestJetStreamNotAccountEnabled(t *testing.T) { } defer nc.Close() - if _, err := nc.JetStream(); err != nats.ErrJetStreamNotEnabled { + js, err := nc.JetStream() + if err != nil { + t.Fatalf("Got error during initialization %v", err) + } + if _, err = js.AccountInfo(); err != nats.ErrJetStreamNotEnabled { t.Fatalf("Did not get the proper error, got %v", err) } } @@ -1835,7 +1843,7 @@ func TestJetStreamManagement(t *testing.T) { if info.Limits.MaxConsumers != -1 { t.Errorf("Expected to not have consumer limits, got: %v", info.Limits.MaxConsumers) } - if info.API.Total != 15 { + if info.API.Total != 14 { t.Errorf("Expected 15 API calls, got: %v", info.API.Total) } if info.API.Errors != 1 { @@ -1861,6 +1869,7 @@ func testJetStreamManagement_GetMsg(t *testing.T, srvs ...*jsServer) { } defer nc.Close() + // constructor js, err := nc.JetStream() if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -2401,7 +2410,7 @@ func TestJetStreamCrossAccountMirrorsAndSources(t *testing.T) { _, err = js1.AddStream(&nats.StreamConfig{ Name: "TEST", - Replicas: 2, + Replicas: 1, }) if err != nil { t.Fatal(err) @@ -4113,12 +4122,14 @@ func withJSClusterAndStream(t *testing.T, clusterName string, size int, stream * timeout := time.Now().Add(10 * time.Second) for time.Now().Before(timeout) { - var jsm nats.JetStreamManager - jsm, err = nc.JetStream() + jsm, err := nc.JetStream() + if err != nil { + t.Fatal(err) + } + _, err = jsm.AccountInfo() if err != nil { // Backoff for a bit until cluster and resources are ready. time.Sleep(500 * time.Millisecond) - continue } _, err = jsm.AddStream(stream) @@ -4140,7 +4151,11 @@ func waitForJSReady(t *testing.T, nc *nats.Conn) { var err error timeout := time.Now().Add(10 * time.Second) for time.Now().Before(timeout) { - _, err = nc.JetStream() + js, err := nc.JetStream() + if err != nil { + t.Fatal(err) + } + _, err = js.AccountInfo() if err != nil { // Backoff for a bit until cluster ready. time.Sleep(250 * time.Millisecond)