Skip to content

Commit

Permalink
Recover in consumer assignment when asset already existed
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Aug 17, 2023
1 parent 3c85490 commit c437157
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 5 deletions.
3 changes: 2 additions & 1 deletion server/consumer.go
Expand Up @@ -214,7 +214,8 @@ var (
// Calculate accurate replicas for the consumer config with the parent stream config.
func (consCfg ConsumerConfig) replicas(strCfg *StreamConfig) int {
if consCfg.Replicas == 0 || consCfg.Replicas > strCfg.Replicas {
if !isDurableConsumer(&consCfg) && strCfg.Retention == LimitsPolicy {
if !isDurableConsumer(&consCfg) && strCfg.Retention == LimitsPolicy && consCfg.Replicas == 0 {
// Matches old-school ephemerals only, where the replica count is 0.
return 1
}
return strCfg.Replicas
Expand Down
8 changes: 4 additions & 4 deletions server/jetstream_cluster.go
Expand Up @@ -4001,7 +4001,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
var didCreate, isConfigUpdate, needsLocalResponse bool
if o == nil {
// Add in the consumer if needed.
if o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca, false); err == nil {
if o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca, wasExisting); err == nil {
didCreate = true
}
} else {
Expand Down Expand Up @@ -5996,11 +5996,11 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
// Need to remap any consumers.
for _, ca := range osa.consumers {
// Ephemerals are R=1, so only auto-remap durables, or R>1, unless stream is interest or workqueue policy.
replicas := ca.Config.replicas(cfg)
if ca.Config.Durable != _EMPTY_ || replicas > 1 || cfg.Retention != LimitsPolicy {
numPeers := len(ca.Group.Peers)
if ca.Config.Durable != _EMPTY_ || numPeers > 1 || cfg.Retention != LimitsPolicy {
cca := ca.copyGroup()
// Adjust preferred as needed.
if replicas == 1 && len(rg.Peers) > 1 {
if numPeers == 1 && len(rg.Peers) > 1 {
cca.Group.Preferred = ca.Group.Peers[0]
} else {
cca.Group.Preferred = _EMPTY_
Expand Down

0 comments on commit c437157

Please sign in to comment.