Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

js: Fix context usage with sub.Fetch #838

Merged
merged 1 commit into from Oct 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
44 changes: 36 additions & 8 deletions js.go
Expand Up @@ -2148,6 +2148,8 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
js := sub.jsi.js
pmc := len(sub.mch) > 0

// All fetch requests have an expiration, in case of no explicit expiration
// then the default timeout of the JetStream context is used.
ttl := o.ttl
if ttl == 0 {
ttl = js.opts.wait
Expand All @@ -2161,9 +2163,20 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
err error
cancel context.CancelFunc
)
if o.ctx == nil {
if ctx == nil {
ctx, cancel = context.WithTimeout(context.Background(), ttl)
defer cancel()
} else if _, hasDeadline := ctx.Deadline(); !hasDeadline {
// Prevent from passing the background context which will just block
// and cannot be canceled either.
if octx, ok := ctx.(ContextOpt); ok && octx.Context == context.Background() {
return nil, ErrNoDeadlineContext
}

// If the context did not have a deadline, then create a new child context
// that will use the default timeout from the JS context.
ctx, cancel = context.WithTimeout(ctx, ttl)
kozlovic marked this conversation as resolved.
Show resolved Hide resolved
defer cancel()
}

// Check if context not done already before making the request.
Expand All @@ -2180,6 +2193,9 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
return nil, err
}

// Use the deadline of the context to base the expire times.
deadline, _ := ctx.Deadline()
ttl = time.Until(deadline)
checkCtxErr := func(err error) error {
if o.ctx == nil && err == context.DeadlineExceeded {
return ErrTimeout
Expand All @@ -2188,9 +2204,8 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
}

var (
msgs = make([]*Msg, 0, batch)
msg *Msg
start = time.Now()
msgs = make([]*Msg, 0, batch)
msg *Msg
)
for pmc && len(msgs) < batch {
// Check next msg with booleans that say that this is an internal call
Expand Down Expand Up @@ -2218,11 +2233,18 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
var nr nextRequest

sendReq := func() error {
ttl -= time.Since(start)
if ttl < 0 {
// At this point consider that we have timed-out
return context.DeadlineExceeded
// The current deadline for the context will be used
// to set the expires TTL for a fetch request.
deadline, _ = ctx.Deadline()
ttl = time.Until(deadline)

// Check if context has already been canceled or expired.
select {
case <-ctx.Done():
return ctx.Err()
default:
}

// Make our request expiration a bit shorter than the current timeout.
expires := ttl
if ttl >= 20*time.Millisecond {
Expand Down Expand Up @@ -2343,6 +2365,12 @@ func (m *Msg) ackReply(ackType []byte, sync bool, opts ...AckOpt) error {

usesCtx := o.ctx != nil
usesWait := o.ttl > 0

// Only allow either AckWait or Context option to set the timeout.
if usesWait && usesCtx {
return ErrContextAndTimeout
}

sync = sync || usesCtx || usesWait
ctx := o.ctx
wait := defaultRequestWait
Expand Down