New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CHANGED] Ordered consumer recreate on missing heartbeat #1097
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,7 @@ import ( | |
"crypto/sha256" | ||
"encoding/base64" | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"io/ioutil" | ||
"math/rand" | ||
|
@@ -292,7 +293,7 @@ func TestJetStreamOrderedConsumer(t *testing.T) { | |
testSyncConsumer() | ||
} | ||
|
||
func TestJetStreamOrderedConsumerWithErrors(t *testing.T) { | ||
func TestOrderedConsumerDeleteAssets(t *testing.T) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would be nice to maintain |
||
s := RunBasicJetStreamServer() | ||
defer shutdownJSServerAndRemoveStorage(t, s) | ||
|
||
|
@@ -340,15 +341,9 @@ func TestJetStreamOrderedConsumerWithErrors(t *testing.T) { | |
} | ||
} | ||
|
||
type asset int | ||
const ( | ||
deleteStream asset = iota | ||
deleteConsumer | ||
) | ||
t.Run("remove stream, expect error", func(t *testing.T) { | ||
createStream() | ||
|
||
testSubError := func(a asset) { | ||
t.Helper() | ||
// Again here the IdleHeartbeat is not required, just overriding top shorten test time. | ||
sub, err := js.SubscribeSync("a", OrderedConsumer(), IdleHeartbeat(200*time.Millisecond)) | ||
if err != nil { | ||
t.Fatalf("Unexpected error: %v", err) | ||
|
@@ -358,37 +353,71 @@ func TestJetStreamOrderedConsumerWithErrors(t *testing.T) { | |
// Since we are sync we will be paused here due to flow control. | ||
time.Sleep(100 * time.Millisecond) | ||
// Now delete the asset and make sure we get an error. | ||
switch a { | ||
case deleteStream: | ||
if err := js.DeleteStream("OBJECT"); err != nil { | ||
t.Fatalf("Unexpected error: %v", err) | ||
} | ||
case deleteConsumer: | ||
// We need to grab our consumer name. | ||
ci, err := sub.ConsumerInfo() | ||
if err != nil { | ||
t.Fatalf("Unexpected error: %v", err) | ||
} | ||
if err := js.DeleteConsumer("OBJECT", ci.Name); err != nil { | ||
t.Fatalf("Unexpected error: %v", err) | ||
} | ||
if err := js.DeleteStream("OBJECT"); err != nil { | ||
t.Fatalf("Unexpected error: %v", err) | ||
} | ||
// Make sure we get an error. | ||
select { | ||
case err := <-errCh: | ||
if err != ErrConsumerNotActive { | ||
t.Fatalf("Got wrong error, wanted %v, got %v", ErrConsumerNotActive, err) | ||
if !errors.Is(err, ErrStreamNotFound) { | ||
t.Fatalf("Got wrong error, wanted %v, got %v", ErrStreamNotFound, err) | ||
} | ||
case <-time.After(time.Second): | ||
t.Fatalf("Did not receive err message as expected") | ||
} | ||
} | ||
}) | ||
|
||
createStream() | ||
testSubError(deleteStream) | ||
t.Run("remove consumer, expect it to be recreated", func(t *testing.T) { | ||
createStream() | ||
|
||
createStream() | ||
testSubError(deleteConsumer) | ||
createConsSub, err := nc.SubscribeSync("$JS.API.CONSUMER.CREATE.OBJECT") | ||
if err != nil { | ||
t.Fatalf("Unexpected error: %v", err) | ||
} | ||
defer createConsSub.Unsubscribe() | ||
// Again here the IdleHeartbeat is not required, just overriding top shorten test time. | ||
sub, err := js.SubscribeSync("a", OrderedConsumer(), IdleHeartbeat(200*time.Millisecond)) | ||
if err != nil { | ||
t.Fatalf("Unexpected error: %v", err) | ||
} | ||
defer sub.Unsubscribe() | ||
|
||
createConsMsg, err := createConsSub.NextMsg(time.Second) | ||
if err != nil { | ||
t.Fatalf("Unexpected error: %v", err) | ||
} | ||
if !strings.Contains(string(createConsMsg.Data), `"stream_name":"OBJECT"`) { | ||
t.Fatalf("Invalid message on create consumer subject: %q", string(createConsMsg.Data)) | ||
} | ||
|
||
time.Sleep(100 * time.Millisecond) | ||
ci, err := sub.ConsumerInfo() | ||
if err != nil { | ||
t.Fatalf("Unexpected error: %v", err) | ||
} | ||
consName := ci.Name | ||
|
||
if err := js.DeleteConsumer("OBJECT", consName); err != nil { | ||
t.Fatalf("Unexpected error: %v", err) | ||
} | ||
createConsMsg, err = createConsSub.NextMsg(time.Second) | ||
if err != nil { | ||
t.Fatalf("Unexpected error: %v", err) | ||
} | ||
if !strings.Contains(string(createConsMsg.Data), `"stream_name":"OBJECT"`) { | ||
t.Fatalf("Invalid message on create consumer subject: %q", string(createConsMsg.Data)) | ||
} | ||
|
||
time.Sleep(100 * time.Millisecond) | ||
ci, err = sub.ConsumerInfo() | ||
if err != nil { | ||
t.Fatalf("Unexpected error: %v", err) | ||
} | ||
newConsName := ci.Name | ||
if consName == newConsName { | ||
t.Fatalf("Consumer should be recreated, but consumer name is the same") | ||
} | ||
}) | ||
} | ||
|
||
func TestJetStreamOrderedConsumerWithAutoUnsub(t *testing.T) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,7 @@ import ( | |
"io/ioutil" | ||
mrand "math/rand" | ||
"net" | ||
"net/url" | ||
"os" | ||
"reflect" | ||
"strconv" | ||
|
@@ -52,6 +53,24 @@ func shutdownJSServerAndRemoveStorage(t *testing.T, s *server.Server) { | |
s.WaitForShutdown() | ||
} | ||
|
||
func restartBasicJSServer(t *testing.T, s *server.Server) *server.Server { | ||
opts := natsserver.DefaultTestOptions | ||
clientURL, err := url.Parse(s.ClientURL()) | ||
if err != nil { | ||
t.Fatalf("Unexpected error: %v", err) | ||
} | ||
port, err := strconv.Atoi(clientURL.Port()) | ||
if err != nil { | ||
t.Fatalf("Unexpected error: %v", err) | ||
} | ||
opts.Port = port | ||
opts.JetStream = true | ||
opts.StoreDir = s.JetStreamConfig().StoreDir | ||
s.Shutdown() | ||
s.WaitForShutdown() | ||
return RunServerWithOptions(opts) | ||
} | ||
|
||
func TestJetStreamNotEnabled(t *testing.T) { | ||
s := RunServerOnPort(-1) | ||
defer shutdownJSServerAndRemoveStorage(t, s) | ||
|
@@ -8126,3 +8145,59 @@ func TestJetStreamMsgAckShouldErrForConsumerAckNone(t *testing.T) { | |
t.Fatalf("Expected error indicating that sub is AckNone, got %v", err) | ||
} | ||
} | ||
|
||
func TestOrderedConsumerRecreateAfterReconnect(t *testing.T) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same comment than before. |
||
s := RunBasicJetStreamServer() | ||
|
||
// monitor for ErrConsumerNotActive error and suppress logging | ||
hbMissed := make(chan struct{}) | ||
errHandler := func(c *nats.Conn, s *nats.Subscription, err error) { | ||
if !errors.Is(err, nats.ErrConsumerNotActive) { | ||
t.Fatalf("Unexpected error: %v", err) | ||
} | ||
hbMissed <- struct{}{} | ||
} | ||
nc, js := jsClient(t, s, nats.ErrorHandler(errHandler)) | ||
defer nc.Close() | ||
|
||
if _, err := js.AddStream(&nats.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}); err != nil { | ||
t.Fatalf("Unexpected error: %v", err) | ||
} | ||
sub, err := js.SubscribeSync("FOO.A", nats.OrderedConsumer(), nats.IdleHeartbeat(100*time.Millisecond)) | ||
if err != nil { | ||
t.Fatalf("Unexpected error: %v", err) | ||
} | ||
if _, err := js.Publish("FOO.A", []byte("msg 1")); err != nil { | ||
t.Fatalf("Unexpected error: %v", err) | ||
} | ||
msg, err := sub.NextMsg(time.Second) | ||
if err != nil { | ||
t.Fatalf("Unexpected error: %v", err) | ||
} | ||
if string(msg.Data) != "msg 1" { | ||
t.Fatalf("Invalid msg value; want: 'msg 1'; got: %q", string(msg.Data)) | ||
} | ||
|
||
// restart the server | ||
s = restartBasicJSServer(t, s) | ||
defer shutdownJSServerAndRemoveStorage(t, s) | ||
|
||
// wait until we miss heartbeat | ||
select { | ||
case <-hbMissed: | ||
case <-time.After(10 * time.Second): | ||
t.Fatalf("Did not receive consumer not active error") | ||
} | ||
if _, err := js.Publish("FOO.A", []byte("msg 2")); err != nil { | ||
t.Fatalf("Unexpected error: %v", err) | ||
} | ||
msg, err = sub.NextMsg(time.Second) | ||
if err != nil { | ||
t.Fatalf("Unexpected error: %v", err) | ||
} | ||
|
||
// make sure we pick up where we left off | ||
if string(msg.Data) != "msg 2" { | ||
t.Fatalf("Invalid msg value; want: 'msg 2'; got: %q", string(msg.Data)) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the
%w
doing? Also, since you seem to have this fmt.Errorf() almost every time you call pushErr(), should you have that part of pushErr() itself? (I know there is a check/pushErr in case of request marshaling failing, but that is very unlikely, and still it would be a failure recreating the ordered consumer anyway).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
%w
is the error wrapping semantic. Thanks to it, if you call e.g.errors.Is(err, ErrStreamNotFound)
on this error it will unwrap correctly and return true forfmt.Errorf("%w: recreating ordered consumer", ErrStreamNotFound)
.As to your second point, you're right, I'll change it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Additionally, I added the
recreating ordered consumer
part to at least give some insight to the user as to where an error happened (async error callback does not give any information on where the error comes from, so this is better than nothing).In the near future I'll be working on the async errors revamp (at least for ordered consumers) to be able to identify them and react more easily)