diff --git a/example_test.go b/example_test.go index 42e3f322d..9651785b8 100644 --- a/example_test.go +++ b/example_test.go @@ -630,6 +630,11 @@ func ExampleSubOpt() { js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.Durable("FOO"), nats.ConsumerReplicas(1)) + + // Force memory storage while subscribing. + js.Subscribe("foo", func(msg *nats.Msg) { + fmt.Printf("Received a message: %s\n", string(msg.Data)) + }, nats.Durable("FOO"), nats.ConsumerMemoryStorage()) } func ExampleMaxWait() { diff --git a/js.go b/js.go index 28211042a..6d716cd7a 100644 --- a/js.go +++ b/js.go @@ -1357,6 +1357,9 @@ func checkConfig(s, u *ConsumerConfig) error { if u.Replicas > 0 && u.Replicas != s.Replicas { return makeErr("replicas", u.Replicas, s.Replicas) } + if u.MemoryStorage && !s.MemoryStorage { + return makeErr("memory storage", u.MemoryStorage, s.MemoryStorage) + } return nil } @@ -2485,6 +2488,14 @@ func ConsumerReplicas(replicas int) SubOpt { }) } +// ConsumerMemoryStorage sets the memory storage to true for a consumer. +func ConsumerMemoryStorage() SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.MemoryStorage = true + return nil + }) +} + func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) { sub.mu.Lock() // TODO(dlc) - Better way to mark especially if we attach. diff --git a/js_test.go b/js_test.go index b241a678c..60ccd5758 100644 --- a/js_test.go +++ b/js_test.go @@ -1096,3 +1096,64 @@ func TestJetStreamConvertDirectMsgResponseToMsg(t *testing.T) { t.Fatalf("Wrong header: %v", r.Header) } } + +func TestJetStreamConsumerMemoryStorage(t *testing.T) { + opts := natsserver.DefaultTestOptions + opts.Port = -1 + opts.JetStream = true + s := natsserver.RunServer(&opts) + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + if _, err := js.AddStream(&StreamConfig{Name: "STR", Subjects: []string{"foo"}}); err != nil { + t.Fatalf("Error adding stream: %v", err) + } + + // Pull ephemeral consumer with memory storage. + sub, err := js.PullSubscribe("foo", "", ConsumerMemoryStorage()) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + + consInfo, err := sub.ConsumerInfo() + if err != nil { + t.Fatalf("Error getting consumer info: %v", err) + } + + if !consInfo.Config.MemoryStorage { + t.Fatalf("Expected memory storage to be %v, got %+v", true, consInfo.Config.MemoryStorage) + } + + // Create a sync subscription with an in-memory ephemeral consumer. + sub, err = js.SubscribeSync("foo", ConsumerMemoryStorage()) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + + consInfo, err = sub.ConsumerInfo() + if err != nil { + t.Fatalf("Error getting consumer info: %v", err) + } + + if !consInfo.Config.MemoryStorage { + t.Fatalf("Expected memory storage to be %v, got %+v", true, consInfo.Config.MemoryStorage) + } + + // Async subscription with an in-memory ephemeral consumer. + cb := func(msg *Msg) {} + sub, err = js.Subscribe("foo", cb, ConsumerMemoryStorage()) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + + consInfo, err = sub.ConsumerInfo() + if err != nil { + t.Fatalf("Error getting consumer info: %v", err) + } + + if !consInfo.Config.MemoryStorage { + t.Fatalf("Expected memory storage to be %v, got %+v", true, consInfo.Config.MemoryStorage) + } +}