Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow switching from limits-based to interest-based retention in stream update #4361

Merged
merged 1 commit into from Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/jetstream_cluster.go
Expand Up @@ -6980,7 +6980,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
} else {
nca := ca.copyGroup()

rBefore := ca.Config.replicas(sa.Config)
rBefore := nca.Config.replicas(sa.Config)
rAfter := cfg.replicas(sa.Config)

var curLeader string
Expand Down
110 changes: 110 additions & 0 deletions server/jetstream_test.go
Expand Up @@ -21407,3 +21407,113 @@ func TestJetStreamServerReencryption(t *testing.T) {
})
}
}

func TestJetStreamLimitsToInterestPolicy(t *testing.T) {
c := createJetStreamClusterExplicit(t, "JSC", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.leader())
defer nc.Close()

// This is the index of the consumer that we'll create as R1
// instead of R3, just to prove that it blocks the stream
// update from happening properly.
singleReplica := 3

streamCfg := nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Retention: nats.LimitsPolicy,
Storage: nats.MemoryStorage,
Replicas: 3,
}

stream, err := js.AddStream(&streamCfg)
require_NoError(t, err)

for i := 0; i < 10; i++ {
replicas := streamCfg.Replicas
if i == singleReplica {
// Make one of the consumers R1 so that we can check
// that the switch to interest-based retention is also
// turning it into an R3 consumer.
replicas = 1
}
cname := fmt.Sprintf("test_%d", i)
_, err := js.AddConsumer("TEST", &nats.ConsumerConfig{
Name: cname,
Durable: cname,
AckPolicy: nats.AckAllPolicy,
Replicas: replicas,
})
require_NoError(t, err)
}

for i := 0; i < 20; i++ {
_, err := js.Publish("foo", []byte{1, 2, 3, 4, 5})
require_NoError(t, err)
}

// Pull 10 or more messages from the stream. We will never pull
// less than 10, which guarantees that the lowest ack floor of
// all consumers should be 10.
for i := 0; i < 10; i++ {
cname := fmt.Sprintf("test_%d", i)
count := 10 + i // At least 10 messages

sub, err := js.PullSubscribe("foo", cname)
require_NoError(t, err)

msgs, err := sub.Fetch(count)
require_NoError(t, err)
require_Equal(t, len(msgs), count)
require_NoError(t, msgs[len(msgs)-1].AckSync())

// At this point the ack floor should match the count of
// messages we received.
info, err := js.ConsumerInfo("TEST", cname)
require_NoError(t, err)
require_Equal(t, info.AckFloor.Consumer, uint64(count))
}

// Try updating to interest-based. This should fail because
// we have a consumer that is R1 on an R3 stream.
streamCfg = stream.Config
streamCfg.Retention = nats.InterestPolicy
_, err = js.UpdateStream(&streamCfg)
require_Error(t, err)

// Now we'll make the R1 consumer an R3.
cname := fmt.Sprintf("test_%d", singleReplica)
cinfo, err := js.ConsumerInfo("TEST", cname)
require_NoError(t, err)

cinfo.Config.Replicas = streamCfg.Replicas
_, _ = js.UpdateConsumer("TEST", &cinfo.Config)
// TODO(nat): The jsConsumerCreateRequest update doesn't always
// respond when there are no errors updating a consumer, so this
// nearly always returns a timeout, despite actually doing what
// it should. We'll make sure the replicas were updated by doing
// another consumer info just to be sure.
// require_NoError(t, err)
c.waitOnAllCurrent()
cinfo, err = js.ConsumerInfo("TEST", cname)
require_NoError(t, err)
require_Equal(t, cinfo.Config.Replicas, streamCfg.Replicas)
require_Equal(t, len(cinfo.Cluster.Replicas), streamCfg.Replicas-1)

// This time it should succeed.
_, err = js.UpdateStream(&streamCfg)
require_NoError(t, err)

// We need to wait for all nodes to have applied the new stream
// configuration.
c.waitOnAllCurrent()

// Now we should only have 10 messages left in the stream, as
// each consumer has acked at least the first 10 messages.
info, err := js.StreamInfo("TEST")
require_NoError(t, err)
require_Equal(t, info.State.FirstSeq, 11)
require_Equal(t, info.State.Msgs, 10)
}
38 changes: 36 additions & 2 deletions server/stream.go
Expand Up @@ -1495,9 +1495,11 @@ func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig, s *Server) (*Str
if cfg.Storage != old.Storage {
return nil, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration update can not change storage type"))
}
// Can't change retention.
// Can only change retention from limits to interest or back, not to/from work queue for now.
if cfg.Retention != old.Retention {
return nil, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration update can not change retention policy"))
if old.Retention == WorkQueuePolicy || cfg.Retention == WorkQueuePolicy {
return nil, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration update can not change retention policy to/from workqueue"))
}
}
// Can not have a template owner for now.
if old.Template != _EMPTY_ {
Expand Down Expand Up @@ -1785,9 +1787,41 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
// a subsequent update to an existing tier will then move from existing past tier to existing new tier
}

if mset.isLeader() && ocfg.Retention != cfg.Retention && cfg.Retention == InterestPolicy {
// Before we can update the retention policy for the consumer, we need
// the replica count of all consumers to match the stream.
for _, c := range mset.sa.consumers {
if c.Config.Replicas > 0 && c.Config.Replicas != cfg.Replicas {
mset.mu.Unlock()
return fmt.Errorf("consumer %q replica count must be %d", c.Name, cfg.Replicas)
}
}
}

// Now update config and store's version of our config.
mset.cfg = *cfg

// If we're changing retention and haven't errored because of consumer
// replicas by now, whip through and update the consumer retention.
if ocfg.Retention != cfg.Retention && cfg.Retention == InterestPolicy {
toUpdate := make([]*consumer, 0, len(mset.consumers))
for _, c := range mset.consumers {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One important piece, if the consumer is retention interest, then its replication factor must be same as stream, so need to adjust that here and add in an R1 consumer to the test above etc.

toUpdate = append(toUpdate, c)
}
mset.mu.Unlock()
for _, c := range toUpdate {
c.mu.Lock()
c.retention = cfg.Retention
c.mu.Unlock()
if c.retention == InterestPolicy {
// If we're switching to interest, force a check of the
// interest of existing stream messages.
c.checkStateForInterestStream()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! I saw this and was confused for a second then realized what you did here..

}
}
mset.mu.Lock()
}

// If we are the leader never suppress update advisory, simply send.
if mset.isLeader() && sendAdvisory {
mset.sendUpdateAdvisoryLocked()
Expand Down
13 changes: 6 additions & 7 deletions server/test_test.go
Expand Up @@ -14,7 +14,6 @@
package server

import (
"bytes"
"fmt"
"math/rand"
"net/url"
Expand Down Expand Up @@ -112,17 +111,17 @@ func require_Error(t *testing.T, err error, expected ...error) {
t.Fatalf("Expected one of %v, got '%v'", expected, err)
}

func require_Equal(t *testing.T, a, b string) {
func require_Equal[T comparable](t *testing.T, a, b T) {
t.Helper()
if strings.Compare(a, b) != 0 {
t.Fatalf("require equal, but got: %v != %v", a, b)
if a != b {
t.Fatalf("require %T equal, but got: %v != %v", a, a, b)
}
}

func require_NotEqual(t *testing.T, a, b [32]byte) {
func require_NotEqual[T comparable](t *testing.T, a, b T) {
t.Helper()
if bytes.Equal(a[:], b[:]) {
t.Fatalf("require not equal, but got: %v != %v", a, b)
if a == b {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this do the right thing for 2 different byte slices that have same contents?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function was previously taking a, b [32]byte and fixed-size arrays can be compared using ==, so it will continue to work for what it was being used for.

Trying to pass in slices instead of fixed-size arrays will result in a compile-time error as they are not comparable.

t.Fatalf("require %T not equal, but got: %v != %v", a, a, b)
}
}

Expand Down