Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to set memory storage to true for a consumer #1078

Merged
merged 3 commits into from Sep 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions example_test.go
Expand Up @@ -630,6 +630,11 @@ func ExampleSubOpt() {
js.Subscribe("foo", func(msg *nats.Msg) {
fmt.Printf("Received a message: %s\n", string(msg.Data))
}, nats.Durable("FOO"), nats.ConsumerReplicas(1))

// Force memory storage while subscribing.
js.Subscribe("foo", func(msg *nats.Msg) {
fmt.Printf("Received a message: %s\n", string(msg.Data))
}, nats.Durable("FOO"), nats.ConsumerMemoryStorage())
}

func ExampleMaxWait() {
Expand Down
11 changes: 11 additions & 0 deletions js.go
Expand Up @@ -1357,6 +1357,9 @@ func checkConfig(s, u *ConsumerConfig) error {
if u.Replicas > 0 && u.Replicas != s.Replicas {
return makeErr("replicas", u.Replicas, s.Replicas)
}
if u.MemoryStorage && !s.MemoryStorage {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should be verifying it both ways, i.e. true -> false and false -> true. Right now check will pass even if u.MemoryStorage == false and s.MemoryStorage == true.

Suggested change
if u.MemoryStorage && !s.MemoryStorage {
if u.MemoryStorage != !s.MemoryStorage {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No we can't unfortunately, because we don't have a "3 states" value: true, false, not-specified. The point of this function is to check the user intent vs what is already configured in the server.
We have no way to express that the user does not want MemoryStorage, so if you call js.Subscribe() without specifying anything and the JS consumer happens to be configured on the server with MemoryStorage, you don't want to call to fail. Otherwise we would need an option that says "NoMemoryStorage", etc..

So this function for boolean is not great, but the best we can do is check when the intent was to set a boolean to true, then only we can check that it matches the server setting.

return makeErr("memory storage", u.MemoryStorage, s.MemoryStorage)
}
return nil
}

Expand Down Expand Up @@ -2485,6 +2488,14 @@ func ConsumerReplicas(replicas int) SubOpt {
})
}

// ConsumerMemoryStorage sets the memory storage to true for a consumer.
func ConsumerMemoryStorage() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.MemoryStorage = true
return nil
})
}

func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
sub.mu.Lock()
// TODO(dlc) - Better way to mark especially if we attach.
Expand Down
61 changes: 61 additions & 0 deletions js_test.go
Expand Up @@ -1096,3 +1096,64 @@ func TestJetStreamConvertDirectMsgResponseToMsg(t *testing.T) {
t.Fatalf("Wrong header: %v", r.Header)
}
}

func TestJetStreamConsumerMemoryStorage(t *testing.T) {
opts := natsserver.DefaultTestOptions
opts.Port = -1
opts.JetStream = true
s := natsserver.RunServer(&opts)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can just use s := RunBasicJetStreamServer() instead.

defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

if _, err := js.AddStream(&StreamConfig{Name: "STR", Subjects: []string{"foo"}}); err != nil {
t.Fatalf("Error adding stream: %v", err)
}

// Pull ephemeral consumer with memory storage.
sub, err := js.PullSubscribe("foo", "", ConsumerMemoryStorage())
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

consInfo, err := sub.ConsumerInfo()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of repeating this code for the 3 types of consumers, I would have made it a small helper function inline:

checkCons := func(sub *Subscription) {
    t.Helper()
    consInfo, err := sub.ConsumerInfo()
    ...
}

then you would invoke it after each sub creation.

if err != nil {
t.Fatalf("Error getting consumer info: %v", err)
}

if !consInfo.Config.MemoryStorage {
t.Fatalf("Expected memory storage to be %v, got %+v", true, consInfo.Config.MemoryStorage)
}

// Create a sync subscription with an in-memory ephemeral consumer.
sub, err = js.SubscribeSync("foo", ConsumerMemoryStorage())
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

consInfo, err = sub.ConsumerInfo()
if err != nil {
t.Fatalf("Error getting consumer info: %v", err)
}

if !consInfo.Config.MemoryStorage {
t.Fatalf("Expected memory storage to be %v, got %+v", true, consInfo.Config.MemoryStorage)
}

// Async subscription with an in-memory ephemeral consumer.
cb := func(msg *Msg) {}
sub, err = js.Subscribe("foo", cb, ConsumerMemoryStorage())
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

consInfo, err = sub.ConsumerInfo()
if err != nil {
t.Fatalf("Error getting consumer info: %v", err)
}

if !consInfo.Config.MemoryStorage {
t.Fatalf("Expected memory storage to be %v, got %+v", true, consInfo.Config.MemoryStorage)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What you may want to add is a test where a durable is created with MemoryStorage and call a js.SubscribeSync() with Durable() but without the memory storage option in the subscribe call and verifies that it actually works. That would catch the change the Piotr was suggesting that would - in my opinion - break things.

}