Skip to content

Commit

Permalink
Better way to fix the error
Browse files Browse the repository at this point in the history
Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
  • Loading branch information
ItalyPaleAle committed Dec 5, 2023
1 parent 16bd291 commit 8822361
Showing 1 changed file with 56 additions and 41 deletions.
97 changes: 56 additions & 41 deletions pkg/http/api_directmessaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"net/url"
"path"
"strings"
"sync/atomic"

"github.com/cenkalti/backoff/v4"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

Expand All @@ -32,7 +34,6 @@ import (
"github.com/dapr/dapr/pkg/messages"
invokev1 "github.com/dapr/dapr/pkg/messaging/v1"
"github.com/dapr/dapr/pkg/resiliency"
"github.com/dapr/dapr/pkg/resiliency/breaker"
)

// directMessagingSpanData is the data passed by the onDirectMessage endpoint to the tracing middleware
Expand Down Expand Up @@ -150,6 +151,7 @@ func (a *api) onDirectMessage(w http.ResponseWriter, r *http.Request) {
Disposer: resiliency.DisposerCloser[*invokev1.InvokeMethodResponse],
},
)
success := atomic.Bool{}
// Since we don't want to return the actual error, we have to extract several things in order to construct our response.
resp, err := policyRunner(func(ctx context.Context) (*invokev1.InvokeMethodResponse, error) {
rResp, rErr := a.directMessaging.Invoke(ctx, targetID, req)
Expand Down Expand Up @@ -198,63 +200,76 @@ func (a *api) onDirectMessage(w http.ResponseWriter, r *http.Request) {
contentType: rResp.ContentType(),
}
}
return rResp, nil
})

// Special case for timeouts/circuit breakers since they won't go through the rest of the logic.
if errors.Is(err, context.DeadlineExceeded) || breaker.IsErrorPermanent(err) {
respondWithError(w, messages.ErrDirectInvoke.WithFormat(targetID, err))
return
}
// If we get to this point, we must consider the operation as successful, so we invoke this only once and we consider all errors returned by this to be permanent (so the policy function doesn't retry)
// We still need to be within the policy function because if we return, the context passed to `Invoke` is canceled, so the `Copy` operation below can fail with a ContextCanceled error
if !success.CompareAndSwap(false, true) {
// This error will never be returned to a client but it's here to prevent retries
return rResp, backoff.Permanent(errors.New("already completed"))
}

var codeErr codeError
if errors.As(err, &codeErr) {
if len(codeErr.headers) > 0 {
invokev1.InternalMetadataToHTTPHeader(r.Context(), codeErr.headers, w.Header().Add)
if rResp == nil {
return nil, backoff.Permanent(errors.New("response object is nil"))
}
respondWithHTTPRawResponse(w, &UniversalHTTPRawResponse{
Body: codeErr.msg,
ContentType: codeErr.contentType,
StatusCode: codeErr.statusCode,
}, codeErr.statusCode)
if resp != nil {
_ = resp.Close()

headers := rResp.Headers()
if len(headers) > 0 {
invokev1.InternalMetadataToHTTPHeader(r.Context(), headers, w.Header().Add)
}

defer rResp.Close()

if ct := rResp.ContentType(); ct != "" {
w.Header().Set("content-type", ct)
}

w.WriteHeader(int(rResp.Status().GetCode()))

_, rErr = io.Copy(w, rResp.RawData())
if rErr != nil {
// Do not return rResp here, we already have a deferred `Close` call on it
return nil, backoff.Permanent(rErr)
}

// Do not return rResp here, we already have a deferred `Close` call on it
return nil, nil
})

// If there's no error, then everything is done already
if err == nil {
return
}

if resp != nil {
defer resp.Close()

// Set headers if present (if resp is not nil, they haven't been sent already)
headers := resp.Headers()
if len(headers) > 0 {
invokev1.InternalMetadataToHTTPHeader(r.Context(), headers, w.Header().Add)
}
}

var invokeErr invokeError
if errors.As(err, &invokeErr) {
respondWithData(w, invokeErr.statusCode, invokeErr.msg)
if resp != nil {
_ = resp.Close()
// Handle errors; successful operations are already complete
var (
codeErr codeError
invokeErr invokeError
)
switch {
case errors.As(err, &codeErr):
if len(codeErr.headers) > 0 {
invokev1.InternalMetadataToHTTPHeader(r.Context(), codeErr.headers, w.Header().Add)
}
respondWithHTTPRawResponse(w, &UniversalHTTPRawResponse{
Body: codeErr.msg,
ContentType: codeErr.contentType,
StatusCode: codeErr.statusCode,
}, codeErr.statusCode)
return
}

if resp == nil {
respondWithError(w, messages.ErrDirectInvoke.WithFormat(targetID, "response object is nil"))
case errors.As(err, &invokeErr):
respondWithData(w, invokeErr.statusCode, invokeErr.msg)
return
}
defer resp.Close()

statusCode := int(resp.Status().GetCode())

if ct := resp.ContentType(); ct != "" {
w.Header().Set("content-type", ct)
}

w.WriteHeader(statusCode)

_, err = io.Copy(w, resp.RawData())
if err != nil {
default:
respondWithError(w, messages.ErrDirectInvoke.WithFormat(targetID, err))
return
}
Expand Down

0 comments on commit 8822361

Please sign in to comment.