Skip to content

Commit

Permalink
Workflow backends: ignore on hot reloading (#7433)
Browse files Browse the repository at this point in the history
* Workflow backends: exit error on hot reload

Updates the hot reloading reconciler so that Daprd will exit error when
a workflowbackend Component is hot reloaded. This is chosen because
today, the actors and workflow subsystems are not written with any
closing or dynamic support. Doing so will cause panics/corruption in its
current state.

Exiting error is the safest option as this ensures consistency across a
replica set and ensures there is no surprise for the user that
behaviour does not match given configuration.

Adds integration tests for daprd ensuring workflow backends can be
loaded on boot.

Adds tests to ensure daprd will exist error on a workflow backend being
hot reloaded.

We should do the same for actor state store and the actor subsystem.

Signed-off-by: joshvanl <me@joshvanl.dev>

* Review loop

Signed-off-by: joshvanl <me@joshvanl.dev>

* Update workflowbackend int tests for removed workflow comoponent
registered

Signed-off-by: joshvanl <me@joshvanl.dev>

* Fix namespacing in workflowbackend integration test

Signed-off-by: joshvanl <me@joshvanl.dev>

* Changes hot reloading to only log an error when workflow backend is
reconciled

Signed-off-by: joshvanl <me@joshvanl.dev>

* Fix bad merge conflict resolution.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

---------

Signed-off-by: joshvanl <me@joshvanl.dev>
Signed-off-by: Artur Souza <asouza.pro@gmail.com>
Co-authored-by: Dapr Bot <56698301+dapr-bot@users.noreply.github.com>
Co-authored-by: Artur Souza <asouza.pro@gmail.com>
  • Loading branch information
3 people committed Jan 30, 2024
1 parent db1ece4 commit 2fe1316
Show file tree
Hide file tree
Showing 21 changed files with 1,362 additions and 372 deletions.
8 changes: 4 additions & 4 deletions pkg/components/wfbackend/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
// Registry is an interface for a component that returns registered workflow backend implementations.
type Registry struct {
Logger logger.Logger
workflowBackendComponents map[string]WorkflowBackend
workflowBackendComponents map[string]workflowBackendFactory
}

// DefaultRegistry is the singleton with the registry.
Expand All @@ -35,12 +35,12 @@ var DefaultRegistry *Registry = NewRegistry()
// NewRegistry is used to create workflow registry.
func NewRegistry() *Registry {
return &Registry{
workflowBackendComponents: make(map[string]WorkflowBackend),
workflowBackendComponents: make(map[string]workflowBackendFactory),
}
}

// RegisterComponent adds a new workflow to the registry.
func (s *Registry) RegisterComponent(componentFactory WorkflowBackend, names ...string) {
func (s *Registry) RegisterComponent(componentFactory workflowBackendFactory, names ...string) {
for _, name := range names {
s.workflowBackendComponents[createFullName(name)] = componentFactory
}
Expand Down Expand Up @@ -69,7 +69,7 @@ func (s *Registry) getWorkflowBackendComponent(name, version, logName string) (f
return nil, false
}

func (s *Registry) wrapFn(componentFactory WorkflowBackend, logName string) func(Metadata) (backend.Backend, error) {
func (s *Registry) wrapFn(componentFactory workflowBackendFactory, logName string) func(Metadata) (backend.Backend, error) {
return func(m Metadata) (backend.Backend, error) {
l := s.Logger
if logName != "" && l != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/components/wfbackend/wfbackend.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ import (
"github.com/dapr/kit/logger"
)

// WorkflowBackend is a function that returns a workflow backend
type WorkflowBackend = func(Metadata, logger.Logger) (backend.Backend, error)
// workflowBackendFactory is a function that returns a workflow backend
type workflowBackendFactory func(Metadata, logger.Logger) (backend.Backend, error)
7 changes: 4 additions & 3 deletions pkg/runtime/compstore/compstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package compstore
import (
"sync"

"github.com/microsoft/durabletask-go/backend"

"github.com/dapr/components-contrib/bindings"
"github.com/dapr/components-contrib/configuration"
"github.com/dapr/components-contrib/crypto"
Expand All @@ -25,7 +27,6 @@ import (
"github.com/dapr/components-contrib/workflows"
compsv1alpha1 "github.com/dapr/dapr/pkg/apis/components/v1alpha1"
httpEndpointV1alpha1 "github.com/dapr/dapr/pkg/apis/httpEndpoint/v1alpha1"
wfbe "github.com/dapr/dapr/pkg/components/wfbackend"
"github.com/dapr/dapr/pkg/config"
rtpubsub "github.com/dapr/dapr/pkg/runtime/pubsub"
)
Expand All @@ -48,7 +49,7 @@ type ComponentStore struct {
pubSubs map[string]PubsubItem
topicRoutes map[string]TopicRoutes
workflowComponents map[string]workflows.Workflow
workflowBackends map[string]wfbe.WorkflowBackend
workflowBackends map[string]backend.Backend
cryptoProviders map[string]crypto.SubtleCrypto
components []compsv1alpha1.Component
subscriptions []rtpubsub.Subscription
Expand All @@ -75,7 +76,7 @@ func New() *ComponentStore {
locks: make(map[string]lock.Store),
pubSubs: make(map[string]PubsubItem),
workflowComponents: make(map[string]workflows.Workflow),
workflowBackends: make(map[string]wfbe.WorkflowBackend),
workflowBackends: make(map[string]backend.Backend),
cryptoProviders: make(map[string]crypto.SubtleCrypto),
topicRoutes: make(map[string]TopicRoutes),
}
Expand Down
43 changes: 43 additions & 0 deletions pkg/runtime/compstore/workflowbackend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
Copyright 2024 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package compstore

import (
"github.com/microsoft/durabletask-go/backend"
)

func (c *ComponentStore) AddWorkflowBackend(name string, backend backend.Backend) {
c.lock.Lock()
defer c.lock.Unlock()
c.workflowBackends[name] = backend
}

func (c *ComponentStore) GetWorkflowBackend(name string) (backend.Backend, bool) {
c.lock.RLock()
defer c.lock.RUnlock()
backend, ok := c.workflowBackends[name]
return backend, ok
}

func (c *ComponentStore) ListWorkflowBackends() map[string]backend.Backend {
c.lock.RLock()
defer c.lock.RUnlock()
return c.workflowBackends
}

func (c *ComponentStore) DeleteWorkflowBackend(name string) {
c.lock.Lock()
defer c.lock.Unlock()
delete(c.workflowBackends, name)
}
14 changes: 14 additions & 0 deletions pkg/runtime/hotreload/reconciler/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ func (c *component) update(ctx context.Context, comp componentsapi.Component) {
log.Infof("Component updated: %s", comp.LogName())
c.proc.WaitForEmptyComponentQueue()
}

return
}

//nolint:unused
Expand Down Expand Up @@ -107,5 +109,17 @@ func (c *component) verify(vcomp componentsapi.Component) bool {
}
}

for backendName := range c.store.ListWorkflowBackends() {
if backendName == vcomp.Name {
log.Errorf("Aborting to hot-reload a workflowbackend component which is not supported: %s", vcomp.LogName())
return false
}
}

if strings.HasPrefix(vcomp.Spec.Type, "workflowbackend.") {
log.Errorf("Aborting to hot-reload a workflowbackend component which is not supported: %s", vcomp.LogName())
return false
}

return true
}
2 changes: 1 addition & 1 deletion pkg/runtime/processor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type BindingManager interface {
}

type WorkflowBackendManager interface {
GetBackend() (backend.Backend, bool)
Backend() (backend.Backend, bool)
}

func (p *Processor) managerFromComp(comp componentsapi.Component) (manager, error) {
Expand Down
45 changes: 30 additions & 15 deletions pkg/runtime/processor/wfbackend/wfbackend.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"errors"
"sync"
"time"

"github.com/microsoft/durabletask-go/backend"

Expand Down Expand Up @@ -56,18 +57,18 @@ func New(opts Options) *workflowBackend {
}
}

func (wfbe *workflowBackend) Init(ctx context.Context, comp compapi.Component) error {
wfbe.lock.Lock()
defer wfbe.lock.Unlock()
func (w *workflowBackend) Init(ctx context.Context, comp compapi.Component) error {
w.lock.Lock()
defer w.lock.Unlock()

if wfbe.backend != nil {
if w.backend != nil {
// Can only have 1 workflow backend component
return errors.New("cannot create more than one workflow backend component")
}

// Create the component
fName := comp.LogName()
beFactory, err := wfbe.registry.Create(comp.Spec.Type, comp.Spec.Version, fName)
beFactory, err := w.registry.Create(comp.Spec.Type, comp.Spec.Version, fName)
if err != nil {
log.Errorf("Error creating workflow backend component (%s): %v", fName, err)
diag.DefaultMonitoring.ComponentInitFailed(comp.Spec.Type, "init", comp.ObjectMeta.Name)
Expand All @@ -79,14 +80,14 @@ func (wfbe *workflowBackend) Init(ctx context.Context, comp compapi.Component) e
}

// Initialization
baseMetadata, err := wfbe.meta.ToBaseMetadata(comp)
baseMetadata, err := w.meta.ToBaseMetadata(comp)
if err != nil {
diag.DefaultMonitoring.ComponentInitFailed(comp.Spec.Type, "init", comp.ObjectMeta.Name)
return rterrors.NewInit(rterrors.InitComponentFailure, fName, err)
}

be, err := beFactory(wfbeComp.Metadata{
AppID: wfbe.appID,
AppID: w.appID,
Base: baseMetadata,
})
if err != nil {
Expand All @@ -96,21 +97,35 @@ func (wfbe *workflowBackend) Init(ctx context.Context, comp compapi.Component) e

log.Infof("Using %s as workflow backend", comp.Spec.Type)
diag.DefaultMonitoring.ComponentInitialized(comp.Spec.Type)
wfbe.backend = be
w.backend = be
w.compStore.AddWorkflowBackend(comp.Name, be)

return nil
}

func (wfbe *workflowBackend) Close(comp compapi.Component) error {
return nil
func (w *workflowBackend) Close(comp compapi.Component) error {
w.lock.Lock()
defer w.lock.Unlock()

backend, ok := w.compStore.GetWorkflowBackend(comp.Name)
if !ok {
return nil
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
defer w.compStore.DeleteWorkflowBackend(comp.Name)
w.backend = nil

return backend.Stop(ctx)
}

func (wfbe *workflowBackend) GetBackend() (backend.Backend, bool) {
wfbe.lock.Lock()
defer wfbe.lock.Unlock()
func (w *workflowBackend) Backend() (backend.Backend, bool) {
w.lock.Lock()
defer w.lock.Unlock()

if wfbe.backend == nil {
if w.backend == nil {
return nil, false
}
return wfbe.backend, true
return w.backend, true
}
2 changes: 1 addition & 1 deletion pkg/runtime/wfengine/wfengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func NewWorkflowEngine(appID string, spec config.WorkflowSpec, backendManager pr

var ok bool
if backendManager != nil {
engine.Backend, ok = backendManager.GetBackend()
engine.Backend, ok = backendManager.Backend()
}
if !ok {
// If no backend was initialized by the manager, create a backend backed by actors
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/framework/binary/binary.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func Build(t *testing.T, name string) {

t.Logf("Root dir: %q", rootDir)
t.Logf("Compiling %q binary to: %q", name, binPath)
cmd := exec.Command("go", "build", "-tags=allcomponents", "-v", "-o", binPath, "./cmd/"+name)
cmd := exec.Command("go", "build", "-tags=allcomponents,wfbackendsqlite", "-v", "-o", binPath, "./cmd/"+name)
cmd.Dir = rootDir
cmd.Stdout = ioout
cmd.Stderr = ioerr
Expand Down
8 changes: 6 additions & 2 deletions tests/integration/framework/process/daprd/daprd.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,15 +238,19 @@ func (d *Daprd) WaitUntilAppHealth(t *testing.T, ctx context.Context) {
}
}

func (d *Daprd) GRPCClient(t *testing.T, ctx context.Context) rtv1.DaprClient {
func (d *Daprd) GRPCConn(t *testing.T, ctx context.Context) *grpc.ClientConn {
conn, err := grpc.DialContext(ctx, fmt.Sprintf("localhost:%d", d.GRPCPort()),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, conn.Close()) })

return rtv1.NewDaprClient(conn)
return conn
}

func (d *Daprd) GRPCClient(t *testing.T, ctx context.Context) rtv1.DaprClient {
return rtv1.NewDaprClient(d.GRPCConn(t, ctx))
}

func (d *Daprd) AppID() string {
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/framework/process/daprd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func WithAppHealthProbeThreshold(threshold int) Option {

func WithResourceFiles(files ...string) Option {
return func(o *options) {
o.resourceFiles = files
o.resourceFiles = append(o.resourceFiles, files...)
}
}

Expand Down

0 comments on commit 2fe1316

Please sign in to comment.