[release-1.12] Fix timeouts in HTTP service invocation when resiliency policies with timeouts are applied (#7310)

* [release-1.12] Fix timeouts in HTTP service invocation when resiliency policies with timeouts are applied

Backport of #7270

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Added release notes

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

---------

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
ItalyPaleAle committed Dec 16, 2023
1 parent 9ec8497 commit 9b6836f
Showing 3 changed files with 109 additions and 55 deletions.
28 changes: 24 additions & 4 deletions docs/release_notes/v1.12.3.md
@@ -1,7 +1,27 @@
# Dapr 1.12.3

This update includes a fix for the dissemination of placement tables.
- [Fix dissemination of placement tables](https://github.com/dapr/dapr/issues/7193)
This update includes bug fixes:

- [Fix timeouts in HTTP service invocation when resiliency policies with timeouts are applied](#fix-timeouts-in-http-service-invocation-when-resiliency-policies-with-timeouts-are-applied)
- [Fix dissemination of placement tables](#fix-dissemination-of-placement-tables)

## Fix timeouts in HTTP service invocation when resiliency policies with timeouts are applied

### Problem

In HTTP service invocation, in certain cases when a resiliency policy is applied (for example, one that includes timeouts), requests could be interrupted prematurely with a "context deadline exceeded" error.

### Impact

Impacts users running Dapr 1.12.0-1.12.2 who use HTTP service invocation and have resiliency policies applied.

### Root cause

When resiliency policies with timeouts are applied, a bug caused the wrong context to be used while sending the response to the client; in certain situations, the response could be terminated before it had been fully delivered.

### Solution

We fixed the code that handles HTTP service invocation to make sure the timeout is applied to the entire response, including writing the response body to the client.

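To illustrate the approach, here is a minimal sketch (simplified, assumed code, not the actual Dapr implementation; all names are hypothetical): the response body is copied to the client while the timeout context that governs the invocation is still active, so the configured timeout covers the entire response rather than only the initial call.

```go
package example

import (
	"context"
	"io"
	"net/http"
	"time"
)

// invokeAndStream forwards a request to a target URL and streams the response
// back to the client. The body copy happens inside the scope of the timeout
// context, so the timeout applies to the whole response.
func invokeAndStream(w http.ResponseWriter, r *http.Request, targetURL string, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(r.Context(), timeout)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, r.Method, targetURL, r.Body)
	if err != nil {
		return err
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	w.WriteHeader(resp.StatusCode)

	// Copy while ctx is still live: if the deadline fires mid-copy, the transport
	// cancels the read and Copy returns an error; if the copy finishes first,
	// the timeout can no longer truncate an already-successful response.
	_, err = io.Copy(w, resp.Body)
	return err
}
```
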
## Fix dissemination of placement tables

@@ -14,13 +34,14 @@ Error invoke actor method: failed to invoke target x after 3 retries
```

Placement server logs:

```
level=error msg="Stream is disconnected before member is added
```

### Impact

The Dapr Placement service nodes had inconsistent tables and upon a stream disconnect, all connected nodes were left in a broken state.
Impacts users running Dapr 1.12.0-1.12.2.

### Root cause

@@ -29,4 +50,3 @@ Dissemination did not use a background context. The logic to acquire a lock, dis
### Solution

Updated the Dapr Placement service to use a background context and follow a three-step process to acquire a lock, disseminate the tables, and release the lock.

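A simplified sketch of that pattern (hypothetical types and names, not the actual Placement service code): dissemination runs on a background context so it is not canceled when the triggering stream disconnects, and the lock is always released after the tables have been sent.

```go
package example

import (
	"context"
	"time"
)

// placementService is a stand-in for the real service; the function fields
// represent the lock, dissemination, and unlock steps.
type placementService struct {
	lock        func(ctx context.Context) error
	disseminate func(ctx context.Context) error
	unlock      func(ctx context.Context) error
}

func (p *placementService) disseminateTables() error {
	// Background context with its own timeout, independent of any stream's lifetime.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Step 1: acquire the dissemination lock.
	if err := p.lock(ctx); err != nil {
		return err
	}
	// Step 3: release the lock, even if dissemination fails.
	defer func() {
		_ = p.unlock(ctx)
	}()

	// Step 2: disseminate the updated placement tables to connected hosts.
	return p.disseminate(ctx)
}
```
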
103 changes: 59 additions & 44 deletions pkg/http/api_directmessaging.go
@@ -23,14 +23,15 @@ import (
"net/url"
"path"
"strings"
"sync/atomic"

"github.com/cenkalti/backoff/v4"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/dapr/dapr/pkg/messages"
invokev1 "github.com/dapr/dapr/pkg/messaging/v1"
"github.com/dapr/dapr/pkg/resiliency"
"github.com/dapr/dapr/pkg/resiliency/breaker"
"github.com/dapr/dapr/utils/responsewriter"
)

@@ -104,6 +105,7 @@ func (a *api) onDirectMessage(w http.ResponseWriter, r *http.Request) {
Disposer: resiliency.DisposerCloser[*invokev1.InvokeMethodResponse],
},
)
success := atomic.Bool{}
// Since we don't want to return the actual error, we have to extract several things in order to construct our response.
resp, err := policyRunner(func(ctx context.Context) (*invokev1.InvokeMethodResponse, error) {
rResp, rErr := a.directMessaging.Invoke(ctx, targetID, req)
@@ -125,7 +127,7 @@ func (a *api) onDirectMessage(w http.ResponseWriter, r *http.Request) {
// Construct response if not HTTP
resStatus := rResp.Status()
if !rResp.IsHTTPResponse() {
statusCode := int32(invokev1.HTTPStatusFromCode(codes.Code(resStatus.Code)))
statusCode := int32(invokev1.HTTPStatusFromCode(codes.Code(resStatus.GetCode())))
if statusCode != http.StatusOK {
// Close the response to replace the body
_ = rResp.Close()
@@ -142,73 +144,86 @@
} else {
resStatus.Code = statusCode
}
} else if resStatus.Code < 200 || resStatus.Code > 399 {
} else if resStatus.GetCode() < 200 || resStatus.GetCode() > 399 {
msg, _ := rResp.RawDataFull()
// Returning a `codeError` here will cause Resiliency to retry the request (if retries are enabled), but if the request continues to fail, the response is sent to the user with whatever status code the app returned.
return rResp, codeError{
headers: rResp.Headers(),
statusCode: int(resStatus.Code),
statusCode: int(resStatus.GetCode()),
msg: msg,
contentType: rResp.ContentType(),
}
}
return rResp, nil
})

// Special case for timeouts/circuit breakers since they won't go through the rest of the logic.
if errors.Is(err, context.DeadlineExceeded) || breaker.IsErrorPermanent(err) {
respondWithError(w, messages.ErrDirectInvoke.WithFormat(targetID, err))
return
}
// If we get to this point, we must consider the operation as successful, so we invoke this only once and we consider all errors returned by this to be permanent (so the policy function doesn't retry)
// We still need to be within the policy function because if we return, the context passed to `Invoke` is canceled, so the `Copy` operation below can fail with a ContextCanceled error
if !success.CompareAndSwap(false, true) {
// This error will never be returned to a client but it's here to prevent retries
return rResp, backoff.Permanent(errors.New("already completed"))
}

var codeErr codeError
if errors.As(err, &codeErr) {
if len(codeErr.headers) > 0 {
invokev1.InternalMetadataToHTTPHeader(r.Context(), codeErr.headers, w.Header().Add)
if rResp == nil {
return nil, backoff.Permanent(errors.New("response object is nil"))
}
respondWithHTTPRawResponse(w, &UniversalHTTPRawResponse{
Body: codeErr.msg,
ContentType: codeErr.contentType,
StatusCode: codeErr.statusCode,
}, codeErr.statusCode)
if resp != nil {
_ = resp.Close()

headers := rResp.Headers()
if len(headers) > 0 {
invokev1.InternalMetadataToHTTPHeader(r.Context(), headers, w.Header().Add)
}

defer rResp.Close()

if ct := rResp.ContentType(); ct != "" {
w.Header().Set("content-type", ct)
}

w.WriteHeader(int(rResp.Status().GetCode()))

_, rErr = io.Copy(w, rResp.RawData())
if rErr != nil {
// Do not return rResp here, we already have a deferred `Close` call on it
return nil, backoff.Permanent(rErr)
}

// Do not return rResp here, we already have a deferred `Close` call on it
return nil, nil
})

// If there's no error, then everything is done already
if err == nil {
return
}

if resp != nil {
defer resp.Close()

// Set headers if present (if resp is not nil, they haven't been sent already)
headers := resp.Headers()
if len(headers) > 0 {
invokev1.InternalMetadataToHTTPHeader(r.Context(), headers, w.Header().Add)
}
}

var invokeErr invokeError
if errors.As(err, &invokeErr) {
respondWithData(w, invokeErr.statusCode, invokeErr.msg)
if resp != nil {
_ = resp.Close()
// Handle errors; successful operations are already complete
var (
codeErr codeError
invokeErr invokeError
)
switch {
case errors.As(err, &codeErr):
if len(codeErr.headers) > 0 {
invokev1.InternalMetadataToHTTPHeader(r.Context(), codeErr.headers, w.Header().Add)
}
respondWithHTTPRawResponse(w, &UniversalHTTPRawResponse{
Body: codeErr.msg,
ContentType: codeErr.contentType,
StatusCode: codeErr.statusCode,
}, codeErr.statusCode)
return
}

if resp == nil {
respondWithError(w, messages.ErrDirectInvoke.WithFormat(targetID, "response object is nil"))
case errors.As(err, &invokeErr):
respondWithData(w, invokeErr.statusCode, invokeErr.msg)
return
}
defer resp.Close()

statusCode := int(resp.Status().Code)

if ct := resp.ContentType(); ct != "" {
w.Header().Set("content-type", ct)
}

w.WriteHeader(statusCode)

_, err = io.Copy(w, resp.RawData())
if err != nil {
default:
respondWithError(w, messages.ErrDirectInvoke.WithFormat(targetID, err))
return
}
33 changes: 26 additions & 7 deletions pkg/resiliency/policy.go
@@ -117,24 +117,43 @@ func NewRunnerWithOptions[T any](ctx context.Context, def *PolicyDefinition, opt
ctx, cancel := context.WithTimeout(ctx, def.t)
defer cancel()

done := make(chan doneCh[T])
timedOut := atomic.Bool{}
done := make(chan doneCh[T], 1)
go func() {
rRes, rErr := operCopy(ctx)
if !timedOut.Load() {
done <- doneCh[T]{rRes, rErr}
} else if opts.Disposer != nil && !isZero(rRes) {

// If the channel is full, it means we had a timeout
select {
case done <- doneCh[T]{rRes, rErr}:
// No timeout, all good
default:
// The operation has timed out
// Invoke the disposer if we have a non-zero return value
// Note that in case of timeouts we do not invoke the accumulator
opts.Disposer(rRes)
if opts.Disposer != nil && !isZero(rRes) {
opts.Disposer(rRes)
}
}
}()

select {
case v := <-done:
return v.res, v.err
case <-ctx.Done():
timedOut.Store(true)
// Because done has a capacity of 1, adding a message on the channel signals that there was a timeout
// However, the response may have arrived in the meanwhile, so we need to also check if something was added
select {
case done <- doneCh[T]{}:
// All good, nothing to do here
default:
// The response arrived at the same time as the context deadline, and the channel has a message
v := <-done

// Invoke the disposer if the return value is non-zero
// Note that in case of timeouts we do not invoke the accumulator
if opts.Disposer != nil && !isZero(v.res) {
opts.Disposer(v.res)
}
}
if def.addTimeoutActivatedMetric != nil && timeoutMetricsActivated.CompareAndSwap(false, true) {
def.addTimeoutActivatedMetric()
}

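For readers following the policy.go change above, here is a standalone, simplified sketch of the same buffered-channel technique (hypothetical names, with the accumulator and metrics omitted): the channel has capacity 1, so whichever side sends first decides whether the result goes back to the caller or is handed to the disposer, and a late result is never leaked.

```go
package example

import "context"

type result[T any] struct {
	res T
	err error
}

// runWithTimeout runs oper until ctx is done. A capacity-1 channel doubles as
// the timeout flag: if the caller manages to place an empty marker first, the
// operation's eventual result is disposed instead of being returned.
func runWithTimeout[T any](ctx context.Context, oper func(context.Context) (T, error), dispose func(T)) (T, error) {
	done := make(chan result[T], 1)

	go func() {
		res, err := oper(ctx)
		select {
		case done <- result[T]{res, err}:
			// The caller is still waiting and will receive this result.
		default:
			// The channel already holds the timeout marker: the operation lost
			// the race, so its result is disposed rather than returned.
			if dispose != nil {
				dispose(res)
			}
		}
	}()

	select {
	case v := <-done:
		return v.res, v.err
	case <-ctx.Done():
		select {
		case done <- result[T]{}:
			// Timeout marker placed; the goroutine will dispose of its own result.
		default:
			// The result arrived at the same instant as the deadline: drain it
			// and dispose (the real code also skips zero values here).
			v := <-done
			if dispose != nil {
				dispose(v.res)
			}
		}
		var zero T
		return zero, ctx.Err()
	}
}
```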