[changed] JetStream function to not lookup account info & moved it #739

Merged 1 commit on Jun 2, 2021
21 changes: 1 addition & 20 deletions js.go
@@ -167,6 +167,7 @@ const (
)

// JetStream returns a JetStreamContext for messaging and stream management.
// Errors are only returned if inconsistent options are provided.
func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) {
js := &js{
nc: nc,
@@ -181,26 +182,6 @@ func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) {
return nil, err
}
}

// If we have check recently we can avoid another account lookup here.
// We want these to be lighweight and created at will.
nc.mu.Lock()
now := time.Now()
checkAccount := now.Sub(nc.jsLastCheck) > defaultAccountCheck
if checkAccount {
nc.jsLastCheck = now
}
nc.mu.Unlock()

if checkAccount {
Member

If that disappears, then jsLastCheck above should disappear too and be removed from the Conn object.

Member

By the way, the existing code is broken: if one calls JetStream() for the first time and, say, the overall call fails (JetStream not enabled, or a custom prefix leads nowhere), then the second call to JetStream() would succeed because we would skip the AccountInfo() check, so this is bad. At the very least, if the "time check" is maintained, we should record jsLastCheck only on success. But still, it means that the next call to JetStream() may succeed even if the underlying AccountInfo() would have failed because we passed a bad prefix, etc.
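
To make the failure mode above concrete, here is a minimal, self-contained sketch. It does not use the nats.go types: `gate`, `probe`, `checkBuggy`, `checkFixed` and `secondCall` are hypothetical names, `probe` stands in for js.AccountInfo(), and `errNotEnabled` stands in for ErrJetStreamNotEnabled. The buggy variant stamps the time before probing, as the removed code did; the other variant stamps only on success.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// errNotEnabled stands in for nats.ErrJetStreamNotEnabled in this sketch.
var errNotEnabled = errors.New("jetstream not enabled")

// gate is a hypothetical stand-in for the jsLastCheck bookkeeping on Conn.
type gate struct {
	mu        sync.Mutex
	lastCheck time.Time
	window    time.Duration
	probe     func() error // stands in for js.AccountInfo()
}

// due reports whether the last recorded check is older than the window.
func (g *gate) due(now time.Time) bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	return now.Sub(g.lastCheck) > g.window
}

// stamp records now as the time of the last check.
func (g *gate) stamp(now time.Time) {
	g.mu.Lock()
	g.lastCheck = now
	g.mu.Unlock()
}

// checkBuggy stamps before probing, so a failed probe still suppresses
// the next check inside the window.
func (g *gate) checkBuggy(now time.Time) error {
	if !g.due(now) {
		return nil
	}
	g.stamp(now)
	return g.probe()
}

// checkFixed stamps only after the probe has succeeded.
func (g *gate) checkFixed(now time.Time) error {
	if !g.due(now) {
		return nil
	}
	if err := g.probe(); err != nil {
		return err
	}
	g.stamp(now)
	return nil
}

// secondCall runs the chosen check twice, one second apart, against a probe
// that always fails, and returns the result of the second call.
func secondCall(buggy bool) error {
	g := &gate{window: time.Minute, probe: func() error { return errNotEnabled }}
	check := g.checkFixed
	if buggy {
		check = g.checkBuggy
	}
	now := time.Unix(1000, 0)
	_ = check(now) // first call fails in both variants
	return check(now.Add(time.Second))
}

func main() {
	fmt.Println("buggy:", secondCall(true))  // buggy: <nil>
	fmt.Println("fixed:", secondCall(false)) // fixed: jetstream not enabled
}
```

Even the fixed variant only narrows the problem: a context obtained inside the window is still returned without a fresh probe, which is the remaining concern raised in the comment.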

Contributor Author

fixed.

Contributor Author

Fixed. JetStream() is now a constructor only. Because it can't really fail, we can use js.AccountInfo().

Member

The time-check bug was mine, and I agree we should only stamp (if we keep it) on success.

Member

I am still not clear (and this is me, not the code) as to why we are doing this change vs an option like SkipAccountCheck().

This feels like it could be jarring to only catch the "you do not have JetStream enabled" error when you make a secondary call, but maybe that is ok here. Originally, before domains and API prefixes, it felt like the natural place to check this and return that error.

if _, err := js.AccountInfo(); err != nil {
if err == ErrNoResponders {
err = ErrJetStreamNotEnabled
}
return nil, err
}
}

return js, nil
}

6 changes: 6 additions & 0 deletions jsm.go
@@ -170,6 +170,8 @@ type accountInfoResponse struct {
}

// AccountInfo retrieves info about the JetStream usage from the current account.
// If JetStream is not enabled, this will return ErrJetStreamNotEnabled
Member

This now has to be the case for every API call to the JS context, but I think that is ok.

Contributor Author

@wallyqs we probably need to go through our public calls and make sure that is so

Member

Yes, this could happen if JS becomes disabled.
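
One way to make sure every public call reports a disabled JetStream the same way is to route the error translation through a single helper. The sketch below is an assumption about how that could look, not the library's code: `translate` and `apiCall` are hypothetical names, and the two error values stand in for ErrNoResponders and ErrJetStreamNotEnabled.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for nats.ErrNoResponders and nats.ErrJetStreamNotEnabled.
var (
	errNoResponders = errors.New("no responders available for request")
	errNotEnabled   = errors.New("jetstream not enabled")
)

// translate is a hypothetical helper: it maps the transport-level
// "no responders" error to the JetStream-specific one and passes
// every other value (including nil) through unchanged.
func translate(err error) error {
	if errors.Is(err, errNoResponders) {
		return errNotEnabled
	}
	return err
}

// apiCall wraps a request function so that each API call goes through
// the same translation step.
func apiCall(request func() ([]byte, error)) ([]byte, error) {
	resp, err := request()
	if err != nil {
		return nil, translate(err)
	}
	return resp, nil
}

func main() {
	// Simulate a request that finds nobody listening on the API subject.
	_, err := apiCall(func() ([]byte, error) { return nil, errNoResponders })
	fmt.Println(err)
}
```

With a helper like this, the mapping added to AccountInfo in this diff would live in one place instead of being repeated per call.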

// Other errors can happen but are generally considered retryable
func (js *js) AccountInfo(opts ...JSOpt) (*AccountInfo, error) {
o, cancel, err := getJSContextOpts(js.opts, opts...)
if err != nil {
@@ -181,6 +183,10 @@ func (js *js) AccountInfo(opts ...JSOpt) (*AccountInfo, error) {

resp, err := js.nc.RequestWithContext(o.ctx, js.apiSubj(apiAccountInfo), nil)
if err != nil {
// todo maybe nats server should never have no responder on this subject and always respond if they know there is no js to be had
if err == ErrNoResponders {
err = ErrJetStreamNotEnabled
}
return nil, err
}
var info accountInfoResponse
3 changes: 0 additions & 3 deletions nats.go
@@ -496,9 +496,6 @@ type Conn struct {
respMux *Subscription // A single response subscription
respMap map[string]chan *Msg // Request map for the response msg channels
respRand *rand.Rand // Used for generating suffix

// JetStream Contexts last account check.
jsLastCheck time.Time
Member

I think we all agreed that JS contexts should be lightweight and easy to create. In many tests I create multiple ones with different characteristics.

Question though: do we expect code to look like this all the time, or will folks skip the AccountInfo call and just check errors on the other API calls?

nc, _ := nats.Connect("demo.nats.io")
js, _ := nc.JetStream()
if _, err := js.AccountInfo(); err != nil {}

Member

Help me understand: can the enabled/disabled nature of JetStream for this connection change over time? Meaning, after calling JetStream() (and let's say we still do an account check inside, and it returned ok), is there a possibility that JS may no longer be enabled for this account? What if the connection reconnects to a server where this account does not have JS enabled, which would likely be a misconfiguration?

If it can't change, then instead of a time check, it should simply be an "already checked and here is the result" type of thing (say 2 booleans).

If it can change over time, and since all APIs will return "not enabled" if JS is not or is no longer enabled, I'm not sure what we gain by doing it inside JetStream(). Say you already have the context, but then JS is disabled: the context is "valid" and yet all other API calls will fail with "not enabled". Same if you recently got a context and we use the time check: if we are within the window, JetStream() would return a context (which won't work when calling APIs), but if you happen to ask after the time-check window has passed, this call would return "not enabled".

I am not against doing the account check inside JetStream() (with the time-check bug fixed), since it can give a "fail fast" behavior where if you get "not enabled", there is no need to proceed further, but it can also be non-deterministic (as described above), and all APIs should be checked for success/failure anyway.

Contributor Author

Due to a reload or JWT update, JS can legitimately be enabled or disabled on the fly.
We have code that turns a no-responders error into the same error, so flapping because of that can happen as well.
If you want to fail fast, perhaps for debugging, I'd suggest calling js.AccountInfo.
I'd also suggest calling that function on retries: if you fail for some reason, you can then better determine whether retrying makes sense or not.
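
The retry suggestion above could be sketched roughly as follows. This is an illustration under assumptions, not library code: `retryWithProbe` is a hypothetical helper, `probe` stands in for js.AccountInfo, and treating "not enabled" as a reason to stop retrying is a policy choice of this sketch (given the flapping mentioned above, a caller might prefer to keep retrying for a while).

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for nats.ErrJetStreamNotEnabled and for any other failure.
var (
	errNotEnabled = errors.New("jetstream not enabled")
	errTransient  = errors.New("transient failure")
)

// retryWithProbe runs op up to attempts times. After each failure it calls
// probe (standing in for js.AccountInfo) and gives up early when the probe
// reports that JetStream is not enabled. Otherwise it returns the last
// error from op once the attempts are used up.
func retryWithProbe(attempts int, op, probe func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = op(); err == nil {
			return nil
		}
		if perr := probe(); errors.Is(perr, errNotEnabled) {
			return perr
		}
	}
	return err
}

func main() {
	// The operation keeps failing and the probe says JetStream is off,
	// so retryWithProbe stops after the first attempt.
	err := retryWithProbe(5,
		func() error { return errTransient },
		func() error { return errNotEnabled })
	fmt.Println(err)
}
```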

Member

I think for users that want to ensure that JS is ready and available, a bit more code would be needed which might be better to have outside of the creation of the JetStream() context:

nc, _ := nats.Connect("demo.nats.io")
js, _ := nc.JetStream()

ctx, done := context.WithTimeout(context.Background(), 5*time.Second)
defer done()

// IsJSAvailable()
for {
	select {
	case <-ctx.Done():
		return
	default:
	}

	if _, err := js.AccountInfo(); err != nil {
		// Handle temporary errors:
		//
		// - Quorum not ready
		// - JS API not yet ready
		// - No responders (temporary while JS booting up)

		// Then: timeout when too many failures or no response
		continue
	}
	// AccountInfo succeeded, so JS is available.
	break
}

For example, there would be some retries when this happens, with backoff until JS is ready.
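
A rough sketch of that retry-with-backoff idea, kept independent of the nats.go types so it runs on its own. `waitReady`, `demoWait` and `errNotReady` are hypothetical names, and `probe` stands in for js.AccountInfo; the doubling delay and its cap are assumptions, not values taken from the client.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errNotReady is a placeholder for any temporary failure (quorum not ready,
// API not yet up, no responders while JetStream boots).
var errNotReady = errors.New("jetstream not ready")

// waitReady calls probe (a stand-in for js.AccountInfo) until it succeeds or
// ctx is done. The pause doubles after every failure, capped at maxDelay.
func waitReady(ctx context.Context, probe func() error, delay, maxDelay time.Duration) error {
	for {
		err := probe()
		if err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return fmt.Errorf("%w (last probe error: %v)", ctx.Err(), err)
		case <-time.After(delay):
		}
		delay *= 2
		if delay > maxDelay {
			delay = maxDelay
		}
	}
}

// demoWait runs waitReady against a probe that fails the given number of
// times before succeeding, and reports how many times the probe was called.
func demoWait(failures int) (calls int, err error) {
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()
	probe := func() error {
		calls++
		if calls <= failures {
			return errNotReady
		}
		return nil
	}
	err = waitReady(ctx, probe, time.Millisecond, 10*time.Millisecond)
	return calls, err
}

func main() {
	calls, err := demoWait(3)
	fmt.Println(calls, err) // 4 <nil>
}
```

Waiting on the context inside the select, rather than polling it at the top of the loop, lets the caller's timeout interrupt a pause between probes.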

}

type natsReader struct {
31 changes: 23 additions & 8 deletions test/js_test.go
@@ -42,7 +42,11 @@ func TestJetStreamNotEnabled(t *testing.T) {
}
defer nc.Close()

if _, err := nc.JetStream(); err != nats.ErrJetStreamNotEnabled {
js, err := nc.JetStream()
if err != nil {
t.Fatalf("Got error during initialization %v", err)
}
if _, err = js.AccountInfo(); err != nats.ErrJetStreamNotEnabled {
t.Fatalf("Did not get the proper error, got %v", err)
}
}
@@ -73,7 +77,11 @@ func TestJetStreamNotAccountEnabled(t *testing.T) {
}
defer nc.Close()

if _, err := nc.JetStream(); err != nats.ErrJetStreamNotEnabled {
js, err := nc.JetStream()
if err != nil {
t.Fatalf("Got error during initialization %v", err)
}
if _, err = js.AccountInfo(); err != nats.ErrJetStreamNotEnabled {
t.Fatalf("Did not get the proper error, got %v", err)
}
}
@@ -1835,7 +1843,7 @@ func TestJetStreamManagement(t *testing.T) {
if info.Limits.MaxConsumers != -1 {
t.Errorf("Expected to not have consumer limits, got: %v", info.Limits.MaxConsumers)
}
if info.API.Total != 15 {
if info.API.Total != 14 {
t.Errorf("Expected 15 API calls, got: %v", info.API.Total)
}
if info.API.Errors != 1 {
@@ -1861,6 +1869,7 @@ func testJetStreamManagement_GetMsg(t *testing.T, srvs ...*jsServer) {
}
defer nc.Close()

// constructor
js, err := nc.JetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
@@ -2401,7 +2410,7 @@ func TestJetStreamCrossAccountMirrorsAndSources(t *testing.T) {

_, err = js1.AddStream(&nats.StreamConfig{
Name: "TEST",
Replicas: 2,
Replicas: 1,
})
if err != nil {
t.Fatal(err)
@@ -4113,12 +4122,14 @@ func withJSClusterAndStream(t *testing.T, clusterName string, size int, stream *

timeout := time.Now().Add(10 * time.Second)
for time.Now().Before(timeout) {
var jsm nats.JetStreamManager
jsm, err = nc.JetStream()
jsm, err := nc.JetStream()
if err != nil {
t.Fatal(err)
}
_, err = jsm.AccountInfo()
if err != nil {
// Backoff for a bit until cluster and resources are ready.
time.Sleep(500 * time.Millisecond)
continue
}

_, err = jsm.AddStream(stream)
@@ -4140,7 +4151,11 @@ func waitForJSReady(t *testing.T, nc *nats.Conn) {
var err error
timeout := time.Now().Add(10 * time.Second)
for time.Now().Before(timeout) {
_, err = nc.JetStream()
js, err := nc.JetStream()
if err != nil {
t.Fatal(err)
}
_, err = js.AccountInfo()
if err != nil {
// Backoff for a bit until cluster ready.
time.Sleep(250 * time.Millisecond)