[FIXED] Do not health check streams that are actively being restored. (#4277)

Health checking a stream while its restore is still in flight could leave it in a bad state.

Signed-off-by: Derek Collison <derek@nats.io>

Resolves #4270
derekcollison committed Jun 29, 2023
2 parents 68a17a4 + a2b9ee9 commit 503de45
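
For context, the health check exercised in the test below through s.healthz(nil) is the same check the server exposes on its HTTP monitoring endpoint. A minimal sketch of probing it from outside the process, assuming a local nats-server started with monitoring enabled on the default port (-m 8222):

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Assumption: a nats-server is running locally with monitoring on :8222.
	// /healthz reports overall server health, including JetStream assets; with
	// this fix, a stream with an active restore is skipped rather than being
	// flagged unhealthy mid-restore.
	resp, err := http.Get("http://localhost:8222/healthz")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Printf("%s %s\n", resp.Status, body)
}

A healthy server answers 200 with {"status":"ok"}; an unhealthy one answers 503 with an error description.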
Showing 2 changed files with 123 additions and 1 deletion.
server/jetstream_cluster_3_test.go: 121 changes (121 additions & 0 deletions)
@@ -4398,3 +4398,124 @@ func TestJetStreamClusterConsumerCleanupWithSameName(t *testing.T) {
	// Make sure no other errors showed up
	require_True(t, len(errCh) == 0)
}

func TestJetStreamClusterSnapshotAndRestoreWithHealthz(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 3,
	})
	require_NoError(t, err)

	toSend, msg := 1000, bytes.Repeat([]byte("Z"), 1024)
	for i := 0; i < toSend; i++ {
		_, err := js.PublishAsync("foo", msg)
		require_NoError(t, err)
	}
	select {
	case <-js.PublishAsyncComplete():
	case <-time.After(5 * time.Second):
		t.Fatalf("Did not receive completion signal")
	}

	sreq := &JSApiStreamSnapshotRequest{
		DeliverSubject: nats.NewInbox(),
		ChunkSize:      512,
	}
	req, _ := json.Marshal(sreq)
	rmsg, err := nc.Request(fmt.Sprintf(JSApiStreamSnapshotT, "TEST"), req, time.Second)
	require_NoError(t, err)

	var resp JSApiStreamSnapshotResponse
	json.Unmarshal(rmsg.Data, &resp)
	require_True(t, resp.Error == nil)

	state := *resp.State
	cfg := *resp.Config

	var snapshot []byte
	done := make(chan bool)

	sub, _ := nc.Subscribe(sreq.DeliverSubject, func(m *nats.Msg) {
		// EOF
		if len(m.Data) == 0 {
			done <- true
			return
		}
		// Could be writing to a file here too.
		snapshot = append(snapshot, m.Data...)
		// Flow ack
		m.Respond(nil)
	})
	defer sub.Unsubscribe()

	// Wait to receive the snapshot.
	select {
	case <-done:
	case <-time.After(5 * time.Second):
		t.Fatalf("Did not receive our snapshot in time")
	}

	// Delete before we try to restore.
	require_NoError(t, js.DeleteStream("TEST"))

	checkHealth := func() {
		for _, s := range c.servers {
			s.healthz(nil)
		}
	}

	var rresp JSApiStreamRestoreResponse
	rreq := &JSApiStreamRestoreRequest{
		Config: cfg,
		State:  state,
	}
	req, _ = json.Marshal(rreq)

	rmsg, err = nc.Request(fmt.Sprintf(JSApiStreamRestoreT, "TEST"), req, 5*time.Second)
	require_NoError(t, err)

	rresp.Error = nil
	json.Unmarshal(rmsg.Data, &rresp)
	require_True(t, rresp.Error == nil)

	checkHealth()

	// We will now chunk the snapshot responses (and EOF).
	var chunk [1024]byte
	for i, r := 0, bytes.NewReader(snapshot); ; {
		n, err := r.Read(chunk[:])
		if err != nil {
			break
		}
		nc.Request(rresp.DeliverSubject, chunk[:n], time.Second)
		i++
		// We will call healthz for all servers every 100 chunks while the restore is in flight.
		if i%100 == 0 {
			checkHealth()
		}
	}
	rmsg, err = nc.Request(rresp.DeliverSubject, nil, time.Second)
	require_NoError(t, err)
	rresp.Error = nil
	json.Unmarshal(rmsg.Data, &rresp)
	require_True(t, rresp.Error == nil)

	si, err := js.StreamInfo("TEST")
	require_NoError(t, err)
	require_True(t, si.State.Msgs == uint64(toSend))

	// Make sure stepdown works, this would fail before the fix.
	_, err = nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST"), nil, 5*time.Second)
	require_NoError(t, err)

	si, err = js.StreamInfo("TEST")
	require_NoError(t, err)
	require_True(t, si.State.Msgs == uint64(toSend))
}
server/monitor.go: 3 changes (2 additions & 1 deletion)
@@ -3147,7 +3147,8 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
 	for acc, asa := range cc.streams {
 		nasa := make(map[string]*streamAssignment)
 		for stream, sa := range asa {
-			if sa.Group.isMember(ourID) {
+			// If we are a member and we are not being restored, select for check.
+			if sa.Group.isMember(ourID) && sa.Restore == nil {
 				csa := sa.copyGroup()
 				csa.consumers = make(map[string]*consumerAssignment)
 				for consumer, ca := range sa.consumers {
