Commit

Fix for Issue #2397
When we had partial state, due to a server failure or an ungraceful shutdown, we could enter a stream reset state.
The stream reset is harsh but worked; however, there was a bug where consumers that were attached would not be restarted.
Also, if no state exists or the state was truncated, we can now detect that and avoid going through a full reset.

Signed-off-by: Derek Collison <derek@nats.io>
derekcollison committed Sep 1, 2021
1 parent 2539bbb commit d809b02
Showing 3 changed files with 216 additions and 16 deletions.
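The commit message describes two recovery paths: a hard reset of the clustered stream that must also re-create any attached consumers, and a cheaper catch-up when local state is simply missing or truncated. The sketch below only illustrates that decision; the types and helpers (recoverStream, streamState, and so on) are hypothetical stand-ins, not the server's API.

package main

import "fmt"

// Illustrative stand-ins; not the server's real types.
type consumer struct{ name string }

type streamState struct {
	hasState  bool
	truncated bool
	consumers []consumer
}

// recoverStream sketches the decision described in the commit message:
// skip the hard reset when there is no usable local state, otherwise
// reset and then re-create every attached consumer.
func recoverStream(s *streamState) {
	if !s.hasState || s.truncated {
		// No usable local state: catch up from the leader, no full reset needed.
		fmt.Println("catching up from leader")
		return
	}
	// Partial state: perform the hard reset, then bring the consumers back.
	fmt.Println("hard reset of stream state")
	for _, c := range s.consumers {
		// Re-creating attached consumers is the step the old code missed.
		fmt.Printf("re-creating consumer %q\n", c.name)
	}
}

func main() {
	recoverStream(&streamState{hasState: true, consumers: []consumer{{"d1"}}})
}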
54 changes: 41 additions & 13 deletions server/jetstream_cluster.go
@@ -586,6 +586,7 @@ func (js *jetStream) isGroupLeaderless(rg *raftGroup) bool {
			return true
		}
	}

	return false
}

@@ -1295,17 +1296,18 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) {

	// Make sure we do not leave the apply channel to fill up and block the raft layer.
	defer func() {
-		if n.State() != Closed {
-			if n.Leader() {
-				n.StepDown()
-			}
-			// Drain the commit channel..
-			for len(ach) > 0 {
-				select {
-				case <-ach:
-				default:
-					return
-				}
+		if n.State() == Closed {
+			return
+		}
+		if n.Leader() {
+			n.StepDown()
+		}
+		// Drain the commit channel..
+		for len(ach) > 0 {
+			select {
+			case <-ach:
+			default:
+				return
			}
		}
	}()
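The rewritten defer above returns early when the raft node is already closed and then empties the apply channel with a non-blocking select, so the raft layer can never block on it. Here is the drain idiom in isolation as a small standalone sketch; a generic buffered channel stands in for the apply channel.

package main

import "fmt"

// drain empties a buffered channel without ever blocking: the default
// case fires as soon as no value is immediately available.
func drain(ch chan int) {
	for len(ch) > 0 {
		select {
		case v := <-ch:
			fmt.Println("drained", v)
		default:
			return
		}
	}
}

func main() {
	ch := make(chan int, 4)
	ch <- 1
	ch <- 2
	drain(ch)
}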
@@ -1529,11 +1531,35 @@ func (mset *stream) resetClusteredState() bool {
		js.mu.Lock()
		sa.Group.node = nil
		js.mu.Unlock()
-		go js.processClusterCreateStream(acc, sa)
+		go js.restartClustered(acc, sa)
	}
	return true
}

+// This will reset the stream and consumers.
+// Should be done in separate go routine.
+func (js *jetStream) restartClustered(acc *Account, sa *streamAssignment) {
+	js.processClusterCreateStream(acc, sa)
+
+	// Check consumers.
+	js.mu.Lock()
+	var consumers []*consumerAssignment
+	if cc := js.cluster; cc != nil && cc.meta != nil {
+		ourID := cc.meta.ID()
+		for _, ca := range sa.consumers {
+			if rg := ca.Group; rg != nil && rg.isMember(ourID) {
+				rg.node = nil // Erase group raft/node state.
+				consumers = append(consumers, ca)
+			}
+		}
+	}
+	js.mu.Unlock()
+
+	for _, ca := range consumers {
+		js.processClusterCreateConsumer(ca, nil)
+	}
+}
+
func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isRecovering bool) error {
	for _, e := range ce.Entries {
		if e.Type == EntryNormal {
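The new restartClustered gathers the matching consumer assignments while holding js.mu and only calls processClusterCreateConsumer after the lock is released. Below is a generic sketch of that copy-under-lock, work-outside-lock pattern; the registry type is purely illustrative and not part of the server.

package main

import (
	"fmt"
	"sync"
)

type registry struct {
	mu    sync.Mutex
	items map[string]int
}

// restartAll snapshots the work list under the lock, then does the slow
// part after unlocking so other goroutines are not held up meanwhile.
func (r *registry) restartAll() {
	r.mu.Lock()
	var names []string
	for name := range r.items {
		names = append(names, name)
	}
	r.mu.Unlock()

	for _, name := range names {
		fmt.Println("restarting", name) // potentially slow, runs lock-free
	}
}

func main() {
	r := &registry{items: map[string]int{"d1": 1, "d2": 2}}
	r.restartAll()
}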
@@ -1561,7 +1587,9 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isRecovering bool) error {

				// We can skip if we know this is less than what we already have.
				if lseq < last {
-					s.Debugf("Apply stream entries skipping message with sequence %d with last of %d", lseq, last)
+					if !isRecovering {
+						s.Debugf("Apply stream entries skipping message with sequence %d with last of %d", lseq, last)
+					}
					continue
				}

173 changes: 172 additions & 1 deletion server/jetstream_cluster_test.go
@@ -20,8 +20,10 @@ import (
	"fmt"
	"io/ioutil"
	"math/rand"
+	"os"
	"path"
	"reflect"
+	"runtime"
	"strings"
	"sync"
	"sync/atomic"
@@ -7763,7 +7765,7 @@ func TestJetStreamPullConsumerLeakedSubs(t *testing.T) {
	defer sub.Unsubscribe()

	// Load up a bunch of requests.
-	numRequests := 20 //100_000
+	numRequests := 20
	for i := 0; i < numRequests; i++ {
		js.PublishAsync("Domains.Domain", []byte("QUESTION"))
	}
@@ -8078,6 +8080,175 @@ func TestJetStreamDeadlockOnVarz(t *testing.T) {
	wg.Wait()
}

// Make sure when we try to hard reset a stream state in a cluster that we also re-create the consumers.
func TestJetStreamClusterStreamReset(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	// Client based API
	s := c.randomServer()
	nc, js := jsClientConnect(t, s)
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:      "TEST",
		Subjects:  []string{"foo.*"},
		Replicas:  2,
		Retention: nats.WorkQueuePolicy,
	})
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}

	numRequests := 20
	for i := 0; i < numRequests; i++ {
		js.Publish("foo.created", []byte("REQ"))
	}

	// Durable.
	sub, err := js.SubscribeSync("foo.created", nats.Durable("d1"))
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	defer sub.Unsubscribe()

	si, err := js.StreamInfo("TEST")
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	if si.State.Msgs != uint64(numRequests) {
		t.Fatalf("Expected %d msgs, got bad state: %+v", numRequests, si.State)
	}
	// Let settle a bit.
	time.Sleep(250 * time.Millisecond)

	// Grab number go routines.
	base := runtime.NumGoroutine()

	// Grab a server that is the consumer leader for the durable.
	cl := c.consumerLeader("$G", "TEST", "d1")
	mset, err := cl.GlobalAccount().lookupStream("TEST")
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	// Do a hard reset here by hand.
	mset.resetClusteredState()
	// Wait til we have the leader elected.
	c.waitOnConsumerLeader("$G", "TEST", "d1")

	// So do not wait 10s in call in checkFor.
	js2, _ := nc.JetStream(nats.MaxWait(100 * time.Millisecond))
	// Make sure we can get the consumer info eventually.
	checkFor(t, 5*time.Second, 200*time.Millisecond, func() error {
		_, err := js2.ConsumerInfo("TEST", "d1")
		return err
	})

	// Grab number go routines.
	if after := runtime.NumGoroutine(); base > after {
		t.Fatalf("Expected %d go routines, got %d", base, after)
	}
}

// Issue #2397
func TestJetStreamClusterStreamCatchupNoState(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R2S", 2)
	defer c.shutdown()

	// Client based API
	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo.*"},
		Replicas: 2,
	})
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}

	// Hold onto servers.
	sl := c.streamLeader("$G", "TEST")
	if sl == nil {
		t.Fatalf("Did not get a server")
	}
	nsl := c.randomNonStreamLeader("$G", "TEST")
	if nsl == nil {
		t.Fatalf("Did not get a server")
	}
	// Grab low level stream and raft node.
	mset, err := nsl.GlobalAccount().lookupStream("TEST")
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	node := mset.raftNode()
	if node == nil {
		t.Fatalf("Could not get stream group name")
	}
	gname := node.Group()

	numRequests := 100
	for i := 0; i < numRequests; i++ {
		// This will force a snapshot which will prune the normal log.
		// We will remove the snapshot to simulate the error condition.
		if i == 10 {
			if err := node.InstallSnapshot(mset.stateSnapshot()); err != nil {
				t.Fatalf("Error installing snapshot: %v", err)
			}
		}
		js.Publish("foo.created", []byte("REQ"))
	}

	config := nsl.JetStreamConfig()
	if config == nil {
		t.Fatalf("No config")
	}
	lconfig := sl.JetStreamConfig()
	if lconfig == nil {
		t.Fatalf("No config")
	}

	nc.Close()
	c.stopAll()
	// Remove all state by truncating for the non-leader.
	for _, fn := range []string{"1.blk", "1.idx", "1.fss"} {
		fname := path.Join(config.StoreDir, "$G", "streams", "TEST", "msgs", fn)
		fd, err := os.OpenFile(fname, os.O_RDWR, defaultFilePerms)
		if err != nil {
			continue
		}
		fd.Truncate(0)
		fd.Close()
	}
	// For both make sure we have no raft snapshots.
	snapDir := path.Join(lconfig.StoreDir, "$SYS", "_js_", gname, "snapshots")
	os.RemoveAll(snapDir)
	snapDir = path.Join(config.StoreDir, "$SYS", "_js_", gname, "snapshots")
	os.RemoveAll(snapDir)

	// Now restart.
	c.restartAll()
	for _, cs := range c.servers {
		c.waitOnStreamCurrent(cs, "$G", "TEST")
	}

	nc, js = jsClientConnect(t, c.randomServer())
	defer nc.Close()

	if _, err := js.Publish("foo.created", []byte("REQ")); err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}

	si, err := js.StreamInfo("TEST")
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	if si.State.LastSeq != 101 {
		t.Fatalf("bad state after restart: %+v", si.State)
	}
}

// Support functions

// Used to setup superclusters for tests.
5 changes: 3 additions & 2 deletions server/stream.go
@@ -2600,8 +2600,9 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
	// For clustering the lower layers will pass our expected lseq. If it is present check for that here.
	if lseq > 0 && lseq != (mset.lseq+mset.clfs) {
		isMisMatch := true
-		// If our first message for this mirror, see if we have to adjust our starting sequence.
-		if mset.cfg.Mirror != nil {
+		// We may be able to recover here if we have no state whatsoever, or we are a mirror.
+		// See if we have to adjust our starting sequence.
+		if mset.lseq == 0 || mset.cfg.Mirror != nil {
			var state StreamState
			mset.store.FastState(&state)
if state.FirstSeq == 0 {
Expand Down
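The relaxed condition above lets a replica that has no local messages at all (mset.lseq == 0) adjust its starting sequence rather than flag a first-sequence mismatch, the same path a mirror already took. A simplified, hypothetical sketch of that check follows; the function and parameter names are stand-ins for illustration, not the server code.

package main

import "fmt"

// canAdjustStart is an illustrative approximation: a replica may rebase its
// starting sequence only if it has no local messages yet (or is a mirror)
// and its backing store is empty as well.
func canAdjustStart(expectedSeq, localSeq, clfs, storeFirstSeq uint64, isMirror bool) bool {
	if expectedSeq == 0 || expectedSeq == localSeq+clfs {
		return false // nothing to reconcile
	}
	if localSeq != 0 && !isMirror {
		return false // genuine mismatch, cannot silently adjust
	}
	return storeFirstSeq == 0
}

func main() {
	fmt.Println(canAdjustStart(42, 0, 0, 0, false)) // true: empty replica can rebase
	fmt.Println(canAdjustStart(42, 5, 0, 0, false)) // false: real mismatch
}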