Skip to content

Commit

Permalink
Merge pull request #838 from nats-io/js-ctx-fixes
Browse files Browse the repository at this point in the history
js: Fix context usage with sub.Fetch
  • Loading branch information
wallyqs committed Oct 5, 2021
2 parents 4a0ad2a + 8f86c1d commit 8c2b0bf
Show file tree
Hide file tree
Showing 2 changed files with 328 additions and 70 deletions.
44 changes: 36 additions & 8 deletions js.go
Expand Up @@ -2148,6 +2148,8 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
js := sub.jsi.js
pmc := len(sub.mch) > 0

// All fetch requests have an expiration, in case of no explicit expiration
// then the default timeout of the JetStream context is used.
ttl := o.ttl
if ttl == 0 {
ttl = js.opts.wait
Expand All @@ -2161,9 +2163,20 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
err error
cancel context.CancelFunc
)
if o.ctx == nil {
if ctx == nil {
ctx, cancel = context.WithTimeout(context.Background(), ttl)
defer cancel()
} else if _, hasDeadline := ctx.Deadline(); !hasDeadline {
// Prevent from passing the background context which will just block
// and cannot be canceled either.
if octx, ok := ctx.(ContextOpt); ok && octx.Context == context.Background() {
return nil, ErrNoDeadlineContext
}

// If the context did not have a deadline, then create a new child context
// that will use the default timeout from the JS context.
ctx, cancel = context.WithTimeout(ctx, ttl)
defer cancel()
}

// Check if context not done already before making the request.
Expand All @@ -2180,6 +2193,9 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
return nil, err
}

// Use the deadline of the context to base the expire times.
deadline, _ := ctx.Deadline()
ttl = time.Until(deadline)
checkCtxErr := func(err error) error {
if o.ctx == nil && err == context.DeadlineExceeded {
return ErrTimeout
Expand All @@ -2188,9 +2204,8 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
}

var (
msgs = make([]*Msg, 0, batch)
msg *Msg
start = time.Now()
msgs = make([]*Msg, 0, batch)
msg *Msg
)
for pmc && len(msgs) < batch {
// Check next msg with booleans that say that this is an internal call
Expand Down Expand Up @@ -2218,11 +2233,18 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
var nr nextRequest

sendReq := func() error {
ttl -= time.Since(start)
if ttl < 0 {
// At this point consider that we have timed-out
return context.DeadlineExceeded
// The current deadline for the context will be used
// to set the expires TTL for a fetch request.
deadline, _ = ctx.Deadline()
ttl = time.Until(deadline)

// Check if context has already been canceled or expired.
select {
case <-ctx.Done():
return ctx.Err()
default:
}

// Make our request expiration a bit shorter than the current timeout.
expires := ttl
if ttl >= 20*time.Millisecond {
Expand Down Expand Up @@ -2343,6 +2365,12 @@ func (m *Msg) ackReply(ackType []byte, sync bool, opts ...AckOpt) error {

usesCtx := o.ctx != nil
usesWait := o.ttl > 0

// Only allow either AckWait or Context option to set the timeout.
if usesWait && usesCtx {
return ErrContextAndTimeout
}

sync = sync || usesCtx || usesWait
ctx := o.ctx
wait := defaultRequestWait
Expand Down

0 comments on commit 8c2b0bf

Please sign in to comment.