Skip to content

Commit

Permalink
Exit error if actor state store hot reloaded
Browse files Browse the repository at this point in the history
Updates the hot reloading reconciler so that Daprd will exit error when
a actor state store enabled Component is hot reloaded. This is chosen
because today, the actors subsystem is not written with any closing or
dynamic support. Doing so will cause panics/corruption in its current
state.

Exiting error is the safest option as this ensures consistency across a
replica set and ensures there is no surprise for the user that behaviour
does not match given configuration.

See also #7433

Signed-off-by: joshvanl <me@joshvanl.dev>
  • Loading branch information
JoshVanL committed Jan 24, 2024
1 parent 75f0b43 commit 8aaf919
Show file tree
Hide file tree
Showing 7 changed files with 529 additions and 48 deletions.
49 changes: 39 additions & 10 deletions pkg/runtime/hotreload/reconciler/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package reconciler

import (
"context"
"fmt"
"strings"

componentsapi "github.com/dapr/dapr/pkg/apis/components/v1alpha1"
Expand All @@ -23,6 +24,7 @@ import (
"github.com/dapr/dapr/pkg/runtime/hotreload/differ"
"github.com/dapr/dapr/pkg/runtime/hotreload/loader"
"github.com/dapr/dapr/pkg/runtime/processor"
"github.com/dapr/dapr/pkg/runtime/processor/state"
)

type component struct {
Expand All @@ -36,10 +38,9 @@ type component struct {
// the generic reconciler.
//
//nolint:unused
func (c *component) update(ctx context.Context, comp componentsapi.Component) {
if strings.HasPrefix(comp.Spec.Type, "middleware.") {
log.Warnf("Hotreload is not supported for middleware components: %s", comp.LogName())
return
func (c *component) update(ctx context.Context, comp componentsapi.Component) error {
if err := c.verify(comp); err != nil {
return err

Check warning on line 43 in pkg/runtime/hotreload/reconciler/component.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/hotreload/reconciler/component.go#L42-L43

Added lines #L42 - L43 were not covered by tests
}

oldComp, exists := c.store.GetComponent(comp.Name)
Expand All @@ -48,32 +49,60 @@ func (c *component) update(ctx context.Context, comp componentsapi.Component) {
if exists {
if differ.AreSame(oldComp, comp) {
log.Debugf("Component update skipped: no changes detected: %s", comp.LogName())
return
return nil

Check warning on line 52 in pkg/runtime/hotreload/reconciler/component.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/hotreload/reconciler/component.go#L52

Added line #L52 was not covered by tests
}

log.Infof("Closing existing Component to reload: %s", oldComp.LogName())
// TODO: change close to accept pointer
if err := c.proc.Close(oldComp); err != nil {
log.Errorf("error closing old component: %w", err)
return
log.Errorf("error closing old component: %s", err)
return nil

Check warning on line 59 in pkg/runtime/hotreload/reconciler/component.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/hotreload/reconciler/component.go#L58-L59

Added lines #L58 - L59 were not covered by tests
}
}

if !c.auth.IsObjectAuthorized(comp) {
log.Warnf("Received unauthorized component update, ignored: %s", comp.LogName())
return
return nil

Check warning on line 65 in pkg/runtime/hotreload/reconciler/component.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/hotreload/reconciler/component.go#L65

Added line #L65 was not covered by tests
}

log.Infof("Adding Component for processing: %s", comp.LogName())
if c.proc.AddPendingComponent(ctx, comp) {
log.Infof("Component updated: %s", comp.LogName())
c.proc.WaitForEmptyComponentQueue()
}

return nil

Check warning on line 74 in pkg/runtime/hotreload/reconciler/component.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/hotreload/reconciler/component.go#L74

Added line #L74 was not covered by tests
}

//nolint:unused
func (c *component) delete(comp componentsapi.Component) {
func (c *component) delete(comp componentsapi.Component) error {
if err := c.verify(comp); err != nil {
return err

Check warning on line 80 in pkg/runtime/hotreload/reconciler/component.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/hotreload/reconciler/component.go#L79-L80

Added lines #L79 - L80 were not covered by tests
}

if err := c.proc.Close(comp); err != nil {
log.Errorf("error closing deleted component: %w", err)
log.Errorf("error closing deleted component: %s", err)

Check warning on line 84 in pkg/runtime/hotreload/reconciler/component.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/hotreload/reconciler/component.go#L84

Added line #L84 was not covered by tests
}

return nil

Check warning on line 87 in pkg/runtime/hotreload/reconciler/component.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/hotreload/reconciler/component.go#L87

Added line #L87 was not covered by tests
}

//nolint:unused
func (c *component) verify(vcomp componentsapi.Component) error {
toverify := []componentsapi.Component{vcomp}
if comp, ok := c.store.GetComponent(vcomp.Name); ok {
toverify = append(toverify, comp)

Check warning on line 94 in pkg/runtime/hotreload/reconciler/component.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/hotreload/reconciler/component.go#L92-L94

Added lines #L92 - L94 were not covered by tests
}

for _, comp := range toverify {
if strings.HasPrefix(comp.Spec.Type, "state.") {

Check warning on line 98 in pkg/runtime/hotreload/reconciler/component.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/hotreload/reconciler/component.go#L98

Added line #L98 was not covered by tests
for _, meta := range comp.Spec.Metadata {
if strings.EqualFold(meta.Name, state.PropertyKeyActorStateStore) {
return fmt.Errorf("aborting to hot-reload a state store component that is used as an actor state store: %s", comp.LogName())

Check warning on line 101 in pkg/runtime/hotreload/reconciler/component.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/hotreload/reconciler/component.go#L100-L101

Added lines #L100 - L101 were not covered by tests
}
}
}
}

return nil
}

Check warning on line 108 in pkg/runtime/hotreload/reconciler/component.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/hotreload/reconciler/component.go#L107-L108

Added lines #L107 - L108 were not covered by tests
46 changes: 27 additions & 19 deletions pkg/runtime/hotreload/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package reconciler

import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -53,8 +54,8 @@ type Reconciler[T differ.Resource] struct {

type manager[T differ.Resource] interface {
loader.Loader[T]
update(context.Context, T)
delete(T)
update(context.Context, T) error
delete(T) error
}

func NewComponent(opts Options[componentsapi.Component]) *Reconciler[componentsapi.Component] {
Expand All @@ -80,9 +81,7 @@ func (r *Reconciler[T]) Run(ctx context.Context) error {
return fmt.Errorf("error starting component stream: %w", err)
}

r.watchForEvents(ctx, stream)

return nil
return r.watchForEvents(ctx, stream)
}

func (r *Reconciler[T]) Close() error {
Expand All @@ -94,7 +93,7 @@ func (r *Reconciler[T]) Close() error {
return nil
}

func (r *Reconciler[T]) watchForEvents(ctx context.Context, stream <-chan *loader.Event[T]) {
func (r *Reconciler[T]) watchForEvents(ctx context.Context, stream <-chan *loader.Event[T]) error {
log.Infof("Starting to watch %s updates", r.kind)

ticker := r.clock.NewTicker(time.Second * 60)
Expand All @@ -106,9 +105,9 @@ func (r *Reconciler[T]) watchForEvents(ctx context.Context, stream <-chan *loade
for {
select {
case <-ctx.Done():
return
return nil
case <-r.closeCh:
return
return nil
case <-ticker.C():
log.Debugf("Running scheduled %s reconcile", r.kind)
resources, err := r.manager.List(ctx)
Expand All @@ -118,10 +117,12 @@ func (r *Reconciler[T]) watchForEvents(ctx context.Context, stream <-chan *loade
}

if err := r.reconcile(ctx, differ.Diff(resources)); err != nil {
log.Errorf("Error reconciling %s: %s", r.kind, err)
return err

Check warning on line 120 in pkg/runtime/hotreload/reconciler/reconciler.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/hotreload/reconciler/reconciler.go#L120

Added line #L120 was not covered by tests
}
case event := <-stream:
r.handleEvent(ctx, event)
if err := r.handleEvent(ctx, event); err != nil {
return err

Check warning on line 124 in pkg/runtime/hotreload/reconciler/reconciler.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/hotreload/reconciler/reconciler.go#L124

Added line #L124 was not covered by tests
}
}
}
}
Expand All @@ -131,7 +132,6 @@ func (r *Reconciler[T]) reconcile(ctx context.Context, result *differ.Result[T])
return nil
}

var wg sync.WaitGroup
for _, group := range []struct {
resources []T
eventType operatorpb.ResourceEventType
Expand All @@ -140,34 +140,42 @@ func (r *Reconciler[T]) reconcile(ctx context.Context, result *differ.Result[T])
{result.Updated, operatorpb.ResourceEventType_UPDATED},
{result.Created, operatorpb.ResourceEventType_CREATED},
} {
wg.Add(len(group.resources))
errCh := make(chan error, len(group.resources))
for _, resource := range group.resources {
go func(resource T, eventType operatorpb.ResourceEventType) {
defer wg.Done()
r.handleEvent(ctx, &loader.Event[T]{
errCh <- r.handleEvent(ctx, &loader.Event[T]{
Type: eventType,
Resource: resource,
})
}(resource, group.eventType)
}
wg.Wait()

errs := make([]error, 0, len(group.resources))
for range group.resources {
errs = append(errs, <-errCh)
}
if err := errors.Join(errs...); err != nil {
return fmt.Errorf("error reconciling %s: %w", r.kind, err)

Check warning on line 158 in pkg/runtime/hotreload/reconciler/reconciler.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/hotreload/reconciler/reconciler.go#L158

Added line #L158 was not covered by tests
}
}

return nil
}

func (r *Reconciler[T]) handleEvent(ctx context.Context, event *loader.Event[T]) {
func (r *Reconciler[T]) handleEvent(ctx context.Context, event *loader.Event[T]) error {
log.Debugf("Received %s event %s: %s", event.Resource.Kind(), event.Type, event.Resource.LogName())

switch event.Type {
case operatorpb.ResourceEventType_CREATED:
log.Infof("Received %s creation: %s", r.kind, event.Resource.LogName())
r.manager.update(ctx, event.Resource)
return r.manager.update(ctx, event.Resource)
case operatorpb.ResourceEventType_UPDATED:
log.Infof("Received %s update: %s", r.kind, event.Resource.LogName())
r.manager.update(ctx, event.Resource)
return r.manager.update(ctx, event.Resource)
case operatorpb.ResourceEventType_DELETED:
log.Infof("Received %s deletion, closing: %s", r.kind, event.Resource.LogName())
r.manager.delete(event.Resource)
return r.manager.delete(event.Resource)
}

return nil

Check warning on line 180 in pkg/runtime/hotreload/reconciler/reconciler.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/hotreload/reconciler/reconciler.go#L180

Added line #L180 was not covered by tests
}
36 changes: 22 additions & 14 deletions pkg/runtime/hotreload/reconciler/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,13 @@ func Test_Run(t *testing.T) {
mngr.Loader = compLoader
updateCh := make(chan componentsapi.Component)
deleteCh := make(chan componentsapi.Component)
mngr.deleteFn = func(c componentsapi.Component) {
mngr.deleteFn = func(c componentsapi.Component) error {
deleteCh <- c
return nil
}
mngr.updateFn = func(_ context.Context, c componentsapi.Component) {
mngr.updateFn = func(_ context.Context, c componentsapi.Component) error {
updateCh <- c
return nil
}

r.manager = mngr
Expand Down Expand Up @@ -180,11 +182,13 @@ func Test_reconcile(t *testing.T) {

eventCh := make(chan componentsapi.Component)
mngr := newFakeManager()
mngr.deleteFn = func(c componentsapi.Component) {
mngr.deleteFn = func(c componentsapi.Component) error {
eventCh <- c
return nil
}
mngr.updateFn = func(_ context.Context, c componentsapi.Component) {
mngr.updateFn = func(_ context.Context, c componentsapi.Component) error {
eventCh <- c
return nil
}

r := &Reconciler[componentsapi.Component]{
Expand Down Expand Up @@ -249,13 +253,15 @@ func Test_handleEvent(t *testing.T) {
updateCalled, deleteCalled := 0, 0
comp1 := componentsapi.Component{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}

mngr.deleteFn = func(c componentsapi.Component) {
mngr.deleteFn = func(c componentsapi.Component) error {
assert.Equal(t, comp1, c)
deleteCalled++
return nil
}
mngr.updateFn = func(_ context.Context, c componentsapi.Component) {
mngr.updateFn = func(_ context.Context, c componentsapi.Component) error {
assert.Equal(t, comp1, c)
updateCalled++
return nil
}

r := &Reconciler[componentsapi.Component]{manager: mngr}
Expand Down Expand Up @@ -287,25 +293,27 @@ func Test_handleEvent(t *testing.T) {

type fakeManager struct {
loader.Loader[componentsapi.Component]
updateFn func(context.Context, componentsapi.Component)
deleteFn func(componentsapi.Component)
updateFn func(context.Context, componentsapi.Component) error
deleteFn func(componentsapi.Component) error
}

func newFakeManager() *fakeManager {
return &fakeManager{
updateFn: func(context.Context, componentsapi.Component) {
updateFn: func(context.Context, componentsapi.Component) error {
return nil
},
deleteFn: func(componentsapi.Component) {
deleteFn: func(componentsapi.Component) error {
return nil
},
}
}

//nolint:unused
func (f *fakeManager) update(ctx context.Context, comp componentsapi.Component) {
f.updateFn(ctx, comp)
func (f *fakeManager) update(ctx context.Context, comp componentsapi.Component) error {
return f.updateFn(ctx, comp)
}

//nolint:unused
func (f *fakeManager) delete(comp componentsapi.Component) {
f.deleteFn(comp)
func (f *fakeManager) delete(comp componentsapi.Component) error {
return f.deleteFn(comp)
}
4 changes: 2 additions & 2 deletions pkg/runtime/processor/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
)

const (
propertyKeyActorStateStore = "actorstatestore"
PropertyKeyActorStateStore = "actorstatestore"
)

var log = logger.NewLogger("dapr.runtime.processor.state")
Expand Down Expand Up @@ -125,7 +125,7 @@ func (s *state) Init(ctx context.Context, comp compapi.Component) error {
actorStoreSpecified := false
for k, v := range props {
//nolint:gocritic
if strings.ToLower(k) == propertyKeyActorStateStore {
if strings.ToLower(k) == PropertyKeyActorStateStore {

Check warning on line 128 in pkg/runtime/processor/state/state.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/processor/state/state.go#L128

Added line #L128 was not covered by tests
actorStoreSpecified = utils.IsTruthy(v)
break
}
Expand Down
17 changes: 14 additions & 3 deletions tests/integration/framework/process/logline/logline.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import (
"errors"
"io"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -83,12 +85,10 @@ func New(t *testing.T, fopts ...Option) *LogLine {
func (l *LogLine) Run(t *testing.T, ctx context.Context) {
go func() {
res := l.checkOut(t, ctx, l.stdoutLineContains, l.stdoutExp, l.stdout)
l.done.Add(1)
l.outCheck <- res
}()
go func() {
res := l.checkOut(t, ctx, l.stderrLinContains, l.stderrExp, l.stderr)
l.done.Add(1)
l.outCheck <- res
}()
}
Expand All @@ -109,6 +109,11 @@ func (l *LogLine) Cleanup(t *testing.T) {
func (l *LogLine) checkOut(t *testing.T, ctx context.Context, expLines map[string]bool, closer io.WriteCloser, reader io.Reader) map[string]bool {
t.Helper()

if len(expLines) == 0 {
l.done.Add(1)
return expLines
}

go func() {
select {
case <-ctx.Done():
Expand All @@ -117,10 +122,12 @@ func (l *LogLine) checkOut(t *testing.T, ctx context.Context, expLines map[strin
}
}()

var once sync.Once

breader := bufio.NewReader(reader)
for {
if len(expLines) == 0 {
break
once.Do(func() { l.done.Add(1) })
}

line, _, err := breader.ReadLine()
Expand All @@ -146,3 +153,7 @@ func (l *LogLine) Stdout() io.WriteCloser {
func (l *LogLine) Stderr() io.WriteCloser {
return l.stderrExp
}

func (l *LogLine) EventuallyFoundAll(t *testing.T) {
assert.Eventually(t, l.FoundAll, time.Second*20, time.Millisecond*100)
}

0 comments on commit 8aaf919

Please sign in to comment.