[IMPROVED] Improvements to preAcks. #4006

Merged · 2 commits · Mar 31, 2023
25 changes: 9 additions & 16 deletions server/jetstream_cluster.go
@@ -2567,16 +2567,8 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
// Check for any preAcks in case we are interest based.
mset.mu.Lock()
seq := lseq + 1 - mset.clfs
var shouldAck bool
if len(mset.preAcks) > 0 {
if _, shouldAck = mset.preAcks[seq]; shouldAck {
delete(mset.preAcks, seq)
}
}
mset.clearAllPreAcks(seq)
mset.mu.Unlock()
if shouldAck {
mset.ackMsg(nil, seq)
}
continue
}

@@ -2590,7 +2582,9 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
// Messages to be skipped have no subject or timestamp or msg or hdr.
if subject == _EMPTY_ && ts == 0 && len(msg) == 0 && len(hdr) == 0 {
// Skip and update our lseq.
mset.setLastSeq(mset.store.SkipMsg())
last := mset.store.SkipMsg()
mset.setLastSeq(last)
mset.clearAllPreAcks(last)
continue
}

@@ -7200,6 +7194,7 @@ func (mset *stream) processSnapshotDeletes(snap *streamSnapshot) {
mset.store.Compact(snap.FirstSeq)
mset.store.FastState(&state)
mset.setLastSeq(state.LastSeq)
mset.clearAllPreAcksBelowFloor(state.FirstSeq)
}
// Range the deleted and delete if applicable.
for _, dseq := range snap.Deleted {
@@ -7614,12 +7609,10 @@ func (mset *stream) processCatchupMsg(msg []byte) (uint64, error) {
ddloaded := mset.ddloaded
tierName := mset.tier

if len(mset.preAcks) > 0 {
if _, shouldSkip := mset.preAcks[seq]; shouldSkip {
delete(mset.preAcks, seq)
// Mark this to be skipped
subj, ts = _EMPTY_, 0
}
if mset.hasAllPreAcks(seq) {
mset.clearAllPreAcks(seq)
// Mark this to be skipped
subj, ts = _EMPTY_, 0
}
mset.mu.Unlock()

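The helpers this diff calls — clearAllPreAcks, clearAllPreAcksBelowFloor and hasAllPreAcks — are introduced elsewhere in the PR and are not shown here. Below is a minimal, self-contained sketch of the bookkeeping they appear to centralize, assuming preAcks maps a stream sequence to the set of consumers whose acks arrived before the message itself; the type definitions, the registerPreAck name, and the method bodies are illustrative assumptions, not the PR's actual implementation.

package main

import "fmt"

// Standalone model only; not code from this diff.
// Assumption: preAcks is keyed by stream sequence and holds the consumers
// that acked that sequence before the message was applied locally.

type consumer struct{ name string }

type stream struct {
	consumers map[string]*consumer
	preAcks   map[uint64]map[*consumer]struct{}
}

// registerPreAck records an ack that beat its message to this replica (hypothetical name).
func (mset *stream) registerPreAck(o *consumer, seq uint64) {
	if mset.preAcks == nil {
		mset.preAcks = make(map[uint64]map[*consumer]struct{})
	}
	if mset.preAcks[seq] == nil {
		mset.preAcks[seq] = make(map[*consumer]struct{})
	}
	mset.preAcks[seq][o] = struct{}{}
}

// clearAllPreAcks drops any pre-ack state tracked for seq once the message has been handled.
func (mset *stream) clearAllPreAcks(seq uint64) { delete(mset.preAcks, seq) }

// clearAllPreAcksBelowFloor drops pre-ack state for all sequences below a new first sequence,
// e.g. after compacting the store to a snapshot's first sequence.
func (mset *stream) clearAllPreAcksBelowFloor(floor uint64) {
	for seq := range mset.preAcks {
		if seq < floor {
			delete(mset.preAcks, seq)
		}
	}
}

// hasAllPreAcks reports whether every known consumer already acked seq,
// meaning the message can be skipped as soon as it arrives.
func (mset *stream) hasAllPreAcks(seq uint64) bool {
	return len(mset.preAcks) > 0 && len(mset.preAcks[seq]) >= len(mset.consumers)
}

func main() {
	c1, c2 := &consumer{"C"}, &consumer{"D"}
	mset := &stream{consumers: map[string]*consumer{"C": c1, "D": c2}}
	mset.registerPreAck(c1, 101)
	mset.registerPreAck(c2, 101)
	fmt.Println(mset.hasAllPreAcks(101)) // true: both consumers acked 101 before it arrived
	mset.clearAllPreAcks(101)
	fmt.Println(mset.hasAllPreAcks(101)) // false: state for 101 has been cleared
}

Centralizing the map manipulation behind helpers like these is what lets the apply, skip, snapshot and catchup paths in the diff above share one place to clear stale pre-ack state.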
177 changes: 157 additions & 20 deletions server/norace_test.go
@@ -6843,15 +6843,15 @@ func TestNoRaceJetStreamClusterF3Setup(t *testing.T) {
}
}

// We test an interest based stream that has a cluster with a node with asymmetric paths from
// the stream leader and the consumer leader such that the consumer leader path is fast and
replicated acks arrive sooner than the actual message. This path was considered, but also
// categorized as very rare and was expensive as it tried to forward a new stream msg delete
// proposal to the original stream leader. It now will deal with the issue locally and not
// slow down the ingest rate to the stream's publishers.
func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamSetup(t *testing.T) {
// Uncomment to run. Do not want as part of Travis tests atm.
skip(t)
// Unbalanced stretch cluster.
// S2 (stream leader) will have a slow path to S1 (via proxy) and S3 (consumer leader) will have a fast path.
//
// Route Ports
// "S1": 14622
// "S2": 15622
// "S3": 16622
func createStretchUnbalancedCluster(t testing.TB) (c *cluster, np *netProxy) {
t.Helper()

tmpl := `
listen: 127.0.0.1:-1
@@ -6868,24 +6868,16 @@ func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamSetup(t *testing.T
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
}
`

// Route Ports
// "S1": 14622,
// "S2": 15622,
// "S3": 16622,

// S2 (stream leader) will have a slow path to S1 (via proxy) and S3 (consumer leader) will have a fast path.

// Do these in order, S1, S2 (proxy) then S3.
c := &cluster{t: t, servers: make([]*Server, 3), opts: make([]*Options, 3), name: "F3"}
c = &cluster{t: t, servers: make([]*Server, 3), opts: make([]*Options, 3), name: "F3"}

// S1
conf := fmt.Sprintf(tmpl, "S1", t.TempDir(), 14622, "route://127.0.0.1:15622, route://127.0.0.1:16622")
c.servers[0], c.opts[0] = RunServerWithConfig(createConfFile(t, []byte(conf)))

// S2
// Create the proxy first. Connect this to S1. Make it slow, e.g. 5ms RTT.
np := createNetProxy(1*time.Millisecond, 1024*1024*1024, 1024*1024*1024, "route://127.0.0.1:14622", true)
np = createNetProxy(1*time.Millisecond, 1024*1024*1024, 1024*1024*1024, "route://127.0.0.1:14622", true)
routes := fmt.Sprintf("%s, route://127.0.0.1:16622", np.routeURL())
conf = fmt.Sprintf(tmpl, "S2", t.TempDir(), 15622, routes)
c.servers[1], c.opts[1] = RunServerWithConfig(createConfFile(t, []byte(conf)))
@@ -6896,6 +6888,21 @@ func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamSetup(t *testing.T

c.checkClusterFormed()
c.waitOnClusterReady()

return c, np
}

// We test an interest based stream that has a cluster with a node with asymmetric paths from
// the stream leader and the consumer leader such that the consumer leader path is fast and
replicated acks arrive sooner than the actual message. This path was considered, but also
// categorized as very rare and was expensive as it tried to forward a new stream msg delete
// proposal to the original stream leader. It now will deal with the issue locally and not
// slow down the ingest rate to the stream's publishers.
func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamSetup(t *testing.T) {
// Uncomment to run. Do not want as part of Travis tests atm.
skip(t)

c, np := createStretchUnbalancedCluster(t)
defer c.shutdown()
defer np.stop()

@@ -6932,7 +6939,7 @@ func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamSetup(t *testing.T
c.waitOnConsumerLeader(globalAccountName, "EVENTS", "C")
if s := c.consumerLeader(globalAccountName, "EVENTS", "C"); s != cl {
s.JetStreamStepdownConsumer(globalAccountName, "EVENTS", "C")
return fmt.Errorf("Server %s is not consumer leader yet", sl)
return fmt.Errorf("Server %s is not consumer leader yet", cl)
}
return nil
})
@@ -7435,3 +7442,133 @@ func TestNoRaceFileStoreNumPending(t *testing.T) {
}
}
}

func TestNoRaceJetStreamClusterUnbalancedInterestMultipleConsumers(t *testing.T) {
c, np := createStretchUnbalancedCluster(t)
defer c.shutdown()
defer np.stop()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

// Now create the stream.
_, err := js.AddStream(&nats.StreamConfig{
Name: "EVENTS",
Subjects: []string{"EV.>"},
Replicas: 3,
Retention: nats.InterestPolicy,
})
require_NoError(t, err)

// Make sure its leader is on S2.
sl := c.servers[1]
checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
c.waitOnStreamLeader(globalAccountName, "EVENTS")
if s := c.streamLeader(globalAccountName, "EVENTS"); s != sl {
s.JetStreamStepdownStream(globalAccountName, "EVENTS")
return fmt.Errorf("Server %s is not stream leader yet", sl)
}
return nil
})

// Create a fast ack consumer.
_, err = js.Subscribe("EV.NEW", func(m *nats.Msg) {
m.Ack()
}, nats.Durable("C"), nats.ManualAck())
require_NoError(t, err)

// Make sure the consumer leader is on S3.
cl := c.servers[2]
checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
c.waitOnConsumerLeader(globalAccountName, "EVENTS", "C")
if s := c.consumerLeader(globalAccountName, "EVENTS", "C"); s != cl {
s.JetStreamStepdownConsumer(globalAccountName, "EVENTS", "C")
return fmt.Errorf("Server %s is not consumer leader yet", cl)
}
return nil
})

// Connect a client directly to the stream leader.
nc, js = jsClientConnect(t, sl)
defer nc.Close()

// Now create a pull subscriber.
sub, err := js.PullSubscribe("EV.NEW", "D", nats.ManualAck())
require_NoError(t, err)

// Make sure this consumer leader is on S1.
cl = c.servers[0]
checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
c.waitOnConsumerLeader(globalAccountName, "EVENTS", "D")
if s := c.consumerLeader(globalAccountName, "EVENTS", "D"); s != cl {
s.JetStreamStepdownConsumer(globalAccountName, "EVENTS", "D")
return fmt.Errorf("Server %s is not consumer leader yet", cl)
}
return nil
})

numToSend := 1000
for i := 0; i < numToSend; i++ {
_, err := js.PublishAsync("EV.NEW", nil)
require_NoError(t, err)
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(20 * time.Second):
t.Fatalf("Did not receive completion signal")
}

// Now make sure we can pull messages since we have not acked.
// The bug is that the acks arrive on S1 faster than the messages, but we want to
// make sure we do not remove prematurely.
msgs, err := sub.Fetch(100, nats.MaxWait(time.Second))
require_NoError(t, err)
require_True(t, len(msgs) == 100)
for _, m := range msgs {
m.AckSync()
}

ci, err := js.ConsumerInfo("EVENTS", "D")
require_NoError(t, err)
require_True(t, ci.NumPending == uint64(numToSend-100))
require_True(t, ci.NumAckPending == 0)
require_True(t, ci.Delivered.Stream == 100)
require_True(t, ci.AckFloor.Stream == 100)

// Check stream state on all servers.
for _, s := range c.servers {
mset, err := s.GlobalAccount().lookupStream("EVENTS")
require_NoError(t, err)
state := mset.state()
require_True(t, state.Msgs == 900)
require_True(t, state.FirstSeq == 101)
require_True(t, state.LastSeq == 1000)
require_True(t, state.Consumers == 2)
}

msgs, err = sub.Fetch(900, nats.MaxWait(time.Second))
require_NoError(t, err)
require_True(t, len(msgs) == 900)
for _, m := range msgs {
m.AckSync()
}

// Let acks propagate.
time.Sleep(250 * time.Millisecond)

// Check final stream state on all servers.
for _, s := range c.servers {
mset, err := s.GlobalAccount().lookupStream("EVENTS")
require_NoError(t, err)
state := mset.state()
require_True(t, state.Msgs == 0)
require_True(t, state.FirstSeq == 1001)
require_True(t, state.LastSeq == 1000)
require_True(t, state.Consumers == 2)
// Now check preAcks
mset.mu.RLock()
numPreAcks := len(mset.preAcks)
mset.mu.RUnlock()
require_True(t, numPreAcks == 0)
}
}