Fix for data race when changing retention policy
Signed-off-by: Waldemar Quevedo <wally@synadia.com>
wallyqs committed Sep 17, 2023
1 parent 6f38056 commit 661b2ff
Showing 2 changed files with 98 additions and 1 deletion.
6 changes: 5 additions & 1 deletion server/consumer.go
@@ -2667,12 +2667,16 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, doSample bool) {

     mset := o.mset
     clustered := o.node != nil
+
+    // In case retention changes for a stream, this ought to have been updated
+    // using the consumer lock to avoid a race.
+    retention := o.retention
     o.mu.Unlock()
 
     // Let the owning stream know if we are interest or workqueue retention based.
     // If this consumer is clustered this will be handled by processReplicatedAck
     // after the ack has propagated.
-    if !clustered && mset != nil && mset.cfg.Retention != LimitsPolicy {
+    if !clustered && mset != nil && retention != LimitsPolicy {
         if sagap > 1 {
             // FIXME(dlc) - This is very inefficient, will need to fix.
             for seq := sseq; seq > sseq-sagap; seq-- {
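The server-side change is deliberately small. Before it, processAckMsg released the consumer lock and then read mset.cfg.Retention, a field that a concurrent stream-configuration update can rewrite, which is the data race the commit title refers to. The fix copies the consumer's retention setting while o.mu is still held and uses only that snapshot after unlocking. A minimal, self-contained sketch of that pattern, using hypothetical types rather than the server's actual structs, might look like this:

// Simplified model of the locking pattern in the fix above; the type and
// method names are illustrative, not the server's.
package main

import "sync"

type retentionPolicy int

const (
    limitsPolicy retentionPolicy = iota
    interestPolicy
)

type consumer struct {
    mu        sync.Mutex
    retention retentionPolicy
}

// updateRetention stands in for the stream-update path: the new policy is
// written while the consumer lock is held.
func (o *consumer) updateRetention(r retentionPolicy) {
    o.mu.Lock()
    o.retention = r
    o.mu.Unlock()
}

// processAck stands in for the fixed read path: the policy is copied under
// the lock, and only the snapshot is used after Unlock.
func (o *consumer) processAck() retentionPolicy {
    o.mu.Lock()
    retention := o.retention
    o.mu.Unlock()
    // Post-unlock work consults the snapshot instead of re-reading the
    // shared field, so a concurrent updateRetention cannot race with it.
    return retention
}

func main() {
    o := &consumer{retention: limitsPolicy}
    var wg sync.WaitGroup
    wg.Add(2)
    go func() { defer wg.Done(); o.updateRetention(interestPolicy) }()
    go func() { defer wg.Done(); _ = o.processAck() }()
    wg.Wait()
}

With the pre-fix ordering, an unlocked read of the shared field after Unlock paired with the locked write is exactly the kind of access pair the Go race detector reports.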
93 changes: 93 additions & 0 deletions server/jetstream_test.go
@@ -21510,6 +21510,99 @@ func TestJetStreamLimitsToInterestPolicy(t *testing.T) {
    require_Equal(t, info.State.Msgs, 10)
}

func TestJetStreamLimitsToInterestPolicyWhileAcking(t *testing.T) {
    for _, st := range []nats.StorageType{nats.FileStorage, nats.MemoryStorage} {
        t.Run(st.String(), func(t *testing.T) {
            c := createJetStreamClusterExplicit(t, "JSC", 3)
            defer c.shutdown()

            nc, js := jsClientConnect(t, c.leader())
            defer nc.Close()
            streamCfg := nats.StreamConfig{
                Name:      "TEST",
                Subjects:  []string{"foo"},
                Retention: nats.LimitsPolicy,
                Storage:   st,
                Replicas:  3,
            }

            stream, err := js.AddStream(&streamCfg)
            require_NoError(t, err)

            wg := sync.WaitGroup{}
            ctx, cancel := context.WithCancel(context.Background())
            payload := []byte(strings.Repeat("A", 128))

            wg.Add(1)
            go func() {
                defer wg.Done()
                for range time.NewTicker(10 * time.Millisecond).C {
                    select {
                    case <-ctx.Done():
                        return
                    default:
                    }
                    js.Publish("foo", payload)
                }
            }()
            for i := 0; i < 5; i++ {
                cname := fmt.Sprintf("test_%d", i)
                sub, err := js.PullSubscribe("foo", cname)
                require_NoError(t, err)

                wg.Add(1)
                go func() {
                    defer wg.Done()
                    for range time.NewTicker(10 * time.Millisecond).C {
                        select {
                        case <-ctx.Done():
                            return
                        default:
                        }

                        msgs, err := sub.Fetch(1)
                        if err != nil && errors.Is(err, nats.ErrTimeout) {
                            t.Logf("ERROR: %v", err)
                        }
                        for _, msg := range msgs {
                            msg.Ack()
                        }
                    }
                }()
            }
            // Leave running for a few secs then do the change on a different connection.
            time.Sleep(5 * time.Second)
            nc2, js2 := jsClientConnect(t, c.leader())
            defer nc2.Close()

            // Try updating to interest-based and changing replicas too.
            streamCfg = stream.Config
            streamCfg.Retention = nats.InterestPolicy
            _, err = js2.UpdateStream(&streamCfg)
            require_NoError(t, err)

            // We need to wait for all nodes to have applied the new stream
            // configuration.
            c.waitOnAllCurrent()

            var retention nats.RetentionPolicy
            checkFor(t, 15*time.Second, 500*time.Millisecond, func() error {
                info, err := js2.StreamInfo("TEST", nats.MaxWait(500*time.Millisecond))
                if err != nil {
                    return err
                }
                retention = info.Config.Retention
                return nil
            })
            require_Equal(t, retention, nats.InterestPolicy)

            // Cancel and wait for goroutines underneath.
            cancel()
            wg.Wait()
        })
    }
}

func TestJetStreamUsageSyncDeadlock(t *testing.T) {
    s := RunBasicJetStreamServer(t)
    defer s.Shutdown()
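The test added above is a stress-style reproduction rather than a deterministic one: several pull consumers keep fetching and acking while a publisher runs and the retention policy is flipped from limits to interest on a separate connection. Since the regression it guards against is a data race, it is most useful under the race detector; assuming the repository's usual layout, an invocation along the lines of go test -race -run TestJetStreamLimitsToInterestPolicyWhileAcking ./server is the kind of run that would have flagged the unsynchronized read before the consumer.go change.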
