-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Workflow backends: ignore on hot reloading #7433
Changes from all commits
ed5bc00
2b15450
6bc848d
ba58da7
b10fe79
80e4887
c0358f6
d41a558
0d130fb
b33a1e4
0227d6f
0607ef9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
/* | ||
Copyright 2024 The Dapr Authors | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package compstore | ||
|
||
import ( | ||
"github.com/microsoft/durabletask-go/backend" | ||
) | ||
|
||
func (c *ComponentStore) AddWorkflowBackend(name string, backend backend.Backend) { | ||
c.lock.Lock() | ||
defer c.lock.Unlock() | ||
c.workflowBackends[name] = backend | ||
} | ||
|
||
func (c *ComponentStore) GetWorkflowBackend(name string) (backend.Backend, bool) { | ||
c.lock.RLock() | ||
defer c.lock.RUnlock() | ||
backend, ok := c.workflowBackends[name] | ||
return backend, ok | ||
} | ||
|
||
func (c *ComponentStore) ListWorkflowBackends() map[string]backend.Backend { | ||
c.lock.RLock() | ||
defer c.lock.RUnlock() | ||
return c.workflowBackends | ||
} | ||
|
||
func (c *ComponentStore) DeleteWorkflowBackend(name string) { | ||
c.lock.Lock() | ||
defer c.lock.Unlock() | ||
delete(c.workflowBackends, name) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ import ( | |
"context" | ||
"errors" | ||
"sync" | ||
"time" | ||
|
||
"github.com/microsoft/durabletask-go/backend" | ||
|
||
|
@@ -56,18 +57,18 @@ func New(opts Options) *workflowBackend { | |
} | ||
} | ||
|
||
func (wfbe *workflowBackend) Init(ctx context.Context, comp compapi.Component) error { | ||
wfbe.lock.Lock() | ||
defer wfbe.lock.Unlock() | ||
func (w *workflowBackend) Init(ctx context.Context, comp compapi.Component) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please don't rename variables for no reason, it causes unnecessary merge conflicts There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. By convention we use single letter variable names for method receiver variables. This gives overhead to the reader as it is unexpected. |
||
w.lock.Lock() | ||
defer w.lock.Unlock() | ||
|
||
if wfbe.backend != nil { | ||
if w.backend != nil { | ||
// Can only have 1 workflow backend component | ||
return errors.New("cannot create more than one workflow backend component") | ||
} | ||
|
||
// Create the component | ||
fName := comp.LogName() | ||
beFactory, err := wfbe.registry.Create(comp.Spec.Type, comp.Spec.Version, fName) | ||
beFactory, err := w.registry.Create(comp.Spec.Type, comp.Spec.Version, fName) | ||
if err != nil { | ||
log.Errorf("Error creating workflow backend component (%s): %v", fName, err) | ||
diag.DefaultMonitoring.ComponentInitFailed(comp.Spec.Type, "init", comp.ObjectMeta.Name) | ||
|
@@ -79,14 +80,14 @@ func (wfbe *workflowBackend) Init(ctx context.Context, comp compapi.Component) e | |
} | ||
|
||
// Initialization | ||
baseMetadata, err := wfbe.meta.ToBaseMetadata(comp) | ||
baseMetadata, err := w.meta.ToBaseMetadata(comp) | ||
if err != nil { | ||
diag.DefaultMonitoring.ComponentInitFailed(comp.Spec.Type, "init", comp.ObjectMeta.Name) | ||
return rterrors.NewInit(rterrors.InitComponentFailure, fName, err) | ||
} | ||
|
||
be, err := beFactory(wfbeComp.Metadata{ | ||
AppID: wfbe.appID, | ||
AppID: w.appID, | ||
Base: baseMetadata, | ||
}) | ||
if err != nil { | ||
|
@@ -96,21 +97,35 @@ func (wfbe *workflowBackend) Init(ctx context.Context, comp compapi.Component) e | |
|
||
log.Infof("Using %s as workflow backend", comp.Spec.Type) | ||
diag.DefaultMonitoring.ComponentInitialized(comp.Spec.Type) | ||
wfbe.backend = be | ||
w.backend = be | ||
w.compStore.AddWorkflowBackend(comp.Name, be) | ||
|
||
return nil | ||
} | ||
|
||
func (wfbe *workflowBackend) Close(comp compapi.Component) error { | ||
return nil | ||
func (w *workflowBackend) Close(comp compapi.Component) error { | ||
w.lock.Lock() | ||
defer w.lock.Unlock() | ||
|
||
backend, ok := w.compStore.GetWorkflowBackend(comp.Name) | ||
if !ok { | ||
return nil | ||
} | ||
|
||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) | ||
defer cancel() | ||
defer w.compStore.DeleteWorkflowBackend(comp.Name) | ||
w.backend = nil | ||
|
||
return backend.Stop(ctx) | ||
} | ||
|
||
func (wfbe *workflowBackend) GetBackend() (backend.Backend, bool) { | ||
wfbe.lock.Lock() | ||
defer wfbe.lock.Unlock() | ||
func (w *workflowBackend) Backend() (backend.Backend, bool) { | ||
w.lock.Lock() | ||
defer w.lock.Unlock() | ||
|
||
if wfbe.backend == nil { | ||
if w.backend == nil { | ||
return nil, false | ||
} | ||
return wfbe.backend, true | ||
return w.backend, true | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the correct type for the workflow backend implementation interface. Notice the other component types are using the interface types defined in
components-contrib
however backends is usingdurabletask-go
directly. This wasn't picked up before as this field wasn't previously being used (workflowbackend.go
didn't exist).wfbe.WorkflowBackend
is a factory func type.