Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] When taking over make sure to sync and reset clfs for clustered streams. #4365

Merged
merged 1 commit into from Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 10 additions & 7 deletions server/jetstream_cluster.go
Expand Up @@ -2291,9 +2291,13 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps

case isLeader = <-lch:
if isLeader {
if sendSnapshot && mset != nil && n != nil {
n.SendSnapshot(mset.stateSnapshot())
sendSnapshot = false
if mset != nil && n != nil {
// Send a snapshot if being asked or if we are tracking
// a failed state so that followers sync.
if clfs := mset.clearCLFS(); clfs > 0 || sendSnapshot {
n.SendSnapshot(mset.stateSnapshot())
sendSnapshot = false
}
}
if isRestore {
acc, _ := s.LookupAccount(sa.Client.serviceAccount())
Expand Down Expand Up @@ -2714,15 +2718,14 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco

// Grab last sequence and CLFS.
last, clfs := mset.lastSeqAndCLFS()

// We can skip if we know this is less than what we already have.
if lseq-clfs < last {
s.Debugf("Apply stream entries for '%s > %s' skipping message with sequence %d with last of %d",
mset.account(), mset.name(), lseq+1-clfs, last)
// Check for any preAcks in case we are interest based.

mset.mu.Lock()
seq := lseq + 1 - mset.clfs
mset.clearAllPreAcks(seq)
// Check for any preAcks in case we are interest based.
mset.clearAllPreAcks(lseq + 1 - mset.clfs)
mset.mu.Unlock()
continue
}
Expand Down
200 changes: 200 additions & 0 deletions server/jetstream_cluster_3_test.go
Expand Up @@ -4785,3 +4785,203 @@ func TestJetStreamAccountUsageDrifts(t *testing.T) {
checkAccount(sir1.State.Bytes, sir3.State.Bytes)
}
}

func TestJetStreamClusterStreamFailTracking(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)

m := nats.NewMsg("foo")
m.Data = []byte("OK")

b, bsz := 0, 5
sendBatch := func() {
for i := b * bsz; i < b*bsz+bsz; i++ {
msgId := fmt.Sprintf("ID:%d", i)
m.Header.Set(JSMsgId, msgId)
// Send it twice on purpose.
js.PublishMsg(m)
js.PublishMsg(m)
}
b++
}

sendBatch()

_, err = nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST"), nil, time.Second)
require_NoError(t, err)
c.waitOnStreamLeader(globalAccountName, "TEST")

sendBatch()

// Now stop one and restart.
nl := c.randomNonStreamLeader(globalAccountName, "TEST")
mset, err := nl.GlobalAccount().lookupStream("TEST")
require_NoError(t, err)
// Reset raft
mset.resetClusteredState(nil)
time.Sleep(100 * time.Millisecond)

nl.Shutdown()
nl.WaitForShutdown()

sendBatch()

nl = c.restartServer(nl)

sendBatch()

for {
_, err = nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST"), nil, time.Second)
require_NoError(t, err)
c.waitOnStreamLeader(globalAccountName, "TEST")
if nl == c.streamLeader(globalAccountName, "TEST") {
break
}
}

sendBatch()

_, err = js.UpdateStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 1,
})
require_NoError(t, err)

// Make sure all in order.
errCh := make(chan error, 100)
var wg sync.WaitGroup
wg.Add(1)

expected, seen := b*bsz, 0

sub, err := js.Subscribe("foo", func(msg *nats.Msg) {
expectedID := fmt.Sprintf("ID:%d", seen)
if v := msg.Header.Get(JSMsgId); v != expectedID {
errCh <- err
wg.Done()
msg.Sub.Unsubscribe()
return
}
seen++
if seen >= expected {
wg.Done()
msg.Sub.Unsubscribe()
}
})
require_NoError(t, err)
defer sub.Unsubscribe()

wg.Wait()
if len(errCh) > 0 {
t.Fatalf("Expected no errors, got %d", len(errCh))
}
}

func TestJetStreamClusterStreamFailTrackingSnapshots(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)

m := nats.NewMsg("foo")
m.Data = []byte("OK")

// Send 1000 a dupe every msgID.
for i := 0; i < 1000; i++ {
msgId := fmt.Sprintf("ID:%d", i)
m.Header.Set(JSMsgId, msgId)
// Send it twice on purpose.
js.PublishMsg(m)
js.PublishMsg(m)
}

// Now stop one.
nl := c.randomNonStreamLeader(globalAccountName, "TEST")
nl.Shutdown()
nl.WaitForShutdown()

// Now send more and make sure leader snapshots.
for i := 1000; i < 2000; i++ {
msgId := fmt.Sprintf("ID:%d", i)
m.Header.Set(JSMsgId, msgId)
// Send it twice on purpose.
js.PublishMsg(m)
js.PublishMsg(m)
}

sl := c.streamLeader(globalAccountName, "TEST")
mset, err := sl.GlobalAccount().lookupStream("TEST")
require_NoError(t, err)
node := mset.raftNode()
require_NotNil(t, node)
node.InstallSnapshot(mset.stateSnapshot())

// Now restart nl
nl = c.restartServer(nl)
c.waitOnServerCurrent(nl)

// Move leader to NL
for {
_, err = nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST"), nil, time.Second)
require_NoError(t, err)
c.waitOnStreamLeader(globalAccountName, "TEST")
if nl == c.streamLeader(globalAccountName, "TEST") {
break
}
}

_, err = js.UpdateStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 1,
})
require_NoError(t, err)

// Make sure all in order.
errCh := make(chan error, 100)
var wg sync.WaitGroup
wg.Add(1)

expected, seen := 2000, 0

sub, err := js.Subscribe("foo", func(msg *nats.Msg) {
expectedID := fmt.Sprintf("ID:%d", seen)
if v := msg.Header.Get(JSMsgId); v != expectedID {
errCh <- err
wg.Done()
msg.Sub.Unsubscribe()
return
}
seen++
if seen >= expected {
wg.Done()
msg.Sub.Unsubscribe()
}
})
require_NoError(t, err)
defer sub.Unsubscribe()

wg.Wait()
if len(errCh) > 0 {
t.Fatalf("Expected no errors, got %d", len(errCh))
}
}
8 changes: 8 additions & 0 deletions server/stream.go
Expand Up @@ -838,6 +838,14 @@ func (mset *stream) lastSeqAndCLFS() (uint64, uint64) {
return mset.lseq, mset.clfs
}

func (mset *stream) clearCLFS() uint64 {
mset.mu.Lock()
defer mset.mu.Unlock()
clfs := mset.clfs
mset.clfs = 0
return clfs
}

func (mset *stream) lastSeq() uint64 {
mset.mu.RLock()
lseq := mset.lseq
Expand Down