Improved publisher performance under some instances of asymmetric network latency clusters. #3981

Merged: 1 commit merged on Mar 21, 2023
9 changes: 9 additions & 0 deletions server/jetstream_cluster.go
@@ -2185,6 +2185,15 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps

case <-t.C:
doSnapshot()
// Check if we have preAcks left over now that we have become the leader.
if isLeader {
mset.mu.Lock()
if mset.preAcks != nil {
mset.preAcks = nil
}
mset.mu.Unlock()
}

case <-uch:
// keep stream assignment current
sa = mset.streamAssignment()
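The hunk above covers a replica that has just become stream leader: any preAcks accumulated while it was a follower are dropped on the next timer tick, since a leader sees messages before their acks and the leftover entries can no longer match anything useful. A minimal standalone sketch of that idea, using a toy type rather than the server's real stream struct:

```go
package main

import (
	"fmt"
	"sync"
)

// toyStream stands in for the server's stream struct: it only keeps the preAcks
// map, which records sequences that were acked before the message itself arrived.
type toyStream struct {
	mu      sync.Mutex
	preAcks map[uint64]struct{}
}

// onTick mirrors the periodic check in monitorStream: once this node is the
// leader, any leftover preAcks are stale and are simply released.
func (s *toyStream) onTick(isLeader bool) {
	if !isLeader {
		return
	}
	s.mu.Lock()
	if s.preAcks != nil {
		s.preAcks = nil
	}
	s.mu.Unlock()
}

func main() {
	s := &toyStream{preAcks: map[uint64]struct{}{42: {}}}
	s.onTick(true)
	fmt.Println("preAcks cleared:", s.preAcks == nil) // prints "preAcks cleared: true"
}
```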
188 changes: 182 additions & 6 deletions server/norace_test.go
@@ -6551,11 +6551,13 @@ func TestNoRaceJetStreamClusterF3Setup(t *testing.T) {
// Test params.
numSourceStreams := 20
numConsumersPerSource := 1
numPullersForAggregate := 50
numPullersPerConsumer := 50
numPublishers := 100
setHighStartSequence := false
simulateMaxRedeliveries := false
testTime := 60 * time.Minute // make sure to do --timeout=65m
maxBadPubTimes := uint32(20)
badPubThresh := 5 * time.Second
testTime := 5 * time.Minute // make sure to do --timeout=65m

t.Logf("Starting Test: Total Test Time %v", testTime)

@@ -6673,11 +6675,11 @@ func TestNoRaceJetStreamClusterF3Setup(t *testing.T) {
)
require_NoError(t, err)

t.Logf("Creating %d Pull Subscribers", numPullersForAggregate)
t.Logf("Creating %d x 2 Pull Subscribers", numPullersPerConsumer)

// Now create the pullers.
for _, subName := range []string{"C1", "C2"} {
for i := 0; i < numPullersForAggregate; i++ {
for i := 0; i < numPullersPerConsumer; i++ {
go func(subName string) {
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
@@ -6698,8 +6700,10 @@ func TestNoRaceJetStreamClusterF3Setup(t *testing.T) {
}
// Shuffle
rand.Shuffle(len(msgs), func(i, j int) { msgs[i], msgs[j] = msgs[j], msgs[i] })

// Wait for a random interval up to 100ms.
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)

for _, m := range msgs {
// If we want to simulate max redeliveries being hit: not acking even
// once will cause it, due to how the subscriber is set up.
@@ -6751,6 +6755,9 @@ func TestNoRaceJetStreamClusterF3Setup(t *testing.T) {

t.Logf("Creating %d Publishers", numPublishers)

var numLimitsExceeded atomic.Uint32
errCh := make(chan error, 100)

for i := 0; i < numPublishers; i++ {
go func() {
nc, js := jsClientConnect(t, c.randomServer())
@@ -6769,8 +6776,13 @@ func TestNoRaceJetStreamClusterF3Setup(t *testing.T) {
return
}
elapsed := time.Since(start)
if elapsed > 5*time.Second {
if elapsed > badPubThresh {
t.Logf("Publish time took more than expected: %v", elapsed)
numLimitsExceeded.Add(1)
if ne := numLimitsExceeded.Load(); ne > maxBadPubTimes {
errCh <- fmt.Errorf("Too many exceeded times on publish: %d", ne)
return
}
}
updatePubStats(elapsed)
}
@@ -6823,5 +6835,169 @@ func TestNoRaceJetStreamClusterF3Setup(t *testing.T) {

}()

time.Sleep(testTime)
select {
case e := <-errCh:
t.Fatal(e)
case <-time.After(testTime):
t.Fatalf("Did not receive completion signal")
}
}
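The updated test above no longer just sleeps for testTime: each publisher counts publishes slower than badPubThresh with an atomic counter and reports over an error channel once more than maxBadPubTimes are seen, while the main goroutine selects between that error and the test deadline. A simplified standalone sketch of this pattern; the publish stand-in and all numbers here are placeholders, not the test's real values:

```go
package main

import (
	"fmt"
	"math/rand"
	"sync/atomic"
	"time"
)

func main() {
	const (
		numPublishers = 4
		maxSlow       = 3                     // budget of slow publishes before failing
		slowThresh    = 50 * time.Millisecond // stand-in for badPubThresh
		runTime       = 2 * time.Second       // stand-in for testTime
	)

	var slowCount atomic.Uint32
	errCh := make(chan error, numPublishers)

	publish := func() { // stand-in for js.Publish with variable latency
		time.Sleep(time.Duration(rand.Intn(80)) * time.Millisecond)
	}

	for i := 0; i < numPublishers; i++ {
		go func() {
			for {
				start := time.Now()
				publish()
				if elapsed := time.Since(start); elapsed > slowThresh {
					if n := slowCount.Add(1); n > maxSlow {
						errCh <- fmt.Errorf("too many slow publishes: %d (last took %v)", n, elapsed)
						return
					}
				}
			}
		}()
	}

	// Fail fast on the first budget violation, otherwise stop at the deadline.
	select {
	case err := <-errCh:
		fmt.Println("would fail the test:", err)
	case <-time.After(runTime):
		fmt.Println("completed the run within the slow-publish budget")
	}
}
```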

// We test an interest-based stream in a cluster where one node has asymmetric paths from
// the stream leader and the consumer leader, such that the consumer leader path is fast and
// replicated acks arrive sooner than the actual message. This path was handled before, but it
// was categorized as very rare and was expensive, since it forwarded a new stream msg delete
// proposal to the original stream leader. The stream now deals with the issue locally and does
// not slow down the ingest rate for the stream's publishers.
func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamSetup(t *testing.T) {
// Uncomment to run. Do not want as part of Travis tests atm.
skip(t)

tmpl := `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}

cluster {
name: "F3"
listen: 127.0.0.1:%d
routes = [%s]
}

accounts {
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
}
`

// Route Ports
// "S1": 14622,
// "S2": 15622,
// "S3": 16622,

// S2 (stream leader) will have a slow path to S1 (via proxy) and S3 (consumer leader) will have a fast path.

// Do these in order, S1, S2 (proxy) then S3.
c := &cluster{t: t, servers: make([]*Server, 3), opts: make([]*Options, 3), name: "F3"}

// S1
conf := fmt.Sprintf(tmpl, "S1", t.TempDir(), 14622, "route://127.0.0.1:15622, route://127.0.0.1:16622")
c.servers[0], c.opts[0] = RunServerWithConfig(createConfFile(t, []byte(conf)))

// S2
// Create the proxy first. Connect this to S1. Make it slow, e.g. 5ms RTT.
np := createNetProxy(1*time.Millisecond, 1024*1024*1024, 1024*1024*1024, "route://127.0.0.1:14622", true)
routes := fmt.Sprintf("%s, route://127.0.0.1:16622", np.routeURL())
conf = fmt.Sprintf(tmpl, "S2", t.TempDir(), 15622, routes)
c.servers[1], c.opts[1] = RunServerWithConfig(createConfFile(t, []byte(conf)))

// S3
conf = fmt.Sprintf(tmpl, "S3", t.TempDir(), 16622, "route://127.0.0.1:14622, route://127.0.0.1:15622")
c.servers[2], c.opts[2] = RunServerWithConfig(createConfFile(t, []byte(conf)))

c.checkClusterFormed()
c.waitOnClusterReady()
defer c.shutdown()
defer np.stop()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

// Now create the stream.
_, err := js.AddStream(&nats.StreamConfig{
Name: "EVENTS",
Subjects: []string{"EV.>"},
Replicas: 3,
Retention: nats.InterestPolicy,
})
require_NoError(t, err)

// Make sure its leader is on S2.
sl := c.servers[1]
checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
c.waitOnStreamLeader(globalAccountName, "EVENTS")
if s := c.streamLeader(globalAccountName, "EVENTS"); s != sl {
s.JetStreamStepdownStream(globalAccountName, "EVENTS")
return fmt.Errorf("Server %s is not stream leader yet", sl)
}
return nil
})

// Now create the consumer.
_, err = js.PullSubscribe(_EMPTY_, "C", nats.BindStream("EVENTS"), nats.ManualAck())
require_NoError(t, err)

// Make sure the consumer leader is on S3.
cl := c.servers[2]
checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
c.waitOnConsumerLeader(globalAccountName, "EVENTS", "C")
if s := c.consumerLeader(globalAccountName, "EVENTS", "C"); s != cl {
s.JetStreamStepdownConsumer(globalAccountName, "EVENTS", "C")
return fmt.Errorf("Server %s is not consumer leader yet", sl)
}
return nil
})

go func(js nats.JetStream) {
sub, err := js.PullSubscribe(_EMPTY_, "C", nats.BindStream("EVENTS"), nats.ManualAck())
require_NoError(t, err)

for {
msgs, err := sub.Fetch(100, nats.MaxWait(2*time.Second))
if err != nil && err != nats.ErrTimeout {
return
}
// Shuffle
rand.Shuffle(len(msgs), func(i, j int) { msgs[i], msgs[j] = msgs[j], msgs[i] })
for _, m := range msgs {
m.Ack()
}
}
}(js)

numPublishers := 25
pubThresh := 2 * time.Second
var maxExceeded atomic.Int64
errCh := make(chan error, numPublishers)
wg := sync.WaitGroup{}

msg := make([]byte, 2*1024) // 2k payload
rand.Read(msg)

// 25 publishers.
for i := 0; i < numPublishers; i++ {
wg.Add(1)
go func(iter int) {
defer wg.Done()

// Connect to a random server; the slow publishers will be the ones connected to the slow node.
// But if you connect them all there, it will pass.
s := c.randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()

for i := 0; i < 1_000; i++ {
start := time.Now()
_, err := js.Publish("EV.PAID", msg)
if err != nil {
errCh <- fmt.Errorf("Publish error: %v", err)
return
}
if elapsed := time.Since(start); elapsed > pubThresh {
errCh <- fmt.Errorf("Publish time exceeded")
if int64(elapsed) > maxExceeded.Load() {
maxExceeded.Store(int64(elapsed))
}
return
}
}
}(i)
}

wg.Wait()

select {
case e := <-errCh:
t.Fatalf("%v: threshold is %v, maximum seen: %v", e, pubThresh, time.Duration(maxExceeded.Load()))
default:
}
}
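The test above relies on the repo's createNetProxy helper to slow down one cluster route. A rough standalone sketch of that kind of latency-injecting TCP proxy (illustrative only: the addresses are examples, and it lacks the real helper's rate limiting and shutdown handling):

```go
package main

import (
	"log"
	"net"
	"time"
)

// delayProxy listens on listenAddr and forwards connections to targetAddr,
// sleeping before every chunk it relays in either direction to simulate added RTT.
func delayProxy(listenAddr, targetAddr string, delay time.Duration) error {
	ln, err := net.Listen("tcp", listenAddr)
	if err != nil {
		return err
	}
	for {
		client, err := ln.Accept()
		if err != nil {
			return err
		}
		go func(client net.Conn) {
			defer client.Close()
			server, err := net.Dial("tcp", targetAddr)
			if err != nil {
				return
			}
			defer server.Close()
			// Relay bytes in one direction, delaying each write.
			pipe := func(dst, src net.Conn) {
				buf := make([]byte, 32*1024)
				for {
					n, err := src.Read(buf)
					if n > 0 {
						time.Sleep(delay) // simulated one-way delay
						if _, werr := dst.Write(buf[:n]); werr != nil {
							return
						}
					}
					if err != nil {
						return
					}
				}
			}
			go pipe(server, client) // server -> client
			pipe(client, server)    // client -> server
		}(client)
	}
}

func main() {
	// Example: make traffic relayed to 127.0.0.1:14622 arrive ~5ms late.
	log.Fatal(delayProxy("127.0.0.1:24622", "127.0.0.1:14622", 5*time.Millisecond))
}
```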
2 changes: 1 addition & 1 deletion server/raft.go
@@ -2092,7 +2092,7 @@ func (n *raft) runAsLeader() {
continue
}
n.sendAppendEntry(entries)
// We need to re-craete `entries` because there is a reference
// We need to re-create `entries` because there is a reference
// to it in the node's pae map.
entries = nil
}
28 changes: 23 additions & 5 deletions server/stream.go
@@ -230,6 +230,9 @@ type stream struct {
sigq *ipQueue[*cMsg]
csl *Sublist

// For non-limits policy streams, when they process an ack before the actual msg.
preAcks map[uint64]struct{}

// TODO(dlc) - Hide everything below behind two pointers.
// Clustered mode.
sa *streamAssignment
@@ -3995,7 +3998,16 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
} else {
// Make sure to take into account any message assignments that we had to skip (clfs).
seq = lseq + 1 - clfs
err = store.StoreRawMsg(subject, hdr, msg, seq, ts)
// Check for preAcks and the need to skip vs store.
var shouldSkip bool
if _, shouldSkip = mset.preAcks[seq]; shouldSkip {
delete(mset.preAcks, seq)
}
if shouldSkip {
store.SkipMsg()
} else {
err = store.StoreRawMsg(subject, hdr, msg, seq, ts)
}
}

if err != nil {
@@ -4854,10 +4866,16 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) {
if shouldRemove {
if _, err := mset.store.RemoveMsg(seq); err == ErrStoreEOF {
// This should be rare but I have seen it.
// The ack reached us before the actual msg with AckNone and InterestPolicy.
if n := mset.raftNode(); n != nil {
md := streamMsgDelete{Seq: seq, NoErase: true, Stream: mset.cfg.Name}
n.ForwardProposal(encodeMsgDelete(&md))
// The ack reached us before the actual msg.
var state StreamState
mset.store.FastState(&state)
if seq >= state.LastSeq {
mset.mu.Lock()
if mset.preAcks == nil {
mset.preAcks = make(map[uint64]struct{})
}
mset.preAcks[seq] = struct{}{}
mset.mu.Unlock()
}
}
}
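Taken together, the stream.go hunks above let the ack path and the message path resolve this locally rather than forwarding a msg-delete proposal to the stream leader: an ack that outruns its message is recorded in preAcks, and when that sequence finally arrives it is skipped instead of being stored and then removed. A condensed sketch of that round trip with a toy in-memory store (not the server's real store API):

```go
package main

import "fmt"

// toyStream models just enough of the stream: a store of messages by sequence
// and the preAcks map for acks that arrived ahead of their message.
type toyStream struct {
	lastSeq uint64
	msgs    map[uint64]string
	preAcks map[uint64]struct{}
}

// ackMsg removes the message if the store has it; if the ack got here before the
// message (a sequence beyond what the store has seen), it records a preAck instead.
func (s *toyStream) ackMsg(seq uint64) {
	if _, ok := s.msgs[seq]; ok {
		delete(s.msgs, seq)
		return
	}
	if seq >= s.lastSeq {
		if s.preAcks == nil {
			s.preAcks = make(map[uint64]struct{})
		}
		s.preAcks[seq] = struct{}{}
	}
}

// processMsg stores an incoming message, unless its sequence was already acked
// ahead of time, in which case it is skipped rather than stored.
func (s *toyStream) processMsg(seq uint64, msg string) {
	if _, preAcked := s.preAcks[seq]; preAcked {
		delete(s.preAcks, seq)
		s.lastSeq = seq // skip: advance the sequence without storing
		return
	}
	s.msgs[seq] = msg
	s.lastSeq = seq
}

func main() {
	s := &toyStream{msgs: make(map[uint64]string)}
	s.ackMsg(1)           // replicated ack arrives first
	s.processMsg(1, "m1") // the message arrives later and is skipped
	fmt.Println(len(s.msgs), len(s.preAcks)) // prints: 0 0
}
```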