From c8db2d38925927f44bfbf270dfbce485097941db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jaime=20Pi=C3=B1a?= Date: Wed, 30 Jun 2021 09:46:01 -0700 Subject: [PATCH] Add MaxMsgsPerSubject --- jsm.go | 35 ++++++++++---------- test/js_test.go | 88 +++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 104 insertions(+), 19 deletions(-) diff --git a/jsm.go b/jsm.go index db38ff825..00ea173a8 100644 --- a/jsm.go +++ b/jsm.go @@ -74,23 +74,24 @@ type JetStreamManager interface { // There are sensible defaults for most. If no subjects are // given the name will be used as the only subject. type StreamConfig struct { - Name string `json:"name"` - Subjects []string `json:"subjects,omitempty"` - Retention RetentionPolicy `json:"retention"` - MaxConsumers int `json:"max_consumers"` - MaxMsgs int64 `json:"max_msgs"` - MaxBytes int64 `json:"max_bytes"` - Discard DiscardPolicy `json:"discard"` - MaxAge time.Duration `json:"max_age"` - MaxMsgSize int32 `json:"max_msg_size,omitempty"` - Storage StorageType `json:"storage"` - Replicas int `json:"num_replicas"` - NoAck bool `json:"no_ack,omitempty"` - Template string `json:"template_owner,omitempty"` - Duplicates time.Duration `json:"duplicate_window,omitempty"` - Placement *Placement `json:"placement,omitempty"` - Mirror *StreamSource `json:"mirror,omitempty"` - Sources []*StreamSource `json:"sources,omitempty"` + Name string `json:"name"` + Subjects []string `json:"subjects,omitempty"` + Retention RetentionPolicy `json:"retention"` + MaxConsumers int `json:"max_consumers"` + MaxMsgs int64 `json:"max_msgs"` + MaxBytes int64 `json:"max_bytes"` + Discard DiscardPolicy `json:"discard"` + MaxAge time.Duration `json:"max_age"` + MaxMsgsPerSubject int64 `json:"max_msgs_per_subject"` + MaxMsgSize int32 `json:"max_msg_size,omitempty"` + Storage StorageType `json:"storage"` + Replicas int `json:"num_replicas"` + NoAck bool `json:"no_ack,omitempty"` + Template string `json:"template_owner,omitempty"` + Duplicates time.Duration `json:"duplicate_window,omitempty"` + Placement *Placement `json:"placement,omitempty"` + Mirror *StreamSource `json:"mirror,omitempty"` + Sources []*StreamSource `json:"sources,omitempty"` } // Placement is used to guide placement of streams in clustered JetStream. diff --git a/test/js_test.go b/test/js_test.go index fba2a4691..98e094cb0 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -2336,8 +2336,8 @@ func TestJetStreamImportDirectOnly(t *testing.T) { }, V: { users: [ { - user: v, - password: quux, + user: v, + password: quux, permissions: { publish: {deny: ["$JS.API.CONSUMER.INFO.ORDERS.d1"]} } } ] imports [ @@ -6327,3 +6327,87 @@ func TestJetStreamDomain(t *testing.T) { t.Errorf("Unexpected error: %v", err) } } + +// Test that we properly enfore per subject msg limits. +func TestJetStreamMaxMsgsPerSubject(t *testing.T) { + const subjectMax = 5 + msc := nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo", "bar", "baz.*"}, + Storage: nats.MemoryStorage, + MaxMsgsPerSubject: subjectMax, + } + fsc := msc + fsc.Storage = nats.FileStorage + + cases := []struct { + name string + mconfig *nats.StreamConfig + }{ + {"MemoryStore", &msc}, + {"FileStore", &fsc}, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + // Client for API requests. + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + js, err := nc.JetStream() + if err != nil { + t.Fatalf("Got error during initialization %v", err) + } + + _, err = js.AddStream(c.mconfig) + if err != nil { + t.Fatalf("Unexpected error adding stream: %v", err) + } + defer js.DeleteStream(c.mconfig.Name) + + pubAndCheck := func(subj string, num int, expectedNumMsgs uint64) { + t.Helper() + for i := 0; i < num; i++ { + if _, err = js.Publish(subj, []byte("TSLA")); err != nil { + t.Fatalf("Unexpected publish error: %v", err) + } + } + si, err := js.StreamInfo(c.mconfig.Name) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if si.State.Msgs != expectedNumMsgs { + t.Fatalf("Expected %d msgs, got %d", expectedNumMsgs, si.State.Msgs) + } + } + + pubAndCheck("foo", 1, 1) + pubAndCheck("foo", 4, 5) + // Now make sure our per subject limits kick in.. + pubAndCheck("foo", 2, 5) + pubAndCheck("baz.22", 5, 10) + pubAndCheck("baz.33", 5, 15) + // We are maxed so totals should be same no matter what we add here. + pubAndCheck("baz.22", 5, 15) + pubAndCheck("baz.33", 5, 15) + + // Now purge and make sure all is still good. + if err := js.PurgeStream(c.mconfig.Name); err != nil { + t.Fatalf("Unexpected purge error: %v", err) + } + pubAndCheck("foo", 1, 1) + pubAndCheck("foo", 4, 5) + pubAndCheck("baz.22", 5, 10) + pubAndCheck("baz.33", 5, 15) + }) + } +}