Skip to content

Commit

Permalink
[Workflow] Make workflow engine configurable (and other improvements) (
Browse files Browse the repository at this point in the history
  • Loading branch information
cgillum committed Nov 21, 2023
1 parent a86f9d6 commit bef0ca2
Show file tree
Hide file tree
Showing 16 changed files with 313 additions and 136 deletions.
16 changes: 16 additions & 0 deletions pkg/apis/configuration/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,22 @@ type ConfigurationSpec struct {
LoggingSpec *LoggingSpec `json:"logging,omitempty"`
// +optional
WasmSpec *WasmSpec `json:"wasm,omitempty"`
// +optional
WorkflowSpec *WorkflowSpec `json:"workflow,omitempty"`
}

// WorkflowSpec defines the configuration for Dapr workflows.
// It is referenced from ConfigurationSpec through the optional `workflow` field, and both of its limits are optional.
type WorkflowSpec struct {
// maxConcurrentWorkflowInvocations is the maximum number of concurrent workflow invocations that can be scheduled by a single Dapr instance.
// Attempted invocations beyond this will be queued until the number of concurrent invocations drops below this value.
// If omitted, the default value of 100 will be used.
// +optional
MaxConcurrentWorkflowInvocations int32 `json:"maxConcurrentWorkflowInvocations,omitempty"`
// maxConcurrentActivityInvocations is the maximum number of concurrent activities that can be processed by a single Dapr instance.
// Attempted invocations beyond this will be queued until the number of concurrent invocations drops below this value.
// If omitted, the default value of 100 will be used.
// +optional
MaxConcurrentActivityInvocations int32 `json:"maxConcurrentActivityInvocations,omitempty"`
}

// APISpec describes the configuration for Dapr APIs.
Expand Down
20 changes: 20 additions & 0 deletions pkg/apis/configuration/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 46 additions & 0 deletions pkg/config/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ const (
DefaultNamespace = "default"
ActionPolicyApp = "app"
ActionPolicyGlobal = "global"

defaultMaxWorkflowConcurrentInvocations = 100
defaultMaxActivityConcurrentInvocations = 100
)

// Configuration is an internal (and duplicate) representation of Dapr's Configuration CRD.
Expand Down Expand Up @@ -112,6 +115,33 @@ type ConfigurationSpec struct {
ComponentsSpec *ComponentsSpec `json:"components,omitempty" yaml:"components,omitempty"`
LoggingSpec *LoggingSpec `json:"logging,omitempty" yaml:"logging,omitempty"`
WasmSpec *WasmSpec `json:"wasm,omitempty" yaml:"wasm,omitempty"`
WorkflowSpec *WorkflowSpec `json:"workflow,omitempty" yaml:"workflow,omitempty"`
}

// WorkflowSpec defines the configuration for Dapr workflows.
// Prefer reading the limits through GetMaxConcurrentWorkflowInvocations and
// GetMaxConcurrentActivityInvocations: those accessors tolerate a nil receiver and
// substitute the defaults for values that are zero or negative.
type WorkflowSpec struct {
// maxConcurrentWorkflowInvocations is the maximum number of concurrent workflow invocations that can be scheduled by a single Dapr instance.
// Attempted invocations beyond this will be queued until the number of concurrent invocations drops below this value.
// If omitted, the default value of 100 will be used.
// A value of zero or less is treated as omitted by GetMaxConcurrentWorkflowInvocations.
MaxConcurrentWorkflowInvocations int32 `json:"maxConcurrentWorkflowInvocations,omitempty" yaml:"maxConcurrentWorkflowInvocations,omitempty"`
// maxConcurrentActivityInvocations is the maximum number of concurrent activities that can be processed by a single Dapr instance.
// Attempted invocations beyond this will be queued until the number of concurrent invocations drops below this value.
// If omitted, the default value of 100 will be used.
// A value of zero or less is treated as omitted by GetMaxConcurrentActivityInvocations.
MaxConcurrentActivityInvocations int32 `json:"maxConcurrentActivityInvocations,omitempty" yaml:"maxConcurrentActivityInvocations,omitempty"`
}

// GetMaxConcurrentWorkflowInvocations returns the configured limit on concurrent
// workflow invocations. It is safe to call on a nil receiver: when the spec is nil,
// or the configured value is zero or negative, defaultMaxWorkflowConcurrentInvocations
// is returned instead.
func (w *WorkflowSpec) GetMaxConcurrentWorkflowInvocations() int32 {
	// Only an explicitly configured, positive limit overrides the default.
	if w != nil && w.MaxConcurrentWorkflowInvocations > 0 {
		return w.MaxConcurrentWorkflowInvocations
	}
	return defaultMaxWorkflowConcurrentInvocations
}

// GetMaxConcurrentActivityInvocations returns the configured limit on concurrent
// activity invocations. It is safe to call on a nil receiver: when the spec is nil,
// or the configured value is zero or negative, defaultMaxActivityConcurrentInvocations
// is returned instead.
func (w *WorkflowSpec) GetMaxConcurrentActivityInvocations() int32 {
	// Only an explicitly configured, positive limit overrides the default.
	if w != nil && w.MaxConcurrentActivityInvocations > 0 {
		return w.MaxConcurrentActivityInvocations
	}
	return defaultMaxActivityConcurrentInvocations
}

type SecretsSpec struct {
Expand Down Expand Up @@ -363,6 +393,10 @@ func LoadDefaultConfiguration() *Configuration {
DefaultAction: AllowAccess,
TrustDomain: "public",
},
WorkflowSpec: &WorkflowSpec{
MaxConcurrentWorkflowInvocations: defaultMaxWorkflowConcurrentInvocations,
MaxConcurrentActivityInvocations: defaultMaxActivityConcurrentInvocations,
},
},
}
}
Expand Down Expand Up @@ -582,6 +616,18 @@ func (c Configuration) GetAPILoggingSpec() APILoggingSpec {
return *c.Spec.LoggingSpec.APILogging
}

// GetWorkflowSpec returns a copy of the workflow spec stored in the configuration.
// It is nil-safe: if the configuration itself or its workflow spec is nil, a spec
// populated with the default concurrency limits is returned. A non-nil spec is
// returned exactly as stored, without normalising individual fields.
func (c *Configuration) GetWorkflowSpec() WorkflowSpec {
	if c != nil && c.Spec.WorkflowSpec != nil {
		return *c.Spec.WorkflowSpec
	}

	// Nothing configured: fall back to the built-in defaults.
	defaults := WorkflowSpec{
		MaxConcurrentWorkflowInvocations: defaultMaxWorkflowConcurrentInvocations,
		MaxConcurrentActivityInvocations: defaultMaxActivityConcurrentInvocations,
	}
	return defaults
}

// ToYAML returns the Configuration represented as YAML.
func (c *Configuration) ToYAML() (string, error) {
b, err := yaml.Marshal(c)
Expand Down
19 changes: 19 additions & 0 deletions pkg/config/configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,25 @@ func TestLoadStandaloneConfiguration(t *testing.T) {
assert.Equal(t, "1h", mtlsSpec.AllowedClockSkew)
})

t.Run("workflow spec - configured", func(t *testing.T) {
config, err := LoadStandaloneConfiguration("./testdata/workflow_config.yaml")
require.NoError(t, err)
workflowSpec := config.GetWorkflowSpec()
assert.Equal(t, int32(32), workflowSpec.MaxConcurrentWorkflowInvocations)
assert.Equal(t, int32(64), workflowSpec.MaxConcurrentActivityInvocations)
})

t.Run("workflow spec - defaults", func(t *testing.T) {
// Intentionally loading an unrelated config file to test defaults
config, err := LoadStandaloneConfiguration("./testdata/mtls_config.yaml")
require.NoError(t, err)
workflowSpec := config.GetWorkflowSpec()

// These are the documented default values. Changes to these defaults require changes to the documentation.
assert.Equal(t, int32(100), workflowSpec.MaxConcurrentWorkflowInvocations)
assert.Equal(t, int32(100), workflowSpec.MaxConcurrentActivityInvocations)
})

t.Run("multiple configurations", func(t *testing.T) {
config, err := LoadStandaloneConfiguration("./testdata/feature_config.yaml", "./testdata/mtls_config.yaml")
require.NoError(t, err)
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/testdata/override_gen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,6 @@ spec:
features:
- name: Test.Feature
enabled: true
workflow:
maxConcurrentWorkflowInvocations: 100
maxConcurrentActivityInvocations: 100
3 changes: 3 additions & 0 deletions pkg/config/testdata/override_spec_gen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,6 @@ accessControl:
features:
- name: Test.Feature
enabled: true
workflow:
maxConcurrentWorkflowInvocations: 100
maxConcurrentActivityInvocations: 100
9 changes: 9 additions & 0 deletions pkg/config/testdata/workflow_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
name: daprsystem
namespace: default
spec:
workflow:
maxConcurrentWorkflowInvocations: 32
maxConcurrentActivityInvocations: 64
6 changes: 3 additions & 3 deletions pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func newDaprRuntime(ctx context.Context,

grpc := createGRPCManager(sec, runtimeConfig, globalConfig)

wfe := wfengine.NewWorkflowEngine(wfengine.NewWorkflowConfig(runtimeConfig.id))
wfe := wfengine.NewWorkflowEngine(runtimeConfig.id, globalConfig.GetWorkflowSpec())
wfe.ConfigureGrpcExecutor()

channels := channels.New(channels.Options{
Expand Down Expand Up @@ -584,8 +584,8 @@ func (a *DaprRuntime) appHealthReadyInit(ctx context.Context) error {
a.daprUniversalAPI.SetActorRuntime(a.actor)
}
} else {
// If actors are not enabled, still invoke SetActorRuntime on the workflow engine with `nil` to unblock the goroutine
a.workflowEngine.SetActorRuntime(a.actor)
// If actors are not enabled, still invoke SetActorRuntime on the workflow engine with `nil` to unblock startup
a.workflowEngine.SetActorRuntime(nil)
}

// We set actors as initialized whether we have an actors runtime or not
Expand Down
2 changes: 1 addition & 1 deletion pkg/runtime/wfengine/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ DEBU[0000] activity-processor: waiting for new work items... app_id=wfapp
Unit tests can be run using the following `go` command from the repo root. Depending on the speed of your development machine, these tests should complete in less than 30 seconds.

```bash
go test ./pkg/runtime/wfengine/...
go test ./pkg/runtime/wfengine/... -tags=unit
```

If you're using VS Code, you can also run tests directly from the IDE.
Expand Down
33 changes: 19 additions & 14 deletions pkg/runtime/wfengine/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ const activityStateKey = "activityState"

type activityActor struct {
actorRuntime actors.Actors
scheduler workflowScheduler
scheduler activityScheduler
statesCache sync.Map
cachingDisabled bool
defaultTimeout time.Duration
reminderInterval time.Duration
config wfConfig
config actorsBackendConfig
}

// ActivityRequest represents a request by a workflow to invoke an activity.
Expand All @@ -54,13 +54,16 @@ type activityState struct {
EventPayload []byte
}

// activityScheduler is a function type used to push an activity work item into the backend.
// It receives the caller's context and the work item to schedule, and returns a non-nil
// error when the work item could not be handed off.
type activityScheduler func(ctx context.Context, wi *backend.ActivityWorkItem) error

// NewActivityActor creates an internal activity actor for executing workflow activity logic.
func NewActivityActor(scheduler workflowScheduler, config wfConfig) *activityActor {
func NewActivityActor(scheduler activityScheduler, backendConfig actorsBackendConfig) *activityActor {
return &activityActor{
scheduler: scheduler,
defaultTimeout: 1 * time.Hour,
reminderInterval: 1 * time.Minute,
config: config,
config: backendConfig,
}
}

Expand Down Expand Up @@ -114,24 +117,26 @@ func (a *activityActor) InvokeReminder(ctx context.Context, actorID string, remi
defer cancelTimeout()

if err := a.executeActivity(timeoutCtx, actorID, reminderName, state.EventPayload); err != nil {
if errors.Is(err, context.DeadlineExceeded) {
var recoverableErr *recoverableError
switch {
case errors.Is(err, context.DeadlineExceeded):
wfLogger.Warnf("%s: execution of '%s' timed-out and will be retried later: %v", actorID, reminderName, err)

// Returning nil signals that we want the execution to be retried in the next period interval
return nil
} else if _, ok := err.(recoverableError); ok {
case errors.Is(err, context.Canceled):
wfLogger.Warnf("%s: received cancellation signal while waiting for activity execution '%s'", actorID, reminderName)
// Returning nil signals that we want the execution to be retried in the next period interval
return nil
case errors.As(err, &recoverableErr):
wfLogger.Warnf("%s: execution failed with a recoverable error and will be retried later: %v", actorID, err)

// Returning nil signals that we want the execution to be retried in the next period interval
return nil
} else {
default:
wfLogger.Errorf("%s: execution failed with a non-recoverable error: %v", actorID, err)
// TODO: Reply with a failure - this requires support from durabletask-go to produce TaskFailure results
}
}

// TODO: Purge actor state based on some data retention policy

// We delete the reminder on success and on non-recoverable errors.
return actors.ErrReminderCanceled
}
Expand Down Expand Up @@ -162,7 +167,7 @@ func (a *activityActor) executeActivity(ctx context.Context, actorID string, nam
// introduce some kind of heartbeat protocol to help identify such cases.
callback := make(chan bool)
wi.Properties[CallbackChannelProperty] = callback
if err = a.scheduler.ScheduleActivity(ctx, wi); err != nil {
if err = a.scheduler(ctx, wi); err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return newRecoverableError(fmt.Errorf("timed-out trying to schedule an activity execution - this can happen if too many activities are running in parallel or if the workflow engine isn't running: %w", err))
}
Expand All @@ -177,7 +182,7 @@ loop:
if !t.Stop() {
<-t.C
}
return ctx.Err()
return ctx.Err() // will be retried
case <-t.C:
if deadline, ok := ctx.Deadline(); ok {
wfLogger.Warnf("%s: '%s' is still running - will keep waiting until %v", actorID, name, deadline)
Expand All @@ -191,7 +196,7 @@ loop:
if completed {
break loop
} else {
return newRecoverableError(errExecutionAborted)
return newRecoverableError(errExecutionAborted) // AbandonActivityWorkItem was called
}
}
}
Expand Down

0 comments on commit bef0ca2

Please sign in to comment.