[IMPROVED] Parallel consumer #3880

Merged 1 commit on Feb 18, 2023
38 changes: 37 additions & 1 deletion server/consumer.go
@@ -54,7 +54,7 @@ type ConsumerInfo struct {
}

type ConsumerConfig struct {
// Durable is deprecated. All consumers will have names. picked by clients.
// Durable is deprecated. All consumers should have names, picked by clients.
Durable string `json:"durable_name,omitempty"`
Name string `json:"name,omitempty"`
Description string `json:"description,omitempty"`
@@ -298,6 +298,7 @@ type consumer struct {
prOk bool
uch chan struct{}
retention RetentionPolicy
inMonitor bool

// R>1 proposals
pch chan struct{}
@@ -959,6 +960,13 @@ func (o *consumer) clearNode() {
}
}

// IsLeader will return whether we are the current leader.
func (o *consumer) IsLeader() bool {
o.mu.RLock()
defer o.mu.RUnlock()
return o.isLeader()
}

// Lock should be held.
func (o *consumer) isLeader() bool {
if o.node != nil {
@@ -1604,6 +1612,8 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
if o.cfg.FilterSubject != cfg.FilterSubject {
if cfg.FilterSubject != _EMPTY_ {
o.filterWC = subjectHasWildcard(cfg.FilterSubject)
} else {
o.filterWC = false
}
// Make sure we have correct signaling setup.
// Consumer lock can not be held.
@@ -4444,3 +4454,29 @@ func (o *consumer) processStreamSignal(_ *subscription, _ *client, _ *Account, s
o.signalNewMessages()
}
}

// Will check if we are already running in the monitor and, if not, set the appropriate flag.
func (o *consumer) checkInMonitor() bool {
o.mu.Lock()
defer o.mu.Unlock()

if o.inMonitor {
return true
}
o.inMonitor = true
return false
}

// Clear us being in the monitor routine.
func (o *consumer) clearMonitorRunning() {
o.mu.Lock()
defer o.mu.Unlock()
o.inMonitor = false
}

// Test whether we are in the monitor routine.
func (o *consumer) isMonitorRunning() bool {
o.mu.Lock()
defer o.mu.Unlock()
return o.inMonitor
}
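
The new inMonitor flag and its three helpers above give each consumer a mutex-guarded claim so that at most one monitor goroutine can run at a time. A minimal standalone sketch of the same claim/release pattern (illustrative names only, not the server's actual types):

```go
package main

import (
	"fmt"
	"sync"
)

// monitored mirrors the consumer's inMonitor flag: a boolean guarded by a
// mutex, claimed by the first caller and cleared when the routine exits.
type monitored struct {
	mu        sync.Mutex
	inMonitor bool
}

// checkInMonitor reports true if a monitor already runs; otherwise it
// claims the slot and returns false, so exactly one caller wins.
func (m *monitored) checkInMonitor() bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.inMonitor {
		return true
	}
	m.inMonitor = true
	return false
}

// clearMonitorRunning releases the claim on exit.
func (m *monitored) clearMonitorRunning() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.inMonitor = false
}

func main() {
	var m monitored
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			if m.checkInMonitor() {
				return // another goroutine already monitors
			}
			defer m.clearMonitorRunning()
			fmt.Printf("goroutine %d won the monitor slot\n", id)
		}(i)
	}
	wg.Wait()
}
```

This is exactly how monitorConsumer guards itself later in this diff: checkInMonitor at entry, clearMonitorRunning deferred.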
24 changes: 16 additions & 8 deletions server/jetstream_api.go
@@ -3823,6 +3823,7 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun
if isClustered && !req.Config.Direct {
// If we are inline with client, we still may need to do a callout for consumer info
// during this call, so place in Go routine to not block client.
// Router and Gateway API calls already in separate context.
if c.kind != ROUTER && c.kind != GATEWAY {
go s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config)
} else {
@@ -4165,20 +4166,27 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
}
// We have a consumer assignment.
js.mu.RLock()

var node RaftNode
var leaderNotPartOfGroup bool
if rg := ca.Group; rg != nil && rg.node != nil && rg.isMember(ourID) {
node = rg.node
if gl := node.GroupLeader(); gl != _EMPTY_ && !rg.isMember(gl) {
leaderNotPartOfGroup = true
var isMember bool

rg := ca.Group
if rg != nil && rg.isMember(ourID) {
isMember = true
if rg.node != nil {
node = rg.node
if gl := node.GroupLeader(); gl != _EMPTY_ && !rg.isMember(gl) {
leaderNotPartOfGroup = true
}
}
}
js.mu.RUnlock()
// Check if we should ignore altogether.
if node == nil {
// We have been assigned and are pending.
if ca.pending {
// Send our config and defaults for state and no cluster info.
// We have been assigned but have not created a node yet. If we are a member return
// our config and defaults for state and no cluster info.
if isMember {
resp.ConsumerInfo = &ConsumerInfo{
Stream: ca.Stream,
Name: ca.Name,
@@ -4190,7 +4198,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
return
}
// If we are a member and we have a group leader or we had a previous leader, consider bailing out.
if node != nil && (node.GroupLeader() != _EMPTY_ || node.HadPreviousLeader()) {
if node.GroupLeader() != _EMPTY_ || node.HadPreviousLeader() {
if leaderNotPartOfGroup {
resp.Error = NewJSConsumerOfflineError()
s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil)
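The rewritten lookup above separates two facts the old code conflated: whether this server is a member of the consumer's assignment group, and whether its Raft node has been created yet. A simplified, hypothetical sketch of the resulting decision flow (the real code also consults HadPreviousLeader and the delayed-response machinery):

```go
package main

import "fmt"

// raftGroup is a hypothetical stand-in for the server's raftGroup; hasNode
// stands in for rg.node != nil.
type raftGroup struct {
	peers   []string
	hasNode bool
	leader  string
}

func (rg *raftGroup) isMember(id string) bool {
	for _, p := range rg.peers {
		if p == id {
			return true
		}
	}
	return false
}

// respondKind classifies how a server should answer a consumer-info
// request, mirroring the member/node/leader checks in the hunk above.
func respondKind(rg *raftGroup, ourID string) string {
	if rg == nil || !rg.isMember(ourID) {
		return "not ours" // no assignment for us, ignore
	}
	if !rg.hasNode {
		return "config-only info" // assigned, Raft node not created yet
	}
	if gl := rg.leader; gl != "" && !rg.isMember(gl) {
		return "consumer offline" // leader is no longer part of the group
	}
	return "full info"
}

func main() {
	rg := &raftGroup{peers: []string{"S1", "S2", "S3"}}
	fmt.Println(respondKind(rg, "S1")) // config-only info
	rg.hasNode, rg.leader = true, "S9"
	fmt.Println(respondKind(rg, "S1")) // consumer offline
}
```

The practical effect: a freshly assigned member can now answer with config-only ConsumerInfo instead of falling through to an error while its node is still being created.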
108 changes: 58 additions & 50 deletions server/jetstream_cluster.go
@@ -1,4 +1,4 @@
// Copyright 2020-2022 The NATS Authors
// Copyright 2020-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -146,7 +146,6 @@ type consumerAssignment struct {
// Internal
responded bool
deleted bool
pending bool
err error
}

@@ -3585,18 +3584,19 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
}
js.mu.RLock()
s := js.srv
acc, err := s.LookupAccount(ca.Client.serviceAccount())
rg := ca.Group
alreadyRunning := rg != nil && rg.node != nil
accName, stream, consumer := ca.Client.serviceAccount(), ca.Stream, ca.Name
js.mu.RUnlock()

acc, err := s.LookupAccount(accName)
if err != nil {
s.Warnf("JetStream cluster failed to lookup account %q: %v", ca.Client.serviceAccount(), err)
js.mu.RUnlock()
s.Warnf("JetStream cluster failed to lookup axccount %q: %v", accName, err)
return
}
rg := ca.Group
alreadyRunning := rg.node != nil
js.mu.RUnlock()

// Go ahead and create or update the consumer.
mset, err := acc.lookupStream(ca.Stream)
mset, err := acc.lookupStream(stream)
if err != nil {
js.mu.Lock()
s.Debugf("Consumer create failed, could not locate stream '%s > %s'", ca.Client.serviceAccount(), ca.Stream)
@@ -3614,15 +3614,15 @@ }
}

// Check if we already have this consumer running.
o := mset.lookupConsumer(ca.Name)
o := mset.lookupConsumer(consumer)

if !alreadyRunning {
// Process the raft group and make sure its running if needed.
storage := mset.config().Storage
if ca.Config.MemoryStorage {
storage = MemoryStorage
}
js.createRaftGroup(acc.GetName(), rg, storage)
js.createRaftGroup(accName, rg, storage)
} else {
// If we are clustered update the known peers.
js.mu.RLock()
@@ -3633,51 +3633,56 @@ }
}

// Check if we already have this consumer running.
var didCreate, isConfigUpdate bool
var didCreate, isConfigUpdate, needsLocalResponse bool
if o == nil {
// Add in the consumer if needed.
o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca, false)
didCreate = true
if o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca, false); err == nil {
didCreate = true
}
} else {
if err := o.updateConfig(ca.Config); err != nil {
// This is essentially an update that has failed.
js.mu.Lock()
result := &consumerAssignmentResult{
Account: ca.Client.serviceAccount(),
Stream: ca.Stream,
Consumer: ca.Name,
Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}},
// This consumer exists.
// Only update if config is really different.
cfg := o.config()
if !reflect.DeepEqual(&cfg, ca.Config) {
if err := o.updateConfig(ca.Config); err != nil {
// This is essentially an update that has failed.
js.mu.RLock()
result := &consumerAssignmentResult{
Account: accName,
Stream: stream,
Consumer: consumer,
Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}},
}
result.Response.Error = NewJSConsumerNameExistError()
s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result)
js.mu.RUnlock()
return
}
result.Response.Error = NewJSConsumerNameExistError()
s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result)
js.mu.Unlock()
return
}
// Check if we already had a consumer assignment and it's still pending.
cca, oca := ca, o.consumerAssignment()
o.mu.RLock()
leader := o.isLeader()
o.mu.RUnlock()

var sendState bool
js.mu.Lock()
js.mu.RLock()
n := rg.node
// Check if we already had a consumer assignment and it's still pending.
cca, oca := ca, o.consumerAssignment()
if oca != nil {
if !oca.responded {
// We can't override info for replying here otherwise leader once elected can not respond.
// So just update Config, leave off client and reply to the originals.
cac := *oca
cac.Config = ca.Config
// So copy over original client and the reply from the old ca.
cac := *ca
cac.Client = oca.Client
cac.Reply = oca.Reply
cca = &cac
needsLocalResponse = true
}
// If we look like we are scaling up, let's send our current state to the group.
sendState = len(ca.Group.Peers) > len(oca.Group.Peers) && leader
sendState = len(ca.Group.Peers) > len(oca.Group.Peers) && o.IsLeader() && n != nil
// Signal that this is an update
isConfigUpdate = true
}
n := rg.node
js.mu.Unlock()
js.mu.RUnlock()

if sendState && n != nil {
if sendState {
if snap, err := o.store.EncodedState(); err == nil {
n.SendSnapshot(snap)
}
@@ -3694,6 +3699,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
err = o.setStoreState(state)
o.mu.Unlock()
}

if err != nil {
if IsNatsErr(err, JSConsumerStoreFailedErrF) {
s.Warnf("Consumer create failed for '%s > %s > %s': %v", ca.Client.serviceAccount(), ca.Stream, ca.Name, err)
@@ -3755,7 +3761,6 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
}
}

// Start our monitoring routine.
if rg.node == nil {
// Single replica consumer, process manually here.
js.mu.Lock()
@@ -3766,15 +3771,14 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
js.mu.Unlock()
js.processConsumerLeaderChange(o, true)
} else {
if !alreadyRunning {
// Clustered consumer.
// Start our monitoring routine if needed.
if !alreadyRunning && !o.isMonitorRunning() {
s.startGoRoutine(func() { js.monitorConsumer(o, ca) })
}
// Only send response if not recovering.
if !js.isMetaRecovering() {
o.mu.RLock()
isLeader := o.isLeader()
o.mu.RUnlock()
if wasExisting && (isLeader || (!didCreate && rg.node.GroupLeader() == _EMPTY_)) {
// For existing consumer, only send response if not recovering.
if wasExisting && !js.isMetaRecovering() {
if o.IsLeader() || (!didCreate && needsLocalResponse) {
// Process if existing as an update.
js.mu.RLock()
client, subject, reply := ca.Client, ca.Subject, ca.Reply
@@ -3970,6 +3974,12 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
return
}

// Make sure only one is running.
if o.checkInMonitor() {
return
}
defer o.clearMonitorRunning()

qch, lch, aq, uch, ourPeerId := n.QuitC(), n.LeadChangeC(), n.ApplyQ(), o.updateC(), cc.meta.ID()

s.Debugf("Starting consumer monitor for '%s > %s > %s' [%s]", o.acc.Name, ca.Stream, ca.Name, n.Group())
@@ -4373,8 +4383,7 @@ func (js *jetStream) processConsumerLeaderChange(o *consumer, isLeader bool) err
ca.responded = true
js.mu.Unlock()

streamName := o.streamName()
consumerName := o.String()
streamName, consumerName := o.streamName(), o.String()
acc, _ := s.LookupAccount(account)
if acc == nil {
return stepDownIfLeader()
@@ -6493,7 +6502,6 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
if sa.consumers == nil {
sa.consumers = make(map[string]*consumerAssignment)
}
ca.pending = true
sa.consumers[ca.Name] = ca

// Do formal proposal.
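Two details in the processClusterCreateConsumer rewrite deserve a callout. First, an existing consumer's config is now compared with reflect.DeepEqual and only pushed through updateConfig when it actually differs, so replayed or concurrent identical create requests become no-ops. A hedged sketch of that guard with a stand-in config type:

```go
package main

import (
	"fmt"
	"reflect"
)

// consumerConfig is an illustrative stand-in for the server's ConsumerConfig.
type consumerConfig struct {
	Durable  string
	Replicas int
}

// maybeUpdate applies proposed only when it really differs from current,
// reporting whether an update happened.
func maybeUpdate(current, proposed *consumerConfig) bool {
	if reflect.DeepEqual(current, proposed) {
		return false // identical request, nothing to do
	}
	*current = *proposed
	return true
}

func main() {
	cur := &consumerConfig{Durable: "dlc", Replicas: 3}
	fmt.Println(maybeUpdate(cur, &consumerConfig{Durable: "dlc", Replicas: 3})) // false
	fmt.Println(maybeUpdate(cur, &consumerConfig{Durable: "dlc", Replicas: 5})) // true
}
```

Second, when a still-pending assignment is superseded, the code now copies the new assignment and grafts the original Client and Reply onto it (cac := *ca with cac.Client and cac.Reply taken from oca), rather than the reverse, so the eventual leader can still answer the original requester with the up-to-date config; needsLocalResponse records that such a reply is owed.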
21 changes: 14 additions & 7 deletions server/jetstream_cluster_3_test.go
@@ -1486,13 +1486,21 @@ func TestJetStreamParallelConsumerCreation(t *testing.T) {
})
require_NoError(t, err)

np := 20
np := 50

startCh := make(chan bool)
errCh := make(chan error, np)

cfg := &nats.ConsumerConfig{
Durable: "dlc",
Replicas: 3,
}

wg := sync.WaitGroup{}
swg := sync.WaitGroup{}
wg.Add(np)
swg.Add(np)

for i := 0; i < np; i++ {
go func() {
defer wg.Done()
@@ -1501,21 +1509,20 @@ }
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

swg.Done()

// Make them all fire at once.
<-startCh

var err error
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "dlc",
Replicas: 3,
})
if err != nil {
if _, err := js.AddConsumer("TEST", cfg); err != nil {
errCh <- err
}
}()
}

swg.Wait()
close(startCh)

wg.Wait()

if len(errCh) > 0 {
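The test now parks all 50 goroutines behind a gate before letting them race: a second WaitGroup (swg) counts each goroutine in once its connection is ready, and closing startCh releases everyone at once, since a receive on a closed channel never blocks. The same two-WaitGroup gate in isolation:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	const np = 50
	startCh := make(chan bool) // closed once all workers are ready

	var wg, swg sync.WaitGroup // wg tracks completion, swg tracks readiness
	wg.Add(np)
	swg.Add(np)

	for i := 0; i < np; i++ {
		go func(id int) {
			defer wg.Done()
			swg.Done() // report ready
			<-startCh  // park until the gate opens
			fmt.Println("fired", id)
		}(i)
	}

	swg.Wait()     // all goroutines are parked at the gate
	close(startCh) // release them simultaneously
	wg.Wait()
}
```

Bumping np from 20 to 50 and hoisting the shared ConsumerConfig out of the loop makes the race on the single durable "dlc" considerably more contended, which is exactly what the single-monitor guard in this PR is being tested against.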
6 changes: 1 addition & 5 deletions server/stream.go
@@ -541,11 +541,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
}

// This is always true in single server mode.
mset.mu.RLock()
isLeader := mset.isLeader()
mset.mu.RUnlock()

if isLeader {
if mset.IsLeader() {
// Send advisory.
var suppress bool
if !s.standAloneMode() && sa == nil {
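The stream.go cleanup relies on the same locking convention the consumer.go hunk introduces with IsLeader: an exported method acquires the read lock itself and delegates to a lowercase variant documented as "Lock should be held". A minimal sketch of that idiom (illustrative type, not the server's):

```go
package main

import (
	"fmt"
	"sync"
)

type stream struct {
	mu     sync.RWMutex
	leader bool
}

// IsLeader is safe to call from anywhere; it manages the lock itself.
func (s *stream) IsLeader() bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.isLeader()
}

// isLeader assumes s.mu is held by the caller ("Lock should be held").
func (s *stream) isLeader() bool {
	return s.leader
}

func main() {
	s := &stream{leader: true}
	fmt.Println(s.IsLeader())
}
```

Call sites like addStreamWithAssignment then shrink from a three-line lock/check/unlock dance to a single mset.IsLeader() call.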