Skip to content

Commit

Permalink
Hot Reloading: Actor State Store Reloading
Browse files Browse the repository at this point in the history
Part of dapr#1172
Branched from dapr#7260

Adds support for actor state store hot reloading.

PR updates the actors package so that the actor state store is dynamically
retrieved from the component store every time the state it invoked. The
passed static actor state store name previously used to retrieve the
state store has been removed.

The component store has been updated to track the current actor state
store. It is no longer tracked by the state processor, and instead the
state processor updates the latest in the component store.

Like before, the actor state store _must_ be configured on Dapr startup,
else the actor subsystem runtime will not be enabled, even if an actor
state store is hot reloaded later.

Adds actor hotreloading integration tests for the actor state store.

Signed-off-by: joshvanl <me@joshvanl.dev>
  • Loading branch information
JoshVanL committed Jan 9, 2024
1 parent 5715596 commit b172d75
Show file tree
Hide file tree
Showing 26 changed files with 1,359 additions and 129 deletions.
13 changes: 6 additions & 7 deletions pkg/actors/actors.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ type actorsRuntime struct {
actorsTable *sync.Map
tracingSpec configuration.TracingSpec
resiliency resiliency.Provider
storeName string
compStore *compstore.ComponentStore
clock clock.WithTicker
internalActors map[string]InternalActor
Expand All @@ -140,7 +139,6 @@ type ActorsOpts struct {
Config Config
TracingSpec configuration.TracingSpec
Resiliency resiliency.Provider
StateStoreName string
CompStore *compstore.ComponentStore
Security security.Handler

Expand All @@ -164,7 +162,6 @@ func newActorsWithClock(opts ActorsOpts, clock clock.WithTicker) ActorRuntime {
timers: timers.NewTimersProvider(clock),
tracingSpec: opts.TracingSpec,
resiliency: opts.Resiliency,
storeName: opts.StateStoreName,
placement: opts.MockPlacement,
actorsTable: &sync.Map{},
clock: clock,
Expand Down Expand Up @@ -225,7 +222,7 @@ func (a *actorsRuntime) isActorLocallyHosted(ctx context.Context, actorType stri
}

func (a *actorsRuntime) haveCompatibleStorage() bool {
store, ok := a.compStore.GetStateStore(a.storeName)
store, _, ok := a.compStore.GetStateStoreActor()
if !ok {
// If we have hosted actors and no store, we can't initialize the actor runtime
return false
Expand Down Expand Up @@ -762,7 +759,9 @@ func (a *actorsRuntime) GetBulkState(ctx context.Context, req *GetBulkStateReque
return bulkRes, nil
}

func (a *actorsRuntime) TransactionalStateOperation(ctx context.Context, req *TransactionalRequest) (err error) {
func (a *actorsRuntime) TransactionalStateOperation(ctx context.Context, req *TransactionalRequest) error {
var err error

operations := make([]state.TransactionalStateOperation, len(req.Operations))
baseKey := constructCompositeKey(a.actorsConfig.Config.AppID, req.ActorKey())
metadata := map[string]string{metadataPartitionKey: baseKey}
Expand Down Expand Up @@ -1119,7 +1118,7 @@ func ValidateHostEnvironment(mTLSEnabled bool, mode modes.DaprMode, namespace st
}

func (a *actorsRuntime) stateStore() (string, internal.TransactionalStateStore, error) {
storeS, ok := a.compStore.GetStateStore(a.storeName)
storeS, name, ok := a.compStore.GetStateStoreActor()
if !ok {
return "", nil, errors.New(errStateStoreNotFound)
}
Expand All @@ -1129,5 +1128,5 @@ func (a *actorsRuntime) stateStore() (string, internal.TransactionalStateStore,
return "", nil, errors.New(errStateStoreNotConfigured)
}

return a.storeName, store, nil
return name, store, nil
}
50 changes: 23 additions & 27 deletions pkg/actors/actors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,14 +191,13 @@ func (b *runtimeBuilder) buildActorRuntime() *actorsRuntime {
}

compStore := compstore.New()
compStore.AddStateStore(storeName, store)
compStore.AddStateStoreActor(storeName, store)
a := newActorsWithClock(ActorsOpts{
CompStore: compStore,
AppChannel: b.appChannel,
Config: *b.config,
TracingSpec: config.TracingSpec{SamplingRate: "1"},
Resiliency: resiliency.FromConfigurations(log, testResiliency),
StateStoreName: storeName,
CompStore: compStore,
AppChannel: b.appChannel,
Config: *b.config,
TracingSpec: config.TracingSpec{SamplingRate: "1"},
Resiliency: resiliency.FromConfigurations(log, testResiliency),
}, clock)

return a.(*actorsRuntime)
Expand All @@ -216,15 +215,14 @@ func newTestActorsRuntimeWithMock(appChannel channel.AppChannel) *actorsRuntime
clock := clocktesting.NewFakeClock(startOfTime)

compStore := compstore.New()
compStore.AddStateStore("actorStore", fakeStore())
compStore.AddStateStoreActor("actorStore", fakeStore())
a := newActorsWithClock(ActorsOpts{
CompStore: compStore,
AppChannel: appChannel,
Config: conf,
TracingSpec: config.TracingSpec{SamplingRate: "1"},
Resiliency: resiliency.New(log),
StateStoreName: "actorStore",
MockPlacement: NewMockPlacement(TestAppID),
CompStore: compStore,
AppChannel: appChannel,
Config: conf,
TracingSpec: config.TracingSpec{SamplingRate: "1"},
Resiliency: resiliency.New(log),
MockPlacement: NewMockPlacement(TestAppID),
}, clock)

return a.(*actorsRuntime)
Expand All @@ -240,12 +238,11 @@ func newTestActorsRuntimeWithMockWithoutPlacement(appChannel channel.AppChannel)
clock := clocktesting.NewFakeClock(startOfTime)

a := newActorsWithClock(ActorsOpts{
CompStore: compstore.New(),
AppChannel: appChannel,
Config: conf,
TracingSpec: config.TracingSpec{SamplingRate: "1"},
Resiliency: resiliency.New(log),
StateStoreName: "actorStore",
CompStore: compstore.New(),
AppChannel: appChannel,
Config: conf,
TracingSpec: config.TracingSpec{SamplingRate: "1"},
Resiliency: resiliency.New(log),
}, clock)

return a.(*actorsRuntime)
Expand All @@ -261,12 +258,11 @@ func newTestActorsRuntimeWithMockAndNoStore(appChannel channel.AppChannel) *acto
clock := clocktesting.NewFakeClock(startOfTime)

a := newActorsWithClock(ActorsOpts{
CompStore: compstore.New(),
AppChannel: appChannel,
Config: conf,
TracingSpec: config.TracingSpec{SamplingRate: "1"},
Resiliency: resiliency.New(log),
StateStoreName: "actorStore",
CompStore: compstore.New(),
AppChannel: appChannel,
Config: conf,
TracingSpec: config.TracingSpec{SamplingRate: "1"},
Resiliency: resiliency.New(log),
}, clock)

return a.(*actorsRuntime)
Expand Down
8 changes: 8 additions & 0 deletions pkg/actors/internal/reminders.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"context"
"errors"
"io"

"github.com/dapr/dapr/pkg/runtime/compstore"
)

// ErrReminderCanceled is returned when the reminder has been canceled.
Expand All @@ -32,6 +34,12 @@ type LookupActorFn func(ctx context.Context, actorType string, actorID string) (
// StateStoreProviderFn is the type of a function that returns the state store provider and its name.
type StateStoreProviderFn func() (string, TransactionalStateStore, error)

// RemindersProviderOpts contains the options for the reminders provider.
type RemindersProviderOpts struct {
ComponentStore *compstore.ComponentStore
Config Config
}

// RemindersProvider is the interface for the object that provides reminders services.
type RemindersProvider interface {
io.Closer
Expand Down
15 changes: 7 additions & 8 deletions pkg/actors/internal_actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,14 @@ func newTestActorsRuntimeWithInternalActors(internalActors map[string]InternalAc
})

compStore := compstore.New()
compStore.AddStateStore("actorStore", store)
compStore.AddStateStoreActor("actorStore", store)
a := NewActors(ActorsOpts{
CompStore: compStore,
Config: config,
TracingSpec: spec,
Resiliency: resiliency.New(log),
StateStoreName: "actorStore",
Security: fake.New(),
MockPlacement: NewMockPlacement(TestAppID),
CompStore: compStore,
Config: config,
TracingSpec: spec,
Resiliency: resiliency.New(log),
Security: fake.New(),
MockPlacement: NewMockPlacement(TestAppID),
})

for actorType, actor := range internalActors {
Expand Down
7 changes: 0 additions & 7 deletions pkg/actors/reminders/reminders.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,6 @@ type reminders struct {
metricsCollector remindersMetricsCollectorFn
}

// NewRemindersProviderOpts contains the options for the NewRemindersProvider function.
type NewRemindersProviderOpts struct {
StoreName string
Config internal.Config
APILevel *atomic.Uint32
}

// NewRemindersProvider returns a reminders provider.
func NewRemindersProvider(opts internal.ActorsProviderOptions) internal.RemindersProvider {
return &reminders{
Expand Down
4 changes: 4 additions & 0 deletions pkg/runtime/compstore/compstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ type ComponentStore struct {
components []compsv1alpha1.Component
subscriptions []rtpubsub.Subscription
httpEndpoints []httpEndpointV1alpha1.HTTPEndpoint
actorStateStore struct {
name string
store state.Store
}

compPendingLock sync.Mutex
compPending *compsv1alpha1.Component
Expand Down
31 changes: 30 additions & 1 deletion pkg/runtime/compstore/statestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,30 @@ limitations under the License.

package compstore

import "github.com/dapr/components-contrib/state"
import (
"fmt"

"github.com/dapr/components-contrib/state"
)

func (c *ComponentStore) AddStateStore(name string, store state.Store) {
c.lock.Lock()
defer c.lock.Unlock()
c.states[name] = store
}

func (c *ComponentStore) AddStateStoreActor(name string, store state.Store) error {
c.lock.Lock()
defer c.lock.Unlock()
if c.actorStateStore.store != nil && c.actorStateStore.name != name {
return fmt.Errorf("detected duplicate actor state store: %s and %s", c.actorStateStore.name, name)
}
c.states[name] = store
c.actorStateStore.name = name
c.actorStateStore.store = store
return nil
}

func (c *ComponentStore) GetStateStore(name string) (state.Store, bool) {
c.lock.RLock()
defer c.lock.RUnlock()
Expand All @@ -37,6 +53,10 @@ func (c *ComponentStore) ListStateStores() map[string]state.Store {
func (c *ComponentStore) DeleteStateStore(name string) {
c.lock.Lock()
defer c.lock.Unlock()
if c.actorStateStore.name == name {
c.actorStateStore.name = ""
c.actorStateStore.store = nil
}
delete(c.states, name)
}

Expand All @@ -45,3 +65,12 @@ func (c *ComponentStore) StateStoresLen() int {
defer c.lock.RUnlock()
return len(c.states)
}

func (c *ComponentStore) GetStateStoreActor() (state.Store, string, bool) {
c.lock.RLock()
defer c.lock.RUnlock()
if c.actorStateStore.store == nil {
return nil, "", false
}
return c.actorStateStore.store, c.actorStateStore.name, true
}
11 changes: 0 additions & 11 deletions pkg/runtime/processor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@ type manager interface {
Close(componentsapi.Component) error
}

type StateManager interface {
ActorStateStoreName() (string, bool)
manager
}

type SecretManager interface {
ProcessResource(context.Context, meta.Resource) (bool, string)
manager
Expand Down Expand Up @@ -67,12 +62,6 @@ func (p *Processor) managerFromComp(comp componentsapi.Component) (manager, erro
return m, nil
}

func (p *Processor) State() StateManager {
p.lock.RLock()
defer p.lock.RUnlock()
return p.state
}

func (p *Processor) Secret() SecretManager {
p.lock.RLock()
defer p.lock.RUnlock()
Expand Down
18 changes: 7 additions & 11 deletions pkg/runtime/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ type Options struct {
type Processor struct {
compStore *compstore.ComponentStore
managers map[components.Category]manager
state StateManager
secret SecretManager
pubsub PubsubManager
binding BindingManager
Expand Down Expand Up @@ -143,14 +142,6 @@ func New(opts Options) *Processor {
ResourcesPath: opts.Standalone.ResourcesPath,
})

state := state.New(state.Options{
PlacementEnabled: opts.PlacementEnabled,
Registry: opts.Registry.StateStores(),
ComponentStore: opts.ComponentStore,
Meta: opts.Meta,
Outbox: ps.Outbox(),
})

secret := secret.New(secret.Options{
Registry: opts.Registry.SecretStores(),
ComponentStore: opts.ComponentStore,
Expand All @@ -175,7 +166,6 @@ func New(opts Options) *Processor {
pendingComponentDependents: make(map[string][]componentsapi.Component),
closedCh: make(chan struct{}),
compStore: opts.ComponentStore,
state: state,
pubsub: ps,
binding: binding,
secret: secret,
Expand All @@ -198,7 +188,13 @@ func New(opts Options) *Processor {
}),
components.CategoryPubSub: ps,
components.CategorySecretStore: secret,
components.CategoryStateStore: state,
components.CategoryStateStore: state.New(state.Options{
PlacementEnabled: opts.PlacementEnabled,
Registry: opts.Registry.StateStores(),
ComponentStore: opts.ComponentStore,
Meta: opts.Meta,
Outbox: ps.Outbox(),
}),
components.CategoryWorkflow: workflow.New(workflow.Options{
Registry: opts.Registry.Workflows(),
ComponentStore: opts.ComponentStore,
Expand Down

0 comments on commit b172d75

Please sign in to comment.