Skip to content

Commit

Permalink
Fix timeouts in HTTP service invocation when resiliency policies with…
Browse files Browse the repository at this point in the history
… timeouts are applied (#7270)

* Fix race condition in policy runner when there's a timeout

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Better way to fix the error

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

---------

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
Co-authored-by: Dapr Bot <56698301+dapr-bot@users.noreply.github.com>
  • Loading branch information
ItalyPaleAle and dapr-bot committed Dec 11, 2023
1 parent ed34172 commit 53ceb4a
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 48 deletions.
97 changes: 56 additions & 41 deletions pkg/http/api_directmessaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"net/url"
"path"
"strings"
"sync/atomic"

"github.com/cenkalti/backoff/v4"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

Expand All @@ -32,7 +34,6 @@ import (
"github.com/dapr/dapr/pkg/messages"
invokev1 "github.com/dapr/dapr/pkg/messaging/v1"
"github.com/dapr/dapr/pkg/resiliency"
"github.com/dapr/dapr/pkg/resiliency/breaker"
)

// directMessagingSpanData is the data passed by the onDirectMessage endpoint to the tracing middleware
Expand Down Expand Up @@ -150,6 +151,7 @@ func (a *api) onDirectMessage(w http.ResponseWriter, r *http.Request) {
Disposer: resiliency.DisposerCloser[*invokev1.InvokeMethodResponse],
},
)
success := atomic.Bool{}
// Since we don't want to return the actual error, we have to extract several things in order to construct our response.
resp, err := policyRunner(func(ctx context.Context) (*invokev1.InvokeMethodResponse, error) {
rResp, rErr := a.directMessaging.Invoke(ctx, targetID, req)
Expand Down Expand Up @@ -198,63 +200,76 @@ func (a *api) onDirectMessage(w http.ResponseWriter, r *http.Request) {
contentType: rResp.ContentType(),
}
}
return rResp, nil
})

// Special case for timeouts/circuit breakers since they won't go through the rest of the logic.
if errors.Is(err, context.DeadlineExceeded) || breaker.IsErrorPermanent(err) {
respondWithError(w, messages.ErrDirectInvoke.WithFormat(targetID, err))
return
}
// If we get to this point, we must consider the operation as successful, so we invoke this only once and we consider all errors returned by this to be permanent (so the policy function doesn't retry)
// We still need to be within the policy function because if we return, the context passed to `Invoke` is canceled, so the `Copy` operation below can fail with a `context.Canceled` error
if !success.CompareAndSwap(false, true) {
// This error will never be returned to a client but it's here to prevent retries
return rResp, backoff.Permanent(errors.New("already completed"))
}

var codeErr codeError
if errors.As(err, &codeErr) {
if len(codeErr.headers) > 0 {
invokev1.InternalMetadataToHTTPHeader(r.Context(), codeErr.headers, w.Header().Add)
if rResp == nil {
return nil, backoff.Permanent(errors.New("response object is nil"))
}
respondWithHTTPRawResponse(w, &UniversalHTTPRawResponse{
Body: codeErr.msg,
ContentType: codeErr.contentType,
StatusCode: codeErr.statusCode,
}, codeErr.statusCode)
if resp != nil {
_ = resp.Close()

headers := rResp.Headers()
if len(headers) > 0 {
invokev1.InternalMetadataToHTTPHeader(r.Context(), headers, w.Header().Add)
}

defer rResp.Close()

if ct := rResp.ContentType(); ct != "" {
w.Header().Set("content-type", ct)
}

w.WriteHeader(int(rResp.Status().GetCode()))

_, rErr = io.Copy(w, rResp.RawData())
if rErr != nil {
// Do not return rResp here, we already have a deferred `Close` call on it
return nil, backoff.Permanent(rErr)
}

// Do not return rResp here, we already have a deferred `Close` call on it
return nil, nil
})

// If there's no error, then everything is done already
if err == nil {
return
}

if resp != nil {
defer resp.Close()

// Set headers if present (if resp is not nil, they haven't been sent already)
headers := resp.Headers()
if len(headers) > 0 {
invokev1.InternalMetadataToHTTPHeader(r.Context(), headers, w.Header().Add)
}
}

var invokeErr invokeError
if errors.As(err, &invokeErr) {
respondWithData(w, invokeErr.statusCode, invokeErr.msg)
if resp != nil {
_ = resp.Close()
// Handle errors; successful operations are already complete
var (
codeErr codeError
invokeErr invokeError
)
switch {
case errors.As(err, &codeErr):
if len(codeErr.headers) > 0 {
invokev1.InternalMetadataToHTTPHeader(r.Context(), codeErr.headers, w.Header().Add)
}
respondWithHTTPRawResponse(w, &UniversalHTTPRawResponse{
Body: codeErr.msg,
ContentType: codeErr.contentType,
StatusCode: codeErr.statusCode,
}, codeErr.statusCode)
return
}

if resp == nil {
respondWithError(w, messages.ErrDirectInvoke.WithFormat(targetID, "response object is nil"))
case errors.As(err, &invokeErr):
respondWithData(w, invokeErr.statusCode, invokeErr.msg)
return
}
defer resp.Close()

statusCode := int(resp.Status().GetCode())

if ct := resp.ContentType(); ct != "" {
w.Header().Set("content-type", ct)
}

w.WriteHeader(statusCode)

_, err = io.Copy(w, resp.RawData())
if err != nil {
default:
respondWithError(w, messages.ErrDirectInvoke.WithFormat(targetID, err))
return
}
Expand Down
33 changes: 26 additions & 7 deletions pkg/resiliency/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,24 +119,43 @@ func NewRunnerWithOptions[T any](ctx context.Context, def *PolicyDefinition, opt
ctx, cancel := context.WithTimeout(ctx, def.t)
defer cancel()

done := make(chan doneCh[T])
timedOut := atomic.Bool{}
done := make(chan doneCh[T], 1)
go func() {
rRes, rErr := operCopy(ctx)
if !timedOut.Load() {
done <- doneCh[T]{rRes, rErr}
} else if opts.Disposer != nil && !isZero(rRes) {

// If the channel is full, it means we had a timeout
select {
case done <- doneCh[T]{rRes, rErr}:
// No timeout, all good
default:
// The operation has timed out
// Invoke the disposer if we have a non-zero return value
// Note that in case of timeouts we do not invoke the accumulator
opts.Disposer(rRes)
if opts.Disposer != nil && !isZero(rRes) {
opts.Disposer(rRes)
}
}
}()

select {
case v := <-done:
return v.res, v.err
case <-ctx.Done():
timedOut.Store(true)
// Because done has a capacity of 1, adding a message on the channel signals that there was a timeout
// However, the response may have arrived in the meantime, so we also need to check whether something was already added to the channel
select {
case done <- doneCh[T]{}:
// All good, nothing to do here
default:
// The response arrived at the same time as the context deadline, and the channel has a message
v := <-done

// Invoke the disposer if the return value is non-zero
// Note that in case of timeouts we do not invoke the accumulator
if opts.Disposer != nil && !isZero(v.res) {
opts.Disposer(v.res)
}
}
if def.addTimeoutActivatedMetric != nil && timeoutMetricsActivated.CompareAndSwap(false, true) {
def.addTimeoutActivatedMetric()
}
Expand Down

0 comments on commit 53ceb4a

Please sign in to comment.