Fix timeouts in HTTP service invocation when resiliency policies with timeouts are applied #7270

Merged (4 commits) on Dec 11, 2023
97 changes: 56 additions & 41 deletions pkg/http/api_directmessaging.go
Expand Up @@ -23,7 +23,9 @@
"net/url"
"path"
"strings"
"sync/atomic"

"github.com/cenkalti/backoff/v4"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

Expand All @@ -32,7 +34,6 @@
"github.com/dapr/dapr/pkg/messages"
invokev1 "github.com/dapr/dapr/pkg/messaging/v1"
"github.com/dapr/dapr/pkg/resiliency"
"github.com/dapr/dapr/pkg/resiliency/breaker"
)

// directMessagingSpanData is the data passed by the onDirectMessage endpoint to the tracing middleware
Expand Down Expand Up @@ -150,6 +151,7 @@
Disposer: resiliency.DisposerCloser[*invokev1.InvokeMethodResponse],
},
)
success := atomic.Bool{}
// Since we don't want to return the actual error, we have to extract several things in order to construct our response.
resp, err := policyRunner(func(ctx context.Context) (*invokev1.InvokeMethodResponse, error) {
rResp, rErr := a.directMessaging.Invoke(ctx, targetID, req)
Expand Down Expand Up @@ -198,63 +200,76 @@
contentType: rResp.ContentType(),
}
}
return rResp, nil
})

// Special case for timeouts/circuit breakers since they won't go through the rest of the logic.
if errors.Is(err, context.DeadlineExceeded) || breaker.IsErrorPermanent(err) {
respondWithError(w, messages.ErrDirectInvoke.WithFormat(targetID, err))
return
}
// If we get to this point, we must consider the operation as successful, so we invoke this only once and we consider all errors returned by this to be permanent (so the policy function doesn't retry)
// We still need to be within the policy function because if we return, the context passed to `Invoke` is canceled, so the `Copy` operation below can fail with a ContextCanceled error
if !success.CompareAndSwap(false, true) {
// This error will never be returned to a client but it's here to prevent retries
return rResp, backoff.Permanent(errors.New("already completed"))

}

var codeErr codeError
if errors.As(err, &codeErr) {
if len(codeErr.headers) > 0 {
invokev1.InternalMetadataToHTTPHeader(r.Context(), codeErr.headers, w.Header().Add)
if rResp == nil {
Contributor:

Are multiple instances of the function running in parallel? Otherwise, how does execution get to 211 if the first execution returns an error that prevents retries? (I'm sure it's just my lack of experience with Go and the policy runner APIs.)

Contributor Author:

Yes, multiple instances of the function can be running in parallel if one of them times out. When there's a timeout the policy runner cancels the context, which is a request (not an order) to stop processing; at the same time, it invokes the function again.

The reason why we don't have issues with concurrency is the lines just above:

if !success.CompareAndSwap(false, true) {
	return rResp, backoff.Permanent(errors.New("already completed"))
}

This code uses an atomic compare-and-swap:

  • if success is false (its initial value), CompareAndSwap atomically sets it to true and returns true, so execution continues
  • if success is already true, CompareAndSwap returns false, so we return a permanent error

In all the lines below that, we return every error wrapped in backoff.Permanent, so the policy runner does not retry on errors.

If instead the retry was caused by a timeout, the new invocation hits the compare-and-swap and returns right away. It cannot go past this if block, because we have already started sending data to the client.
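
For readers less familiar with Go, here is a minimal standalone sketch of the guard described above. It is not the Dapr code: writeOnce, runAttempts and errAlreadyCompleted are hypothetical names, and the sketch skips the backoff.Permanent wrapping to stay free of dependencies. It only shows that when several invocations race, exactly one gets past the compare-and-swap.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// errAlreadyCompleted is a hypothetical sentinel error used only in this sketch.
var errAlreadyCompleted = errors.New("already completed")

// writeOnce stands in for the guarded section of the policy function:
// only the caller that flips the flag from false to true may proceed.
func writeOnce(success *atomic.Bool) error {
	if !success.CompareAndSwap(false, true) {
		// Another invocation already won; refuse to write a second response.
		return errAlreadyCompleted
	}
	// The winning invocation would write headers and body to the client here.
	return nil
}

// runAttempts launches n concurrent invocations sharing one flag and
// reports how many got through the guard and how many were rejected.
func runAttempts(n int) (winners, rejected int) {
	var (
		success   atomic.Bool
		won, lost atomic.Int32
		wg        sync.WaitGroup
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := writeOnce(&success); err != nil {
				lost.Add(1)
				return
			}
			won.Add(1)
		}()
	}
	wg.Wait()
	return int(won.Load()), int(lost.Load())
}

func main() {
	winners, rejected := runAttempts(8)
	fmt.Println(winners, rejected) // prints: 1 7
}
```

Whichever goroutine is scheduled first wins; the others see the flag already set and return the error without touching the response.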

return nil, backoff.Permanent(errors.New("response object is nil"))

}
respondWithHTTPRawResponse(w, &UniversalHTTPRawResponse{
Body: codeErr.msg,
ContentType: codeErr.contentType,
StatusCode: codeErr.statusCode,
}, codeErr.statusCode)
if resp != nil {
_ = resp.Close()

headers := rResp.Headers()
if len(headers) > 0 {
invokev1.InternalMetadataToHTTPHeader(r.Context(), headers, w.Header().Add)
}

defer rResp.Close()

if ct := rResp.ContentType(); ct != "" {
w.Header().Set("content-type", ct)
}

w.WriteHeader(int(rResp.Status().GetCode()))

_, rErr = io.Copy(w, rResp.RawData())
if rErr != nil {
// Do not return rResp here, we already have a deferred `Close` call on it
return nil, backoff.Permanent(rErr)

}

// Do not return rResp here, we already have a deferred `Close` call on it
return nil, nil
})

// If there's no error, then everything is done already
if err == nil {
return
}

if resp != nil {
defer resp.Close()

// Set headers if present (if resp is not nil, they haven't been sent already)
headers := resp.Headers()
if len(headers) > 0 {
invokev1.InternalMetadataToHTTPHeader(r.Context(), headers, w.Header().Add)
}
}

var invokeErr invokeError
if errors.As(err, &invokeErr) {
respondWithData(w, invokeErr.statusCode, invokeErr.msg)
if resp != nil {
_ = resp.Close()
// Handle errors; successful operations are already complete
var (
codeErr codeError
invokeErr invokeError
)
switch {
case errors.As(err, &codeErr):
if len(codeErr.headers) > 0 {
invokev1.InternalMetadataToHTTPHeader(r.Context(), codeErr.headers, w.Header().Add)
}
respondWithHTTPRawResponse(w, &UniversalHTTPRawResponse{
Body: codeErr.msg,
ContentType: codeErr.contentType,
StatusCode: codeErr.statusCode,
}, codeErr.statusCode)
return
}

if resp == nil {
respondWithError(w, messages.ErrDirectInvoke.WithFormat(targetID, "response object is nil"))
case errors.As(err, &invokeErr):
respondWithData(w, invokeErr.statusCode, invokeErr.msg)
return
}
defer resp.Close()

statusCode := int(resp.Status().GetCode())

if ct := resp.ContentType(); ct != "" {
w.Header().Set("content-type", ct)
}

w.WriteHeader(statusCode)

_, err = io.Copy(w, resp.RawData())
if err != nil {
default:
respondWithError(w, messages.ErrDirectInvoke.WithFormat(targetID, err))
return
}
Expand Down
33 changes: 26 additions & 7 deletions pkg/resiliency/policy.go
Expand Up @@ -119,24 +119,43 @@
 	ctx, cancel := context.WithTimeout(ctx, def.t)
 	defer cancel()

-	done := make(chan doneCh[T])
-	timedOut := atomic.Bool{}
+	done := make(chan doneCh[T], 1)
 	go func() {
 		rRes, rErr := operCopy(ctx)
-		if !timedOut.Load() {
-			done <- doneCh[T]{rRes, rErr}
-		} else if opts.Disposer != nil && !isZero(rRes) {
+
+		// If the channel is full, it means we had a timeout
+		select {
+		case done <- doneCh[T]{rRes, rErr}:
+			// No timeout, all good
+		default:
+			// The operation has timed out
 			// Invoke the disposer if we have a non-zero return value
 			// Note that in case of timeouts we do not invoke the accumulator
-			opts.Disposer(rRes)
+			if opts.Disposer != nil && !isZero(rRes) {
+				opts.Disposer(rRes)
+			}
 		}
 	}()

 	select {
 	case v := <-done:
 		return v.res, v.err
 	case <-ctx.Done():
-		timedOut.Store(true)
+		// Because done has a capacity of 1, adding a message on the channel signals that there was a timeout
+		// However, the response may have arrived in the meanwhile, so we need to also check if something was added
+		select {
+		case done <- doneCh[T]{}:
+			// All good, nothing to do here
+		default:
+			// The response arrived at the same time as the context deadline, and the channel has a message
+			v := <-done
+
+			// Invoke the disposer if the return value is non-zero
+			// Note that in case of timeouts we do not invoke the accumulator
+			if opts.Disposer != nil && !isZero(v.res) {
+				opts.Disposer(v.res)
+			}
+		}
 		if def.addTimeoutActivatedMetric != nil && timeoutMetricsActivated.CompareAndSwap(false, true) {
 			def.addTimeoutActivatedMetric()
 		}
Expand Down
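
The capacity-1 channel handoff in the pkg/resiliency/policy.go hunk can be tried in isolation with the sketch below. It is a simplified illustration, not the Dapr implementation: runWithTimeout, result and demo are hypothetical names, a plain string stands in for the generic type (with the empty string playing the role of the isZero check), and returning ctx.Err() after a timeout is an assumption, since the hunk is cut off before the return.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// result pairs a value with an error, like doneCh[T] in the diff.
type result struct {
	val string
	err error
}

// runWithTimeout (hypothetical) runs op under a timeout. The single slot in
// done is claimed either by the worker (result delivered) or by the caller
// (timeout marker), so exactly one side learns that it lost the race.
func runWithTimeout(parent context.Context, timeout time.Duration,
	op func(context.Context) (string, error), dispose func(string)) (string, error) {
	ctx, cancel := context.WithTimeout(parent, timeout)
	defer cancel()

	done := make(chan result, 1)
	go func() {
		val, err := op(ctx)
		select {
		case done <- result{val, err}:
			// Slot was free: no timeout marker has been placed.
		default:
			// Slot holds the timeout marker: nobody will read this result.
			if dispose != nil && val != "" {
				dispose(val)
			}
		}
	}()

	select {
	case v := <-done:
		return v.val, v.err
	case <-ctx.Done():
		select {
		case done <- result{}:
			// Marker placed; a late worker will dispose of its own result.
		default:
			// The result arrived together with the deadline: drain and dispose.
			if v := <-done; dispose != nil && v.val != "" {
				dispose(v.val)
			}
		}
		return "", ctx.Err() // assumption: the timeout surfaces as the context error
	}
}

// demo runs a slow operation under a short timeout and reports what happened.
func demo() (timedOut bool, disposed string) {
	disposedCh := make(chan string, 1)
	slow := func(ctx context.Context) (string, error) {
		time.Sleep(50 * time.Millisecond) // deliberately ignores cancellation
		return "late", nil
	}
	_, err := runWithTimeout(context.Background(), 5*time.Millisecond, slow,
		func(s string) { disposedCh <- s })
	return errors.Is(err, context.DeadlineExceeded), <-disposedCh
}

func main() {
	timedOut, disposed := demo()
	fmt.Println(timedOut, disposed) // prints: true late
}
```

Compared with the removed timedOut flag, the buffered channel closes the window in which the worker could read the flag as false, then block forever on an unbuffered send after the caller had already given up.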