Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CHANGED] Ordered consumer recreate on missing heartbeat #1097

Merged
merged 3 commits into from Oct 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 14 additions & 7 deletions js.go
Expand Up @@ -1937,7 +1937,7 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
nc.mu.Unlock()

pushErr := func(err error) {
nc.handleConsumerSequenceMismatch(sub, err)
nc.handleConsumerSequenceMismatch(sub, fmt.Errorf("%w: recreating ordered consumer", err))
nc.unsubscribe(sub, 0, true)
}

Expand Down Expand Up @@ -1966,8 +1966,9 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {

resp, err := nc.Request(js.apiSubj(ccSubj), j, js.opts.wait)
if err != nil {
if err == ErrNoResponders {
err = ErrJetStreamNotEnabled
if errors.Is(err, ErrNoResponders) || errors.Is(err, ErrTimeout) {
// if creating consumer failed, retry
return
}
pushErr(err)
return
Expand Down Expand Up @@ -2040,11 +2041,17 @@ func (sub *Subscription) activityCheck() {
sub.mu.Unlock()

if !active {
nc.mu.Lock()
if errCB := nc.Opts.AsyncErrorCB; errCB != nil {
nc.ach.push(func() { errCB(nc, sub, ErrConsumerNotActive) })
if !jsi.ordered || nc.Status() != CONNECTED {
nc.mu.Lock()
if errCB := nc.Opts.AsyncErrorCB; errCB != nil {
nc.ach.push(func() { errCB(nc, sub, ErrConsumerNotActive) })
}
nc.mu.Unlock()
return
}
nc.mu.Unlock()
sub.mu.Lock()
sub.resetOrderedConsumer(jsi.sseq + 1)
sub.mu.Unlock()
}
}

Expand Down
89 changes: 59 additions & 30 deletions js_test.go
Expand Up @@ -21,6 +21,7 @@ import (
"crypto/sha256"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"math/rand"
Expand Down Expand Up @@ -292,7 +293,7 @@ func TestJetStreamOrderedConsumer(t *testing.T) {
testSyncConsumer()
}

func TestJetStreamOrderedConsumerWithErrors(t *testing.T) {
func TestJetStreamOrderedConsumerDeleteAssets(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

Expand Down Expand Up @@ -340,15 +341,9 @@ func TestJetStreamOrderedConsumerWithErrors(t *testing.T) {
}
}

type asset int
const (
deleteStream asset = iota
deleteConsumer
)
t.Run("remove stream, expect error", func(t *testing.T) {
createStream()

testSubError := func(a asset) {
t.Helper()
// Again, the IdleHeartbeat is not required here; it is overridden only to shorten the test time.
sub, err := js.SubscribeSync("a", OrderedConsumer(), IdleHeartbeat(200*time.Millisecond))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
Expand All @@ -358,37 +353,71 @@ func TestJetStreamOrderedConsumerWithErrors(t *testing.T) {
// Since we are sync we will be paused here due to flow control.
time.Sleep(100 * time.Millisecond)
// Now delete the asset and make sure we get an error.
switch a {
case deleteStream:
if err := js.DeleteStream("OBJECT"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
case deleteConsumer:
// We need to grab our consumer name.
ci, err := sub.ConsumerInfo()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if err := js.DeleteConsumer("OBJECT", ci.Name); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if err := js.DeleteStream("OBJECT"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Make sure we get an error.
select {
case err := <-errCh:
if err != ErrConsumerNotActive {
t.Fatalf("Got wrong error, wanted %v, got %v", ErrConsumerNotActive, err)
if !errors.Is(err, ErrStreamNotFound) {
t.Fatalf("Got wrong error, wanted %v, got %v", ErrStreamNotFound, err)
}
case <-time.After(time.Second):
t.Fatalf("Did not receive err message as expected")
}
}
})

createStream()
testSubError(deleteStream)
t.Run("remove consumer, expect it to be recreated", func(t *testing.T) {
createStream()

createStream()
testSubError(deleteConsumer)
createConsSub, err := nc.SubscribeSync("$JS.API.CONSUMER.CREATE.OBJECT")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer createConsSub.Unsubscribe()
// Again, the IdleHeartbeat is not required here; it is overridden only to shorten the test time.
sub, err := js.SubscribeSync("a", OrderedConsumer(), IdleHeartbeat(200*time.Millisecond))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer sub.Unsubscribe()

createConsMsg, err := createConsSub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !strings.Contains(string(createConsMsg.Data), `"stream_name":"OBJECT"`) {
t.Fatalf("Invalid message on create consumer subject: %q", string(createConsMsg.Data))
}

time.Sleep(100 * time.Millisecond)
ci, err := sub.ConsumerInfo()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
consName := ci.Name

if err := js.DeleteConsumer("OBJECT", consName); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
createConsMsg, err = createConsSub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !strings.Contains(string(createConsMsg.Data), `"stream_name":"OBJECT"`) {
t.Fatalf("Invalid message on create consumer subject: %q", string(createConsMsg.Data))
}

time.Sleep(100 * time.Millisecond)
ci, err = sub.ConsumerInfo()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
newConsName := ci.Name
if consName == newConsName {
t.Fatalf("Consumer should be recreated, but consumer name is the same")
}
})
}

func TestJetStreamOrderedConsumerWithAutoUnsub(t *testing.T) {
Expand Down
75 changes: 75 additions & 0 deletions test/js_test.go
Expand Up @@ -21,6 +21,7 @@ import (
"io/ioutil"
mrand "math/rand"
"net"
"net/url"
"os"
"reflect"
"strconv"
Expand Down Expand Up @@ -52,6 +53,24 @@ func shutdownJSServerAndRemoveStorage(t *testing.T, s *server.Server) {
s.WaitForShutdown()
}

// restartBasicJSServer shuts down s and starts a replacement server with
// JetStream enabled, listening on the same client port and using the same
// JetStream store directory as s. The port and store directory are read from
// s before it is shut down. The returned server is the newly started one; the
// caller is responsible for shutting it down.
//
// The test is failed immediately if the client URL of s cannot be parsed or
// its port is not numeric.
func restartBasicJSServer(t *testing.T, s *server.Server) *server.Server {
	// Report failures at the caller's line rather than inside this helper.
	t.Helper()

	opts := natsserver.DefaultTestOptions
	clientURL, err := url.Parse(s.ClientURL())
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	port, err := strconv.Atoi(clientURL.Port())
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	opts.Port = port
	opts.JetStream = true
	opts.StoreDir = s.JetStreamConfig().StoreDir

	// Stop the old server completely before starting the new one on the
	// same port.
	s.Shutdown()
	s.WaitForShutdown()
	return RunServerWithOptions(opts)
}

func TestJetStreamNotEnabled(t *testing.T) {
s := RunServerOnPort(-1)
defer shutdownJSServerAndRemoveStorage(t, s)
Expand Down Expand Up @@ -8126,3 +8145,59 @@ func TestJetStreamMsgAckShouldErrForConsumerAckNone(t *testing.T) {
t.Fatalf("Expected error indicating that sub is AckNone, got %v", err)
}
}

// TestJetStreamOrderedConsumerRecreateAfterReconnect verifies that an ordered
// consumer subscription keeps delivering messages after the server it is
// connected to has been restarted: a message published before the restart is
// received, the server is restarted, the test waits for a missed-heartbeat
// notification, and a message published afterwards must be the next one
// returned by the same subscription.
func TestJetStreamOrderedConsumerRecreateAfterReconnect(t *testing.T) {
	s := RunBasicJetStreamServer()
	// s is reassigned when the server is restarted below; the closure makes
	// sure whichever server is current gets shut down, including when the
	// test fails before the restart happens.
	defer func() { shutdownJSServerAndRemoveStorage(t, s) }()

	// Monitor for ErrConsumerNotActive and suppress default error logging.
	// The channel is buffered and the send is non-blocking so that repeated
	// notifications never stall the callback once the test has consumed the
	// first one.
	hbMissed := make(chan struct{}, 1)
	errHandler := func(_ *nats.Conn, _ *nats.Subscription, err error) {
		if !errors.Is(err, nats.ErrConsumerNotActive) {
			// The handler is not guaranteed to run on the test goroutine,
			// so Errorf is used instead of Fatalf (FailNow must only be
			// called from the goroutine running the test).
			t.Errorf("Unexpected error: %v", err)
			return
		}
		select {
		case hbMissed <- struct{}{}:
		default:
		}
	}
	nc, js := jsClient(t, s, nats.ErrorHandler(errHandler))
	defer nc.Close()

	if _, err := js.AddStream(&nats.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}); err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	sub, err := js.SubscribeSync("FOO.A", nats.OrderedConsumer(), nats.IdleHeartbeat(100*time.Millisecond))
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	if _, err := js.Publish("FOO.A", []byte("msg 1")); err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	msg, err := sub.NextMsg(time.Second)
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	if string(msg.Data) != "msg 1" {
		t.Fatalf("Invalid msg value; want: 'msg 1'; got: %q", string(msg.Data))
	}

	// Restart the server on the same port with the same store directory.
	s = restartBasicJSServer(t, s)

	// Wait until a heartbeat is reported as missed.
	select {
	case <-hbMissed:
	case <-time.After(10 * time.Second):
		t.Fatalf("Did not receive consumer not active error")
	}
	if _, err := js.Publish("FOO.A", []byte("msg 2")); err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	msg, err = sub.NextMsg(time.Second)
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}

	// Make sure we pick up where we left off.
	if string(msg.Data) != "msg 2" {
		t.Fatalf("Invalid msg value; want: 'msg 2'; got: %q", string(msg.Data))
	}
}