Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workflow backends: ignore on hot reloading #7433

Merged
merged 12 commits into from
Jan 30, 2024
Merged
8 changes: 4 additions & 4 deletions pkg/components/wfbackend/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
// Registry is an interface for a component that returns registered workflow backend implementations.
type Registry struct {
Logger logger.Logger
workflowBackendComponents map[string]WorkflowBackend
workflowBackendComponents map[string]workflowBackendFactory
}

// DefaultRegistry is the singleton with the registry.
Expand All @@ -35,12 +35,12 @@ var DefaultRegistry *Registry = NewRegistry()
// NewRegistry is used to create workflow registry.
func NewRegistry() *Registry {
return &Registry{
workflowBackendComponents: make(map[string]WorkflowBackend),
workflowBackendComponents: make(map[string]workflowBackendFactory),
}
}

// RegisterComponent adds a new workflow to the registry.
func (s *Registry) RegisterComponent(componentFactory WorkflowBackend, names ...string) {
func (s *Registry) RegisterComponent(componentFactory workflowBackendFactory, names ...string) {
for _, name := range names {
s.workflowBackendComponents[createFullName(name)] = componentFactory
}
Expand Down Expand Up @@ -69,7 +69,7 @@ func (s *Registry) getWorkflowBackendComponent(name, version, logName string) (f
return nil, false
}

func (s *Registry) wrapFn(componentFactory WorkflowBackend, logName string) func(Metadata) (backend.Backend, error) {
func (s *Registry) wrapFn(componentFactory workflowBackendFactory, logName string) func(Metadata) (backend.Backend, error) {
return func(m Metadata) (backend.Backend, error) {
l := s.Logger
if logName != "" && l != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/components/wfbackend/wfbackend.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ import (
"github.com/dapr/kit/logger"
)

// WorkflowBackend is a function that returns a workflow backend
type WorkflowBackend = func(Metadata, logger.Logger) (backend.Backend, error)
// workflowBackendFactory is a function that returns a workflow backend
type workflowBackendFactory func(Metadata, logger.Logger) (backend.Backend, error)
7 changes: 4 additions & 3 deletions pkg/runtime/compstore/compstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package compstore
import (
"sync"

"github.com/microsoft/durabletask-go/backend"

"github.com/dapr/components-contrib/bindings"
"github.com/dapr/components-contrib/configuration"
"github.com/dapr/components-contrib/crypto"
Expand All @@ -25,7 +27,6 @@ import (
"github.com/dapr/components-contrib/workflows"
compsv1alpha1 "github.com/dapr/dapr/pkg/apis/components/v1alpha1"
httpEndpointV1alpha1 "github.com/dapr/dapr/pkg/apis/httpEndpoint/v1alpha1"
wfbe "github.com/dapr/dapr/pkg/components/wfbackend"
"github.com/dapr/dapr/pkg/config"
rtpubsub "github.com/dapr/dapr/pkg/runtime/pubsub"
)
Expand All @@ -48,7 +49,7 @@ type ComponentStore struct {
pubSubs map[string]PubsubItem
topicRoutes map[string]TopicRoutes
workflowComponents map[string]workflows.Workflow
workflowBackends map[string]wfbe.WorkflowBackend
workflowBackends map[string]backend.Backend
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the correct type for the workflow backend implementation interface. Notice the other component types are using the interface types defined in components-contrib however backends is using durabletask-go directly. This wasn't picked up before as this field wasn't previously being used (workflowbackend.go didn't exist). wfbe.WorkflowBackend is a factory func type.

cryptoProviders map[string]crypto.SubtleCrypto
components []compsv1alpha1.Component
subscriptions []rtpubsub.Subscription
Expand All @@ -75,7 +76,7 @@ func New() *ComponentStore {
locks: make(map[string]lock.Store),
pubSubs: make(map[string]PubsubItem),
workflowComponents: make(map[string]workflows.Workflow),
workflowBackends: make(map[string]wfbe.WorkflowBackend),
workflowBackends: make(map[string]backend.Backend),
cryptoProviders: make(map[string]crypto.SubtleCrypto),
topicRoutes: make(map[string]TopicRoutes),
}
Expand Down
43 changes: 43 additions & 0 deletions pkg/runtime/compstore/workflowbackend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
Copyright 2024 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package compstore

import (
"github.com/microsoft/durabletask-go/backend"
)

func (c *ComponentStore) AddWorkflowBackend(name string, backend backend.Backend) {
c.lock.Lock()
defer c.lock.Unlock()
c.workflowBackends[name] = backend
}

func (c *ComponentStore) GetWorkflowBackend(name string) (backend.Backend, bool) {
c.lock.RLock()
defer c.lock.RUnlock()
backend, ok := c.workflowBackends[name]
return backend, ok
}

func (c *ComponentStore) ListWorkflowBackends() map[string]backend.Backend {
c.lock.RLock()
defer c.lock.RUnlock()
return c.workflowBackends
}

func (c *ComponentStore) DeleteWorkflowBackend(name string) {
c.lock.Lock()
defer c.lock.Unlock()
delete(c.workflowBackends, name)
}
19 changes: 19 additions & 0 deletions pkg/runtime/hotreload/reconciler/component.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*

Check failure on line 1 in pkg/runtime/hotreload/reconciler/component.go

View workflow job for this annotation

GitHub Actions / lint & proto validation (linux, amd64)

: # github.com/dapr/dapr/pkg/runtime/hotreload/reconciler [github.com/dapr/dapr/pkg/runtime/hotreload/reconciler.test]
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -69,6 +69,8 @@
log.Infof("Component updated: %s", comp.LogName())
c.proc.WaitForEmptyComponentQueue()
}

return
}

//nolint:unused
Expand Down Expand Up @@ -109,3 +111,20 @@

return true
}

//nolint:unused
func (c *component) verify(vcomp componentsapi.Component) bool {

Check failure on line 116 in pkg/runtime/hotreload/reconciler/component.go

View workflow job for this annotation

GitHub Actions / Standalone validations

method component.verify already declared at pkg/runtime/hotreload/reconciler/component.go:88:21

Check failure on line 116 in pkg/runtime/hotreload/reconciler/component.go

View workflow job for this annotation

GitHub Actions / e2e (v1.25.3, non-ha)

method component.verify already declared at pkg/runtime/hotreload/reconciler/component.go:88:21

Check failure on line 116 in pkg/runtime/hotreload/reconciler/component.go

View workflow job for this annotation

GitHub Actions / e2e (v1.24.7, ha)

method component.verify already declared at pkg/runtime/hotreload/reconciler/component.go:88:21

Check failure on line 116 in pkg/runtime/hotreload/reconciler/component.go

View workflow job for this annotation

GitHub Actions / e2e (v1.24.7, non-ha)

method component.verify already declared at pkg/runtime/hotreload/reconciler/component.go:88:21

Check failure on line 116 in pkg/runtime/hotreload/reconciler/component.go

View workflow job for this annotation

GitHub Actions / e2e (v1.25.3, ha)

method component.verify already declared at pkg/runtime/hotreload/reconciler/component.go:88:21

Check failure on line 116 in pkg/runtime/hotreload/reconciler/component.go

View workflow job for this annotation

GitHub Actions / e2e (v1.23.13, ha)

method component.verify already declared at pkg/runtime/hotreload/reconciler/component.go:88:21

Check failure on line 116 in pkg/runtime/hotreload/reconciler/component.go

View workflow job for this annotation

GitHub Actions / lint & proto validation (linux, amd64)

method component.verify already declared at pkg/runtime/hotreload/reconciler/component.go:88:21) (typecheck)

Check failure on line 116 in pkg/runtime/hotreload/reconciler/component.go

View workflow job for this annotation

GitHub Actions / lint & proto validation (linux, amd64)

method component.verify already declared at pkg/runtime/hotreload/reconciler/component.go:88:21 (typecheck)
for backendName := range c.store.ListWorkflowBackends() {
if backendName == vcomp.Name {
log.Errorf("Aborting to hot-reload a workflowbackend component which is not supported: %s", vcomp.LogName())
return false
}
}

if strings.HasPrefix(vcomp.Spec.Type, "workflowbackend.") {
log.Errorf("Aborting to hot-reload a workflowbackend component which is not supported: %s", vcomp.LogName())
return false
}

return true
}
2 changes: 1 addition & 1 deletion pkg/runtime/processor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type BindingManager interface {
}

type WorkflowBackendManager interface {
GetBackend() (backend.Backend, bool)
Backend() (backend.Backend, bool)
}

func (p *Processor) managerFromComp(comp componentsapi.Component) (manager, error) {
Expand Down
45 changes: 30 additions & 15 deletions pkg/runtime/processor/wfbackend/wfbackend.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"errors"
"sync"
"time"

"github.com/microsoft/durabletask-go/backend"

Expand Down Expand Up @@ -56,18 +57,18 @@ func New(opts Options) *workflowBackend {
}
}

func (wfbe *workflowBackend) Init(ctx context.Context, comp compapi.Component) error {
wfbe.lock.Lock()
defer wfbe.lock.Unlock()
func (w *workflowBackend) Init(ctx context.Context, comp compapi.Component) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't rename variables for no reason, it causes unnecessary merge conflicts

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By convention we use single letter variable names for method receiver variables. This gives overhead to the reader as it is unexpected. wfbe initially tripped my up as I was confused as to where this variable was coming from until I realized it was the receiver.

w.lock.Lock()
defer w.lock.Unlock()

if wfbe.backend != nil {
if w.backend != nil {
// Can only have 1 workflow backend component
return errors.New("cannot create more than one workflow backend component")
}

// Create the component
fName := comp.LogName()
beFactory, err := wfbe.registry.Create(comp.Spec.Type, comp.Spec.Version, fName)
beFactory, err := w.registry.Create(comp.Spec.Type, comp.Spec.Version, fName)
if err != nil {
log.Errorf("Error creating workflow backend component (%s): %v", fName, err)
diag.DefaultMonitoring.ComponentInitFailed(comp.Spec.Type, "init", comp.ObjectMeta.Name)
Expand All @@ -79,14 +80,14 @@ func (wfbe *workflowBackend) Init(ctx context.Context, comp compapi.Component) e
}

// Initialization
baseMetadata, err := wfbe.meta.ToBaseMetadata(comp)
baseMetadata, err := w.meta.ToBaseMetadata(comp)
if err != nil {
diag.DefaultMonitoring.ComponentInitFailed(comp.Spec.Type, "init", comp.ObjectMeta.Name)
return rterrors.NewInit(rterrors.InitComponentFailure, fName, err)
}

be, err := beFactory(wfbeComp.Metadata{
AppID: wfbe.appID,
AppID: w.appID,
Base: baseMetadata,
})
if err != nil {
Expand All @@ -96,21 +97,35 @@ func (wfbe *workflowBackend) Init(ctx context.Context, comp compapi.Component) e

log.Infof("Using %s as workflow backend", comp.Spec.Type)
diag.DefaultMonitoring.ComponentInitialized(comp.Spec.Type)
wfbe.backend = be
w.backend = be
w.compStore.AddWorkflowBackend(comp.Name, be)

return nil
}

func (wfbe *workflowBackend) Close(comp compapi.Component) error {
return nil
func (w *workflowBackend) Close(comp compapi.Component) error {
w.lock.Lock()
defer w.lock.Unlock()

backend, ok := w.compStore.GetWorkflowBackend(comp.Name)
if !ok {
return nil
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
defer w.compStore.DeleteWorkflowBackend(comp.Name)
w.backend = nil

return backend.Stop(ctx)
}

func (wfbe *workflowBackend) GetBackend() (backend.Backend, bool) {
wfbe.lock.Lock()
defer wfbe.lock.Unlock()
func (w *workflowBackend) Backend() (backend.Backend, bool) {
w.lock.Lock()
defer w.lock.Unlock()

if wfbe.backend == nil {
if w.backend == nil {
return nil, false
}
return wfbe.backend, true
return w.backend, true
}
2 changes: 1 addition & 1 deletion pkg/runtime/wfengine/wfengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func NewWorkflowEngine(appID string, spec config.WorkflowSpec, backendManager pr

var ok bool
if backendManager != nil {
engine.Backend, ok = backendManager.GetBackend()
engine.Backend, ok = backendManager.Backend()
}
if !ok {
// If no backend was initialized by the manager, create a backend backed by actors
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/framework/binary/binary.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func Build(t *testing.T, name string) {

t.Logf("Root dir: %q", rootDir)
t.Logf("Compiling %q binary to: %q", name, binPath)
cmd := exec.Command("go", "build", "-tags=allcomponents", "-v", "-o", binPath, "./cmd/"+name)
cmd := exec.Command("go", "build", "-tags=allcomponents,wfbackendsqlite", "-v", "-o", binPath, "./cmd/"+name)
cmd.Dir = rootDir
cmd.Stdout = ioout
cmd.Stderr = ioerr
Expand Down
8 changes: 6 additions & 2 deletions tests/integration/framework/process/daprd/daprd.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,15 +238,19 @@ func (d *Daprd) WaitUntilAppHealth(t *testing.T, ctx context.Context) {
}
}

func (d *Daprd) GRPCClient(t *testing.T, ctx context.Context) rtv1.DaprClient {
func (d *Daprd) GRPCConn(t *testing.T, ctx context.Context) *grpc.ClientConn {
conn, err := grpc.DialContext(ctx, fmt.Sprintf("localhost:%d", d.GRPCPort()),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, conn.Close()) })

return rtv1.NewDaprClient(conn)
return conn
}

func (d *Daprd) GRPCClient(t *testing.T, ctx context.Context) rtv1.DaprClient {
return rtv1.NewDaprClient(d.GRPCConn(t, ctx))
}

func (d *Daprd) AppID() string {
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/framework/process/daprd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func WithAppHealthProbeThreshold(threshold int) Option {

func WithResourceFiles(files ...string) Option {
return func(o *options) {
o.resourceFiles = files
o.resourceFiles = append(o.resourceFiles, files...)
}
}

Expand Down