Skip to content

Commit

Permalink
Consistent error value of context timeout on subscription Fetch() int…
Browse files Browse the repository at this point in the history
…erface. (#1011)

* Add test case about subscription timeout behavior

* Makes context timeout error behavior consistent.

If user provides context object and context timeout,
always return the timeout error from context.
  • Loading branch information
wdhongtw committed Jul 4, 2022
1 parent d29a40a commit 8a4b9f4
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 2 deletions.
4 changes: 2 additions & 2 deletions js.go
Expand Up @@ -2574,9 +2574,9 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
// Check if context not done already before making the request.
select {
case <-ctx.Done():
if ctx.Err() == context.Canceled {
if o.ctx != nil { // Timeout or Cancel triggered by context object option
err = ctx.Err()
} else {
} else { // Timeout triggered by timeout option
err = ErrTimeout
}
default:
Expand Down
16 changes: 16 additions & 0 deletions test/js_test.go
Expand Up @@ -6867,6 +6867,22 @@ func testJetStreamFetchContext(t *testing.T, srvs ...*jsServer) {
t.Errorf("Expected %d pending messages, got: %d", pending, total)
}
})

t.Run("MaxWait timeout should return nats error", func(t *testing.T) {
_, err := sub.Fetch(1, nats.MaxWait(1*time.Nanosecond))
if !errors.Is(err, nats.ErrTimeout) {
t.Fatalf("Expect ErrTimeout, got err=%#v", err)
}
})

t.Run("Context timeout should return context error", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond)
defer cancel()
_, err := sub.Fetch(1, nats.Context(ctx))
if !errors.Is(err, context.DeadlineExceeded) {
t.Fatalf("Expect context.DeadlineExceeded, got err=%#v", err)
}
})
}

func TestJetStreamSubscribeContextCancel(t *testing.T) {
Expand Down

0 comments on commit 8a4b9f4

Please sign in to comment.