Skip to content

Commit

Permalink
Hot Reloading: Actor State Store Reloading
Browse files Browse the repository at this point in the history
Part of dapr#1172
Branched from dapr#7260

Adds support for actor state store hot reloading.

PR updates the actors package so that the actor state store is dynamically
retrieved from the component store every time the state it invoked. The
passed static actor state store name previously used to retrieve the
state store has been removed.

The component store has been updated to track the current actor state
store. It is no longer tracked by the state processor, and instead the
state processor updates the latest in the component store.

Like before, the actor state store _must_ be configured on Dapr startup,
else the actor subsystem runtime will not be enabled, even if an actor
state store is hot reloaded later.

Adds actor hotreloading integration tests for the actor state store.

Signed-off-by: joshvanl <me@joshvanl.dev>
  • Loading branch information
JoshVanL committed Dec 11, 2023
1 parent 4aee6ed commit fa51653
Show file tree
Hide file tree
Showing 27 changed files with 1,411 additions and 184 deletions.
35 changes: 16 additions & 19 deletions pkg/actors/actors.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ type actorsRuntime struct {
actorsTable *sync.Map
tracingSpec configuration.TracingSpec
resiliency resiliency.Provider
storeName string
compStore *compstore.ComponentStore
clock clock.WithTicker
internalActors map[string]InternalActor
Expand All @@ -139,7 +138,6 @@ type ActorsOpts struct {
Config Config
TracingSpec configuration.TracingSpec
Resiliency resiliency.Provider
StateStoreName string
CompStore *compstore.ComponentStore
Security security.Handler

Expand All @@ -163,7 +161,6 @@ func newActorsWithClock(opts ActorsOpts, clock clock.WithTicker) ActorRuntime {
timers: timers.NewTimersProvider(clock),
tracingSpec: opts.TracingSpec,
resiliency: opts.Resiliency,
storeName: opts.StateStoreName,
placement: opts.MockPlacement,
actorsTable: &sync.Map{},
clock: clock,
Expand All @@ -179,8 +176,8 @@ func newActorsWithClock(opts ActorsOpts, clock clock.WithTicker) ActorRuntime {

// Init reminders
a.actorsReminders = reminders.NewRemindersProvider(a.clock, internal.RemindersProviderOpts{
StoreName: a.storeName,
Config: a.actorsConfig.Config,
ComponentStore: a.compStore,
Config: a.actorsConfig.Config,
})
a.actorsReminders.SetExecuteReminderFn(a.executeReminder)
a.actorsReminders.SetResiliencyProvider(a.resiliency)
Expand Down Expand Up @@ -210,7 +207,7 @@ func (a *actorsRuntime) isActorLocallyHosted(ctx context.Context, actorType stri
}

func (a *actorsRuntime) haveCompatibleStorage() bool {
store, ok := a.compStore.GetStateStore(a.storeName)
store, _, ok := a.compStore.GetStateStoreActor()
if !ok {
// If we have hosted actors and no store, we can't initialize the actor runtime
return false
Expand Down Expand Up @@ -676,7 +673,7 @@ func (a *actorsRuntime) isActorLocal(targetActorAddress, hostAddress string, grp
}

func (a *actorsRuntime) GetState(ctx context.Context, req *GetStateRequest) (*StateResponse, error) {
store, err := a.stateStore()
store, storeName, err := a.stateStore()
if err != nil {
return nil, err
}
Expand All @@ -688,7 +685,7 @@ func (a *actorsRuntime) GetState(ctx context.Context, req *GetStateRequest) (*St
key := a.constructActorStateKey(actorKey, req.Key)

policyRunner := resiliency.NewRunner[*state.GetResponse](ctx,
a.resiliency.ComponentOutboundPolicy(a.storeName, resiliency.Statestore),
a.resiliency.ComponentOutboundPolicy(storeName, resiliency.Statestore),
)
storeReq := &state.GetRequest{
Key: key,
Expand All @@ -712,7 +709,7 @@ func (a *actorsRuntime) GetState(ctx context.Context, req *GetStateRequest) (*St
}

func (a *actorsRuntime) GetBulkState(ctx context.Context, req *GetBulkStateRequest) (BulkStateResponse, error) {
store, err := a.stateStore()
store, storeName, err := a.stateStore()
if err != nil {
return nil, err
}
Expand All @@ -730,7 +727,7 @@ func (a *actorsRuntime) GetBulkState(ctx context.Context, req *GetBulkStateReque
}

policyRunner := resiliency.NewRunner[[]state.BulkGetResponse](ctx,
a.resiliency.ComponentOutboundPolicy(a.storeName, resiliency.Statestore),
a.resiliency.ComponentOutboundPolicy(storeName, resiliency.Statestore),
)
res, err := policyRunner(func(ctx context.Context) ([]state.BulkGetResponse, error) {
return store.BulkGet(ctx, bulkReqs, state.BulkGetOpts{})
Expand All @@ -756,7 +753,7 @@ func (a *actorsRuntime) GetBulkState(ctx context.Context, req *GetBulkStateReque
}

func (a *actorsRuntime) TransactionalStateOperation(ctx context.Context, req *TransactionalRequest) error {
store, err := a.stateStore()
store, storeName, err := a.stateStore()
if err != nil {
return err
}
Expand All @@ -776,10 +773,10 @@ func (a *actorsRuntime) TransactionalStateOperation(ctx context.Context, req *Tr
}
}

return a.executeStateStoreTransaction(ctx, store, operations, metadata)
return a.executeStateStoreTransaction(ctx, store, storeName, operations, metadata)
}

func (a *actorsRuntime) executeStateStoreTransaction(ctx context.Context, store internal.TransactionalStateStore, operations []state.TransactionalStateOperation, metadata map[string]string) error {
func (a *actorsRuntime) executeStateStoreTransaction(ctx context.Context, store internal.TransactionalStateStore, storeName string, operations []state.TransactionalStateOperation, metadata map[string]string) error {
if maxMulti, ok := store.(state.TransactionalStoreMultiMaxSize); ok {
max := maxMulti.MultiMaxSize()
if max > 0 && len(operations) > max {
Expand All @@ -791,7 +788,7 @@ func (a *actorsRuntime) executeStateStoreTransaction(ctx context.Context, store
Metadata: metadata,
}
policyRunner := resiliency.NewRunner[struct{}](ctx,
a.resiliency.ComponentOutboundPolicy(a.storeName, resiliency.Statestore),
a.resiliency.ComponentOutboundPolicy(storeName, resiliency.Statestore),
)
_, err := policyRunner(func(ctx context.Context) (struct{}, error) {
return struct{}{}, store.Multi(ctx, stateReq)
Expand Down Expand Up @@ -1111,16 +1108,16 @@ func ValidateHostEnvironment(mTLSEnabled bool, mode modes.DaprMode, namespace st
return nil
}

func (a *actorsRuntime) stateStore() (internal.TransactionalStateStore, error) {
storeS, ok := a.compStore.GetStateStore(a.storeName)
func (a *actorsRuntime) stateStore() (internal.TransactionalStateStore, string, error) {
storeS, name, ok := a.compStore.GetStateStoreActor()
if !ok {
return nil, errors.New(errStateStoreNotFound)
return nil, "", errors.New(errStateStoreNotFound)
}

store, ok := storeS.(internal.TransactionalStateStore)
if !ok || !state.FeatureETag.IsPresent(store.Features()) || !state.FeatureTransactional.IsPresent(store.Features()) {
return nil, errors.New(errStateStoreNotConfigured)
return nil, "", errors.New(errStateStoreNotConfigured)
}

return store, nil
return store, name, nil
}
52 changes: 24 additions & 28 deletions pkg/actors/actors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,14 +191,13 @@ func (b *runtimeBuilder) buildActorRuntime() *actorsRuntime {
}

compStore := compstore.New()
compStore.AddStateStore(storeName, store)
compStore.AddStateStoreActor(storeName, store)
a := newActorsWithClock(ActorsOpts{
CompStore: compStore,
AppChannel: b.appChannel,
Config: *b.config,
TracingSpec: config.TracingSpec{SamplingRate: "1"},
Resiliency: resiliency.FromConfigurations(log, testResiliency),
StateStoreName: storeName,
CompStore: compStore,
AppChannel: b.appChannel,
Config: *b.config,
TracingSpec: config.TracingSpec{SamplingRate: "1"},
Resiliency: resiliency.FromConfigurations(log, testResiliency),
}, clock)

return a.(*actorsRuntime)
Expand All @@ -216,15 +215,14 @@ func newTestActorsRuntimeWithMock(appChannel channel.AppChannel) *actorsRuntime
clock := clocktesting.NewFakeClock(startOfTime)

compStore := compstore.New()
compStore.AddStateStore("actorStore", fakeStore())
compStore.AddStateStoreActor("actorStore", fakeStore())
a := newActorsWithClock(ActorsOpts{
CompStore: compStore,
AppChannel: appChannel,
Config: conf,
TracingSpec: config.TracingSpec{SamplingRate: "1"},
Resiliency: resiliency.New(log),
StateStoreName: "actorStore",
MockPlacement: NewMockPlacement(TestAppID),
CompStore: compStore,
AppChannel: appChannel,
Config: conf,
TracingSpec: config.TracingSpec{SamplingRate: "1"},
Resiliency: resiliency.New(log),
MockPlacement: NewMockPlacement(TestAppID),
}, clock)

return a.(*actorsRuntime)
Expand All @@ -240,12 +238,11 @@ func newTestActorsRuntimeWithMockWithoutPlacement(appChannel channel.AppChannel)
clock := clocktesting.NewFakeClock(startOfTime)

a := newActorsWithClock(ActorsOpts{
CompStore: compstore.New(),
AppChannel: appChannel,
Config: conf,
TracingSpec: config.TracingSpec{SamplingRate: "1"},
Resiliency: resiliency.New(log),
StateStoreName: "actorStore",
CompStore: compstore.New(),
AppChannel: appChannel,
Config: conf,
TracingSpec: config.TracingSpec{SamplingRate: "1"},
Resiliency: resiliency.New(log),
}, clock)

return a.(*actorsRuntime)
Expand All @@ -261,12 +258,11 @@ func newTestActorsRuntimeWithMockAndNoStore(appChannel channel.AppChannel) *acto
clock := clocktesting.NewFakeClock(startOfTime)

a := newActorsWithClock(ActorsOpts{
CompStore: compstore.New(),
AppChannel: appChannel,
Config: conf,
TracingSpec: config.TracingSpec{SamplingRate: "1"},
Resiliency: resiliency.New(log),
StateStoreName: "actorStore",
CompStore: compstore.New(),
AppChannel: appChannel,
Config: conf,
TracingSpec: config.TracingSpec{SamplingRate: "1"},
Resiliency: resiliency.New(log),
}, clock)

return a.(*actorsRuntime)
Expand Down Expand Up @@ -895,7 +891,7 @@ func TestTransactionalState(t *testing.T) {
testActorsRuntime := newTestActorsRuntime()
defer testActorsRuntime.Close()

store, err := testActorsRuntime.stateStore()
store, _, err := testActorsRuntime.stateStore()
require.NoError(t, err)
fakeStore, ok := store.(*daprt.FakeStateStore)
require.True(t, ok)
Expand Down
9 changes: 5 additions & 4 deletions pkg/actors/internal/reminders.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"io"

"github.com/dapr/dapr/pkg/resiliency"
"github.com/dapr/dapr/pkg/runtime/compstore"
)

// ErrReminderCanceled is returned when the reminder has been canceled.
Expand All @@ -31,13 +32,13 @@ type ExecuteReminderFn func(reminder *Reminder) bool
// LookupActorFn is the type of a function that returns whether an actor is locally-hosted and the address of its host.
type LookupActorFn func(ctx context.Context, actorType string, actorID string) (isLocal bool, actorAddress string)

// StateStoreProviderFn is the type of a function that returns the state store provider.
type StateStoreProviderFn func() (TransactionalStateStore, error)
// StateStoreProviderFn is the type of a function that returns the state store provider and store name.
type StateStoreProviderFn func() (TransactionalStateStore, string, error)

// RemindersProviderOpts contains the options for the reminders provider.
type RemindersProviderOpts struct {
StoreName string
Config Config
ComponentStore *compstore.ComponentStore
Config Config
}

// RemindersProvider is the interface for the object that provides reminders services.
Expand Down
15 changes: 7 additions & 8 deletions pkg/actors/internal_actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,14 @@ func newTestActorsRuntimeWithInternalActors(internalActors map[string]InternalAc
})

compStore := compstore.New()
compStore.AddStateStore("actorStore", store)
compStore.AddStateStoreActor("actorStore", store)
a := NewActors(ActorsOpts{
CompStore: compStore,
Config: config,
TracingSpec: spec,
Resiliency: resiliency.New(log),
StateStoreName: "actorStore",
Security: fake.New(),
MockPlacement: NewMockPlacement(TestAppID),
CompStore: compStore,
Config: config,
TracingSpec: spec,
Resiliency: resiliency.New(log),
Security: fake.New(),
MockPlacement: NewMockPlacement(TestAppID),
})

for actorType, actor := range internalActors {
Expand Down

0 comments on commit fa51653

Please sign in to comment.