[release-1.12] Fix timeouts in HTTP service invocation when resiliency policies with timeouts are applied

Backport of dapr#7270

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
ItalyPaleAle committed Dec 16, 2023
1 parent 9ec8497 commit 5644949
Showing 2 changed files with 85 additions and 51 deletions.
103 changes: 59 additions & 44 deletions pkg/http/api_directmessaging.go
@@ -23,14 +23,15 @@ import (
"net/url"
"path"
"strings"
"sync/atomic"

"github.com/cenkalti/backoff/v4"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/dapr/dapr/pkg/messages"
invokev1 "github.com/dapr/dapr/pkg/messaging/v1"
"github.com/dapr/dapr/pkg/resiliency"
"github.com/dapr/dapr/pkg/resiliency/breaker"
"github.com/dapr/dapr/utils/responsewriter"
)

@@ -104,6 +105,7 @@ func (a *api) onDirectMessage(w http.ResponseWriter, r *http.Request) {
Disposer: resiliency.DisposerCloser[*invokev1.InvokeMethodResponse],
},
)
success := atomic.Bool{}
// Since we don't want to return the actual error, we have to extract several things in order to construct our response.
resp, err := policyRunner(func(ctx context.Context) (*invokev1.InvokeMethodResponse, error) {
rResp, rErr := a.directMessaging.Invoke(ctx, targetID, req)
@@ -125,7 +127,7 @@
// Construct response if not HTTP
resStatus := rResp.Status()
if !rResp.IsHTTPResponse() {
statusCode := int32(invokev1.HTTPStatusFromCode(codes.Code(resStatus.Code)))
statusCode := int32(invokev1.HTTPStatusFromCode(codes.Code(resStatus.GetCode())))
if statusCode != http.StatusOK {
// Close the response to replace the body
_ = rResp.Close()
@@ -142,73 +144,86 @@
} else {
resStatus.Code = statusCode
}
} else if resStatus.Code < 200 || resStatus.Code > 399 {
} else if resStatus.GetCode() < 200 || resStatus.GetCode() > 399 {
msg, _ := rResp.RawDataFull()
// Returning a `codeError` here will cause Resiliency to retry the request (if retries are enabled), but if the request continues to fail, the response is sent to the user with whatever status code the app returned.
return rResp, codeError{
headers: rResp.Headers(),
statusCode: int(resStatus.Code),
statusCode: int(resStatus.GetCode()),
msg: msg,
contentType: rResp.ContentType(),
}
}
return rResp, nil
})

// Special case for timeouts/circuit breakers since they won't go through the rest of the logic.
if errors.Is(err, context.DeadlineExceeded) || breaker.IsErrorPermanent(err) {
respondWithError(w, messages.ErrDirectInvoke.WithFormat(targetID, err))
return
}
// If we get to this point, we must consider the operation as successful, so we invoke this only once and we consider all errors returned by this to be permanent (so the policy function doesn't retry)
// We still need to be within the policy function because if we return, the context passed to `Invoke` is canceled, so the `Copy` operation below can fail with a ContextCanceled error
if !success.CompareAndSwap(false, true) {
// This error will never be returned to a client but it's here to prevent retries
return rResp, backoff.Permanent(errors.New("already completed"))
}

var codeErr codeError
if errors.As(err, &codeErr) {
if len(codeErr.headers) > 0 {
invokev1.InternalMetadataToHTTPHeader(r.Context(), codeErr.headers, w.Header().Add)
if rResp == nil {
return nil, backoff.Permanent(errors.New("response object is nil"))
}
respondWithHTTPRawResponse(w, &UniversalHTTPRawResponse{
Body: codeErr.msg,
ContentType: codeErr.contentType,
StatusCode: codeErr.statusCode,
}, codeErr.statusCode)
if resp != nil {
_ = resp.Close()

headers := rResp.Headers()
if len(headers) > 0 {
invokev1.InternalMetadataToHTTPHeader(r.Context(), headers, w.Header().Add)
}

defer rResp.Close()

if ct := rResp.ContentType(); ct != "" {
w.Header().Set("content-type", ct)
}

w.WriteHeader(int(rResp.Status().GetCode()))

_, rErr = io.Copy(w, rResp.RawData())
if rErr != nil {
// Do not return rResp here, we already have a deferred `Close` call on it
return nil, backoff.Permanent(rErr)
}

// Do not return rResp here, we already have a deferred `Close` call on it
return nil, nil
})

// If there's no error, then everything is done already
if err == nil {
return
}

if resp != nil {
defer resp.Close()

// Set headers if present (if resp is not nil, they haven't been sent already)
headers := resp.Headers()
if len(headers) > 0 {
invokev1.InternalMetadataToHTTPHeader(r.Context(), headers, w.Header().Add)
}
}

var invokeErr invokeError
if errors.As(err, &invokeErr) {
respondWithData(w, invokeErr.statusCode, invokeErr.msg)
if resp != nil {
_ = resp.Close()
// Handle errors; successful operations are already complete
var (
codeErr codeError
invokeErr invokeError
)
switch {
case errors.As(err, &codeErr):
if len(codeErr.headers) > 0 {
invokev1.InternalMetadataToHTTPHeader(r.Context(), codeErr.headers, w.Header().Add)
}
respondWithHTTPRawResponse(w, &UniversalHTTPRawResponse{
Body: codeErr.msg,
ContentType: codeErr.contentType,
StatusCode: codeErr.statusCode,
}, codeErr.statusCode)
return
}

if resp == nil {
respondWithError(w, messages.ErrDirectInvoke.WithFormat(targetID, "response object is nil"))
case errors.As(err, &invokeErr):
respondWithData(w, invokeErr.statusCode, invokeErr.msg)
return
}
defer resp.Close()

statusCode := int(resp.Status().Code)

if ct := resp.ContentType(); ct != "" {
w.Header().Set("content-type", ct)
}

w.WriteHeader(statusCode)

_, err = io.Copy(w, resp.RawData())
if err != nil {
default:
respondWithError(w, messages.ErrDirectInvoke.WithFormat(targetID, err))
return
}
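The core idea in the api_directmessaging.go change is that the HTTP response is now written inside the resiliency policy function, so the operation must be marked successful exactly once and any failure after that point must not trigger another retry. Below is a minimal, self-contained sketch of that pattern; the runWithPolicy helper and the literal attempt counts are assumptions for illustration, not the actual dapr resiliency API.

package main

import (
	"errors"
	"fmt"
	"sync/atomic"

	"github.com/cenkalti/backoff/v4"
)

// runWithPolicy stands in for the resiliency policy runner: it retries the
// operation until it succeeds, unless the error is wrapped in backoff.Permanent.
func runWithPolicy(oper func() error) error {
	return backoff.Retry(oper, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), 3))
}

func main() {
	var success atomic.Bool
	attempt := 0

	err := runWithPolicy(func() error {
		attempt++
		if attempt < 2 {
			// A retriable failure: the runner is allowed to call us again.
			return errors.New("transient failure")
		}
		// From here on the (hypothetical) response body is streamed to the
		// client, so the request must never be retried again, even if copying
		// the body fails later. CompareAndSwap lets only one attempt proceed;
		// any other attempt returns a permanent error so the runner stops.
		if !success.CompareAndSwap(false, true) {
			return backoff.Permanent(errors.New("already completed"))
		}
		fmt.Println("writing response on attempt", attempt)
		return nil
	})
	fmt.Println("final error:", err)
}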
33 changes: 26 additions & 7 deletions pkg/resiliency/policy.go
@@ -117,24 +117,43 @@ func NewRunnerWithOptions[T any](ctx context.Context, def *PolicyDefinition, opt
ctx, cancel := context.WithTimeout(ctx, def.t)
defer cancel()

done := make(chan doneCh[T])
timedOut := atomic.Bool{}
done := make(chan doneCh[T], 1)
go func() {
rRes, rErr := operCopy(ctx)
if !timedOut.Load() {
done <- doneCh[T]{rRes, rErr}
} else if opts.Disposer != nil && !isZero(rRes) {

// If the channel is full, it means we had a timeout
select {
case done <- doneCh[T]{rRes, rErr}:
// No timeout, all good
default:
// The operation has timed out
// Invoke the disposer if we have a non-zero return value
// Note that in case of timeouts we do not invoke the accumulator
opts.Disposer(rRes)
if opts.Disposer != nil && !isZero(rRes) {
opts.Disposer(rRes)
}
}
}()

select {
case v := <-done:
return v.res, v.err
case <-ctx.Done():
timedOut.Store(true)
// Because done has a capacity of 1, adding a message on the channel signals that there was a timeout
// However, the response may have arrived in the meanwhile, so we need to also check if something was added
select {
case done <- doneCh[T]{}:
// All good, nothing to do here
default:
// The response arrived at the same time as the context deadline, and the channel has a message
v := <-done

// Invoke the disposer if the return value is non-zero
// Note that in case of timeouts we do not invoke the accumulator
if opts.Disposer != nil && !isZero(v.res) {
opts.Disposer(v.res)
}
}
if def.addTimeoutActivatedMetric != nil && timeoutMetricsActivated.CompareAndSwap(false, true) {
def.addTimeoutActivatedMetric()
}
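The policy.go change relies on giving the done channel a capacity of 1 so that the channel itself doubles as the timed-out flag: on timeout the caller tries to put a sentinel value into the channel, and whichever side finds the channel already full knows the other side won the race and must dispose of the unread result. The following stand-alone sketch shows that pattern; the result type, dispose helper, and runWithTimeout function are illustrative assumptions, not the real resiliency package API.

package main

import (
	"context"
	"fmt"
	"time"
)

type result struct {
	res string
	err error
}

// dispose stands in for the Disposer: it releases a result nobody will read.
func dispose(r string) {
	fmt.Println("disposing:", r)
}

func runWithTimeout(ctx context.Context, d time.Duration, work func(context.Context) (string, error)) (string, error) {
	ctx, cancel := context.WithTimeout(ctx, d)
	defer cancel()

	// Capacity 1 lets the channel itself act as the "timed out" flag.
	done := make(chan result, 1)

	go func() {
		res, err := work(ctx)
		select {
		case done <- result{res, err}:
			// The caller is still waiting and will receive this value.
		default:
			// The channel is already full: the timeout path won the race and
			// stored a sentinel, so nobody will read this result; dispose it.
			if res != "" {
				dispose(res)
			}
		}
	}()

	select {
	case v := <-done:
		return v.res, v.err
	case <-ctx.Done():
		// Try to fill the channel to signal the worker that we timed out.
		select {
		case done <- result{}:
			// Sentinel stored; the worker's send will hit its default branch.
		default:
			// The worker finished at the same instant; drain and dispose.
			v := <-done
			if v.res != "" {
				dispose(v.res)
			}
		}
		return "", ctx.Err()
	}
}

func main() {
	res, err := runWithTimeout(context.Background(), 100*time.Millisecond, func(ctx context.Context) (string, error) {
		time.Sleep(200 * time.Millisecond) // slower than the timeout
		return "late result", nil
	})
	fmt.Println(res, err)

	// Give the worker goroutine time to finish and dispose of its result.
	time.Sleep(200 * time.Millisecond)
}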
