[IMPROVED] Protect against concurrent creation of streams and consumers. #4013

Merged (5 commits) on Apr 3, 2023. Changes shown below are from 1 commit.
6 changes: 4 additions & 2 deletions server/consumer.go
@@ -299,6 +299,8 @@ type consumer struct {
prOk bool
uch chan struct{}
retention RetentionPolicy

monitorWg sync.WaitGroup
inMonitor bool

// R>1 proposals
@@ -4559,8 +4561,8 @@ func (o *consumer) clearMonitorRunning() {

// Test whether we are in the monitor routine.
func (o *consumer) isMonitorRunning() bool {
o.mu.Lock()
defer o.mu.Unlock()
o.mu.RLock()
defer o.mu.RUnlock()
return o.inMonitor
}

1 change: 1 addition & 0 deletions server/jetstream.go
@@ -137,6 +137,7 @@ type jsAccount struct {
js *jetStream
account *Account
storeDir string
inflight sync.Map
streams map[string]*stream
templates map[string]*streamTemplate
store TemplateStore
54 changes: 29 additions & 25 deletions server/jetstream_cluster.go
@@ -2486,41 +2486,42 @@ func (mset *stream) resetClusteredState(err error) bool {

// Preserve our current state and messages unless we have a first sequence mismatch.
shouldDelete := err == errFirstSequenceMismatch

mset.monitorWg.Wait()
mset.resetAndWaitOnConsumers()
// Stop our stream.
mset.stop(shouldDelete, false)

if sa != nil {
s.Warnf("Resetting stream cluster state for '%s > %s'", sa.Client.serviceAccount(), sa.Config.Name)
js.mu.Lock()
s.Warnf("Resetting stream cluster state for '%s > %s'", sa.Client.serviceAccount(), sa.Config.Name)
// Now wipe groups from assignments.
sa.Group.node = nil
var consumers []*consumerAssignment
if cc := js.cluster; cc != nil && cc.meta != nil {
ourID := cc.meta.ID()
for _, ca := range sa.consumers {
if rg := ca.Group; rg != nil && rg.isMember(ourID) {
rg.node = nil // Erase group raft/node state.
consumers = append(consumers, ca)
}
}
}
js.mu.Unlock()
go js.restartClustered(acc, sa)
}
return true
}

// This will reset the stream and consumers.
// Should be done in separate go routine.
func (js *jetStream) restartClustered(acc *Account, sa *streamAssignment) {
// Check and collect consumers first.
js.mu.RLock()
var consumers []*consumerAssignment
if cc := js.cluster; cc != nil && cc.meta != nil {
ourID := cc.meta.ID()
for _, ca := range sa.consumers {
if rg := ca.Group; rg != nil && rg.isMember(ourID) {
rg.node = nil // Erase group raft/node state.
consumers = append(consumers, ca)
// restart in a separate Go routine.
// This will reset the stream and consumers.
go func() {
// Reset stream.
js.processClusterCreateStream(acc, sa)
// Reset consumers.
for _, ca := range consumers {
js.processClusterCreateConsumer(ca, nil, false)
}
}
}()
}
js.mu.RUnlock()

// Reset stream.
js.processClusterCreateStream(acc, sa)
// Reset consumers.
for _, ca := range consumers {
js.processClusterCreateConsumer(ca, nil, false)
}
return true
}

func isControlHdr(hdr []byte) bool {
@@ -3974,6 +3975,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
// Clustered consumer.
// Start our monitoring routine if needed.
if !alreadyRunning && !o.isMonitorRunning() {
o.monitorWg.Add(1)
s.startGoRoutine(func() { js.monitorConsumer(o, ca) })
}
// For existing consumer, only send response if not recovering.
@@ -4172,6 +4174,8 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
s, n, cc := js.server(), o.raftNode(), js.cluster
defer s.grWG.Done()

defer o.monitorWg.Done()

if n == nil {
s.Warnf("No RAFT group for '%s > %s > %s'", o.acc.Name, ca.Stream, ca.Name)
return
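The changes above give each consumer's monitor goroutine a trackable lifetime: monitorWg.Add(1) happens before the goroutine is started and Done() runs as the goroutine exits, so the reset and shutdown paths can Wait() for it instead of racing it. Below is a minimal, self-contained sketch of that pattern; the monitored type and its method names are illustrative, not the server's actual types.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// monitored is a stand-in for a consumer whose monitor goroutine we track.
type monitored struct {
	mu        sync.Mutex
	monitorWg sync.WaitGroup
	inMonitor bool
}

// startMonitor launches the monitor goroutine at most once. The WaitGroup is
// incremented before the goroutine is spawned so a concurrent waiter cannot
// miss it.
func (m *monitored) startMonitor(run func()) {
	m.mu.Lock()
	if m.inMonitor {
		m.mu.Unlock()
		return
	}
	m.inMonitor = true
	m.monitorWg.Add(1)
	m.mu.Unlock()

	go func() {
		defer m.monitorWg.Done()
		run()
	}()
}

func main() {
	var m monitored
	m.startMonitor(func() { time.Sleep(50 * time.Millisecond) })
	// A stop or reset path waits for the monitor goroutine before tearing down.
	m.monitorWg.Wait()
	fmt.Println("monitor exited, safe to reset")
}
```

The key ordering is Add before spawn: a concurrent Wait() can then never observe a zero count while the monitor is about to start.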
108 changes: 108 additions & 0 deletions server/norace_test.go
@@ -7687,3 +7687,111 @@ func TestNoRaceJetStreamClusterUnbalancedInterestMultipleFilteredConsumers(t *te
require_True(t, numPreAcks == 0)
}
}

func TestNoRaceParallelStreamAndConsumerCreation(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

// stream config.
scfg := &StreamConfig{
Name: "TEST",
Subjects: []string{"foo", "bar"},
MaxMsgs: 10,
Storage: FileStorage,
Replicas: 1,
}

// Do these directly against the low level API to really make
// sure parallel creation is ok.
np := 1000
startCh := make(chan bool)
errCh := make(chan error, np)
wg := sync.WaitGroup{}
wg.Add(np)

var streams sync.Map

for i := 0; i < np; i++ {
go func() {
defer wg.Done()

// Make them all fire at once.
<-startCh

if mset, err := s.GlobalAccount().addStream(scfg); err != nil {
t.Logf("Stream create got an error: %v", err)
errCh <- err
} else {
streams.Store(mset, true)
}
}()
}
time.Sleep(100 * time.Millisecond)
close(startCh)
wg.Wait()

// Check for no errors.
if len(errCh) > 0 {
t.Fatalf("Expected no errors, got %d", len(errCh))
}

// Now make sure we really only created one stream.
var numStreams int
streams.Range(func(k, v any) bool {
numStreams++
return true
})
if numStreams > 1 {
t.Fatalf("Expected only one stream to be really created, got %d out of %d attempts", numStreams, np)
}

// Now do consumers.
mset, err := s.GlobalAccount().lookupStream("TEST")
require_NoError(t, err)

cfg := &ConsumerConfig{
DeliverSubject: "to",
Name: "DLC",
AckPolicy: AckExplicit,
}

startCh = make(chan bool)
errCh = make(chan error, np)
wg.Add(np)

var consumers sync.Map

for i := 0; i < np; i++ {
go func() {
defer wg.Done()

// Make them all fire at once.
<-startCh

if o, err := mset.addConsumer(cfg); err != nil {
t.Logf("Consumer create got an error: %v", err)
errCh <- err
} else {
consumers.Store(o, true)
}
}()
}
time.Sleep(100 * time.Millisecond)
close(startCh)
wg.Wait()

// Check for no errors.
if len(errCh) > 0 {
t.Fatalf("Expected no errors, got %d", len(errCh))
}

// Now make sure we really only created one consumer.
var numConsumers int
consumers.Range(func(k, v any) bool {
numConsumers++
return true
})
if numConsumers > 1 {
t.Fatalf("Expected only one consumer to be really created, got %d out of %d attempts", numConsumers, np)
}
}
69 changes: 51 additions & 18 deletions server/stream.go
@@ -366,10 +366,21 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
return nil, ApiErrors[JSStreamReplicasNotSupportedErr]
}

// Make sure we are ok when these are done in parallel.
var wg sync.WaitGroup
v, loaded := jsa.inflight.LoadOrStore(cfg.Name, &wg)
ifwg := v.(*sync.WaitGroup)
if loaded {
ifwg.Wait()
} else {
ifwg.Add(1)
defer ifwg.Done()
}

js, isClustered := jsa.jetStreamAndClustered()
jsa.mu.RLock()
jsa.mu.Lock()
if mset, ok := jsa.streams[cfg.Name]; ok {
jsa.mu.RUnlock()
jsa.mu.Unlock()
// Check to see if configs are same.
ocfg := mset.config()
if reflect.DeepEqual(ocfg, cfg) {
@@ -388,7 +399,8 @@ if !isClustered {
if !isClustered {
reserved = jsa.tieredReservation(tier, &cfg)
}
jsa.mu.RUnlock()
jsa.mu.Unlock()

if !hasTier {
return nil, NewJSNoLimitsError()
}
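The inflight sync.Map introduced above acts as a per-name single-flight guard: the first caller to create a stream with a given name registers a WaitGroup, and any caller that loses the LoadOrStore race waits for the winner to finish before falling through to the existing-stream checks. A rough, self-contained sketch of that idea follows; the names (inflightGuard, create) are hypothetical and not part of the server code.

```go
package main

import (
	"fmt"
	"sync"
)

// inflightGuard serializes concurrent creations of the same named resource,
// modeled loosely on the jsa.inflight map used above.
type inflightGuard struct {
	inflight sync.Map // name -> *sync.WaitGroup
}

// create runs fn for the first caller of a given name; concurrent callers for
// the same name wait until that first attempt finishes, then return so they
// can re-check for the now-existing resource.
func (g *inflightGuard) create(name string, fn func() error) error {
	wg := &sync.WaitGroup{}
	wg.Add(1) // count up before publishing so waiters never see a zero count
	if v, loaded := g.inflight.LoadOrStore(name, wg); loaded {
		// Another caller is creating this name; wait for it to finish.
		v.(*sync.WaitGroup).Wait()
		return nil
	}
	defer func() {
		g.inflight.Delete(name)
		wg.Done()
	}()
	return fn()
}

func main() {
	var g inflightGuard
	var mu sync.Mutex
	calls := 0
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = g.create("TEST", func() error {
				mu.Lock()
				calls++
				mu.Unlock()
				return nil
			})
		}()
	}
	wg.Wait()
	fmt.Println("create ran", calls, "time(s)") // typically 1, never concurrently per name
}
```

In the change above the guard entry lives for the duration of the add call, so a caller that waited wakes up, falls into the jsa.streams lookup, and hits the usual same-config or already-in-use handling.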
@@ -4427,6 +4439,26 @@ func (mset *stream) internalLoop() {
}
}

// Used to break consumers out of their monitor goroutines and wait for them to exit.
func (mset *stream) resetAndWaitOnConsumers() {
mset.mu.RLock()
consumers := make([]*consumer, 0, len(mset.consumers))
for _, o := range mset.consumers {
consumers = append(consumers, o)
}
mset.mu.RUnlock()

for _, o := range consumers {
if node := o.raftNode(); node != nil {
if o.IsLeader() {
node.StepDown()
}
node.Delete()
}
o.monitorWg.Wait()
}
}

// Internal function to delete a stream.
func (mset *stream) delete() error {
if mset == nil {
@@ -4445,7 +4477,7 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
return NewJSNotEnabledForAccountError()
}

// Remove from our account map.
// Remove from our account map first.
jsa.mu.Lock()
delete(jsa.streams, mset.cfg.Name)
accName := jsa.account.Name
@@ -4474,16 +4506,30 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
mset.cancelSourceConsumer(si.iname)
}
}

// Cluster cleanup
var sa *streamAssignment
if n := mset.node; n != nil {
if deleteFlag {
n.Delete()
sa = mset.sa
} else if n.NeedSnapshot() {
// Attempt snapshot on clean exit.
n.InstallSnapshot(mset.stateSnapshotLocked())
n.Stop()
}
}
mset.mu.Unlock()

for _, o := range obs {
// Third flag says do not broadcast a signal.
// TODO(dlc) - If we have an err here we don't want to stop
// but should we log?
o.stopWithFlags(deleteFlag, deleteFlag, false, advisory)
o.monitorWg.Wait()
}
mset.mu.Lock()

mset.mu.Lock()
// Stop responding to sync requests.
mset.stopClusterSubs()
// Unsubscribe from direct stream.
@@ -4495,19 +4541,6 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
mset.infoSub = nil
}

// Cluster cleanup
var sa *streamAssignment
if n := mset.node; n != nil {
if deleteFlag {
n.Delete()
sa = mset.sa
} else if n.NeedSnapshot() {
// Attempt snapshot on clean exit.
n.InstallSnapshot(mset.stateSnapshotLocked())
n.Stop()
}
}

// Send stream delete advisory after the consumers.
if deleteFlag && advisory {
mset.sendDeleteAdvisoryLocked()
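One detail worth noting in the stop() reordering above: the loop that stops consumers and waits on their monitorWg now runs with mset.mu released (Unlock before the loop, Lock again after). Presumably this is because a monitor goroutine may itself need stream or consumer locks on its way out, so waiting while still holding the lock could stall or deadlock. A tiny illustrative sketch of that shape, not taken from the server code:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var mu sync.Mutex
	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		// Pretend the monitor goroutine needs the same lock on its way out.
		mu.Lock()
		mu.Unlock()
	}()

	mu.Lock()
	// ... cleanup that needs the lock ...
	mu.Unlock()

	// Wait only after releasing the lock; waiting while still holding it
	// could deadlock against the goroutine above.
	wg.Wait()

	mu.Lock()
	// ... remaining cleanup under the lock ...
	mu.Unlock()
	fmt.Println("stopped cleanly")
}
```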