Allow switching from limits-based to interest-based retention in stream update

Signed-off-by: Neil Twigg <neil@nats.io>
neilalexander committed Aug 9, 2023
1 parent 6eb77fd commit d7f76da
Showing 4 changed files with 153 additions and 10 deletions.
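
The effect of this change is that a stream update can now move a stream between limits-based and interest-based retention. A minimal client-side sketch using the nats.go client follows; the server URL and the "ORDERS" stream name are illustrative assumptions, not part of this commit:

package main

import (
	"log"

	"github.com/nats-io/nats.go"
)

func main() {
	// Assumes a JetStream-enabled server is reachable at the default URL.
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// Fetch the current stream configuration. "ORDERS" is illustrative.
	info, err := js.StreamInfo("ORDERS")
	if err != nil {
		log.Fatal(err)
	}

	// Flip retention from limits-based to interest-based. Per the
	// stream.go change below, every consumer must already match the
	// stream's replica count or the server rejects the update.
	cfg := info.Config
	cfg.Retention = nats.InterestPolicy
	if _, err := js.UpdateStream(&cfg); err != nil {
		log.Fatal(err)
	}
}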
2 changes: 1 addition & 1 deletion server/jetstream_cluster.go
@@ -6980,7 +6980,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
 	} else {
 		nca := ca.copyGroup()

-		rBefore := ca.Config.replicas(sa.Config)
+		rBefore := nca.Config.replicas(sa.Config)
 		rAfter := cfg.replicas(sa.Config)

 		var curLeader string
110 changes: 110 additions & 0 deletions server/jetstream_test.go
@@ -21407,3 +21407,113 @@ func TestJetStreamServerReencryption(t *testing.T) {
 		})
 	}
 }
+
+func TestJetStreamLimitsToInterestPolicy(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "JSC", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.leader())
+	defer nc.Close()
+
+	// This is the index of the consumer that we'll create as R1
+	// instead of R3, just to prove that it blocks the stream
+	// update from happening.
+	singleReplica := 3
+
+	streamCfg := nats.StreamConfig{
+		Name:      "TEST",
+		Subjects:  []string{"foo"},
+		Retention: nats.LimitsPolicy,
+		Storage:   nats.MemoryStorage,
+		Replicas:  3,
+	}
+
+	stream, err := js.AddStream(&streamCfg)
+	require_NoError(t, err)
+
+	for i := 0; i < 10; i++ {
+		replicas := streamCfg.Replicas
+		if i == singleReplica {
+			// Make one of the consumers R1 so that we can check
+			// that the switch to interest-based retention also
+			// turns it into an R3 consumer.
+			replicas = 1
+		}
+		cname := fmt.Sprintf("test_%d", i)
+		_, err := js.AddConsumer("TEST", &nats.ConsumerConfig{
+			Name:      cname,
+			Durable:   cname,
+			AckPolicy: nats.AckAllPolicy,
+			Replicas:  replicas,
+		})
+		require_NoError(t, err)
+	}
+
+	for i := 0; i < 20; i++ {
+		_, err := js.Publish("foo", []byte{1, 2, 3, 4, 5})
+		require_NoError(t, err)
+	}
+
+	// Pull 10 or more messages from the stream. We will never pull
+	// fewer than 10, which guarantees that the lowest ack floor of
+	// all consumers should be 10.
+	for i := 0; i < 10; i++ {
+		cname := fmt.Sprintf("test_%d", i)
+		count := 10 + i // At least 10 messages
+
+		sub, err := js.PullSubscribe("foo", cname)
+		require_NoError(t, err)
+
+		msgs, err := sub.Fetch(count)
+		require_NoError(t, err)
+		require_Equal(t, len(msgs), count)
+		require_NoError(t, msgs[len(msgs)-1].AckSync())
+
+		// At this point the ack floor should match the count of
+		// messages we received.
+		info, err := js.ConsumerInfo("TEST", cname)
+		require_NoError(t, err)
+		require_Equal(t, info.AckFloor.Consumer, uint64(count))
+	}
+
+	// Try updating to interest-based. This should fail because
+	// we have a consumer that is R1 on an R3 stream.
+	streamCfg = stream.Config
+	streamCfg.Retention = nats.InterestPolicy
+	_, err = js.UpdateStream(&streamCfg)
+	require_Error(t, err)
+
+	// Now we'll make the R1 consumer an R3.
+	cname := fmt.Sprintf("test_%d", singleReplica)
+	cinfo, err := js.ConsumerInfo("TEST", cname)
+	require_NoError(t, err)
+
+	cinfo.Config.Replicas = streamCfg.Replicas
+	_, _ = js.UpdateConsumer("TEST", &cinfo.Config)
+	// TODO(nat): The jsConsumerCreateRequest update doesn't always
+	// respond when there are no errors updating a consumer, so this
+	// nearly always returns a timeout, despite actually doing what
+	// it should. We'll make sure the replicas were updated by doing
+	// another consumer info just to be sure.
+	// require_NoError(t, err)
+	c.waitOnAllCurrent()
+	cinfo, err = js.ConsumerInfo("TEST", cname)
+	require_NoError(t, err)
+	require_Equal(t, cinfo.Config.Replicas, streamCfg.Replicas)
+	require_Equal(t, len(cinfo.Cluster.Replicas), streamCfg.Replicas-1)
+
+	// This time it should succeed.
+	_, err = js.UpdateStream(&streamCfg)
+	require_NoError(t, err)
+
+	// We need to wait for all nodes to have applied the new stream
+	// configuration.
+	c.waitOnAllCurrent()
+
+	// Now we should only have 10 messages left in the stream, as
+	// each consumer has acked at least the first 10 messages.
+	info, err := js.StreamInfo("TEST")
+	require_NoError(t, err)
+	require_Equal(t, info.State.FirstSeq, 11)
+	require_Equal(t, info.State.Msgs, 10)
+}
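
On the final assertions above: consumer test_i fetches and acks 10+i messages, so the ack floors run from 10 (test_0) to 19 (test_9), and the lowest floor across all ten consumers is 10. Once retention becomes interest-based, messages at or below that floor have no remaining interest, so sequences 1-10 are removed: FirstSeq becomes 11 and 10 messages remain. A schematic of that rule only, not server code (the real check is checkStateForInterestStream, invoked in the stream.go change below):

// lowestAckFloor sketches the cleanup rule for AckAll consumers: only
// messages at or below the lowest ack floor across all consumers can
// be removed. With floors 10..19 as in the test, it returns 10, so
// messages 1-10 go and FirstSeq becomes 11.
func lowestAckFloor(floors []uint64) uint64 {
	lowest := floors[0]
	for _, f := range floors[1:] {
		if f < lowest {
			lowest = f
		}
	}
	return lowest
}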
38 changes: 36 additions & 2 deletions server/stream.go
@@ -1495,9 +1495,11 @@ func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig, s *Server) (*Str
 	if cfg.Storage != old.Storage {
 		return nil, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration update can not change storage type"))
 	}
-	// Can't change retention.
+	// Can only change retention from limits to interest or back, not to/from work queue for now.
 	if cfg.Retention != old.Retention {
-		return nil, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration update can not change retention policy"))
+		if old.Retention == WorkQueuePolicy || cfg.Retention == WorkQueuePolicy {
+			return nil, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration update can not change retention policy to/from workqueue"))
+		}
 	}
 	// Can not have a template owner for now.
 	if old.Template != _EMPTY_ {
@@ -1785,9 +1787,41 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
 		// a subsequent update to an existing tier will then move from existing past tier to existing new tier
 	}

+	if mset.isLeader() && ocfg.Retention != cfg.Retention && cfg.Retention == InterestPolicy {
+		// Before we can update the retention policy for the consumer, we need
+		// the replica count of all consumers to match the stream.
+		for _, c := range mset.sa.consumers {
+			if c.Config.Replicas > 0 && c.Config.Replicas != cfg.Replicas {
+				mset.mu.Unlock()
+				return fmt.Errorf("consumer %q replica count must be %d", c.Name, cfg.Replicas)
+			}
+		}
+	}
+
 	// Now update config and store's version of our config.
 	mset.cfg = *cfg

+	// If we're changing retention and haven't errored because of consumer
+	// replicas by now, whip through and update the consumer retention.
+	if ocfg.Retention != cfg.Retention && cfg.Retention == InterestPolicy {
+		toUpdate := make([]*consumer, 0, len(mset.consumers))
+		for _, c := range mset.consumers {
+			toUpdate = append(toUpdate, c)
+		}
+		mset.mu.Unlock()
+		for _, c := range toUpdate {
+			c.mu.Lock()
+			c.retention = cfg.Retention
+			c.mu.Unlock()
+			if c.retention == InterestPolicy {
+				// If we're switching to interest, force a check of the
+				// interest of existing stream messages.
+				c.checkStateForInterestStream()
+			}
+		}
+		mset.mu.Lock()
+	}
+
 	// If we are the leader never suppress update advisory, simply send.
 	if mset.isLeader() && sendAdvisory {
 		mset.sendUpdateAdvisoryLocked()
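
Because of the replica check above, a retention switch on a replicated stream may first require scaling any mismatched consumers, exactly as the test does. A hedged sketch of that step on its own, with illustrative stream/consumer names and replica count:

package main

import (
	"log"

	"github.com/nats-io/nats.go"
)

// scaleConsumer bumps a consumer's replica count so that a later
// retention switch on the stream can pass the new validation;
// otherwise the update fails with `consumer "..." replica count must be N`.
func scaleConsumer(js nats.JetStreamContext) {
	ci, err := js.ConsumerInfo("ORDERS", "worker") // illustrative names
	if err != nil {
		log.Fatal(err)
	}
	ci.Config.Replicas = 3 // match the stream's Replicas
	if _, err := js.UpdateConsumer("ORDERS", &ci.Config); err != nil {
		log.Fatal(err)
	}
}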
13 changes: 6 additions & 7 deletions server/test_test.go
@@ -14,7 +14,6 @@
 package server

 import (
-	"bytes"
 	"fmt"
 	"math/rand"
 	"net/url"
@@ -112,17 +111,17 @@ func require_Error(t *testing.T, err error, expected ...error) {
 		t.Fatalf("Expected one of %v, got '%v'", expected, err)
 	}

-func require_Equal(t *testing.T, a, b string) {
+func require_Equal[T comparable](t *testing.T, a, b T) {
 	t.Helper()
-	if strings.Compare(a, b) != 0 {
-		t.Fatalf("require equal, but got: %v != %v", a, b)
+	if a != b {
+		t.Fatalf("require %T equal, but got: %v != %v", a, a, b)
 	}
 }

-func require_NotEqual(t *testing.T, a, b [32]byte) {
+func require_NotEqual[T comparable](t *testing.T, a, b T) {
 	t.Helper()
-	if bytes.Equal(a[:], b[:]) {
-		t.Fatalf("require not equal, but got: %v != %v", a, b)
+	if a == b {
+		t.Fatalf("require %T not equal, but got: %v != %v", a, a, b)
 	}
 }

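
Making require_Equal and require_NotEqual generic over any comparable type lets the new test compare ints and uint64 sequence numbers directly, where previously only strings and [32]byte values were accepted. An illustrative use inside package server (not part of the commit):

func TestRequireHelpersGeneric(t *testing.T) {
	require_Equal(t, uint64(11), uint64(11))      // any comparable type now
	require_Equal(t, "foo", "foo")                // strings still work
	require_NotEqual(t, [32]byte{1}, [32]byte{2}) // fixed-size arrays too
}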
