Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Do not health check streams that are actively being restored. #4277

Merged
merged 2 commits into from Jun 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
121 changes: 121 additions & 0 deletions server/jetstream_cluster_3_test.go
Expand Up @@ -4398,3 +4398,124 @@ func TestJetStreamClusterConsumerCleanupWithSameName(t *testing.T) {
// Make sure no other errors showed up
require_True(t, len(errCh) == 0)
}

func TestJetStreamClusterSnapshotAndRestoreWithHealthz(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)

toSend, msg := 1000, bytes.Repeat([]byte("Z"), 1024)
for i := 0; i < toSend; i++ {
_, err := js.PublishAsync("foo", msg)
require_NoError(t, err)
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}

sreq := &JSApiStreamSnapshotRequest{
DeliverSubject: nats.NewInbox(),
ChunkSize: 512,
}
req, _ := json.Marshal(sreq)
rmsg, err := nc.Request(fmt.Sprintf(JSApiStreamSnapshotT, "TEST"), req, time.Second)
require_NoError(t, err)

var resp JSApiStreamSnapshotResponse
json.Unmarshal(rmsg.Data, &resp)
require_True(t, resp.Error == nil)

state := *resp.State
cfg := *resp.Config

var snapshot []byte
done := make(chan bool)

sub, _ := nc.Subscribe(sreq.DeliverSubject, func(m *nats.Msg) {
// EOF
if len(m.Data) == 0 {
done <- true
return
}
// Could be writing to a file here too.
snapshot = append(snapshot, m.Data...)
// Flow ack
m.Respond(nil)
})
defer sub.Unsubscribe()

// Wait to receive the snapshot.
select {
case <-done:
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive our snapshot in time")
}

// Delete before we try to restore.
require_NoError(t, js.DeleteStream("TEST"))

checkHealth := func() {
for _, s := range c.servers {
s.healthz(nil)
}
}

var rresp JSApiStreamRestoreResponse
rreq := &JSApiStreamRestoreRequest{
Config: cfg,
State: state,
}
req, _ = json.Marshal(rreq)

rmsg, err = nc.Request(fmt.Sprintf(JSApiStreamRestoreT, "TEST"), req, 5*time.Second)
require_NoError(t, err)

rresp.Error = nil
json.Unmarshal(rmsg.Data, &rresp)
require_True(t, resp.Error == nil)

checkHealth()

// We will now chunk the snapshot responses (and EOF).
var chunk [1024]byte
for i, r := 0, bytes.NewReader(snapshot); ; {
n, err := r.Read(chunk[:])
if err != nil {
break
}
nc.Request(rresp.DeliverSubject, chunk[:n], time.Second)
i++
// We will call healthz for all servers half way through the restore.
if i%100 == 0 {
checkHealth()
}
}
rmsg, err = nc.Request(rresp.DeliverSubject, nil, time.Second)
require_NoError(t, err)
rresp.Error = nil
json.Unmarshal(rmsg.Data, &rresp)
require_True(t, resp.Error == nil)

si, err := js.StreamInfo("TEST")
require_NoError(t, err)
require_True(t, si.State.Msgs == uint64(toSend))

// Make sure stepdown works, this would fail before the fix.
_, err = nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST"), nil, 5*time.Second)
require_NoError(t, err)

si, err = js.StreamInfo("TEST")
require_NoError(t, err)
require_True(t, si.State.Msgs == uint64(toSend))
}
3 changes: 2 additions & 1 deletion server/monitor.go
Expand Up @@ -3147,7 +3147,8 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
for acc, asa := range cc.streams {
nasa := make(map[string]*streamAssignment)
for stream, sa := range asa {
if sa.Group.isMember(ourID) {
// If we are a member and we are not being restored, select for check.
if sa.Group.isMember(ourID) && sa.Restore == nil {
csa := sa.copyGroup()
csa.consumers = make(map[string]*consumerAssignment)
for consumer, ca := range sa.consumers {
Expand Down