-
-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow switching from limits-based to interest-based retention in stream update #4361
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1495,9 +1495,11 @@ func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig, s *Server) (*Str | |
if cfg.Storage != old.Storage { | ||
return nil, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration update can not change storage type")) | ||
} | ||
// Can't change retention. | ||
// Can only change retention from limits to interest or back, not to/from work queue for now. | ||
if cfg.Retention != old.Retention { | ||
return nil, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration update can not change retention policy")) | ||
if old.Retention == WorkQueuePolicy || cfg.Retention == WorkQueuePolicy { | ||
return nil, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration update can not change retention policy to/from workqueue")) | ||
} | ||
} | ||
// Can not have a template owner for now. | ||
if old.Template != _EMPTY_ { | ||
|
@@ -1785,9 +1787,41 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool) | |
// a subsequent update to an existing tier will then move from existing past tier to existing new tier | ||
} | ||
|
||
if mset.isLeader() && ocfg.Retention != cfg.Retention && cfg.Retention == InterestPolicy { | ||
// Before we can update the retention policy for the consumer, we need | ||
// the replica count of all consumers to match the stream. | ||
for _, c := range mset.sa.consumers { | ||
if c.Config.Replicas > 0 && c.Config.Replicas != cfg.Replicas { | ||
mset.mu.Unlock() | ||
return fmt.Errorf("consumer %q replica count must be %d", c.Name, cfg.Replicas) | ||
} | ||
} | ||
} | ||
|
||
// Now update config and store's version of our config. | ||
mset.cfg = *cfg | ||
|
||
// If we're changing retention and haven't errored because of consumer | ||
// replicas by now, whip through and update the consumer retention. | ||
if ocfg.Retention != cfg.Retention && cfg.Retention == InterestPolicy { | ||
toUpdate := make([]*consumer, 0, len(mset.consumers)) | ||
for _, c := range mset.consumers { | ||
toUpdate = append(toUpdate, c) | ||
} | ||
mset.mu.Unlock() | ||
for _, c := range toUpdate { | ||
c.mu.Lock() | ||
c.retention = cfg.Retention | ||
c.mu.Unlock() | ||
if c.retention == InterestPolicy { | ||
// If we're switching to interest, force a check of the | ||
// interest of existing stream messages. | ||
c.checkStateForInterestStream() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice! I saw this and was confused for a second then realized what you did here.. |
||
} | ||
} | ||
mset.mu.Lock() | ||
} | ||
|
||
// If we are the leader never suppress update advisory, simply send. | ||
if mset.isLeader() && sendAdvisory { | ||
mset.sendUpdateAdvisoryLocked() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,7 +14,6 @@ | |
package server | ||
|
||
import ( | ||
"bytes" | ||
"fmt" | ||
"math/rand" | ||
"net/url" | ||
|
@@ -112,17 +111,17 @@ func require_Error(t *testing.T, err error, expected ...error) { | |
t.Fatalf("Expected one of %v, got '%v'", expected, err) | ||
} | ||
|
||
func require_Equal(t *testing.T, a, b string) { | ||
func require_Equal[T comparable](t *testing.T, a, b T) { | ||
t.Helper() | ||
if strings.Compare(a, b) != 0 { | ||
t.Fatalf("require equal, but got: %v != %v", a, b) | ||
if a != b { | ||
t.Fatalf("require %T equal, but got: %v != %v", a, a, b) | ||
} | ||
} | ||
|
||
func require_NotEqual(t *testing.T, a, b [32]byte) { | ||
func require_NotEqual[T comparable](t *testing.T, a, b T) { | ||
t.Helper() | ||
if bytes.Equal(a[:], b[:]) { | ||
t.Fatalf("require not equal, but got: %v != %v", a, b) | ||
if a == b { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will this do the right thing for 2 different byte slices that have same contents? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The function was previously taking Trying to pass in slices instead of fixed-size arrays will result in a compile-time error as they are not |
||
t.Fatalf("require %T not equal, but got: %v != %v", a, a, b) | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One important piece, if the consumer is retention interest, then its replication factor must be same as stream, so need to adjust that here and add in an R1 consumer to the test above etc.