Merge pull request #3880 from nats-io/p-consumer-create
[IMPROVED] Parallel consumer
derekcollison committed Feb 18, 2023
2 parents 689d521 + efa3bcc commit 7f5bef4
Showing 5 changed files with 126 additions and 71 deletions.
38 changes: 37 additions & 1 deletion server/consumer.go
@@ -54,7 +54,7 @@ type ConsumerInfo struct {
}

type ConsumerConfig struct {
// Durable is deprecated. All consumers will have names. picked by clients.
// Durable is deprecated. All consumers should have names, picked by clients.
Durable string `json:"durable_name,omitempty"`
Name string `json:"name,omitempty"`
Description string `json:"description,omitempty"`
@@ -298,6 +298,7 @@ type consumer struct {
prOk bool
uch chan struct{}
retention RetentionPolicy
inMonitor bool

// R>1 proposals
pch chan struct{}
@@ -959,6 +960,13 @@ func (o *consumer) clearNode() {
}
}

// IsLeader will return if we are the current leader.
func (o *consumer) IsLeader() bool {
o.mu.RLock()
defer o.mu.RUnlock()
return o.isLeader()
}

// Lock should be held.
func (o *consumer) isLeader() bool {
if o.node != nil {
@@ -1604,6 +1612,8 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
if o.cfg.FilterSubject != cfg.FilterSubject {
if cfg.FilterSubject != _EMPTY_ {
o.filterWC = subjectHasWildcard(cfg.FilterSubject)
} else {
o.filterWC = false
}
// Make sure we have correct signaling setup.
// Consumer lock can not be held.
@@ -4444,3 +4454,29 @@ func (o *consumer) processStreamSignal(_ *subscription, _ *client, _ *Account, s
o.signalNewMessages()
}
}

// Will check if we are running in the monitor already and if not set the appropriate flag.
func (o *consumer) checkInMonitor() bool {
o.mu.Lock()
defer o.mu.Unlock()

if o.inMonitor {
return true
}
o.inMonitor = true
return false
}

// Clear us being in the monitor routine.
func (o *consumer) clearMonitorRunning() {
o.mu.Lock()
defer o.mu.Unlock()
o.inMonitor = false
}

// Test whether we are in the monitor routine.
func (o *consumer) isMonitorRunning() bool {
o.mu.Lock()
defer o.mu.Unlock()
return o.inMonitor
}
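
The three helpers added above (checkInMonitor, clearMonitorRunning, isMonitorRunning) form a mutex-guarded test-and-set flag that the clustered path uses so only one monitor goroutine runs per consumer. A minimal standalone sketch of the same pattern follows; the monitorGuard type, goroutine bodies, and printed output are illustrative, not part of the server:

package main

import (
	"fmt"
	"sync"
)

// monitorGuard mirrors the inMonitor flag added to consumer above:
// a boolean protected by a mutex so only one caller can claim the
// monitor role at a time.
type monitorGuard struct {
	mu        sync.Mutex
	inMonitor bool
}

// checkInMonitor reports whether the monitor is already running and,
// if not, marks it as running (test-and-set under the lock).
func (g *monitorGuard) checkInMonitor() bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.inMonitor {
		return true
	}
	g.inMonitor = true
	return false
}

// clearMonitorRunning resets the flag when the monitor exits.
func (g *monitorGuard) clearMonitorRunning() {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.inMonitor = false
}

func main() {
	var g monitorGuard
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// A goroutine that finds the flag already set bails out,
			// just as monitorConsumer does after this change.
			if g.checkInMonitor() {
				return
			}
			defer g.clearMonitorRunning()
			fmt.Println("goroutine", id, "acting as monitor")
		}(i)
	}
	wg.Wait()
}

The monitorConsumer change in jetstream_cluster.go below relies on exactly this guard at the top of the routine.
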
24 changes: 16 additions & 8 deletions server/jetstream_api.go
@@ -3823,6 +3823,7 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun
if isClustered && !req.Config.Direct {
// If we are inline with client, we still may need to do a callout for consumer info
// during this call, so place in Go routine to not block client.
// Router and Gateway API calls already in separate context.
if c.kind != ROUTER && c.kind != GATEWAY {
go s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config)
} else {
@@ -4165,20 +4166,27 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
}
// We have a consumer assignment.
js.mu.RLock()

var node RaftNode
var leaderNotPartOfGroup bool
if rg := ca.Group; rg != nil && rg.node != nil && rg.isMember(ourID) {
node = rg.node
if gl := node.GroupLeader(); gl != _EMPTY_ && !rg.isMember(gl) {
leaderNotPartOfGroup = true
var isMember bool

rg := ca.Group
if rg != nil && rg.isMember(ourID) {
isMember = true
if rg.node != nil {
node = rg.node
if gl := node.GroupLeader(); gl != _EMPTY_ && !rg.isMember(gl) {
leaderNotPartOfGroup = true
}
}
}
js.mu.RUnlock()
// Check if we should ignore all together.
if node == nil {
// We have been assigned and are pending.
if ca.pending {
// Send our config and defaults for state and no cluster info.
// We have been assigned but have not created a node yet. If we are a member return
// our config and defaults for state and no cluster info.
if isMember {
resp.ConsumerInfo = &ConsumerInfo{
Stream: ca.Stream,
Name: ca.Name,
@@ -4190,7 +4198,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
return
}
// If we are a member and we have a group leader or we had a previous leader consider bailing out.
if node != nil && (node.GroupLeader() != _EMPTY_ || node.HadPreviousLeader()) {
if node.GroupLeader() != _EMPTY_ || node.HadPreviousLeader() {
if leaderNotPartOfGroup {
resp.Error = NewJSConsumerOfflineError()
s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil)
108 changes: 58 additions & 50 deletions server/jetstream_cluster.go
@@ -1,4 +1,4 @@
// Copyright 2020-2022 The NATS Authors
// Copyright 2020-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -146,7 +146,6 @@ type consumerAssignment struct {
// Internal
responded bool
deleted bool
pending bool
err error
}

@@ -3585,18 +3584,19 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
}
js.mu.RLock()
s := js.srv
acc, err := s.LookupAccount(ca.Client.serviceAccount())
rg := ca.Group
alreadyRunning := rg != nil && rg.node != nil
accName, stream, consumer := ca.Client.serviceAccount(), ca.Stream, ca.Name
js.mu.RUnlock()

acc, err := s.LookupAccount(accName)
if err != nil {
s.Warnf("JetStream cluster failed to lookup account %q: %v", ca.Client.serviceAccount(), err)
js.mu.RUnlock()
s.Warnf("JetStream cluster failed to lookup axccount %q: %v", accName, err)
return
}
rg := ca.Group
alreadyRunning := rg.node != nil
js.mu.RUnlock()

// Go ahead and create or update the consumer.
mset, err := acc.lookupStream(ca.Stream)
mset, err := acc.lookupStream(stream)
if err != nil {
js.mu.Lock()
s.Debugf("Consumer create failed, could not locate stream '%s > %s'", ca.Client.serviceAccount(), ca.Stream)
@@ -3614,15 +3614,15 @@ }
}

// Check if we already have this consumer running.
o := mset.lookupConsumer(ca.Name)
o := mset.lookupConsumer(consumer)

if !alreadyRunning {
// Process the raft group and make sure its running if needed.
storage := mset.config().Storage
if ca.Config.MemoryStorage {
storage = MemoryStorage
}
js.createRaftGroup(acc.GetName(), rg, storage)
js.createRaftGroup(accName, rg, storage)
} else {
// If we are clustered update the known peers.
js.mu.RLock()
@@ -3633,51 +3633,56 @@ }
}

// Check if we already have this consumer running.
var didCreate, isConfigUpdate bool
var didCreate, isConfigUpdate, needsLocalResponse bool
if o == nil {
// Add in the consumer if needed.
o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca, false)
didCreate = true
if o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca, false); err == nil {
didCreate = true
}
} else {
if err := o.updateConfig(ca.Config); err != nil {
// This is essentially an update that has failed.
js.mu.Lock()
result := &consumerAssignmentResult{
Account: ca.Client.serviceAccount(),
Stream: ca.Stream,
Consumer: ca.Name,
Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}},
// This consumer exists.
// Only update if config is really different.
cfg := o.config()
if !reflect.DeepEqual(&cfg, ca.Config) {
if err := o.updateConfig(ca.Config); err != nil {
// This is essentially an update that has failed.
js.mu.RLock()
result := &consumerAssignmentResult{
Account: accName,
Stream: stream,
Consumer: consumer,
Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}},
}
result.Response.Error = NewJSConsumerNameExistError()
s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result)
js.mu.RUnlock()
return
}
result.Response.Error = NewJSConsumerNameExistError()
s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result)
js.mu.Unlock()
return
}
// Check if we already had a consumer assignment and its still pending.
cca, oca := ca, o.consumerAssignment()
o.mu.RLock()
leader := o.isLeader()
o.mu.RUnlock()

var sendState bool
js.mu.Lock()
js.mu.RLock()
n := rg.node
// Check if we already had a consumer assignment and its still pending.
cca, oca := ca, o.consumerAssignment()
if oca != nil {
if !oca.responded {
// We can't override info for replying here otherwise leader once elected can not respond.
// So just update Config, leave off client and reply to the originals.
cac := *oca
cac.Config = ca.Config
// So copy over original client and the reply from the old ca.
cac := *ca
cac.Client = oca.Client
cac.Reply = oca.Reply
cca = &cac
needsLocalResponse = true
}
// If we look like we are scaling up, let's send our current state to the group.
sendState = len(ca.Group.Peers) > len(oca.Group.Peers) && leader
sendState = len(ca.Group.Peers) > len(oca.Group.Peers) && o.IsLeader() && n != nil
// Signal that this is an update
isConfigUpdate = true
}
n := rg.node
js.mu.Unlock()
js.mu.RUnlock()

if sendState && n != nil {
if sendState {
if snap, err := o.store.EncodedState(); err == nil {
n.SendSnapshot(snap)
}
@@ -3694,6 +3699,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
err = o.setStoreState(state)
o.mu.Unlock()
}

if err != nil {
if IsNatsErr(err, JSConsumerStoreFailedErrF) {
s.Warnf("Consumer create failed for '%s > %s > %s': %v", ca.Client.serviceAccount(), ca.Stream, ca.Name, err)
@@ -3755,7 +3761,6 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
}
}

// Start our monitoring routine.
if rg.node == nil {
// Single replica consumer, process manually here.
js.mu.Lock()
@@ -3766,15 +3771,14 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
js.mu.Unlock()
js.processConsumerLeaderChange(o, true)
} else {
if !alreadyRunning {
// Clustered consumer.
// Start our monitoring routine if needed.
if !alreadyRunning && !o.isMonitorRunning() {
s.startGoRoutine(func() { js.monitorConsumer(o, ca) })
}
// Only send response if not recovering.
if !js.isMetaRecovering() {
o.mu.RLock()
isLeader := o.isLeader()
o.mu.RUnlock()
if wasExisting && (isLeader || (!didCreate && rg.node.GroupLeader() == _EMPTY_)) {
// For existing consumer, only send response if not recovering.
if wasExisting && !js.isMetaRecovering() {
if o.IsLeader() || (!didCreate && needsLocalResponse) {
// Process if existing as an update.
js.mu.RLock()
client, subject, reply := ca.Client, ca.Subject, ca.Reply
@@ -3970,6 +3974,12 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
return
}

// Make sure only one is running.
if o.checkInMonitor() {
return
}
defer o.clearMonitorRunning()

qch, lch, aq, uch, ourPeerId := n.QuitC(), n.LeadChangeC(), n.ApplyQ(), o.updateC(), cc.meta.ID()

s.Debugf("Starting consumer monitor for '%s > %s > %s' [%s]", o.acc.Name, ca.Stream, ca.Name, n.Group())
@@ -4373,8 +4383,7 @@ func (js *jetStream) processConsumerLeaderChange(o *consumer, isLeader bool) err
ca.responded = true
js.mu.Unlock()

streamName := o.streamName()
consumerName := o.String()
streamName, consumerName := o.streamName(), o.String()
acc, _ := s.LookupAccount(account)
if acc == nil {
return stepDownIfLeader()
@@ -6493,7 +6502,6 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
if sa.consumers == nil {
sa.consumers = make(map[string]*consumerAssignment)
}
ca.pending = true
sa.consumers[ca.Name] = ca

// Do formal proposal.
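One change above is that an existing consumer is only updated when the proposed configuration actually differs from the running one, compared with reflect.DeepEqual. A small sketch of that guard with a made-up config type; exampleConfig and applyIfChanged are illustrative stand-ins, not server code:

package main

import (
	"fmt"
	"reflect"
)

// exampleConfig stands in for a consumer configuration; the real
// ConsumerConfig has many more fields.
type exampleConfig struct {
	Name     string
	Replicas int
}

// applyIfChanged mirrors the guard in processClusterCreateConsumer:
// only perform the (potentially failing) update when the incoming
// config actually differs from the current one.
func applyIfChanged(current, incoming *exampleConfig, update func(*exampleConfig) error) error {
	if reflect.DeepEqual(current, incoming) {
		return nil // identical config, nothing to do
	}
	return update(incoming)
}

func main() {
	cur := &exampleConfig{Name: "dlc", Replicas: 3}
	same := &exampleConfig{Name: "dlc", Replicas: 3}
	changed := &exampleConfig{Name: "dlc", Replicas: 5}

	_ = applyIfChanged(cur, same, func(c *exampleConfig) error {
		fmt.Println("update applied (should not print for identical config)")
		return nil
	})
	_ = applyIfChanged(cur, changed, func(c *exampleConfig) error {
		fmt.Println("update applied for", c.Replicas, "replicas")
		*cur = *c
		return nil
	})
}
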
21 changes: 14 additions & 7 deletions server/jetstream_cluster_3_test.go
@@ -1486,13 +1486,21 @@ func TestJetStreamParallelConsumerCreation(t *testing.T) {
})
require_NoError(t, err)

np := 20
np := 50

startCh := make(chan bool)
errCh := make(chan error, np)

cfg := &nats.ConsumerConfig{
Durable: "dlc",
Replicas: 3,
}

wg := sync.WaitGroup{}
swg := sync.WaitGroup{}
wg.Add(np)
swg.Add(np)

for i := 0; i < np; i++ {
go func() {
defer wg.Done()
@@ -1501,21 +1509,20 @@ func TestJetStreamParallelConsumerCreation(t *testing.T) {
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

swg.Done()

// Make them all fire at once.
<-startCh

var err error
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "dlc",
Replicas: 3,
})
if err != nil {
if _, err := js.AddConsumer("TEST", cfg); err != nil {
errCh <- err
}
}()
}

swg.Wait()
close(startCh)

wg.Wait()

if len(errCh) > 0 {
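The reworked test raises the parallelism to 50 and releases every goroutine at once: each worker signals readiness on a second WaitGroup, then blocks on a start channel that is closed only after all workers are connected. A self-contained sketch of that release-all-at-once pattern; doWork is a hypothetical stand-in for the js.AddConsumer call:

package main

import (
	"fmt"
	"sync"
)

// doWork is a hypothetical stand-in for the parallel consumer-create call.
func doWork(id int) error {
	return nil
}

func main() {
	const np = 50

	startCh := make(chan bool)
	errCh := make(chan error, np)

	var wg, swg sync.WaitGroup
	wg.Add(np)
	swg.Add(np)

	for i := 0; i < np; i++ {
		go func(id int) {
			defer wg.Done()

			// Signal that this worker has finished its setup
			// (in the test: connecting to the cluster).
			swg.Done()

			// Block until the main goroutine releases everyone at once.
			<-startCh

			if err := doWork(id); err != nil {
				errCh <- err
			}
		}(i)
	}

	// Wait until every worker is ready, then fire them all simultaneously.
	swg.Wait()
	close(startCh)

	wg.Wait()
	if len(errCh) > 0 {
		fmt.Println("unexpected error:", <-errCh)
	} else {
		fmt.Println("all", np, "workers completed")
	}
}
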
6 changes: 1 addition & 5 deletions server/stream.go
@@ -541,11 +541,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
}

// This is always true in single server mode.
mset.mu.RLock()
isLeader := mset.isLeader()
mset.mu.RUnlock()

if isLeader {
if mset.IsLeader() {
// Send advisory.
var suppress bool
if !s.standAloneMode() && sa == nil {
