Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workflow backends: ignore on hot reloading #7433

Merged
merged 12 commits into from
Jan 30, 2024
Merged
8 changes: 4 additions & 4 deletions pkg/components/wfbackend/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
// Registry is an interface for a component that returns registered workflow backend implementations.
type Registry struct {
Logger logger.Logger
workflowBackendComponents map[string]WorkflowBackend
workflowBackendComponents map[string]workflowBackendFactory
}

// DefaultRegistry is the singleton with the registry.
Expand All @@ -35,12 +35,12 @@ var DefaultRegistry *Registry = NewRegistry()
// NewRegistry is used to create workflow registry.
func NewRegistry() *Registry {
return &Registry{
workflowBackendComponents: make(map[string]WorkflowBackend),
workflowBackendComponents: make(map[string]workflowBackendFactory),
}
}

// RegisterComponent adds a new workflow to the registry.
func (s *Registry) RegisterComponent(componentFactory WorkflowBackend, names ...string) {
func (s *Registry) RegisterComponent(componentFactory workflowBackendFactory, names ...string) {
for _, name := range names {
s.workflowBackendComponents[createFullName(name)] = componentFactory
}
Expand Down Expand Up @@ -69,7 +69,7 @@ func (s *Registry) getWorkflowBackendComponent(name, version, logName string) (f
return nil, false
}

func (s *Registry) wrapFn(componentFactory WorkflowBackend, logName string) func(Metadata) (backend.Backend, error) {
func (s *Registry) wrapFn(componentFactory workflowBackendFactory, logName string) func(Metadata) (backend.Backend, error) {
return func(m Metadata) (backend.Backend, error) {
l := s.Logger
if logName != "" && l != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/components/wfbackend/wfbackend.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ import (
"github.com/dapr/kit/logger"
)

// WorkflowBackend is a function that returns a workflow backend
type WorkflowBackend = func(Metadata, logger.Logger) (backend.Backend, error)
// workflowBackendFactory is a function that returns a workflow backend
type workflowBackendFactory func(Metadata, logger.Logger) (backend.Backend, error)
7 changes: 4 additions & 3 deletions pkg/runtime/compstore/compstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package compstore
import (
"sync"

"github.com/microsoft/durabletask-go/backend"

"github.com/dapr/components-contrib/bindings"
"github.com/dapr/components-contrib/configuration"
"github.com/dapr/components-contrib/crypto"
Expand All @@ -25,7 +27,6 @@ import (
"github.com/dapr/components-contrib/workflows"
compsv1alpha1 "github.com/dapr/dapr/pkg/apis/components/v1alpha1"
httpEndpointV1alpha1 "github.com/dapr/dapr/pkg/apis/httpEndpoint/v1alpha1"
wfbe "github.com/dapr/dapr/pkg/components/wfbackend"
"github.com/dapr/dapr/pkg/config"
rtpubsub "github.com/dapr/dapr/pkg/runtime/pubsub"
)
Expand All @@ -48,7 +49,7 @@ type ComponentStore struct {
pubSubs map[string]PubsubItem
topicRoutes map[string]TopicRoutes
workflowComponents map[string]workflows.Workflow
workflowBackends map[string]wfbe.WorkflowBackend
workflowBackends map[string]backend.Backend
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the correct type for the workflow backend implementation interface. Notice the other component types are using the interface types defined in components-contrib however backends is using durabletask-go directly. This wasn't picked up before as this field wasn't previously being used (workflowbackend.go didn't exist). wfbe.WorkflowBackend is a factory func type.

cryptoProviders map[string]crypto.SubtleCrypto
components []compsv1alpha1.Component
subscriptions []rtpubsub.Subscription
Expand All @@ -75,7 +76,7 @@ func New() *ComponentStore {
locks: make(map[string]lock.Store),
pubSubs: make(map[string]PubsubItem),
workflowComponents: make(map[string]workflows.Workflow),
workflowBackends: make(map[string]wfbe.WorkflowBackend),
workflowBackends: make(map[string]backend.Backend),
cryptoProviders: make(map[string]crypto.SubtleCrypto),
topicRoutes: make(map[string]TopicRoutes),
}
Expand Down
43 changes: 43 additions & 0 deletions pkg/runtime/compstore/workflowbackend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
Copyright 2024 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package compstore

import (
"github.com/microsoft/durabletask-go/backend"
)

func (c *ComponentStore) AddWorkflowBackend(name string, backend backend.Backend) {
c.lock.Lock()
defer c.lock.Unlock()
c.workflowBackends[name] = backend
}

func (c *ComponentStore) GetWorkflowBackend(name string) (backend.Backend, bool) {
c.lock.RLock()
defer c.lock.RUnlock()
backend, ok := c.workflowBackends[name]
return backend, ok
}

func (c *ComponentStore) ListWorkflowBackends() map[string]backend.Backend {
c.lock.RLock()
defer c.lock.RUnlock()
return c.workflowBackends
}

func (c *ComponentStore) DeleteWorkflowBackend(name string) {
c.lock.Lock()
defer c.lock.Unlock()
delete(c.workflowBackends, name)
}
14 changes: 14 additions & 0 deletions pkg/runtime/hotreload/reconciler/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@
log.Infof("Component updated: %s", comp.LogName())
c.proc.WaitForEmptyComponentQueue()
}

return

Check warning on line 73 in pkg/runtime/hotreload/reconciler/component.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/hotreload/reconciler/component.go#L73

Added line #L73 was not covered by tests
}

//nolint:unused
Expand Down Expand Up @@ -107,5 +109,17 @@
}
}

for backendName := range c.store.ListWorkflowBackends() {
if backendName == vcomp.Name {
log.Errorf("Aborting to hot-reload a workflowbackend component which is not supported: %s", vcomp.LogName())
return false

Check warning on line 115 in pkg/runtime/hotreload/reconciler/component.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/hotreload/reconciler/component.go#L113-L115

Added lines #L113 - L115 were not covered by tests
}
}

if strings.HasPrefix(vcomp.Spec.Type, "workflowbackend.") {
log.Errorf("Aborting to hot-reload a workflowbackend component which is not supported: %s", vcomp.LogName())
return false

Check warning on line 121 in pkg/runtime/hotreload/reconciler/component.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/hotreload/reconciler/component.go#L119-L121

Added lines #L119 - L121 were not covered by tests
}

return true
}
2 changes: 1 addition & 1 deletion pkg/runtime/processor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type BindingManager interface {
}

type WorkflowBackendManager interface {
GetBackend() (backend.Backend, bool)
Backend() (backend.Backend, bool)
}

func (p *Processor) managerFromComp(comp componentsapi.Component) (manager, error) {
Expand Down
45 changes: 30 additions & 15 deletions pkg/runtime/processor/wfbackend/wfbackend.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"errors"
"sync"
"time"

"github.com/microsoft/durabletask-go/backend"

Expand Down Expand Up @@ -56,18 +57,18 @@ func New(opts Options) *workflowBackend {
}
}

func (wfbe *workflowBackend) Init(ctx context.Context, comp compapi.Component) error {
wfbe.lock.Lock()
defer wfbe.lock.Unlock()
func (w *workflowBackend) Init(ctx context.Context, comp compapi.Component) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't rename variables for no reason, it causes unnecessary merge conflicts

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By convention we use single letter variable names for method receiver variables. This gives overhead to the reader as it is unexpected. wfbe initially tripped my up as I was confused as to where this variable was coming from until I realized it was the receiver.

w.lock.Lock()
defer w.lock.Unlock()

if wfbe.backend != nil {
if w.backend != nil {
// Can only have 1 workflow backend component
return errors.New("cannot create more than one workflow backend component")
}

// Create the component
fName := comp.LogName()
beFactory, err := wfbe.registry.Create(comp.Spec.Type, comp.Spec.Version, fName)
beFactory, err := w.registry.Create(comp.Spec.Type, comp.Spec.Version, fName)
if err != nil {
log.Errorf("Error creating workflow backend component (%s): %v", fName, err)
diag.DefaultMonitoring.ComponentInitFailed(comp.Spec.Type, "init", comp.ObjectMeta.Name)
Expand All @@ -79,14 +80,14 @@ func (wfbe *workflowBackend) Init(ctx context.Context, comp compapi.Component) e
}

// Initialization
baseMetadata, err := wfbe.meta.ToBaseMetadata(comp)
baseMetadata, err := w.meta.ToBaseMetadata(comp)
if err != nil {
diag.DefaultMonitoring.ComponentInitFailed(comp.Spec.Type, "init", comp.ObjectMeta.Name)
return rterrors.NewInit(rterrors.InitComponentFailure, fName, err)
}

be, err := beFactory(wfbeComp.Metadata{
AppID: wfbe.appID,
AppID: w.appID,
Base: baseMetadata,
})
if err != nil {
Expand All @@ -96,21 +97,35 @@ func (wfbe *workflowBackend) Init(ctx context.Context, comp compapi.Component) e

log.Infof("Using %s as workflow backend", comp.Spec.Type)
diag.DefaultMonitoring.ComponentInitialized(comp.Spec.Type)
wfbe.backend = be
w.backend = be
w.compStore.AddWorkflowBackend(comp.Name, be)

return nil
}

func (wfbe *workflowBackend) Close(comp compapi.Component) error {
return nil
func (w *workflowBackend) Close(comp compapi.Component) error {
w.lock.Lock()
defer w.lock.Unlock()

backend, ok := w.compStore.GetWorkflowBackend(comp.Name)
if !ok {
return nil
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
defer w.compStore.DeleteWorkflowBackend(comp.Name)
w.backend = nil

return backend.Stop(ctx)
}

func (wfbe *workflowBackend) GetBackend() (backend.Backend, bool) {
wfbe.lock.Lock()
defer wfbe.lock.Unlock()
func (w *workflowBackend) Backend() (backend.Backend, bool) {
w.lock.Lock()
defer w.lock.Unlock()

if wfbe.backend == nil {
if w.backend == nil {
return nil, false
}
return wfbe.backend, true
return w.backend, true
}
2 changes: 1 addition & 1 deletion pkg/runtime/wfengine/wfengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func NewWorkflowEngine(appID string, spec config.WorkflowSpec, backendManager pr

var ok bool
if backendManager != nil {
engine.Backend, ok = backendManager.GetBackend()
engine.Backend, ok = backendManager.Backend()
}
if !ok {
// If no backend was initialized by the manager, create a backend backed by actors
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/framework/binary/binary.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func Build(t *testing.T, name string) {

t.Logf("Root dir: %q", rootDir)
t.Logf("Compiling %q binary to: %q", name, binPath)
cmd := exec.Command("go", "build", "-tags=allcomponents", "-v", "-o", binPath, "./cmd/"+name)
cmd := exec.Command("go", "build", "-tags=allcomponents,wfbackendsqlite", "-v", "-o", binPath, "./cmd/"+name)
cmd.Dir = rootDir
cmd.Stdout = ioout
cmd.Stderr = ioerr
Expand Down
8 changes: 6 additions & 2 deletions tests/integration/framework/process/daprd/daprd.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,15 +238,19 @@ func (d *Daprd) WaitUntilAppHealth(t *testing.T, ctx context.Context) {
}
}

func (d *Daprd) GRPCClient(t *testing.T, ctx context.Context) rtv1.DaprClient {
func (d *Daprd) GRPCConn(t *testing.T, ctx context.Context) *grpc.ClientConn {
conn, err := grpc.DialContext(ctx, fmt.Sprintf("localhost:%d", d.GRPCPort()),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, conn.Close()) })

return rtv1.NewDaprClient(conn)
return conn
}

func (d *Daprd) GRPCClient(t *testing.T, ctx context.Context) rtv1.DaprClient {
return rtv1.NewDaprClient(d.GRPCConn(t, ctx))
}

func (d *Daprd) AppID() string {
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/framework/process/daprd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func WithAppHealthProbeThreshold(threshold int) Option {

func WithResourceFiles(files ...string) Option {
return func(o *options) {
o.resourceFiles = files
o.resourceFiles = append(o.resourceFiles, files...)
}
}

Expand Down