[FIXED] Performance issues with checkAckFloor with large first stream sequence. #4226

Merged
merged 1 commit into from Jun 9, 2023
68 changes: 44 additions & 24 deletions server/consumer.go
@@ -3256,10 +3256,10 @@ func (o *consumer) hbTimer() (time.Duration, *time.Timer) {
// Should only be called from consumer leader.
func (o *consumer) checkAckFloor() {
o.mu.RLock()
mset, closed, asflr := o.mset, o.closed, o.asflr
mset, closed, asflr, numPending := o.mset, o.closed, o.asflr, len(o.pending)
o.mu.RUnlock()

if closed || mset == nil {
if asflr == 0 || closed || mset == nil {
return
}

@@ -3271,19 +3271,46 @@ func (o *consumer) checkAckFloor() {
return
}

// Process all messages that no longer exist.
for seq := asflr + 1; seq < ss.FirstSeq; seq++ {
// Check if this message was pending.
	// Check which linear space is smaller to walk.
if ss.FirstSeq-asflr-1 < uint64(numPending) {
// Process all messages that no longer exist.
for seq := asflr + 1; seq < ss.FirstSeq; seq++ {
// Check if this message was pending.
o.mu.RLock()
p, isPending := o.pending[seq]
var rdc uint64 = 1
if o.rdc != nil {
rdc = o.rdc[seq]
}
o.mu.RUnlock()
// If it was pending for us, get rid of it.
if isPending {
o.processTerm(seq, p.Sequence, rdc)
}
}
} else if numPending > 0 {
		// Here it is shorter to walk pending.
		// toTerm is seq, dseq, rdc for each entry.
toTerm := make([]uint64, 0, numPending*3)
o.mu.RLock()
p, isPending := o.pending[seq]
var rdc uint64 = 1
if o.rdc != nil {
rdc = o.rdc[seq]
for seq, p := range o.pending {
if seq < ss.FirstSeq {
var dseq uint64 = 1
if p != nil {
dseq = p.Sequence
}
var rdc uint64 = 1
if o.rdc != nil {
rdc = o.rdc[seq]
}
toTerm = append(toTerm, seq, dseq, rdc)
}
}
o.mu.RUnlock()
// If it was pending for us, get rid of it.
if isPending {
o.processTerm(seq, p.Sequence, rdc)

for i := 0; i < len(toTerm); i += 3 {
seq, dseq, rdc := toTerm[i], toTerm[i+1], toTerm[i+2]
o.processTerm(seq, dseq, rdc)
}
}
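
For readers skimming the diff, here is a minimal, self-contained sketch of the idea above: rather than always iterating every sequence between the ack floor and the stream's first sequence, pick whichever space is smaller, the sequence gap or the pending map, and batch the (seq, dseq, rdc) triples so termination happens outside the map iteration. Names such as pendingInfo, termBelowFirst, and the term callback are simplified stand-ins, not the server's types; the real code walks o.pending and o.rdc under o.mu and calls o.processTerm.

```go
package main

import "fmt"

type pendingInfo struct{ Sequence uint64 } // stand-in for the delivery sequence

func termBelowFirst(asflr, firstSeq uint64, pending map[uint64]*pendingInfo, rdc map[uint64]uint64, term func(seq, dseq, rdc uint64)) {
	gap := firstSeq - asflr - 1
	if gap < uint64(len(pending)) {
		// Gap is small: walk the sequences between the ack floor and the
		// stream's first sequence and terminate any that are still pending.
		for seq := asflr + 1; seq < firstSeq; seq++ {
			if p, ok := pending[seq]; ok {
				redeliveries := uint64(1)
				if rdc != nil {
					redeliveries = rdc[seq]
				}
				term(seq, p.Sequence, redeliveries)
			}
		}
		return
	}
	// Pending set is smaller: walk it once, collect (seq, dseq, rdc) triples,
	// then terminate after the walk.
	toTerm := make([]uint64, 0, len(pending)*3)
	for seq, p := range pending {
		if seq < firstSeq {
			dseq := uint64(1)
			if p != nil {
				dseq = p.Sequence
			}
			redeliveries := uint64(1)
			if rdc != nil {
				redeliveries = rdc[seq]
			}
			toTerm = append(toTerm, seq, dseq, redeliveries)
		}
	}
	for i := 0; i < len(toTerm); i += 3 {
		term(toTerm[i], toTerm[i+1], toTerm[i+2])
	}
}

func main() {
	// With a large first sequence and only two pending messages, the pending
	// map is the cheaper space to walk.
	pending := map[uint64]*pendingInfo{5: {Sequence: 5}, 7: {Sequence: 7}}
	termBelowFirst(4, 1_000_000, pending, nil, func(seq, dseq, rdc uint64) {
		fmt.Println("terminating", seq, dseq, rdc)
	})
}
```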

@@ -3324,20 +3351,13 @@ func (o *consumer) processInboundAcks(qch chan struct{}) {
return
}

// Track if we are interest retention policy, if not we can skip the ack floor check.
isInterestRetention := mset.isInterestRetention()

checkAckFloor := func() {
if isInterestRetention {
o.checkAckFloor()
}
}

// We will check this on entry and periodically.
checkAckFloor()
o.checkAckFloor()

// How often we will check for ack floor drift.
var ackFloorCheck = 30 * time.Second
// Spread these out for large numbers on a server restart.
delta := time.Duration(rand.Int63n(int64(time.Minute)))
var ackFloorCheck = time.Minute + delta

for {
select {
@@ -3353,7 +3373,7 @@ func (o *consumer) processInboundAcks(qch chan struct{}) {
o.suppressDeletion()
}
case <-time.After(ackFloorCheck):
checkAckFloor()
o.checkAckFloor()
case <-qch:
return
case <-s.quitCh:
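
The other change in this hunk is the jittered check interval: a base of one minute plus a random delta of up to another minute, so that many consumers restarted together do not all run their ack-floor checks at the same instant. Below is a small standalone sketch of that pattern; periodicWithJitter, runCheck, and the stop channel are illustrative stand-ins, and the real loop also handles delivery-subject changes and server shutdown.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func periodicWithJitter(runCheck func(), stop <-chan struct{}) {
	// Spread the periodic checks out across consumers on a restart.
	delta := time.Duration(rand.Int63n(int64(time.Minute)))
	interval := time.Minute + delta

	// Check once on entry, then periodically.
	runCheck()
	for {
		select {
		case <-time.After(interval):
			runCheck()
		case <-stop:
			return
		}
	}
}

func main() {
	stop := make(chan struct{})
	go periodicWithJitter(func() { fmt.Println("checking ack floor") }, stop)
	time.Sleep(10 * time.Millisecond)
	close(stop)
}
```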
141 changes: 3 additions & 138 deletions server/jetstream_cluster_3_test.go
@@ -3236,142 +3236,7 @@ func TestJetStreamClusterConsumerFollowerStoreStateAckFloorBug(t *testing.T) {
checkAllLeaders()
}

func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamPreAck(t *testing.T) {
tmpl := `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}

cluster {
name: "F3"
listen: 127.0.0.1:%d
routes = [%s]
}

accounts {
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
}
`

// Route Ports
// "S1": 14622,
// "S2": 15622,
// "S3": 16622,

// S2 (stream leader) will have a slow path to S1 (via proxy) and S3 (consumer leader) will have a fast path.

// Do these in order, S1, S2 (proxy) then S3.
c := &cluster{t: t, servers: make([]*Server, 3), opts: make([]*Options, 3), name: "F3"}

// S1
conf := fmt.Sprintf(tmpl, "S1", t.TempDir(), 14622, "route://127.0.0.1:15622, route://127.0.0.1:16622")
c.servers[0], c.opts[0] = RunServerWithConfig(createConfFile(t, []byte(conf)))

// S2
// Create the proxy first. Connect this to S1. Make it slow, e.g. 5ms RTT.
np := createNetProxy(1*time.Millisecond, 1024*1024*1024, 1024*1024*1024, "route://127.0.0.1:14622", true)
routes := fmt.Sprintf("%s, route://127.0.0.1:16622", np.routeURL())
conf = fmt.Sprintf(tmpl, "S2", t.TempDir(), 15622, routes)
c.servers[1], c.opts[1] = RunServerWithConfig(createConfFile(t, []byte(conf)))

// S3
conf = fmt.Sprintf(tmpl, "S3", t.TempDir(), 16622, "route://127.0.0.1:14622, route://127.0.0.1:15622")
c.servers[2], c.opts[2] = RunServerWithConfig(createConfFile(t, []byte(conf)))

c.checkClusterFormed()
c.waitOnClusterReady()
defer c.shutdown()
defer np.stop()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

// Now create the stream.
_, err := js.AddStream(&nats.StreamConfig{
Name: "EVENTS",
Subjects: []string{"EV.>"},
Replicas: 3,
Retention: nats.InterestPolicy,
})
require_NoError(t, err)

// Make sure it's leader is on S2.
sl := c.servers[1]
checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
c.waitOnStreamLeader(globalAccountName, "EVENTS")
if s := c.streamLeader(globalAccountName, "EVENTS"); s != sl {
s.JetStreamStepdownStream(globalAccountName, "EVENTS")
return fmt.Errorf("Server %s is not stream leader yet", sl)
}
return nil
})

// Now create the consumer.
_, err = js.AddConsumer("EVENTS", &nats.ConsumerConfig{
Durable: "C",
AckPolicy: nats.AckExplicitPolicy,
DeliverSubject: "dx",
})
require_NoError(t, err)

// Make sure the consumer leader is on S3.
cl := c.servers[2]
checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
c.waitOnConsumerLeader(globalAccountName, "EVENTS", "C")
if s := c.consumerLeader(globalAccountName, "EVENTS", "C"); s != cl {
s.JetStreamStepdownConsumer(globalAccountName, "EVENTS", "C")
return fmt.Errorf("Server %s is not consumer leader yet", sl)
}
return nil
})

// Create the real consumer on the consumer leader to make it efficient.
nc, js = jsClientConnect(t, cl)
defer nc.Close()

_, err = js.Subscribe(_EMPTY_, func(msg *nats.Msg) {
msg.Ack()
}, nats.BindStream("EVENTS"), nats.Durable("C"), nats.ManualAck())
require_NoError(t, err)

for i := 0; i < 1_000; i++ {
_, err := js.PublishAsync("EVENTS.PAID", []byte("ok"))
require_NoError(t, err)
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}

slow := c.servers[0]
mset, err := slow.GlobalAccount().lookupStream("EVENTS")
require_NoError(t, err)

// Make sure preAck is non-nil, so we know the logic has kicked in.
mset.mu.RLock()
preAcks := mset.preAcks
mset.mu.RUnlock()
require_NotNil(t, preAcks)

checkFor(t, 5*time.Second, 200*time.Millisecond, func() error {
state := mset.state()
if state.Msgs == 0 {
mset.mu.RLock()
lp := len(mset.preAcks)
mset.mu.RUnlock()
if lp == 0 {
return nil
} else {
t.Fatalf("Expected no preAcks with no msgs, but got %d", lp)
}
}
return fmt.Errorf("Still have %d msgs left", state.Msgs)
})

}

func TestJetStreamInterestLeakOnDisableJetStream(t *testing.T) {
func TestJetStreamClusterInterestLeakOnDisableJetStream(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

@@ -3410,7 +3275,7 @@ func TestJetStreamInterestLeakOnDisableJetStream(t *testing.T) {
}
}

func TestJetStreamNoLeadersDuringLameDuck(t *testing.T) {
func TestJetStreamClusterNoLeadersDuringLameDuck(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

@@ -3517,7 +3382,7 @@ func TestJetStreamNoLeadersDuringLameDuck(t *testing.T) {
// it could miss the signal of a message going away. If that message was pending and expires the
// ack floor could fall below the stream first sequence. This test will force that condition and
// make sure the system resolves itself.
func TestJetStreamConsumerAckFloorDrift(t *testing.T) {
func TestJetStreamClusterConsumerAckFloorDrift(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
