Skip to content

Commit

Permalink
Merge branch 'master' into actor-health-check-tune
Browse files Browse the repository at this point in the history
  • Loading branch information
dapr-bot committed Oct 12, 2023
2 parents cb793e8 + c4284d3 commit bf27f83
Show file tree
Hide file tree
Showing 46 changed files with 2,032 additions and 206 deletions.
482 changes: 482 additions & 0 deletions docs/release_notes/v1.12.0.md

Large diffs are not rendered by default.

88 changes: 44 additions & 44 deletions pkg/actors/actors.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"errors"
"fmt"
"io"
"net"
"net/http"
"strconv"
"strings"
Expand Down Expand Up @@ -193,17 +192,20 @@ func newActorsWithClock(opts ActorsOpts, clock clock.WithTicker) ActorRuntime {
return a
}

func (a *actorsRuntime) isActorLocallyHosted(actorType string, actorID string) (isLocal bool, actorAddress string) {
targetActorAddress, _ := a.placement.LookupActor(actorType, actorID)
if targetActorAddress == "" {
log.Warn("Did not find address for actor with actorType %s and actorID %s", actorType, actorID)
func (a *actorsRuntime) isActorLocallyHosted(ctx context.Context, actorType string, actorID string) (isLocal bool, actorAddress string) {
lar, err := a.placement.LookupActor(ctx, internal.LookupActorRequest{
ActorType: actorType,
ActorID: actorID,
})
if err != nil {
log.Warn(err.Error())
return false, ""
}

if a.isActorLocal(targetActorAddress, a.actorsConfig.Config.HostAddress, a.actorsConfig.Config.Port) {
return true, targetActorAddress
if a.isActorLocal(lar.Address, a.actorsConfig.Config.HostAddress, a.actorsConfig.Config.Port) {
return true, lar.Address
}
return false, targetActorAddress
return false, lar.Address
}

func (a *actorsRuntime) haveCompatibleStorage() bool {
Expand Down Expand Up @@ -232,8 +234,6 @@ func (a *actorsRuntime) Init(ctx context.Context) error {
}
}

hostname := net.JoinHostPort(a.actorsConfig.Config.HostAddress, strconv.Itoa(a.actorsConfig.Config.Port))

a.actorsReminders.Init(ctx)
a.timers.Init(ctx)

Expand All @@ -248,7 +248,7 @@ func (a *actorsRuntime) Init(ctx context.Context) error {
ServerAddrs: a.actorsConfig.Config.PlacementAddresses,
Security: a.sec,
AppID: a.actorsConfig.Config.AppID,
RuntimeHostname: hostname,
RuntimeHostname: a.actorsConfig.GetRuntimeHostname(),
PodName: a.actorsConfig.Config.PodName,
ActorTypes: a.actorsConfig.Config.HostedActorTypes.ListActorTypes(),
AppHealthFn: func(ctx context.Context) <-chan bool {
Expand All @@ -257,6 +257,7 @@ func (a *actorsRuntime) Init(ctx context.Context) error {
}
return a.checker.HealthChannel()
},
Resiliency: a.resiliency,
AfterTableUpdateFn: func() {
a.drainRebalancedActors()
a.actorsReminders.OnPlacementTablesUpdated(ctx)
Expand All @@ -272,11 +273,13 @@ func (a *actorsRuntime) Init(ctx context.Context) error {
}
}

a.wg.Add(2)
a.wg.Add(1)
go func() {
defer a.wg.Done()
a.placement.Start(ctx)
}()

a.wg.Add(1)
go func() {
defer a.wg.Done()
a.deactivationTicker(a.actorsConfig, a.deactivateActor)
Expand Down Expand Up @@ -350,8 +353,8 @@ func (a *actorsRuntime) removeActorFromTable(actorType, actorID string) {
}

func (a *actorsRuntime) getActorTypeAndIDFromKey(key string) (string, string) {
arr := strings.Split(key, daprSeparator)
return arr[0], arr[1]
typ, id, _ := strings.Cut(key, daprSeparator)
return typ, id
}

type deactivateFn = func(actorType string, actorID string) error
Expand Down Expand Up @@ -392,42 +395,25 @@ func (a *actorsRuntime) deactivationTicker(configuration Config, deactivateFn de
}
}

type lookupActorRes struct {
targetActorAddress string
appID string
}

func (a *actorsRuntime) Call(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
err := a.placement.WaitUntilReady(ctx)
if err != nil {
return nil, fmt.Errorf("failed to wait for placement readiness: %w", err)
}

actor := req.Actor()
// Retry here to allow placement table dissemination/rebalancing to happen.
policyDef := a.resiliency.BuiltInPolicy(resiliency.BuiltInActorNotFoundRetries)
policyRunner := resiliency.NewRunner[*lookupActorRes](ctx, policyDef)
lar, err := policyRunner(func(ctx context.Context) (*lookupActorRes, error) {
rAddr, rAppID := a.placement.LookupActor(actor.GetActorType(), actor.GetActorId())
if rAddr == "" {
return nil, fmt.Errorf("error finding address for actor type %s with id %s", actor.GetActorType(), actor.GetActorId())
}
return &lookupActorRes{
targetActorAddress: rAddr,
appID: rAppID,
}, nil
lar, err := a.placement.LookupActor(ctx, internal.LookupActorRequest{
ActorType: actor.GetActorType(),
ActorID: actor.GetActorId(),
})
if err != nil {
return nil, err
}
if lar == nil {
lar = &lookupActorRes{}
}
var resp *invokev1.InvokeMethodResponse
if a.isActorLocal(lar.targetActorAddress, a.actorsConfig.Config.HostAddress, a.actorsConfig.Config.Port) {
if a.isActorLocal(lar.Address, a.actorsConfig.Config.HostAddress, a.actorsConfig.Config.Port) {
resp, err = a.callLocalActor(ctx, req)
} else {
resp, err = a.callRemoteActorWithRetry(ctx, retry.DefaultLinearRetryCount, retry.DefaultLinearBackoffInterval, a.callRemoteActor, lar.targetActorAddress, lar.appID, req)
resp, err = a.callRemoteActorWithRetry(ctx, retry.DefaultLinearRetryCount, retry.DefaultLinearBackoffInterval, a.callRemoteActor, lar.Address, lar.AppID, req)
}

if err != nil {
Expand Down Expand Up @@ -545,6 +531,11 @@ func (a *actorsRuntime) callLocalActor(ctx context.Context, req *invokev1.Invoke
msg.HttpExtension.Verb = commonv1pb.HTTPExtension_PUT //nolint:nosnakecase
}

appCh := a.getAppChannel(act.actorType)
if appCh == nil {
return nil, fmt.Errorf("app channel for actor type %s is nil", act.actorType)
}

policyDef := a.resiliency.ActorPostLockPolicy(act.actorType, act.actorID)

// If the request can be retried, we need to enable replaying
Expand All @@ -558,7 +549,7 @@ func (a *actorsRuntime) callLocalActor(ctx context.Context, req *invokev1.Invoke
},
)
resp, err := policyRunner(func(ctx context.Context) (*invokev1.InvokeMethodResponse, error) {
return a.getAppChannel(act.actorType).InvokeMethod(ctx, req, "")
return appCh.InvokeMethod(ctx, req, "")
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -783,8 +774,11 @@ func (a *actorsRuntime) drainRebalancedActors() {
// for each actor, deactivate if no longer hosted locally
actorKey := key.(string)
actorType, actorID := a.getActorTypeAndIDFromKey(actorKey)
address, _ := a.placement.LookupActor(actorType, actorID)
if address != "" && !a.isActorLocal(address, a.actorsConfig.Config.HostAddress, a.actorsConfig.Config.Port) {
lar, _ := a.placement.LookupActor(context.TODO(), internal.LookupActorRequest{
ActorType: actorType,
ActorID: actorID,
})
if lar.Address != "" && !a.isActorLocal(lar.Address, a.actorsConfig.Config.HostAddress, a.actorsConfig.Config.Port) {
// actor has been moved to a different host, deactivate when calls are done cancel any reminders
// each item in reminders contain a struct with some metadata + the actual reminder struct
a.actorsReminders.DrainRebalancedReminders(actorType, actorID)
Expand Down Expand Up @@ -835,7 +829,7 @@ func (a *actorsRuntime) executeTimer(reminder *internal.Reminder) bool {
return false
}

err := a.doExecuteReminderOrTimer(reminder, true)
err := a.doExecuteReminderOrTimer(context.TODO(), reminder, true)
diag.DefaultMonitoring.ActorTimerFired(reminder.ActorType, err == nil)
if err != nil {
log.Errorf("error invoking timer on actor %s: %s", reminder.ActorKey(), err)
Expand All @@ -848,7 +842,7 @@ func (a *actorsRuntime) executeTimer(reminder *internal.Reminder) bool {

// executeReminder implements reminders.ExecuteReminderFn.
func (a *actorsRuntime) executeReminder(reminder *internal.Reminder) bool {
err := a.doExecuteReminderOrTimer(reminder, false)
err := a.doExecuteReminderOrTimer(context.TODO(), reminder, false)
diag.DefaultMonitoring.ActorReminderFired(reminder.ActorType, err == nil)
if err != nil {
if errors.Is(err, ErrReminderCanceled) {
Expand All @@ -863,13 +857,19 @@ func (a *actorsRuntime) executeReminder(reminder *internal.Reminder) bool {
}

// Executes a reminder or timer
func (a *actorsRuntime) doExecuteReminderOrTimer(reminder *internal.Reminder, isTimer bool) (err error) {
func (a *actorsRuntime) doExecuteReminderOrTimer(ctx context.Context, reminder *internal.Reminder, isTimer bool) (err error) {
var (
data any
logName string
invokeMethod string
)

// Sanity check: make sure the actor is actually locally-hosted
isLocal, _ := a.isActorLocallyHosted(ctx, reminder.ActorType, reminder.ActorID)
if !isLocal {
return errors.New("actor is not locally hosted")
}

if isTimer {
logName = "timer"
invokeMethod = "timer/" + reminder.Name
Expand Down Expand Up @@ -900,7 +900,7 @@ func (a *actorsRuntime) doExecuteReminderOrTimer(reminder *internal.Reminder, is
}
defer req.Close()

policyRunner := resiliency.NewRunnerWithOptions(context.TODO(), policyDef,
policyRunner := resiliency.NewRunnerWithOptions(ctx, policyDef,
resiliency.RunnerOpts[*invokev1.InvokeMethodResponse]{
Disposer: resiliency.DisposerCloser[*invokev1.InvokeMethodResponse],
},
Expand Down Expand Up @@ -983,7 +983,7 @@ func (a *actorsRuntime) RegisterInternalActor(ctx context.Context, actorType str
actor.SetActorRuntime(a)
a.actorsConfig.Config.HostedActorTypes.AddActorType(actorType, actorIdleTimeout)
if a.placement != nil {
if err := a.placement.AddHostedActorType(actorType); err != nil {
if err := a.placement.AddHostedActorType(actorType, actorIdleTimeout); err != nil {
return fmt.Errorf("error updating hosted actor types: %s", err)
}
}
Expand Down
60 changes: 56 additions & 4 deletions pkg/actors/actors_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions pkg/actors/actors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ func newTestActorsRuntimeWithMock(appChannel channel.AppChannel) *actorsRuntime
TracingSpec: config.TracingSpec{SamplingRate: "1"},
Resiliency: resiliency.New(log),
StateStoreName: "actorStore",
MockPlacement: NewMockPlacement(TestAppID),
}, clock)

return a.(*actorsRuntime)
Expand Down Expand Up @@ -451,7 +452,7 @@ func TestTimerExecution(t *testing.T) {
fakeCallAndActivateActor(testActorsRuntime, actorType, actorID, testActorsRuntime.clock)

period, _ := internal.NewReminderPeriod("2s")
err := testActorsRuntime.doExecuteReminderOrTimer(&internal.Reminder{
err := testActorsRuntime.doExecuteReminderOrTimer(context.Background(), &internal.Reminder{
ActorType: actorType,
ActorID: actorID,
Name: "timer1",
Expand All @@ -472,7 +473,7 @@ func TestReminderExecution(t *testing.T) {
fakeCallAndActivateActor(testActorsRuntime, actorType, actorID, testActorsRuntime.clock)

period, _ := internal.NewReminderPeriod("2s")
err := testActorsRuntime.doExecuteReminderOrTimer(&internal.Reminder{
err := testActorsRuntime.doExecuteReminderOrTimer(context.Background(), &internal.Reminder{
ActorType: actorType,
ActorID: actorID,
RegisteredTime: time.Now().Add(2 * time.Second),
Expand Down
6 changes: 6 additions & 0 deletions pkg/actors/internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ limitations under the License.
package internal

import (
"net"
"net/http"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -45,6 +47,10 @@ type Config struct {
PodName string
}

func (c Config) GetRuntimeHostname() string {
return net.JoinHostPort(c.HostAddress, strconv.Itoa(c.Port))
}

// Remap of daprAppConfig.EntityConfig but with more useful types for actors.go.
type EntityConfig struct {
Entities []string
Expand Down
22 changes: 20 additions & 2 deletions pkg/actors/internal/placement_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package internal
import (
"context"
"io"
"time"
)

// PlacementService allows for interacting with the actor placement service.
Expand All @@ -24,6 +25,23 @@ type PlacementService interface {

Start(context.Context) error
WaitUntilReady(ctx context.Context) error
LookupActor(actorType, actorID string) (host string, appID string)
AddHostedActorType(actorType string) error
LookupActor(ctx context.Context, req LookupActorRequest) (LookupActorResponse, error)
AddHostedActorType(actorType string, idleTimeout time.Duration) error
}

// LookupActorRequest is the request for LookupActor.
type LookupActorRequest struct {
ActorType string
ActorID string
}

// ActorKey returns the key for the actor, which is "type/id".
func (lar LookupActorRequest) ActorKey() string {
return lar.ActorType + "/" + lar.ActorID
}

// LookupActorResponse is the response from LookupActor.
type LookupActorResponse struct {
Address string
AppID string
}
2 changes: 1 addition & 1 deletion pkg/actors/internal/reminder.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (r *Reminder) UpdateFromTrack(track *ReminderTrack) {

// ScheduledTime returns the time the reminder is scheduled to be executed at.
// This is implemented to comply with the queueable interface.
func (r *Reminder) ScheduledTime() time.Time {
func (r Reminder) ScheduledTime() time.Time {
return r.RegisteredTime
}

Expand Down

0 comments on commit bf27f83

Please sign in to comment.