Skip to content

Commit

Permalink
Merge branch 'master' into metrics-cardinality
Browse files Browse the repository at this point in the history
  • Loading branch information
dapr-bot committed Sep 25, 2023
2 parents 22ba88c + e4f8901 commit 458ff08
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 12 deletions.
3 changes: 1 addition & 2 deletions pkg/actors/actors.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,9 +456,8 @@ func (a *actorsRuntime) callRemoteActorWithRetry(
Disposer: resiliency.DisposerCloser[*invokev1.InvokeMethodResponse],
},
)
attempts := atomic.Int32{}
return policyRunner(func(ctx context.Context) (*invokev1.InvokeMethodResponse, error) {
attempt := attempts.Add(1)
attempt := resiliency.GetAttempt(ctx)
rResp, teardown, rErr := fn(ctx, targetAddress, targetID, req)
if rErr == nil {
teardown(false)
Expand Down
4 changes: 1 addition & 3 deletions pkg/messaging/direct_messaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"io"
"os"
"strings"
"sync/atomic"
"time"

"github.com/cenkalti/backoff/v4"
Expand Down Expand Up @@ -196,9 +195,8 @@ func (d *directMessaging) invokeWithRetry(
Disposer: resiliency.DisposerCloser[*invokev1.InvokeMethodResponse],
},
)
attempts := atomic.Int32{}
return policyRunner(func(ctx context.Context) (*invokev1.InvokeMethodResponse, error) {
attempt := attempts.Add(1)
attempt := resiliency.GetAttempt(ctx)
rResp, teardown, rErr := fn(ctx, app.id, app.namespace, app.address, req)
if rErr == nil {
teardown(false)
Expand Down
21 changes: 19 additions & 2 deletions pkg/resiliency/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type doneCh[T any] struct {
err error
}

type attemptsCtxKey struct{}

// PolicyDefinition contains a definition for a policy, used to create a Runner.
type PolicyDefinition struct {
log logger.Logger
Expand Down Expand Up @@ -186,9 +188,12 @@ func NewRunnerWithOptions[T any](ctx context.Context, def *PolicyDefinition, opt

// Use retry/back off
b := def.r.NewBackOffWithContext(ctx)
attempts := atomic.Int32{}
return retry.NotifyRecoverWithData(
func() (T, error) {
rRes, rErr := operation(ctx)
attempt := attempts.Add(1)
opCtx := context.WithValue(ctx, attemptsCtxKey{}, attempt)
rRes, rErr := operation(opCtx)
// In case of an error, if we have a disposer we invoke it with the return value, then reset the return value
if rErr != nil && opts.Disposer != nil && !isZero(rRes) {
opts.Disposer(rRes)
Expand All @@ -205,7 +210,7 @@ func NewRunnerWithOptions[T any](ctx context.Context, def *PolicyDefinition, opt
def.log.Debugf("Error for operation %s was: %v", def.name, opErr)
},
func() {
def.log.Infof("Recovered processing operation %s.", def.name)
def.log.Infof("Recovered processing operation %s after %d attempts", def.name, attempts.Load())
},
)
}
Expand All @@ -216,6 +221,18 @@ func DisposerCloser[T io.Closer](obj T) {
_ = obj.Close()
}

// GetAttempt returns the attempt number from a context
// Attempts are numbered from 1 onwards.
// If the context doesn't have an attempt number, returns 0
func GetAttempt(ctx context.Context) int32 {
v := ctx.Value(attemptsCtxKey{})
attempt, ok := v.(int32)
if !ok {
return 0
}
return attempt
}

func isZero(val any) bool {
return reflect.ValueOf(val).IsZero()
}
18 changes: 13 additions & 5 deletions pkg/resiliency/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/stretchr/testify/assert"
"golang.org/x/exp/slices"

Expand Down Expand Up @@ -238,21 +239,28 @@ func TestPolicyRetry(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
called := atomic.Int32{}
maxCalls := test.maxCalls
fn := func(ctx context.Context) (any, error) {
fn := func(ctx context.Context) (struct{}, error) {
v := called.Add(1)
attempt := GetAttempt(ctx)
if attempt != v {
return struct{}{}, backoff.Permanent(fmt.Errorf("expected attempt in context to be %d but got %d", v, attempt))
}
if v <= maxCalls {
return nil, fmt.Errorf("called (%d) vs Max (%d)", v-1, maxCalls)
return struct{}{}, fmt.Errorf("called (%d) vs Max (%d)", v-1, maxCalls)
}
return nil, nil
return struct{}{}, nil
}

policy := NewRunner[any](context.Background(), &PolicyDefinition{
policy := NewRunner[struct{}](context.Background(), &PolicyDefinition{
log: testLog,
name: "retry",
t: 10 * time.Millisecond,
r: &retry.Config{MaxRetries: test.maxRetries},
})
policy(fn)
_, err := policy(fn)
if err != nil {
assert.NotContains(t, err.Error(), "expected attempt in context to be")
}
assert.Equal(t, test.expected, called.Load())
})
}
Expand Down

0 comments on commit 458ff08

Please sign in to comment.