Skip to content

Commit

Permalink
Merge pull request #768 from nats-io/max-msgs-subj
Browse files Browse the repository at this point in the history
Add MaxMsgsPerSubject to StreamConfig
  • Loading branch information
wallyqs committed Jul 1, 2021
2 parents 274aa57 + c8db2d3 commit c7fc3c7
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 19 deletions.
35 changes: 18 additions & 17 deletions jsm.go
Expand Up @@ -74,23 +74,24 @@ type JetStreamManager interface {
// There are sensible defaults for most. If no subjects are
// given the name will be used as the only subject.
type StreamConfig struct {
Name string `json:"name"`
Subjects []string `json:"subjects,omitempty"`
Retention RetentionPolicy `json:"retention"`
MaxConsumers int `json:"max_consumers"`
MaxMsgs int64 `json:"max_msgs"`
MaxBytes int64 `json:"max_bytes"`
Discard DiscardPolicy `json:"discard"`
MaxAge time.Duration `json:"max_age"`
MaxMsgSize int32 `json:"max_msg_size,omitempty"`
Storage StorageType `json:"storage"`
Replicas int `json:"num_replicas"`
NoAck bool `json:"no_ack,omitempty"`
Template string `json:"template_owner,omitempty"`
Duplicates time.Duration `json:"duplicate_window,omitempty"`
Placement *Placement `json:"placement,omitempty"`
Mirror *StreamSource `json:"mirror,omitempty"`
Sources []*StreamSource `json:"sources,omitempty"`
Name string `json:"name"`
Subjects []string `json:"subjects,omitempty"`
Retention RetentionPolicy `json:"retention"`
MaxConsumers int `json:"max_consumers"`
MaxMsgs int64 `json:"max_msgs"`
MaxBytes int64 `json:"max_bytes"`
Discard DiscardPolicy `json:"discard"`
MaxAge time.Duration `json:"max_age"`
MaxMsgsPerSubject int64 `json:"max_msgs_per_subject"`
MaxMsgSize int32 `json:"max_msg_size,omitempty"`
Storage StorageType `json:"storage"`
Replicas int `json:"num_replicas"`
NoAck bool `json:"no_ack,omitempty"`
Template string `json:"template_owner,omitempty"`
Duplicates time.Duration `json:"duplicate_window,omitempty"`
Placement *Placement `json:"placement,omitempty"`
Mirror *StreamSource `json:"mirror,omitempty"`
Sources []*StreamSource `json:"sources,omitempty"`
}

// Placement is used to guide placement of streams in clustered JetStream.
Expand Down
88 changes: 86 additions & 2 deletions test/js_test.go
Expand Up @@ -2336,8 +2336,8 @@ func TestJetStreamImportDirectOnly(t *testing.T) {
},
V: {
users: [ {
user: v,
password: quux,
user: v,
password: quux,
permissions: { publish: {deny: ["$JS.API.CONSUMER.INFO.ORDERS.d1"]} }
} ]
imports [
Expand Down Expand Up @@ -6327,3 +6327,87 @@ func TestJetStreamDomain(t *testing.T) {
t.Errorf("Unexpected error: %v", err)
}
}

// Test that we properly enfore per subject msg limits.
func TestJetStreamMaxMsgsPerSubject(t *testing.T) {
const subjectMax = 5
msc := nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo", "bar", "baz.*"},
Storage: nats.MemoryStorage,
MaxMsgsPerSubject: subjectMax,
}
fsc := msc
fsc.Storage = nats.FileStorage

cases := []struct {
name string
mconfig *nats.StreamConfig
}{
{"MemoryStore", &msc},
{"FileStore", &fsc},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}

// Client for API requests.
nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()
js, err := nc.JetStream()
if err != nil {
t.Fatalf("Got error during initialization %v", err)
}

_, err = js.AddStream(c.mconfig)
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
defer js.DeleteStream(c.mconfig.Name)

pubAndCheck := func(subj string, num int, expectedNumMsgs uint64) {
t.Helper()
for i := 0; i < num; i++ {
if _, err = js.Publish(subj, []byte("TSLA")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}
si, err := js.StreamInfo(c.mconfig.Name)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.State.Msgs != expectedNumMsgs {
t.Fatalf("Expected %d msgs, got %d", expectedNumMsgs, si.State.Msgs)
}
}

pubAndCheck("foo", 1, 1)
pubAndCheck("foo", 4, 5)
// Now make sure our per subject limits kick in..
pubAndCheck("foo", 2, 5)
pubAndCheck("baz.22", 5, 10)
pubAndCheck("baz.33", 5, 15)
// We are maxed so totals should be same no matter what we add here.
pubAndCheck("baz.22", 5, 15)
pubAndCheck("baz.33", 5, 15)

// Now purge and make sure all is still good.
if err := js.PurgeStream(c.mconfig.Name); err != nil {
t.Fatalf("Unexpected purge error: %v", err)
}
pubAndCheck("foo", 1, 1)
pubAndCheck("foo", 4, 5)
pubAndCheck("baz.22", 5, 10)
pubAndCheck("baz.33", 5, 15)
})
}
}

0 comments on commit c7fc3c7

Please sign in to comment.