Skip to content

Commit

Permalink
js: Fix context usage with sub.Fetch and msg.Ack
Browse files Browse the repository at this point in the history
The deadline of a context is now used to calculate
the time used for `expires` instead of the default `ttl`
of the JetStream context which was 5s.  This was preventing
library users from passing a context with a custom timeout.

This also disallows the usage of `context.Background`
to make it explicit that `sub.Fetch` has to be used
with a context that has a timeout since each fetch
request has to include an expire time anyway.

In case `context.WithCancel` is used, then a child context
with the same duration as the JetStream context default
timeout will be created.

Also in case msg.Ack it was possible to pass both timeout
and a context which would have been ambiguous and only
context option being used.

Signed-off-by: Waldemar Quevedo <wally@synadia.com>
  • Loading branch information
wallyqs committed Oct 5, 2021
1 parent 4a0ad2a commit ed9b816
Show file tree
Hide file tree
Showing 2 changed files with 328 additions and 70 deletions.
44 changes: 36 additions & 8 deletions js.go
Expand Up @@ -2148,6 +2148,8 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
js := sub.jsi.js
pmc := len(sub.mch) > 0

// All fetch requests have an expiration, in case of no explicit expiration
// then the default timeout of the JetStream context is used.
ttl := o.ttl
if ttl == 0 {
ttl = js.opts.wait
Expand All @@ -2161,9 +2163,20 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
err error
cancel context.CancelFunc
)
if o.ctx == nil {
if ctx == nil {
ctx, cancel = context.WithTimeout(context.Background(), ttl)
defer cancel()
} else if _, hasDeadline := ctx.Deadline(); !hasDeadline {
// Prevent from passing the background context which will just block
// and cannot be canceled either.
if octx, ok := ctx.(ContextOpt); ok && octx.Context == context.Background() {
return nil, ErrNoDeadlineContext
}

// If the context did not have a deadline, then create a new child context
// that will use the default timeout from the JS context.
ctx, cancel = context.WithTimeout(ctx, ttl)
defer cancel()
}

// Check if context not done already before making the request.
Expand All @@ -2180,6 +2193,9 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
return nil, err
}

// Use the deadline of the context to base the expire times.
deadline, _ := ctx.Deadline()
ttl = time.Until(deadline)
checkCtxErr := func(err error) error {
if o.ctx == nil && err == context.DeadlineExceeded {
return ErrTimeout
Expand All @@ -2188,9 +2204,8 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
}

var (
msgs = make([]*Msg, 0, batch)
msg *Msg
start = time.Now()
msgs = make([]*Msg, 0, batch)
msg *Msg
)
for pmc && len(msgs) < batch {
// Check next msg with booleans that say that this is an internal call
Expand Down Expand Up @@ -2218,11 +2233,18 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
var nr nextRequest

sendReq := func() error {
ttl -= time.Since(start)
if ttl < 0 {
// At this point consider that we have timed-out
return context.DeadlineExceeded
// The current deadline for the context will be used
// to set the expires TTL for a fetch request.
deadline, _ = ctx.Deadline()
ttl = time.Until(deadline)

// Check if context has already been canceled or expired.
select {
case <-ctx.Done():
return ctx.Err()
default:
}

// Make our request expiration a bit shorter than the current timeout.
expires := ttl
if ttl >= 20*time.Millisecond {
Expand Down Expand Up @@ -2343,6 +2365,12 @@ func (m *Msg) ackReply(ackType []byte, sync bool, opts ...AckOpt) error {

usesCtx := o.ctx != nil
usesWait := o.ttl > 0

// Only allow either AckWait or Context option to set the timeout.
if usesWait && usesCtx {
return ErrContextAndTimeout
}

sync = sync || usesCtx || usesWait
ctx := o.ctx
wait := defaultRequestWait
Expand Down

0 comments on commit ed9b816

Please sign in to comment.