Workflow execution and scheduling latency #7370

Merged on Jan 30, 2024 (84 commits)
Changes from 62 commits

Commits
2203b81
Defining Workflow metrics
prateek041 Nov 6, 2023
fc670b3
Added logic to record metrics for:
prateek041 Nov 10, 2023
733bbfb
Added logic to record workflowRemindersTotal metric
prateek041 Nov 10, 2023
9519bb8
Added logic to Record Workflow/operations metrics for:
prateek041 Nov 21, 2023
4927cc4
Add logic to record Failed workflow/operations metrics
prateek041 Nov 21, 2023
a4025ed
Record metrics for Successful and Failed Workflow/Activity
prateek041 Nov 21, 2023
9a3c080
Merged failed/success metrics into one
prateek041 Nov 26, 2023
1322841
Record Latency metrics for Operations and Executions
prateek041 Nov 26, 2023
6b1bf39
Questions regarding latency metrics
prateek041 Nov 26, 2023
624d435
Reducing functions to record metrics and code cleanup
prateek041 Dec 7, 2023
751d867
Added Operation test
prateek041 Dec 10, 2023
d139476
Added namespace and component key
prateek041 Dec 10, 2023
8dbee6e
Add tests for Reminders metrics
prateek041 Dec 10, 2023
7d35ed0
Added Execution metrics tests
prateek041 Dec 10, 2023
dbacff4
fix: fixed test conditions
prateek041 Dec 11, 2023
76a85da
cleanup
prateek041 Dec 15, 2023
9881dd8
resolving conflicts
prateek041 Dec 15, 2023
374df01
fix
prateek041 Dec 15, 2023
5238fa0
fixing minor changes
prateek041 Dec 18, 2023
a6ed62b
minor fix
prateek041 Dec 21, 2023
4260b94
Merge branch 'master' into workflow-metrics
prateek041 Dec 21, 2023
665b88e
Fixing minor Nits
prateek041 Dec 23, 2023
55e4373
Added metrics to docs
prateek041 Dec 25, 2023
612af02
Added comments for metrics description
prateek041 Dec 26, 2023
4a2581a
Removed reminders count metrics
prateek041 Dec 26, 2023
3ccbbe1
Merge branch 'master' into workflow-metrics
prateek041 Dec 26, 2023
f00c5cd
empty commit
shivamkm07 Dec 27, 2023
4c1d076
fixes
shivamkm07 Dec 27, 2023
eb920b8
Rename workflow_metrics.go -> workflow_monitoring.go
shivamkm07 Dec 28, 2023
4ee9224
Updating dapr-metrics docs
shivamkm07 Dec 28, 2023
0e899e2
typo
shivamkm07 Dec 28, 2023
b9b7ebc
Adding comment
shivamkm07 Dec 28, 2023
bc847c9
Avoid collecting metrics in case workflow is already completed before…
shivamkm07 Dec 28, 2023
d61f24e
linter fixes
shivamkm07 Jan 2, 2024
39d4ef8
Merge branch 'master' into workflow-metrics
shivamkm07 Jan 3, 2024
666f1bf
Update pkg/diagnostics/workflow_monitoring.go
shivamkm07 Jan 5, 2024
6126529
Merge branch 'master' into workflow-metrics
shivamkm07 Jan 8, 2024
29747f8
Adding activity execution metrics as separate metrics
shivamkm07 Jan 8, 2024
da4cb40
renaming componentName
shivamkm07 Jan 8, 2024
ed05c27
adding workflow latency execution
ASHIQUEMD Jan 8, 2024
4cbccba
Adding IT for metrics wf
shivamkm07 Jan 9, 2024
ec1b5fb
Merge branch 'master' into workflow-metrics
shivamkm07 Jan 9, 2024
508978f
linter fixes
shivamkm07 Jan 9, 2024
2c7355b
Merge branch 'master' into workflow-metrics
shivamkm07 Jan 10, 2024
56d0808
Merge branch 'master' into workflow-metrics
mukundansundar Jan 11, 2024
0907987
Merge branch 'master' into workflow-metrics
mukundansundar Jan 11, 2024
55ec256
Updating docs
shivamkm07 Jan 11, 2024
32aab9d
Removing componentKey from wf metrics
shivamkm07 Jan 11, 2024
10074e3
Merge latest changes of PR7152
ASHIQUEMD Jan 11, 2024
bd03e78
time calculation
ASHIQUEMD Jan 11, 2024
f02c2b2
Add schedule latency and execution latency of workflow
ASHIQUEMD Jan 11, 2024
fa8476d
Add schedule latency and execution latency of workflow
ASHIQUEMD Jan 11, 2024
87759c6
Add schedule latency and execution latency of workflow
ASHIQUEMD Jan 12, 2024
7be24f9
Add schedule latency and execution latency of workflow
ASHIQUEMD Jan 12, 2024
6a34086
update latency code
ASHIQUEMD Jan 12, 2024
f702699
update latency code
ASHIQUEMD Jan 12, 2024
866b757
update latency code
ASHIQUEMD Jan 12, 2024
f9cf2ef
Merge branch 'master' of https://github.com/dapr/dapr into workflow-m…
ASHIQUEMD Jan 15, 2024
f5db2e3
Fix unit tests
ASHIQUEMD Jan 15, 2024
6f8c058
add code comment
ASHIQUEMD Jan 15, 2024
5bed4b5
Merge branch 'master' into workflow-metrics
ASHIQUEMD Jan 15, 2024
e8e4323
Merge branch 'master' into workflow-metrics
yaron2 Jan 15, 2024
4d27037
Removing status from scheduling latency
ASHIQUEMD Jan 16, 2024
41c4586
Merge branch 'workflow-metrics' of https://github.com/ASHIQUEMD/dapr …
ASHIQUEMD Jan 16, 2024
68b2dfa
save workflow start time to metadata
ASHIQUEMD Jan 17, 2024
b64a6fa
save workflow start time to metadata
ASHIQUEMD Jan 17, 2024
8ad0237
save workflow start time to metadata
ASHIQUEMD Jan 17, 2024
9378f9d
refactor wfstarttime capture by reading it from history
ASHIQUEMD Jan 17, 2024
b531abe
Update pkg/diagnostics/workflow_monitoring.go
ASHIQUEMD Jan 17, 2024
4160325
Merge branch 'master' into workflow-metrics
ASHIQUEMD Jan 17, 2024
a733773
Adding executionStartedTimestamp property to ExecutionStartedEvent me…
ASHIQUEMD Jan 17, 2024
baef5f4
Adding executionStartedTimestamp property to ExecutionStartedEvent me…
ASHIQUEMD Jan 18, 2024
b500b0e
Adding executionStartedTimestamp property to ExecutionStartedEvent me…
ASHIQUEMD Jan 18, 2024
ef7b35c
Merge branch 'master' into workflow-metrics
ASHIQUEMD Jan 18, 2024
eaa01d6
Merge branch 'master' into workflow-metrics
ASHIQUEMD Jan 19, 2024
f6419bd
Adding executionStartedTimestamp property to ExecutionStartedEvent me…
ASHIQUEMD Jan 19, 2024
c9a2029
Merge branch 'master' into workflow-metrics
ASHIQUEMD Jan 25, 2024
98d8190
Calculate dapr workflow latency
ASHIQUEMD Jan 25, 2024
bf5b8f8
Calculate dapr workflow latency
ASHIQUEMD Jan 25, 2024
12da6a5
Merge branch 'master' into workflow-metrics
ASHIQUEMD Jan 29, 2024
ac1ba7b
Add integration test for workflow execution and scheduling latency
ASHIQUEMD Jan 29, 2024
d4aa986
Merge branch 'master' into workflow-metrics
ASHIQUEMD Jan 29, 2024
f3871bf
Merge branch 'master' into workflow-metrics
mukundansundar Jan 29, 2024
17d83ff
Merge branch 'master' into workflow-metrics
mukundansundar Jan 30, 2024
42 changes: 38 additions & 4 deletions pkg/diagnostics/workflow_monitoring.go
@@ -52,9 +52,13 @@
activityExecutionCount *stats.Int64Measure
// activityExecutionLatency records time taken to run an activity to completion.
activityExecutionLatency *stats.Float64Measure
appID string
enabled bool
namespace string
// workflowExecutionLatency records time taken to run a workflow to completion.
workflowExecutionLatency *stats.Float64Measure
// workflowSchedulingLatency records time taken between workflow execution request and actual workflow execution
workflowSchedulingLatency *stats.Float64Measure
appID string
enabled bool
namespace string
}

func newWorkflowMetrics() *workflowMetrics {
@@ -79,6 +83,14 @@
"runtime/workflow/activity/execution/latency",
"The total time taken to run an activity to completion.",
stats.UnitMilliseconds),
workflowExecutionLatency: stats.Float64(
"runtime/workflow/execution/latency",
"The total time taken to run workflow to completion.",
stats.UnitMilliseconds),
workflowSchedulingLatency: stats.Float64(
"runtime/workflow/scheduling/latency",
"The time taken between workflow execution request and actual workflow execution.",
stats.UnitMilliseconds),
}
}

@@ -97,7 +109,9 @@
diagUtils.NewMeasureView(w.workflowOperationLatency, []tag.Key{appIDKey, namespaceKey, operationKey, statusKey}, defaultLatencyDistribution),
diagUtils.NewMeasureView(w.workflowExecutionCount, []tag.Key{appIDKey, namespaceKey, workflowNameKey, statusKey}, view.Count()),
diagUtils.NewMeasureView(w.activityExecutionCount, []tag.Key{appIDKey, namespaceKey, activityNameKey, statusKey}, view.Count()),
diagUtils.NewMeasureView(w.activityExecutionLatency, []tag.Key{appIDKey, namespaceKey, activityNameKey, statusKey}, defaultLatencyDistribution))
diagUtils.NewMeasureView(w.activityExecutionLatency, []tag.Key{appIDKey, namespaceKey, activityNameKey, statusKey}, defaultLatencyDistribution),
diagUtils.NewMeasureView(w.workflowExecutionLatency, []tag.Key{appIDKey, namespaceKey, workflowNameKey, statusKey}, defaultLatencyDistribution),
diagUtils.NewMeasureView(w.workflowSchedulingLatency, []tag.Key{appIDKey, namespaceKey, workflowNameKey, statusKey}, defaultLatencyDistribution))
}

// WorkflowOperationEvent records total number of Successful/Failed workflow Operations requests. It also records latency for those requests.
@@ -123,6 +137,26 @@
stats.RecordWithTags(ctx, diagUtils.WithTags(w.workflowExecutionCount.Name(), appIDKey, w.appID, namespaceKey, w.namespace, workflowNameKey, workflowName, statusKey, status), w.workflowExecutionCount.M(1))
}

func (w *workflowMetrics) WorkflowExecutionLatency(ctx context.Context, workflowName, status string, elapsed float64) {
if !w.IsEnabled() {
return
}

if elapsed > 0 {
stats.RecordWithTags(ctx, diagUtils.WithTags(w.workflowExecutionLatency.Name(), appIDKey, w.appID, namespaceKey, w.namespace, workflowNameKey, workflowName, statusKey, status), w.workflowExecutionLatency.M(elapsed))
}
}

func (w *workflowMetrics) WorkflowSchedulingLatency(ctx context.Context, workflowName, status string, elapsed float64) {
if !w.IsEnabled() {
return
}

if elapsed > 0 {
stats.RecordWithTags(ctx, diagUtils.WithTags(w.workflowSchedulingLatency.Name(), appIDKey, w.appID, namespaceKey, w.namespace, workflowNameKey, workflowName, statusKey, status), w.workflowSchedulingLatency.M(elapsed))
}
}

// ActivityExecutionEvent records total number of Successful/Failed/Recoverable workflow executions. It also records latency for these executions.
func (w *workflowMetrics) ActivityExecutionEvent(ctx context.Context, activityName, status string, elapsed float64) {
if !w.IsEnabled() {
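Both new recording functions in this file share the same two guards: return early when metrics are disabled, and record only when `elapsed > 0`. A minimal sketch of that pattern follows, using only the standard library. The `latencyRecorder` type and its `samples` slice are hypothetical stand-ins for `workflowMetrics` and the OpenCensus call, not part of the PR.

```go
package main

import "fmt"

// latencyRecorder is a hypothetical stand-in for the workflowMetrics type in
// the diff. It keeps samples in a slice instead of calling stats.RecordWithTags.
type latencyRecorder struct {
	enabled bool
	samples []float64
}

// record mirrors the two guards in the diff: do nothing when metrics are
// disabled, and skip values that are zero or negative.
func (r *latencyRecorder) record(elapsedMs float64) {
	if !r.enabled {
		return
	}
	if elapsedMs > 0 {
		r.samples = append(r.samples, elapsedMs)
	}
}

func main() {
	r := &latencyRecorder{enabled: true}
	for _, v := range []float64{20, 0, -5, 10} {
		r.record(v)
	}
	fmt.Println(r.samples) // prints [20 10]
}
```

One consequence of the `elapsed > 0` guard is that a latency of exactly zero is never added to the distribution, so a caller that leaves the elapsed value at its initial `float64(0)` records nothing.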
27 changes: 27 additions & 0 deletions pkg/diagnostics/workflow_monitoring_test.go
@@ -222,7 +222,10 @@ func TestExecution(t *testing.T) {

t.Run("record workflow executions", func(t *testing.T) {
countMetricName := "runtime/workflow/execution/count"
executionLatencyMetricName := "runtime/workflow/execution/latency"
schedulingLatencyMetricName := "runtime/workflow/scheduling/latency"
workflowName := "test-workflow"

t.Run("Failed with retryable error", func(t *testing.T) {
w := initWorkflowMetrics()

@@ -255,5 +258,29 @@ func TestExecution(t *testing.T) {

allTagsPresent(t, v, viewData[0].Tags)
})

t.Run("workflow execution latency", func(t *testing.T) {
w := initWorkflowMetrics()

w.WorkflowExecutionLatency(context.Background(), workflowName, StatusSuccess, 20)

viewData, _ := view.RetrieveData(executionLatencyMetricName)
v := view.Find(executionLatencyMetricName)

allTagsPresent(t, v, viewData[0].Tags)
assert.InEpsilon(t, float64(20), viewData[0].Data.(*view.DistributionData).Min, 0)
})

t.Run("workflow scheduling latency", func(t *testing.T) {
w := initWorkflowMetrics()

w.WorkflowSchedulingLatency(context.Background(), workflowName, StatusSuccess, 10)

viewData, _ := view.RetrieveData(schedulingLatencyMetricName)
v := view.Find(schedulingLatencyMetricName)

allTagsPresent(t, v, viewData[0].Tags)
assert.InEpsilon(t, float64(10), viewData[0].Data.(*view.DistributionData).Min, 0)
})
})
}
31 changes: 29 additions & 2 deletions pkg/runtime/wfengine/backends/actors/workflow_actor.go
@@ -417,13 +417,24 @@

// The logic/for loop below purges/removes any leftover state from a completed or failed activity
transactionalRequests := make(map[string][]actors.TransactionalOperation)
isWorkflowExecutionStartedEvent := false
scheduledStartTimestamp := time.Time{}

for _, e := range state.Inbox {
var taskID int32
if ts := e.GetTaskCompleted(); ts != nil {
taskID = ts.GetTaskScheduledId()
} else if tf := e.GetTaskFailed(); tf != nil {
taskID = tf.GetTaskScheduledId()
} else {
// If the event is an execution started event, then we need to record the scheduled start timestamp
if es := e.GetExecutionStarted(); es != nil {
isWorkflowExecutionStartedEvent = true
timestamp := es.GetScheduledStartTimestamp()
if timestamp != nil {
scheduledStartTimestamp = timestamp.AsTime()
}
}
continue
}
op := actors.TransactionalOperation{
@@ -482,11 +493,14 @@
}
return newRecoverableError(fmt.Errorf("failed to schedule a workflow execution: %w", err))
}
// Record metrics for workflow execution

wf.recordWorkflowSchedulingLatency(ctx, isWorkflowExecutionStartedEvent, state, scheduledStartTimestamp, workflowName, executionStatus)
wfExecutionElapsedTime := float64(0)

defer func() {
if executionStatus != "" {
// execution latency for workflow is not supported yet.
diag.DefaultWorkflowMonitoring.WorkflowExecutionEvent(ctx, workflowName, executionStatus)
diag.DefaultWorkflowMonitoring.WorkflowExecutionLatency(ctx, workflowName, executionStatus, wfExecutionElapsedTime)
}
}()

@@ -635,11 +649,24 @@
// Setting executionStatus to failed if workflow has failed/terminated/cancelled
executionStatus = diag.StatusFailed
}
wfExecutionElapsedTime = diag.ElapsedSince(state.workflowStartTime)
}
}
return nil
}
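The execution latency is recorded from a deferred closure that is registered before `wfExecutionElapsedTime` receives its final value. This works because a Go closure reads captured variables when it runs, not when it is declared. A minimal sketch of that behaviour, assuming a hypothetical `runStep` function and `record` callback in place of the actor method and the `diag` calls:

```go
package main

import "fmt"

// runStep is a hypothetical stand-in for the workflow actor method in the diff.
// record plays the role of the diag.DefaultWorkflowMonitoring calls.
func runStep(completed bool, record func(status string, elapsedMs float64)) {
	status := ""
	elapsedMs := float64(0)

	// The deferred closure runs when runStep returns, so it observes any
	// assignments made to status and elapsedMs after this point.
	defer func() {
		if status != "" {
			record(status, elapsedMs)
		}
	}()

	if completed {
		status = "success"
		// Placeholder value; the diff computes this with
		// diag.ElapsedSince(state.workflowStartTime).
		elapsedMs = 42
	}
}

func main() {
	record := func(status string, elapsedMs float64) {
		fmt.Printf("%s %.0f\n", status, elapsedMs)
	}
	runStep(true, record)  // prints "success 42"
	runStep(false, record) // prints nothing: status is still empty
}
```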

func (wf *workflowActor) recordWorkflowSchedulingLatency(ctx context.Context, isWorkflowExecutionStartedEvent bool, state *workflowState, scheduledStartTimestamp time.Time, workflowName string, executionStatus string) {
if isWorkflowExecutionStartedEvent {
state.workflowStartTime = time.Now()
// If scheduledStartTimestamp is zero, then scheduling latency is zero
if !scheduledStartTimestamp.IsZero() {
// get the time diff between state.workflowStartTime and scheduledStartTimestamp
wfSchedulingLatency := float64(state.workflowStartTime.Sub(scheduledStartTimestamp).Milliseconds())
diag.DefaultWorkflowMonitoring.WorkflowSchedulingLatency(ctx, workflowName, executionStatus, wfSchedulingLatency)
}
}
}

func (wf *workflowActor) loadInternalState(ctx context.Context, actorID string) (*workflowState, error) {
if !wf.cachingDisabled {
// see if the state for this actor is already cached in memory
35 changes: 29 additions & 6 deletions pkg/runtime/wfengine/backends/actors/workflowstate.go
@@ -28,10 +28,11 @@
)

const (
inboxKeyPrefix = "inbox"
historyKeyPrefix = "history"
customStatusKey = "customStatus"
metadataKey = "metadata"
inboxKeyPrefix = "inbox"
historyKeyPrefix = "history"
customStatusKey = "customStatus"
workflowStartTimeKey = "workflowStartTime"
metadataKey = "metadata"
)

type workflowState struct {
@@ -45,6 +46,7 @@
inboxRemovedCount int
historyAddedCount int
historyRemovedCount int
workflowStartTime time.Time
config actorsBackendConfig
}

@@ -127,6 +129,14 @@
return nil, err
}

// If s.workflowStartTime is set (non-zero), append an operation to req.Operations that upserts the workflowStartTime
if !s.workflowStartTime.IsZero() {
req.Operations = append(req.Operations, actors.TransactionalOperation{
Operation: actors.Upsert,
Request: actors.TransactionalUpsert{Key: workflowStartTimeKey, Value: s.workflowStartTime},
})
}

// We update the custom status only when the workflow itself has been updated, and not when
// we're saving changes only to the workflow inbox.
// CONSIDER: Only save custom status if it has changed. However, need a way to track this.
@@ -252,8 +262,8 @@
bulkReq := &actors.GetBulkStateRequest{
ActorType: config.workflowActorType,
ActorID: actorID,
// Initializing with size for all the inbox, history, and custom status
Keys: make([]string, metadata.InboxLength+metadata.HistoryLength+1),
// Initializing with size for all the inbox, history, custom status and workflow execution start time
Keys: make([]string, metadata.InboxLength+metadata.HistoryLength+2),
}

var n int
@@ -268,6 +278,8 @@
n++
}

bulkReq.Keys[n] = workflowStartTimeKey

// Perform the request
bulkRes, err := actorRuntime.GetBulkState(ctx, bulkReq)
if err != nil {
@@ -305,6 +317,13 @@
}
}

if len(bulkRes[workflowStartTimeKey]) > 0 {
err = json.Unmarshal(bulkRes[workflowStartTimeKey], &state.workflowStartTime)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal JSON from workflow start time key entry: %w", err)
}
}

wfLogger.Infof("%s: loaded %d state records in %v", actorID, loadedRecords, time.Since(loadStartTime))
return state, nil
}
@@ -336,6 +355,10 @@
Operation: actors.Delete,
Request: actors.TransactionalDelete{Key: metadataKey},
},
actors.TransactionalOperation{
Operation: actors.Delete,
Request: actors.TransactionalDelete{Key: workflowStartTimeKey},
},
)

return req, nil