JetStream: lot of changes #794
They will be described in the release notes, but the gist:

Added:
- `DeliverSubject()` option to configure the deliver subject of a JetStream consumer created by the `js.Subscribe()` call (and variants)
- `BindDeliverSubject()` option to subscribe directly to a JetStream consumer deliver subject (bypassing any lookup or JetStream consumer creation)
- Fields `DeliverGroup` in `ConsumerConfig`, `PushBound` in `ConsumerInfo`. They help prevent incorrect subscriptions to JetStream consumers
- Field `Last` in `SequencePair`

Changed:
- With a `PullSubscription`, calling `NextMsg()` or `NextMsgWithContext()` will now return `ErrTypeSubscription`. You must use the `Fetch()` API
- If the library internally created a JetStream consumer, the consumer will be deleted on `Unsubscribe()` or when the `Drain()` completes
- Fail multiple instances of a subscription on the same durable push consumer (only one active at a time). Also, consumers now have the concept of `DeliverGroup`, which is the queue group name they are created for. Only queue members from the same group can attach to this consumer, and a non-queue subscription cannot attach to it. Note that this requires server v2.3.5
- Attempting to create a queue subscription with a consumer configuration that has idle heartbeats and/or flow control will now result in an error

Fixed:
- Possible lock inversion
- JetStream consumers could be incorrectly deleted on subscription's `Unsubscribe()`

Resolves #785
Resolves #776
Resolves #775
Resolves #748
Resolves #747

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
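For readers skimming the description, here is a rough sketch of the attach rules it lists for `DeliverGroup` and `PushBound`. It is a toy model only, not the library's code: `consumerState`, `checkAttach`, and the error values are hypothetical names invented for illustration. The rejection of a queue subscription on a consumer without a deliver group is an assumption that the description does not state.

```go
package main

import (
	"errors"
	"fmt"
)

// consumerState is a hypothetical, simplified stand-in for the two fields
// this PR adds (DeliverGroup in ConsumerConfig, PushBound in ConsumerInfo).
// It is not a nats.go type.
type consumerState struct {
	DeliverGroup string // queue group the consumer was created for, "" if none
	PushBound    bool   // true when a subscription is already active
}

// Hypothetical error values, named here for illustration only.
var (
	errQueueRequired  = errors.New("consumer has a deliver group: non-queue subscription cannot attach")
	errQueueMismatch  = errors.New("consumer was created for a different queue group")
	errNoDeliverGroup = errors.New("consumer has no deliver group: queue subscription rejected")
	errAlreadyBound   = errors.New("consumer already has an active subscription")
)

// checkAttach models the rules from the description: only members of the
// same queue group may attach to a consumer created for that group, and a
// non-queue push consumer accepts one active subscription at a time.
func checkAttach(c consumerState, queue string) error {
	switch {
	case c.DeliverGroup != "" && queue == "":
		return errQueueRequired
	case c.DeliverGroup != "" && queue != c.DeliverGroup:
		return errQueueMismatch
	case c.DeliverGroup == "" && queue != "":
		// Assumption: the description does not cover this case.
		return errNoDeliverGroup
	case c.DeliverGroup == "" && c.PushBound:
		return errAlreadyBound
	}
	return nil
}

func main() {
	workers := consumerState{DeliverGroup: "workers", PushBound: true}
	fmt.Println(checkAttach(workers, "workers")) // same group
	fmt.Println(checkAttach(workers, "other"))   // different group
	fmt.Println(checkAttach(workers, ""))        // non-queue subscription

	solo := consumerState{PushBound: true}
	fmt.Println(checkAttach(solo, "")) // second plain subscription
}
```

In this model a queue member is not blocked by `PushBound`, since several members of the same group are expected to share one consumer.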
LGTM
LGTM
LGTM
I also ran this branch against my server version; the cross-account ack issue is gone.
Comments are for documentation only and are optional.
example_test.go
@@ -323,7 +323,8 @@ func ExampleJetStream() {
 }, nats.ManualAck())

 // Async queue subscription where members load balance the
-// received messages together.
+// received messages together. Since no consumer name is specified,
Maybe expand on: since no consumer name is specified
Arguably I'm asking this because it's first up in the review, but it's also an example...
Perhaps add (as additional argument)
nats.go
// error will be returned.
// If you do not wish the JetStream consumer to be automatically deleted,
// ensure that the consumer is not created by the library, which means
// create the consumer with AddConsumer and bind to this consumer.
Maybe add a reference to the option name. (Using nats.Bind option)
@@ -875,6 +877,21 @@ func (opt subOptFn) configureSubscribe(opts *subOpts) error {
}

// Subscribe will create a subscription to the appropriate stream and consumer. |
Maybe expand on appropriate?
The stream matching subject?
js.go
// This applies only in cases where the no consumer exists and it will be
// created by the library by the subscribe API.
the no consumer?
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
@matthiashanel I tried to address your comments in the last commit (e077154). Could you have another look and tell me if they need more tweaking? Thanks.
Great!!! LGTM
As discussed with Matthias, who came up with the idea, this is better because we then make use of the provided subject. Otherwise it looked weird to have something whose meaning was:
```
js.SubscribeSync("ignored", nats.BindDeliverSubject("p.d4"))
```
Instead you would now have:
```
sub, err = js.SubscribeSync("p.d4", nats.SubjectIsDelivery())
```
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
LGTM