Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MaxMsgsPerSubject to StreamConfig #768

Merged
merged 1 commit into from Jul 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
35 changes: 18 additions & 17 deletions jsm.go
Expand Up @@ -74,23 +74,24 @@ type JetStreamManager interface {
// There are sensible defaults for most. If no subjects are
// given the name will be used as the only subject.
type StreamConfig struct {
Name string `json:"name"`
Subjects []string `json:"subjects,omitempty"`
Retention RetentionPolicy `json:"retention"`
MaxConsumers int `json:"max_consumers"`
MaxMsgs int64 `json:"max_msgs"`
MaxBytes int64 `json:"max_bytes"`
Discard DiscardPolicy `json:"discard"`
MaxAge time.Duration `json:"max_age"`
MaxMsgSize int32 `json:"max_msg_size,omitempty"`
Storage StorageType `json:"storage"`
Replicas int `json:"num_replicas"`
NoAck bool `json:"no_ack,omitempty"`
Template string `json:"template_owner,omitempty"`
Duplicates time.Duration `json:"duplicate_window,omitempty"`
Placement *Placement `json:"placement,omitempty"`
Mirror *StreamSource `json:"mirror,omitempty"`
Sources []*StreamSource `json:"sources,omitempty"`
Name string `json:"name"`
Subjects []string `json:"subjects,omitempty"`
Retention RetentionPolicy `json:"retention"`
MaxConsumers int `json:"max_consumers"`
MaxMsgs int64 `json:"max_msgs"`
MaxBytes int64 `json:"max_bytes"`
Discard DiscardPolicy `json:"discard"`
MaxAge time.Duration `json:"max_age"`
MaxMsgsPerSubject int64 `json:"max_msgs_per_subject"`
MaxMsgSize int32 `json:"max_msg_size,omitempty"`
Storage StorageType `json:"storage"`
Replicas int `json:"num_replicas"`
NoAck bool `json:"no_ack,omitempty"`
Template string `json:"template_owner,omitempty"`
Duplicates time.Duration `json:"duplicate_window,omitempty"`
Placement *Placement `json:"placement,omitempty"`
Mirror *StreamSource `json:"mirror,omitempty"`
Sources []*StreamSource `json:"sources,omitempty"`
}

// Placement is used to guide placement of streams in clustered JetStream.
Expand Down
88 changes: 86 additions & 2 deletions test/js_test.go
Expand Up @@ -2336,8 +2336,8 @@ func TestJetStreamImportDirectOnly(t *testing.T) {
},
V: {
users: [ {
user: v,
password: quux,
user: v,
password: quux,
permissions: { publish: {deny: ["$JS.API.CONSUMER.INFO.ORDERS.d1"]} }
} ]
imports [
Expand Down Expand Up @@ -6327,3 +6327,87 @@ func TestJetStreamDomain(t *testing.T) {
t.Errorf("Unexpected error: %v", err)
}
}

// Test that we properly enfore per subject msg limits.
func TestJetStreamMaxMsgsPerSubject(t *testing.T) {
const subjectMax = 5
msc := nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo", "bar", "baz.*"},
Storage: nats.MemoryStorage,
MaxMsgsPerSubject: subjectMax,
}
fsc := msc
fsc.Storage = nats.FileStorage

cases := []struct {
name string
mconfig *nats.StreamConfig
}{
{"MemoryStore", &msc},
{"FileStore", &fsc},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}

// Client for API requests.
nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()
js, err := nc.JetStream()
if err != nil {
t.Fatalf("Got error during initialization %v", err)
}

_, err = js.AddStream(c.mconfig)
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
defer js.DeleteStream(c.mconfig.Name)

pubAndCheck := func(subj string, num int, expectedNumMsgs uint64) {
t.Helper()
for i := 0; i < num; i++ {
if _, err = js.Publish(subj, []byte("TSLA")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}
si, err := js.StreamInfo(c.mconfig.Name)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.State.Msgs != expectedNumMsgs {
t.Fatalf("Expected %d msgs, got %d", expectedNumMsgs, si.State.Msgs)
}
}

pubAndCheck("foo", 1, 1)
pubAndCheck("foo", 4, 5)
// Now make sure our per subject limits kick in..
pubAndCheck("foo", 2, 5)
pubAndCheck("baz.22", 5, 10)
pubAndCheck("baz.33", 5, 15)
// We are maxed so totals should be same no matter what we add here.
pubAndCheck("baz.22", 5, 15)
pubAndCheck("baz.33", 5, 15)

// Now purge and make sure all is still good.
if err := js.PurgeStream(c.mconfig.Name); err != nil {
t.Fatalf("Unexpected purge error: %v", err)
}
pubAndCheck("foo", 1, 1)
pubAndCheck("foo", 4, 5)
pubAndCheck("baz.22", 5, 10)
pubAndCheck("baz.33", 5, 15)
})
}
}