Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CHANGED] Ordered consumer recreate on missing heartbeat #1097

Merged
merged 3 commits into from Oct 4, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 16 additions & 9 deletions js.go
Expand Up @@ -1966,22 +1966,23 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {

resp, err := nc.Request(js.apiSubj(ccSubj), j, js.opts.wait)
if err != nil {
if err == ErrNoResponders {
err = ErrJetStreamNotEnabled
if errors.Is(err, ErrNoResponders) || errors.Is(err, ErrTimeout) {
// if creating consumer failed, retry
return
}
pushErr(err)
pushErr(fmt.Errorf("%w: recreating ordered consumer", err))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the %w doing? Also, since you seem to have this fmt.Errorf() almost every time you call pushErr(), should you have that part of pushErr() itself? (I know there is a check/pushErr in case of request marshaling failing, but that is very unlikely, and still it would be a failure recreating the ordered consumer anyway).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

%w is the error wrapping semantic. Thanks to it, if you call e.g. errors.Is(err, ErrStreamNotFound) on this error it will unwrap correctly and return true for fmt.Errorf("%w: recreating ordered consumer", ErrStreamNotFound).

As to your second point, you're right, I'll change it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Additionally, I added the recreating ordered consumer part to at least give some insight to the user as to where an error happened (async error callback does not give any information on where the error comes from, so this is better than nothing).
In the near future I'll be working on the async errors revamp (at least for ordered consumers) to be able to identify them and react more easily)

return
}

var cinfo consumerResponse
err = json.Unmarshal(resp.Data, &cinfo)
if err != nil {
pushErr(err)
pushErr(fmt.Errorf("%w: recreating ordered consumer", err))
return
}

if cinfo.Error != nil {
pushErr(cinfo.Error)
pushErr(fmt.Errorf("%w: recreating ordered consumer", cinfo.Error))
return
}

Expand Down Expand Up @@ -2040,11 +2041,17 @@ func (sub *Subscription) activityCheck() {
sub.mu.Unlock()

if !active {
nc.mu.Lock()
if errCB := nc.Opts.AsyncErrorCB; errCB != nil {
nc.ach.push(func() { errCB(nc, sub, ErrConsumerNotActive) })
if !jsi.ordered || nc.Status() != CONNECTED {
nc.mu.Lock()
if errCB := nc.Opts.AsyncErrorCB; errCB != nil {
nc.ach.push(func() { errCB(nc, sub, ErrConsumerNotActive) })
}
nc.mu.Unlock()
return
}
nc.mu.Unlock()
sub.mu.Lock()
sub.resetOrderedConsumer(jsi.sseq + 1)
sub.mu.Unlock()
}
}

Expand Down
89 changes: 59 additions & 30 deletions js_test.go
Expand Up @@ -21,6 +21,7 @@ import (
"crypto/sha256"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"math/rand"
Expand Down Expand Up @@ -292,7 +293,7 @@ func TestJetStreamOrderedConsumer(t *testing.T) {
testSyncConsumer()
}

func TestJetStreamOrderedConsumerWithErrors(t *testing.T) {
func TestOrderedConsumerDeleteAssets(t *testing.T) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice to maintain TestJetStream prefix so that if you want to quickly check that your JS changes are having an impact, you can limit the tests locally to run JetStream tests...

s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

Expand Down Expand Up @@ -340,15 +341,9 @@ func TestJetStreamOrderedConsumerWithErrors(t *testing.T) {
}
}

type asset int
const (
deleteStream asset = iota
deleteConsumer
)
t.Run("remove stream, expect error", func(t *testing.T) {
createStream()

testSubError := func(a asset) {
t.Helper()
// Again here the IdleHeartbeat is not required, just overriding top shorten test time.
sub, err := js.SubscribeSync("a", OrderedConsumer(), IdleHeartbeat(200*time.Millisecond))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
Expand All @@ -358,37 +353,71 @@ func TestJetStreamOrderedConsumerWithErrors(t *testing.T) {
// Since we are sync we will be paused here due to flow control.
time.Sleep(100 * time.Millisecond)
// Now delete the asset and make sure we get an error.
switch a {
case deleteStream:
if err := js.DeleteStream("OBJECT"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
case deleteConsumer:
// We need to grab our consumer name.
ci, err := sub.ConsumerInfo()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if err := js.DeleteConsumer("OBJECT", ci.Name); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if err := js.DeleteStream("OBJECT"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Make sure we get an error.
select {
case err := <-errCh:
if err != ErrConsumerNotActive {
t.Fatalf("Got wrong error, wanted %v, got %v", ErrConsumerNotActive, err)
if !errors.Is(err, ErrStreamNotFound) {
t.Fatalf("Got wrong error, wanted %v, got %v", ErrStreamNotFound, err)
}
case <-time.After(time.Second):
t.Fatalf("Did not receive err message as expected")
}
}
})

createStream()
testSubError(deleteStream)
t.Run("remove consumer, expect it to be recreated", func(t *testing.T) {
createStream()

createStream()
testSubError(deleteConsumer)
createConsSub, err := nc.SubscribeSync("$JS.API.CONSUMER.CREATE.OBJECT")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer createConsSub.Unsubscribe()
// Again here the IdleHeartbeat is not required, just overriding top shorten test time.
sub, err := js.SubscribeSync("a", OrderedConsumer(), IdleHeartbeat(200*time.Millisecond))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer sub.Unsubscribe()

createConsMsg, err := createConsSub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !strings.Contains(string(createConsMsg.Data), `"stream_name":"OBJECT"`) {
t.Fatalf("Invalid message on create consumer subject: %q", string(createConsMsg.Data))
}

time.Sleep(100 * time.Millisecond)
ci, err := sub.ConsumerInfo()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
consName := ci.Name

if err := js.DeleteConsumer("OBJECT", consName); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
createConsMsg, err = createConsSub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !strings.Contains(string(createConsMsg.Data), `"stream_name":"OBJECT"`) {
t.Fatalf("Invalid message on create consumer subject: %q", string(createConsMsg.Data))
}

time.Sleep(100 * time.Millisecond)
ci, err = sub.ConsumerInfo()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
newConsName := ci.Name
if consName == newConsName {
t.Fatalf("Consumer should be recreated, but consumer name is the same")
}
})
}

func TestJetStreamOrderedConsumerWithAutoUnsub(t *testing.T) {
Expand Down
75 changes: 75 additions & 0 deletions test/js_test.go
Expand Up @@ -21,6 +21,7 @@ import (
"io/ioutil"
mrand "math/rand"
"net"
"net/url"
"os"
"reflect"
"strconv"
Expand Down Expand Up @@ -52,6 +53,24 @@ func shutdownJSServerAndRemoveStorage(t *testing.T, s *server.Server) {
s.WaitForShutdown()
}

func restartBasicJSServer(t *testing.T, s *server.Server) *server.Server {
opts := natsserver.DefaultTestOptions
clientURL, err := url.Parse(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
port, err := strconv.Atoi(clientURL.Port())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
opts.Port = port
opts.JetStream = true
opts.StoreDir = s.JetStreamConfig().StoreDir
s.Shutdown()
s.WaitForShutdown()
return RunServerWithOptions(opts)
}

func TestJetStreamNotEnabled(t *testing.T) {
s := RunServerOnPort(-1)
defer shutdownJSServerAndRemoveStorage(t, s)
Expand Down Expand Up @@ -8126,3 +8145,59 @@ func TestJetStreamMsgAckShouldErrForConsumerAckNone(t *testing.T) {
t.Fatalf("Expected error indicating that sub is AckNone, got %v", err)
}
}

func TestOrderedConsumerRecreateAfterReconnect(t *testing.T) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment than before.

s := RunBasicJetStreamServer()

// monitor for ErrConsumerNotActive error and suppress logging
hbMissed := make(chan struct{})
errHandler := func(c *nats.Conn, s *nats.Subscription, err error) {
if !errors.Is(err, nats.ErrConsumerNotActive) {
t.Fatalf("Unexpected error: %v", err)
}
hbMissed <- struct{}{}
}
nc, js := jsClient(t, s, nats.ErrorHandler(errHandler))
defer nc.Close()

if _, err := js.AddStream(&nats.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
sub, err := js.SubscribeSync("FOO.A", nats.OrderedConsumer(), nats.IdleHeartbeat(100*time.Millisecond))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if _, err := js.Publish("FOO.A", []byte("msg 1")); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
msg, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if string(msg.Data) != "msg 1" {
t.Fatalf("Invalid msg value; want: 'msg 1'; got: %q", string(msg.Data))
}

// restart the server
s = restartBasicJSServer(t, s)
defer shutdownJSServerAndRemoveStorage(t, s)

// wait until we miss heartbeat
select {
case <-hbMissed:
case <-time.After(10 * time.Second):
t.Fatalf("Did not receive consumer not active error")
}
if _, err := js.Publish("FOO.A", []byte("msg 2")); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
msg, err = sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// make sure we pick up where we left off
if string(msg.Data) != "msg 2" {
t.Fatalf("Invalid msg value; want: 'msg 2'; got: %q", string(msg.Data))
}
}