Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Workflow] Make workflow engine configurable (and other improvements) #7090

Merged
merged 22 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
708af2c
Workflow configuration support + code cleanup
cgillum Oct 24, 2023
e29ec26
Testing and logging improvements + fixes
cgillum Oct 24, 2023
60401b1
Adding configuration tests
cgillum Oct 24, 2023
8edf866
PR feedback and renamed properties to be a bit more specific
cgillum Oct 26, 2023
abaaf67
Merge branch 'master' into workflow-config
cgillum Oct 26, 2023
3583dfe
PR feedback: if/else --> switch
cgillum Oct 27, 2023
d773e3b
Merge branch 'master' into workflow-config
cgillum Oct 27, 2023
77f5a19
More PR feedback + increase default values
cgillum Oct 31, 2023
cf7d32d
Merge branch 'master' into workflow-config
cgillum Oct 31, 2023
5e54218
Use new getters
cgillum Oct 31, 2023
59193c5
PR Feedback: Revert defaults to their original values
cgillum Nov 10, 2023
78f7441
Merge branch 'master' into workflow-config
cgillum Nov 10, 2023
050d036
Merge branch 'master' into workflow-config
artursouza Nov 14, 2023
6a8913c
Fixing code comments
cgillum Nov 14, 2023
6c797b5
Merge branch 'master' into workflow-config
mukundansundar Nov 15, 2023
e32647e
Merge branch 'master' into workflow-config
mukundansundar Nov 15, 2023
9c01990
PR feedback and fix config unit test
cgillum Nov 15, 2023
6b77b7f
Merge branch 'master' into workflow-config
cgillum Nov 15, 2023
9ed79ee
Merge branch 'master' into workflow-config
ItalyPaleAle Nov 20, 2023
bf86875
Merge branch 'master' into workflow-config
mukundansundar Nov 21, 2023
dfeca70
Merge branch 'master' into workflow-config
mukundansundar Nov 21, 2023
f05ffc7
Merge branch 'master' into workflow-config
dapr-bot Nov 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 16 additions & 0 deletions pkg/apis/configuration/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,22 @@ type ConfigurationSpec struct {
LoggingSpec *LoggingSpec `json:"logging,omitempty"`
// +optional
WasmSpec *WasmSpec `json:"wasm,omitempty"`
// +optional
WorkflowSpec *WorkflowSpec `json:"workflow,omitempty"`
}

// WorkflowSpec defines the configuration for Dapr workflows.
type WorkflowSpec struct {
// maxConcurrentWorkflowInvocations is the maximum number of concurrent workflow invocations that can be scheduled by a single Dapr instance.
// Attempted invocations beyond this will be queued until the number of concurrent invocations drops below this value.
// If omitted, the default value of 100 will be used.
// +optional
MaxConcurrentWorkflowInvocations int32 `json:"maxConcurrentWorkflowInvocations,omitempty"`
// maxConcurrentActivityInvocations is the maximum number of concurrent activities that can be processed by a single Dapr instance.
// Attempted invocations beyond this will be queued until the number of concurrent invocations drops below this value.
// If omitted, the default value of 100 will be used.
// +optional
MaxConcurrentActivityInvocations int32 `json:"maxConcurrentActivityInvocations,omitempty"`
}

// APISpec describes the configuration for Dapr APIs.
Expand Down
20 changes: 20 additions & 0 deletions pkg/apis/configuration/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 46 additions & 0 deletions pkg/config/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@
DefaultNamespace = "default"
ActionPolicyApp = "app"
ActionPolicyGlobal = "global"

defaultMaxWorkflowConcurrentInvocations = 100
defaultMaxActivityConcurrentInvocations = 100
)

// Configuration is an internal (and duplicate) representation of Dapr's Configuration CRD.
Expand Down Expand Up @@ -112,6 +115,33 @@
ComponentsSpec *ComponentsSpec `json:"components,omitempty" yaml:"components,omitempty"`
LoggingSpec *LoggingSpec `json:"logging,omitempty" yaml:"logging,omitempty"`
WasmSpec *WasmSpec `json:"wasm,omitempty" yaml:"wasm,omitempty"`
WorkflowSpec *WorkflowSpec `json:"workflow,omitempty" yaml:"workflow,omitempty"`
}

// WorkflowSpec defines the configuration for Dapr workflows.
type WorkflowSpec struct {
cgillum marked this conversation as resolved.
Show resolved Hide resolved
// maxConcurrentWorkflowInvocations is the maximum number of concurrent workflow invocations that can be scheduled by a single Dapr instance.
// Attempted invocations beyond this will be queued until the number of concurrent invocations drops below this value.
// If omitted, the default value of 100 will be used.
MaxConcurrentWorkflowInvocations int32 `json:"maxConcurrentWorkflowInvocations,omitempty" yaml:"maxConcurrentWorkflowInvocations,omitempty"`
// maxConcurrentActivityInvocations is the maximum number of concurrent activities that can be processed by a single Dapr instance.
// Attempted invocations beyond this will be queued until the number of concurrent invocations drops below this value.
// If omitted, the default value of 100 will be used.
MaxConcurrentActivityInvocations int32 `json:"maxConcurrentActivityInvocations,omitempty" yaml:"maxConcurrentActivityInvocations,omitempty"`
}

func (w *WorkflowSpec) GetMaxConcurrentWorkflowInvocations() int32 {
if w == nil || w.MaxConcurrentWorkflowInvocations <= 0 {
return defaultMaxWorkflowConcurrentInvocations

Check warning on line 135 in pkg/config/configuration.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/configuration.go#L134-L135

Added lines #L134 - L135 were not covered by tests
}
return w.MaxConcurrentWorkflowInvocations

Check warning on line 137 in pkg/config/configuration.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/configuration.go#L137

Added line #L137 was not covered by tests
}

func (w *WorkflowSpec) GetMaxConcurrentActivityInvocations() int32 {
if w == nil || w.MaxConcurrentActivityInvocations <= 0 {
return defaultMaxActivityConcurrentInvocations

Check warning on line 142 in pkg/config/configuration.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/configuration.go#L141-L142

Added lines #L141 - L142 were not covered by tests
}
return w.MaxConcurrentActivityInvocations

Check warning on line 144 in pkg/config/configuration.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/configuration.go#L144

Added line #L144 was not covered by tests
}

type SecretsSpec struct {
Expand Down Expand Up @@ -363,6 +393,10 @@
DefaultAction: AllowAccess,
TrustDomain: "public",
},
WorkflowSpec: &WorkflowSpec{
MaxConcurrentWorkflowInvocations: defaultMaxWorkflowConcurrentInvocations,
MaxConcurrentActivityInvocations: defaultMaxActivityConcurrentInvocations,
},
},
}
}
Expand Down Expand Up @@ -582,6 +616,18 @@
return *c.Spec.LoggingSpec.APILogging
}

// GetWorkflowSpec returns the Workflow spec.
// It's a short-hand that includes nil-checks for safety.
func (c *Configuration) GetWorkflowSpec() WorkflowSpec {
if c == nil || c.Spec.WorkflowSpec == nil {
return WorkflowSpec{
cgillum marked this conversation as resolved.
Show resolved Hide resolved
MaxConcurrentWorkflowInvocations: defaultMaxWorkflowConcurrentInvocations,
MaxConcurrentActivityInvocations: defaultMaxActivityConcurrentInvocations,

Check warning on line 625 in pkg/config/configuration.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/configuration.go#L623-L625

Added lines #L623 - L625 were not covered by tests
}
}
return *c.Spec.WorkflowSpec
}

// ToYAML returns the Configuration represented as YAML.
func (c *Configuration) ToYAML() (string, error) {
b, err := yaml.Marshal(c)
Expand Down
19 changes: 19 additions & 0 deletions pkg/config/configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,25 @@ func TestLoadStandaloneConfiguration(t *testing.T) {
assert.Equal(t, "1h", mtlsSpec.AllowedClockSkew)
})

t.Run("workflow spec - configured", func(t *testing.T) {
config, err := LoadStandaloneConfiguration("./testdata/workflow_config.yaml")
require.NoError(t, err)
workflowSpec := config.GetWorkflowSpec()
assert.Equal(t, int32(32), workflowSpec.MaxConcurrentWorkflowInvocations)
assert.Equal(t, int32(64), workflowSpec.MaxConcurrentActivityInvocations)
})

t.Run("workflow spec - defaults", func(t *testing.T) {
// Intentionally loading an unrelated config file to test defaults
config, err := LoadStandaloneConfiguration("./testdata/mtls_config.yaml")
require.NoError(t, err)
workflowSpec := config.GetWorkflowSpec()

// These are the documented default values. Changes to these defaults require changes to
assert.Equal(t, int32(100), workflowSpec.MaxConcurrentWorkflowInvocations)
assert.Equal(t, int32(100), workflowSpec.MaxConcurrentActivityInvocations)
})

t.Run("multiple configurations", func(t *testing.T) {
config, err := LoadStandaloneConfiguration("./testdata/feature_config.yaml", "./testdata/mtls_config.yaml")
require.NoError(t, err)
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/testdata/override_gen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,6 @@ spec:
features:
- name: Test.Feature
enabled: true
workflow:
maxConcurrentWorkflowInvocations: 100
maxConcurrentActivityInvocations: 100
3 changes: 3 additions & 0 deletions pkg/config/testdata/override_spec_gen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,6 @@ accessControl:
features:
- name: Test.Feature
enabled: true
workflow:
maxConcurrentWorkflowInvocations: 100
maxConcurrentActivityInvocations: 100
9 changes: 9 additions & 0 deletions pkg/config/testdata/workflow_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
name: daprsystem
namespace: default
spec:
workflow:
maxConcurrentWorkflowInvocations: 32
maxConcurrentActivityInvocations: 64
6 changes: 3 additions & 3 deletions pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@

grpc := createGRPCManager(sec, runtimeConfig, globalConfig)

wfe := wfengine.NewWorkflowEngine(wfengine.NewWorkflowConfig(runtimeConfig.id))
wfe := wfengine.NewWorkflowEngine(runtimeConfig.id, globalConfig.GetWorkflowSpec())
wfe.ConfigureGrpcExecutor()

channels := channels.New(channels.Options{
Expand Down Expand Up @@ -584,8 +584,8 @@
a.daprUniversalAPI.SetActorRuntime(a.actor)
}
} else {
// If actors are not enabled, still invoke SetActorRuntime on the workflow engine with `nil` to unblock the goroutine
a.workflowEngine.SetActorRuntime(a.actor)
// If actors are not enabled, still invoke SetActorRuntime on the workflow engine with `nil` to unblock startup
a.workflowEngine.SetActorRuntime(nil)

Check warning on line 588 in pkg/runtime/runtime.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/runtime.go#L588

Added line #L588 was not covered by tests
}

// We set actors as initialized whether we have an actors runtime or not
Expand Down
2 changes: 1 addition & 1 deletion pkg/runtime/wfengine/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ DEBU[0000] activity-processor: waiting for new work items... app_id=wfapp
Unit tests can be run using the following `go` command from the repo root. Depending on the speed of your development machine, these tests should complete in less than 30 seconds.

```bash
go test ./pkg/runtime/wfengine/...
go test ./pkg/runtime/wfengine/... -tags=unit
```

If you're using VS Code, you can also run tests directly from the IDE.
Expand Down
33 changes: 19 additions & 14 deletions pkg/runtime/wfengine/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@

type activityActor struct {
actorRuntime actors.Actors
scheduler workflowScheduler
scheduler activityScheduler
statesCache sync.Map
cachingDisabled bool
defaultTimeout time.Duration
reminderInterval time.Duration
config wfConfig
config actorsBackendConfig
}

// ActivityRequest represents a request by a worklow to invoke an activity.
Expand All @@ -54,13 +54,16 @@
EventPayload []byte
}

// activityScheduler is a func interface for pushing activity work items into the backend
type activityScheduler func(ctx context.Context, wi *backend.ActivityWorkItem) error

// NewActivityActor creates an internal activity actor for executing workflow activity logic.
func NewActivityActor(scheduler workflowScheduler, config wfConfig) *activityActor {
func NewActivityActor(scheduler activityScheduler, backendConfig actorsBackendConfig) *activityActor {
return &activityActor{
scheduler: scheduler,
defaultTimeout: 1 * time.Hour,
reminderInterval: 1 * time.Minute,
config: config,
config: backendConfig,
}
}

Expand Down Expand Up @@ -114,24 +117,26 @@
defer cancelTimeout()

if err := a.executeActivity(timeoutCtx, actorID, reminderName, state.EventPayload); err != nil {
if errors.Is(err, context.DeadlineExceeded) {
var recoverableErr *recoverableError
switch {
case errors.Is(err, context.DeadlineExceeded):
wfLogger.Warnf("%s: execution of '%s' timed-out and will be retried later: %v", actorID, reminderName, err)

// Returning nil signals that we want the execution to be retried in the next period interval
return nil
} else if _, ok := err.(recoverableError); ok {
case errors.Is(err, context.Canceled):
wfLogger.Warnf("%s: received cancellation signal while waiting for activity execution '%s'", actorID, reminderName)

Check warning on line 127 in pkg/runtime/wfengine/activity.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/wfengine/activity.go#L126-L127

Added lines #L126 - L127 were not covered by tests
// Returning nil signals that we want the execution to be retried in the next period interval
return nil
case errors.As(err, &recoverableErr):

Check warning on line 130 in pkg/runtime/wfengine/activity.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/wfengine/activity.go#L129-L130

Added lines #L129 - L130 were not covered by tests
wfLogger.Warnf("%s: execution failed with a recoverable error and will be retried later: %v", actorID, err)

// Returning nil signals that we want the execution to be retried in the next period interval
return nil
} else {
default:

Check warning on line 134 in pkg/runtime/wfengine/activity.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/wfengine/activity.go#L134

Added line #L134 was not covered by tests
wfLogger.Errorf("%s: execution failed with a non-recoverable error: %v", actorID, err)
// TODO: Reply with a failure - this requires support from durabletask-go to produce TaskFailure results
}
}

// TODO: Purge actor state based on some data retention policy
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this TODO since this work was already done in v1.11.


// We delete the reminder on success and on non-recoverable errors.
return actors.ErrReminderCanceled
}
Expand Down Expand Up @@ -162,7 +167,7 @@
// introduce some kind of heartbeat protocol to help identify such cases.
callback := make(chan bool)
wi.Properties[CallbackChannelProperty] = callback
if err = a.scheduler.ScheduleActivity(ctx, wi); err != nil {
if err = a.scheduler(ctx, wi); err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return newRecoverableError(fmt.Errorf("timed-out trying to schedule an activity execution - this can happen if too many activities are running in parallel or if the workflow engine isn't running: %w", err))
}
Expand All @@ -177,7 +182,7 @@
if !t.Stop() {
<-t.C
}
return ctx.Err()
return ctx.Err() // will be retried
case <-t.C:
if deadline, ok := ctx.Deadline(); ok {
wfLogger.Warnf("%s: '%s' is still running - will keep waiting until %v", actorID, name, deadline)
Expand All @@ -191,7 +196,7 @@
if completed {
break loop
} else {
return newRecoverableError(errExecutionAborted)
return newRecoverableError(errExecutionAborted) // AbandonActivityWorkItem was called

Check warning on line 199 in pkg/runtime/wfengine/activity.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/wfengine/activity.go#L199

Added line #L199 was not covered by tests
}
}
}
Expand Down