[IMPROVED] Protect against concurrent creation of streams and consumers. (#4013)

Also make sure we have exited monitoring routines when doing resets for
both streams and consumers.

Signed-off-by: Derek Collison <derek@nats.io>
derekcollison committed Apr 3, 2023
2 parents f3cab83 + ff3f102 commit 14ad983
Showing 6 changed files with 221 additions and 52 deletions.
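
The concurrency guard itself is the new inflight sync.Map added to jsAccount in server/jetstream.go below; the test at the end verifies the map is drained once creation settles. One plausible shape of such a guard — account, createStream, and the per-name WaitGroup bookkeeping here are illustrative assumptions, not the server's actual code — is sketched below:

package main

import (
	"fmt"
	"sync"
)

type account struct {
	mu       sync.Mutex
	streams  map[string]*stream
	inflight sync.Map // stream name -> *sync.WaitGroup for creations in flight (assumed shape)
}

type stream struct{ name string }

func (a *account) createStream(name string) (*stream, error) {
	wg := &sync.WaitGroup{}
	wg.Add(1)
	// First caller wins the right to create; later callers wait for it to finish.
	if existing, loaded := a.inflight.LoadOrStore(name, wg); loaded {
		existing.(*sync.WaitGroup).Wait()
	} else {
		defer func() {
			a.inflight.Delete(name) // Always clear the inflight entry when done.
			wg.Done()
		}()
	}

	a.mu.Lock()
	defer a.mu.Unlock()
	if s, ok := a.streams[name]; ok {
		return s, nil // Everyone after the winner just gets the existing stream.
	}
	s := &stream{name: name}
	a.streams[name] = s
	return s, nil
}

func main() {
	a := &account{streams: make(map[string]*stream)}
	var wg sync.WaitGroup
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, err := a.createStream("TEST"); err != nil {
				fmt.Println("unexpected error:", err)
			}
		}()
	}
	wg.Wait()
	fmt.Println("streams created:", len(a.streams)) // Expect 1.
}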
6 changes: 4 additions & 2 deletions server/consumer.go
@@ -299,6 +299,8 @@ type consumer struct {
prOk bool
uch chan struct{}
retention RetentionPolicy

monitorWg sync.WaitGroup
inMonitor bool

// R>1 proposals
@@ -4559,8 +4561,8 @@ func (o *consumer) clearMonitorRunning() {

// Test whether we are in the monitor routine.
func (o *consumer) isMonitorRunning() bool {
o.mu.Lock()
defer o.mu.Unlock()
o.mu.RLock()
defer o.mu.RUnlock()
return o.inMonitor
}

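
The new monitorWg pairs with the existing inMonitor flag: processClusterCreateConsumer does o.monitorWg.Add(1) before startGoRoutine, monitorConsumer defers o.monitorWg.Done(), and reset paths can then Wait until the monitor has fully exited. A minimal sketch of that lifecycle, under simplified, assumed types (this consumer is a stand-in, not the server's):

package main

import (
	"fmt"
	"sync"
)

type consumer struct {
	mu        sync.RWMutex
	monitorWg sync.WaitGroup
	inMonitor bool
}

// setMonitorRunning reports whether we won the right to run the monitor.
func (o *consumer) setMonitorRunning() bool {
	o.mu.Lock()
	defer o.mu.Unlock()
	if o.inMonitor {
		return false
	}
	o.inMonitor = true
	return true
}

func (o *consumer) clearMonitorRunning() {
	o.mu.Lock()
	defer o.mu.Unlock()
	o.inMonitor = false
}

// isMonitorRunning only reads state, so a read lock suffices (the change above).
func (o *consumer) isMonitorRunning() bool {
	o.mu.RLock()
	defer o.mu.RUnlock()
	return o.inMonitor
}

func (o *consumer) monitor() {
	defer o.monitorWg.Done()
	if !o.setMonitorRunning() {
		return // Another monitor is already running.
	}
	defer o.clearMonitorRunning()
	// ... the consumer monitoring loop would run here ...
}

func (o *consumer) reset() {
	// Waiting here guarantees the monitor goroutine has fully exited
	// before we tear down and rebuild state.
	o.monitorWg.Wait()
	fmt.Println("safe to reset: monitor has exited")
}

func main() {
	o := &consumer{}
	if !o.isMonitorRunning() {
		o.monitorWg.Add(1) // Add before spawning, as processClusterCreateConsumer now does.
		go o.monitor()
	}
	o.reset()
}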
1 change: 1 addition & 0 deletions server/jetstream.go
@@ -137,6 +137,7 @@ type jsAccount struct {
js *jetStream
account *Account
storeDir string
inflight sync.Map
streams map[string]*stream
templates map[string]*streamTemplate
store TemplateStore
68 changes: 39 additions & 29 deletions server/jetstream_cluster.go
@@ -436,8 +436,11 @@ func (cc *jetStreamCluster) isStreamCurrent(account, stream string) bool {

// isStreamHealthy will determine if the stream is up to date or very close.
// For R1 it will make sure the stream is present on this server.
// Read lock should be held.
func (cc *jetStreamCluster) isStreamHealthy(account, stream string) bool {
func (js *jetStream) isStreamHealthy(account, stream string) bool {
js.mu.RLock()
defer js.mu.RUnlock()
cc := js.cluster

if cc == nil {
// Non-clustered mode
return true
@@ -477,8 +480,11 @@ func (cc *jetStreamCluster) isStreamHealthy(account, stream string) bool {

// isConsumerCurrent will determine if the consumer is up to date.
// For R1 it will make sure the consumer is present on this server.
// Read lock should be held.
func (cc *jetStreamCluster) isConsumerCurrent(account, stream, consumer string) bool {
func (js *jetStream) isConsumerCurrent(account, stream, consumer string) bool {
js.mu.RLock()
defer js.mu.RUnlock()
cc := js.cluster

if cc == nil {
// Non-clustered mode
return true
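
isStreamHealthy and isConsumerCurrent move from a *jetStreamCluster receiver to *jetStream so each can take js.mu.RLock itself, rather than relying on the removed "Read lock should be held" contract at call sites such as healthz. A tiny illustrative sketch of that refactor, using placeholder types:

package main

import "sync"

type cluster struct{ healthy bool }

type jetStream struct {
	mu      sync.RWMutex
	cluster *cluster
}

// Before (sketch): caller must already hold js.mu — an easy contract to break.
func (c *cluster) isHealthyLocked() bool { return c != nil && c.healthy }

// After (sketch): the method acquires its own read lock, so call sites
// no longer carry a hidden locking requirement.
func (js *jetStream) isHealthy() bool {
	js.mu.RLock()
	defer js.mu.RUnlock()
	cc := js.cluster
	if cc == nil {
		return true // Non-clustered mode.
	}
	return cc.healthy
}

func main() {
	js := &jetStream{cluster: &cluster{healthy: true}}
	_ = js.isHealthy()
	_ = js.cluster.isHealthyLocked()
}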
@@ -2486,41 +2492,42 @@ func (mset *stream) resetClusteredState(err error) bool {

// Preserve our current state and messages unless we have a first sequence mismatch.
shouldDelete := err == errFirstSequenceMismatch

mset.monitorWg.Wait()
mset.resetAndWaitOnConsumers()
// Stop our stream.
mset.stop(shouldDelete, false)

if sa != nil {
s.Warnf("Resetting stream cluster state for '%s > %s'", sa.Client.serviceAccount(), sa.Config.Name)
js.mu.Lock()
s.Warnf("Resetting stream cluster state for '%s > %s'", sa.Client.serviceAccount(), sa.Config.Name)
// Now wipe groups from assignments.
sa.Group.node = nil
var consumers []*consumerAssignment
if cc := js.cluster; cc != nil && cc.meta != nil {
ourID := cc.meta.ID()
for _, ca := range sa.consumers {
if rg := ca.Group; rg != nil && rg.isMember(ourID) {
rg.node = nil // Erase group raft/node state.
consumers = append(consumers, ca)
}
}
}
js.mu.Unlock()
go js.restartClustered(acc, sa)
}
return true
}

// This will reset the stream and consumers.
// Should be done in separate go routine.
func (js *jetStream) restartClustered(acc *Account, sa *streamAssignment) {
// Check and collect consumers first.
js.mu.RLock()
var consumers []*consumerAssignment
if cc := js.cluster; cc != nil && cc.meta != nil {
ourID := cc.meta.ID()
for _, ca := range sa.consumers {
if rg := ca.Group; rg != nil && rg.isMember(ourID) {
rg.node = nil // Erase group raft/node state.
consumers = append(consumers, ca)
// restart in a separate Go routine.
// This will reset the stream and consumers.
go func() {
// Reset stream.
js.processClusterCreateStream(acc, sa)
// Reset consumers.
for _, ca := range consumers {
js.processClusterCreateConsumer(ca, nil, false)
}
}
}()
}
js.mu.RUnlock()

// Reset stream.
js.processClusterCreateStream(acc, sa)
// Reset consumers.
for _, ca := range consumers {
js.processClusterCreateConsumer(ca, nil, false)
}
return true
}
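
resetClusteredState now drains the monitor goroutine (monitorWg.Wait plus resetAndWaitOnConsumers), stops the stream, snapshots the consumer assignments under js.mu.Lock, and only then rebuilds stream and consumers in a detached goroutine, replacing the old restartClustered helper. A simplified, self-contained sketch of that shape — the types and the done channel are illustrative only:

package main

import (
	"fmt"
	"sync"
)

// Placeholder types standing in for the server's assignment structures.
type consumerAssignment struct{ name string }
type streamAssignment struct{ consumers []*consumerAssignment }

type stream struct{ monitorWg sync.WaitGroup }

func (mset *stream) stop() { fmt.Println("stream stopped") }

type jetStream struct {
	mu   sync.Mutex
	done chan struct{} // Demo-only, so main can observe the async restart.
}

// Sketch of the reshaped reset path: wait for the monitor goroutine, stop
// the stream, snapshot consumer assignments under the lock, then rebuild
// everything asynchronously (the server fires this off with startGoRoutine).
func (js *jetStream) resetClusteredState(mset *stream, sa *streamAssignment) bool {
	mset.monitorWg.Wait() // Monitor must have fully exited before we reset.
	mset.stop()

	js.mu.Lock()
	consumers := append([]*consumerAssignment(nil), sa.consumers...) // Snapshot.
	js.mu.Unlock()

	go func() {
		defer close(js.done)
		fmt.Println("recreate stream")
		for _, ca := range consumers {
			fmt.Println("recreate consumer", ca.name)
		}
	}()
	return true
}

func main() {
	js := &jetStream{done: make(chan struct{})}
	sa := &streamAssignment{consumers: []*consumerAssignment{{name: "DLC"}}}
	js.resetClusteredState(&stream{}, sa)
	<-js.done
}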

func isControlHdr(hdr []byte) bool {
@@ -3974,6 +3981,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
// Clustered consumer.
// Start our monitoring routine if needed.
if !alreadyRunning && !o.isMonitorRunning() {
o.monitorWg.Add(1)
s.startGoRoutine(func() { js.monitorConsumer(o, ca) })
}
// For existing consumer, only send response if not recovering.
@@ -4172,6 +4180,8 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
s, n, cc := js.server(), o.raftNode(), js.cluster
defer s.grWG.Done()

defer o.monitorWg.Done()

if n == nil {
s.Warnf("No RAFT group for '%s > %s > %s'", o.acc.Name, ca.Stream, ca.Name)
return
4 changes: 2 additions & 2 deletions server/monitor.go
@@ -3129,15 +3129,15 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
for stream, sa := range asa {
if sa.Group.isMember(ourID) {
// Make sure we can look up
if !cc.isStreamHealthy(acc, stream) {
if !js.isStreamHealthy(acc, stream) {
health.Status = na
health.Error = fmt.Sprintf("JetStream stream '%s > %s' is not current", acc, stream)
return health
}
// Now check consumers.
for consumer, ca := range sa.consumers {
if ca.Group.isMember(ourID) {
if !cc.isConsumerCurrent(acc, stream, consumer) {
if !js.isConsumerCurrent(acc, stream, consumer) {
health.Status = na
health.Error = fmt.Sprintf("JetStream consumer '%s > %s > %s' is not current", acc, stream, consumer)
return health
123 changes: 122 additions & 1 deletion server/norace_test.go
@@ -3607,7 +3607,7 @@ func TestNoRaceJetStreamClusterCorruptWAL(t *testing.T) {
fs = o.raftNode().(*raft).wal.(*fileStore)
state = fs.State()
err = fs.Truncate(state.FirstSeq)
require_NoError(t, err)
require_True(t, err == nil || err == ErrInvalidSequence)
state = fs.State()

sub, err = js.PullSubscribe("foo", "dlc")
@@ -7687,3 +7687,124 @@ func TestNoRaceJetStreamClusterUnbalancedInterestMultipleFilteredConsumers(t *te
require_True(t, numPreAcks == 0)
}
}

func TestNoRaceParallelStreamAndConsumerCreation(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

// Stream config.
scfg := &StreamConfig{
Name: "TEST",
Subjects: []string{"foo", "bar"},
MaxMsgs: 10,
Storage: FileStorage,
Replicas: 1,
}

// Will do these directly against the low-level API to really make
// sure parallel creation works correctly.
np := 1000
startCh := make(chan bool)
errCh := make(chan error, np)
wg := sync.WaitGroup{}
wg.Add(np)

var streams sync.Map

for i := 0; i < np; i++ {
go func() {
defer wg.Done()

// Make them all fire at once.
<-startCh

if mset, err := s.GlobalAccount().addStream(scfg); err != nil {
t.Logf("Stream create got an error: %v", err)
errCh <- err
} else {
streams.Store(mset, true)
}
}()
}
time.Sleep(100 * time.Millisecond)
close(startCh)
wg.Wait()

// Check for no errors.
if len(errCh) > 0 {
t.Fatalf("Expected no errors, got %d", len(errCh))
}

// Now make sure we really only created one stream.
var numStreams int
streams.Range(func(k, v any) bool {
numStreams++
return true
})
if numStreams > 1 {
t.Fatalf("Expected only one stream to be really created, got %d out of %d attempts", numStreams, np)
}

// Also make sure we clean up the inflight entries for streams.
gacc := s.GlobalAccount()
_, jsa, err := gacc.checkForJetStream()
require_NoError(t, err)
var numEntries int
jsa.inflight.Range(func(k, v any) bool {
numEntries++
return true
})
if numEntries > 0 {
t.Fatalf("Expected no inflight entries to be left over, got %d", numEntries)
}

// Now do consumers.
mset, err := gacc.lookupStream("TEST")
require_NoError(t, err)

cfg := &ConsumerConfig{
DeliverSubject: "to",
Name: "DLC",
AckPolicy: AckExplicit,
}

startCh = make(chan bool)
errCh = make(chan error, np)
wg.Add(np)

var consumers sync.Map

for i := 0; i < np; i++ {
go func() {
defer wg.Done()

// Make them all fire at once.
<-startCh

if o, err := mset.addConsumer(cfg); err != nil {
t.Logf("Consumer create got an error: %v", err)
errCh <- err
} else {
consumers.Store(o, true)
}
}()
}
time.Sleep(100 * time.Millisecond)
close(startCh)
wg.Wait()

// Check for no errors.
if len(errCh) > 0 {
t.Fatalf("Expected no errors, got %d", len(errCh))
}

// Now make sure we really only created one consumer.
var numConsumers int
consumers.Range(func(k, v any) bool {
numConsumers++
return true
})
if numConsumers > 1 {
t.Fatalf("Expected only one consumer to be really created, got %d out of %d attempts", numConsumers, np)
}
}
