Skip to content

Commit

Permalink
Updates to preacks when multiple consumers are mutually exclusive (#4007
Browse files Browse the repository at this point in the history
)

When consumers where mutually exclusive and acks came in before a
message we did not properly cleanup the messages.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Mar 31, 2023
2 parents c194047 + ad5bb36 commit fbaeaf9
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 30 deletions.
7 changes: 4 additions & 3 deletions server/consumer.go
Expand Up @@ -4357,14 +4357,15 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
}

var rmseqs []uint64
mset.mu.RLock()
mset.mu.Lock()
for seq := start; seq <= stop; seq++ {
if !mset.checkInterest(seq, o) {
if mset.noInterest(seq, o) {
rmseqs = append(rmseqs, seq)
}
}
mset.mu.RUnlock()
mset.mu.Unlock()

// These can be removed.
for _, seq := range rmseqs {
mset.store.RemoveMsg(seq)
}
Expand Down
2 changes: 1 addition & 1 deletion server/jetstream_cluster.go
Expand Up @@ -7609,7 +7609,7 @@ func (mset *stream) processCatchupMsg(msg []byte) (uint64, error) {
ddloaded := mset.ddloaded
tierName := mset.tier

if mset.hasAllPreAcks(seq) {
if mset.hasAllPreAcks(seq, subj) {
mset.clearAllPreAcks(seq)
// Mark this to be skipped
subj, ts = _EMPTY_, 0
Expand Down
108 changes: 108 additions & 0 deletions server/norace_test.go
Expand Up @@ -7575,3 +7575,111 @@ func TestNoRaceJetStreamClusterUnbalancedInterestMultipleConsumers(t *testing.T)
require_True(t, numPreAcks == 0)
}
}

func TestNoRaceJetStreamClusterUnbalancedInterestMultipleFilteredConsumers(t *testing.T) {
c, np := createStretchUnbalancedCluster(t)
defer c.shutdown()
defer np.stop()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

// Now create the stream.
_, err := js.AddStream(&nats.StreamConfig{
Name: "EVENTS",
Subjects: []string{"EV.>"},
Replicas: 3,
Retention: nats.InterestPolicy,
})
require_NoError(t, err)

// Make sure it's leader is on S2.
sl := c.servers[1]
checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
c.waitOnStreamLeader(globalAccountName, "EVENTS")
if s := c.streamLeader(globalAccountName, "EVENTS"); s != sl {
s.JetStreamStepdownStream(globalAccountName, "EVENTS")
return fmt.Errorf("Server %s is not stream leader yet", sl)
}
return nil
})

// Create a fast ack consumer.
_, err = js.Subscribe("EV.NEW", func(m *nats.Msg) {
m.Ack()
}, nats.Durable("C"), nats.ManualAck())
require_NoError(t, err)

// Make sure the consumer leader is on S3.
cl := c.servers[2]
checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
c.waitOnConsumerLeader(globalAccountName, "EVENTS", "C")
if s := c.consumerLeader(globalAccountName, "EVENTS", "C"); s != cl {
s.JetStreamStepdownConsumer(globalAccountName, "EVENTS", "C")
return fmt.Errorf("Server %s is not consumer leader yet", cl)
}
return nil
})

// Connect a client directly to the stream leader.
nc, js = jsClientConnect(t, sl)
defer nc.Close()

// Now create another fast ack consumer.
_, err = js.Subscribe("EV.UPDATED", func(m *nats.Msg) {
m.Ack()
}, nats.Durable("D"), nats.ManualAck())
require_NoError(t, err)

// Make sure this consumer leader is on S1.
cl = c.servers[0]
checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
c.waitOnConsumerLeader(globalAccountName, "EVENTS", "D")
if s := c.consumerLeader(globalAccountName, "EVENTS", "D"); s != cl {
s.JetStreamStepdownConsumer(globalAccountName, "EVENTS", "D")
return fmt.Errorf("Server %s is not consumer leader yet", cl)
}
return nil
})

numToSend := 500
for i := 0; i < numToSend; i++ {
_, err := js.PublishAsync("EV.NEW", nil)
require_NoError(t, err)
_, err = js.PublishAsync("EV.UPDATED", nil)
require_NoError(t, err)
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(20 * time.Second):
t.Fatalf("Did not receive completion signal")
}

// Let acks propagate.
time.Sleep(250 * time.Millisecond)

ci, err := js.ConsumerInfo("EVENTS", "D")
require_NoError(t, err)
require_True(t, ci.NumPending == 0)
require_True(t, ci.NumAckPending == 0)
require_True(t, ci.Delivered.Consumer == 500)
require_True(t, ci.Delivered.Stream == 1000)
require_True(t, ci.AckFloor.Consumer == 500)
require_True(t, ci.AckFloor.Stream == 1000)

// Check final stream state on all servers.
for _, s := range c.servers {
mset, err := s.GlobalAccount().lookupStream("EVENTS")
require_NoError(t, err)
state := mset.state()
require_True(t, state.Msgs == 0)
require_True(t, state.FirstSeq == 1001)
require_True(t, state.LastSeq == 1000)
require_True(t, state.Consumers == 2)
// Now check preAcks
mset.mu.RLock()
numPreAcks := len(mset.preAcks)
mset.mu.RUnlock()
require_True(t, numPreAcks == 0)
}
}
57 changes: 31 additions & 26 deletions server/stream.go
Expand Up @@ -4004,7 +4004,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// Make sure to take into account any message assignments that we had to skip (clfs).
seq = lseq + 1 - clfs
// Check for preAcks and the need to skip vs store.
if mset.hasAllPreAcks(seq) {
if mset.hasAllPreAcks(seq, subject) {
mset.clearAllPreAcks(seq)
store.SkipMsg()
} else {
Expand Down Expand Up @@ -4826,8 +4826,22 @@ func (mset *stream) potentialFilteredConsumers() bool {
return false
}

// Check if there is no interest in this sequence number across our consumers.
// The consumer passed is optional if we are processing the ack for that consumer.
// Write lock should be held.
func (mset *stream) noInterest(seq uint64, obs *consumer) bool {
return !mset.checkForInterest(seq, obs)
}

// Check if there is no interest in this sequence number and subject across our consumers.
// The consumer passed is optional if we are processing the ack for that consumer.
// Write lock should be held.
func (mset *stream) noInterestWithSubject(seq uint64, subj string, obs *consumer) bool {
return !mset.checkForInterestWithSubject(seq, subj, obs)
}

// Write lock should be held here for the stream to avoid race conditions on state.
func (mset *stream) checkInterest(seq uint64, obs *consumer) bool {
func (mset *stream) checkForInterest(seq uint64, obs *consumer) bool {
var subj string
if mset.potentialFilteredConsumers() {
pmsg := getJSPubMsgFromPool()
Expand All @@ -4839,10 +4853,16 @@ func (mset *stream) checkInterest(seq uint64, obs *consumer) bool {
mset.registerPreAck(obs, seq)
return true
}
mset.clearAllPreAcks(seq)
return false
}
subj = sm.subj
}
return mset.checkForInterestWithSubject(seq, subj, obs)
}

// Checks for interest given a sequence and subject.
func (mset *stream) checkForInterestWithSubject(seq uint64, subj string, obs *consumer) bool {
for _, o := range mset.consumers {
// If this is us or we have a registered preAck for this consumer continue inspecting.
if o == obs || mset.hasPreAck(o, seq) {
Expand All @@ -4853,6 +4873,7 @@ func (mset *stream) checkInterest(seq uint64, obs *consumer) bool {
return true
}
}
mset.clearAllPreAcks(seq)
return false
}

Expand All @@ -4870,20 +4891,15 @@ func (mset *stream) hasPreAck(o *consumer, seq uint64) bool {
return found
}

// Check if we have all consumers pre-acked.
// Check if we have all consumers pre-acked for this sequence and subject.
// Write lock should be held.
func (mset *stream) hasAllPreAcks(seq uint64) bool {
if len(mset.preAcks) == 0 {
func (mset *stream) hasAllPreAcks(seq uint64, subj string) bool {
if len(mset.preAcks) == 0 || len(mset.preAcks[seq]) == 0 {
return false
}
return len(mset.preAcks[seq]) >= len(mset.consumers)
}

// Check if we have all consumers pre-acked.
func (mset *stream) clearAllPreAcksLock(seq uint64) {
mset.mu.Lock()
defer mset.mu.Unlock()
mset.clearAllPreAcks(seq)
// Since these can be filtered and mutually exclusive,
// if we have some preAcks we need to check all interest here.
return mset.noInterestWithSubject(seq, subj, nil)
}

// Check if we have all consumers pre-acked.
Expand Down Expand Up @@ -4925,13 +4941,6 @@ func (mset *stream) registerPreAck(o *consumer, seq uint64) {
mset.preAcks[seq][o] = struct{}{}
}

// This will clear an ack for a consumer.
func (mset *stream) clearPreAckLock(o *consumer, seq uint64) {
mset.mu.Lock()
defer mset.mu.Unlock()
mset.clearPreAck(o, seq)
}

// This will clear an ack for a consumer.
// Write lock should be held.
func (mset *stream) clearPreAck(o *consumer, seq uint64) {
Expand Down Expand Up @@ -4981,25 +4990,21 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) {
case WorkQueuePolicy:
// Normally we just remove a message when its ack'd here but if we have direct consumers
// from sources and/or mirrors we need to make sure they have delivered the msg.
shouldRemove = mset.directs <= 0 || !mset.checkInterest(seq, o)
shouldRemove = mset.directs <= 0 || mset.noInterest(seq, o)
case InterestPolicy:
shouldRemove = !mset.checkInterest(seq, o)
shouldRemove = mset.noInterest(seq, o)
}
mset.mu.Unlock()

// If nothing else to do.
if !shouldRemove {
// Clear any pending preAcks for this consumer.
mset.clearPreAckLock(o, seq)
return
}

// If we are here we should attempt to remove.
if _, err := mset.store.RemoveMsg(seq); err == ErrStoreEOF {
// This should not happen, but being pedantic.
mset.registerPreAckLock(o, seq)
} else {
mset.clearAllPreAcksLock(seq)
}
}

Expand Down

0 comments on commit fbaeaf9

Please sign in to comment.