Workflow execution and scheduling latency (#7370)
* Defining Workflow metrics

Signed-off-by: prateek041 <prateeksingh9741@gmail.com>

* Added logic to record metrics for:

- Operations
- Reminders
- Execution

Signed-off-by: prateek041 <prateeksingh9741@gmail.com>

* Added logic to record workflowRemindersTotal metric

Signed-off-by: prateek041 <prateeksingh9741@gmail.com>

* Added logic to Record Workflow/operations metrics for:
- Create Workflow
- Get Workflow
- Purge Workflow
- Add Events

Signed-off-by: prateek041 <prateeksingh9741@gmail.com>

* Add logic to record Failed workflow/operations metrics

Signed-off-by: prateek041 <prateeksingh9741@gmail.com>

* Record metrics for Successful and Failed Workflow/Activity
executions.

Signed-off-by: prateek041 <prateeksingh9741@gmail.com>

* Merged failed/success metrics into one

Signed-off-by: prateek041 <prateeksingh9741@gmail.com>

* Record Latency metrics for Operations and Executions

Signed-off-by: prateek041 <prateeksingh9741@gmail.com>

* Questions regarding latency metrics

Signed-off-by: prateek041 <prateeksingh9741@gmail.com>

* Reducing functions to record metrics and code cleanup

Signed-off-by: prateek041 <prateeksingh9741@gmail.com>

* Added Operation test

Signed-off-by: prateek041 <prateeksingh9741@gmail.com>

* Added namespace and component key

Signed-off-by: prateek041 <prateeksingh9741@gmail.com>

* Add tests for Reminders metrics

Signed-off-by: prateek041 <prateeksingh9741@gmail.com>

* Added Execution metrics tests

Signed-off-by: prateek041 <prateeksingh9741@gmail.com>

* fix: fixed test conditions

Signed-off-by: prateek041 <prateeksingh9741@gmail.com>

* cleanup

Signed-off-by: prateek041 <prateeksingh9741@gmail.com>

* fix

Signed-off-by: prateek041 <prateeksingh9741@gmail.com>

* fixing minor changes

Signed-off-by: prateek041 <prateeksingh9741@gmail.com>

* minor fix

Signed-off-by: prateek041 <prateeksingh9741@gmail.com>

* Fixing minor Nits

Signed-off-by: prateek041 <prateeksingh9741@gmail.com>

* Added metrics to docs

Signed-off-by: prateek041 <prateeksingh9741@gmail.com>

* Added comments for metrics description

Signed-off-by: prateek041 <prateeksingh9741@gmail.com>

* Removed reminders count metrics

Signed-off-by: prateek041 <prateeksingh9741@gmail.com>

* empty commit

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

* fixes

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

* Rename workflow_metrics.go -> workflow_monitoring.go

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

* Updating dapr-metrics docs

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

* typo

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

* Adding comment

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

* Avoid collecting metrics in case workflow is already completed before execution

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

* linter fixes

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

* Update pkg/diagnostics/workflow_monitoring.go

Co-authored-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>
Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

* Adding activity execution metrics as separate metrics

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

* renaming componentName

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

* adding workflow latency execution

Signed-off-by: MD Ashique <noorani.ashique5@gmail.com>

* Adding IT for metrics wf

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

* linter fixes

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

* Updating docs

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

* Removing componentKey from wf metrics

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

* time calculation

Signed-off-by: MD Ashique <noorani.ashique5@gmail.com>

* Add schedule latency and execution latency of workflow

Signed-off-by: MD Ashique <noorani.ashique5@gmail.com>

* Add schedule latency and execution latency of workflow

Signed-off-by: MD Ashique <noorani.ashique5@gmail.com>

* Add schedule latency and execution latency of workflow

Signed-off-by: MD Ashique <noorani.ashique5@gmail.com>

* update latency code

Signed-off-by: MD Ashique <noorani.ashique5@gmail.com>

* update latency code

Signed-off-by: MD Ashique <noorani.ashique5@gmail.com>

* update latency code

Signed-off-by: MD Ashique <noorani.ashique5@gmail.com>

* Fix unit tests

Signed-off-by: MD Ashique <noorani.ashique5@gmail.com>

* add code comment

Signed-off-by: MD Ashique <noorani.ashique5@gmail.com>

* Removing status from scheduling latency

Signed-off-by: MD Ashique <noorani.ashique5@gmail.com>

* save start workflow time to metadata

Signed-off-by: MD Ashique <noorani.ashique5@gmail.com>

* save start workflow time to metadata

Signed-off-by: MD Ashique <noorani.ashique5@gmail.com>

* save start workflow time to metadata

Signed-off-by: MD Ashique <noorani.ashique5@gmail.com>

* refactor wfstarttime capture by reading it from history

Signed-off-by: MD Ashique <noorani.ashique5@gmail.com>

* Update pkg/diagnostics/workflow_monitoring.go

Co-authored-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com>
Signed-off-by: MD Ashique <noorani.ashique5@gmail.com>

* Adding executionStartedTimestamp property to ExecutionStartedEvent message

Signed-off-by: MD Ashique <noorani.ashique5@gmail.com>

* Adding executionStartedTimestamp property to ExecutionStartedEvent message

Signed-off-by: MD Ashique <noorani.ashique5@gmail.com>

* Adding executionStartedTimestamp property to ExecutionStartedEvent message

Signed-off-by: MD Ashique <noorani.ashique5@gmail.com>

* Adding executionStartedTimestamp property to ExecutionStartedEvent message

Signed-off-by: MD Ashique <noorani.ashique5@gmail.com>

* Calculate dapr workflow latency

Signed-off-by: MD Ashique <noorani.ashique5@gmail.com>

* Calculate dapr workflow latency

Signed-off-by: MD Ashique <noorani.ashique5@gmail.com>

* Add integration test for workflow execution and scheduling latency

Signed-off-by: MD Ashique <noorani.ashique5@gmail.com>

---------

Signed-off-by: prateek041 <prateeksingh9741@gmail.com>
Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>
Signed-off-by: MD Ashique <noorani.ashique5@gmail.com>
Co-authored-by: prateek041 <prateeksingh9741@gmail.com>
Co-authored-by: Shivam Kumar <shivamkm07@gmail.com>
Co-authored-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>
Co-authored-by: Yaron Schneider <schneider.yaron@live.com>
Co-authored-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com>
6 people committed Jan 30, 2024
1 parent 4d44561 commit f818e70
Showing 5 changed files with 112 additions and 11 deletions.
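
Taken together, the diffs below add two new latency recorders (WorkflowExecutionLatency and WorkflowSchedulingLatency) to the workflow metrics object and wire them into the workflow actor and the integration tests. A minimal sketch of the call pattern — not code from this commit; the workflow name and elapsed values are hypothetical, and diag refers to the pkg/diagnostics package shown in the diffs below:

	// Both recorders no-op when metrics are disabled or when elapsed <= 0.
	diag.DefaultWorkflowMonitoring.WorkflowSchedulingLatency(ctx, "order-workflow", 120)
	diag.DefaultWorkflowMonitoring.WorkflowExecutionLatency(ctx, "order-workflow", diag.StatusSuccess, 2000)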
1 change: 1 addition & 0 deletions .gitignore
@@ -17,6 +17,7 @@ config/crd/bases/*.yaml
**/.project
**/.factorypath
google
dapr.sln

test_report*
coverage.txt
42 changes: 38 additions & 4 deletions pkg/diagnostics/workflow_monitoring.go
@@ -52,9 +52,13 @@ type workflowMetrics struct {
activityExecutionCount *stats.Int64Measure
// activityExecutionLatency records time taken to run an activity to completion.
activityExecutionLatency *stats.Float64Measure
appID string
enabled bool
namespace string
// workflowExecutionLatency records time taken to run a workflow to completion.
workflowExecutionLatency *stats.Float64Measure
// workflowSchedulingLatency records time taken between workflow execution request and actual workflow execution
workflowSchedulingLatency *stats.Float64Measure
appID string
enabled bool
namespace string
}

func newWorkflowMetrics() *workflowMetrics {
@@ -79,6 +83,14 @@ func newWorkflowMetrics() *workflowMetrics {
"runtime/workflow/activity/execution/latency",
"The total time taken to run an activity to completion.",
stats.UnitMilliseconds),
workflowExecutionLatency: stats.Float64(
"runtime/workflow/execution/latency",
"The total time taken to run workflow to completion.",
stats.UnitMilliseconds),
workflowSchedulingLatency: stats.Float64(
"runtime/workflow/scheduling/latency",
"Interval between workflow execution request and workflow execution.",
stats.UnitMilliseconds),
}
}

@@ -97,7 +109,9 @@ func (w *workflowMetrics) Init(appID, namespace string) error {
diagUtils.NewMeasureView(w.workflowOperationLatency, []tag.Key{appIDKey, namespaceKey, operationKey, statusKey}, defaultLatencyDistribution),
diagUtils.NewMeasureView(w.workflowExecutionCount, []tag.Key{appIDKey, namespaceKey, workflowNameKey, statusKey}, view.Count()),
diagUtils.NewMeasureView(w.activityExecutionCount, []tag.Key{appIDKey, namespaceKey, activityNameKey, statusKey}, view.Count()),
diagUtils.NewMeasureView(w.activityExecutionLatency, []tag.Key{appIDKey, namespaceKey, activityNameKey, statusKey}, defaultLatencyDistribution))
diagUtils.NewMeasureView(w.activityExecutionLatency, []tag.Key{appIDKey, namespaceKey, activityNameKey, statusKey}, defaultLatencyDistribution),
diagUtils.NewMeasureView(w.workflowExecutionLatency, []tag.Key{appIDKey, namespaceKey, workflowNameKey, statusKey}, defaultLatencyDistribution),
diagUtils.NewMeasureView(w.workflowSchedulingLatency, []tag.Key{appIDKey, namespaceKey, workflowNameKey}, defaultLatencyDistribution))
}

// WorkflowOperationEvent records total number of Successful/Failed workflow Operations requests. It also records latency for those requests.
@@ -123,6 +137,26 @@ func (w *workflowMetrics) WorkflowExecutionEvent(ctx context.Context, workflowNa
stats.RecordWithTags(ctx, diagUtils.WithTags(w.workflowExecutionCount.Name(), appIDKey, w.appID, namespaceKey, w.namespace, workflowNameKey, workflowName, statusKey, status), w.workflowExecutionCount.M(1))
}

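// WorkflowExecutionLatency records the time taken to run a workflow to completion.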
func (w *workflowMetrics) WorkflowExecutionLatency(ctx context.Context, workflowName, status string, elapsed float64) {
if !w.IsEnabled() {
return
}

if elapsed > 0 {
stats.RecordWithTags(ctx, diagUtils.WithTags(w.workflowExecutionLatency.Name(), appIDKey, w.appID, namespaceKey, w.namespace, workflowNameKey, workflowName, statusKey, status), w.workflowExecutionLatency.M(elapsed))
}
}

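// WorkflowSchedulingLatency records the interval between a workflow execution request and the actual start of execution.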
func (w *workflowMetrics) WorkflowSchedulingLatency(ctx context.Context, workflowName string, elapsed float64) {
if !w.IsEnabled() {
return
}

if elapsed > 0 {
stats.RecordWithTags(ctx, diagUtils.WithTags(w.workflowSchedulingLatency.Name(), appIDKey, w.appID, namespaceKey, w.namespace, workflowNameKey, workflowName), w.workflowSchedulingLatency.M(elapsed))
}
}

// ActivityExecutionEvent records total number of Successful/Failed/Recoverable workflow executions. It also records latency for these executions.
func (w *workflowMetrics) ActivityExecutionEvent(ctx context.Context, activityName, status string, elapsed float64) {
if !w.IsEnabled() {
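
The recorders above are thin wrappers over OpenCensus measures and distribution views. The following self-contained sketch shows that mechanism with a stand-in measure and hypothetical bucket bounds — it is illustrative, not code from this commit:

	package main

	import (
		"context"
		"fmt"
		"log"

		"go.opencensus.io/stats"
		"go.opencensus.io/stats/view"
		"go.opencensus.io/tag"
	)

	func main() {
		// Stand-in for runtime/workflow/execution/latency: a Float64 measure
		// aggregated by a distribution (histogram) view.
		latency := stats.Float64(
			"example/workflow/execution/latency",
			"Time taken to run a workflow to completion.",
			stats.UnitMilliseconds)
		workflowName := tag.MustNewKey("workflow_name")

		if err := view.Register(&view.View{
			Name:        latency.Name(),
			Measure:     latency,
			TagKeys:     []tag.Key{workflowName},
			Aggregation: view.Distribution(10, 100, 1000), // hypothetical bounds
		}); err != nil {
			log.Fatal(err)
		}

		// Equivalent of what WorkflowExecutionLatency does: record one tagged observation.
		if err := stats.RecordWithTags(context.Background(),
			[]tag.Mutator{tag.Upsert(workflowName, "order-workflow")},
			latency.M(42)); err != nil {
			log.Fatal(err)
		}

		rows, err := view.RetrieveData(latency.Name())
		if err != nil {
			log.Fatal(err)
		}
		dist := rows[0].Data.(*view.DistributionData)
		fmt.Printf("count=%d min=%.0f\n", dist.Count, dist.Min) // count=1 min=42
	}

The unit tests added in workflow_monitoring_test.go below follow this same record-then-RetrieveData pattern.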
27 changes: 27 additions & 0 deletions pkg/diagnostics/workflow_monitoring_test.go
@@ -222,7 +222,10 @@ func TestExecution(t *testing.T) {

t.Run("record workflow executions", func(t *testing.T) {
countMetricName := "runtime/workflow/execution/count"
executionLatencyMetricName := "runtime/workflow/execution/latency"
schedulingLatencyMetricName := "runtime/workflow/scheduling/latency"
workflowName := "test-workflow"

t.Run("Failed with retryable error", func(t *testing.T) {
w := initWorkflowMetrics()

@@ -255,5 +258,29 @@

allTagsPresent(t, v, viewData[0].Tags)
})

t.Run("workflow execution latency", func(t *testing.T) {
w := initWorkflowMetrics()

w.WorkflowExecutionLatency(context.Background(), workflowName, StatusSuccess, 20)

viewData, _ := view.RetrieveData(executionLatencyMetricName)
v := view.Find(executionLatencyMetricName)

allTagsPresent(t, v, viewData[0].Tags)
assert.InEpsilon(t, float64(20), viewData[0].Data.(*view.DistributionData).Min, 0)
})

t.Run("workflow scheduling latency", func(t *testing.T) {
w := initWorkflowMetrics()

w.WorkflowSchedulingLatency(context.Background(), workflowName, 10)

viewData, _ := view.RetrieveData(schedulingLatencyMetricName)
v := view.Find(schedulingLatencyMetricName)

allTagsPresent(t, v, viewData[0].Tags)
assert.InEpsilon(t, float64(10), viewData[0].Data.(*view.DistributionData).Min, 0)
})
})
}
46 changes: 44 additions & 2 deletions pkg/runtime/wfengine/backends/actors/workflow_actor.go
@@ -433,13 +433,18 @@ func (wf *workflowActor) runWorkflow(ctx context.Context, reminder actors.Intern

// The logic/for loop below purges/removes any leftover state from a completed or failed activity
transactionalRequests := make(map[string][]actors.TransactionalOperation)
var esHistoryEvent *backend.HistoryEvent

for _, e := range state.Inbox {
var taskID int32
if ts := e.GetTaskCompleted(); ts != nil {
taskID = ts.GetTaskScheduledId()
} else if tf := e.GetTaskFailed(); tf != nil {
taskID = tf.GetTaskScheduledId()
} else {
if es := e.GetExecutionStarted(); es != nil {
esHistoryEvent = e
}
continue
}
op := actors.TransactionalOperation{
@@ -498,11 +503,14 @@ }
}
return newRecoverableError(fmt.Errorf("failed to schedule a workflow execution: %w", err))
}
// Record metrics for workflow execution

wf.recordWorkflowSchedulingLatency(ctx, esHistoryEvent, workflowName)
wfExecutionElapsedTime := float64(0)

defer func() {
if executionStatus != "" {
// execution latency for workflow is not supported yet.
diag.DefaultWorkflowMonitoring.WorkflowExecutionEvent(ctx, workflowName, executionStatus)
diag.DefaultWorkflowMonitoring.WorkflowExecutionLatency(ctx, workflowName, executionStatus, wfExecutionElapsedTime)
}
}()

@@ -658,6 +666,7 @@ func (wf *workflowActor) runWorkflow(ctx context.Context, reminder actors.Intern
// Setting executionStatus to failed if workflow has failed/terminated/cancelled
executionStatus = diag.StatusFailed
}
wfExecutionElapsedTime = wf.calculateWorkflowExecutionLatency(state)
}
}
if runtimeState.IsCompleted() {
@@ -666,6 +675,39 @@ func (wf *workflowActor) runWorkflow(ctx context.Context, reminder actors.Intern
return nil
}

func (*workflowActor) calculateWorkflowExecutionLatency(state *workflowState) (wfExecutionElapsedTime float64) {
for _, e := range state.History {
if os := e.GetOrchestratorStarted(); os != nil {
return diag.ElapsedSince(e.GetTimestamp().AsTime())
}
}
return 0
}

func (*workflowActor) recordWorkflowSchedulingLatency(ctx context.Context, esHistoryEvent *backend.HistoryEvent, workflowName string) {
if esHistoryEvent == nil {
return
}

// If the event is an execution started event, then we need to record the scheduled start timestamp
if es := esHistoryEvent.GetExecutionStarted(); es != nil {
currentTimestamp := time.Now()
var scheduledStartTimestamp time.Time
timestamp := es.GetScheduledStartTimestamp()

if timestamp != nil {
scheduledStartTimestamp = timestamp.AsTime()
} else {
// if scheduledStartTimestamp is nil, then use the event timestamp to consider scheduling latency
// This case will happen when the workflow is created and started immediately
scheduledStartTimestamp = esHistoryEvent.GetTimestamp().AsTime()
}

wfSchedulingLatency := float64(currentTimestamp.Sub(scheduledStartTimestamp).Milliseconds())
diag.DefaultWorkflowMonitoring.WorkflowSchedulingLatency(ctx, workflowName, wfSchedulingLatency)
}
}

func (wf *workflowActor) loadInternalState(ctx context.Context) (*workflowState, error) {
// See if the state for this actor is already cached in memory
if !wf.cachingDisabled && wf.state != nil {
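
To make the two measurements defined above concrete: scheduling latency runs from the requested scheduledStartTimestamp (falling back to the ExecutionStarted event's own timestamp when no start time was requested) to the moment the actor picks the workflow up, while execution latency runs from the first OrchestratorStarted event in the history to completion. An illustrative, self-contained calculation with hypothetical timestamps:

	package main

	import (
		"fmt"
		"time"
	)

	func main() {
		// Hypothetical event times for one workflow instance.
		scheduledStart := time.Date(2024, 1, 30, 10, 0, 0, 0, time.UTC)   // requested start
		orchestratorStarted := scheduledStart.Add(120 * time.Millisecond) // first OrchestratorStarted event
		completed := orchestratorStarted.Add(2 * time.Second)             // terminal state reached

		schedulingMs := float64(orchestratorStarted.Sub(scheduledStart).Milliseconds())
		executionMs := float64(completed.Sub(orchestratorStarted).Milliseconds())
		fmt.Printf("scheduling=%.0fms execution=%.0fms\n", schedulingMs, executionMs) // scheduling=120ms execution=2000ms
	}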
7 changes: 2 additions & 5 deletions tests/integration/suite/daprd/metrics/metrics.go
@@ -22,7 +22,6 @@ import (
"testing"
"time"

prometheus "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -223,6 +222,8 @@ func (m *metrics) Run(t *testing.T, ctx context.Context) {
assert.Equal(t, 1, int(metrics["dapr_runtime_workflow_operation_count|app_id:myapp|namespace:|operation:create_workflow|status:success"]))
assert.Equal(t, 1, int(metrics["dapr_runtime_workflow_execution_count|app_id:myapp|namespace:|status:success|workflow_name:workflow"]))
assert.Equal(t, 1, int(metrics["dapr_runtime_workflow_activity_execution_count|activity_name:activity_success|app_id:myapp|namespace:|status:success"]))
assert.GreaterOrEqual(t, 1, int(metrics["dapr_runtime_workflow_execution_latency|app_id:myapp|namespace:|status:success|workflow_name:workflow"]))
assert.GreaterOrEqual(t, 1, int(metrics["dapr_runtime_workflow_scheduling_latency|app_id:myapp|namespace:|workflow_name:workflow"]))
})
t.Run("failed workflow execution", func(t *testing.T) {
id, err := taskhubClient.ScheduleNewOrchestration(ctx, "workflow", api.WithInput("activity_failure"))
@@ -266,10 +267,6 @@ func (m *metrics) getMetrics(t *testing.T, ctx context.Context) map[string]float

metrics := make(map[string]float64)
for _, mf := range metricFamilies {
if mf.GetType() != prometheus.MetricType_COUNTER {
continue
}

for _, m := range mf.GetMetric() {
key := mf.GetName()
for _, l := range m.GetLabel() {
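
For reference, here is a sketch of how the flattened metric map used by the assertions above can be built once the counter-only filter (removed by this commit) is gone. It is illustrative, not the committed helper; the point is that histogram families such as the two new latency metrics can be read via their sample count:

	package metricsutil

	import (
		dto "github.com/prometheus/client_model/go"
	)

	// flatten builds "name|label:value|..." keys, one entry per metric.
	func flatten(families map[string]*dto.MetricFamily) map[string]float64 {
		out := make(map[string]float64)
		for _, mf := range families {
			for _, m := range mf.GetMetric() {
				key := mf.GetName()
				for _, l := range m.GetLabel() {
					key += "|" + l.GetName() + ":" + l.GetValue()
				}
				switch mf.GetType() {
				case dto.MetricType_COUNTER:
					out[key] = m.GetCounter().GetValue()
				case dto.MetricType_GAUGE:
					out[key] = m.GetGauge().GetValue()
				case dto.MetricType_HISTOGRAM:
					// Assumption: the sample count is enough for "at least one observation" checks.
					out[key] = float64(m.GetHistogram().GetSampleCount())
				}
			}
		}
		return out
	}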
