Skip to content

Commit

Permalink
Merge pull request #3960 from nats-io/fix-3953
Browse files Browse the repository at this point in the history
[FIXED] Scaling up workqueue stream does not remove ack'd messages.
  • Loading branch information
derekcollison committed Mar 14, 2023
2 parents 07bc964 + 5a1878b commit 3ecf55b
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 3 deletions.
19 changes: 18 additions & 1 deletion server/consumer.go
Expand Up @@ -981,7 +981,24 @@ func (o *consumer) setLeader(isLeader bool) {

// If we are here we have a change in leader status.
if isLeader {
if mset == nil || isRunning {
if mset == nil {
return
}
if isRunning {
// If we detect we are scaling up, make sure to create clustered routines and channels.
o.mu.Lock()
if o.node != nil && o.pch == nil {
// We are moving from R1 to clustered.
o.pch = make(chan struct{}, 1)
go o.loopAndForwardProposals(o.qch)
if o.phead != nil {
select {
case o.pch <- struct{}{}:
default:
}
}
}
o.mu.Unlock()
return
}

Expand Down
4 changes: 2 additions & 2 deletions server/jetstream_cluster.go
Expand Up @@ -5665,9 +5665,9 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su

// Need to remap any consumers.
for _, ca := range osa.consumers {
// Ephemerals are R=1, so only auto-remap durables, or R>1.
// Ephemerals are R=1, so only auto-remap durables, or R>1, unless stream is interest or workqueue policy.
numPeers := len(ca.Group.Peers)
if ca.Config.Durable != _EMPTY_ || numPeers > 1 {
if ca.Config.Durable != _EMPTY_ || numPeers > 1 || cfg.Retention != LimitsPolicy {
cca := ca.copyGroup()
// Adjust preferred as needed.
if numPeers == 1 && len(rg.Peers) > 1 {
Expand Down
92 changes: 92 additions & 0 deletions server/jetstream_cluster_3_test.go
Expand Up @@ -2923,3 +2923,95 @@ func TestJetStreamClusterStreamMaxAgeScaleUp(t *testing.T) {
})
}
}

// TestJetStreamClusterWorkQueueConsumerReplicatedAfterScaleUp verifies that an
// ephemeral consumer created on an R1 workqueue stream is remapped and becomes
// clustered (i.e. has a raft node) once the stream is scaled up to R3.
func TestJetStreamClusterWorkQueueConsumerReplicatedAfterScaleUp(t *testing.T) {
	cluster := createJetStreamClusterExplicit(t, "R3S", 3)
	defer cluster.shutdown()

	nc, js := jsClientConnect(t, cluster.randomServer())
	defer nc.Close()

	// Start the workqueue stream at a single replica.
	cfg := &nats.StreamConfig{
		Name:      "TEST",
		Replicas:  1,
		Subjects:  []string{"WQ"},
		Retention: nats.WorkQueuePolicy,
	}
	_, err := js.AddStream(cfg)
	require_NoError(t, err)

	// Attach an ephemeral consumer while the stream is still R1.
	esub, err := js.SubscribeSync("WQ")
	require_NoError(t, err)

	// Scale the stream up to three replicas.
	cfg.Replicas = 3
	_, err = js.UpdateStream(cfg)
	require_NoError(t, err)
	cluster.waitOnStreamLeader(globalAccountName, "TEST")

	cinfo, err := esub.ConsumerInfo()
	require_NoError(t, err)

	// Replicas of 0 means inherit from the stream; 3 means explicitly scaled.
	require_True(t, cinfo.Config.Replicas == 0 || cinfo.Config.Replicas == 3)

	ldr := cluster.consumerLeader(globalAccountName, "TEST", cinfo.Name)
	require_NotNil(t, ldr)

	mset, err := ldr.GlobalAccount().lookupStream("TEST")
	require_NoError(t, err)

	cons := mset.lookupConsumer(cinfo.Name)
	require_NotNil(t, cons)
	// Now that the stream is clustered the consumer must carry a raft node.
	require_NotNil(t, cons.raftNode())
}

// https://github.com/nats-io/nats-server/issues/3953
// TestJetStreamClusterWorkQueueAfterScaleUp reproduces
// https://github.com/nats-io/nats-server/issues/3953: after scaling a
// workqueue stream from R1 to R3, acknowledged messages must still be
// removed from the stream.
func TestJetStreamClusterWorkQueueAfterScaleUp(t *testing.T) {
	cluster := createJetStreamClusterExplicit(t, "R3S", 3)
	defer cluster.shutdown()

	nc, js := jsClientConnect(t, cluster.randomServer())
	defer nc.Close()

	// Workqueue stream begins life as a single replica.
	cfg := &nats.StreamConfig{
		Name:      "TEST",
		Replicas:  1,
		Subjects:  []string{"WQ"},
		Retention: nats.WorkQueuePolicy,
	}
	_, err := js.AddStream(cfg)
	require_NoError(t, err)

	// Durable push consumer with explicit acks.
	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
		Durable:        "d1",
		DeliverSubject: "d1",
		AckPolicy:      nats.AckExplicitPolicy,
	})
	require_NoError(t, err)

	// Ack each delivery synchronously, then signal the test body.
	acked := make(chan bool, 1)
	_, err = nc.Subscribe("d1", func(m *nats.Msg) {
		m.AckSync()
		acked <- true
	})
	require_NoError(t, err)

	// Scale the stream up to three replicas.
	cfg.Replicas = 3
	_, err = js.UpdateStream(cfg)
	require_NoError(t, err)
	cluster.waitOnStreamLeader(globalAccountName, "TEST")

	sendStreamMsg(t, nc, "WQ", "SOME WORK")
	<-acked

	// The ack should have removed the message from the workqueue stream.
	si, err := js.StreamInfo("TEST")
	require_NoError(t, err)
	require_True(t, si.State.Msgs == 0)
}

0 comments on commit 3ecf55b

Please sign in to comment.