
Merge pull request #2480 from nats-io/issue_2397
[FIXED] Issue #2397
derekcollison committed Sep 1, 2021
2 parents 2539bbb + 60e45ea commit 918aff0
Showing 5 changed files with 256 additions and 35 deletions.
40 changes: 23 additions & 17 deletions server/consumer.go
@@ -271,6 +271,26 @@ const (
JsDefaultMaxAckPending = 20_000
)

// Helper function to set consumer config defaults from above.
func setConsumerConfigDefaults(config *ConsumerConfig) {
// Set to default if not specified.
if config.DeliverSubject == _EMPTY_ && config.MaxWaiting == 0 {
config.MaxWaiting = JSWaitQueueDefaultMax
}
// Setup proper default for ack wait if we are in explicit ack mode.
if config.AckWait == 0 && (config.AckPolicy == AckExplicit || config.AckPolicy == AckAll) {
config.AckWait = JsAckWaitDefault
}
// Setup default of -1, meaning no limit for MaxDeliver.
if config.MaxDeliver == 0 {
config.MaxDeliver = -1
}
// Set proper default for max ack pending if we are ack explicit and none has been set.
if (config.AckPolicy == AckExplicit || config.AckPolicy == AckAll) && config.MaxAckPending == 0 {
config.MaxAckPending = JsDefaultMaxAckPending
}
}

func (mset *stream) addConsumer(config *ConsumerConfig) (*consumer, error) {
return mset.addConsumerWithAssignment(config, _EMPTY_, nil)
}
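
For reference, here is a minimal standalone sketch of the same defaulting pattern the new helper applies. The stand-in types and the specific default values below (aside from JsDefaultMaxAckPending, shown above) are illustrative, not the server's exact definitions.

package main

import (
	"fmt"
	"time"
)

// Simplified stand-ins for the server's types; illustrative only.
type AckPolicy int

const (
	AckNone AckPolicy = iota
	AckAll
	AckExplicit
)

type ConsumerConfig struct {
	DeliverSubject string
	AckPolicy      AckPolicy
	AckWait        time.Duration
	MaxDeliver     int
	MaxWaiting     int
	MaxAckPending  int
}

// Mirrors the shape of setConsumerConfigDefaults: fill in zero values in place.
func setDefaults(cfg *ConsumerConfig) {
	if cfg.DeliverSubject == "" && cfg.MaxWaiting == 0 {
		cfg.MaxWaiting = 512 // assumed wait-queue default for pull consumers
	}
	if cfg.AckWait == 0 && (cfg.AckPolicy == AckExplicit || cfg.AckPolicy == AckAll) {
		cfg.AckWait = 30 * time.Second // assumed ack-wait default
	}
	if cfg.MaxDeliver == 0 {
		cfg.MaxDeliver = -1 // -1 means no redelivery limit
	}
	if (cfg.AckPolicy == AckExplicit || cfg.AckPolicy == AckAll) && cfg.MaxAckPending == 0 {
		cfg.MaxAckPending = 20_000 // JsDefaultMaxAckPending above
	}
}

func main() {
	cfg := ConsumerConfig{AckPolicy: AckExplicit} // pull consumer, everything else zero
	setDefaults(&cfg)
	fmt.Printf("%+v\n", cfg) // zero values replaced by defaults
}
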
@@ -291,6 +311,9 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
return nil, NewJSConsumerConfigRequiredError()
}

// Make sure we have sane defaults.
setConsumerConfigDefaults(config)

if len(config.Description) > JSMaxDescriptionLen {
return nil, NewJSConsumerDescriptionTooLongError(JSMaxDescriptionLen)
}
@@ -329,10 +352,6 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
if config.MaxWaiting < 0 {
return nil, NewJSConsumerMaxWaitingNegativeError()
}
// Set to default if not specified.
if config.MaxWaiting == 0 {
config.MaxWaiting = JSWaitQueueDefaultMax
}
if config.Heartbeat > 0 {
return nil, NewJSConsumerHBRequiresPushError()
}
@@ -354,19 +373,6 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
}
}

// Setup proper default for ack wait if we are in explicit ack mode.
if config.AckWait == 0 && (config.AckPolicy == AckExplicit || config.AckPolicy == AckAll) {
config.AckWait = JsAckWaitDefault
}
// Setup default of -1, meaning no limit for MaxDeliver.
if config.MaxDeliver == 0 {
config.MaxDeliver = -1
}
// Set proper default for max ack pending if we are ack explicit and none has been set.
if (config.AckPolicy == AckExplicit || config.AckPolicy == AckAll) && config.MaxAckPending == 0 {
config.MaxAckPending = JsDefaultMaxAckPending
}

// As best we can make sure the filtered subject is valid.
if config.FilterSubject != _EMPTY_ {
subjects, hasExt := mset.allSubjects()
11 changes: 10 additions & 1 deletion server/jetstream_api.go
@@ -3045,6 +3045,9 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, a *Account, subj
return
}

// Make sure we have sane defaults.
setConsumerConfigDefaults(&req.Config)

// Determine if we should proceed here when we are in clustered mode.
if s.JetStreamIsClustered() {
if req.Config.Direct {
@@ -3422,9 +3425,15 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
// Delaying an error response gives the leader a chance to respond before us
s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), ca.Group)
} else if ca != nil {
js.mu.RLock()
var node RaftNode
if rg := ca.Group; rg != nil && rg.node != nil && rg.isMember(ourID) {
node = rg.node
}
js.mu.RUnlock()
if node != nil {
// Check here if we are a member and this is just a new consumer that does not have a leader yet.
if rg.node.GroupLeader() == _EMPTY_ && !rg.node.HadPreviousLeader() {
if node.GroupLeader() == _EMPTY_ && !node.HadPreviousLeader() {
resp.Error = NewJSConsumerNotFoundError()
s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil)
}
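
The fix above uses a common Go pattern: capture the shared pointer while holding the read lock, release the lock, then operate only on the captured copy. A minimal sketch of that pattern with simplified stand-in types (group, node) rather than the server's raftGroup and RaftNode:

package main

import (
	"fmt"
	"sync"
)

type node struct{ leader string }

func (n *node) GroupLeader() string { return n.leader }

type group struct {
	mu   sync.RWMutex
	node *node // may be cleared or swapped by other goroutines
}

func (g *group) leaderStatus() string {
	g.mu.RLock()
	n := g.node // capture the pointer while the read lock is held
	g.mu.RUnlock()
	if n == nil {
		return "no local raft node"
	}
	// Safe: we only touch the captured pointer, never g.node again.
	return n.GroupLeader()
}

func main() {
	g := &group{node: &node{leader: "S1"}}
	fmt.Println(g.leaderStatus())
}
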
57 changes: 43 additions & 14 deletions server/jetstream_cluster.go
@@ -586,6 +586,7 @@ func (js *jetStream) isGroupLeaderless(rg *raftGroup) bool {
return true
}
}

return false
}

@@ -1295,17 +1296,18 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) {

// Make sure we do not let the apply channel fill up and block the raft layer.
defer func() {
if n.State() != Closed {
if n.Leader() {
n.StepDown()
}
// Drain the commit channel.
for len(ach) > 0 {
select {
case <-ach:
default:
return
}
if n.State() == Closed {
return
}
if n.Leader() {
n.StepDown()
}
// Drain the commit channel.
for len(ach) > 0 {
select {
case <-ach:
default:
return
}
}
}()
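
The drain loop in the rewritten defer is a standard non-blocking channel drain: keep receiving while the buffer reports pending items, and bail out rather than block if a receive would wait. A self-contained sketch, with ach as a stand-in for the apply channel:

package main

import "fmt"

// drain empties a buffered channel without ever blocking: if another
// receiver wins the race, the default case bails out immediately.
func drain(ach chan struct{}) {
	for len(ach) > 0 {
		select {
		case <-ach:
		default:
			return
		}
	}
}

func main() {
	ach := make(chan struct{}, 4)
	ach <- struct{}{}
	ach <- struct{}{}
	drain(ach)
	fmt.Println("remaining:", len(ach)) // 0
}
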
@@ -1529,11 +1531,35 @@ func (mset *stream) resetClusteredState() bool {
js.mu.Lock()
sa.Group.node = nil
js.mu.Unlock()
go js.processClusterCreateStream(acc, sa)
go js.restartClustered(acc, sa)
}
return true
}

// This will reset the stream and consumers.
// Should be called from a separate goroutine.
func (js *jetStream) restartClustered(acc *Account, sa *streamAssignment) {
js.processClusterCreateStream(acc, sa)

// Check consumers.
js.mu.Lock()
var consumers []*consumerAssignment
if cc := js.cluster; cc != nil && cc.meta != nil {
ourID := cc.meta.ID()
for _, ca := range sa.consumers {
if rg := ca.Group; rg != nil && rg.isMember(ourID) {
rg.node = nil // Erase group raft/node state.
consumers = append(consumers, ca)
}
}
}
js.mu.Unlock()

for _, ca := range consumers {
js.processClusterCreateConsumer(ca, nil)
}
}
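
The new restartClustered helper snapshots the consumer assignments while holding js.mu and performs the slow re-creation only after releasing it. A minimal sketch of that collect-under-lock, act-outside-lock shape, using simplified stand-in types:

package main

import (
	"fmt"
	"sync"
)

type consumerAssignment struct{ name string }

type jsState struct {
	mu        sync.Mutex
	consumers map[string]*consumerAssignment
}

func (s *jsState) restart() {
	// Snapshot the assignments while holding the lock...
	s.mu.Lock()
	cas := make([]*consumerAssignment, 0, len(s.consumers))
	for _, ca := range s.consumers {
		cas = append(cas, ca)
	}
	s.mu.Unlock()

	// ...then do the potentially slow re-creation without it.
	for _, ca := range cas {
		fmt.Println("re-creating consumer", ca.name)
	}
}

func main() {
	s := &jsState{consumers: map[string]*consumerAssignment{"d1": {name: "d1"}}}
	s.restart()
}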

func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isRecovering bool) error {
for _, e := range ce.Entries {
if e.Type == EntryNormal {
@@ -1561,7 +1587,9 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco

// We can skip if we know this is less than what we already have.
if lseq < last {
s.Debugf("Apply stream entries skipping message with sequence %d with last of %d", lseq, last)
if !isRecovering {
s.Debugf("Apply stream entries skipping message with sequence %d with last of %d", lseq, last)
}
continue
}

@@ -4042,8 +4070,9 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
} else {
oname = cfg.Durable
if ca := sa.consumers[oname]; ca != nil && !ca.deleted {
isPull := ca.Config.DeliverSubject == _EMPTY_
// This can be ok if delivery subject update.
shouldErr := !reflect.DeepEqual(cfg, ca.Config) && !configsEqualSansDelivery(*cfg, *ca.Config) || ca.pending
shouldErr := isPull || ca.pending || (!reflect.DeepEqual(cfg, ca.Config) && !configsEqualSansDelivery(*cfg, *ca.Config))
if !shouldErr {
rr := acc.sl.Match(ca.Config.DeliverSubject)
shouldErr = len(rr.psubs)+len(rr.qsubs) != 0
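
The restructured shouldErr condition reads as a small truth function: reject the request when the existing consumer is pull-based, when a create is still pending, or when the configs truly differ beyond the delivery subject. A sketch of just that boolean shape (all names here are illustrative):

package main

import "fmt"

// shouldReject mirrors the boolean shape of the new check: an existing
// pull consumer, a pending create, or a real config change all reject.
func shouldReject(isPull, pending, equal, equalSansDelivery bool) bool {
	return isPull || pending || (!equal && !equalSansDelivery)
}

func main() {
	// Push consumer whose only difference is the delivery subject: allowed.
	fmt.Println(shouldReject(false, false, false, true)) // false
	// Existing pull consumer: a delivery-subject "update" makes no sense.
	fmt.Println(shouldReject(true, false, true, true)) // true
}
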
178 changes: 177 additions & 1 deletion server/jetstream_cluster_test.go
@@ -20,8 +20,10 @@ import (
"fmt"
"io/ioutil"
"math/rand"
"os"
"path"
"reflect"
"runtime"
"strings"
"sync"
"sync/atomic"
@@ -7763,7 +7765,7 @@ func TestJetStreamPullConsumerLeakedSubs(t *testing.T) {
defer sub.Unsubscribe()

// Load up a bunch of requests.
numRequests := 20 //100_000
numRequests := 20
for i := 0; i < numRequests; i++ {
js.PublishAsync("Domains.Domain", []byte("QUESTION"))
}
@@ -8026,6 +8028,11 @@ func TestJetStreamRaceOnRAFTCreate(t *testing.T) {
t.Fatalf("Error creating stream: %v", err)
}

js, err = nc.JetStream(nats.MaxWait(time.Second))
if err != nil {
t.Fatal(err)
}

size := 10
wg := sync.WaitGroup{}
wg.Add(size)
@@ -8078,6 +8085,175 @@ func TestJetStreamDeadlockOnVarz(t *testing.T) {
wg.Wait()
}

// Make sure that when we hard reset a stream's state in a cluster we also re-create the consumers.
func TestJetStreamClusterStreamReset(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

// Client based API
s := c.randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo.*"},
Replicas: 2,
Retention: nats.WorkQueuePolicy,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

numRequests := 20
for i := 0; i < numRequests; i++ {
js.Publish("foo.created", []byte("REQ"))
}

// Durable.
sub, err := js.SubscribeSync("foo.created", nats.Durable("d1"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer sub.Unsubscribe()

si, err := js.StreamInfo("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.State.Msgs != uint64(numRequests) {
t.Fatalf("Expected %d msgs, got bad state: %+v", numRequests, si.State)
}
// Let things settle a bit.
time.Sleep(250 * time.Millisecond)

// Grab the number of goroutines.
base := runtime.NumGoroutine()

// Grab a server that is the consumer leader for the durable.
cl := c.consumerLeader("$G", "TEST", "d1")
mset, err := cl.GlobalAccount().lookupStream("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Do a hard reset here by hand.
mset.resetClusteredState()
// Wait until we have the leader elected.
c.waitOnConsumerLeader("$G", "TEST", "d1")

// So we do not wait 10s per call in checkFor below.
js2, _ := nc.JetStream(nats.MaxWait(100 * time.Millisecond))
// Make sure we can get the consumer info eventually.
checkFor(t, 5*time.Second, 200*time.Millisecond, func() error {
_, err := js2.ConsumerInfo("TEST", "d1")
return err
})

// Grab the number of goroutines again; fewer than before means the consumers were not re-created.
if after := runtime.NumGoroutine(); base > after {
t.Fatalf("Expected %d go routines, got %d", base, after)
}
}
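
The test above polls with the repo's checkFor helper until ConsumerInfo succeeds. A standalone polling loop of the same shape might look like the following; this is an illustrative sketch, not the repository's implementation:

package main

import (
	"errors"
	"fmt"
	"time"
)

// checkForErr polls check every interval until it succeeds or total elapses,
// returning the last error on timeout.
func checkForErr(total, interval time.Duration, check func() error) error {
	deadline := time.Now().Add(total)
	err := check()
	for err != nil && time.Now().Before(deadline) {
		time.Sleep(interval)
		err = check()
	}
	return err
}

func main() {
	start := time.Now()
	err := checkForErr(time.Second, 50*time.Millisecond, func() error {
		if time.Since(start) > 200*time.Millisecond {
			return nil // "consumer info" became available
		}
		return errors.New("not ready yet")
	})
	fmt.Println("result:", err) // result: <nil>
}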

// Issue #2397
func TestJetStreamClusterStreamCatchupNoState(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R2S", 2)
defer c.shutdown()

// Client based API
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo.*"},
Replicas: 2,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// Hold onto servers.
sl := c.streamLeader("$G", "TEST")
if sl == nil {
t.Fatalf("Did not get a server")
}
nsl := c.randomNonStreamLeader("$G", "TEST")
if nsl == nil {
t.Fatalf("Did not get a server")
}
// Grab low level stream and raft node.
mset, err := nsl.GlobalAccount().lookupStream("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
node := mset.raftNode()
if node == nil {
t.Fatalf("Could not get stream group name")
}
gname := node.Group()

numRequests := 100
for i := 0; i < numRequests; i++ {
// This will force a snapshot which will prune the normal log.
// We will remove the snapshot to simulate the error condition.
if i == 10 {
if err := node.InstallSnapshot(mset.stateSnapshot()); err != nil {
t.Fatalf("Error installing snapshot: %v", err)
}
}
js.Publish("foo.created", []byte("REQ"))
}

config := nsl.JetStreamConfig()
if config == nil {
t.Fatalf("No config")
}
lconfig := sl.JetStreamConfig()
if lconfig == nil {
t.Fatalf("No config")
}

nc.Close()
c.stopAll()
// Remove all message state for the non-leader by truncating its store files.
for _, fn := range []string{"1.blk", "1.idx", "1.fss"} {
fname := path.Join(config.StoreDir, "$G", "streams", "TEST", "msgs", fn)
fd, err := os.OpenFile(fname, os.O_RDWR, defaultFilePerms)
if err != nil {
continue
}
fd.Truncate(0)
fd.Close()
}
// For both make sure we have no raft snapshots.
snapDir := path.Join(lconfig.StoreDir, "$SYS", "_js_", gname, "snapshots")
os.RemoveAll(snapDir)
snapDir = path.Join(config.StoreDir, "$SYS", "_js_", gname, "snapshots")
os.RemoveAll(snapDir)

// Now restart.
c.restartAll()
for _, cs := range c.servers {
c.waitOnStreamCurrent(cs, "$G", "TEST")
}

nc, js = jsClientConnect(t, c.randomServer())
defer nc.Close()

if _, err := js.Publish("foo.created", []byte("REQ")); err != nil {
t.Fatalf("Unexpected error: %v", err)
}

si, err := js.StreamInfo("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// 100 messages published before the restart plus 1 after.
if si.State.LastSeq != 101 {
t.Fatalf("bad state after restart: %+v", si.State)
}
}

// Support functions

// Used to setup superclusters for tests.
