[FIXED] Performance issues with checkAckFloor with large first stream sequence. (#4226)

Bail early if this is a new consumer, meaning the ack floor's stream sequence is 0.
Decide which linear space to scan: the gap between the ack floor and the stream's first sequence, or the pending map, whichever is smaller.
Do no work if nothing is pending and we only need to adjust the ack floor, which we do at the end.

Also realized some tests were named wrong and so were not being run, or were in the wrong file.

Signed-off-by: Derek Collison <derek@nats.io>
neilalexander committed Jun 9, 2023
2 parents d3bde2c + 9eeffbc commit 9d2ae42
Showing 3 changed files with 259 additions and 162 deletions.
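
Before the diff itself, a minimal standalone sketch of the scan-selection idea described above. The helper name, the plain map type, and the example values are illustrative only; the real logic operates on the consumer's pending state inside checkAckFloor in the diff below.

package main

import "fmt"

// seqsBelowFloor sketches the commit's scan selection (hypothetical helper,
// not the server code): return the pending sequences that now sit below the
// stream's first sequence, walking whichever linear space is smaller.
func seqsBelowFloor(firstSeq, asflr uint64, pending map[uint64]struct{}) []uint64 {
    // Early bail: new consumer (ack floor 0) or no gap to reconcile.
    if asflr == 0 || firstSeq == 0 || asflr >= firstSeq-1 {
        return nil
    }
    var out []uint64
    if firstSeq-asflr-1 < uint64(len(pending)) {
        // The gap between the ack floor and the first sequence is smaller.
        for seq := asflr + 1; seq < firstSeq; seq++ {
            if _, ok := pending[seq]; ok {
                out = append(out, seq)
            }
        }
    } else {
        // The pending map is smaller, e.g. a huge first sequence with few pending.
        for seq := range pending {
            if seq < firstSeq {
                out = append(out, seq)
            }
        }
    }
    return out
}

func main() {
    // Huge first stream sequence, two pending messages: only two map entries
    // are visited instead of roughly a million loop iterations.
    pending := map[uint64]struct{}{5: {}, 9: {}}
    fmt.Println(seqsBelowFloor(1_000_000, 2, pending))
}

For a stream whose first sequence has raced far ahead of a consumer with only a handful of pending messages, the old code walked the entire gap; the sketch (and the real change) touches only the pending entries instead.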
68 changes: 44 additions & 24 deletions server/consumer.go
@@ -3256,10 +3256,10 @@ func (o *consumer) hbTimer() (time.Duration, *time.Timer) {
 // Should only be called from consumer leader.
 func (o *consumer) checkAckFloor() {
     o.mu.RLock()
-    mset, closed, asflr := o.mset, o.closed, o.asflr
+    mset, closed, asflr, numPending := o.mset, o.closed, o.asflr, len(o.pending)
     o.mu.RUnlock()

-    if closed || mset == nil {
+    if asflr == 0 || closed || mset == nil {
         return
     }

@@ -3271,19 +3271,46 @@ func (o *consumer) checkAckFloor() {
         return
     }

-    // Process all messages that no longer exist.
-    for seq := asflr + 1; seq < ss.FirstSeq; seq++ {
-        // Check if this message was pending.
-        o.mu.RLock()
-        p, isPending := o.pending[seq]
-        var rdc uint64 = 1
-        if o.rdc != nil {
-            rdc = o.rdc[seq]
-        }
-        o.mu.RUnlock()
-        // If it was pending for us, get rid of it.
-        if isPending {
-            o.processTerm(seq, p.Sequence, rdc)
-        }
-    }
+    // Check which linear space is less to walk.
+    if ss.FirstSeq-asflr-1 < uint64(numPending) {
+        // Process all messages that no longer exist.
+        for seq := asflr + 1; seq < ss.FirstSeq; seq++ {
+            // Check if this message was pending.
+            o.mu.RLock()
+            p, isPending := o.pending[seq]
+            var rdc uint64 = 1
+            if o.rdc != nil {
+                rdc = o.rdc[seq]
+            }
+            o.mu.RUnlock()
+            // If it was pending for us, get rid of it.
+            if isPending {
+                o.processTerm(seq, p.Sequence, rdc)
+            }
+        }
+    } else if numPending > 0 {
+        // Here it is shorter to walk pending.
+        // toTerm is seq, dseq, rdc for each entry.
+        toTerm := make([]uint64, 0, numPending*3)
+        o.mu.RLock()
+        for seq, p := range o.pending {
+            if seq < ss.FirstSeq {
+                var dseq uint64 = 1
+                if p != nil {
+                    dseq = p.Sequence
+                }
+                var rdc uint64 = 1
+                if o.rdc != nil {
+                    rdc = o.rdc[seq]
+                }
+                toTerm = append(toTerm, seq, dseq, rdc)
+            }
+        }
+        o.mu.RUnlock()
+
+        for i := 0; i < len(toTerm); i += 3 {
+            seq, dseq, rdc := toTerm[i], toTerm[i+1], toTerm[i+2]
+            o.processTerm(seq, dseq, rdc)
+        }
+    }

@@ -3324,20 +3351,13 @@ func (o *consumer) processInboundAcks(qch chan struct{}) {
         return
     }

-    // Track if we are interest retention policy, if not we can skip the ack floor check.
-    isInterestRetention := mset.isInterestRetention()
-
-    checkAckFloor := func() {
-        if isInterestRetention {
-            o.checkAckFloor()
-        }
-    }
-
     // We will check this on entry and periodically.
-    checkAckFloor()
+    o.checkAckFloor()

     // How often we will check for ack floor drift.
-    var ackFloorCheck = 30 * time.Second
+    // Spread these out for large numbers on a server restart.
+    delta := time.Duration(rand.Int63n(int64(time.Minute)))
+    var ackFloorCheck = time.Minute + delta

     for {
         select {
@@ -3353,7 +3373,7 @@ func (o *consumer) processInboundAcks(qch chan struct{}) {
                 o.suppressDeletion()
             }
         case <-time.After(ackFloorCheck):
-            checkAckFloor()
+            o.checkAckFloor()
         case <-qch:
             return
         case <-s.quitCh:
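
The last two hunks also drop the per-call interest-retention gate, so checkAckFloor now runs for every consumer (presumably relying on the new early returns above to keep that cheap), and they replace the fixed 30-second drift check with a jittered interval. A small sketch of that jitter, using a hypothetical helper name rather than the server's inline code:

package main

import (
    "fmt"
    "math/rand"
    "time"
)

// ackFloorCheckInterval mirrors the jitter added above: a one-minute base plus
// a random offset in [0, 1m), so that after a mass server restart the periodic
// ack floor checks of many consumers spread across a two-minute window instead
// of all firing together.
func ackFloorCheckInterval() time.Duration {
    delta := time.Duration(rand.Int63n(int64(time.Minute)))
    return time.Minute + delta
}

func main() {
    fmt.Println(ackFloorCheckInterval())
}

The trade-off is a somewhat longer worst-case delay before drift is detected, in exchange for smoother load when many consumers come back at once.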
141 changes: 3 additions & 138 deletions server/jetstream_cluster_3_test.go
@@ -3236,142 +3236,7 @@ func TestJetStreamClusterConsumerFollowerStoreStateAckFloorBug(t *testing.T) {
     checkAllLeaders()
 }

-func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamPreAck(t *testing.T) {
-    tmpl := `
-    listen: 127.0.0.1:-1
-    server_name: %s
-    jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
-
-    cluster {
-        name: "F3"
-        listen: 127.0.0.1:%d
-        routes = [%s]
-    }
-
-    accounts {
-        $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
-    }
-    `
-
-    // Route Ports
-    // "S1": 14622,
-    // "S2": 15622,
-    // "S3": 16622,
-
-    // S2 (stream leader) will have a slow path to S1 (via proxy) and S3 (consumer leader) will have a fast path.
-
-    // Do these in order, S1, S2 (proxy) then S3.
-    c := &cluster{t: t, servers: make([]*Server, 3), opts: make([]*Options, 3), name: "F3"}
-
-    // S1
-    conf := fmt.Sprintf(tmpl, "S1", t.TempDir(), 14622, "route://127.0.0.1:15622, route://127.0.0.1:16622")
-    c.servers[0], c.opts[0] = RunServerWithConfig(createConfFile(t, []byte(conf)))
-
-    // S2
-    // Create the proxy first. Connect this to S1. Make it slow, e.g. 5ms RTT.
-    np := createNetProxy(1*time.Millisecond, 1024*1024*1024, 1024*1024*1024, "route://127.0.0.1:14622", true)
-    routes := fmt.Sprintf("%s, route://127.0.0.1:16622", np.routeURL())
-    conf = fmt.Sprintf(tmpl, "S2", t.TempDir(), 15622, routes)
-    c.servers[1], c.opts[1] = RunServerWithConfig(createConfFile(t, []byte(conf)))
-
-    // S3
-    conf = fmt.Sprintf(tmpl, "S3", t.TempDir(), 16622, "route://127.0.0.1:14622, route://127.0.0.1:15622")
-    c.servers[2], c.opts[2] = RunServerWithConfig(createConfFile(t, []byte(conf)))
-
-    c.checkClusterFormed()
-    c.waitOnClusterReady()
-    defer c.shutdown()
-    defer np.stop()
-
-    nc, js := jsClientConnect(t, c.randomServer())
-    defer nc.Close()
-
-    // Now create the stream.
-    _, err := js.AddStream(&nats.StreamConfig{
-        Name:      "EVENTS",
-        Subjects:  []string{"EV.>"},
-        Replicas:  3,
-        Retention: nats.InterestPolicy,
-    })
-    require_NoError(t, err)
-
-    // Make sure it's leader is on S2.
-    sl := c.servers[1]
-    checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
-        c.waitOnStreamLeader(globalAccountName, "EVENTS")
-        if s := c.streamLeader(globalAccountName, "EVENTS"); s != sl {
-            s.JetStreamStepdownStream(globalAccountName, "EVENTS")
-            return fmt.Errorf("Server %s is not stream leader yet", sl)
-        }
-        return nil
-    })
-
-    // Now create the consumer.
-    _, err = js.AddConsumer("EVENTS", &nats.ConsumerConfig{
-        Durable:        "C",
-        AckPolicy:      nats.AckExplicitPolicy,
-        DeliverSubject: "dx",
-    })
-    require_NoError(t, err)
-
-    // Make sure the consumer leader is on S3.
-    cl := c.servers[2]
-    checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
-        c.waitOnConsumerLeader(globalAccountName, "EVENTS", "C")
-        if s := c.consumerLeader(globalAccountName, "EVENTS", "C"); s != cl {
-            s.JetStreamStepdownConsumer(globalAccountName, "EVENTS", "C")
-            return fmt.Errorf("Server %s is not consumer leader yet", sl)
-        }
-        return nil
-    })
-
-    // Create the real consumer on the consumer leader to make it efficient.
-    nc, js = jsClientConnect(t, cl)
-    defer nc.Close()
-
-    _, err = js.Subscribe(_EMPTY_, func(msg *nats.Msg) {
-        msg.Ack()
-    }, nats.BindStream("EVENTS"), nats.Durable("C"), nats.ManualAck())
-    require_NoError(t, err)
-
-    for i := 0; i < 1_000; i++ {
-        _, err := js.PublishAsync("EVENTS.PAID", []byte("ok"))
-        require_NoError(t, err)
-    }
-    select {
-    case <-js.PublishAsyncComplete():
-    case <-time.After(5 * time.Second):
-        t.Fatalf("Did not receive completion signal")
-    }
-
-    slow := c.servers[0]
-    mset, err := slow.GlobalAccount().lookupStream("EVENTS")
-    require_NoError(t, err)
-
-    // Make sure preAck is non-nil, so we know the logic has kicked in.
-    mset.mu.RLock()
-    preAcks := mset.preAcks
-    mset.mu.RUnlock()
-    require_NotNil(t, preAcks)
-
-    checkFor(t, 5*time.Second, 200*time.Millisecond, func() error {
-        state := mset.state()
-        if state.Msgs == 0 {
-            mset.mu.RLock()
-            lp := len(mset.preAcks)
-            mset.mu.RUnlock()
-            if lp == 0 {
-                return nil
-            } else {
-                t.Fatalf("Expected no preAcks with no msgs, but got %d", lp)
-            }
-        }
-        return fmt.Errorf("Still have %d msgs left", state.Msgs)
-    })
-
-}
-
-func TestJetStreamInterestLeakOnDisableJetStream(t *testing.T) {
+func TestJetStreamClusterInterestLeakOnDisableJetStream(t *testing.T) {
     c := createJetStreamClusterExplicit(t, "R3S", 3)
     defer c.shutdown()

@@ -3410,7 +3275,7 @@ func TestJetStreamInterestLeakOnDisableJetStream(t *testing.T) {
     }
 }

-func TestJetStreamNoLeadersDuringLameDuck(t *testing.T) {
+func TestJetStreamClusterNoLeadersDuringLameDuck(t *testing.T) {
     c := createJetStreamClusterExplicit(t, "R3S", 3)
     defer c.shutdown()

@@ -3517,7 +3382,7 @@ func TestJetStreamNoLeadersDuringLameDuck(t *testing.T) {
 // it could miss the signal of a message going away. If that message was pending and expires the
 // ack floor could fall below the stream first sequence. This test will force that condition and
 // make sure the system resolves itself.
-func TestJetStreamConsumerAckFloorDrift(t *testing.T) {
+func TestJetStreamClusterConsumerAckFloorDrift(t *testing.T) {
     c := createJetStreamClusterExplicit(t, "R3S", 3)
     defer c.shutdown()

