Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] JetStream: UpdateConsumer and new consumer config's options #893

Merged
merged 1 commit into from Jan 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion go_test.mod
Expand Up @@ -4,7 +4,7 @@ go 1.16

require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.7.1-0.20220121194245-cfdca3df7649
github.com/nats-io/nats-server/v2 v2.7.2-0.20220131171338-74c0fdc3bb8b
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
Expand Down
6 changes: 3 additions & 3 deletions go_test.sum
Expand Up @@ -17,9 +17,9 @@ github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY=
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.7.1-0.20220121194245-cfdca3df7649 h1:RhaQyUdFELJCBm3lG9xG5pmYyaNNSb49y+YSi8P9hio=
github.com/nats-io/nats-server/v2 v2.7.1-0.20220121194245-cfdca3df7649/go.mod h1:cjxtMhZsZovK1XS2iiapCduR8HuqB/YpFamL0qntIcw=
github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats-server/v2 v2.7.2-0.20220131171338-74c0fdc3bb8b h1:h8EYD8Q7yUbjXmMT6z1XI7SAV+aiHhkNEc1O+WImMh4=
github.com/nats-io/nats-server/v2 v2.7.2-0.20220131171338-74c0fdc3bb8b/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8=
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
Expand Down
37 changes: 37 additions & 0 deletions js.go
Expand Up @@ -903,6 +903,13 @@ type ConsumerConfig struct {
FlowControl bool `json:"flow_control,omitempty"`
Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
HeadersOnly bool `json:"headers_only,omitempty"`

// Pull based options.
MaxRequestBatch int `json:"max_batch,omitempty"`
MaxRequestExpires time.Duration `json:"max_expires,omitempty"`

// Ephemeral inactivity threshold.
InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"`
}

// ConsumerInfo is the info from a JetStream consumer.
Expand Down Expand Up @@ -2240,6 +2247,36 @@ func HeadersOnly() SubOpt {
})
}

// MaxRequestBatch sets the maximum pull consumer batch size that a Fetch()
// can request.
func MaxRequestBatch(max int) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.MaxRequestBatch = max
return nil
})
}

// MaxRequestExpires sets the maximum pull consumer request expiration that a
// Fetch() can request (using the Fetch's timeout value).
func MaxRequestExpires(max time.Duration) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.MaxRequestExpires = max
return nil
})
}

// InactiveThreshold indicates how long the server should keep an ephemeral
// after detecting loss of interest.
func InactiveThreshold(threshold time.Duration) SubOpt {
return subOptFn(func(opts *subOpts) error {
if threshold < 0 {
return fmt.Errorf("invalid InactiveThreshold value (%v), needs to be greater or equal to 0", threshold)
}
opts.cfg.InactiveThreshold = threshold
return nil
})
}

func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
sub.mu.Lock()
// TODO(dlc) - Better way to mark especially if we attach.
Expand Down
13 changes: 13 additions & 0 deletions jsm.go
Expand Up @@ -54,6 +54,9 @@ type JetStreamManager interface {
// AddConsumer adds a consumer to a stream.
AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error)

// UpdateConsumer updates an existing consumer.
UpdateConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error)

// DeleteConsumer deletes a consumer.
DeleteConsumer(stream, consumer string, opts ...JSOpt) error

Expand Down Expand Up @@ -276,6 +279,16 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*C
return info.ConsumerInfo, nil
}

func (js *js) UpdateConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
if cfg == nil {
return nil, ErrConsumerConfigRequired
}
if cfg.Durable == _EMPTY_ {
return nil, ErrInvalidDurableName
}
return js.AddConsumer(stream, cfg, opts...)
}

// consumerDeleteResponse is the response for a Consumer delete request.
type consumerDeleteResponse struct {
apiResponse
Expand Down
143 changes: 143 additions & 0 deletions test/js_test.go
Expand Up @@ -35,6 +35,20 @@ import (
natsserver "github.com/nats-io/nats-server/v2/test"
)

func getConnAndJS(t *testing.T, s *server.Server) (*nats.Conn, nats.JetStreamContext) {
t.Helper()

nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
js, err := nc.JetStream()
if err != nil {
t.Fatalf("Got error during initialization %v", err)
}
return nc, js
}

func TestJetStreamNotEnabled(t *testing.T) {
s := RunServerOnPort(-1)
defer s.Shutdown()
Expand Down Expand Up @@ -778,6 +792,32 @@ func TestJetStreamSubscribe(t *testing.T) {

// Both ChanQueueSubscribers use the same consumer.
expectConsumers(t, 4)

sub, err = js.SubscribeSync("foo", nats.InactiveThreshold(-100*time.Millisecond))
if err == nil || !strings.Contains(err.Error(), "invalid InactiveThreshold") {
t.Fatalf("Expected error about invalid option, got %v", err)
}

// Create an ephemeral with a lower inactive threshold
sub, err = js.SubscribeSync("foo", nats.InactiveThreshold(50*time.Millisecond))
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
ci, err := sub.ConsumerInfo()
if err != nil {
t.Fatalf("Error on consumer info: %v", err)
}
name := ci.Name
nc.Close()

time.Sleep(150 * time.Millisecond)

nc, js = getConnAndJS(t, s)
defer nc.Close()

if ci, err := js.ConsumerInfo("TEST", name); err == nil {
t.Fatalf("Expected no consumer to exist, got %+v", ci)
}
}

func TestJetStreamAckPending_Pull(t *testing.T) {
Expand Down Expand Up @@ -1306,6 +1346,83 @@ func TestJetStreamManagement(t *testing.T) {
}
})

t.Run("update consumer", func(t *testing.T) {
ci, err := js.AddConsumer("foo", &nats.ConsumerConfig{
Durable: "update_push_consumer",
DeliverSubject: "bar",
AckPolicy: nats.AckExplicitPolicy,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Currently, server supports these fields:
// description, ack_wait, max_deliver, sample_freq, max_ack_pending, max_waiting and headers_only
expected := ci.Config
expected.Description = "my description"
expected.AckWait = 2 * time.Second
expected.MaxDeliver = 1
expected.SampleFrequency = "30"
expected.MaxAckPending = 10
expected.HeadersOnly = true

// Check that stream name is required
_, err = js.UpdateConsumer("", &expected)
if err != nats.ErrStreamNameRequired {
t.Fatalf("Expected stream name required error, got %v", err)
}
// Check that durable name is required
expected.Durable = ""
_, err = js.UpdateConsumer("foo", &expected)
if err != nats.ErrInvalidDurableName {
t.Fatalf("Expected consumer name required error, got %v", err)
}
expected.Durable = "update_push_consumer"

// Check that configuration is required
_, err = js.UpdateConsumer("foo", nil)
if err != nats.ErrConsumerConfigRequired {
t.Fatalf("Expected consumer configuration required error, got %v", err)
}

// Now check that update works and expected fields have been updated
ci, err = js.UpdateConsumer("foo", &expected)
if err != nil {
t.Fatalf("Error on update: %v", err)
}
if !reflect.DeepEqual(ci.Config, expected) {
t.Fatalf("Expected config to be %+v, got %+v", expected, ci.Config)
}

// Now check with pull consumer
ci, err = js.AddConsumer("foo", &nats.ConsumerConfig{
Durable: "update_pull_consumer",
AckPolicy: nats.AckExplicitPolicy,
MaxWaiting: 1,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Currently, server supports these fields:
// description, ack_wait, max_deliver, sample_freq, max_ack_pending, max_waiting and headers_only
expected = ci.Config
expected.Description = "my description"
expected.AckWait = 2 * time.Second
expected.MaxDeliver = 1
expected.SampleFrequency = "30"
expected.MaxAckPending = 10
expected.MaxWaiting = 20
expected.HeadersOnly = true
expected.MaxRequestBatch = 10
expected.MaxRequestExpires = 2 * time.Second
ci, err = js.UpdateConsumer("foo", &expected)
if err != nil {
t.Fatalf("Error on update: %v", err)
}
if !reflect.DeepEqual(ci.Config, expected) {
t.Fatalf("Expected config to be %+v, got %+v", expected, ci.Config)
}
})

t.Run("purge stream", func(t *testing.T) {
if err := js.PurgeStream("foo"); err != nil {
t.Fatalf("Unexpected error: %v", err)
Expand Down Expand Up @@ -5104,6 +5221,32 @@ func testJetStreamFetchOptions(t *testing.T, srvs ...*jsServer) {
}
}

t.Run("max request batch", func(t *testing.T) {
defer js.PurgeStream(subject)

sub, err := js.PullSubscribe(subject, "max-request-batch", nats.MaxRequestBatch(2))
if err != nil {
t.Fatal(err)
}
defer sub.Unsubscribe()
if _, err := sub.Fetch(10); err == nil || !strings.Contains(err.Error(), "MaxRequestBatch of 2") {
t.Fatalf("Expected error about max request batch size, got %v", err)
}
})

t.Run("max request expires", func(t *testing.T) {
defer js.PurgeStream(subject)

sub, err := js.PullSubscribe(subject, "max-request-expires", nats.MaxRequestExpires(50*time.Millisecond))
if err != nil {
t.Fatal(err)
}
defer sub.Unsubscribe()
if _, err := sub.Fetch(10); err == nil || !strings.Contains(err.Error(), "MaxRequestExpires of 50ms") {
t.Fatalf("Expected error about max request expiration, got %v", err)
}
})

t.Run("batch size", func(t *testing.T) {
defer js.PurgeStream(subject)

Expand Down