From 5e89fb460a7be8bfcc3a3ff52b6be443719cf107 Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Mon, 24 May 2021 18:21:06 -0400 Subject: [PATCH 1/9] [added] option to not create durable on subscribe Signed-off-by: Matthias Hanel --- js.go | 49 ++++++++++++++++++++++++++++++++++--------------- 1 file changed, 34 insertions(+), 15 deletions(-) diff --git a/js.go b/js.go index 7b70bb1e1..89d5b337f 100644 --- a/js.go +++ b/js.go @@ -183,21 +183,24 @@ func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) { } // If we have check recently we can avoid another account lookup here. - // We want these to be lighweight and created at will. - nc.mu.Lock() - now := time.Now() - checkAccount := now.Sub(nc.jsLastCheck) > defaultAccountCheck - if checkAccount { - nc.jsLastCheck = now - } - nc.mu.Unlock() + // We want these to be lightweight and created at will. + // The account check is permanently disabled when the api prefix is set to anything but default + if js.opts.pre == defaultAPIPrefix { + nc.mu.Lock() + now := time.Now() + checkAccount := now.Sub(nc.jsLastCheck) > defaultAccountCheck + if checkAccount { + nc.jsLastCheck = now + } + nc.mu.Unlock() - if checkAccount { - if _, err := js.AccountInfo(); err != nil { - if err == ErrNoResponders { - err = ErrJetStreamNotEnabled + if checkAccount { + if _, err := js.AccountInfo(); err != nil { + if err == ErrNoResponders { + err = ErrJetStreamNotEnabled + } + return nil, err } - return nil, err } } @@ -1041,7 +1044,9 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync // With an explicit durable name, then can lookup // the consumer to which it should be attaching to. consumer = o.cfg.Durable - if consumer != _EMPTY_ { + if consumer == _EMPTY_ && o.noDurCreate { + return nil, fmt.Errorf("nats: NoCreate requires a durable name") + } else if consumer != _EMPTY_ && !o.noDurCreate { // Only create in case there is no consumer already. info, err = js.ConsumerInfo(stream, consumer) if err != nil && err.Error() != "nats: consumer not found" { @@ -1049,7 +1054,10 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync } } - if info != nil { + if o.noDurCreate { + attached = true + deliver = subj + } else if info != nil { // Attach using the found consumer config. ccfg = &info.Config attached = true @@ -1351,6 +1359,8 @@ type subOpts struct { mack bool // For creating or updating. cfg *ConsumerConfig + // Assert consumer exists and do not create it + noDurCreate bool } // ManualAck disables auto ack functionality for async subscriptions. @@ -1376,6 +1386,15 @@ func Durable(name string) SubOpt { }) } +// NoDurableCreate instructs the API to not create a durable. +// If no durable name was provided, an error will be raised. +func NoDurableCreate() SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.noDurCreate = true + return nil + }) +} + // DeliverAll will configure a Consumer to receive all the // messages from a Stream. func DeliverAll() SubOpt { From 30415af14c3fe9763ba3aaa4de80d3af7990123f Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Mon, 24 May 2021 18:59:21 -0400 Subject: [PATCH 2/9] Changing to BindJetStream Signed-off-by: Matthias Hanel --- js.go | 76 ++++++++++++++++++++++++++++--------------------- test/js_test.go | 2 +- 2 files changed, 45 insertions(+), 33 deletions(-) diff --git a/js.go b/js.go index 89d5b337f..a0cc96467 100644 --- a/js.go +++ b/js.go @@ -147,6 +147,9 @@ type js struct { stc chan struct{} dch chan struct{} rr *rand.Rand + + // Assumes underlying objects exist and prevents additional lookups + bound bool } type jsOpts struct { @@ -166,8 +169,9 @@ const ( defaultAccountCheck = 20 * time.Second ) -// JetStream returns a JetStreamContext for messaging and stream management. -func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) { +// BindJetStream returns a JetStreamContext for messaging and stream management that will NOT create +// underlying objects like stream or durable. It assumes they exist already. +func (nc *Conn) jetStream(opts ...JSOpt) (*js, error) { js := &js{ nc: nc, opts: &jsOpts{ @@ -181,26 +185,42 @@ func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) { return nil, err } } + return js, nil +} +// BindJetStream returns a JetStreamContext for messaging and stream management that will NOT create +// underlying objects like stream or durable. It assumes they exist already. +func (nc *Conn) BindJetStream(opts ...JSOpt) (JetStreamContext, error) { + js, err := nc.jetStream(opts...) + if err != nil { + return nil, err + } + js.bound = true + return js, nil +} + +// JetStream returns a JetStreamContext for messaging and stream management. +func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) { + js, err := nc.jetStream(opts...) + if err != nil { + return nil, err + } // If we have check recently we can avoid another account lookup here. // We want these to be lightweight and created at will. - // The account check is permanently disabled when the api prefix is set to anything but default - if js.opts.pre == defaultAPIPrefix { - nc.mu.Lock() - now := time.Now() - checkAccount := now.Sub(nc.jsLastCheck) > defaultAccountCheck - if checkAccount { - nc.jsLastCheck = now - } - nc.mu.Unlock() + nc.mu.Lock() + now := time.Now() + checkAccount := now.Sub(nc.jsLastCheck) > defaultAccountCheck + if checkAccount { + nc.jsLastCheck = now + } + nc.mu.Unlock() - if checkAccount { - if _, err := js.AccountInfo(); err != nil { - if err == ErrNoResponders { - err = ErrJetStreamNotEnabled - } - return nil, err + if checkAccount { + if _, err := js.AccountInfo(); err != nil { + if err == ErrNoResponders { + err = ErrJetStreamNotEnabled } + return nil, err } } @@ -1033,6 +1053,9 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync // Find the stream mapped to the subject if not bound to a stream already. if o.stream == _EMPTY_ { + if js.bound { + return nil, fmt.Errorf("nats: a bound JS requires a stream name") + } stream, err = js.lookupStreamBySubject(subj) if err != nil { return nil, err @@ -1044,9 +1067,9 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync // With an explicit durable name, then can lookup // the consumer to which it should be attaching to. consumer = o.cfg.Durable - if consumer == _EMPTY_ && o.noDurCreate { - return nil, fmt.Errorf("nats: NoCreate requires a durable name") - } else if consumer != _EMPTY_ && !o.noDurCreate { + if consumer == _EMPTY_ && js.bound { + return nil, fmt.Errorf("nats: a bound JS requires a durable name") + } else if consumer != _EMPTY_ && !js.bound { // Only create in case there is no consumer already. info, err = js.ConsumerInfo(stream, consumer) if err != nil && err.Error() != "nats: consumer not found" { @@ -1054,7 +1077,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync } } - if o.noDurCreate { + if js.bound { attached = true deliver = subj } else if info != nil { @@ -1359,8 +1382,6 @@ type subOpts struct { mack bool // For creating or updating. cfg *ConsumerConfig - // Assert consumer exists and do not create it - noDurCreate bool } // ManualAck disables auto ack functionality for async subscriptions. @@ -1386,15 +1407,6 @@ func Durable(name string) SubOpt { }) } -// NoDurableCreate instructs the API to not create a durable. -// If no durable name was provided, an error will be raised. -func NoDurableCreate() SubOpt { - return subOptFn(func(opts *subOpts) error { - opts.noDurCreate = true - return nil - }) -} - // DeliverAll will configure a Consumer to receive all the // messages from a Stream. func DeliverAll() SubOpt { diff --git a/test/js_test.go b/test/js_test.go index 625ac81c6..354672dac 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -2252,7 +2252,7 @@ func TestJetStreamImportDirectOnly(t *testing.T) { } defer ncm.Close() - jsm, err := ncm.JetStream() + jsm, err := ncm.BindJetStream() if err != nil { t.Fatal(err) } From e6e0f0b34302c7b2a5d971a197e32e98bd3dae65 Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Mon, 24 May 2021 20:39:29 -0400 Subject: [PATCH 3/9] fix test and clean up errors Signed-off-by: Matthias Hanel --- js.go | 5 +++-- nats.go | 2 ++ test/js_test.go | 14 +++++++------- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/js.go b/js.go index a0cc96467..b977e1b8d 100644 --- a/js.go +++ b/js.go @@ -190,6 +190,7 @@ func (nc *Conn) jetStream(opts ...JSOpt) (*js, error) { // BindJetStream returns a JetStreamContext for messaging and stream management that will NOT create // underlying objects like stream or durable. It assumes they exist already. +// This will also disable the use of ephemeral consumer. func (nc *Conn) BindJetStream(opts ...JSOpt) (JetStreamContext, error) { js, err := nc.jetStream(opts...) if err != nil { @@ -1054,7 +1055,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync // Find the stream mapped to the subject if not bound to a stream already. if o.stream == _EMPTY_ { if js.bound { - return nil, fmt.Errorf("nats: a bound JS requires a stream name") + return nil, ErrBoundJetStreamStream } stream, err = js.lookupStreamBySubject(subj) if err != nil { @@ -1068,7 +1069,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync // the consumer to which it should be attaching to. consumer = o.cfg.Durable if consumer == _EMPTY_ && js.bound { - return nil, fmt.Errorf("nats: a bound JS requires a durable name") + return nil, ErrBoundJetStreamDurable } else if consumer != _EMPTY_ && !js.bound { // Only create in case there is no consumer already. info, err = js.ConsumerInfo(stream, consumer) diff --git a/nats.go b/nats.go index 88f528109..02c7bfe42 100644 --- a/nats.go +++ b/nats.go @@ -135,6 +135,8 @@ var ( ErrNoContextOrTimeout = errors.New("nats: no context or timeout given") ErrPullModeNotAllowed = errors.New("nats: pull based not supported") ErrJetStreamNotEnabled = errors.New("nats: jetstream not enabled") + ErrBoundJetStreamStream = errors.New("nats: a bound JS requires a stream name") + ErrBoundJetStreamDurable = errors.New("nats: a bound JS requires a stream name") ErrJetStreamBadPre = errors.New("nats: jetstream api prefix not valid") ErrNoStreamResponse = errors.New("nats: no response from stream") ErrNotJSMessage = errors.New("nats: not a jetstream message") diff --git a/test/js_test.go b/test/js_test.go index 354672dac..1c4ffffbd 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -2294,7 +2294,7 @@ func TestJetStreamImportDirectOnly(t *testing.T) { } defer nc.Close() - js, err := nc.JetStream() + js, err := nc.BindJetStream() if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -2344,14 +2344,14 @@ func TestJetStreamImportDirectOnly(t *testing.T) { } // Cannot subscribe with JS context from another account right now. - if _, err := js.SubscribeSync("ORDERS"); err != nats.ErrJetStreamNotEnabled { - t.Fatalf("Expected an error of '%v', got '%v'", nats.ErrJetStreamNotEnabled, err) + if _, err := js.SubscribeSync("ORDERS"); err != nats.ErrBoundJetStreamStream { + t.Fatalf("Expected an error of '%v', got '%v'", nats.ErrBoundJetStreamStream, err) } - if _, err = js.SubscribeSync("ORDERS", nats.BindStream("ORDERS")); err != nats.ErrJetStreamNotEnabled { - t.Fatalf("Expected an error of '%v', got '%v'", nats.ErrJetStreamNotEnabled, err) + if _, err = js.SubscribeSync("ORDERS", nats.BindStream("ORDERS")); err != nats.ErrBoundJetStreamDurable { + t.Fatalf("Expected an error of '%v', got '%v'", nats.ErrBoundJetStreamDurable, err) } - if _, err = js.PullSubscribe("ORDERS", "d1", nats.BindStream("ORDERS")); err != nats.ErrJetStreamNotEnabled { - t.Fatalf("Expected an error of '%v', got '%v'", nats.ErrJetStreamNotEnabled, err) + if _, err = js.PullSubscribe("ORDERS", "d1", nats.BindStream("ORDERS")); err != nil { + t.Fatalf("Expected no error, got '%v'", err) } } From 94f77fcc271593ad00b0873bf41bd44e1bf9a1d3 Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Mon, 24 May 2021 20:43:58 -0400 Subject: [PATCH 4/9] remove comment Signed-off-by: Matthias Hanel --- js.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/js.go b/js.go index b977e1b8d..f86f216e5 100644 --- a/js.go +++ b/js.go @@ -169,8 +169,6 @@ const ( defaultAccountCheck = 20 * time.Second ) -// BindJetStream returns a JetStreamContext for messaging and stream management that will NOT create -// underlying objects like stream or durable. It assumes they exist already. func (nc *Conn) jetStream(opts ...JSOpt) (*js, error) { js := &js{ nc: nc, From e6acd58f2e79c2ea5d778b3e0923911ecaa5133f Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Tue, 25 May 2021 12:21:29 -0400 Subject: [PATCH 5/9] Update nats.go Co-authored-by: Waldemar Quevedo --- nats.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nats.go b/nats.go index 02c7bfe42..20e6d755e 100644 --- a/nats.go +++ b/nats.go @@ -136,7 +136,7 @@ var ( ErrPullModeNotAllowed = errors.New("nats: pull based not supported") ErrJetStreamNotEnabled = errors.New("nats: jetstream not enabled") ErrBoundJetStreamStream = errors.New("nats: a bound JS requires a stream name") - ErrBoundJetStreamDurable = errors.New("nats: a bound JS requires a stream name") + ErrBoundJetStreamDurable = errors.New("nats: a bound JS requires a durable name") ErrJetStreamBadPre = errors.New("nats: jetstream api prefix not valid") ErrNoStreamResponse = errors.New("nats: no response from stream") ErrNotJSMessage = errors.New("nats: not a jetstream message") From 335b6c891f882dd4cd25940df43124d72438b000 Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Tue, 25 May 2021 12:21:46 -0400 Subject: [PATCH 6/9] Update test/js_test.go Co-authored-by: Waldemar Quevedo --- test/js_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/test/js_test.go b/test/js_test.go index 1c4ffffbd..2130a7b1d 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -2343,7 +2343,6 @@ func TestJetStreamImportDirectOnly(t *testing.T) { } } - // Cannot subscribe with JS context from another account right now. if _, err := js.SubscribeSync("ORDERS"); err != nats.ErrBoundJetStreamStream { t.Fatalf("Expected an error of '%v', got '%v'", nats.ErrBoundJetStreamStream, err) } From 4361906b9b5c1f7649b8c3724df15d767693e976 Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Tue, 25 May 2021 13:10:35 -0400 Subject: [PATCH 7/9] adding test for pull subscriber and ack Signed-off-by: Matthias Hanel --- test/js_test.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/test/js_test.go b/test/js_test.go index 2130a7b1d..4ef7cbe16 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -2288,6 +2288,14 @@ func TestJetStreamImportDirectOnly(t *testing.T) { t.Fatalf("push consumer create failed: %v", err) } + _, err = jsm.AddConsumer("ORDERS", &nats.ConsumerConfig{ + Durable: "d4", + AckPolicy: nats.AckExplicitPolicy, + }) + if err != nil { + t.Fatalf("pull consumer create failed: %v", err) + } + nc, err := nats.Connect(s.ClientURL()) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -2343,14 +2351,21 @@ func TestJetStreamImportDirectOnly(t *testing.T) { } } + // Cannot subscribe with JS context from another account unless the stream is specified if _, err := js.SubscribeSync("ORDERS"); err != nats.ErrBoundJetStreamStream { t.Fatalf("Expected an error of '%v', got '%v'", nats.ErrBoundJetStreamStream, err) } if _, err = js.SubscribeSync("ORDERS", nats.BindStream("ORDERS")); err != nats.ErrBoundJetStreamDurable { t.Fatalf("Expected an error of '%v', got '%v'", nats.ErrBoundJetStreamDurable, err) } - if _, err = js.PullSubscribe("ORDERS", "d1", nats.BindStream("ORDERS")); err != nil { + if sub, err = js.PullSubscribe("ORDERS", "d1", nats.BindStream("ORDERS")); err != nil { t.Fatalf("Expected no error, got '%v'", err) + } else if m, err := sub.Fetch(1); err != nil { + t.Fatalf("expected no error, got %v", err) + } else if len(m) != 1 { + t.Fatalf("expected one message, got %d", len(m)) + } else if err = m[0].Ack(); err != nil { + t.Fatalf("expected no error, got %v", err) } } From 2a0972927c4d53dcc6dd659c398d0c24fb121f6d Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Tue, 25 May 2021 13:58:40 -0400 Subject: [PATCH 8/9] Update test/js_test.go Co-authored-by: Waldemar Quevedo --- test/js_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/js_test.go b/test/js_test.go index 4ef7cbe16..87c20210a 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -2364,7 +2364,7 @@ func TestJetStreamImportDirectOnly(t *testing.T) { t.Fatalf("expected no error, got %v", err) } else if len(m) != 1 { t.Fatalf("expected one message, got %d", len(m)) - } else if err = m[0].Ack(); err != nil { + } else if err = m[0].AckSync(); err != nil { t.Fatalf("expected no error, got %v", err) } } From 93227342853947f2b06f7c98e55212a31d950114 Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Tue, 25 May 2021 13:59:19 -0400 Subject: [PATCH 9/9] Update js.go Co-authored-by: Waldemar Quevedo --- js.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/js.go b/js.go index f86f216e5..6c8fa7fb4 100644 --- a/js.go +++ b/js.go @@ -187,7 +187,8 @@ func (nc *Conn) jetStream(opts ...JSOpt) (*js, error) { } // BindJetStream returns a JetStreamContext for messaging and stream management that will NOT create -// underlying objects like stream or durable. It assumes they exist already. +// underlying objects like stream or durable on Subscribe. It assumes they exist already and that +// the JetStream context is bound to another account via an import with limited permissions for example. // This will also disable the use of ephemeral consumer. func (nc *Conn) BindJetStream(opts ...JSOpt) (JetStreamContext, error) { js, err := nc.jetStream(opts...)