Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Workflow] Make workflow engine configurable (and other improvements) #7090

Merged
merged 22 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
708af2c
Workflow configuration support + code cleanup
cgillum Oct 24, 2023
e29ec26
Testing and logging improvements + fixes
cgillum Oct 24, 2023
60401b1
Adding configuration tests
cgillum Oct 24, 2023
8edf866
PR feedback and renamed properties to be a bit more specific
cgillum Oct 26, 2023
abaaf67
Merge branch 'master' into workflow-config
cgillum Oct 26, 2023
3583dfe
PR feedback: if/else --> switch
cgillum Oct 27, 2023
d773e3b
Merge branch 'master' into workflow-config
cgillum Oct 27, 2023
77f5a19
More PR feedback + increase default values
cgillum Oct 31, 2023
cf7d32d
Merge branch 'master' into workflow-config
cgillum Oct 31, 2023
5e54218
Use new getters
cgillum Oct 31, 2023
59193c5
PR Feedback: Revert defaults to their original values
cgillum Nov 10, 2023
78f7441
Merge branch 'master' into workflow-config
cgillum Nov 10, 2023
050d036
Merge branch 'master' into workflow-config
artursouza Nov 14, 2023
6a8913c
Fixing code comments
cgillum Nov 14, 2023
6c797b5
Merge branch 'master' into workflow-config
mukundansundar Nov 15, 2023
e32647e
Merge branch 'master' into workflow-config
mukundansundar Nov 15, 2023
9c01990
PR feedback and fix config unit test
cgillum Nov 15, 2023
6b77b7f
Merge branch 'master' into workflow-config
cgillum Nov 15, 2023
9ed79ee
Merge branch 'master' into workflow-config
ItalyPaleAle Nov 20, 2023
bf86875
Merge branch 'master' into workflow-config
mukundansundar Nov 21, 2023
dfeca70
Merge branch 'master' into workflow-config
mukundansundar Nov 21, 2023
f05ffc7
Merge branch 'master' into workflow-config
dapr-bot Nov 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
23 changes: 23 additions & 0 deletions pkg/config/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,17 @@ type ConfigurationSpec struct {
ComponentsSpec *ComponentsSpec `json:"components,omitempty" yaml:"components,omitempty"`
LoggingSpec *LoggingSpec `json:"logging,omitempty" yaml:"logging,omitempty"`
WasmSpec *WasmSpec `json:"wasm,omitempty" yaml:"wasm,omitempty"`
WorkflowSpec *WorkflowSpec `json:"workflow,omitempty" yaml:"workflow,omitempty"`
}

// WorkflowSpec defines the configuration for Dapr workflows.
type WorkflowSpec struct {
cgillum marked this conversation as resolved.
Show resolved Hide resolved
// MaxConcurrentWorkflows is the maximum number of concurrent workflows that can be processed by a single Dapr instance.
// If omitted, the default value of 1000 will be used.
cgillum marked this conversation as resolved.
Show resolved Hide resolved
MaxConcurrentWorkflows int32 `json:"maxConcurrentWorkflows,omitempty" yaml:"maxConcurrentWorkflows,omitempty"`
// MaxConcurrentActivities is the maximum number of concurrent activities that can be processed by a single Dapr instance.
// If omitted, the default value of 1000 will be used.
cgillum marked this conversation as resolved.
Show resolved Hide resolved
MaxConcurrentActivities int32 `json:"maxConcurrentActivities,omitempty" yaml:"maxConcurrentActivities,omitempty"`
}

type SecretsSpec struct {
Expand Down Expand Up @@ -582,6 +593,18 @@ func (c Configuration) GetAPILoggingSpec() APILoggingSpec {
return *c.Spec.LoggingSpec.APILogging
}

// GetWorkflowSpec returns the Workflow spec.
// It's a short-hand that includes nil-checks for safety.
func (c *Configuration) GetWorkflowSpec() WorkflowSpec {
if c == nil || c.Spec.WorkflowSpec == nil {
return WorkflowSpec{
cgillum marked this conversation as resolved.
Show resolved Hide resolved
MaxConcurrentWorkflows: 100,
cgillum marked this conversation as resolved.
Show resolved Hide resolved
MaxConcurrentActivities: 100,
}
}
return *c.Spec.WorkflowSpec
}

// ToYAML returns the Configuration represented as YAML.
func (c *Configuration) ToYAML() (string, error) {
b, err := yaml.Marshal(c)
Expand Down
19 changes: 19 additions & 0 deletions pkg/config/configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,25 @@ func TestLoadStandaloneConfiguration(t *testing.T) {
assert.Equal(t, "1h", mtlsSpec.AllowedClockSkew)
})

t.Run("workflow spec - configured", func(t *testing.T) {
config, err := LoadStandaloneConfiguration("./testdata/workflow_config.yaml")
require.NoError(t, err)
workflowSpec := config.GetWorkflowSpec()
assert.Equal(t, int32(32), workflowSpec.MaxConcurrentWorkflows)
assert.Equal(t, int32(64), workflowSpec.MaxConcurrentActivities)
})

t.Run("workflow spec - defaults", func(t *testing.T) {
// Intentionally loading an unrelated config file to test defaults
config, err := LoadStandaloneConfiguration("./testdata/mtls_config.yaml")
require.NoError(t, err)
workflowSpec := config.GetWorkflowSpec()

// These are the documented default values. Changes to these defaults require changes to documentation.
assert.Equal(t, int32(100), workflowSpec.MaxConcurrentWorkflows)
assert.Equal(t, int32(100), workflowSpec.MaxConcurrentActivities)
})

t.Run("multiple configurations", func(t *testing.T) {
config, err := LoadStandaloneConfiguration("./testdata/feature_config.yaml", "./testdata/mtls_config.yaml")
require.NoError(t, err)
Expand Down
9 changes: 9 additions & 0 deletions pkg/config/testdata/workflow_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
name: daprsystem
namespace: default
spec:
workflow:
maxConcurrentWorkflows: 32
maxConcurrentActivities: 64
6 changes: 3 additions & 3 deletions pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@

grpc := createGRPCManager(sec, runtimeConfig, globalConfig)

wfe := wfengine.NewWorkflowEngine(wfengine.NewWorkflowConfig(runtimeConfig.id))
wfe := wfengine.NewWorkflowEngine(runtimeConfig.id, globalConfig.GetWorkflowSpec())
wfe.ConfigureGrpcExecutor()

channels := channels.New(channels.Options{
Expand Down Expand Up @@ -586,8 +586,8 @@
a.daprUniversalAPI.SetActorRuntime(a.actor)
}
} else {
// If actors are not enabled, still invoke SetActorRuntime on the workflow engine with `nil` to unblock the goroutine
a.workflowEngine.SetActorRuntime(a.actor)
// If actors are not enabled, still invoke SetActorRuntime on the workflow engine with `nil` to unblock startup
a.workflowEngine.SetActorRuntime(nil)

Check warning on line 590 in pkg/runtime/runtime.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/runtime.go#L590

Added line #L590 was not covered by tests
}

// We set actors as initialized whether we have an actors runtime or not
Expand Down
2 changes: 1 addition & 1 deletion pkg/runtime/wfengine/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ DEBU[0000] activity-processor: waiting for new work items... app_id=wfapp
Unit tests can be run using the following `go` command from the repo root. Depending on the speed of your development machine, these tests should complete in less than 30 seconds.

```bash
go test ./pkg/runtime/wfengine/...
go test ./pkg/runtime/wfengine/... -tags=unit
```

If you're using VS Code, you can also run tests directly from the IDE.
Expand Down
24 changes: 15 additions & 9 deletions pkg/runtime/wfengine/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@

type activityActor struct {
actorRuntime actors.Actors
scheduler workflowScheduler
scheduler activityScheduler
statesCache sync.Map
cachingDisabled bool
defaultTimeout time.Duration
reminderInterval time.Duration
config wfConfig
config actorsBackendConfig
}

// ActivityRequest represents a request by a worklow to invoke an activity.
Expand All @@ -54,13 +54,16 @@
EventPayload []byte
}

// activityScheduler is a func interface for pushing activity work items into the backend
type activityScheduler func(ctx context.Context, wi *backend.ActivityWorkItem) error

// NewActivityActor creates an internal activity actor for executing workflow activity logic.
func NewActivityActor(scheduler workflowScheduler, config wfConfig) *activityActor {
func NewActivityActor(scheduler activityScheduler, backendConfig actorsBackendConfig) *activityActor {
return &activityActor{
scheduler: scheduler,
defaultTimeout: 1 * time.Hour,
reminderInterval: 1 * time.Minute,
config: config,
config: backendConfig,
}
}

Expand Down Expand Up @@ -117,6 +120,11 @@
if errors.Is(err, context.DeadlineExceeded) {
cgillum marked this conversation as resolved.
Show resolved Hide resolved
wfLogger.Warnf("%s: execution of '%s' timed-out and will be retried later: %v", actorID, reminderName, err)

// Returning nil signals that we want the execution to be retried in the next period interval
return nil
} else if errors.Is(err, context.Canceled) {
wfLogger.Warnf("%s: received cancellation signal while waiting for activity execution '%s'", actorID, reminderName)

Check warning on line 126 in pkg/runtime/wfengine/activity.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/wfengine/activity.go#L126

Added line #L126 was not covered by tests

// Returning nil signals that we want the execution to be retried in the next period interval
return nil
} else if _, ok := err.(recoverableError); ok {
Expand All @@ -130,8 +138,6 @@
}
}

// TODO: Purge actor state based on some data retention policy
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this TODO since this work was already done in v1.11.


// We delete the reminder on success and on non-recoverable errors.
return actors.ErrReminderCanceled
}
Expand Down Expand Up @@ -162,7 +168,7 @@
// introduce some kind of heartbeat protocol to help identify such cases.
callback := make(chan bool)
wi.Properties[CallbackChannelProperty] = callback
if err = a.scheduler.ScheduleActivity(ctx, wi); err != nil {
if err = a.scheduler(ctx, wi); err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return newRecoverableError(fmt.Errorf("timed-out trying to schedule an activity execution - this can happen if too many activities are running in parallel or if the workflow engine isn't running: %w", err))
}
Expand All @@ -177,7 +183,7 @@
if !t.Stop() {
<-t.C
}
return ctx.Err()
return ctx.Err() // will be retried
case <-t.C:
if deadline, ok := ctx.Deadline(); ok {
wfLogger.Warnf("%s: '%s' is still running - will keep waiting until %v", actorID, name, deadline)
Expand All @@ -191,7 +197,7 @@
if completed {
break loop
} else {
return newRecoverableError(errExecutionAborted)
return newRecoverableError(errExecutionAborted) // AbandonActivityWorkItem was called

Check warning on line 200 in pkg/runtime/wfengine/activity.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/wfengine/activity.go#L200

Added line #L200 was not covered by tests
}
}
}
Expand Down
119 changes: 87 additions & 32 deletions pkg/runtime/wfengine/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,52 +25,99 @@

"github.com/dapr/dapr/pkg/actors"
invokev1 "github.com/dapr/dapr/pkg/messaging/v1"
"github.com/dapr/dapr/utils"
)

// workflowScheduler is an interface for pushing work items into the backend
type workflowScheduler interface {
ScheduleWorkflow(ctx context.Context, wi *backend.OrchestrationWorkItem) error
ScheduleActivity(ctx context.Context, wi *backend.ActivityWorkItem) error
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I replaced the workflowScheduler interface with two separate func definitions: workflowScheduler and activityScheduler, defined in workflow.go and activity.go respectively. This change made it easier to refactor some of the configuration. No changes were made to the required method signatures.

// actorsBackendConfig is the configuration for the workflow engine's actors backend
type actorsBackendConfig struct {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This struct and the functions that follow were moved from wfengine.go to here since they are specific to the workflow backend. No changes were made to these.

AppID string
workflowActorType string
activityActorType string
}

// NewActorsBackendConfig creates a new workflow engine configuration
func NewActorsBackendConfig(appID string) actorsBackendConfig {
return actorsBackendConfig{
AppID: appID,
workflowActorType: actors.InternalActorTypePrefix + utils.GetNamespaceOrDefault(defaultNamespace) + utils.DotDelimiter + appID + utils.DotDelimiter + WorkflowNameLabelKey,
activityActorType: actors.InternalActorTypePrefix + utils.GetNamespaceOrDefault(defaultNamespace) + utils.DotDelimiter + appID + utils.DotDelimiter + ActivityNameLabelKey,
}
}

// String implements fmt.Stringer and is primarily used for debugging purposes.
func (c *actorsBackendConfig) String() string {
if c == nil {
return "(nil)"

Check warning on line 50 in pkg/runtime/wfengine/backend.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/wfengine/backend.go#L49-L50

Added lines #L49 - L50 were not covered by tests
}
return fmt.Sprintf("AppID:'%s', workflowActorType:'%s', activityActorType:'%s'", c.AppID, c.workflowActorType, c.activityActorType)

Check warning on line 52 in pkg/runtime/wfengine/backend.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/wfengine/backend.go#L52

Added line #L52 was not covered by tests
}

type actorBackend struct {
actors actors.Actors
orchestrationWorkItemChan chan *backend.OrchestrationWorkItem
activityWorkItemChan chan *backend.ActivityWorkItem
startedOnce sync.Once
config wfConfig
config actorsBackendConfig
workflowActor *workflowActor
activityActor *activityActor
}

func NewActorBackend(engine *WorkflowEngine) *actorBackend {
func NewActorBackend(appID string) *actorBackend {
backendConfig := NewActorsBackendConfig(appID)

// These channels are used by actors to call into this backend object
orchestrationWorkItemChan := make(chan *backend.OrchestrationWorkItem)
activityWorkItemChan := make(chan *backend.ActivityWorkItem)

return &actorBackend{
orchestrationWorkItemChan: make(chan *backend.OrchestrationWorkItem),
activityWorkItemChan: make(chan *backend.ActivityWorkItem),
config: engine.config,
orchestrationWorkItemChan: orchestrationWorkItemChan,
activityWorkItemChan: activityWorkItemChan,
config: backendConfig,
workflowActor: NewWorkflowActor(getWorkflowScheduler(orchestrationWorkItemChan), backendConfig),
activityActor: NewActivityActor(getActivityScheduler(activityWorkItemChan), backendConfig),
}
}

func (be *actorBackend) SetActorRuntime(actors actors.Actors) {
be.actors = actors
// getWorkflowScheduler returns a workflowScheduler func that sends an orchestration work item to the Durable Task Framework.
func getWorkflowScheduler(orchestrationWorkItemChan chan *backend.OrchestrationWorkItem) workflowScheduler {
return func(ctx context.Context, wi *backend.OrchestrationWorkItem) error {
wfLogger.Debugf("%s: scheduling workflow execution with durabletask engine", wi.InstanceID)
select {
case <-ctx.Done(): // <-- engine is shutting down or a caller timeout expired
return ctx.Err()

Check warning on line 87 in pkg/runtime/wfengine/backend.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/wfengine/backend.go#L86-L87

Added lines #L86 - L87 were not covered by tests
case orchestrationWorkItemChan <- wi: // blocks until the engine is ready to process the work item
return nil
}
}
}

// ScheduleActivity implements workflowScheduler
func (be *actorBackend) ScheduleActivity(ctx context.Context, wi *backend.ActivityWorkItem) error {
select {
case <-ctx.Done():
return ctx.Err()
case be.activityWorkItemChan <- wi:
return nil
// getActivityScheduler returns an activityScheduler func that sends an activity work item to the Durable Task Framework.
func getActivityScheduler(activityWorkItemChan chan *backend.ActivityWorkItem) activityScheduler {
return func(ctx context.Context, wi *backend.ActivityWorkItem) error {
wfLogger.Debugf(
"%s: scheduling [%s#%d] activity execution with durabletask engine",
wi.InstanceID,
wi.NewEvent.GetTaskScheduled().GetName(),
wi.NewEvent.GetEventId())
select {
case <-ctx.Done(): // engine is shutting down
return ctx.Err()

Check warning on line 104 in pkg/runtime/wfengine/backend.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/wfengine/backend.go#L103-L104

Added lines #L103 - L104 were not covered by tests
case activityWorkItemChan <- wi: // blocks until the engine is ready to process the work item
return nil
}
}
}

// ScheduleWorkflow implements workflowScheduler
func (be *actorBackend) ScheduleWorkflow(ctx context.Context, wi *backend.OrchestrationWorkItem) error {
select {
case <-ctx.Done():
return ctx.Err()
case be.orchestrationWorkItemChan <- wi:
return nil
}
// InternalActors returns a map of internal actors that are used to implement workflows
func (be *actorBackend) GetInternalActorsMap() map[string]actors.InternalActor {
internalActors := make(map[string]actors.InternalActor)
internalActors[be.config.workflowActorType] = be.workflowActor
internalActors[be.config.activityActorType] = be.activityActor
return internalActors
}

func (be *actorBackend) SetActorRuntime(actors actors.Actors) {
be.actors = actors
}

// CreateOrchestrationInstance implements backend.Backend and creates a new workflow instance.
Expand Down Expand Up @@ -210,12 +257,18 @@

// GetActivityWorkItem implements backend.Backend
func (be *actorBackend) GetActivityWorkItem(ctx context.Context) (*backend.ActivityWorkItem, error) {
// Wait for the workflow actor to signal us with some work to do
// Wait for the activity actor to signal us with some work to do
wfLogger.Debug("Actor backend is waiting for an activity actor to schedule an invocation.")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These debug logs help make it easier to understand what's going on if a workflow or activity doesn't run when we expect it to (a problem that we've often seen but lacked clarity on what's actually stuck).

select {
case <-ctx.Done():
return nil, ctx.Err()
case wi := <-be.activityWorkItemChan:
wfLogger.Debugf(
"Actor backend received a [%s#%d] activity task for workflow '%s'.",
wi.NewEvent.GetTaskScheduled().GetName(),
wi.NewEvent.GetEventId(),
wi.InstanceID)
return wi, nil
case <-ctx.Done():
return nil, ctx.Err()

Check warning on line 271 in pkg/runtime/wfengine/backend.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/wfengine/backend.go#L270-L271

Added lines #L270 - L271 were not covered by tests
}
}

Expand All @@ -227,11 +280,13 @@
// GetOrchestrationWorkItem implements backend.Backend
func (be *actorBackend) GetOrchestrationWorkItem(ctx context.Context) (*backend.OrchestrationWorkItem, error) {
// Wait for the workflow actor to signal us with some work to do
wfLogger.Debug("Actor backend is waiting for a workflow actor to schedule an invocation.")
select {
case <-ctx.Done():
return nil, ctx.Err()
case wi := <-be.orchestrationWorkItemChan:
wfLogger.Debugf("Actor backend received a workflow task for workflow '%s'.", wi.InstanceID)
return wi, nil
case <-ctx.Done():
return nil, ctx.Err()

Check warning on line 289 in pkg/runtime/wfengine/backend.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/wfengine/backend.go#L288-L289

Added lines #L288 - L289 were not covered by tests
}
}

Expand Down Expand Up @@ -266,7 +321,7 @@

// String displays the type information
func (be *actorBackend) String() string {
return fmt.Sprintf("dapr.actors/v1-alpha")
return "dapr.actors/v1-beta"
cgillum marked this conversation as resolved.
Show resolved Hide resolved
}

func (be *actorBackend) validateConfiguration() error {
Expand Down