Skip to content

Commit

Permalink
Enhance workflow logs (dapr#7222)
Browse files Browse the repository at this point in the history
* Adding more logs for workflow executions

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

* fix typo

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

* Adding quotes to be consistent

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

* capitalizing log start

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

---------

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>
Co-authored-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>
Co-authored-by: Loong Dai <long.dai@intel.com>
  • Loading branch information
3 people authored and Deepanshu Agarwal committed Dec 11, 2023
1 parent da0dd2b commit 4d9edb8
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 32 deletions.
26 changes: 15 additions & 11 deletions pkg/runtime/wfengine/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func (a *activityActor) SetActorRuntime(actorsRuntime actors.Actors) {
// returns immediately after creating the reminder, enabling the workflow to continue processing other events
// in parallel.
func (a *activityActor) InvokeMethod(ctx context.Context, actorID string, methodName string, data []byte) (any, error) {
wfLogger.Debugf("Activity actor '%s': invoking method '%s'", actorID, methodName)
var ar ActivityRequest
if err := actors.DecodeInternalActorData(bytes.NewReader(data), &ar); err != nil {
return nil, fmt.Errorf("failed to decode activity request: %w", err)
Expand Down Expand Up @@ -108,7 +109,7 @@ func (a *activityActor) InvokeMethod(ctx context.Context, actorID string, method

// InvokeReminder implements actors.InternalActor and executes the activity logic.
func (a *activityActor) InvokeReminder(ctx context.Context, actorID string, reminderName string, data []byte, dueTime string, period string) error {
wfLogger.Debugf("Invoking reminder '%s' on activity actor '%s'", reminderName, actorID)
wfLogger.Debugf("Activity actor '%s': invoking reminder '%s'", actorID, reminderName)

state, _ := a.loadActivityState(ctx, actorID)
// TODO: On error, reply with a failure - this requires support from durabletask-go to produce TaskFailure results
Expand All @@ -120,19 +121,19 @@ func (a *activityActor) InvokeReminder(ctx context.Context, actorID string, remi
var recoverableErr *recoverableError
switch {
case errors.Is(err, context.DeadlineExceeded):
wfLogger.Warnf("%s: execution of '%s' timed-out and will be retried later: %v", actorID, reminderName, err)
wfLogger.Warnf("Activity actor '%s': execution of '%s' timed-out and will be retried later: %v", actorID, reminderName, err)
// Returning nil signals that we want the execution to be retried in the next period interval
return nil
case errors.Is(err, context.Canceled):
wfLogger.Warnf("%s: received cancellation signal while waiting for activity execution '%s'", actorID, reminderName)
wfLogger.Warnf("Activity actor '%s': received cancellation signal while waiting for activity execution '%s'", actorID, reminderName)
// Returning nil signals that we want the execution to be retried in the next period interval
return nil
case errors.As(err, &recoverableErr):
wfLogger.Warnf("%s: execution failed with a recoverable error and will be retried later: %v", actorID, err)
wfLogger.Warnf("Activity actor '%s': execution failed with a recoverable error and will be retried later: %v", actorID, err)
// Returning nil signals that we want the execution to be retried in the next period interval
return nil
default:
wfLogger.Errorf("%s: execution failed with a non-recoverable error: %v", actorID, err)
wfLogger.Errorf("Activity actor '%s': execution failed with a non-recoverable error: %v", actorID, err)
// TODO: Reply with a failure - this requires support from durabletask-go to produce TaskFailure results
}
}
Expand All @@ -149,7 +150,7 @@ func (a *activityActor) executeActivity(ctx context.Context, actorID string, nam

endIndex := strings.Index(actorID, "::")
if endIndex < 0 {
return fmt.Errorf("invalid activity actor ID: %s", actorID)
return fmt.Errorf("invalid activity actor ID: '%s'", actorID)
}
workflowID := actorID[0:endIndex]

Expand All @@ -167,6 +168,7 @@ func (a *activityActor) executeActivity(ctx context.Context, actorID string, nam
// introduce some kind of heartbeat protocol to help identify such cases.
callback := make(chan bool)
wi.Properties[CallbackChannelProperty] = callback
wfLogger.Debugf("Activity actor '%s': scheduling activity '%s' for workflow with instanceId '%s'", actorID, name, wi.InstanceID)
if err = a.scheduler(ctx, wi); err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return newRecoverableError(fmt.Errorf("timed-out trying to schedule an activity execution - this can happen if too many activities are running in parallel or if the workflow engine isn't running: %w", err))
Expand All @@ -185,9 +187,9 @@ loop:
return ctx.Err() // will be retried
case <-t.C:
if deadline, ok := ctx.Deadline(); ok {
wfLogger.Warnf("%s: '%s' is still running - will keep waiting until %v", actorID, name, deadline)
wfLogger.Warnf("Activity actor '%s': '%s' is still running - will keep waiting until '%v'", actorID, name, deadline)
} else {
wfLogger.Warnf("%s: '%s' is still running - will keep waiting indefinitely", actorID, name)
wfLogger.Warnf("Activity actor '%s': '%s' is still running - will keep waiting indefinitely", actorID, name)
}
case completed := <-callback:
if !t.Stop() {
Expand All @@ -200,6 +202,7 @@ loop:
}
}
}
wfLogger.Debugf("Activity actor '%s': activity '%s' completed for workflow with instanceId '%s' ", actorID, name, wi.InstanceID)

// publish the result back to the workflow actor as a new event to be processed
resultData, err := backend.MarshalHistoryEvent(wi.Result)
Expand Down Expand Up @@ -228,7 +231,7 @@ func (*activityActor) InvokeTimer(ctx context.Context, actorID string, timerName

// DeactivateActor implements actors.InternalActor
func (a *activityActor) DeactivateActor(ctx context.Context, actorID string) error {
wfLogger.Debugf("Deactivating activity actor '%s'", actorID)
wfLogger.Debugf("Activity actor '%s': deactivating", actorID)
a.statesCache.Delete(actorID)
return nil
}
Expand All @@ -241,7 +244,7 @@ func (a *activityActor) loadActivityState(ctx context.Context, actorID string) (
}

// Loading from the state store is only expected in process failure recovery scenarios.
wfLogger.Debugf("%s: loading activity state", actorID)
wfLogger.Debugf("Activity actor '%s': loading activity state", actorID)

req := actors.GetStateRequest{
ActorType: a.config.activityActorType,
Expand Down Expand Up @@ -288,6 +291,7 @@ func (a *activityActor) saveActivityState(ctx context.Context, actorID string, s
}

func (a *activityActor) purgeActivityState(ctx context.Context, actorID string) error {
wfLogger.Debugf("Activity actor '%s': purging activity state", actorID)
req := actors.TransactionalRequest{
ActorType: a.config.activityActorType,
ActorID: actorID,
Expand All @@ -307,7 +311,7 @@ func (a *activityActor) purgeActivityState(ctx context.Context, actorID string)

func (a *activityActor) createReliableReminder(ctx context.Context, actorID string, data any) error {
const reminderName = "run-activity"
wfLogger.Debugf("%s: creating reminder '%s' for immediate execution", actorID, reminderName)
wfLogger.Debugf("Activity actor '%s': creating reminder '%s' for immediate execution", actorID, reminderName)
dataEnc, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("failed to encode data as JSON: %w", err)
Expand Down
3 changes: 2 additions & 1 deletion pkg/runtime/wfengine/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (c *workflowEngineComponent) Start(ctx context.Context, req *workflows.Star
return nil, fmt.Errorf("unable to start workflow: %w", err)
}

c.logger.Debugf("Created new workflow instance with ID '%s'", workflowID)
c.logger.Debugf("Created new workflow '%s' instance with ID '%s'", req.WorkflowName, workflowID)
res := &workflows.StartResponse{
InstanceID: string(workflowID),
}
Expand Down Expand Up @@ -143,6 +143,7 @@ func (c *workflowEngineComponent) Purge(ctx context.Context, req *workflows.Purg
}
return fmt.Errorf("failed to Purge workflow %s: %w", req.InstanceID, err)
}
c.logger.Debugf("Purging workflow instance '%s'", req.InstanceID)

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/runtime/wfengine/wfengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ const (

var (
wfLogger = logger.NewLogger("dapr.runtime.wfengine")
wfBackendLogger = logger.NewLogger("wfengine.backend")
wfBackendLogger = logger.NewLogger("wfengine.durabletask.backend")
errExecutionAborted = errors.New("execution aborted")
)

Expand Down
50 changes: 31 additions & 19 deletions pkg/runtime/wfengine/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (wf *workflowActor) SetActorRuntime(actorRuntime actors.Actors) {

// InvokeMethod implements actors.InternalActor
func (wf *workflowActor) InvokeMethod(ctx context.Context, actorID string, methodName string, request []byte) (result any, err error) {
wfLogger.Debugf("invoking method '%s' on workflow actor '%s'", methodName, actorID)
wfLogger.Debugf("Workflow actor '%s': invoking method '%s'", actorID, methodName)

switch methodName {
case CreateWorkflowInstanceMethod:
Expand All @@ -112,7 +112,7 @@ func (wf *workflowActor) InvokeMethod(ctx context.Context, actorID string, metho

// InvokeReminder implements actors.InternalActor
func (wf *workflowActor) InvokeReminder(ctx context.Context, actorID string, reminderName string, data []byte, dueTime string, period string) error {
wfLogger.Debugf("invoking reminder '%s' on workflow actor '%s'", reminderName, actorID)
wfLogger.Debugf("Workflow actor '%s': invoking reminder '%s'", actorID, reminderName)

// Workflow executions should never take longer than a few seconds at the most
timeoutCtx, cancelTimeout := context.WithTimeout(ctx, wf.defaultTimeout)
Expand All @@ -121,22 +121,22 @@ func (wf *workflowActor) InvokeReminder(ctx context.Context, actorID string, rem
if err != nil {
var re recoverableError
if errors.Is(err, context.DeadlineExceeded) {
wfLogger.Warnf("%s: execution timed-out and will be retried later: %v", actorID, err)
wfLogger.Warnf("Workflow actor '%s': execution timed-out and will be retried later: '%v'", actorID, err)

// Returning nil signals that we want the execution to be retried in the next period interval
return nil
} else if errors.Is(err, context.Canceled) {
wfLogger.Warnf("%s: execution was canceled (process shutdown?) and will be retried later: %v", actorID, err)
wfLogger.Warnf("Workflow actor '%s': execution was canceled (process shutdown?) and will be retried later: '%v'", actorID, err)

// Returning nil signals that we want the execution to be retried in the next period interval
return nil
} else if errors.As(err, &re) {
wfLogger.Warnf("%s: execution failed with a recoverable error and will be retried later: %v", actorID, re)
wfLogger.Warnf("Workflow actor '%s': execution failed with a recoverable error and will be retried later: '%v'", actorID, re)

// Returning nil signals that we want the execution to be retried in the next period interval
return nil
} else {
wfLogger.Errorf("%s: execution failed with a non-recoverable error: %v", actorID, err)
wfLogger.Errorf("Workflow actor '%s': execution failed with a non-recoverable error: %v", actorID, err)
}
}

Expand All @@ -151,7 +151,7 @@ func (wf *workflowActor) InvokeTimer(ctx context.Context, actorID string, timerN

// DeactivateActor implements actors.InternalActor
func (wf *workflowActor) DeactivateActor(ctx context.Context, actorID string) error {
wfLogger.Debugf("deactivating workflow actor '%s'", actorID)
wfLogger.Debugf("Workflow actor '%s': deactivating", actorID)
wf.states.Delete(actorID)
return nil
}
Expand All @@ -174,8 +174,14 @@ func (wf *workflowActor) createWorkflowInstance(ctx context.Context, actorID str
if err != nil {
return err
}
if startEvent.GetExecutionStarted() == nil {
if es := startEvent.GetExecutionStarted(); es == nil {
return errors.New("invalid execution start event")
} else {
if es.ParentInstance == nil {
wfLogger.Debugf("Workflow actor '%s': creating workflow '%s' with instanceId '%s'", actorID, es.Name, es.OrchestrationInstance.InstanceId)
} else {
wfLogger.Debugf("Workflow actor '%s': creating child workflow '%s' with instanceId '%s' parentWorkflow '%s' parentWorkflowId '%s'", es.Name, es.OrchestrationInstance.InstanceId, es.ParentInstance.Name, es.ParentInstance.OrchestrationInstance.InstanceId)
}
}

// We block (re)creation of existing workflows unless they are in a completed state
Expand All @@ -188,7 +194,7 @@ func (wf *workflowActor) createWorkflowInstance(ctx context.Context, actorID str
if wf.activityResultAwaited.Load() {
return fmt.Errorf("a terminated workflow with ID '%s' is already awaiting an activity result", actorID)
}
wfLogger.Infof("%s: workflow was previously completed and is being recreated", actorID)
wfLogger.Infof("Workflow actor '%s': workflow was previously completed and is being recreated", actorID)
state.Reset()
}

Expand Down Expand Up @@ -285,6 +291,7 @@ func (wf *workflowActor) addWorkflowEvent(ctx context.Context, actorID string, h
if err != nil {
return err
}
wfLogger.Debugf("Workflow actor '%s': adding event '%v' to the workflow inbox", actorID, e)
state.AddToInbox(e)

if _, err := wf.createReliableReminder(ctx, actorID, "new-event", nil, 0); err != nil {
Expand All @@ -310,7 +317,7 @@ func (wf *workflowActor) runWorkflow(ctx context.Context, actorID string, remind
return err
}
if timerData.Generation < state.Generation {
wfLogger.Infof("%s: ignoring durable timer from previous generation '%v'", actorID, timerData.Generation)
wfLogger.Infof("Workflow actor '%s': ignoring durable timer from previous generation '%v'", actorID, timerData.Generation)
return nil
} else {
e, eventErr := backend.UnmarshalHistoryEvent(timerData.Bytes)
Expand All @@ -325,7 +332,7 @@ func (wf *workflowActor) runWorkflow(ctx context.Context, actorID string, remind
if len(state.Inbox) == 0 {
// This can happen after multiple events are processed in batches; there may still be reminders around
// for some of those already processed events.
wfLogger.Debugf("%s: ignoring run request for reminder '%s' because the workflow inbox is empty", reminderName, actorID)
wfLogger.Debugf("Workflow actor '%s': ignoring run request for reminder '%s' because the workflow inbox is empty", actorID, reminderName)
return nil
}

Expand Down Expand Up @@ -378,7 +385,7 @@ func (wf *workflowActor) runWorkflow(ctx context.Context, actorID string, remind
// will trigger this callback channel.
callback := make(chan bool)
wi.Properties[CallbackChannelProperty] = callback

wfLogger.Debugf("Workflow actor '%s': scheduling workflow execution with instanceId '%s'", actorID, wi.InstanceID)
// Schedule the workflow execution by signaling the backend
err = wf.scheduler(ctx, wi)
if err != nil {
Expand All @@ -396,10 +403,12 @@ func (wf *workflowActor) runWorkflow(ctx context.Context, actorID string, remind
return newRecoverableError(errExecutionAborted)
}
}
wfLogger.Debugf("Workflow actor '%s': workflow execution returned with status '%s' instanceId '%s'", actorID, runtimeState.RuntimeStatus().String(), wi.InstanceID)

// Increment the generation counter if the workflow used continue-as-new. Subsequent actions below
// will use this updated generation value for their duplication execution handling.
if runtimeState.ContinuedAsNew() {
wfLogger.Debugf("Workflow actor '%s': workflow with instanceId '%s' continued as new", actorID, wi.InstanceID)
state.Generation += 1
}

Expand All @@ -420,8 +429,9 @@ func (wf *workflowActor) runWorkflow(ctx context.Context, actorID string, remind
}
reminderPrefix := fmt.Sprintf("timer-%d", tf.TimerId)
data := NewDurableTimer(timerBytes, state.Generation)
wfLogger.Debugf("Workflow actor '%s': creating reminder '%s' for the durable timer", actorID, reminderPrefix)
if _, err := wf.createReliableReminder(ctx, actorID, reminderPrefix, data, delay); err != nil {
return newRecoverableError(fmt.Errorf("actor %s failed to create reminder for timer: %w", actorID, err))
return newRecoverableError(fmt.Errorf("actor '%s' failed to create reminder for timer: %w", actorID, err))
}
}
}
Expand All @@ -434,7 +444,7 @@ func (wf *workflowActor) runWorkflow(ctx context.Context, actorID string, remind
} else if msg.HistoryEvent.GetSubOrchestrationInstanceCompleted() != nil || msg.HistoryEvent.GetSubOrchestrationInstanceFailed() != nil {
reqsByName[AddWorkflowEventMethod] = append(reqsByName[AddWorkflowEventMethod], msg)
} else {
wfLogger.Warn("don't know how to process outbound message %v", msg)
wfLogger.Warnf("Workflow actor '%s': don't know how to process outbound message '%v'", actorID, msg)
}
}

Expand All @@ -443,7 +453,7 @@ func (wf *workflowActor) runWorkflow(ctx context.Context, actorID string, remind
for _, e := range runtimeState.PendingTasks() {
ts := e.GetTaskScheduled()
if ts == nil {
wfLogger.Warn("Unable to process task %v", e)
wfLogger.Warnf("Workflow actor '%s': unable to process task '%v'", actorID, e)
continue
}

Expand All @@ -459,6 +469,7 @@ func (wf *workflowActor) runWorkflow(ctx context.Context, actorID string, remind
}
targetActorID := getActivityActorID(actorID, e.EventId, state.Generation)

wfLogger.Debugf("Workflow actor '%s': invoking execute method on activity actor '%s'", actorID, targetActorID)
req := invokev1.
NewInvokeMethodRequest("Execute").
WithActor(wf.config.activityActorType, targetActorID).
Expand All @@ -471,7 +482,7 @@ func (wf *workflowActor) runWorkflow(ctx context.Context, actorID string, remind
resp, err := wf.actors.Call(ctx, req)
if err != nil {
if errors.Is(err, ErrDuplicateInvocation) {
wfLogger.Warnf("%s: activity invocation %s::%d was flagged as a duplicate and will be skipped", actorID, ts.Name, e.EventId)
wfLogger.Warnf("Workflow actor '%s': activity invocation '%s::%d' was flagged as a duplicate and will be skipped", actorID, ts.Name, e.EventId)
continue
}
return newRecoverableError(fmt.Errorf("failed to invoke activity actor '%s' to execute '%s': %w", targetActorID, ts.Name, err))
Expand All @@ -487,6 +498,7 @@ func (wf *workflowActor) runWorkflow(ctx context.Context, actorID string, remind
return err
}

wfLogger.Debugf("Workflow actor '%s': invoking method '%s' on workflow actor '%s'", actorID, method, msg.TargetInstanceID)
req := invokev1.
NewInvokeMethodRequest(method).
WithActor(wf.config.workflowActorType, msg.TargetInstanceID).
Expand Down Expand Up @@ -519,7 +531,7 @@ func (wf *workflowActor) loadInternalState(ctx context.Context, actorID string)
}

// state is not cached, so try to load it from the state store
wfLogger.Debugf("%s: loading workflow state", actorID)
wfLogger.Debugf("Workflow actor '%s': loading workflow state", actorID)
state, err := LoadWorkflowState(ctx, wf.actors, actorID, wf.config)
if err != nil {
return nil, err
Expand All @@ -542,7 +554,7 @@ func (wf *workflowActor) saveInternalState(ctx context.Context, actorID string,
return err
}

wfLogger.Debugf("%s: saving %d keys to actor state store", actorID, len(req.Operations))
wfLogger.Debugf("Workflow actor '%s': saving %d keys to actor state store", actorID, len(req.Operations))
if err = wf.actors.TransactionalStateOperation(ctx, req); err != nil {
return err
}
Expand All @@ -560,7 +572,7 @@ func (wf *workflowActor) saveInternalState(ctx context.Context, actorID string,
func (wf *workflowActor) createReliableReminder(ctx context.Context, actorID string, namePrefix string, data any, delay time.Duration) (string, error) {
// Reminders need to have unique names or else they may not fire in certain race conditions.
reminderName := fmt.Sprintf("%s-%s", namePrefix, uuid.NewString()[:8])
wfLogger.Debugf("%s: creating '%s' reminder with DueTime = %s", actorID, reminderName, delay)
wfLogger.Debugf("Workflow actor '%s': creating '%s' reminder with DueTime = '%s'", actorID, reminderName, delay)

dataEnc, err := json.Marshal(data)
if err != nil {
Expand Down

0 comments on commit 4d9edb8

Please sign in to comment.