Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backports from v2.10 for v2.9.23 release #4647

Merged
merged 4 commits into from Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions server/client.go
Expand Up @@ -4375,6 +4375,12 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
continue
}

// If we are a spoke leaf node make sure to not forward across routes.
// This mimics same behavior for normal subs above.
if c.kind == LEAF && c.isSpokeLeafNode() && sub.client.kind == ROUTER {
continue
}

// We have taken care of preferring local subs for a message from a route above.
// Here we just care about a client or leaf and skipping a leaf and preferring locals.
if dst := sub.client.kind; dst == ROUTER || dst == LEAF {
Expand Down
11 changes: 9 additions & 2 deletions server/consumer.go
Expand Up @@ -1816,8 +1816,12 @@ func (o *consumer) loopAndForwardProposals(qch chan struct{}) {
return
}

forwardProposals := func() {
forwardProposals := func() error {
o.mu.Lock()
if o.node != node || node.State() != Leader {
o.mu.Unlock()
return errors.New("no longer leader")
}
proposal := o.phead
o.phead, o.ptail = nil, nil
o.mu.Unlock()
Expand All @@ -1839,6 +1843,7 @@ func (o *consumer) loopAndForwardProposals(qch chan struct{}) {
if len(entries) > 0 {
node.ProposeDirect(entries)
}
return nil
}

// In case we have anything pending on entry.
Expand All @@ -1850,7 +1855,9 @@ func (o *consumer) loopAndForwardProposals(qch chan struct{}) {
forwardProposals()
return
case <-pch:
forwardProposals()
if err := forwardProposals(); err != nil {
return
}
}
}
}
Expand Down
73 changes: 72 additions & 1 deletion server/jetstream_cluster.go
Expand Up @@ -1159,6 +1159,65 @@ func (js *jetStream) checkForOrphans() {
}
}

// Check and delete any orphans we may come across.
func (s *Server) checkForNRGOrphans() {
js, cc := s.getJetStreamCluster()
if js == nil || cc == nil || js.isMetaRecovering() {
// No cluster means no NRGs. Also return if still recovering.
return
}

// Track which assets R>1 should be on this server.
nrgMap := make(map[string]struct{})
trackGroup := func(rg *raftGroup) {
// If R>1 track this as a legit NRG.
if rg.node != nil {
nrgMap[rg.Name] = struct{}{}
}
}
// Register our meta.
js.mu.RLock()
meta := cc.meta
if meta == nil {
js.mu.RUnlock()
// Bail with no meta node.
return
}

ourID := meta.ID()
nrgMap[meta.Group()] = struct{}{}

// Collect all valid groups from our assignments.
for _, asa := range cc.streams {
for _, sa := range asa {
if sa.Group.isMember(ourID) && sa.Restore == nil {
trackGroup(sa.Group)
for _, ca := range sa.consumers {
if ca.Group.isMember(ourID) {
trackGroup(ca.Group)
}
}
}
}
}
js.mu.RUnlock()

// Check NRGs that are running.
var needDelete []RaftNode
s.rnMu.RLock()
for name, n := range s.raftNodes {
if _, ok := nrgMap[name]; !ok {
needDelete = append(needDelete, n)
}
}
s.rnMu.RUnlock()

for _, n := range needDelete {
s.Warnf("Detected orphaned NRG %q, will cleanup", n.Group())
n.Delete()
}
}

func (js *jetStream) monitorCluster() {
s, n := js.server(), js.getMetaGroup()
qch, rqch, lch, aq := js.clusterQuitC(), n.QuitC(), n.LeadChangeC(), n.ApplyQ()
Expand Down Expand Up @@ -1189,6 +1248,8 @@ func (js *jetStream) monitorCluster() {
if hs := s.healthz(nil); hs.Error != _EMPTY_ {
s.Warnf("%v", hs.Error)
}
// Also check for orphaned NRGs.
s.checkForNRGOrphans()
}

var (
Expand Down Expand Up @@ -1269,7 +1330,6 @@ func (js *jetStream) monitorCluster() {
go checkHealth()
continue
}
// FIXME(dlc) - Deal with errors.
if didSnap, didStreamRemoval, didConsumerRemoval, err := js.applyMetaEntries(ce.Entries, ru); err == nil {
_, nb := n.Applied(ce.Index)
if js.hasPeerEntries(ce.Entries) || didStreamRemoval || (didSnap && !isLeader) {
Expand All @@ -1280,6 +1340,8 @@ func (js *jetStream) monitorCluster() {
doSnapshot()
}
ce.ReturnToPool()
} else {
s.Warnf("Error applying JetStream cluster entries: %v", err)
}
}
aq.recycle(&ces)
Expand Down Expand Up @@ -2028,6 +2090,15 @@ func (mset *stream) removeNode() {
}
}

func (mset *stream) clearRaftNode() {
if mset == nil {
return
}
mset.mu.Lock()
defer mset.mu.Unlock()
mset.node = nil
}

// Helper function to generate peer info.
// lists and sets for old and new.
func genPeerInfo(peers []string, split int) (newPeers, oldPeers []string, newPeerSet, oldPeerSet map[string]bool) {
Expand Down
177 changes: 176 additions & 1 deletion server/jetstream_cluster_3_test.go
Expand Up @@ -18,6 +18,7 @@ package server

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -5305,7 +5306,7 @@ func TestJetStreamClusterCheckFileStoreBlkSizes(t *testing.T) {
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

// Nowmal Stream
// Normal Stream
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"*"},
Expand Down Expand Up @@ -5384,3 +5385,177 @@ func TestJetStreamClusterCheckFileStoreBlkSizes(t *testing.T) {
require_True(t, blkSize(fs) == defaultMediumBlockSize)
}
}

func TestJetStreamClusterDetectOrphanNRGs(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

// Normal Stream
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"*"},
Replicas: 3,
})
require_NoError(t, err)

_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "DC",
AckPolicy: nats.AckExplicitPolicy,
})
require_NoError(t, err)

// We will force an orphan for a certain server.
s := c.randomNonStreamLeader(globalAccountName, "TEST")

mset, err := s.GlobalAccount().lookupStream("TEST")
require_NoError(t, err)
sgn := mset.raftNode().Group()
mset.clearRaftNode()

o := mset.lookupConsumer("DC")
require_True(t, o != nil)
ogn := o.raftNode().Group()
o.clearRaftNode()

require_NoError(t, js.DeleteStream("TEST"))

// Check that we do in fact have orphans.
require_True(t, s.numRaftNodes() > 1)

// This function will detect orphans and clean them up.
s.checkForNRGOrphans()

// Should only be meta NRG left.
require_True(t, s.numRaftNodes() == 1)
require_True(t, s.lookupRaftNode(sgn) == nil)
require_True(t, s.lookupRaftNode(ogn) == nil)
}

func TestJetStreamClusterRestartThenScaleStreamReplicas(t *testing.T) {
t.Skip("This test takes too long, need to make shorter")

c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

s := c.randomNonLeader()
nc, js := jsClientConnect(t, s)
defer nc.Close()

nc2, producer := jsClientConnect(t, s)
defer nc2.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)
c.waitOnStreamLeader(globalAccountName, "TEST")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

end := time.Now().Add(2 * time.Second)
for time.Now().Before(end) {
producer.Publish("foo", []byte(strings.Repeat("A", 128)))
time.Sleep(time.Millisecond)
}

var wg sync.WaitGroup
for i := 0; i < 5; i++ {
sub, err := js.PullSubscribe("foo", fmt.Sprintf("C-%d", i))
require_NoError(t, err)

wg.Add(1)
go func() {
defer wg.Done()
for range time.NewTicker(10 * time.Millisecond).C {
select {
case <-ctx.Done():
return
default:
}

msgs, err := sub.Fetch(1)
if err != nil && !errors.Is(err, nats.ErrTimeout) && !errors.Is(err, nats.ErrConnectionClosed) {
t.Logf("Pull Error: %v", err)
}
for _, msg := range msgs {
msg.Ack()
}
}
}()
}
c.lameDuckRestartAll()
c.waitOnStreamLeader(globalAccountName, "TEST")

// Swap the logger to try to detect the condition after the restart.
loggers := make([]*captureDebugLogger, 3)
for i, srv := range c.servers {
l := &captureDebugLogger{dbgCh: make(chan string, 10)}
loggers[i] = l
srv.SetLogger(l, true, false)
}
condition := `Direct proposal ignored, not leader (state: CLOSED)`
errCh := make(chan error, 10)

wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case dl := <-loggers[0].dbgCh:
if strings.Contains(dl, condition) {
errCh <- fmt.Errorf(condition)
}
case dl := <-loggers[1].dbgCh:
if strings.Contains(dl, condition) {
errCh <- fmt.Errorf(condition)
}
case dl := <-loggers[2].dbgCh:
if strings.Contains(dl, condition) {
errCh <- fmt.Errorf(condition)
}
case <-ctx.Done():
return
}
}
}()

// Start publishing again for a while.
end = time.Now().Add(2 * time.Second)
for time.Now().Before(end) {
producer.Publish("foo", []byte(strings.Repeat("A", 128)))
time.Sleep(time.Millisecond)
}

// Try to do a stream edit back to R=1 after doing all the upgrade.
info, _ := js.StreamInfo("TEST")
sconfig := info.Config
sconfig.Replicas = 1
_, err = js.UpdateStream(&sconfig)
require_NoError(t, err)

// Leave running for some time after the update.
time.Sleep(2 * time.Second)

info, _ = js.StreamInfo("TEST")
sconfig = info.Config
sconfig.Replicas = 3
_, err = js.UpdateStream(&sconfig)
require_NoError(t, err)

select {
case e := <-errCh:
t.Fatalf("Bad condition on raft node: %v", e)
case <-time.After(2 * time.Second):
// Done
}

// Stop goroutines and wait for them to exit.
cancel()
wg.Wait()
}
15 changes: 15 additions & 0 deletions server/jetstream_helpers_test.go
Expand Up @@ -1521,6 +1521,21 @@ func (c *cluster) restartAll() {
c.waitOnClusterReady()
}

func (c *cluster) lameDuckRestartAll() {
c.t.Helper()
for i, s := range c.servers {
s.lameDuckMode()
s.WaitForShutdown()
if !s.Running() {
opts := c.opts[i]
s, o := RunServerWithConfig(opts.ConfigFile)
c.servers[i] = s
c.opts[i] = o
}
}
c.waitOnClusterReady()
}

func (c *cluster) restartAllSamePorts() {
c.t.Helper()
for i, s := range c.servers {
Expand Down