Skip to content

Commit

Permalink
[CHANGED] Ordered consumer recreate on missing heartbeat (#1097)
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrpio committed Oct 4, 2022
1 parent 36d2b65 commit 1ee76ea
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 37 deletions.
21 changes: 14 additions & 7 deletions js.go
Expand Up @@ -1937,7 +1937,7 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
nc.mu.Unlock()

pushErr := func(err error) {
nc.handleConsumerSequenceMismatch(sub, err)
nc.handleConsumerSequenceMismatch(sub, fmt.Errorf("%w: recreating ordered consumer", err))
nc.unsubscribe(sub, 0, true)
}

Expand Down Expand Up @@ -1966,8 +1966,9 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {

resp, err := nc.Request(js.apiSubj(ccSubj), j, js.opts.wait)
if err != nil {
if err == ErrNoResponders {
err = ErrJetStreamNotEnabled
if errors.Is(err, ErrNoResponders) || errors.Is(err, ErrTimeout) {
// if creating consumer failed, retry
return
}
pushErr(err)
return
Expand Down Expand Up @@ -2040,11 +2041,17 @@ func (sub *Subscription) activityCheck() {
sub.mu.Unlock()

if !active {
nc.mu.Lock()
if errCB := nc.Opts.AsyncErrorCB; errCB != nil {
nc.ach.push(func() { errCB(nc, sub, ErrConsumerNotActive) })
if !jsi.ordered || nc.Status() != CONNECTED {
nc.mu.Lock()
if errCB := nc.Opts.AsyncErrorCB; errCB != nil {
nc.ach.push(func() { errCB(nc, sub, ErrConsumerNotActive) })
}
nc.mu.Unlock()
return
}
nc.mu.Unlock()
sub.mu.Lock()
sub.resetOrderedConsumer(jsi.sseq + 1)
sub.mu.Unlock()
}
}

Expand Down
89 changes: 59 additions & 30 deletions js_test.go
Expand Up @@ -21,6 +21,7 @@ import (
"crypto/sha256"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"math/rand"
Expand Down Expand Up @@ -292,7 +293,7 @@ func TestJetStreamOrderedConsumer(t *testing.T) {
testSyncConsumer()
}

func TestJetStreamOrderedConsumerWithErrors(t *testing.T) {
func TestJetStreamOrderedConsumerDeleteAssets(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

Expand Down Expand Up @@ -340,15 +341,9 @@ func TestJetStreamOrderedConsumerWithErrors(t *testing.T) {
}
}

type asset int
const (
deleteStream asset = iota
deleteConsumer
)
t.Run("remove stream, expect error", func(t *testing.T) {
createStream()

testSubError := func(a asset) {
t.Helper()
// Again, the IdleHeartbeat here is not required; we are just overriding it to shorten test time.
sub, err := js.SubscribeSync("a", OrderedConsumer(), IdleHeartbeat(200*time.Millisecond))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
Expand All @@ -358,37 +353,71 @@ func TestJetStreamOrderedConsumerWithErrors(t *testing.T) {
// Since we are sync we will be paused here due to flow control.
time.Sleep(100 * time.Millisecond)
// Now delete the asset and make sure we get an error.
switch a {
case deleteStream:
if err := js.DeleteStream("OBJECT"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
case deleteConsumer:
// We need to grab our consumer name.
ci, err := sub.ConsumerInfo()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if err := js.DeleteConsumer("OBJECT", ci.Name); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if err := js.DeleteStream("OBJECT"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Make sure we get an error.
select {
case err := <-errCh:
if err != ErrConsumerNotActive {
t.Fatalf("Got wrong error, wanted %v, got %v", ErrConsumerNotActive, err)
if !errors.Is(err, ErrStreamNotFound) {
t.Fatalf("Got wrong error, wanted %v, got %v", ErrStreamNotFound, err)
}
case <-time.After(time.Second):
t.Fatalf("Did not receive err message as expected")
}
}
})

createStream()
testSubError(deleteStream)
t.Run("remove consumer, expect it to be recreated", func(t *testing.T) {
createStream()

createStream()
testSubError(deleteConsumer)
createConsSub, err := nc.SubscribeSync("$JS.API.CONSUMER.CREATE.OBJECT")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer createConsSub.Unsubscribe()
// Again, the IdleHeartbeat here is not required; we are just overriding it to shorten test time.
sub, err := js.SubscribeSync("a", OrderedConsumer(), IdleHeartbeat(200*time.Millisecond))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer sub.Unsubscribe()

createConsMsg, err := createConsSub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !strings.Contains(string(createConsMsg.Data), `"stream_name":"OBJECT"`) {
t.Fatalf("Invalid message on create consumer subject: %q", string(createConsMsg.Data))
}

time.Sleep(100 * time.Millisecond)
ci, err := sub.ConsumerInfo()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
consName := ci.Name

if err := js.DeleteConsumer("OBJECT", consName); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
createConsMsg, err = createConsSub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !strings.Contains(string(createConsMsg.Data), `"stream_name":"OBJECT"`) {
t.Fatalf("Invalid message on create consumer subject: %q", string(createConsMsg.Data))
}

time.Sleep(100 * time.Millisecond)
ci, err = sub.ConsumerInfo()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
newConsName := ci.Name
if consName == newConsName {
t.Fatalf("Consumer should be recreated, but consumer name is the same")
}
})
}

func TestJetStreamOrderedConsumerWithAutoUnsub(t *testing.T) {
Expand Down
75 changes: 75 additions & 0 deletions test/js_test.go
Expand Up @@ -21,6 +21,7 @@ import (
"io/ioutil"
mrand "math/rand"
"net"
"net/url"
"os"
"reflect"
"strconv"
Expand Down Expand Up @@ -52,6 +53,24 @@ func shutdownJSServerAndRemoveStorage(t *testing.T, s *server.Server) {
s.WaitForShutdown()
}

// restartBasicJSServer shuts down s and starts a replacement server that
// listens on the same client port, has JetStream enabled, and reuses the
// original server's JetStream store directory.
//
// It returns the newly started server; the caller is responsible for
// shutting it down. Any failure while deriving the restart options fails
// the test immediately, before the original server is stopped.
func restartBasicJSServer(t *testing.T, s *server.Server) *server.Server {
	// Report failures at the caller's line rather than inside this helper.
	t.Helper()

	clientURL, err := url.Parse(s.ClientURL())
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	port, err := strconv.Atoi(clientURL.Port())
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}

	// Capture the store directory before the server is shut down.
	// NOTE(review): JetStreamConfig presumably returns nil when JetStream is
	// not enabled on s; guard so that case fails the test instead of panicking.
	jsConfig := s.JetStreamConfig()
	if jsConfig == nil {
		t.Fatalf("Unexpected error: server has no JetStream configuration")
	}

	opts := natsserver.DefaultTestOptions
	opts.Port = port
	opts.JetStream = true
	opts.StoreDir = jsConfig.StoreDir

	s.Shutdown()
	s.WaitForShutdown()
	return RunServerWithOptions(opts)
}

func TestJetStreamNotEnabled(t *testing.T) {
s := RunServerOnPort(-1)
defer shutdownJSServerAndRemoveStorage(t, s)
Expand Down Expand Up @@ -8126,3 +8145,59 @@ func TestJetStreamMsgAckShouldErrForConsumerAckNone(t *testing.T) {
t.Fatalf("Expected error indicating that sub is AckNone, got %v", err)
}
}

// TestJetStreamOrderedConsumerRecreateAfterReconnect verifies that an ordered
// consumer subscription keeps delivering messages after the server it is
// connected to has been restarted: a message published before the restart is
// received, the restart triggers a missed-heartbeat notification, and a
// message published afterwards is the next one delivered on the same
// subscription.
func TestJetStreamOrderedConsumerRecreateAfterReconnect(t *testing.T) {
	s := RunBasicJetStreamServer()
	// s is reassigned when the server is restarted below; the closure makes
	// sure whichever server is current gets cleaned up, including when the
	// test fails before the restart.
	defer func() { shutdownJSServerAndRemoveStorage(t, s) }()

	// Monitor for ErrConsumerNotActive and suppress default error logging.
	// The channel is buffered and the send is non-blocking so that repeated
	// notifications never stall the callback when nobody is receiving.
	hbMissed := make(chan struct{}, 1)
	errHandler := func(_ *nats.Conn, _ *nats.Subscription, err error) {
		if !errors.Is(err, nats.ErrConsumerNotActive) {
			// This callback does not run on the test goroutine, so
			// FailNow/Fatalf must not be used here.
			t.Errorf("Unexpected error: %v", err)
			return
		}
		select {
		case hbMissed <- struct{}{}:
		default:
		}
	}
	nc, js := jsClient(t, s, nats.ErrorHandler(errHandler))
	defer nc.Close()

	if _, err := js.AddStream(&nats.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}); err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	// A short idle heartbeat keeps the missed-heartbeat detection quick.
	sub, err := js.SubscribeSync("FOO.A", nats.OrderedConsumer(), nats.IdleHeartbeat(100*time.Millisecond))
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	if _, err := js.Publish("FOO.A", []byte("msg 1")); err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	msg, err := sub.NextMsg(time.Second)
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	if string(msg.Data) != "msg 1" {
		t.Fatalf("Invalid msg value; want: 'msg 1'; got: %q", string(msg.Data))
	}

	// Restart the server on the same port with the same store directory.
	s = restartBasicJSServer(t, s)

	// Wait until we miss a heartbeat.
	select {
	case <-hbMissed:
	case <-time.After(10 * time.Second):
		t.Fatalf("Did not receive consumer not active error")
	}
	if _, err := js.Publish("FOO.A", []byte("msg 2")); err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	msg, err = sub.NextMsg(time.Second)
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}

	// Make sure we pick up where we left off.
	if string(msg.Data) != "msg 2" {
		t.Fatalf("Invalid msg value; want: 'msg 2'; got: %q", string(msg.Data))
	}
}

0 comments on commit 1ee76ea

Please sign in to comment.