Commit
[IMPROVED] Improvements to preAcks. (#4006)
Better handling of multiple consumers so that messages are not deleted too
early.
Better cleanup handling.

Signed-off-by: Derek Collison <derek@nats.io>
derekcollison committed Mar 31, 2023
2 parents 7862881 + 8c0a45e commit 5e85889
Showing 3 changed files with 287 additions and 69 deletions.
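
The diff below calls into clearAllPreAcks, hasAllPreAcks, and clearAllPreAcksBelowFloor; their implementations live in the third changed file, which is not expanded in this view. As a rough orientation only, here is a minimal, self-contained sketch of how per-consumer preAck bookkeeping could be structured — the types, the registerPreAck helper, and the simplified hasAllPreAcks check are assumptions for illustration, not the actual server code:

package main

import "fmt"

// consumer is a stand-in for the server's consumer type.
type consumer struct{ name string }

// stream sketches only the preAck bookkeeping: for each stream sequence,
// the set of consumers that acked it before the message was stored.
type stream struct {
	preAcks map[uint64]map[*consumer]struct{}
}

// registerPreAck records that a consumer acked seq before the message arrived.
func (mset *stream) registerPreAck(o *consumer, seq uint64) {
	if mset.preAcks == nil {
		mset.preAcks = make(map[uint64]map[*consumer]struct{})
	}
	if mset.preAcks[seq] == nil {
		mset.preAcks[seq] = make(map[*consumer]struct{})
	}
	mset.preAcks[seq][o] = struct{}{}
}

// clearAllPreAcks drops any preAck state for seq once the message is handled.
func (mset *stream) clearAllPreAcks(seq uint64) {
	delete(mset.preAcks, seq)
}

// clearAllPreAcksBelowFloor drops stale preAcks for sequences below floor,
// e.g. after compacting the store to a new first sequence.
func (mset *stream) clearAllPreAcksBelowFloor(floor uint64) {
	for seq := range mset.preAcks {
		if seq < floor {
			delete(mset.preAcks, seq)
		}
	}
}

// hasAllPreAcks reports whether every consumer of interest already acked seq.
// A full check would compare against the stream's consumer set; this sketch
// simply requires at least one recorded preAck.
func (mset *stream) hasAllPreAcks(seq uint64) bool {
	_, ok := mset.preAcks[seq]
	return ok
}

func main() {
	mset := &stream{}
	c := &consumer{name: "C"}
	mset.registerPreAck(c, 5)
	fmt.Println(mset.hasAllPreAcks(5)) // true in this simplified sketch
	mset.clearAllPreAcks(5)
	fmt.Println(mset.hasAllPreAcks(5)) // false
}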
25 changes: 9 additions & 16 deletions server/jetstream_cluster.go
@@ -2567,16 +2567,8 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
// Check for any preAcks in case we are interest based.
mset.mu.Lock()
seq := lseq + 1 - mset.clfs
var shouldAck bool
if len(mset.preAcks) > 0 {
if _, shouldAck = mset.preAcks[seq]; shouldAck {
delete(mset.preAcks, seq)
}
}
mset.clearAllPreAcks(seq)
mset.mu.Unlock()
if shouldAck {
mset.ackMsg(nil, seq)
}
continue
}

@@ -2590,7 +2582,9 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
// Messages to be skipped have no subject or timestamp or msg or hdr.
if subject == _EMPTY_ && ts == 0 && len(msg) == 0 && len(hdr) == 0 {
// Skip and update our lseq.
mset.setLastSeq(mset.store.SkipMsg())
last := mset.store.SkipMsg()
mset.setLastSeq(last)
mset.clearAllPreAcks(last)
continue
}

@@ -7200,6 +7194,7 @@ func (mset *stream) processSnapshotDeletes(snap *streamSnapshot) {
mset.store.Compact(snap.FirstSeq)
mset.store.FastState(&state)
mset.setLastSeq(state.LastSeq)
mset.clearAllPreAcksBelowFloor(state.FirstSeq)
}
// Range the deleted and delete if applicable.
for _, dseq := range snap.Deleted {
@@ -7614,12 +7609,10 @@ func (mset *stream) processCatchupMsg(msg []byte) (uint64, error) {
ddloaded := mset.ddloaded
tierName := mset.tier

if len(mset.preAcks) > 0 {
if _, shouldSkip := mset.preAcks[seq]; shouldSkip {
delete(mset.preAcks, seq)
// Mark this to be skipped
subj, ts = _EMPTY_, 0
}
if mset.hasAllPreAcks(seq) {
mset.clearAllPreAcks(seq)
// Mark this to be skipped
subj, ts = _EMPTY_, 0
}
mset.mu.Unlock()

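The hunks above consume preAcks when committed messages are applied or caught up. The complementary path — a replicated ack that reaches a replica before the message it refers to — is, per the commit description and the test comments below, now resolved locally instead of being forwarded to the stream leader as a delete proposal. A self-contained toy model of that ordering, with illustrative names only and none of the server's actual types:

package main

import "fmt"

// follower models only the ordering problem: on an interest-based stream,
// the replicated ack for a sequence can be applied before the message itself.
type follower struct {
	msgs    map[uint64]bool     // sequences currently stored
	preAcks map[uint64]struct{} // acks seen before the message arrived
}

func (f *follower) applyAck(seq uint64) {
	if !f.msgs[seq] {
		// The ack outran the message: remember it and resolve locally later,
		// rather than proposing a delete back to the stream leader.
		f.preAcks[seq] = struct{}{}
		return
	}
	// Interest retention: no interest remains for this sequence, drop it.
	delete(f.msgs, seq)
}

func (f *follower) applyMsg(seq uint64) {
	if _, acked := f.preAcks[seq]; acked {
		// Already acked by every consumer of interest; never retain it.
		delete(f.preAcks, seq)
		return
	}
	f.msgs[seq] = true
}

func main() {
	f := &follower{msgs: map[uint64]bool{}, preAcks: map[uint64]struct{}{}}
	f.applyAck(5) // ack arrives first over the fast path
	f.applyMsg(5) // message arrives later over the slow path
	fmt.Println(len(f.msgs), len(f.preAcks)) // 0 0: nothing retained, nothing leaked
}
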
177 changes: 157 additions & 20 deletions server/norace_test.go
@@ -6846,15 +6846,15 @@ func TestNoRaceJetStreamClusterF3Setup(t *testing.T) {
}
}

// We test an interest-based stream that has a cluster with a node with asymmetric paths from
// the stream leader and the consumer leader such that the consumer leader path is fast and
// replicated acks arrive sooner than the actual message. This path was considered, but also
// categorized as very rare and was expensive as it tried to forward a new stream msg delete
// proposal to the original stream leader. It will now deal with the issue locally and not
// slow down the ingest rate to the stream's publishers.
func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamSetup(t *testing.T) {
// Uncomment to run. Do not want as part of Travis tests atm.
skip(t)
// Unbalanced stretch cluster.
// S2 (stream leader) will have a slow path to S1 (via proxy) and S3 (consumer leader) will have a fast path.
//
// Route Ports
// "S1": 14622
// "S2": 15622
// "S3": 16622
func createStretchUnbalancedCluster(t testing.TB) (c *cluster, np *netProxy) {
t.Helper()

tmpl := `
listen: 127.0.0.1:-1
@@ -6871,24 +6871,16 @@ func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamSetup(t *testing.T
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
}
`

// Route Ports
// "S1": 14622,
// "S2": 15622,
// "S3": 16622,

// S2 (stream leader) will have a slow path to S1 (via proxy) and S3 (consumer leader) will have a fast path.

// Do these in order, S1, S2 (proxy) then S3.
c := &cluster{t: t, servers: make([]*Server, 3), opts: make([]*Options, 3), name: "F3"}
c = &cluster{t: t, servers: make([]*Server, 3), opts: make([]*Options, 3), name: "F3"}

// S1
conf := fmt.Sprintf(tmpl, "S1", t.TempDir(), 14622, "route://127.0.0.1:15622, route://127.0.0.1:16622")
c.servers[0], c.opts[0] = RunServerWithConfig(createConfFile(t, []byte(conf)))

// S2
// Create the proxy first. Connect this to S1. Make it slow, e.g. 5ms RTT.
np := createNetProxy(1*time.Millisecond, 1024*1024*1024, 1024*1024*1024, "route://127.0.0.1:14622", true)
np = createNetProxy(1*time.Millisecond, 1024*1024*1024, 1024*1024*1024, "route://127.0.0.1:14622", true)
routes := fmt.Sprintf("%s, route://127.0.0.1:16622", np.routeURL())
conf = fmt.Sprintf(tmpl, "S2", t.TempDir(), 15622, routes)
c.servers[1], c.opts[1] = RunServerWithConfig(createConfFile(t, []byte(conf)))
@@ -6899,6 +6891,21 @@ func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamSetup(t *testing.T

c.checkClusterFormed()
c.waitOnClusterReady()

return c, np
}

// We test an interest-based stream that has a cluster with a node with asymmetric paths from
// the stream leader and the consumer leader such that the consumer leader path is fast and
// replicated acks arrive sooner than the actual message. This path was considered, but also
// categorized as very rare and was expensive as it tried to forward a new stream msg delete
// proposal to the original stream leader. It will now deal with the issue locally and not
// slow down the ingest rate to the stream's publishers.
func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamSetup(t *testing.T) {
// Uncomment to run. Do not want as part of Travis tests atm.
skip(t)

c, np := createStretchUnbalancedCluster(t)
defer c.shutdown()
defer np.stop()

@@ -6935,7 +6942,7 @@ func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamSetup(t *testing.T
c.waitOnConsumerLeader(globalAccountName, "EVENTS", "C")
if s := c.consumerLeader(globalAccountName, "EVENTS", "C"); s != cl {
s.JetStreamStepdownConsumer(globalAccountName, "EVENTS", "C")
return fmt.Errorf("Server %s is not consumer leader yet", sl)
return fmt.Errorf("Server %s is not consumer leader yet", cl)
}
return nil
})
@@ -7438,3 +7445,133 @@ func TestNoRaceFileStoreNumPending(t *testing.T) {
}
}
}

func TestNoRaceJetStreamClusterUnbalancedInterestMultipleConsumers(t *testing.T) {
c, np := createStretchUnbalancedCluster(t)
defer c.shutdown()
defer np.stop()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

// Now create the stream.
_, err := js.AddStream(&nats.StreamConfig{
Name: "EVENTS",
Subjects: []string{"EV.>"},
Replicas: 3,
Retention: nats.InterestPolicy,
})
require_NoError(t, err)

// Make sure its leader is on S2.
sl := c.servers[1]
checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
c.waitOnStreamLeader(globalAccountName, "EVENTS")
if s := c.streamLeader(globalAccountName, "EVENTS"); s != sl {
s.JetStreamStepdownStream(globalAccountName, "EVENTS")
return fmt.Errorf("Server %s is not stream leader yet", sl)
}
return nil
})

// Create a fast ack consumer.
_, err = js.Subscribe("EV.NEW", func(m *nats.Msg) {
m.Ack()
}, nats.Durable("C"), nats.ManualAck())
require_NoError(t, err)

// Make sure the consumer leader is on S3.
cl := c.servers[2]
checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
c.waitOnConsumerLeader(globalAccountName, "EVENTS", "C")
if s := c.consumerLeader(globalAccountName, "EVENTS", "C"); s != cl {
s.JetStreamStepdownConsumer(globalAccountName, "EVENTS", "C")
return fmt.Errorf("Server %s is not consumer leader yet", cl)
}
return nil
})

// Connect a client directly to the stream leader.
nc, js = jsClientConnect(t, sl)
defer nc.Close()

// Now create a pull subscriber.
sub, err := js.PullSubscribe("EV.NEW", "D", nats.ManualAck())
require_NoError(t, err)

// Make sure this consumer leader is on S1.
cl = c.servers[0]
checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
c.waitOnConsumerLeader(globalAccountName, "EVENTS", "D")
if s := c.consumerLeader(globalAccountName, "EVENTS", "D"); s != cl {
s.JetStreamStepdownConsumer(globalAccountName, "EVENTS", "D")
return fmt.Errorf("Server %s is not consumer leader yet", cl)
}
return nil
})

numToSend := 1000
for i := 0; i < numToSend; i++ {
_, err := js.PublishAsync("EV.NEW", nil)
require_NoError(t, err)
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(20 * time.Second):
t.Fatalf("Did not receive completion signal")
}

// Now make sure we can pull messages since we have not acked.
// The bug is that the acks arrive on S1 faster than the messages but we want to
// make sure we do not remove prematurely.
msgs, err := sub.Fetch(100, nats.MaxWait(time.Second))
require_NoError(t, err)
require_True(t, len(msgs) == 100)
for _, m := range msgs {
m.AckSync()
}

ci, err := js.ConsumerInfo("EVENTS", "D")
require_NoError(t, err)
require_True(t, ci.NumPending == uint64(numToSend-100))
require_True(t, ci.NumAckPending == 0)
require_True(t, ci.Delivered.Stream == 100)
require_True(t, ci.AckFloor.Stream == 100)

// Check stream state on all servers.
for _, s := range c.servers {
mset, err := s.GlobalAccount().lookupStream("EVENTS")
require_NoError(t, err)
state := mset.state()
require_True(t, state.Msgs == 900)
require_True(t, state.FirstSeq == 101)
require_True(t, state.LastSeq == 1000)
require_True(t, state.Consumers == 2)
}

msgs, err = sub.Fetch(900, nats.MaxWait(time.Second))
require_NoError(t, err)
require_True(t, len(msgs) == 900)
for _, m := range msgs {
m.AckSync()
}

// Let acks propagate.
time.Sleep(250 * time.Millisecond)

// Check final stream state on all servers.
for _, s := range c.servers {
mset, err := s.GlobalAccount().lookupStream("EVENTS")
require_NoError(t, err)
state := mset.state()
require_True(t, state.Msgs == 0)
require_True(t, state.FirstSeq == 1001)
require_True(t, state.LastSeq == 1000)
require_True(t, state.Consumers == 2)
// Now check preAcks
mset.mu.RLock()
numPreAcks := len(mset.preAcks)
mset.mu.RUnlock()
require_True(t, numPreAcks == 0)
}
}
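
As a usage note, the end state this test asserts through internal stream lookups can also be observed from a plain client. A sketch using the nats.go JetStream API, assuming a reachable server hosting the EVENTS stream above (the URL is illustrative):

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	// Assumes a local server with the interest-based EVENTS stream from the test above.
	nc, err := nats.Connect("nats://127.0.0.1:4222")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream(nats.MaxWait(5 * time.Second))
	if err != nil {
		log.Fatal(err)
	}

	// Once every consumer has acked everything, an interest-based stream
	// should report zero messages and a first sequence just past the last.
	si, err := js.StreamInfo("EVENTS")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("msgs=%d first=%d last=%d consumers=%d\n",
		si.State.Msgs, si.State.FirstSeq, si.State.LastSeq, si.State.Consumers)
}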
