Skip to content

Commit

Permalink
Actor State: Ignore Components which hot reload the actor state (#7441)
Browse files Browse the repository at this point in the history
* Exit error if actor state store hot reloaded

Updates the hot reloading reconciler so that Daprd will exit error when
an actor state store enabled Component is hot reloaded. This is chosen
because today, the actors subsystem is not written with any closing or
dynamic support. Doing so will cause panics/corruption in its current
state.

Exiting error is the safest option as this ensures consistency across a
replica set and ensures there is no surprise for the user that behaviour
does not match given configuration.

See also #7433

Signed-off-by: joshvanl <me@joshvanl.dev>

* Remove expected workflow.dapr component from actorstate tests

Signed-off-by: joshvanl <me@joshvanl.dev>

* Increase assert Eventually timeout

Signed-off-by: joshvanl <me@joshvanl.dev>

* Only log error and don't exit error when reconciling actor state store

Signed-off-by: joshvanl <me@joshvanl.dev>

---------

Signed-off-by: joshvanl <me@joshvanl.dev>
Signed-off-by: Artur Souza <asouza.pro@gmail.com>
Co-authored-by: Artur Souza <asouza.pro@gmail.com>
  • Loading branch information
JoshVanL and artursouza committed Jan 30, 2024
1 parent 2c09401 commit 4d44561
Show file tree
Hide file tree
Showing 13 changed files with 543 additions and 81 deletions.
4 changes: 4 additions & 0 deletions pkg/runtime/compstore/compstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ type ComponentStore struct {
components []compsv1alpha1.Component
subscriptions []rtpubsub.Subscription
httpEndpoints []httpEndpointV1alpha1.HTTPEndpoint
actorStateStore struct {
name string
store state.Store
}

compPendingLock sync.Mutex
compPending *compsv1alpha1.Component
Expand Down
31 changes: 30 additions & 1 deletion pkg/runtime/compstore/statestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,30 @@ limitations under the License.

package compstore

import "github.com/dapr/components-contrib/state"
import (
"fmt"

"github.com/dapr/components-contrib/state"
)

// AddStateStore records store in the state store map under name, replacing
// any entry already stored under that name. The write is performed while
// holding the ComponentStore write lock.
func (c *ComponentStore) AddStateStore(name string, store state.Store) {
	c.lock.Lock()
	defer c.lock.Unlock()
	c.states[name] = store
}

// AddStateStoreActor records store under name in the state store map and
// marks it as the actor state store.
//
// If an actor state store is already set under a different name, an error is
// returned and nothing is modified. Re-adding under the same name overwrites
// the previously recorded store.
func (c *ComponentStore) AddStateStoreActor(name string, store state.Store) error {
	c.lock.Lock()
	defer c.lock.Unlock()

	actor := &c.actorStateStore
	if actor.store != nil && actor.name != name {
		return fmt.Errorf("detected duplicate actor state store: %s and %s", actor.name, name)
	}

	c.states[name] = store
	actor.name, actor.store = name, store
	return nil
}

func (c *ComponentStore) GetStateStore(name string) (state.Store, bool) {
c.lock.RLock()
defer c.lock.RUnlock()
Expand All @@ -37,6 +53,10 @@ func (c *ComponentStore) ListStateStores() map[string]state.Store {
// DeleteStateStore removes the state store recorded under name. If that name
// is also the currently recorded actor state store, the actor state store
// record is cleared as well.
func (c *ComponentStore) DeleteStateStore(name string) {
	c.lock.Lock()
	defer c.lock.Unlock()

	delete(c.states, name)

	// Drop the actor state store reference when it points at the deleted name.
	if actor := &c.actorStateStore; actor.name == name {
		actor.name, actor.store = "", nil
	}
}

Expand All @@ -45,3 +65,12 @@ func (c *ComponentStore) StateStoresLen() int {
defer c.lock.RUnlock()
return len(c.states)
}

// GetStateStoreActor returns the recorded actor state store, its name, and
// true. When no actor state store is recorded it returns nil, an empty name,
// and false.
func (c *ComponentStore) GetStateStoreActor() (state.Store, string, bool) {
	c.lock.RLock()
	defer c.lock.RUnlock()

	if actor := c.actorStateStore; actor.store != nil {
		return actor.store, actor.name, true
	}
	return nil, "", false
}
42 changes: 40 additions & 2 deletions pkg/runtime/hotreload/reconciler/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ package reconciler

import (
"context"
"strings"

componentsapi "github.com/dapr/dapr/pkg/apis/components/v1alpha1"
"github.com/dapr/dapr/pkg/runtime/authorizer"
"github.com/dapr/dapr/pkg/runtime/compstore"
"github.com/dapr/dapr/pkg/runtime/hotreload/differ"
"github.com/dapr/dapr/pkg/runtime/hotreload/loader"
"github.com/dapr/dapr/pkg/runtime/processor"
"github.com/dapr/dapr/pkg/runtime/processor/state"
)

type component struct {
Expand All @@ -36,6 +38,10 @@ type component struct {
//
//nolint:unused
func (c *component) update(ctx context.Context, comp componentsapi.Component) {
if !c.verify(comp) {
return
}

oldComp, exists := c.store.GetComponent(comp.Name)
_, _ = c.proc.Secret().ProcessResource(ctx, comp)

Expand All @@ -48,7 +54,7 @@ func (c *component) update(ctx context.Context, comp componentsapi.Component) {
log.Infof("Closing existing Component to reload: %s", oldComp.LogName())
// TODO: change close to accept pointer
if err := c.proc.Close(oldComp); err != nil {
log.Errorf("error closing old component: %w", err)
log.Errorf("error closing old component: %s", err)
return
}
}
Expand All @@ -67,7 +73,39 @@ func (c *component) update(ctx context.Context, comp componentsapi.Component) {

//nolint:unused
func (c *component) delete(comp componentsapi.Component) {
if !c.verify(comp) {
return
}

if err := c.proc.Close(comp); err != nil {
log.Errorf("error closing deleted component: %w", err)
log.Errorf("error closing deleted component: %s", err)
}
}

// verify reports whether vcomp may be hot reloaded. It returns false, after
// logging an error, when either:
//   - vcomp has the same name as the actor state store currently recorded in
//     the component store, or
//   - vcomp, or the component already stored under the same name, has a
//     "state." type and carries the actor state store metadata key. The key
//     name is compared case-insensitively and its value is not inspected.
//
//nolint:unused
func (c *component) verify(vcomp componentsapi.Component) bool {
	// Check both the incoming component and, if present, the version that is
	// currently loaded under the same name.
	candidates := []componentsapi.Component{vcomp}
	if existing, found := c.store.GetComponent(vcomp.Name); found {
		candidates = append(candidates, existing)
	}

	// Reject component if it has the same name as the actor state store.
	_, actorStoreName, hasActorStore := c.store.GetStateStoreActor()
	if hasActorStore && actorStoreName == vcomp.Name {
		log.Errorf("Aborting to hot-reload a state store component that is used as an actor state store: %s", vcomp.LogName())
		return false
	}

	// Reject component if it is being marked as the actor state store.
	for _, candidate := range candidates {
		if !strings.HasPrefix(candidate.Spec.Type, "state.") {
			continue
		}
		for _, item := range candidate.Spec.Metadata {
			if !strings.EqualFold(item.Name, state.PropertyKeyActorStateStore) {
				continue
			}
			log.Errorf("Aborting to hot-reload a state store component that is used as an actor state store: %s", candidate.LogName())
			return false
		}
	}

	return true
}
21 changes: 8 additions & 13 deletions pkg/runtime/hotreload/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,7 @@ func (r *Reconciler[T]) Run(ctx context.Context) error {
return fmt.Errorf("error starting component stream: %w", err)
}

r.watchForEvents(ctx, stream)

return nil
return r.watchForEvents(ctx, stream)
}

func (r *Reconciler[T]) Close() error {
Expand All @@ -94,7 +92,7 @@ func (r *Reconciler[T]) Close() error {
return nil
}

func (r *Reconciler[T]) watchForEvents(ctx context.Context, stream <-chan *loader.Event[T]) {
func (r *Reconciler[T]) watchForEvents(ctx context.Context, stream <-chan *loader.Event[T]) error {
log.Infof("Starting to watch %s updates", r.kind)

ticker := r.clock.NewTicker(time.Second * 60)
Expand All @@ -106,9 +104,9 @@ func (r *Reconciler[T]) watchForEvents(ctx context.Context, stream <-chan *loade
for {
select {
case <-ctx.Done():
return
return nil
case <-r.closeCh:
return
return nil
case <-ticker.C():
log.Debugf("Running scheduled %s reconcile", r.kind)
resources, err := r.manager.List(ctx)
Expand All @@ -117,18 +115,16 @@ func (r *Reconciler[T]) watchForEvents(ctx context.Context, stream <-chan *loade
continue
}

if err := r.reconcile(ctx, differ.Diff(resources)); err != nil {
log.Errorf("Error reconciling %s: %s", r.kind, err)
}
r.reconcile(ctx, differ.Diff(resources))
case event := <-stream:
r.handleEvent(ctx, event)
}
}
}

func (r *Reconciler[T]) reconcile(ctx context.Context, result *differ.Result[T]) error {
func (r *Reconciler[T]) reconcile(ctx context.Context, result *differ.Result[T]) {
if result == nil {
return nil
return
}

var wg sync.WaitGroup
Expand All @@ -150,10 +146,9 @@ func (r *Reconciler[T]) reconcile(ctx context.Context, result *differ.Result[T])
})
}(resource, group.eventType)
}

wg.Wait()
}

return nil
}

func (r *Reconciler[T]) handleEvent(ctx context.Context, event *loader.Event[T]) {
Expand Down
14 changes: 6 additions & 8 deletions pkg/runtime/hotreload/reconciler/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,10 @@ func Test_reconcile(t *testing.T) {
}

t.Run("events should be sent in the correct grouped order", func(t *testing.T) {
errCh := make(chan error)
recDone := make(chan struct{})
go func() {
errCh <- r.reconcile(context.Background(), &differ.Result[componentsapi.Component]{
defer close(recDone)
r.reconcile(context.Background(), &differ.Result[componentsapi.Component]{
Deleted: deleted,
Updated: updated,
Created: created,
Expand Down Expand Up @@ -235,8 +236,7 @@ func Test_reconcile(t *testing.T) {
assert.ElementsMatch(t, created, got)

select {
case err := <-errCh:
require.NoError(t, err)
case <-recDone:
case <-time.After(time.Second * 3):
t.Error("did not get reconcile return in time")
}
Expand Down Expand Up @@ -293,10 +293,8 @@ type fakeManager struct {

func newFakeManager() *fakeManager {
return &fakeManager{
updateFn: func(context.Context, componentsapi.Component) {
},
deleteFn: func(componentsapi.Component) {
},
updateFn: func(context.Context, componentsapi.Component) {},
deleteFn: func(componentsapi.Component) {},
}
}

Expand Down
107 changes: 55 additions & 52 deletions pkg/runtime/processor/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
)

const (
propertyKeyActorStateStore = "actorstatestore"
PropertyKeyActorStateStore = "actorstatestore"
)

var log = logger.NewLogger("dapr.runtime.processor.state")
Expand Down Expand Up @@ -79,71 +79,74 @@ func (s *state) Init(ctx context.Context, comp compapi.Component) error {
return rterrors.NewInit(rterrors.CreateComponentFailure, fName, err)
}

if store != nil {
secretStoreName := s.meta.AuthSecretStoreOrDefault(&comp)
if store == nil {
return nil
}

secretStore, _ := s.compStore.GetSecretStore(secretStoreName)
encKeys, encErr := encryption.ComponentEncryptionKey(comp, secretStore)
if encErr != nil {
diag.DefaultMonitoring.ComponentInitFailed(comp.Spec.Type, "creation", comp.ObjectMeta.Name)
return rterrors.NewInit(rterrors.CreateComponentFailure, fName, err)
}
secretStoreName := s.meta.AuthSecretStoreOrDefault(&comp)

if encKeys.Primary.Key != "" {
ok := encryption.AddEncryptedStateStore(comp.ObjectMeta.Name, encKeys)
if ok {
log.Infof("automatic encryption enabled for state store %s", comp.ObjectMeta.Name)
}
}
secretStore, _ := s.compStore.GetSecretStore(secretStoreName)
encKeys, encErr := encryption.ComponentEncryptionKey(comp, secretStore)
if encErr != nil {
diag.DefaultMonitoring.ComponentInitFailed(comp.Spec.Type, "creation", comp.ObjectMeta.Name)
return rterrors.NewInit(rterrors.CreateComponentFailure, fName, err)
}

meta, err := s.meta.ToBaseMetadata(comp)
if err != nil {
diag.DefaultMonitoring.ComponentInitFailed(comp.Spec.Type, "init", comp.ObjectMeta.Name)
return rterrors.NewInit(rterrors.InitComponentFailure, fName, err)
if encKeys.Primary.Key != "" {
ok := encryption.AddEncryptedStateStore(comp.ObjectMeta.Name, encKeys)
if ok {
log.Infof("automatic encryption enabled for state store %s", comp.ObjectMeta.Name)
}
}

props := meta.Properties
err = store.Init(ctx, contribstate.Metadata{Base: meta})
if err != nil {
diag.DefaultMonitoring.ComponentInitFailed(comp.Spec.Type, "init", comp.ObjectMeta.Name)
return rterrors.NewInit(rterrors.InitComponentFailure, fName, err)
}
meta, err := s.meta.ToBaseMetadata(comp)
if err != nil {
diag.DefaultMonitoring.ComponentInitFailed(comp.Spec.Type, "init", comp.ObjectMeta.Name)
return rterrors.NewInit(rterrors.InitComponentFailure, fName, err)
}

s.compStore.AddStateStore(comp.ObjectMeta.Name, store)
err = compstate.SaveStateConfiguration(comp.ObjectMeta.Name, props)
if err != nil {
diag.DefaultMonitoring.ComponentInitFailed(comp.Spec.Type, "init", comp.ObjectMeta.Name)
wrapError := fmt.Errorf("failed to save lock keyprefix: %s", err.Error())
return rterrors.NewInit(rterrors.InitComponentFailure, fName, wrapError)
}
props := meta.Properties
err = store.Init(ctx, contribstate.Metadata{Base: meta})
if err != nil {
diag.DefaultMonitoring.ComponentInitFailed(comp.Spec.Type, "init", comp.ObjectMeta.Name)
return rterrors.NewInit(rterrors.InitComponentFailure, fName, err)
}

s.outbox.AddOrUpdateOutbox(comp)

// when placement address list is not empty, set specified actor store.
if s.actorsEnabled {
// set specified actor store if "actorStateStore" is true in the spec.
actorStoreSpecified := false
for k, v := range props {
//nolint:gocritic
if strings.ToLower(k) == propertyKeyActorStateStore {
actorStoreSpecified = utils.IsTruthy(v)
break
}
// when placement address list is not empty, set specified actor store.
if s.actorsEnabled {
// set specified actor store if "actorStateStore" is true in the spec.
actorStoreSpecified := false
for k, v := range props {
//nolint:gocritic
if strings.ToLower(k) == PropertyKeyActorStateStore {
actorStoreSpecified = utils.IsTruthy(v)
break
}
}

if actorStoreSpecified {
if s.actorStateStoreName == nil {
log.Info("Using '" + comp.ObjectMeta.Name + "' as actor state store")
s.actorStateStoreName = &comp.ObjectMeta.Name
} else if *s.actorStateStoreName != comp.ObjectMeta.Name {
return fmt.Errorf("detected duplicate actor state store: %s and %s", *s.actorStateStoreName, comp.ObjectMeta.Name)
}
if actorStoreSpecified {
if s.actorStateStoreName == nil {
log.Info("Using '" + comp.ObjectMeta.Name + "' as actor state store")
s.actorStateStoreName = &comp.ObjectMeta.Name
} else if *s.actorStateStoreName != comp.ObjectMeta.Name {
return fmt.Errorf("detected duplicate actor state store: %s and %s", *s.actorStateStoreName, comp.ObjectMeta.Name)
}
s.compStore.AddStateStoreActor(comp.ObjectMeta.Name, store)
}
}

diag.DefaultMonitoring.ComponentInitialized(comp.Spec.Type)
s.compStore.AddStateStore(comp.ObjectMeta.Name, store)
err = compstate.SaveStateConfiguration(comp.ObjectMeta.Name, props)
if err != nil {
diag.DefaultMonitoring.ComponentInitFailed(comp.Spec.Type, "init", comp.ObjectMeta.Name)
wrapError := fmt.Errorf("failed to save lock keyprefix: %s", err.Error())
return rterrors.NewInit(rterrors.InitComponentFailure, fName, wrapError)
}

s.outbox.AddOrUpdateOutbox(comp)

diag.DefaultMonitoring.ComponentInitialized(comp.Spec.Type)

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion tests/integration/framework/process/logline/logline.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,5 +155,5 @@ func (l *LogLine) Stderr() io.WriteCloser {
}

func (l *LogLine) EventuallyFoundAll(t *testing.T) {
assert.Eventually(t, l.FoundAll, time.Second*5, time.Millisecond*100)
assert.Eventually(t, l.FoundAll, time.Second*20, time.Millisecond*100)
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (a *allenabled) Run(t *testing.T, ctx context.Context) {
assert.Len(c, meta.GetActorRuntime().GetActiveActors(), 1)
assert.Equal(c, rtv1.ActorRuntime_RUNNING, meta.GetActorRuntime().GetRuntimeStatus())
assert.Equal(c, "placement: connected", meta.GetActorRuntime().GetPlacement())
}, time.Second*15, time.Millisecond*100)
}, time.Second*20, time.Millisecond*100)

select {
case <-a.healthzCalled:
Expand Down

0 comments on commit 4d44561

Please sign in to comment.