Fix for data race when changing retention policy #4551

Merged (1 commit, Sep 18, 2023)
server/consumer.go (5 additions, 1 deletion)
@@ -2667,12 +2667,16 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, doSample bool) {

 	mset := o.mset
 	clustered := o.node != nil
+
+	// In case retention changes for a stream, this ought to have been updated
+	// using the consumer lock to avoid a race.
+	retention := o.retention
 	o.mu.Unlock()

 	// Let the owning stream know if we are interest or workqueue retention based.
 	// If this consumer is clustered this will be handled by processReplicatedAck
 	// after the ack has propagated.
-	if !clustered && mset != nil && mset.cfg.Retention != LimitsPolicy {
+	if !clustered && mset != nil && retention != LimitsPolicy {
 		if sagap > 1 {
 			// FIXME(dlc) - This is very inefficient, will need to fix.
 			for seq := sseq; seq > sseq-sagap; seq-- {
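
For context, here is a minimal, self-contained sketch of the locking pattern the change relies on. The consumer type, its fields, and the policy constants below are illustrative stand-ins rather than the server's actual structs; the point is only that the retention value is copied into a local while the consumer lock is held, so the check performed after Unlock no longer reads shared state that a concurrent stream configuration update may be rewriting.

package main

import (
	"fmt"
	"sync"
)

type retentionPolicy int

const (
	limitsPolicy retentionPolicy = iota
	interestPolicy
)

// consumer caches the stream's retention policy; the field is guarded by mu,
// mirroring how the server guards o.retention with the consumer lock.
type consumer struct {
	mu        sync.Mutex
	retention retentionPolicy
}

// updateRetention is what a stream configuration update would call:
// it rewrites the cached policy under the consumer lock.
func (c *consumer) updateRetention(r retentionPolicy) {
	c.mu.Lock()
	c.retention = r
	c.mu.Unlock()
}

// processAck copies the policy into a local while still holding the lock,
// so the check after Unlock never touches shared state.
func (c *consumer) processAck() {
	c.mu.Lock()
	retention := c.retention // copy under the lock, as in the fix
	c.mu.Unlock()

	if retention != limitsPolicy {
		fmt.Println("ask the owning stream to remove the message")
	}
}

func main() {
	var wg sync.WaitGroup
	c := &consumer{retention: limitsPolicy}

	wg.Add(1)
	go func() {
		defer wg.Done()
		c.updateRetention(interestPolicy) // concurrent retention change
	}()

	c.processAck()
	wg.Wait()
}

Copying the value under the lock rather than re-reading the stream config later keeps the hot ack path short while still giving a consistent snapshot of the policy.
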
server/jetstream_test.go (93 additions, 0 deletions)
@@ -21510,6 +21510,99 @@ func TestJetStreamLimitsToInterestPolicy(t *testing.T) {
	require_Equal(t, info.State.Msgs, 10)
}

func TestJetStreamLimitsToInterestPolicyWhileAcking(t *testing.T) {
	for _, st := range []nats.StorageType{nats.FileStorage, nats.MemoryStorage} {
		t.Run(st.String(), func(t *testing.T) {
			c := createJetStreamClusterExplicit(t, "JSC", 3)
			defer c.shutdown()

			nc, js := jsClientConnect(t, c.leader())
			defer nc.Close()
			streamCfg := nats.StreamConfig{
				Name:      "TEST",
				Subjects:  []string{"foo"},
				Retention: nats.LimitsPolicy,
				Storage:   st,
				Replicas:  3,
			}

			stream, err := js.AddStream(&streamCfg)
			require_NoError(t, err)

			wg := sync.WaitGroup{}
			ctx, cancel := context.WithCancel(context.Background())
			payload := []byte(strings.Repeat("A", 128))

			wg.Add(1)
			go func() {
				defer wg.Done()
				for range time.NewTicker(10 * time.Millisecond).C {
					select {
					case <-ctx.Done():
						return
					default:
					}
					js.Publish("foo", payload)
				}
			}()
			for i := 0; i < 5; i++ {
				cname := fmt.Sprintf("test_%d", i)
				sub, err := js.PullSubscribe("foo", cname)
				require_NoError(t, err)

				wg.Add(1)
				go func() {
					defer wg.Done()
					for range time.NewTicker(10 * time.Millisecond).C {
						select {
						case <-ctx.Done():
							return
						default:
						}

						msgs, err := sub.Fetch(1)
						if err != nil && errors.Is(err, nats.ErrTimeout) {
							t.Logf("ERROR: %v", err)
						}
						for _, msg := range msgs {
							msg.Ack()
						}
					}
				}()
			}
			// Leave this running for a few seconds, then make the change on a different connection.
			time.Sleep(5 * time.Second)
			nc2, js2 := jsClientConnect(t, c.leader())
			defer nc2.Close()

			// Try updating the stream to interest-based retention.
			streamCfg = stream.Config
			streamCfg.Retention = nats.InterestPolicy
			_, err = js2.UpdateStream(&streamCfg)
			require_NoError(t, err)

			// We need to wait for all nodes to have applied the new stream
			// configuration.
			c.waitOnAllCurrent()

			var retention nats.RetentionPolicy
			checkFor(t, 15*time.Second, 500*time.Millisecond, func() error {
				info, err := js2.StreamInfo("TEST", nats.MaxWait(500*time.Millisecond))
				if err != nil {
					return err
				}
				retention = info.Config.Retention
				return nil
			})
			require_Equal(t, retention, nats.InterestPolicy)

			// Cancel the context and wait for the goroutines to finish.
			cancel()
			wg.Wait()
		})
	}
}

func TestJetStreamUsageSyncDeadlock(t *testing.T) {
	s := RunBasicJetStreamServer(t)
	defer s.Shutdown()
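
For reference, a minimal client-side sketch of the retention change this test exercises while publishers and acking pull consumers are active. It assumes a JetStream-enabled server reachable at the default URL, an existing stream named TEST, and a server version that accepts updating retention from limits-based to interest-based; the stream name and the terse error handling are illustrative only.

package main

import (
	"log"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// Start from the current configuration so unrelated fields are preserved.
	info, err := js.StreamInfo("TEST")
	if err != nil {
		log.Fatal(err)
	}

	// Switch retention from limits-based to interest-based while the stream
	// may still be receiving publishes and consumer acks.
	cfg := info.Config
	cfg.Retention = nats.InterestPolicy
	if _, err := js.UpdateStream(&cfg); err != nil {
		log.Fatal(err)
	}
	log.Println("retention policy updated to interest")
}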